mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-02-26 19:06:20 +00:00
Some checks are pending
Bump Version / Bump Version Workflow (push) Waiting to run
docker-build / platform-excludes (push) Waiting to run
docker-build / build (push) Blocked by required conditions
docker-build / merge (push) Blocked by required conditions
pre-commit / pre-commit (push) Waiting to run
Run Pytest on Pull Request / test (push) Waiting to run
* fix: improve error handling for provider updates Distinguishes failures of active providers from inactive ones. Propagates errors only for enabled providers, allowing execution to continue if a non-active provider fails, which avoids unnecessary interruptions and improves robustness. * fix: add provider settings validation for forecast requests Prevents potential runtime errors by checking if provider settings are configured before accessing forecast credentials. Raises a clear error when settings are missing to help with debugging misconfigurations. * refactor(load): move provider settings to top-level fields Transitions load provider settings from a nested "provider_settings" object with provider-specific keys to dedicated top-level fields.\n\nRemoves the legacy "provider_settings" mapping and updates migration logic to ensure backward compatibility with existing configurations. * docs: update version numbers and documantation --------- Co-authored-by: Normann <github@koldrack.com>
2261 lines
89 KiB
Python
2261 lines
89 KiB
Python
"""Abstract and base classes for generic data.
|
|
|
|
This module provides classes for managing and processing generic data in a flexible, configurable manner.
|
|
It includes classes to handle configurations, record structures, sequences, and containers for generic data,
|
|
enabling efficient storage, retrieval, and manipulation of data records.
|
|
|
|
This module is designed for use in predictive modeling workflows, facilitating the organization, serialization,
|
|
and manipulation of configuration and generic data in a clear, scalable, and structured manner.
|
|
"""
|
|
|
|
import difflib
|
|
import json
|
|
from abc import abstractmethod
|
|
from collections.abc import KeysView, MutableMapping
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
Dict,
|
|
Iterator,
|
|
Literal,
|
|
Optional,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
get_args,
|
|
overload,
|
|
)
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from loguru import logger
|
|
from numpydantic import NDArray, Shape
|
|
from pydantic import (
|
|
AwareDatetime,
|
|
ConfigDict,
|
|
Field,
|
|
ValidationError,
|
|
computed_field,
|
|
field_validator,
|
|
model_validator,
|
|
)
|
|
|
|
from akkudoktoreos.core.coreabc import (
|
|
ConfigMixin,
|
|
SingletonMixin,
|
|
StartMixin,
|
|
)
|
|
from akkudoktoreos.core.databaseabc import (
|
|
UNBOUND_WINDOW,
|
|
DatabaseRecordProtocolMixin,
|
|
DatabaseTimestamp,
|
|
DatabaseTimeWindowType,
|
|
)
|
|
from akkudoktoreos.core.pydantic import (
|
|
PydanticBaseModel,
|
|
PydanticDateTimeData,
|
|
PydanticDateTimeDataFrame,
|
|
)
|
|
from akkudoktoreos.utils.datetimeutil import (
|
|
DateTime,
|
|
Duration,
|
|
compare_datetimes,
|
|
to_datetime,
|
|
to_duration,
|
|
)
|
|
|
|
|
|
class DataABC(ConfigMixin, StartMixin, PydanticBaseModel):
    """Common abstract base for all generic-data classes.

    Provides access to the EOS configuration data through the ``config``
    attribute contributed by :class:`ConfigMixin`.
    """

    pass
|
|
|
|
|
|
# ==================== DataRecord ====================
|
|
|
|
|
|
class DataRecord(DataABC, MutableMapping):
    """Base class for data records, enabling dynamic access to fields defined in derived classes.

    Fields can be accessed and mutated both using dictionary-style access (`record['field_name']`)
    and attribute-style access (`record.field_name`).

    The data record also provides configured field like data. Configuration has to be done by the
    derived class. Configuration is a list of key strings, which is usually taken from the EOS
    configuration. The internal field for these data `configured_data` is mostly hidden from
    dictionary-style and attribute-style access.

    Attributes:
        date_time (DateTime): Aware datetime indicating when the data record applies. Defaults
            to now.

    Configurations:
        - Allows mutation after creation.
        - Supports non-standard data types like `datetime`.
    """

    date_time: Optional[DateTime] = Field(
        default=None, json_schema_extra={"description": "DateTime"}
    )

    configured_data: dict[str, Any] = Field(
        default_factory=dict,
        json_schema_extra={
            "description": "Configured field like data",
            "examples": [{"load0_mr": 40421}],
        },
    )

    # Pydantic v2 model configuration
    model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)

    @model_validator(mode="before")
    @classmethod
    def init_configured_field_like_data(cls, data: Any) -> Any:
        """Extracts configured data keys from the input and assigns them to `configured_data`.

        This validator is called before the model is initialized. It filters out any keys from
        the input dictionary that are listed in the configured data keys, and moves them into
        the `configured_data` field of the model. This enables flexible, key-driven population
        of dynamic data while keeping the model schema clean.

        Args:
            data (Any): The raw input data used to initialize the model.

        Returns:
            Any: The modified input data dictionary, with configured keys moved to
                `configured_data`.
        """
        if not isinstance(data, dict):
            return data

        configured_keys: Union[list[str], set] = cls.configured_data_keys() or set()
        extracted = {k: data.pop(k) for k in list(data.keys()) if k in configured_keys}

        if extracted:
            data.setdefault("configured_data", {}).update(extracted)

        return data

    @field_validator("date_time", mode="before")
    @classmethod
    def transform_to_datetime(cls, value: Any) -> Optional[DateTime]:
        """Converts various datetime formats into DateTime."""
        if value is None:
            # Allow to set to default.
            return None
        return to_datetime(value)

    @classmethod
    def configured_data_keys(cls) -> Optional[list[str]]:
        """Return the keys for the configured field like data.

        Can be overwritten by derived classes to define specific field like data. Usually
        provided by configuration data.
        """
        return None

    @classmethod
    def record_keys(cls) -> list[str]:
        """Returns the keys of all fields in the data record."""
        key_list = []
        key_list.extend(list(cls.model_fields.keys()))
        key_list.extend(list(cls.__pydantic_decorators__.computed_fields.keys()))
        # Hide the internal dict; expose the configured keys instead.
        key_list.remove("configured_data")
        configured_keys = cls.configured_data_keys()
        if configured_keys is not None:
            key_list.extend(configured_keys)
        return key_list

    @classmethod
    def record_keys_writable(cls) -> list[str]:
        """Returns the keys of all fields in the data record that are writable."""
        keys_writable = []
        keys_writable.extend(list(cls.model_fields.keys()))
        # Hide the internal dict; expose the configured keys instead.
        keys_writable.remove("configured_data")
        configured_keys = cls.configured_data_keys()
        if configured_keys is not None:
            keys_writable.extend(configured_keys)
        return keys_writable

    def _validate_key_writable(self, key: str) -> None:
        """Verify that a specified key exists and is writable in the current record keys.

        Args:
            key (str): The key to check for in the records.

        Raises:
            KeyError: If the specified key is not in the expected list of keys for the records.
        """
        if key not in self.record_keys_writable():
            raise KeyError(
                f"Key '{key}' is not in writable record keys: {self.record_keys_writable()}"
            )

    def __dir__(self) -> list[str]:
        """Extend the default `dir()` output to include configured field like data keys.

        This enables editor auto-completion and interactive introspection, while hiding the
        internal `configured_data` dictionary. Configured field like data values thereby
        appear like native fields, in line with the base model's attribute behavior.
        """
        base = super().__dir__()
        keys = set(base)
        # Expose configured data keys as attributes
        configured_keys = self.configured_data_keys()
        if configured_keys is not None:
            keys.update(configured_keys)
        # Explicitly hide the 'configured_data' internal dict
        keys.discard("configured_data")
        return sorted(keys)

    def __eq__(self, other: Any) -> bool:
        """Ensure equality comparison includes the contents of the `configured_data` dict.

        Contents of the `configured_data` dict are in addition to the base model fields.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        # Compare all fields except `configured_data`
        if self.model_dump(exclude={"configured_data"}) != other.model_dump(
            exclude={"configured_data"}
        ):
            return False
        # Compare `configured_data` explicitly
        return self.configured_data == other.configured_data

    def __getitem__(self, key: str) -> Any:
        """Retrieve the value of a field by key name.

        Args:
            key (str): The name of the field to retrieve.

        Returns:
            Any: The value of the requested field.

        Raises:
            KeyError: If the specified key does not exist.
        """
        try:
            # Let getattr do the work
            return self.__getattr__(key)
        except Exception as error:
            # Fix: was a bare `except:` that also swallowed SystemExit and
            # KeyboardInterrupt; also chain the cause for easier debugging.
            raise KeyError(f"'{key}' not found in the record fields.") from error

    def __setitem__(self, key: str, value: Any) -> None:
        """Set the value of a field by key name.

        Args:
            key (str): The name of the field to set.
            value (Any): The value to assign to the field.

        Raises:
            KeyError: If the specified key does not exist in the fields.
        """
        try:
            # Let setattr do the work
            self.__setattr__(key, value)
        except Exception as error:
            # Fix: narrowed from a bare `except:`; cause is chained.
            raise KeyError(f"'{key}' is not a recognized field.") from error

    def __delitem__(self, key: str) -> None:
        """Delete the value of a field by key name by setting it to None.

        Args:
            key (str): The name of the field to delete.

        Raises:
            KeyError: If the specified key does not exist in the fields.
        """
        try:
            self.__delattr__(key)
        except Exception as error:
            # Fix: narrowed from a bare `except:`; cause is chained.
            raise KeyError(f"'{key}' is not a recognized field.") from error

    def __iter__(self) -> Iterator[str]:
        """Iterate over the field names in the data record.

        Returns:
            Iterator[str]: An iterator over field names.
        """
        return iter(self.record_keys_writable())

    def __len__(self) -> int:
        """Return the number of fields in the data record.

        Returns:
            int: The number of defined fields.
        """
        return len(self.record_keys_writable())

    def __repr__(self) -> str:
        """Provide a string representation of the data record.

        Returns:
            str: A string representation showing field names and their values.
        """
        field_values = {field: getattr(self, field) for field in self.__class__.model_fields}
        return f"{self.__class__.__name__}({field_values})"

    def __getattr__(self, key: str) -> Any:
        """Dynamic attribute access for fields.

        Args:
            key (str): The name of the field to access.

        Returns:
            Any: The value of the requested field.

        Raises:
            AttributeError: If the field does not exist.
        """
        # NOTE(review): __getattr__ only fires when normal lookup fails, so for a
        # declared model field this getattr() call would re-enter __getattr__ -
        # presumably unreachable because pydantic stores fields in __dict__; verify.
        if key in self.__class__.model_fields:
            return getattr(self, key)
        if key in self.configured_data.keys():
            return self.configured_data[key]
        configured_keys = self.configured_data_keys()
        if configured_keys is not None and key in configured_keys:
            return None
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")

    def __setattr__(self, key: str, value: Any) -> None:
        """Set attribute values directly if they are recognized fields.

        Args:
            key (str): The name of the attribute/field to set.
            value (Any): The value to assign to the attribute/field.

        Raises:
            AttributeError: If the attribute/field does not exist.
        """
        if key in self.__class__.model_fields:
            super().__setattr__(key, value)
            return
        configured_keys = self.configured_data_keys()
        if configured_keys is not None and key in configured_keys:
            self.configured_data[key] = value
            return
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'")

    def __delattr__(self, key: str) -> None:
        """Delete an attribute by setting it to None if it exists as a field.

        Args:
            key (str): The name of the attribute/field to delete.

        Raises:
            AttributeError: If the attribute/field does not exist.
        """
        if key in self.__class__.model_fields:
            data: Optional[dict]
            if key == "configured_data":
                # The internal dict is reset to empty rather than set to None.
                data = dict()
            else:
                data = None
            setattr(self, key, data)
            return
        if key in self.configured_data:
            del self.configured_data[key]
            return
        configured_keys = self.configured_data_keys()
        if configured_keys is not None and key in configured_keys:
            # A configured key without a stored value deletes to a no-op.
            return
        super().__delattr__(key)

    @classmethod
    def key_from_description(cls, description: str, threshold: float = 0.8) -> Optional[str]:
        """Returns the attribute key that best matches the provided description.

        Fuzzy matching is used.

        Args:
            description (str): The description text to search for.
            threshold (float): The minimum ratio for a match (0-1). Default is 0.8.

        Returns:
            Optional[str]: The attribute key if a match is found above the threshold, else None.
        """
        if description is None:
            return None

        # Get all descriptions from the fields
        descriptions: dict[str, str] = {}
        for field_name in cls.model_fields.keys():
            desc = cls.field_description(field_name)
            if desc:
                descriptions[field_name] = desc

        # Use difflib to get close matches
        matches = difflib.get_close_matches(
            description, descriptions.values(), n=1, cutoff=threshold
        )

        # Check if there is a match
        if matches:
            best_match = matches[0]
            # Return the key that corresponds to the best match
            for key, desc in descriptions.items():
                if desc == best_match:
                    return key
        return None

    @classmethod
    def keys_from_descriptions(
        cls, descriptions: list[str], threshold: float = 0.8
    ) -> list[Optional[str]]:
        """Returns a list of attribute keys that best matches the provided list of descriptions.

        Fuzzy matching is used.

        Args:
            descriptions (list[str]): A list of description texts to search for.
            threshold (float): The minimum ratio for a match (0-1). Default is 0.8.

        Returns:
            list[Optional[str]]: A list of attribute keys matching the descriptions, with None
                for unmatched descriptions.
        """
        keys = []
        for description in descriptions:
            key = cls.key_from_description(description, threshold)
            keys.append(key)
        return keys
|
|
|
|
|
|
# ==================== DataSequence ====================
|
|
|
|
|
|
class DataSequence(DataABC, DatabaseRecordProtocolMixin[DataRecord]):
|
|
"""A managed sequence of DataRecord instances with ltime series behavior.
|
|
|
|
The DataSequence class provides an ordered, mutable collection of DataRecord
|
|
instances.
|
|
|
|
It also supports advanced data operations such as
|
|
|
|
- JSON serialization,
|
|
- conversion to Pandas Series,
|
|
- sorting by timestamp,
|
|
- and data storage in a database.
|
|
|
|
Attributes:
|
|
records (list[DataRecord]): A list of DataRecord instances representing
|
|
individual generic data points.
|
|
record_keys (Optional[list[str]]): A list of field names (keys) expected in each
|
|
DataRecord.
|
|
|
|
Invariant:
|
|
``self.records`` is always kept sorted in ascending ``date_time`` order
|
|
whenever it contains any records.
|
|
|
|
Note:
|
|
Derived classes have to provide their own records field with correct record type set.
|
|
|
|
Usage:
|
|
.. code-block:: python
|
|
|
|
# Example of creating, adding, and using DataSequence
|
|
class DerivedSequence(DataSquence):
|
|
records: list[DerivedDataRecord] = Field(default_factory=list, json_schema_extra={ "description": "List of data records" })
|
|
|
|
seq = DerivedSequence()
|
|
seq.insert(DerivedDataRecord(date_time=datetime.now(), temperature=72))
|
|
seq.insert(DerivedDataRecord(date_time=datetime.now(), temperature=75))
|
|
|
|
# Convert to JSON and back
|
|
json_data = seq.to_json()
|
|
new_seq = DerivedSequence.from_json(json_data)
|
|
|
|
# Convert to Pandas Series
|
|
series = seq.key_to_series('temperature')
|
|
|
|
"""
|
|
|
|
# To be overloaded by derived classes.
|
|
records: list[DataRecord] = Field(
|
|
default_factory=list, json_schema_extra={"description": "List of data records"}
|
|
)
|
|
|
|
# Sequence helpers
|
|
|
|
def _validate_key(self, key: str) -> None:
|
|
"""Verify that a specified key exists in the current record keys.
|
|
|
|
Args:
|
|
key (str): The key to check for in the records.
|
|
|
|
Raises:
|
|
KeyError: If the specified key is not in the expected list of keys for the records.
|
|
"""
|
|
if key not in self.record_keys:
|
|
raise KeyError(f"Key '{key}' is not in record keys: {self.record_keys}")
|
|
|
|
def _validate_key_writable(self, key: str) -> None:
|
|
"""Verify that a specified key exists and is writable in the current record keys.
|
|
|
|
Args:
|
|
key (str): The key to check for in the records.
|
|
|
|
Raises:
|
|
KeyError: If the specified key is not in the expected list of keys for the records.
|
|
"""
|
|
if key not in self.record_keys_writable:
|
|
raise KeyError(
|
|
f"Key '{key}' is not in writable record keys: {self.record_keys_writable}"
|
|
)
|
|
|
|
def _validate_record(self, value: DataRecord) -> None:
|
|
"""Check if the provided value is a valid DataRecord with compatible keys.
|
|
|
|
Args:
|
|
value (DataRecord): The record to validate.
|
|
|
|
Raises:
|
|
ValueError: If the value is not an instance of DataRecord or has an invalid date_time type.
|
|
KeyError: If the value has different keys from those expected in the sequence.
|
|
"""
|
|
# Assure value is of correct type
|
|
if value.__class__.__name__ != self.record_class().__name__:
|
|
raise ValueError(f"Value must be an instance of `{self.record_class().__name__}`.")
|
|
|
|
# Assure datetime value can be converted to datetime object
|
|
value.date_time = to_datetime(value.date_time)
|
|
|
|
# Sequence state
|
|
|
|
# Derived fields (computed)
|
|
@computed_field # type: ignore[prop-decorator]
|
|
@property
|
|
def min_datetime(self) -> Optional[DateTime]:
|
|
"""Minimum (earliest) datetime in the time series sequence of data records.
|
|
|
|
This property computes the earliest datetime from the sequence of data records.
|
|
If no records are present, it returns `None`.
|
|
|
|
Returns:
|
|
Optional[DateTime]: The earliest datetime in the sequence, or `None` if no
|
|
data records exist.
|
|
"""
|
|
min_timestamp, _ = self.db_timestamp_range()
|
|
if min_timestamp is None:
|
|
return None
|
|
# Timestamps are in UTC - convert to timezone
|
|
utc_datetime = DatabaseTimestamp.to_datetime(min_timestamp)
|
|
return utc_datetime.in_timezone(self.config.general.timezone)
|
|
|
|
@computed_field # type: ignore[prop-decorator]
|
|
@property
|
|
def max_datetime(self) -> Optional[DateTime]:
|
|
"""Maximum (latest) datetime in the time series sequence of data records.
|
|
|
|
This property computes the latest datetime from the sequence of data records.
|
|
If no records are present, it returns `None`.
|
|
|
|
Returns:
|
|
Optional[DateTime]: The latest datetime in the sequence, or `None` if no
|
|
data records exist.
|
|
"""
|
|
_, max_timestamp = self.db_timestamp_range()
|
|
if max_timestamp is None:
|
|
return None
|
|
# Timestamps are in UTC - convert to timezone
|
|
utc_datetime = DatabaseTimestamp.to_datetime(max_timestamp)
|
|
return utc_datetime.in_timezone(self.config.general.timezone)
|
|
|
|
@computed_field # type: ignore[prop-decorator]
|
|
@property
|
|
def record_keys(self) -> list[str]:
|
|
"""Returns the keys of all fields in the data records."""
|
|
return self.record_class().record_keys()
|
|
|
|
@computed_field # type: ignore[prop-decorator]
|
|
@property
|
|
def record_keys_writable(self) -> list[str]:
|
|
"""Get the keys of all writable fields in the data records.
|
|
|
|
This property retrieves the keys of all fields in the data records that
|
|
can be written to. It uses the `record_class` to determine the model's
|
|
field structure.
|
|
|
|
Returns:
|
|
list[str]: A list of field keys that are writable in the data records.
|
|
"""
|
|
return self.record_class().record_keys_writable()
|
|
|
|
@classmethod
|
|
def record_class(cls) -> Type:
|
|
"""Get the class of the data record handled by this data sequence.
|
|
|
|
This method determines the class of the data record type associated with
|
|
the `records` field of the model. The field is expected to be a list, and
|
|
the element type of the list should be a subclass of `DataRecord`.
|
|
|
|
Raises:
|
|
ValueError: If the record type is not a subclass of `DataRecord`.
|
|
|
|
Returns:
|
|
Type: The class of the data record handled by the data sequence.
|
|
"""
|
|
# Access the model field metadata
|
|
field_info = cls.model_fields["records"]
|
|
# Get the list element type from the 'type_' attribute
|
|
list_element_type = get_args(field_info.annotation)[0]
|
|
if not isinstance(list_element_type(), DataRecord):
|
|
raise ValueError(
|
|
f"Data record must be an instance of DataRecord: '{list_element_type}'."
|
|
)
|
|
return list_element_type
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "DataSequence":
|
|
"""Reconstruct a sequence from its serialized dictionary form.
|
|
|
|
Fully subclass-safe and invariant-safe.
|
|
"""
|
|
if not isinstance(data, dict):
|
|
raise TypeError("from_dict() expects a dictionary")
|
|
|
|
records_data = data.get("records", [])
|
|
if not isinstance(records_data, list):
|
|
raise ValueError("'records' must be a list")
|
|
|
|
# Create empty instance of *actual class*
|
|
sequence = cls()
|
|
|
|
# Rebuild records using the sequence's record model
|
|
record_model = sequence.record_class()
|
|
|
|
for record_dict in records_data:
|
|
if not isinstance(record_dict, dict):
|
|
raise ValueError("Each record must be a dictionary")
|
|
|
|
record = record_model(**record_dict)
|
|
|
|
# Important: use insert_by_datetime to rebuild invariants
|
|
sequence.insert_by_datetime(record)
|
|
|
|
return sequence
|
|
|
|
def __len__(self) -> int:
|
|
"""Get total number of DataRecords in sequence (DB + memory-only)."""
|
|
return self.db_count_records()
|
|
|
|
def __repr__(self) -> str:
|
|
"""Provide a string representation of the DataSequence.
|
|
|
|
Returns:
|
|
str: A string representation of the DataSequence.
|
|
"""
|
|
return f"{self.__class__.__name__}([{', '.join(repr(record) for record in self.records)}])"
|
|
|
|
# Sequence methods
|
|
|
|
def __iter__(self) -> Iterator[DataRecord]:
|
|
"""Create an iterator for accessing DataRecords sequentially.
|
|
|
|
Returns:
|
|
Iterator[DataRecord]: An iterator for the records.
|
|
"""
|
|
return iter(self.records)
|
|
|
|
def get_by_datetime(
|
|
self, target_datetime: DateTime, *, time_window: Optional[Duration] = None
|
|
) -> Optional[DataRecord]:
|
|
"""Get the record at the specified datetime, with an optional fallback search window.
|
|
|
|
Args:
|
|
target_datetime: The datetime to search for.
|
|
time_window: Optional total width of the symmetric search window centered on
|
|
``target_datetime``. If provided and no exact match exists, the nearest
|
|
record within this window is returned.
|
|
|
|
Returns:
|
|
The matching DataRecord, the nearest DataRecord within the specified time window
|
|
if no exact match exists, or ``None`` if no suitable record is found.
|
|
"""
|
|
# Ensure datetime objects are normalized
|
|
db_target = DatabaseTimestamp.from_datetime(target_datetime)
|
|
|
|
return self.db_get_record(db_target, time_window=time_window)
|
|
|
|
def get_nearest_by_datetime(
|
|
self, target_datetime: DateTime, time_window: Optional[Duration] = None
|
|
) -> Optional[DataRecord]:
|
|
"""Get the record nearest to the specified datetime within an optional time window.
|
|
|
|
Args:
|
|
target_datetime: The datetime to search near.
|
|
time_window: Total width of the symmetric search window centered on
|
|
``target_datetime``. If ``None``, searches all records.
|
|
|
|
Returns:
|
|
The nearest DataRecord within the specified time window, or ``None`` if no records
|
|
exist or no records fall within the window.
|
|
|
|
Raises:
|
|
ValueError: If ``time_window`` is negative.
|
|
"""
|
|
# Ensure datetime objects are normalized
|
|
db_target = DatabaseTimestamp.from_datetime(target_datetime)
|
|
|
|
if time_window is None:
|
|
twin: DatabaseTimeWindowType = UNBOUND_WINDOW
|
|
else:
|
|
twin = time_window
|
|
return self.db_get_record(db_target, time_window=twin)
|
|
|
|
    def insert_by_datetime(self, record: DataRecord) -> None:
        """Insert or merge a DataRecord into the sequence based on its date.

        If a record with the same date exists, merges new data fields with the existing
        record. Otherwise, appends the record and maintains chronological order.

        Args:
            record (DataRecord): The record to add or merge.

        Note:
            record.date_time shall be a DateTime or None
        """
        self._validate_record(record)

        # Ensure datetime objects are normalized to the database timestamp form.
        record_date_time_timestamp = DatabaseTimestamp.from_datetime(record.date_time)

        avail_record = self.db_get_record(record_date_time_timestamp)
        if avail_record:
            # Merge: copy only fields explicitly set on the incoming record
            # (exclude_unset) that are writable, onto the existing record.
            for field, val in record.model_dump(exclude_unset=True).items():
                if field in record.record_keys_writable():
                    setattr(avail_record, field, val)
            # NOTE(review): the merged object is `avail_record`, yet the incoming
            # `record` is marked dirty here - presumably db_mark_dirty_record is
            # keyed by timestamp rather than object identity; confirm.
            self.db_mark_dirty_record(record)
        else:
            self.db_insert_record(record)
|
|
|
|
@overload
|
|
def update_value(self, date: DateTime, key: str, value: Any) -> None: ...
|
|
|
|
@overload
|
|
def update_value(self, date: DateTime, values: Dict[str, Any]) -> None: ...
|
|
|
|
def update_value(self, date: DateTime, *args: Any, **kwargs: Any) -> None:
|
|
"""Updates specific values in the data record for a given date.
|
|
|
|
If a record for the date exists, updates the specified attributes with the new values.
|
|
Otherwise, appends a new record with the given values and maintains chronological order.
|
|
|
|
Args:
|
|
date (datetime): The date for which the values are to be added or updated.
|
|
key (str), value (Any): Single key-value pair to update
|
|
OR
|
|
values (Dict[str, Any]): Dictionary of key-value pairs to update
|
|
OR
|
|
**kwargs: Key-value pairs as keyword arguments
|
|
|
|
Examples:
|
|
.. code-block:: python
|
|
|
|
update_value(date, 'temperature', 25.5)
|
|
update_value(date, {'temperature': 25.5, 'humidity': 80})
|
|
update_value(date, temperature=25.5, humidity=80)
|
|
|
|
"""
|
|
# Process input arguments into a dictionary
|
|
values: Dict[str, Any] = {}
|
|
if len(args) == 2: # Single key-value pair
|
|
values[args[0]] = args[1]
|
|
elif len(args) == 1 and isinstance(args[0], dict): # Dictionary input
|
|
values.update(args[0])
|
|
elif len(args) > 0: # Invalid number of arguments
|
|
raise ValueError("Expected either 2 arguments (key, value) or 1 dictionary argument")
|
|
values.update(kwargs) # Add any keyword arguments
|
|
|
|
# Validate all keys are writable
|
|
for key in values:
|
|
self._validate_key_writable(key)
|
|
|
|
# Ensure datetime objects are normalized
|
|
db_target = DatabaseTimestamp.from_datetime(date)
|
|
|
|
# Check if a record with the given date already exists
|
|
record = self.db_get_record(db_target)
|
|
if record is None:
|
|
# Create a new record and append to the list
|
|
new_record = self.record_class()(date_time=date, **values)
|
|
self.db_insert_record(new_record)
|
|
else:
|
|
# Update the DataRecord with all new values
|
|
for key, value in values.items():
|
|
setattr(record, key, value)
|
|
self.db_mark_dirty_record(record)
|
|
|
|
def key_to_dict(
|
|
self,
|
|
key: str,
|
|
start_datetime: Optional[DateTime] = None,
|
|
end_datetime: Optional[DateTime] = None,
|
|
dropna: Optional[bool] = None,
|
|
) -> Dict[DateTime, Any]:
|
|
"""Extract a dictionary indexed by the date_time field of the DataRecords.
|
|
|
|
The dictionary will contain values extracted from the specified key attribute of each DataRecord,
|
|
using the date_time field as the key.
|
|
|
|
Args:
|
|
key (str): The field name in the DataRecord from which to extract values.
|
|
start_datetime (datetime, optional): The start date to filter records (inclusive).
|
|
end_datetime (datetime, optional): The end date to filter records (exclusive).
|
|
dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True.
|
|
|
|
Returns:
|
|
Dict[datetime, Any]: A dictionary with the date_time of each record as the key
|
|
and the values extracted from the specified key.
|
|
|
|
Raises:
|
|
KeyError: If the specified key is not found in any of the DataRecords.
|
|
"""
|
|
self._validate_key(key)
|
|
|
|
# Ensure datetime objects are normalized
|
|
start_timestamp = (
|
|
DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None
|
|
)
|
|
end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None
|
|
|
|
# Create a dictionary to hold date_time and corresponding values
|
|
if dropna is None:
|
|
dropna = True
|
|
filtered_data = {}
|
|
for record in self.db_iterate_records(start_timestamp, end_timestamp):
|
|
if (
|
|
record.date_time is None
|
|
or (dropna and getattr(record, key, None) is None)
|
|
or (dropna and getattr(record, key, None) == float("nan"))
|
|
):
|
|
continue
|
|
record_date_time_timestamp = DatabaseTimestamp.from_datetime(record.date_time)
|
|
if (start_timestamp is None or record_date_time_timestamp >= start_timestamp) and (
|
|
end_timestamp is None or record_date_time_timestamp < end_timestamp
|
|
):
|
|
filtered_data[to_datetime(record.date_time, as_string=True)] = getattr(
|
|
record, key, None
|
|
)
|
|
|
|
return filtered_data
|
|
|
|
def key_to_value(
|
|
self, key: str, target_datetime: DateTime, time_window: Optional[Duration] = None
|
|
) -> Optional[float]:
|
|
"""Returns the value corresponding to the specified key that is nearest to the given datetime.
|
|
|
|
Args:
|
|
key (str): The key of the attribute in DataRecord to extract.
|
|
target_datetime (datetime): The datetime to search for.
|
|
time_window: Optional total width of the symmetric search window centered on
|
|
``target_datetime``. If provided and no exact match exists, the nearest
|
|
record within this window is returned.
|
|
|
|
Returns:
|
|
Optional[float]: The value nearest to the given datetime, or None if no valid records are found.
|
|
|
|
Raises:
|
|
KeyError: If the specified key is not found in any of the DataRecords.
|
|
"""
|
|
self._validate_key(key)
|
|
|
|
# Ensure datetime objects are normalized
|
|
db_target = DatabaseTimestamp.from_datetime(to_datetime(target_datetime))
|
|
|
|
record = self.db_get_record(db_target, time_window=time_window)
|
|
|
|
return getattr(record, key, None)
|
|
|
|
def key_to_lists(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    dropna: Optional[bool] = None,
) -> Tuple[list[DateTime], list[Optional[float]]]:
    """Extracts two lists from data records within an optional date range.

    The lists are:
        Dates: List of datetime elements.
        Values: List of values corresponding to the specified key in the data records.

    Args:
        key (str): The key of the attribute in DataRecord to extract.
        start_datetime (datetime, optional): The start date for filtering the records (inclusive).
        end_datetime (datetime, optional): The end date for filtering the records (exclusive).
        dropna (bool, optional): Whether to drop NaN/None values before processing.
            Defaults to True.

    Returns:
        tuple: A tuple containing a list of datetime values and a list of extracted values.

    Raises:
        KeyError: If the specified key is not found in any of the DataRecords.
    """
    self._validate_key(key)

    # Ensure datetime objects are normalized to database timestamps.
    start_timestamp = (
        DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None
    )
    end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None

    if dropna is None:
        dropna = True

    filtered_records = []
    for record in self.db_iterate_records(start_timestamp, end_timestamp):
        if record.date_time is None:
            continue
        value = getattr(record, key, None)
        # BUGFIX: the previous check compared against float("nan") with `==`,
        # which is always False (NaN never compares equal to anything, itself
        # included), so NaN values were never dropped. The self-inequality test
        # `value != value` is the reliable NaN detector without extra imports.
        if dropna and (value is None or (isinstance(value, float) and value != value)):
            continue
        # Re-check range bounds explicitly: start inclusive, end exclusive.
        record_timestamp = DatabaseTimestamp.from_datetime(record.date_time)
        if (start_timestamp is None or record_timestamp >= start_timestamp) and (
            end_timestamp is None or record_timestamp < end_timestamp
        ):
            filtered_records.append(record)

    dates = [record.date_time for record in filtered_records]
    values = [getattr(record, key, None) for record in filtered_records]

    return dates, values
|
|
|
|
def key_from_lists(self, key: str, dates: list[DateTime], values: list[float]) -> None:
    """Update the DataSequence from parallel lists of datetimes and values.

    Each ``dates[i]`` is the ``date_time`` of a record and ``values[i]`` the value
    to store under ``key`` for that record. Existing records are updated in place;
    missing ones are created. The lists must be ordered starting with the oldest
    date.

    Args:
        key (str): The field name in the DataRecord that corresponds to the values.
        dates: List of datetime elements.
        values: List of values corresponding to the specified key in the data records.
    """
    self._validate_key_writable(key)

    for index, date_time in enumerate(dates):
        # Normalize to a database timestamp before the lookup.
        db_target = DatabaseTimestamp.from_datetime(date_time)
        existing = self.db_get_record(db_target)
        if existing is not None:
            # Record already present: overwrite only the requested key.
            setattr(existing, key, values[index])
            self.db_mark_dirty_record(existing)
        else:
            # No record for this timestamp yet: create and insert one.
            self.db_insert_record(
                self.record_class()(date_time=date_time, **{key: values[index]})
            )
|
|
|
|
def key_to_series(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    dropna: Optional[bool] = None,
) -> pd.Series:
    """Extract a series indexed by the date_time field from data records within an optional date range.

    Args:
        key (str): The field name in the DataRecord from which to extract values.
        start_datetime (datetime, optional): The start date for filtering the records (inclusive).
        end_datetime (datetime, optional): The end date for filtering the records (exclusive).
        dropna (bool, optional): Whether to drop NaN/None values before processing.
            Defaults to True.

    Returns:
        pd.Series: A Pandas Series indexed by the ``date_time`` of each record,
            holding the values extracted for the specified key.

    Raises:
        KeyError: If the specified key is not found in any of the DataRecords.
    """
    # Delegate the filtering/extraction, then wrap the result in a Series.
    timestamps, extracted = self.key_to_lists(
        key=key, start_datetime=start_datetime, end_datetime=end_datetime, dropna=dropna
    )
    return pd.Series(data=extracted, index=pd.DatetimeIndex(timestamps), name=key)
|
|
|
|
def key_from_series(self, key: str, series: pd.Series) -> None:
    """Update the DataSequence from a Pandas Series.

    The series index carries the ``date_time`` of each DataRecord and the series
    values carry the corresponding data for ``key``. Existing records are updated
    in place; records missing for an index entry are created.

    Args:
        series (pd.Series): A Pandas Series containing data to update the DataSequence.
        key (str): The field name in the DataRecord that corresponds to the values in the Series.
    """
    self._validate_key_writable(key)

    for date_time, value in series.items():
        # Normalize the index entry to a database timestamp for the lookup.
        db_target = DatabaseTimestamp.from_datetime(to_datetime(date_time))
        record = self.db_get_record(db_target)
        if record is not None:
            # Update only the requested key on the existing record.
            setattr(record, key, value)
            self.db_mark_dirty_record(record)
        else:
            # No record at this timestamp: create and insert a fresh one.
            self.db_insert_record(
                self.record_class()(date_time=date_time, **{key: value})
            )
|
|
|
|
def key_to_array(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    interval: Optional[Duration] = None,
    fill_method: Optional[str] = None,
    dropna: Optional[bool] = True,
    boundary: Literal["strict", "context"] = "context",
    align_to_interval: bool = False,
) -> NDArray[Shape["*"], Any]:
    """Extract an array indexed by fixed time intervals from data records within an optional date range.

    Args:
        key (str): The field name in the DataRecord from which to extract values.
        start_datetime (datetime, optional): The start date for filtering the records (inclusive).
        end_datetime (datetime, optional): The end date for filtering the records (exclusive).
        interval (duration, optional): The fixed time interval. Defaults to 1 hour.
        fill_method (str): Method to handle missing values during resampling.
            - 'linear': Linearly interpolate missing values (for numeric data only).
            - 'time': Interpolate missing values (for numeric data only).
            - 'ffill': Forward fill missing values.
            - 'bfill': Backward fill missing values.
            - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'.
        dropna: (bool, optional): Whether to drop NAN/ None values before processing.
            Defaults to True.
        boundary (Literal["strict", "context"]):
            "strict" → only values inside [start, end)
            "context" → include one value before and after for proper resampling
        align_to_interval (bool): When True, snap the resample origin to the nearest
            UTC epoch-aligned boundary of ``interval`` before resampling. This ensures
            that bucket timestamps always fall on wall-clock-round times regardless of
            when ``start_datetime`` falls:

            - 15-minute interval → buckets on :00, :15, :30, :45
            - 1-hour interval → buckets on the hour

            When False (default), the origin is ``query_start`` (or ``"start_day"`` when
            no start is given), preserving the existing behaviour where buckets are
            aligned to the query window rather than the clock.

            Set to True when storing compacted records back to the database so that the
            resulting timestamps are predictable and human-readable. Leave False for
            forecast or reporting queries where alignment to the exact query window is
            more important than clock-round boundaries.

    Returns:
        np.ndarray: A NumPy Array of the values at the chosen frequency extracted from the
            specified key.

    Raises:
        KeyError: If the specified key is not found in any of the DataRecords.
        ValueError: If ``fill_method`` or ``boundary`` is not one of the supported values.
        TypeError: If the constructed series does not end up with a datetime index.
    """
    self._validate_key(key)

    # Validate fill method (None is allowed and resolved to a dtype-dependent default later)
    if fill_method not in ("ffill", "bfill", "linear", "time", "none", None):
        raise ValueError(f"Unsupported fill method: {fill_method}")

    if boundary not in ("strict", "context"):
        raise ValueError(f"Unsupported boundary mode: {boundary}")

    # Ensure datetime objects are normalized
    start_datetime = to_datetime(start_datetime, to_maxtime=False) if start_datetime else None
    end_datetime = to_datetime(end_datetime, to_maxtime=False) if end_datetime else None

    if interval is None:
        interval = to_duration("1 hour")
        resample_freq = "1h"
    else:
        # Convert the duration into pandas' frequency-string form for resample().
        resample_freq = to_duration(interval, as_string="pandas")

    # Extend window for context resampling
    query_start = start_datetime
    query_end = end_datetime

    if boundary == "context":
        # include one timestamp before and after for proper resampling
        if query_start is not None:
            # We have a start datetime - look for previous entry
            start_timestamp = DatabaseTimestamp.from_datetime(query_start)
            query_start_timestamp = self.db_previous_timestamp(start_timestamp)
            if query_start_timestamp:
                query_start = DatabaseTimestamp.to_datetime(query_start_timestamp)
        if end_datetime is not None:
            # We have a end datetime - look for next entry
            end_timestamp = DatabaseTimestamp.from_datetime(query_end)
            query_end_timestamp = self.db_next_timestamp(end_timestamp)
            if query_end_timestamp is None:
                # Ensure at least end_datetime is included (excluded by definition)
                query_end = end_datetime.add(seconds=1)
            else:
                query_end = DatabaseTimestamp.to_datetime(query_end_timestamp).add(seconds=1)

    # Load raw lists (already sorted & filtered)
    dates, values = self.key_to_lists(
        key=key, start_datetime=query_start, end_datetime=query_end, dropna=dropna
    )
    values_len = len(values)

    # Bring lists into shape
    if values_len < 1:
        # No values, assume at least one value set to None
        if query_start is not None:
            dates.append(query_start - interval)
        else:
            dates.append(to_datetime(to_maxtime=False))
        values.append(None)

    if query_start is not None:
        # Find the first record at or after query_start.
        start_index = 0
        while start_index < values_len:
            if compare_datetimes(dates[start_index], query_start).ge:
                break
            start_index += 1
        if start_index == 0:
            # No value before start
            # Add dummy value
            dates.insert(0, query_start - interval)
            values.insert(0, values[0])
        elif start_index > 1:
            # Truncate all values before latest value before query_start
            dates = dates[start_index - 1 :]
            values = values[start_index - 1 :]

        # Determine resample origin
        if align_to_interval:
            # Snap to nearest UTC epoch-aligned floor of the interval so that bucket
            # timestamps land on wall-clock-round boundaries (:00, :15, :30, :45 etc.)
            # regardless of sub-second jitter in query_start.
            interval_sec = int(interval.total_seconds())
            if interval_sec > 0:
                start_epoch = int(query_start.timestamp())
                floored_epoch = (start_epoch // interval_sec) * interval_sec
                resample_origin: Union[str, pd.Timestamp] = pd.Timestamp(
                    floored_epoch, unit="s", tz="UTC"
                )
            else:
                # Degenerate (zero-length) interval: fall back to the window start.
                resample_origin = query_start
        else:
            # Original behaviour: align to the query window start.
            resample_origin = query_start
    else:
        # We do not have a query_start, align resample buckets to midnight of first day
        resample_origin = "start_day"

    if query_end is not None:
        if compare_datetimes(dates[-1], query_end).lt:
            # Add dummy value at query_end
            dates.append(query_end)
            values.append(values[-1])

    # Construct series
    index = pd.to_datetime(dates, utc=True)
    series = pd.Series(values, index=index, name=key)
    if series.index.inferred_type != "datetime64":
        raise TypeError(
            f"Expected DatetimeIndex, but got {type(series.index)} "
            f"infered to {series.index.inferred_type}: {series}"
        )

    # Check for numeric values
    numeric = pd.to_numeric(series.dropna(), errors="coerce")
    is_numeric = numeric.notna().all()

    # Determine default fill method depending on dtype
    if fill_method is None:
        if is_numeric:
            fill_method = "time"
        else:
            fill_method = "ffill"

    # Perform the resampling
    if is_numeric:
        # Step 1: aggregate — collapses sub-interval data (e.g. 4x 15min → 1h mean).
        # Produces NaN for buckets where no data existed at all.
        resampled = pd.to_numeric(
            series.resample(resample_freq, origin=resample_origin).mean(),
            errors="coerce",  # ← ensures float64, not object dtype
        )

        # Step 2: fill gaps — interpolates or fills the NaN buckets from step 1.
        if fill_method in ("linear", "time"):
            # Both are equivalent post-resample (equally-spaced index),
            # but 'time' is kept as the label for clarity.
            resampled = resampled.interpolate("time")
        elif fill_method == "ffill":
            resampled = resampled.ffill()
        elif fill_method == "bfill":
            resampled = resampled.bfill()
        # fill_method == "none": leave NaNs in place
    else:
        # Non-numeric data cannot be averaged; take the first value per bucket.
        resampled = series.resample(resample_freq, origin=resample_origin).first()
        if fill_method == "ffill":
            resampled = resampled.ffill()
        elif fill_method == "bfill":
            resampled = resampled.bfill()

    logger.debug(
        "Resampled for '{}' with length {}: {}...{}",
        key,
        len(resampled),
        resampled[:10],
        resampled[-10:],
    )

    # Convert the resampled series to a NumPy array
    # Trim the context padding back down to the requested [start, end) window.
    if start_datetime is not None and len(resampled) > 0:
        resampled = resampled.truncate(before=start_datetime)
    if end_datetime is not None and len(resampled) > 0:
        resampled = resampled.truncate(after=end_datetime.subtract(seconds=1))
    array = resampled.values

    # Convert NaN to None if there are actually NaNs
    if (
        isinstance(array, np.ndarray)
        and np.issubdtype(array.dtype.type, np.floating)
        and pd.isna(array).any()
    ):
        # Switch to object dtype so None can be stored in place of NaN.
        array = array.astype(object)
        array[pd.isna(array)] = None

    logger.debug(
        "Array for '{}' with length {}: {}...{}", key, len(array), array[:10], array[-10:]
    )

    return array
|
|
|
|
def to_dataframe(
    self,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
) -> pd.DataFrame:
    """Converts the sequence of DataRecord instances into a Pandas DataFrame.

    Args:
        start_datetime (Optional[datetime]): The lower bound for filtering (inclusive).
            Defaults to the earliest possible datetime if None.
        end_datetime (Optional[datetime]): The upper bound for filtering (exclusive).
            Defaults to the latest possible datetime if None.

    Returns:
        pd.DataFrame: A DataFrame containing the filtered data from all records,
            indexed by the ``date_time`` column. Empty if there are no records.

    Raises:
        TypeError: If the records do not provide a ``date_time`` column.
    """
    if not self.records:
        return pd.DataFrame()  # Return empty DataFrame if no records exist

    # Ensure datetime objects are normalized
    start_timestamp = (
        DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None
    )
    end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None

    # Convert filtered records to a dictionary list
    data = [
        record.model_dump()
        for record in self.db_iterate_records(
            start_timestamp=start_timestamp, end_timestamp=end_timestamp
        )
    ]

    # Convert to DataFrame
    df = pd.DataFrame(data)
    if df.empty:
        return df

    # Ensure `date_time` column exists and use it for the index
    # (idiomatic `not in` instead of the original `not ... in`)
    if "date_time" not in df.columns:
        error_msg = f"Cannot create dataframe: no `date_time` column in `{df}`."
        logger.error(error_msg)
        raise TypeError(error_msg)
    df.index = pd.DatetimeIndex(df["date_time"])
    return df
|
|
|
|
def delete_by_datetime(
    self,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
) -> int:
    """Delete records in the given datetime range.

    Removes records from memory and, when database storage is enabled, from the
    database as well. The reported count is the maximum of the in-memory and
    database deletion counts.

    Args:
        start_datetime: Start datetime (inclusive)
        end_datetime: End datetime (exclusive)

    Returns:
        Number of records deleted (max of memory and database deletions)
    """
    # Normalize the optional bounds to database timestamps before delegating.
    start_timestamp = (
        None if start_datetime is None else DatabaseTimestamp.from_datetime(start_datetime)
    )
    end_timestamp = (
        None if end_datetime is None else DatabaseTimestamp.from_datetime(end_datetime)
    )
    return self.db_delete_records(start_timestamp=start_timestamp, end_timestamp=end_timestamp)
|
|
|
|
def key_delete_by_datetime(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
) -> None:
    """Delete an attribute specified by `key` from records within a datetime range.

    The attribute is removed from every record whose `date_time` lies inside
    ``[start_datetime, end_datetime)``.

    - Only `start_datetime` given: attributes are removed from that date onward.
    - Only `end_datetime` given: attributes are removed up to that date.
    - Neither given: the attribute is removed from all records.

    Args:
        key (str): The attribute name to delete from each record.
        start_datetime (datetime, optional): The start datetime to begin attribute deletion (inclusive).
        end_datetime (datetime, optional): The end datetime to stop attribute deletion (exclusive).

    Raises:
        KeyError: If `key` is not a valid attribute of the records.
    """
    self._validate_key_writable(key)

    # Normalize the optional bounds to database timestamps.
    start_timestamp = (
        None if start_datetime is None else DatabaseTimestamp.from_datetime(start_datetime)
    )
    end_timestamp = (
        None if end_datetime is None else DatabaseTimestamp.from_datetime(end_datetime)
    )

    # Drop the attribute on each record in range and flag the record for persistence.
    for record in self.db_iterate_records(start_timestamp, end_timestamp):
        del record[key]
        self.db_mark_dirty_record(record)
|
|
|
|
def save(self) -> bool:
    """Persist data records to storage.

    Returns:
        True if at least one record was saved, False otherwise (including when
        database storage is disabled).
    """
    if not self.db_enabled:
        return False
    # Success means at least one record made it to the database.
    return self.db_save_records() > 0
|
|
|
|
def load(self) -> bool:
    """Load data records from persistent storage.

    Returns:
        True if at least one record was loaded, False otherwise (including when
        database storage is disabled).
    """
    if not self.db_enabled:
        return False
    # Success means at least one record was read back from the database.
    return self.db_load_records() > 0
|
|
|
|
# ----------------------- DataSequence Database Protocol ---------------------
|
|
|
|
# Required interface propagated to derived class.
|
|
# - db_keep_duration
|
|
# - db_namespace
|
|
|
|
|
|
# ==================== DataProvider ====================
|
|
|
|
|
|
class DataProvider(SingletonMixin, DataSequence):
    """Abstract base class for data providers with singleton thread-safety and configurable data parameters.

    Serves as the foundation for concrete data providers: it guarantees a single
    instance across threads and exposes the hooks a provider must implement to
    identify itself, report whether it is enabled, and update its data.

    Note:
        Derived classes have to provide their own records field with correct record type set.
    """

    # Timestamp of the most recent successful data update, None until first update.
    update_datetime: Optional[AwareDatetime] = Field(
        None, json_schema_extra={"description": "Latest update datetime for generic data"}
    )

    @abstractmethod
    def provider_id(self) -> str:
        """Return the unique identifier for the data provider.

        To be implemented by derived classes.
        """
        return "DataProvider"

    @abstractmethod
    def enabled(self) -> bool:
        """Return True if the provider is enabled according to configuration.

        To be implemented by derived classes.
        """
        raise NotImplementedError()

    @abstractmethod
    def _update_data(self, force_update: Optional[bool] = False) -> None:
        """Abstract method for custom data update logic, to be implemented by derived classes.

        Args:
            force_update (bool, optional): If True, forces the provider to update the data even if still cached.
        """
        pass

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Singleton guard: a second construction must not re-run initialization.
        if hasattr(self, "_initialized"):
            return
        super().__init__(*args, **kwargs)

    def db_namespace(self) -> str:
        """Namespace of database."""
        return self.provider_id()

    def update_data(
        self,
        force_enable: Optional[bool] = False,
        force_update: Optional[bool] = False,
    ) -> None:
        """Calls the custom update function if enabled or forced.

        Args:
            force_enable (bool, optional): If True, forces the update even if the provider is disabled.
            force_update (bool, optional): If True, forces the provider to update the data even if still cached.
        """
        # Run the provider-specific update only when enabled (or explicitly forced).
        if force_enable or self.enabled():
            self._update_data(force_update=force_update)
|
|
|
|
|
|
# ==================== DataImportMixin ====================
|
|
|
|
|
|
class DataImportMixin(StartMixin):
|
|
"""Mixin class for import of generic data.
|
|
|
|
This class is designed to handle generic data provided in the form of a key-value dictionary.
|
|
|
|
- **Keys**: Represent identifiers from the record keys of a specific data.
|
|
- **Values**: Are lists of data values starting at a specified start_datetime, where
|
|
each value corresponds to a subsequent time interval (e.g., hourly).
|
|
|
|
Two special keys are handled. start_datetime may be used to defined the starting datetime of
|
|
the values. ìnterval may be used to define the fixed time interval between two values.
|
|
|
|
On import self.update_value(datetime, key, value) is called which has to be provided.
|
|
Also self.ems_start_datetime may be necessary as a default in case start_datetime is not
|
|
given.
|
|
|
|
"""
|
|
|
|
# Attributes required but defined elsehere.
|
|
# - start_datetime
|
|
# - record_keys_writable
|
|
# - update_value
|
|
|
|
def import_from_dict(
|
|
self,
|
|
import_data: dict,
|
|
key_prefix: str = "",
|
|
start_datetime: Optional[DateTime] = None,
|
|
interval: Optional[Duration] = None,
|
|
) -> None:
|
|
"""Updates generic data by importing it from a dictionary.
|
|
|
|
This method reads generic data from a dictionary, matches keys based on the
|
|
record keys and the provided `key_prefix`, and updates the data values sequentially.
|
|
All value lists must have the same length.
|
|
|
|
Args:
|
|
import_data (dict): Dictionary containing the generic data with optional
|
|
'start_datetime' and 'interval' keys.
|
|
key_prefix (str, optional): A prefix to filter relevant keys from the generic data.
|
|
Only keys starting with this prefix will be considered. Defaults to an empty string.
|
|
start_datetime (DateTime, optional): Start datetime of values if not in dict.
|
|
interval (Duration, optional): The fixed time interval if not in dict.
|
|
|
|
Raises:
|
|
ValueError: If value lists have different lengths or if datetime conversion fails.
|
|
"""
|
|
# Handle datetime and interval from dict or parameters
|
|
if "start_datetime" in import_data:
|
|
try:
|
|
start_datetime = to_datetime(import_data["start_datetime"])
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid start_datetime in import data: {e}")
|
|
|
|
if start_datetime is None:
|
|
start_datetime = self.ems_start_datetime
|
|
|
|
if "interval" in import_data:
|
|
try:
|
|
interval = to_duration(import_data["interval"])
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid interval in import data: {e}")
|
|
|
|
if interval is None:
|
|
interval = to_duration("1 hour")
|
|
interval_steps_per_hour = int(3600 / interval.total_seconds())
|
|
if interval.total_seconds() * interval_steps_per_hour != 3600:
|
|
error_msg = f"Interval {interval} does not fit into hour."
|
|
logger.error(error_msg)
|
|
raise NotImplementedError(error_msg)
|
|
|
|
# Filter keys based on key_prefix and record_keys_writable
|
|
valid_keys = [
|
|
key
|
|
for key in import_data.keys()
|
|
if key.startswith(key_prefix)
|
|
and key in self.record_keys_writable # type: ignore
|
|
and key not in ("start_datetime", "interval")
|
|
]
|
|
|
|
if not valid_keys:
|
|
return
|
|
|
|
# Validate all value lists have the same length
|
|
value_lengths = []
|
|
for key in valid_keys:
|
|
value_list = import_data[key]
|
|
if not isinstance(value_list, (list, tuple, np.ndarray)):
|
|
raise ValueError(f"Value for key '{key}' must be a list, tuple, or array")
|
|
value_lengths.append(len(value_list))
|
|
|
|
if len(set(value_lengths)) > 1:
|
|
raise ValueError(
|
|
f"All value lists must have the same length. Found lengths: "
|
|
f"{dict(zip(valid_keys, value_lengths))}"
|
|
)
|
|
|
|
values_count = value_lengths[0]
|
|
|
|
# Process each valid key
|
|
start_timestamp = DatabaseTimestamp.from_datetime(start_datetime)
|
|
for key in valid_keys:
|
|
try:
|
|
values = import_data[key]
|
|
|
|
# Update values, skipping any None/NaN
|
|
for value_index, value_db_datetime in enumerate(
|
|
self.db_generate_timestamps(start_timestamp, values_count, interval) # type: ignore[attr-defined]
|
|
):
|
|
value = values[value_index]
|
|
value_datetime = DatabaseTimestamp.to_datetime(value_db_datetime)
|
|
if value is not None and not pd.isna(value):
|
|
self.update_value(value_datetime, key, value) # type: ignore
|
|
|
|
except (IndexError, TypeError) as e:
|
|
raise ValueError(f"Error processing values for key '{key}': {e}")
|
|
|
|
def import_from_dataframe(
|
|
self,
|
|
df: pd.DataFrame,
|
|
key_prefix: str = "",
|
|
start_datetime: Optional[DateTime] = None,
|
|
interval: Optional[Duration] = None,
|
|
) -> None:
|
|
"""Updates generic data by importing it from a pandas DataFrame.
|
|
|
|
This method reads generic data from a DataFrame, matches columns based on the
|
|
record keys and the provided `key_prefix`, and updates the data values using
|
|
the DataFrame's index as timestamps.
|
|
|
|
Args:
|
|
df (pd.DataFrame): DataFrame containing the generic data with datetime index
|
|
or sequential values.
|
|
key_prefix (str, optional): A prefix to filter relevant columns from the DataFrame.
|
|
Only columns starting with this prefix will be considered. Defaults to an empty string.
|
|
start_datetime (DateTime, optional): Start datetime if DataFrame doesn't have datetime index.
|
|
interval (Duration, optional): The fixed time interval if DataFrame doesn't have datetime index.
|
|
|
|
Raises:
|
|
ValueError: If DataFrame structure is invalid or datetime conversion fails.
|
|
"""
|
|
# Validate DataFrame
|
|
if not isinstance(df, pd.DataFrame):
|
|
raise ValueError("Input must be a pandas DataFrame")
|
|
|
|
# Handle datetime index
|
|
if isinstance(df.index, pd.DatetimeIndex):
|
|
try:
|
|
index_datetimes = [to_datetime(dt) for dt in df.index]
|
|
has_datetime_index = True
|
|
except (ValueError, TypeError) as e:
|
|
raise ValueError(f"Invalid datetime index in DataFrame: {e}")
|
|
else:
|
|
if start_datetime is None:
|
|
start_datetime = self.ems_start_datetime
|
|
has_datetime_index = False
|
|
|
|
# Filter columns based on key_prefix and record_keys_writable
|
|
valid_columns = [
|
|
col
|
|
for col in df.columns
|
|
if col.startswith(key_prefix) and col in self.record_keys_writable # type: ignore
|
|
]
|
|
|
|
if not valid_columns:
|
|
return
|
|
|
|
# For DataFrame, length validation is implicit since all columns have same length
|
|
values_count = len(df)
|
|
|
|
# Generate value_datetime_mapping once if not using datetime index
|
|
if not has_datetime_index:
|
|
# Create values datetime list
|
|
start_timestamp = DatabaseTimestamp.from_datetime(start_datetime)
|
|
value_db_datetimes = list(
|
|
self.db_generate_timestamps(start_timestamp, values_count, interval) # type: ignore[attr-defined]
|
|
)
|
|
|
|
# Process each valid column
|
|
for column in valid_columns:
|
|
try:
|
|
values = df[column].tolist()
|
|
|
|
if has_datetime_index:
|
|
# Use the DataFrame's datetime index
|
|
for dt, value in zip(index_datetimes, values):
|
|
if value is not None and not pd.isna(value):
|
|
self.update_value(dt, column, value) # type: ignore
|
|
else:
|
|
# Use the pre-generated datetime index
|
|
for value_index in range(values_count):
|
|
value = values[value_index]
|
|
value_datetime = DatabaseTimestamp.to_datetime(
|
|
value_db_datetimes[value_index]
|
|
)
|
|
if value is not None and not pd.isna(value):
|
|
self.update_value(value_datetime, column, value) # type: ignore
|
|
|
|
except Exception as e:
|
|
raise ValueError(f"Error processing column '{column}': {e}")
|
|
|
|
def import_from_json(
|
|
self,
|
|
json_str: str,
|
|
key_prefix: str = "",
|
|
start_datetime: Optional[DateTime] = None,
|
|
interval: Optional[Duration] = None,
|
|
) -> None:
|
|
"""Updates generic data by importing it from a JSON string.
|
|
|
|
This method reads generic data from a JSON string, matches keys based on the
|
|
record keys and the provided `key_prefix`, and updates the data values sequentially,
|
|
starting from the `start_datetime`.
|
|
|
|
If start_datetime and or interval is given in the JSON dict it will be used. Otherwise
|
|
the given parameters are used. If None is given start_datetime defaults to
|
|
'self.ems_start_datetime' and interval defaults to 1 hour.
|
|
|
|
Args:
|
|
json_str (str): The JSON string containing the generic data.
|
|
key_prefix (str, optional): A prefix to filter relevant keys from the generic data.
|
|
Only keys starting with this prefix will be considered. Defaults to an empty string.
|
|
start_datetime (DateTime, optional): Start datetime of values.
|
|
interval (duration, optional): The fixed time interval. Defaults to 1 hour.
|
|
|
|
Raises:
|
|
JSONDecodeError: If the file content is not valid JSON.
|
|
|
|
Example:
|
|
Given a JSON string with the following content and `key_prefix = "load"`, only the
|
|
"loadforecast_power_w" key will be processed even though both keys are in the record.
|
|
|
|
.. code-block:: json
|
|
|
|
{
|
|
"start_datetime": "2024-11-10 00:00:00",
|
|
"interval": "30 minutes",
|
|
"loadforecast_power_w": [20.5, 21.0, 22.1],
|
|
"other_xyz: [10.5, 11.0, 12.1]
|
|
}
|
|
|
|
"""
|
|
# Strip quotes if provided - does not effect unquoted string
|
|
json_str = json_str.strip() # strip white space at start and end
|
|
if (json_str.startswith("'") and json_str.endswith("'")) or (
|
|
json_str.startswith('"') and json_str.endswith('"')
|
|
):
|
|
json_str = json_str[1:-1] # strip outer quotes
|
|
json_str = json_str.strip() # strip remaining white space at start and end
|
|
|
|
# Try pandas dataframe with orient="split"
|
|
try:
|
|
import_data = PydanticDateTimeDataFrame.model_validate_json(json_str)
|
|
self.import_from_dataframe(import_data.to_dataframe())
|
|
return
|
|
except ValidationError as e:
|
|
error_msg = ""
|
|
for error in e.errors():
|
|
field = " -> ".join(str(x) for x in error["loc"])
|
|
message = error["msg"]
|
|
error_type = error["type"]
|
|
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
|
|
logger.debug(f"PydanticDateTimeDataFrame import: {error_msg}")
|
|
|
|
# Try dictionary with special keys start_datetime and interval
|
|
try:
|
|
import_data = PydanticDateTimeData.model_validate_json(json_str)
|
|
self.import_from_dict(import_data.to_dict())
|
|
return
|
|
except ValidationError as e:
|
|
error_msg = ""
|
|
for error in e.errors():
|
|
field = " -> ".join(str(x) for x in error["loc"])
|
|
message = error["msg"]
|
|
error_type = error["type"]
|
|
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
|
|
logger.debug(f"PydanticDateTimeData import: {error_msg}")
|
|
|
|
# Use simple dict format
|
|
try:
|
|
import_data = json.loads(json_str)
|
|
self.import_from_dict(
|
|
import_data, key_prefix=key_prefix, start_datetime=start_datetime, interval=interval
|
|
)
|
|
except Exception as e:
|
|
error_msg = f"Invalid JSON string '{json_str}': {e}"
|
|
logger.debug(error_msg)
|
|
raise ValueError(error_msg) from e
|
|
|
|
def import_from_file(
    self,
    import_file_path: Path,
    key_prefix: str = "",
    start_datetime: Optional[DateTime] = None,
    interval: Optional[Duration] = None,
) -> None:
    """Update generic data from a JSON file.

    Reads the file content and delegates to `import_from_json`. Keys in the
    JSON payload are matched against the record keys using `key_prefix`; the
    matched value lists are applied sequentially starting at `start_datetime`,
    one value per `interval`.

    If `start_datetime` and/or `interval` are present in the JSON dict, those
    take precedence over the given parameters. When neither is available,
    `start_datetime` defaults to `self.ems_start_datetime` and `interval`
    defaults to one hour.

    Args:
        import_file_path (Path): Path to the JSON file containing the generic data.
        key_prefix (str, optional): Only keys starting with this prefix are
            considered. Defaults to an empty string.
        start_datetime (DateTime, optional): Start datetime of values.
        interval (Duration, optional): The fixed time interval. Defaults to 1 hour.

    Raises:
        FileNotFoundError: If the specified file does not exist.
        ValueError: If the file content is not valid JSON.

    Example:
        Given a JSON file with the following content and `key_prefix = "load"`,
        only the "loadforecast_power_w" key will be processed even though both
        keys are in the record.

        .. code-block:: json

            {
                "loadforecast_power_w": [20.5, 21.0, 22.1],
                "other_xyz: [10.5, 11.0, 12.1],
            }

    """
    # read_text() opens with encoding and universal newlines, matching the
    # previous explicit open(..., newline=None) behavior.
    content = import_file_path.read_text(encoding="utf-8")
    self.import_from_json(
        content,
        key_prefix=key_prefix,
        start_datetime=start_datetime,
        interval=interval,
    )
|
|
|
|
|
|
# ==================== DataImportProvider ====================
|
|
|
|
|
|
class DataImportProvider(DataImportMixin, DataProvider):
    """Abstract base class for data providers that ingest generic key/value data.

    The imported payload is a plain dictionary:

    - **Keys**: identifiers taken from the record keys of a specific data set.
    - **Values**: lists of data values beginning at a given `start_datetime`,
      one value per subsequent time interval (e.g. hourly).

    Subclasses implement how the imported records are turned into managed
    generic data.
    """
|
|
|
|
|
|
# ==================== DataContainer ====================
|
|
|
|
|
|
class DataContainer(SingletonMixin, DataABC, MutableMapping):
    """A container for managing multiple DataProvider instances.

    This class enables access to data from multiple data providers, supporting retrieval and
    aggregation of their data as Pandas Series objects. It acts as a dictionary-like structure
    where each key represents a specific data field, and the value is a Pandas Series containing
    combined data from all DataProvider instances for that key.

    Note:
        Derived classes have to provide their own providers field with correct provider type set.
    """

    # To be overloaded by derived classes.
    # Holds all registered providers (enabled or not); items are type-checked
    # by the `check_providers` validator below.
    providers: list[DataProvider] = Field(
        default_factory=list, json_schema_extra={"description": "List of data providers"}
    )
|
|
|
|
@field_validator("providers", mode="after")
def check_providers(cls, value: list[DataProvider]) -> list[DataProvider]:
    """Validate that every entry in `providers` is a DataProvider instance."""
    # Sentinel distinguishes "no offender" from an offending None entry.
    _missing = object()
    offender = next(
        (item for item in value if not isinstance(item, DataProvider)), _missing
    )
    if offender is not _missing:
        raise TypeError(
            f"Each item in the providers list must be a DataProvider, got {type(offender).__name__}"
        )
    return value
|
|
|
|
@property
def enabled_providers(self) -> list[Any]:
    """List of providers that are currently enabled."""
    return [provider for provider in self.providers if provider.enabled()]

@property
def record_keys(self) -> list[str]:
    """Keys of all record fields across every enabled provider (deduplicated)."""
    keys: set[str] = set()
    for provider in self.enabled_providers:
        keys.update(provider.record_keys)
    return list(keys)

@property
def record_keys_writable(self) -> list[str]:
    """Keys of all writable record fields across every enabled provider (deduplicated)."""
    keys: set[str] = set()
    for provider in self.enabled_providers:
        keys.update(provider.record_keys_writable)
    return list(keys)
|
|
|
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the container; repeated calls on the singleton are no-ops."""
    if not hasattr(self, "_initialized"):
        super().__init__(*args, **kwargs)
|
|
|
|
def __getitem__(self, key: str) -> pd.Series:
    """Return a Pandas Series for `key` from the first enabled provider that has it.

    Providers are probed in order; the first one that does not raise KeyError
    supplies the series.

    Args:
        key (str): The field name to retrieve, representing a data attribute
            in DataRecords.

    Returns:
        pd.Series: Series containing aggregated data for the specified key.

    Raises:
        KeyError: If no enabled provider contains data for the specified key.
    """
    found: Optional[pd.Series] = None
    for provider in self.enabled_providers:
        try:
            found = provider.key_to_series(key)
        except KeyError:
            continue
        break

    if found is None:
        raise KeyError(f"No data found for key '{key}'.")

    return found

def __setitem__(self, key: str, value: pd.Series) -> None:
    """Store `value` under `key` in the first enabled provider that accepts it.

    Args:
        key (str): The field name to update, representing a data attribute
            in DataRecords.
        value (pd.Series): Series containing data for the specified key.

    Raises:
        ValueError: If `value` is not an instance of `pd.Series`.
        KeyError: If no enabled provider supports the specified key.
    """
    if not isinstance(value, pd.Series):
        raise ValueError("Value must be an instance of pd.Series.")

    for provider in self.enabled_providers:
        try:
            provider.key_from_series(key, value)
        except KeyError:
            continue
        return
    raise KeyError(f"Key '{key}' not found in any provider.")

def __delitem__(self, key: str) -> None:
    """Clear the values stored under `key` in the first provider that knows it.

    Args:
        key (str): The field name in DataRecords to clear.

    Raises:
        KeyError: If the key is not found in any enabled provider.
    """
    for provider in self.enabled_providers:
        try:
            provider.key_delete_by_datetime(key)
        except KeyError:
            continue
        return
    raise KeyError(f"Key '{key}' not found in any provider.")
|
|
|
|
def __iter__(self) -> Iterator[str]:
    """Iterate over the unique record keys of all enabled providers.

    Returns:
        Iterator[str]: Iterator over the unique keys from all providers.
    """
    return iter(self.record_keys)

def __len__(self) -> int:
    """Return the number of unique record keys in this container.

    Returns:
        int: The total number of keys in this container.
    """
    return len(self.record_keys)

def __repr__(self) -> str:
    """Return a debug representation listing the contained providers.

    Returns:
        str: A string representing the container and its providers.
    """
    return f"{self.__class__.__name__}({self.providers})"

def keys(self) -> KeysView[str]:
    """Return a KeysView over the unique record keys."""
    return {key: None for key in self.record_keys}.keys()
|
|
|
|
def update_data(
    self,
    force_enable: Optional[bool] = False,
    force_update: Optional[bool] = False,
) -> None:
    """Let every provider refresh its data.

    Failures of enabled providers are fatal and propagated; failures of
    providers that are not enabled (e.g. missing configuration while
    `force_enable` is set) are logged as warnings so the remaining providers
    still get their turn.

    Args:
        force_enable (bool, optional): If True, forces the update even if a provider is disabled.
        force_update (bool, optional): If True, forces the providers to update the data even if still cached.

    Raises:
        RuntimeError: If an enabled provider fails to update.
    """
    for provider in self.providers:
        try:
            provider.update_data(force_enable=force_enable, force_update=force_update)
        except Exception as ex:
            error = f"Provider {provider.provider_id()} fails on update - enabled={provider.enabled()}, force_enable={force_enable}, force_update={force_update}: {ex}"
            if not provider.enabled():
                # A non-active provider failed (e.g. missing config while force_enable=True).
                # Log as warning and continue so the remaining providers still run.
                logger.warning(error)
                continue
            # The active provider failed — this is a real error worth propagating.
            logger.error(error)
            raise RuntimeError(error)
|
|
|
|
def key_to_series(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    dropna: Optional[bool] = None,
) -> pd.Series:
    """Extract a date_time-indexed series for `key` within an optional date range.

    The enabled providers are probed in order; the first one that knows the
    key supplies the series.

    Args:
        key (str): The field name in the DataRecord from which to extract values.
        start_datetime (datetime, optional): Inclusive lower bound for records.
        end_datetime (datetime, optional): Exclusive upper bound for records.
        dropna (bool, optional): Whether to drop NaN/None values before
            processing. Defaults to True.

    Returns:
        pd.Series: Series with the record date_time values as index and the
            values extracted from the specified key.

    Raises:
        KeyError: If the specified key is not found in any of the DataRecords.
    """
    result = None
    for provider in self.enabled_providers:
        try:
            result = provider.key_to_series(
                key,
                start_datetime=start_datetime,
                end_datetime=end_datetime,
                dropna=dropna,
            )
        except KeyError:
            pass
        else:
            break

    if result is None:
        raise KeyError(f"No data found for key '{key}'.")

    return result
|
|
|
|
def key_to_array(
    self,
    key: str,
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    interval: Optional[Duration] = None,
    fill_method: Optional[str] = None,
    boundary: Optional[str] = "context",
) -> NDArray[Shape["*"], Any]:
    """Return an array for `key`, resampled to fixed time intervals.

    The enabled providers are probed in order; the first one that knows the
    key supplies the array.

    Args:
        key (str): The field name to retrieve, representing a data attribute in DataRecords.
        start_datetime (datetime, optional): Inclusive lower bound for records.
        end_datetime (datetime, optional): Exclusive upper bound for records.
        interval (duration, optional): The fixed time interval. Defaults to 1 hour.
        fill_method (str): Method to handle missing values during resampling.
            - 'linear': Linearly interpolate missing values (for numeric data only).
            - 'ffill': Forward fill missing values.
            - 'bfill': Backward fill missing values.
            - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'.
        boundary (str, optional): Boundary handling mode, passed through to the
            provider unchanged. Defaults to "context".

    Returns:
        np.ndarray: Aggregated data for the specified key.

    Raises:
        KeyError: If no provider contains data for the specified key.

    Todo:
        Cache the result in memory until the next `update_data` call.
    """
    result = None
    for provider in self.enabled_providers:
        try:
            result = provider.key_to_array(
                key,
                start_datetime=start_datetime,
                end_datetime=end_datetime,
                interval=interval,
                fill_method=fill_method,
                boundary=boundary,
            )
        except KeyError:
            pass
        else:
            break

    if result is None:
        raise KeyError(f"No data found for key '{key}'.")

    return result
|
|
|
|
def keys_to_dataframe(
    self,
    keys: list[str],
    start_datetime: Optional[DateTime] = None,
    end_datetime: Optional[DateTime] = None,
    interval: Optional[Any] = None,  # Duration assumed
    fill_method: Optional[str] = None,
) -> pd.DataFrame:
    """Retrieve a dataframe indexed by fixed time intervals for specified keys from the data in each DataProvider.

    Generates a pandas DataFrame using the NumPy arrays for each specified key, ensuring a common time index.

    Args:
        keys (list[str]): A list of field names to retrieve.
        start_datetime (datetime, optional): Start date for filtering records (inclusive).
            If None, the earliest `min_datetime` of all enabled providers is used.
        end_datetime (datetime, optional): End date for filtering records (exclusive).
            If None, the latest `max_datetime` of all enabled providers is used,
            extended by one second so the last record is included.
        interval (duration, optional): The fixed time interval. Defaults to 1 hour.
        fill_method (str, optional): Method to handle missing values during resampling.
            - 'linear': Linearly interpolate missing values (for numeric data only).
            - 'ffill': Forward fill missing values.
            - 'bfill': Backward fill missing values.
            - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'.

    Returns:
        pd.DataFrame: A DataFrame where each column represents a key's array with a common time index.

    Raises:
        KeyError: If no valid data is found for any of the requested keys.
        ValueError: If a retrieved array length does not match the reference time
            index, or if no datetime range can be determined.
    """
    # Ensure datetime objects are normalized
    start_datetime = to_datetime(start_datetime, to_maxtime=False) if start_datetime else None
    end_datetime = to_datetime(end_datetime, to_maxtime=False) if end_datetime else None
    if interval is None:
        interval = to_duration("1 hour")
    if start_datetime is None:
        # Take earliest datetime of all providers that are enabled
        for provider in self.enabled_providers:
            if start_datetime is None:
                start_datetime = provider.min_datetime
            elif (
                provider.min_datetime
                and compare_datetimes(provider.min_datetime, start_datetime).lt
            ):
                start_datetime = provider.min_datetime
    if end_datetime is None:
        # Take latest datetime of all providers that are enabled
        for provider in self.enabled_providers:
            if end_datetime is None:
                end_datetime = provider.max_datetime
            elif (
                provider.max_datetime
                and compare_datetimes(provider.max_datetime, end_datetime).gt
            ):
                # Bugfix: the max-datetime branch previously assigned
                # provider.min_datetime, shrinking the range instead of
                # extending it.
                end_datetime = provider.max_datetime
        if end_datetime:
            # Bugfix: DateTime.add() returns a new instance (immutable), so the
            # result must be rebound — previously the one-second extension that
            # keeps the last record inside the exclusive bound was a no-op.
            end_datetime = end_datetime.add(seconds=1)

    # Create a DatetimeIndex based on start, end, and interval
    if start_datetime is None or end_datetime is None:
        raise ValueError(
            f"Can not determine datetime range. Got '{start_datetime}'..'{end_datetime}'."
        )
    reference_index = pd.date_range(
        start=start_datetime,
        end=end_datetime,
        freq=interval,
        inclusive="left",
    )

    data = {}
    for key in keys:
        try:
            array = self.key_to_array(key, start_datetime, end_datetime, interval, fill_method)

            if len(array) != len(reference_index):
                raise ValueError(
                    f"Array length mismatch for key '{key}' (expected {len(reference_index)}, got {len(array)})"
                )

            data[key] = array
        except KeyError as e:
            raise KeyError(f"Failed to retrieve data for key '{key}': {e}") from e

    if not data:
        raise KeyError(f"No valid data found for the requested keys {keys}.")

    return pd.DataFrame(data, index=reference_index)
|
|
|
|
def provider_by_id(self, provider_id: str) -> DataProvider:
    """Look up a data provider by its unique identifier.

    Scans all registered providers and returns the one whose `provider_id()`
    matches the given `provider_id`.

    Args:
        provider_id (str): The unique identifier of the desired data provider.

    Returns:
        DataProvider: The data provider matching the given `provider_id`.

    Raises:
        ValueError: If the provider id is unknown.

    Example:
        provider = data.provider_by_id("WeatherImport")
    """
    known = {provider.provider_id(): provider for provider in self.providers}
    try:
        return known[provider_id]
    except KeyError:
        error_msg = f"Unknown provider id: '{provider_id}' of '{known.keys()}'."
        logger.error(error_msg)
        raise ValueError(error_msg) from None
|
|
|
|
# ----------------------- DataContainer Database Protocol ---------------------
|
|
|
|
def _run_on_all_providers(self, method_name: str, action: str) -> None:
    """Invoke `method_name` on every provider; wrap any failure in RuntimeError."""
    for provider in self.providers:
        try:
            getattr(provider, method_name)()
        except Exception as ex:
            error = f"Provider {provider.provider_id()} fails on {action}: {ex}"
            logger.error(error)
            raise RuntimeError(error)

def save(self) -> None:
    """Save data records to persistent storage.

    Raises:
        RuntimeError: If any provider fails to save.
    """
    self._run_on_all_providers("save", "save")

def load(self) -> None:
    """Load data records from persistent storage.

    Raises:
        RuntimeError: If any provider fails to load.
    """
    self._run_on_all_providers("load", "load")
|
|
|
|
def db_vacuum(self) -> None:
    """Remove old records of all providers from database to free space.

    Raises:
        RuntimeError: If any provider fails during vacuum.
    """
    for provider in self.providers:
        try:
            provider.db_vacuum()
        except Exception as ex:
            message = f"Provider {provider.provider_id()} fails on db vacuum: {ex}"
            logger.error(message)
            raise RuntimeError(message)

def db_compact(self) -> None:
    """Apply tiered compaction to all providers to reduce storage while retaining coverage.

    Raises:
        RuntimeError: If any provider fails during compaction.
    """
    for provider in self.providers:
        try:
            provider.db_compact()
        except Exception as ex:
            message = f"Provider {provider.provider_id()} fails on db_compact: {ex}"
            logger.error(message)
            raise RuntimeError(message)
|
|
|
|
def db_get_stats(self) -> dict:
    """Get comprehensive statistics about database storage for all providers.

    Collects each provider's statistics, keyed by the provider's database
    namespace.

    Returns:
        dict: Mapping of provider database namespace to its statistics.

    Raises:
        RuntimeError: If any provider fails to report its statistics.
    """
    db_stats = {}
    for provider in self.providers:
        try:
            db_stats[provider.db_namespace()] = provider.db_get_stats()
        except Exception as ex:
            # Bugfix: message previously said "db vacuum" (copied from
            # db_vacuum), which misattributed the failure in logs.
            error = f"Provider {provider.provider_id()} fails on db_get_stats: {ex}"
            logger.error(error)
            raise RuntimeError(error) from ex
    return db_stats
|