"""Abstract and base classes for generic data. This module provides classes for managing and processing generic data in a flexible, configurable manner. It includes classes to handle configurations, record structures, sequences, and containers for generic data, enabling efficient storage, retrieval, and manipulation of data records. This module is designed for use in predictive modeling workflows, facilitating the organization, serialization, and manipulation of configuration and generic data in a clear, scalable, and structured manner. """ import difflib import json from abc import abstractmethod from collections.abc import KeysView, MutableMapping from itertools import chain from pathlib import Path from typing import ( Any, Dict, Iterator, Literal, Optional, Tuple, Type, Union, get_args, overload, ) import numpy as np import pandas as pd from loguru import logger from numpydantic import NDArray, Shape from pydantic import ( AwareDatetime, ConfigDict, Field, ValidationError, computed_field, field_validator, model_validator, ) from akkudoktoreos.core.coreabc import ( ConfigMixin, SingletonMixin, StartMixin, ) from akkudoktoreos.core.databaseabc import ( UNBOUND_WINDOW, DatabaseRecordProtocolMixin, DatabaseTimestamp, DatabaseTimeWindowType, ) from akkudoktoreos.core.pydantic import ( PydanticBaseModel, PydanticDateTimeData, PydanticDateTimeDataFrame, ) from akkudoktoreos.utils.datetimeutil import ( DateTime, Duration, compare_datetimes, to_datetime, to_duration, ) class DataABC(ConfigMixin, StartMixin, PydanticBaseModel): """Base class for handling generic data. Enables access to EOS configuration data (attribute `config`). """ pass # ==================== DataRecord ==================== class DataRecord(DataABC, MutableMapping): """Base class for data records, enabling dynamic access to fields defined in derived classes. Fields can be accessed and mutated both using dictionary-style access (`record['field_name']`) and attribute-style access (`record.field_name`). The data record also provides configured field like data. Configuration has to be done by the derived class. Configuration is a list of key strings, which is usually taken from the EOS configuration. The internal field for these data `configured_data` is mostly hidden from dictionary-style and attribute-style access. Attributes: date_time (DateTime): Aware datetime indicating when the data record applies. Defaults to now. Configurations: - Allows mutation after creation. - Supports non-standard data types like `datetime`. """ date_time: Optional[DateTime] = Field( default=None, json_schema_extra={"description": "DateTime"} ) configured_data: dict[str, Any] = Field( default_factory=dict, json_schema_extra={ "description": "Configured field like data", "examples": [{"load0_mr": 40421}], }, ) # Pydantic v2 model configuration model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True) @model_validator(mode="before") @classmethod def init_configured_field_like_data(cls, data: Any) -> Any: """Extracts configured data keys from the input and assigns them to `configured_data`. This validator is called before the model is initialized. It filters out any keys from the input dictionary that are listed in the configured data keys, and moves them into the `configured_data` field of the model. This enables flexible, key-driven population of dynamic data while keeping the model schema clean. Args: data (Any): The raw input data used to initialize the model. Returns: Any: The modified input data dictionary, with configured keys moved to `configured_data`. """ if not isinstance(data, dict): return data configured_keys: Union[list[str], set] = cls.configured_data_keys() or set() extracted = {k: data.pop(k) for k in list(data.keys()) if k in configured_keys} if extracted: data.setdefault("configured_data", {}).update(extracted) return data @field_validator("date_time", mode="before") @classmethod def transform_to_datetime(cls, value: Any) -> Optional[DateTime]: """Converts various datetime formats into DateTime.""" if value is None: # Allow to set to default. return None return to_datetime(value) @classmethod def configured_data_keys(cls) -> Optional[list[str]]: """Return the keys for the configured field like data. Can be overwritten by derived classes to define specific field like data. Usually provided by configuration data. """ return None @classmethod def record_keys(cls) -> list[str]: """Returns the keys of all fields in the data record.""" key_list = [] key_list.extend(list(cls.model_fields.keys())) key_list.extend(list(cls.__pydantic_decorators__.computed_fields.keys())) # Add also keys that may be added by configuration key_list.remove("configured_data") configured_keys = cls.configured_data_keys() if configured_keys is not None: key_list.extend(configured_keys) return key_list @classmethod def record_keys_writable(cls) -> list[str]: """Returns the keys of all fields in the data record that are writable.""" keys_writable = [] keys_writable.extend(list(cls.model_fields.keys())) # Add also keys that may be added by configuration keys_writable.remove("configured_data") configured_keys = cls.configured_data_keys() if configured_keys is not None: keys_writable.extend(configured_keys) return keys_writable def _validate_key_writable(self, key: str) -> None: """Verify that a specified key exists and is writable in the current record keys. Args: key (str): The key to check for in the records. Raises: KeyError: If the specified key is not in the expected list of keys for the records. """ if key not in self.record_keys_writable(): raise KeyError( f"Key '{key}' is not in writable record keys: {self.record_keys_writable()}" ) def __dir__(self) -> list[str]: """Extend the default `dir()` output to include configured field like data keys. This enables editor auto-completion and interactive introspection, while hiding the internal `configured_data` dictionary. This ensures the configured field like data values appear like native fields, in line with the base model's attribute behavior. """ base = super().__dir__() keys = set(base) # Expose configured data keys as attributes configured_keys = self.configured_data_keys() if configured_keys is not None: keys.update(configured_keys) # Explicitly hide the 'configured_data' internal dict keys.discard("configured_data") return sorted(keys) def __eq__(self, other: Any) -> bool: """Ensure equality comparison includes the contents of the `configured_data` dict. Contents of the `configured_data` dict are in addition to the base model fields. """ if not isinstance(other, self.__class__): return NotImplemented # Compare all fields except `configured_data` if self.model_dump(exclude={"configured_data"}) != other.model_dump( exclude={"configured_data"} ): return False # Compare `configured_data` explicitly return self.configured_data == other.configured_data def __getitem__(self, key: str) -> Any: """Retrieve the value of a field by key name. Args: key (str): The name of the field to retrieve. Returns: Any: The value of the requested field. Raises: KeyError: If the specified key does not exist. """ try: # Let getattr do the work return self.__getattr__(key) except: raise KeyError(f"'{key}' not found in the record fields.") def __setitem__(self, key: str, value: Any) -> None: """Set the value of a field by key name. Args: key (str): The name of the field to set. value (Any): The value to assign to the field. Raises: KeyError: If the specified key does not exist in the fields. """ try: # Let setattr do the work self.__setattr__(key, value) except: raise KeyError(f"'{key}' is not a recognized field.") def __delitem__(self, key: str) -> None: """Delete the value of a field by key name by setting it to None. Args: key (str): The name of the field to delete. Raises: KeyError: If the specified key does not exist in the fields. """ try: self.__delattr__(key) except: raise KeyError(f"'{key}' is not a recognized field.") def __iter__(self) -> Iterator[str]: """Iterate over the field names in the data record. Returns: Iterator[str]: An iterator over field names. """ return iter(self.record_keys_writable()) def __len__(self) -> int: """Return the number of fields in the data record. Returns: int: The number of defined fields. """ return len(self.record_keys_writable()) def __repr__(self) -> str: """Provide a string representation of the data record. Returns: str: A string representation showing field names and their values. """ field_values = {field: getattr(self, field) for field in self.__class__.model_fields} return f"{self.__class__.__name__}({field_values})" def __getattr__(self, key: str) -> Any: """Dynamic attribute access for fields. Args: key (str): The name of the field to access. Returns: Any: The value of the requested field. Raises: AttributeError: If the field does not exist. """ if key in self.__class__.model_fields: return getattr(self, key) if key in self.configured_data.keys(): return self.configured_data[key] configured_keys = self.configured_data_keys() if configured_keys is not None and key in configured_keys: return None raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'") def __setattr__(self, key: str, value: Any) -> None: """Set attribute values directly if they are recognized fields. Args: key (str): The name of the attribute/field to set. value (Any): The value to assign to the attribute/field. Raises: AttributeError: If the attribute/field does not exist. """ if key in self.__class__.model_fields: super().__setattr__(key, value) return configured_keys = self.configured_data_keys() if configured_keys is not None and key in configured_keys: self.configured_data[key] = value return raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{key}'") def __delattr__(self, key: str) -> None: """Delete an attribute by setting it to None if it exists as a field. Args: key (str): The name of the attribute/field to delete. Raises: AttributeError: If the attribute/field does not exist. """ if key in self.__class__.model_fields: data: Optional[dict] if key == "configured_data": data = dict() else: data = None setattr(self, key, data) return if key in self.configured_data: del self.configured_data[key] return configured_keys = self.configured_data_keys() if configured_keys is not None and key in configured_keys: return super().__delattr__(key) @classmethod def key_from_description(cls, description: str, threshold: float = 0.8) -> Optional[str]: """Returns the attribute key that best matches the provided description. Fuzzy matching is used. Args: description (str): The description text to search for. threshold (float): The minimum ratio for a match (0-1). Default is 0.8. Returns: Optional[str]: The attribute key if a match is found above the threshold, else None. """ if description is None: return None # Get all descriptions from the fields descriptions: dict[str, str] = {} for field_name in cls.model_fields.keys(): desc = cls.field_description(field_name) if desc: descriptions[field_name] = desc # Use difflib to get close matches matches = difflib.get_close_matches( description, descriptions.values(), n=1, cutoff=threshold ) # Check if there is a match if matches: best_match = matches[0] # Return the key that corresponds to the best match for key, desc in descriptions.items(): if desc == best_match: return key return None @classmethod def keys_from_descriptions( cls, descriptions: list[str], threshold: float = 0.8 ) -> list[Optional[str]]: """Returns a list of attribute keys that best matches the provided list of descriptions. Fuzzy matching is used. Args: descriptions (list[str]): A list of description texts to search for. threshold (float): The minimum ratio for a match (0-1). Default is 0.8. Returns: list[Optional[str]]: A list of attribute keys matching the descriptions, with None for unmatched descriptions. """ keys = [] for description in descriptions: key = cls.key_from_description(description, threshold) keys.append(key) return keys # ==================== DataSequence ==================== class DataSequence(DataABC, DatabaseRecordProtocolMixin[DataRecord]): """A managed sequence of DataRecord instances with ltime series behavior. The DataSequence class provides an ordered, mutable collection of DataRecord instances. It also supports advanced data operations such as - JSON serialization, - conversion to Pandas Series, - sorting by timestamp, - and data storage in a database. Attributes: records (list[DataRecord]): A list of DataRecord instances representing individual generic data points. record_keys (Optional[list[str]]): A list of field names (keys) expected in each DataRecord. Invariant: ``self.records`` is always kept sorted in ascending ``date_time`` order whenever it contains any records. Note: Derived classes have to provide their own records field with correct record type set. Usage: .. code-block:: python # Example of creating, adding, and using DataSequence class DerivedSequence(DataSquence): records: list[DerivedDataRecord] = Field(default_factory=list, json_schema_extra={ "description": "List of data records" }) seq = DerivedSequence() seq.insert(DerivedDataRecord(date_time=datetime.now(), temperature=72)) seq.insert(DerivedDataRecord(date_time=datetime.now(), temperature=75)) # Convert to JSON and back json_data = seq.to_json() new_seq = DerivedSequence.from_json(json_data) # Convert to Pandas Series series = seq.key_to_series('temperature') """ # To be overloaded by derived classes. records: list[DataRecord] = Field( default_factory=list, json_schema_extra={"description": "List of data records"} ) # Sequence helpers def _validate_key(self, key: str) -> None: """Verify that a specified key exists in the current record keys. Args: key (str): The key to check for in the records. Raises: KeyError: If the specified key is not in the expected list of keys for the records. """ if key not in self.record_keys: raise KeyError(f"Key '{key}' is not in record keys: {self.record_keys}") def _validate_key_writable(self, key: str) -> None: """Verify that a specified key exists and is writable in the current record keys. Args: key (str): The key to check for in the records. Raises: KeyError: If the specified key is not in the expected list of keys for the records. """ if key not in self.record_keys_writable: raise KeyError( f"Key '{key}' is not in writable record keys: {self.record_keys_writable}" ) def _validate_record(self, value: DataRecord) -> None: """Check if the provided value is a valid DataRecord with compatible keys. Args: value (DataRecord): The record to validate. Raises: ValueError: If the value is not an instance of DataRecord or has an invalid date_time type. KeyError: If the value has different keys from those expected in the sequence. """ # Assure value is of correct type if value.__class__.__name__ != self.record_class().__name__: raise ValueError(f"Value must be an instance of `{self.record_class().__name__}`.") # Assure datetime value can be converted to datetime object value.date_time = to_datetime(value.date_time) # Sequence state # Derived fields (computed) @computed_field # type: ignore[prop-decorator] @property def min_datetime(self) -> Optional[DateTime]: """Minimum (earliest) datetime in the time series sequence of data records. This property computes the earliest datetime from the sequence of data records. If no records are present, it returns `None`. Returns: Optional[DateTime]: The earliest datetime in the sequence, or `None` if no data records exist. """ min_timestamp, _ = self.db_timestamp_range() if min_timestamp is None: return None # Timestamps are in UTC - convert to timezone utc_datetime = DatabaseTimestamp.to_datetime(min_timestamp) return utc_datetime.in_timezone(self.config.general.timezone) @computed_field # type: ignore[prop-decorator] @property def max_datetime(self) -> Optional[DateTime]: """Maximum (latest) datetime in the time series sequence of data records. This property computes the latest datetime from the sequence of data records. If no records are present, it returns `None`. Returns: Optional[DateTime]: The latest datetime in the sequence, or `None` if no data records exist. """ _, max_timestamp = self.db_timestamp_range() if max_timestamp is None: return None # Timestamps are in UTC - convert to timezone utc_datetime = DatabaseTimestamp.to_datetime(max_timestamp) return utc_datetime.in_timezone(self.config.general.timezone) @computed_field # type: ignore[prop-decorator] @property def record_keys(self) -> list[str]: """Returns the keys of all fields in the data records.""" return self.record_class().record_keys() @computed_field # type: ignore[prop-decorator] @property def record_keys_writable(self) -> list[str]: """Get the keys of all writable fields in the data records. This property retrieves the keys of all fields in the data records that can be written to. It uses the `record_class` to determine the model's field structure. Returns: list[str]: A list of field keys that are writable in the data records. """ return self.record_class().record_keys_writable() @classmethod def record_class(cls) -> Type: """Get the class of the data record handled by this data sequence. This method determines the class of the data record type associated with the `records` field of the model. The field is expected to be a list, and the element type of the list should be a subclass of `DataRecord`. Raises: ValueError: If the record type is not a subclass of `DataRecord`. Returns: Type: The class of the data record handled by the data sequence. """ # Access the model field metadata field_info = cls.model_fields["records"] # Get the list element type from the 'type_' attribute list_element_type = get_args(field_info.annotation)[0] if not isinstance(list_element_type(), DataRecord): raise ValueError( f"Data record must be an instance of DataRecord: '{list_element_type}'." ) return list_element_type @classmethod def from_dict(cls, data: dict) -> "DataSequence": """Reconstruct a sequence from its serialized dictionary form. Fully subclass-safe and invariant-safe. """ if not isinstance(data, dict): raise TypeError("from_dict() expects a dictionary") records_data = data.get("records", []) if not isinstance(records_data, list): raise ValueError("'records' must be a list") # Create empty instance of *actual class* sequence = cls() # Rebuild records using the sequence's record model record_model = sequence.record_class() for record_dict in records_data: if not isinstance(record_dict, dict): raise ValueError("Each record must be a dictionary") record = record_model(**record_dict) # Important: use insert_by_datetime to rebuild invariants sequence.insert_by_datetime(record) return sequence def __len__(self) -> int: """Get total number of DataRecords in sequence (DB + memory-only).""" return self.db_count_records() def __repr__(self) -> str: """Provide a string representation of the DataSequence. Returns: str: A string representation of the DataSequence. """ return f"{self.__class__.__name__}([{', '.join(repr(record) for record in self.records)}])" # Sequence methods def __iter__(self) -> Iterator[DataRecord]: """Create an iterator for accessing DataRecords sequentially. Returns: Iterator[DataRecord]: An iterator for the records. """ return iter(self.records) def get_by_datetime( self, target_datetime: DateTime, *, time_window: Optional[Duration] = None ) -> Optional[DataRecord]: """Get the record at the specified datetime, with an optional fallback search window. Args: target_datetime: The datetime to search for. time_window: Optional total width of the symmetric search window centered on ``target_datetime``. If provided and no exact match exists, the nearest record within this window is returned. Returns: The matching DataRecord, the nearest DataRecord within the specified time window if no exact match exists, or ``None`` if no suitable record is found. """ # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(target_datetime) return self.db_get_record(db_target, time_window=time_window) def get_nearest_by_datetime( self, target_datetime: DateTime, time_window: Optional[Duration] = None ) -> Optional[DataRecord]: """Get the record nearest to the specified datetime within an optional time window. Args: target_datetime: The datetime to search near. time_window: Total width of the symmetric search window centered on ``target_datetime``. If ``None``, searches all records. Returns: The nearest DataRecord within the specified time window, or ``None`` if no records exist or no records fall within the window. Raises: ValueError: If ``time_window`` is negative. """ # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(target_datetime) if time_window is None: twin: DatabaseTimeWindowType = UNBOUND_WINDOW else: twin = time_window return self.db_get_record(db_target, time_window=twin) def insert_by_datetime(self, record: DataRecord) -> None: """Insert or merge a DataRecord into the sequence based on its date. If a record with the same date exists, merges new data fields with the existing record. Otherwise, appends the record and maintains chronological order. Args: record (DataRecord): The record to add or merge. Note: record.date_time shall be a DateTime or None """ self._validate_record(record) # Ensure datetime objects are normalized record_date_time_timestamp = DatabaseTimestamp.from_datetime(record.date_time) avail_record = self.db_get_record(record_date_time_timestamp) if avail_record: # Merge values, only updating fields where data record has a non-None value for field, val in record.model_dump(exclude_unset=True).items(): if field in record.record_keys_writable(): setattr(avail_record, field, val) self.db_mark_dirty_record(record) else: self.db_insert_record(record) @overload def update_value(self, date: DateTime, key: str, value: Any) -> None: ... @overload def update_value(self, date: DateTime, values: Dict[str, Any]) -> None: ... def update_value(self, date: DateTime, *args: Any, **kwargs: Any) -> None: """Updates specific values in the data record for a given date. If a record for the date exists, updates the specified attributes with the new values. Otherwise, appends a new record with the given values and maintains chronological order. Args: date (datetime): The date for which the values are to be added or updated. key (str), value (Any): Single key-value pair to update OR values (Dict[str, Any]): Dictionary of key-value pairs to update OR **kwargs: Key-value pairs as keyword arguments Examples: .. code-block:: python update_value(date, 'temperature', 25.5) update_value(date, {'temperature': 25.5, 'humidity': 80}) update_value(date, temperature=25.5, humidity=80) """ # Process input arguments into a dictionary values: Dict[str, Any] = {} if len(args) == 2: # Single key-value pair values[args[0]] = args[1] elif len(args) == 1 and isinstance(args[0], dict): # Dictionary input values.update(args[0]) elif len(args) > 0: # Invalid number of arguments raise ValueError("Expected either 2 arguments (key, value) or 1 dictionary argument") values.update(kwargs) # Add any keyword arguments # Validate all keys are writable for key in values: self._validate_key_writable(key) # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(date) # Check if a record with the given date already exists record = self.db_get_record(db_target) if record is None: # Create a new record and append to the list new_record = self.record_class()(date_time=date, **values) self.db_insert_record(new_record) else: # Update the DataRecord with all new values for key, value in values.items(): setattr(record, key, value) self.db_mark_dirty_record(record) def key_to_dict( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, dropna: Optional[bool] = None, ) -> Dict[DateTime, Any]: """Extract a dictionary indexed by the date_time field of the DataRecords. The dictionary will contain values extracted from the specified key attribute of each DataRecord, using the date_time field as the key. Args: key (str): The field name in the DataRecord from which to extract values. start_datetime (datetime, optional): The start date to filter records (inclusive). end_datetime (datetime, optional): The end date to filter records (exclusive). dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True. Returns: Dict[datetime, Any]: A dictionary with the date_time of each record as the key and the values extracted from the specified key. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ self._validate_key(key) # Ensure datetime objects are normalized start_timestamp = ( DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None ) end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None # Create a dictionary to hold date_time and corresponding values if dropna is None: dropna = True filtered_data = {} for record in self.db_iterate_records(start_timestamp, end_timestamp): if ( record.date_time is None or (dropna and getattr(record, key, None) is None) or (dropna and getattr(record, key, None) == float("nan")) ): continue record_date_time_timestamp = DatabaseTimestamp.from_datetime(record.date_time) if (start_timestamp is None or record_date_time_timestamp >= start_timestamp) and ( end_timestamp is None or record_date_time_timestamp < end_timestamp ): filtered_data[to_datetime(record.date_time, as_string=True)] = getattr( record, key, None ) return filtered_data def key_to_value( self, key: str, target_datetime: DateTime, time_window: Optional[Duration] = None ) -> Optional[float]: """Returns the value corresponding to the specified key that is nearest to the given datetime. Args: key (str): The key of the attribute in DataRecord to extract. target_datetime (datetime): The datetime to search for. time_window: Optional total width of the symmetric search window centered on ``target_datetime``. If provided and no exact match exists, the nearest record within this window is returned. Returns: Optional[float]: The value nearest to the given datetime, or None if no valid records are found. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ self._validate_key(key) # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(to_datetime(target_datetime)) record = self.db_get_record(db_target, time_window=time_window) return getattr(record, key, None) def key_to_lists( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, dropna: Optional[bool] = None, ) -> Tuple[list[DateTime], list[Optional[float]]]: """Extracts two lists from data records within an optional date range. The lists are: Dates: List of datetime elements. Values: List of values corresponding to the specified key in the data records. Args: key (str): The key of the attribute in DataRecord to extract. start_datetime (datetime, optional): The start date for filtering the records (inclusive). end_datetime (datetime, optional): The end date for filtering the records (exclusive). dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True. Returns: tuple: A tuple containing a list of datetime values and a list of extracted values. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ self._validate_key(key) # Ensure datetime objects are normalized start_timestamp = ( DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None ) end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None # Create two lists to hold date_time and corresponding values if dropna is None: dropna = True filtered_records = [] for record in self.db_iterate_records(start_timestamp, end_timestamp): if ( record.date_time is None or (dropna and getattr(record, key, None) is None) or (dropna and getattr(record, key, None) == float("nan")) ): continue record_date_time_timestamp = DatabaseTimestamp.from_datetime(record.date_time) if (start_timestamp is None or record_date_time_timestamp >= start_timestamp) and ( end_timestamp is None or record_date_time_timestamp < end_timestamp ): filtered_records.append(record) dates = [record.date_time for record in filtered_records] values = [getattr(record, key, None) for record in filtered_records] return dates, values def key_from_lists(self, key: str, dates: list[DateTime], values: list[float]) -> None: """Update the DataSequence from lists of datetime and value elements. The dates list should represent the date_time of each DataRecord, and the values list should represent the corresponding data values for the specified key. The list must be ordered starting with the oldest date. Args: key (str): The field name in the DataRecord that corresponds to the values in the Series. dates: List of datetime elements. values: List of values corresponding to the specified key in the data records. """ self._validate_key_writable(key) for i, date_time in enumerate(dates): # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(date_time) # Check if there's an existing record for this date_time avail_record = self.db_get_record(db_target) if avail_record is None: # Create a new DataRecord if none exists new_record = self.record_class()(date_time=date_time, **{key: values[i]}) self.db_insert_record(new_record) else: # Update existing record's specified key setattr(avail_record, key, values[i]) self.db_mark_dirty_record(avail_record) def key_to_series( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, dropna: Optional[bool] = None, ) -> pd.Series: """Extract a series indexed by the date_time field from data records within an optional date range. Args: key (str): The field name in the DataRecord from which to extract values. start_datetime (datetime, optional): The start date for filtering the records (inclusive). end_datetime (datetime, optional): The end date for filtering the records (exclusive). dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True. Returns: pd.Series: A Pandas Series with the index as the date_time of each record and the values extracted from the specified key. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ dates, values = self.key_to_lists( key=key, start_datetime=start_datetime, end_datetime=end_datetime, dropna=dropna ) series = pd.Series(data=values, index=pd.DatetimeIndex(dates), name=key) return series def key_from_series(self, key: str, series: pd.Series) -> None: """Update the DataSequence from a Pandas Series. The series index should represent the date_time of each DataRecord, and the series values should represent the corresponding data values for the specified key. Args: series (pd.Series): A Pandas Series containing data to update the DataSequence. key (str): The field name in the DataRecord that corresponds to the values in the Series. """ self._validate_key_writable(key) for date_time, value in series.items(): # Ensure datetime objects are normalized db_target = DatabaseTimestamp.from_datetime(to_datetime(date_time)) # Check if there's an existing record for this date_time avail_record = self.db_get_record(db_target) if avail_record is None: # Create a new DataRecord if none exists new_record = self.record_class()(date_time=date_time, **{key: value}) self.db_insert_record(new_record) else: # Update existing record's specified key setattr(avail_record, key, value) self.db_mark_dirty_record(avail_record) def key_to_array( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, fill_method: Optional[str] = None, dropna: Optional[bool] = True, boundary: Literal["strict", "context"] = "context", align_to_interval: bool = False, ) -> NDArray[Shape["*"], Any]: """Extract an array indexed by fixed time intervals from data records within an optional date range. Args: key (str): The field name in the DataRecord from which to extract values. start_datetime (datetime, optional): The start date for filtering the records (inclusive). end_datetime (datetime, optional): The end date for filtering the records (exclusive). interval (duration, optional): The fixed time interval. Defaults to 1 hour. fill_method (str): Method to handle missing values during resampling. - 'linear': Linearly interpolate missing values (for numeric data only). - 'time': Interpolate missing values (for numeric data only). - 'ffill': Forward fill missing values. - 'bfill': Backward fill missing values. - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'. dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True. boundary (Literal["strict", "context"]): "strict" → only values inside [start, end) "context" → include one value before and after for proper resampling align_to_interval (bool): When True, snap the resample origin to the nearest UTC epoch-aligned boundary of ``interval`` before resampling. This ensures that bucket timestamps always fall on wall-clock-round times regardless of when ``start_datetime`` falls: - 15-minute interval → buckets on :00, :15, :30, :45 - 1-hour interval → buckets on the hour When False (default), the origin is ``query_start`` (or ``"start_day"`` when no start is given), preserving the existing behaviour where buckets are aligned to the query window rather than the clock. Set to True when storing compacted records back to the database so that the resulting timestamps are predictable and human-readable. Leave False for forecast or reporting queries where alignment to the exact query window is more important than clock-round boundaries. Returns: np.ndarray: A NumPy Array of the values at the chosen frequency extracted from the specified key. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ self._validate_key(key) # Validate fill method if fill_method not in ("ffill", "bfill", "linear", "time", "none", None): raise ValueError(f"Unsupported fill method: {fill_method}") if boundary not in ("strict", "context"): raise ValueError(f"Unsupported boundary mode: {boundary}") # Ensure datetime objects are normalized start_datetime = to_datetime(start_datetime, to_maxtime=False) if start_datetime else None end_datetime = to_datetime(end_datetime, to_maxtime=False) if end_datetime else None if interval is None: interval = to_duration("1 hour") resample_freq = "1h" else: resample_freq = to_duration(interval, as_string="pandas") # Extend window for context resampling query_start = start_datetime query_end = end_datetime if boundary == "context": # include one timestamp before and after for proper resampling if query_start is not None: # We have a start datetime - look for previous entry start_timestamp = DatabaseTimestamp.from_datetime(query_start) query_start_timestamp = self.db_previous_timestamp(start_timestamp) if query_start_timestamp: query_start = DatabaseTimestamp.to_datetime(query_start_timestamp) if end_datetime is not None: # We have a end datetime - look for next entry end_timestamp = DatabaseTimestamp.from_datetime(query_end) query_end_timestamp = self.db_next_timestamp(end_timestamp) if query_end_timestamp is None: # Ensure at least end_datetime is included (excluded by definition) query_end = end_datetime.add(seconds=1) else: query_end = DatabaseTimestamp.to_datetime(query_end_timestamp).add(seconds=1) # Load raw lists (already sorted & filtered) dates, values = self.key_to_lists( key=key, start_datetime=query_start, end_datetime=query_end, dropna=dropna ) values_len = len(values) # Bring lists into shape if values_len < 1: # No values, assume at least one value set to None if query_start is not None: dates.append(query_start - interval) else: dates.append(to_datetime(to_maxtime=False)) values.append(None) if query_start is not None: start_index = 0 while start_index < values_len: if compare_datetimes(dates[start_index], query_start).ge: break start_index += 1 if start_index == 0: # No value before start # Add dummy value dates.insert(0, query_start - interval) values.insert(0, values[0]) elif start_index > 1: # Truncate all values before latest value before query_start dates = dates[start_index - 1 :] values = values[start_index - 1 :] # Determine resample origin if align_to_interval: # Snap to nearest UTC epoch-aligned floor of the interval so that bucket # timestamps land on wall-clock-round boundaries (:00, :15, :30, :45 etc.) # regardless of sub-second jitter in query_start. interval_sec = int(interval.total_seconds()) if interval_sec > 0: start_epoch = int(query_start.timestamp()) floored_epoch = (start_epoch // interval_sec) * interval_sec resample_origin: Union[str, pd.Timestamp] = pd.Timestamp( floored_epoch, unit="s", tz="UTC" ) else: resample_origin = query_start else: # Original behaviour: align to the query window start. resample_origin = query_start else: # We do not have a query_start, align resample buckets to midnight of first day resample_origin = "start_day" if query_end is not None: if compare_datetimes(dates[-1], query_end).lt: # Add dummy value at query_end dates.append(query_end) values.append(values[-1]) # Construct series index = pd.to_datetime(dates, utc=True) series = pd.Series(values, index=index, name=key) if series.index.inferred_type != "datetime64": raise TypeError( f"Expected DatetimeIndex, but got {type(series.index)} " f"infered to {series.index.inferred_type}: {series}" ) # Check for numeric values numeric = pd.to_numeric(series.dropna(), errors="coerce") is_numeric = numeric.notna().all() # Determine default fill method depending on dtype if fill_method is None: if is_numeric: fill_method = "time" else: fill_method = "ffill" # Perform the resampling if is_numeric: # Step 1: aggregate — collapses sub-interval data (e.g. 4x 15min → 1h mean). # Produces NaN for buckets where no data existed at all. resampled = pd.to_numeric( series.resample(resample_freq, origin=resample_origin).mean(), errors="coerce", # ← ensures float64, not object dtype ) # Step 2: fill gaps — interpolates or fills the NaN buckets from step 1. if fill_method in ("linear", "time"): # Both are equivalent post-resample (equally-spaced index), # but 'time' is kept as the label for clarity. resampled = resampled.interpolate("time") elif fill_method == "ffill": resampled = resampled.ffill() elif fill_method == "bfill": resampled = resampled.bfill() # fill_method == "none": leave NaNs in place else: resampled = series.resample(resample_freq, origin=resample_origin).first() if fill_method == "ffill": resampled = resampled.ffill() elif fill_method == "bfill": resampled = resampled.bfill() logger.debug( "Resampled for '{}' with length {}: {}...{}", key, len(resampled), resampled[:10], resampled[-10:], ) # Convert the resampled series to a NumPy array if start_datetime is not None and len(resampled) > 0: resampled = resampled.truncate(before=start_datetime) if end_datetime is not None and len(resampled) > 0: resampled = resampled.truncate(after=end_datetime.subtract(seconds=1)) array = resampled.values # Convert NaN to None if there are actually NaNs if ( isinstance(array, np.ndarray) and np.issubdtype(array.dtype.type, np.floating) and pd.isna(array).any() ): array = array.astype(object) array[pd.isna(array)] = None logger.debug( "Array for '{}' with length {}: {}...{}", key, len(array), array[:10], array[-10:] ) return array def to_dataframe( self, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, ) -> pd.DataFrame: """Converts the sequence of DataRecord instances into a Pandas DataFrame. Args: start_datetime (Optional[datetime]): The lower bound for filtering (inclusive). Defaults to the earliest possible datetime if None. end_datetime (Optional[datetime]): The upper bound for filtering (exclusive). Defaults to the latest possible datetime if None. Returns: pd.DataFrame: A DataFrame containing the filtered data from all records. """ if not self.records: return pd.DataFrame() # Return empty DataFrame if no records exist # Ensure datetime objects are normalized start_timestamp = ( DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None ) end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None # Convert filtered records to a dictionary list data = [ record.model_dump() for record in self.db_iterate_records( start_timestamp=start_timestamp, end_timestamp=end_timestamp ) ] # Convert to DataFrame df = pd.DataFrame(data) if df.empty: return df # Ensure `date_time` column exists and use it for the index if not "date_time" in df.columns: error_msg = f"Cannot create dataframe: no `date_time` column in `{df}`." logger.error(error_msg) raise TypeError(error_msg) df.index = pd.DatetimeIndex(df["date_time"]) return df def delete_by_datetime( self, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, ) -> int: """Delete records in the given datetime range. Deletes records from memory and, if database storage is enabled, from the database. Returns the maximum of in-memory and database deletions. Args: start_datetime: Start datetime (inclusive) end_datetime: End datetime (exclusive) Returns: Number of records deleted (max of memory and database deletions) """ # Ensure datetime objects are normalized start_timestamp = ( DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None ) end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None return self.db_delete_records(start_timestamp=start_timestamp, end_timestamp=end_timestamp) def key_delete_by_datetime( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, ) -> None: """Delete an attribute specified by `key` from records in the sequence within a given datetime range. This method removes the attribute identified by `key` from records that have a `date_time` value falling within the specified `start_datetime` (inclusive) and `end_datetime` (exclusive) range. - If only `start_datetime` is specified, attributes will be removed from records from that date onward. - If only `end_datetime` is specified, attributes will be removed from records up to that date. - If neither `start_datetime` nor `end_datetime` is given, the attribute will be removed from all records. Args: key (str): The attribute name to delete from each record. start_datetime (datetime, optional): The start datetime to begin attribute deletion (inclusive). end_datetime (datetime, optional): The end datetime to stop attribute deletion (exclusive). Raises: KeyError: If `key` is not a valid attribute of the records. """ self._validate_key_writable(key) # Ensure datetime objects are normalized start_timestamp = ( DatabaseTimestamp.from_datetime(start_datetime) if start_datetime else None ) end_timestamp = DatabaseTimestamp.from_datetime(end_datetime) if end_datetime else None for record in self.db_iterate_records(start_timestamp, end_timestamp): del record[key] self.db_mark_dirty_record(record) def save(self) -> bool: """Save data records to persistent storage. Returns: True in case the data records were saved, False otherwise. """ if not self.db_enabled: return False saved = self.db_save_records() return saved > 0 def load(self) -> bool: """Load data records from from persistent storage. Returns: True in case the data records were loaded, False otherwise. """ if not self.db_enabled: return False loaded = self.db_load_records() return loaded > 0 # ----------------------- DataSequence Database Protocol --------------------- # Required interface propagated to derived class. # - db_keep_duration # - db_namespace # ==================== DataProvider ==================== class DataProvider(SingletonMixin, DataSequence): """Abstract base class for data providers with singleton thread-safety and configurable data parameters. This class serves as a base for managing generic data, providing an interface for derived classes to maintain a single instance across threads. It offers attributes for managing data and historical data retention. Note: Derived classes have to provide their own records field with correct record type set. """ update_datetime: Optional[AwareDatetime] = Field( None, json_schema_extra={"description": "Latest update datetime for generic data"} ) @abstractmethod def provider_id(self) -> str: """Return the unique identifier for the data provider. To be implemented by derived classes. """ return "DataProvider" @abstractmethod def enabled(self) -> bool: """Return True if the provider is enabled according to configuration. To be implemented by derived classes. """ raise NotImplementedError() @abstractmethod def _update_data(self, force_update: Optional[bool] = False) -> None: """Abstract method for custom data update logic, to be implemented by derived classes. Args: force_update (bool, optional): If True, forces the provider to update the data even if still cached. """ pass def __init__(self, *args: Any, **kwargs: Any) -> None: if hasattr(self, "_initialized"): return super().__init__(*args, **kwargs) def db_namespace(self) -> str: """Namespace of database.""" return self.provider_id() def update_data( self, force_enable: Optional[bool] = False, force_update: Optional[bool] = False, ) -> None: """Calls the custom update function if enabled or forced. Args: force_enable (bool, optional): If True, forces the update even if the provider is disabled. force_update (bool, optional): If True, forces the provider to update the data even if still cached. """ # Check after configuration is updated. if not force_enable and not self.enabled(): return # Call the custom update logic self._update_data(force_update=force_update) # ==================== DataImportMixin ==================== class DataImportMixin(StartMixin): """Mixin class for import of generic data. This class is designed to handle generic data provided in the form of a key-value dictionary. - **Keys**: Represent identifiers from the record keys of a specific data. - **Values**: Are lists of data values starting at a specified start_datetime, where each value corresponds to a subsequent time interval (e.g., hourly). Two special keys are handled. start_datetime may be used to defined the starting datetime of the values. ìnterval may be used to define the fixed time interval between two values. On import self.update_value(datetime, key, value) is called which has to be provided. Also self.ems_start_datetime may be necessary as a default in case start_datetime is not given. """ # Attributes required but defined elsehere. # - start_datetime # - record_keys_writable # - update_value def import_from_dict( self, import_data: dict, key_prefix: str = "", start_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, ) -> None: """Updates generic data by importing it from a dictionary. This method reads generic data from a dictionary, matches keys based on the record keys and the provided `key_prefix`, and updates the data values sequentially. All value lists must have the same length. Args: import_data (dict): Dictionary containing the generic data with optional 'start_datetime' and 'interval' keys. key_prefix (str, optional): A prefix to filter relevant keys from the generic data. Only keys starting with this prefix will be considered. Defaults to an empty string. start_datetime (DateTime, optional): Start datetime of values if not in dict. interval (Duration, optional): The fixed time interval if not in dict. Raises: ValueError: If value lists have different lengths or if datetime conversion fails. """ # Handle datetime and interval from dict or parameters if "start_datetime" in import_data: try: start_datetime = to_datetime(import_data["start_datetime"]) except (ValueError, TypeError) as e: raise ValueError(f"Invalid start_datetime in import data: {e}") if start_datetime is None: start_datetime = self.ems_start_datetime if "interval" in import_data: try: interval = to_duration(import_data["interval"]) except (ValueError, TypeError) as e: raise ValueError(f"Invalid interval in import data: {e}") if interval is None: interval = to_duration("1 hour") interval_steps_per_hour = int(3600 / interval.total_seconds()) if interval.total_seconds() * interval_steps_per_hour != 3600: error_msg = f"Interval {interval} does not fit into hour." logger.error(error_msg) raise NotImplementedError(error_msg) # Filter keys based on key_prefix and record_keys_writable valid_keys = [ key for key in import_data.keys() if key.startswith(key_prefix) and key in self.record_keys_writable # type: ignore and key not in ("start_datetime", "interval") ] if not valid_keys: return # Validate all value lists have the same length value_lengths = [] for key in valid_keys: value_list = import_data[key] if not isinstance(value_list, (list, tuple, np.ndarray)): raise ValueError(f"Value for key '{key}' must be a list, tuple, or array") value_lengths.append(len(value_list)) if len(set(value_lengths)) > 1: raise ValueError( f"All value lists must have the same length. Found lengths: " f"{dict(zip(valid_keys, value_lengths))}" ) values_count = value_lengths[0] # Process each valid key start_timestamp = DatabaseTimestamp.from_datetime(start_datetime) for key in valid_keys: try: values = import_data[key] # Update values, skipping any None/NaN for value_index, value_db_datetime in enumerate( self.db_generate_timestamps(start_timestamp, values_count, interval) # type: ignore[attr-defined] ): value = values[value_index] value_datetime = DatabaseTimestamp.to_datetime(value_db_datetime) if value is not None and not pd.isna(value): self.update_value(value_datetime, key, value) # type: ignore except (IndexError, TypeError) as e: raise ValueError(f"Error processing values for key '{key}': {e}") def import_from_dataframe( self, df: pd.DataFrame, key_prefix: str = "", start_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, ) -> None: """Updates generic data by importing it from a pandas DataFrame. This method reads generic data from a DataFrame, matches columns based on the record keys and the provided `key_prefix`, and updates the data values using the DataFrame's index as timestamps. Args: df (pd.DataFrame): DataFrame containing the generic data with datetime index or sequential values. key_prefix (str, optional): A prefix to filter relevant columns from the DataFrame. Only columns starting with this prefix will be considered. Defaults to an empty string. start_datetime (DateTime, optional): Start datetime if DataFrame doesn't have datetime index. interval (Duration, optional): The fixed time interval if DataFrame doesn't have datetime index. Raises: ValueError: If DataFrame structure is invalid or datetime conversion fails. """ # Validate DataFrame if not isinstance(df, pd.DataFrame): raise ValueError("Input must be a pandas DataFrame") # Handle datetime index if isinstance(df.index, pd.DatetimeIndex): try: index_datetimes = [to_datetime(dt) for dt in df.index] has_datetime_index = True except (ValueError, TypeError) as e: raise ValueError(f"Invalid datetime index in DataFrame: {e}") else: if start_datetime is None: start_datetime = self.ems_start_datetime has_datetime_index = False # Filter columns based on key_prefix and record_keys_writable valid_columns = [ col for col in df.columns if col.startswith(key_prefix) and col in self.record_keys_writable # type: ignore ] if not valid_columns: return # For DataFrame, length validation is implicit since all columns have same length values_count = len(df) # Generate value_datetime_mapping once if not using datetime index if not has_datetime_index: # Create values datetime list start_timestamp = DatabaseTimestamp.from_datetime(start_datetime) value_db_datetimes = list( self.db_generate_timestamps(start_timestamp, values_count, interval) # type: ignore[attr-defined] ) # Process each valid column for column in valid_columns: try: values = df[column].tolist() if has_datetime_index: # Use the DataFrame's datetime index for dt, value in zip(index_datetimes, values): if value is not None and not pd.isna(value): self.update_value(dt, column, value) # type: ignore else: # Use the pre-generated datetime index for value_index in range(values_count): value = values[value_index] value_datetime = DatabaseTimestamp.to_datetime( value_db_datetimes[value_index] ) if value is not None and not pd.isna(value): self.update_value(value_datetime, column, value) # type: ignore except Exception as e: raise ValueError(f"Error processing column '{column}': {e}") def import_from_json( self, json_str: str, key_prefix: str = "", start_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, ) -> None: """Updates generic data by importing it from a JSON string. This method reads generic data from a JSON string, matches keys based on the record keys and the provided `key_prefix`, and updates the data values sequentially, starting from the `start_datetime`. If start_datetime and or interval is given in the JSON dict it will be used. Otherwise the given parameters are used. If None is given start_datetime defaults to 'self.ems_start_datetime' and interval defaults to 1 hour. Args: json_str (str): The JSON string containing the generic data. key_prefix (str, optional): A prefix to filter relevant keys from the generic data. Only keys starting with this prefix will be considered. Defaults to an empty string. start_datetime (DateTime, optional): Start datetime of values. interval (duration, optional): The fixed time interval. Defaults to 1 hour. Raises: JSONDecodeError: If the file content is not valid JSON. Example: Given a JSON string with the following content and `key_prefix = "load"`, only the "loadforecast_power_w" key will be processed even though both keys are in the record. .. code-block:: json { "start_datetime": "2024-11-10 00:00:00", "interval": "30 minutes", "loadforecast_power_w": [20.5, 21.0, 22.1], "other_xyz: [10.5, 11.0, 12.1] } """ # Strip quotes if provided - does not effect unquoted string json_str = json_str.strip() # strip white space at start and end if (json_str.startswith("'") and json_str.endswith("'")) or ( json_str.startswith('"') and json_str.endswith('"') ): json_str = json_str[1:-1] # strip outer quotes json_str = json_str.strip() # strip remaining white space at start and end # Try pandas dataframe with orient="split" try: import_data = PydanticDateTimeDataFrame.model_validate_json(json_str) self.import_from_dataframe(import_data.to_dataframe()) return except ValidationError as e: error_msg = "" for error in e.errors(): field = " -> ".join(str(x) for x in error["loc"]) message = error["msg"] error_type = error["type"] error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n" logger.debug(f"PydanticDateTimeDataFrame import: {error_msg}") # Try dictionary with special keys start_datetime and interval try: import_data = PydanticDateTimeData.model_validate_json(json_str) self.import_from_dict(import_data.to_dict()) return except ValidationError as e: error_msg = "" for error in e.errors(): field = " -> ".join(str(x) for x in error["loc"]) message = error["msg"] error_type = error["type"] error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n" logger.debug(f"PydanticDateTimeData import: {error_msg}") # Use simple dict format try: import_data = json.loads(json_str) self.import_from_dict( import_data, key_prefix=key_prefix, start_datetime=start_datetime, interval=interval ) except Exception as e: error_msg = f"Invalid JSON string '{json_str}': {e}" logger.debug(error_msg) raise ValueError(error_msg) from e def import_from_file( self, import_file_path: Path, key_prefix: str = "", start_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, ) -> None: """Updates generic data by importing it from a file. This method reads generic data from a JSON file, matches keys based on the record keys and the provided `key_prefix`, and updates the data values sequentially, starting from the `start_datetime`. Each data value is associated with an hourly interval. If start_datetime and or interval is given in the JSON dict it will be used. Otherwise the given parameters are used. If None is given start_datetime defaults to 'self.ems_start_datetime' and interval defaults to 1 hour. Args: import_file_path (Path): The path to the JSON file containing the generic data. key_prefix (str, optional): A prefix to filter relevant keys from the generic data. Only keys starting with this prefix will be considered. Defaults to an empty string. start_datetime (DateTime, optional): Start datetime of values. interval (duration, optional): The fixed time interval. Defaults to 1 hour. Raises: FileNotFoundError: If the specified file does not exist. JSONDecodeError: If the file content is not valid JSON. Example: Given a JSON file with the following content and `key_prefix = "load"`, only the "loadforecast_power_w" key will be processed even though both keys are in the record. .. code-block:: json { "loadforecast_power_w": [20.5, 21.0, 22.1], "other_xyz: [10.5, 11.0, 12.1], } """ with import_file_path.open("r", encoding="utf-8", newline=None) as import_file: import_str = import_file.read() self.import_from_json( import_str, key_prefix=key_prefix, start_datetime=start_datetime, interval=interval ) # ==================== DataImportProvider ==================== class DataImportProvider(DataImportMixin, DataProvider): """Abstract base class for data providers that import generic data. This class is designed to handle generic data provided in the form of a key-value dictionary. - **Keys**: Represent identifiers from the record keys of a specific data. - **Values**: Are lists of data values starting at a specified `start_datetime`, where each value corresponds to a subsequent time interval (e.g., hourly). Subclasses must implement the logic for managing generic data based on the imported records. """ pass # ==================== DataContainer ==================== class DataContainer(SingletonMixin, DataABC, MutableMapping): """A container for managing multiple DataProvider instances. This class enables access to data from multiple data providers, supporting retrieval and aggregation of their data as Pandas Series objects. It acts as a dictionary-like structure where each key represents a specific data field, and the value is a Pandas Series containing combined data from all DataProvider instances for that key. Note: Derived classes have to provide their own providers field with correct provider type set. """ # To be overloaded by derived classes. providers: list[DataProvider] = Field( default_factory=list, json_schema_extra={"description": "List of data providers"} ) @field_validator("providers", mode="after") def check_providers(cls, value: list[DataProvider]) -> list[DataProvider]: # Check each item in the list for item in value: if not isinstance(item, DataProvider): raise TypeError( f"Each item in the providers list must be a DataProvider, got {type(item).__name__}" ) return value @property def enabled_providers(self) -> list[Any]: """List of providers that are currently enabled.""" enab = [] for provider in self.providers: if provider.enabled(): enab.append(provider) return enab @property def record_keys(self) -> list[str]: """Returns the keys of all fields in the data records of all enabled providers.""" key_set = set( chain.from_iterable(provider.record_keys for provider in self.enabled_providers) ) return list(key_set) @property def record_keys_writable(self) -> list[str]: """Returns the keys of all fields in the data records that are writable of all enabled providers.""" key_set = set( chain.from_iterable( provider.record_keys_writable for provider in self.enabled_providers ) ) return list(key_set) def __init__(self, *args: Any, **kwargs: Any) -> None: if hasattr(self, "_initialized"): return super().__init__(*args, **kwargs) def __getitem__(self, key: str) -> pd.Series: """Retrieve a Pandas Series for a specified key from the data in each DataProvider. Iterates through providers to find and return the first available Series for the specified key. Args: key (str): The field name to retrieve, representing a data attribute in DataRecords. Returns: pd.Series: A Pandas Series containing aggregated data for the specified key. Raises: KeyError: If no provider contains data for the specified key. """ series = None for provider in self.enabled_providers: try: series = provider.key_to_series(key) break except KeyError: continue if series is None: raise KeyError(f"No data found for key '{key}'.") return series def __setitem__(self, key: str, value: pd.Series) -> None: """Add or merge a Pandas Series for a specified key into the records of an appropriate provider. Attempts to update or insert the provided Series data in each provider. If no provider supports the specified key, an error is raised. Args: key (str): The field name to update, representing a data attribute in DataRecords. value (pd.Series): A Pandas Series containing data for the specified key. Raises: ValueError: If `value` is not an instance of `pd.Series`. KeyError: If no provider supports the specified key. """ if not isinstance(value, pd.Series): raise ValueError("Value must be an instance of pd.Series.") for provider in self.enabled_providers: try: provider.key_from_series(key, value) break except KeyError: continue else: raise KeyError(f"Key '{key}' not found in any provider.") def __delitem__(self, key: str) -> None: """Set the value of the specified key in the data records of each provider to None. Args: key (str): The field name in DataRecords to clear. Raises: KeyError: If the key is not found in any provider. """ for provider in self.enabled_providers: try: provider.key_delete_by_datetime(key) break except KeyError: continue else: raise KeyError(f"Key '{key}' not found in any provider.") def __iter__(self) -> Iterator[str]: """Return an iterator over all unique keys available across providers. Returns: Iterator[str]: An iterator over the unique keys from all providers. """ return iter(self.record_keys) def __len__(self) -> int: """Return the number of keys in the container. Returns: int: The total number of keys in this container. """ return len(self.record_keys) def __repr__(self) -> str: """Provide a string representation of the DataContainer instance. Returns: str: A string representing the container and its contained providers. """ return f"{self.__class__.__name__}({self.providers})" def keys(self) -> KeysView[str]: return dict.fromkeys(self.record_keys).keys() def update_data( self, force_enable: Optional[bool] = False, force_update: Optional[bool] = False, ) -> None: """Update data. Args: force_enable (bool, optional): If True, forces the update even if a provider is disabled. force_update (bool, optional): If True, forces the providers to update the data even if still cached. """ for provider in self.providers: try: provider.update_data(force_enable=force_enable, force_update=force_update) except Exception as ex: error = f"Provider {provider.provider_id()} fails on update - enabled={provider.enabled()}, force_enable={force_enable}, force_update={force_update}: {ex}" logger.error(error) raise RuntimeError(error) def key_to_series( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, dropna: Optional[bool] = None, ) -> pd.Series: """Extract a series indexed by the date_time field from data records within an optional date range. Iterates through providers to find and return the first available series for the specified key. Args: key (str): The field name in the DataRecord from which to extract values. start_datetime (datetime, optional): The start date for filtering the records (inclusive). end_datetime (datetime, optional): The end date for filtering the records (exclusive). dropna: (bool, optional): Whether to drop NAN/ None values before processing. Defaults to True. Returns: pd.Series: A Pandas Series with the index as the date_time of each record and the values extracted from the specified key. Raises: KeyError: If the specified key is not found in any of the DataRecords. """ series = None for provider in self.enabled_providers: try: series = provider.key_to_series( key, start_datetime=start_datetime, end_datetime=end_datetime, dropna=dropna, ) break except KeyError: continue if series is None: raise KeyError(f"No data found for key '{key}'.") return series def key_to_array( self, key: str, start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, interval: Optional[Duration] = None, fill_method: Optional[str] = None, boundary: Optional[str] = "context", ) -> NDArray[Shape["*"], Any]: """Retrieve an array indexed by fixed time intervals for a specified key from the data in each DataProvider. Iterates through providers to find and return the first available array for the specified key. Args: key (str): The field name to retrieve, representing a data attribute in DataRecords. start_datetime (datetime, optional): The start date for filtering the records (inclusive). end_datetime (datetime, optional): The end date for filtering the records (exclusive). interval (duration, optional): The fixed time interval. Defaults to 1 hour. fill_method (str): Method to handle missing values during resampling. - 'linear': Linearly interpolate missing values (for numeric data only). - 'ffill': Forward fill missing values. - 'bfill': Backward fill missing values. - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'. Returns: np.ndarray: A NumPy array containing aggregated data for the specified key. Raises: KeyError: If no provider contains data for the specified key. Todo: Cache the result in memory until the next `update_data` call. """ array = None for provider in self.enabled_providers: try: array = provider.key_to_array( key, start_datetime=start_datetime, end_datetime=end_datetime, interval=interval, fill_method=fill_method, boundary=boundary, ) break except KeyError: continue if array is None: raise KeyError(f"No data found for key '{key}'.") return array def keys_to_dataframe( self, keys: list[str], start_datetime: Optional[DateTime] = None, end_datetime: Optional[DateTime] = None, interval: Optional[Any] = None, # Duration assumed fill_method: Optional[str] = None, ) -> pd.DataFrame: """Retrieve a dataframe indexed by fixed time intervals for specified keys from the data in each DataProvider. Generates a pandas DataFrame using the NumPy arrays for each specified key, ensuring a common time index. Args: keys (list[str]): A list of field names to retrieve. start_datetime (datetime, optional): Start date for filtering records (inclusive). end_datetime (datetime, optional): End date for filtering records (exclusive). interval (duration, optional): The fixed time interval. Defaults to 1 hour. fill_method (str, optional): Method to handle missing values during resampling. - 'linear': Linearly interpolate missing values (for numeric data only). - 'ffill': Forward fill missing values. - 'bfill': Backward fill missing values. - 'none': Defaults to 'linear' for numeric values, otherwise 'ffill'. Returns: pd.DataFrame: A DataFrame where each column represents a key's array with a common time index. Raises: KeyError: If no valid data is found for any of the requested keys. ValueError: If any retrieved array has a different time index than the first one. """ # Ensure datetime objects are normalized start_datetime = to_datetime(start_datetime, to_maxtime=False) if start_datetime else None end_datetime = to_datetime(end_datetime, to_maxtime=False) if end_datetime else None if interval is None: interval = to_duration("1 hour") if start_datetime is None: # Take earliest datetime of all providers that are enabled for provider in self.enabled_providers: if start_datetime is None: start_datetime = provider.min_datetime elif ( provider.min_datetime and compare_datetimes(provider.min_datetime, start_datetime).lt ): start_datetime = provider.min_datetime if end_datetime is None: # Take latest datetime of all providers that are enabled for provider in self.enabled_providers: if end_datetime is None: end_datetime = provider.max_datetime elif ( provider.max_datetime and compare_datetimes(provider.max_datetime, end_datetime).gt ): end_datetime = provider.min_datetime if end_datetime: end_datetime.add(seconds=1) # Create a DatetimeIndex based on start, end, and interval if start_datetime is None or end_datetime is None: raise ValueError( f"Can not determine datetime range. Got '{start_datetime}'..'{end_datetime}'." ) reference_index = pd.date_range( start=start_datetime, end=end_datetime, freq=interval, inclusive="left", ) data = {} for key in keys: try: array = self.key_to_array(key, start_datetime, end_datetime, interval, fill_method) if len(array) != len(reference_index): raise ValueError( f"Array length mismatch for key '{key}' (expected {len(reference_index)}, got {len(array)})" ) data[key] = array except KeyError as e: raise KeyError(f"Failed to retrieve data for key '{key}': {e}") if not data: raise KeyError(f"No valid data found for the requested keys {keys}.") return pd.DataFrame(data, index=reference_index) def provider_by_id(self, provider_id: str) -> DataProvider: """Retrieves a data provider by its unique identifier. This method searches through the list of all available providers and returns the first provider whose `provider_id` matches the given `provider_id`. If no matching provider is found, the method returns `None`. Args: provider_id (str): The unique identifier of the desired data provider. Returns: DataProvider: The data provider matching the given `provider_id`. Raises: ValueError if provider id is unknown. Example: provider = data.provider_by_id("WeatherImport") """ providers = {provider.provider_id(): provider for provider in self.providers} if provider_id not in providers: error_msg = f"Unknown provider id: '{provider_id}' of '{providers.keys()}'." logger.error(error_msg) raise ValueError(error_msg) return providers[provider_id] # ----------------------- DataContainer Database Protocol --------------------- def save(self) -> None: """Save data records to persistent storage.""" for provider in self.providers: try: provider.save() except Exception as ex: error = f"Provider {provider.provider_id()} fails on save: {ex}" logger.error(error) raise RuntimeError(error) def load(self) -> None: """Load data records from from persistent storage.""" for provider in self.providers: try: provider.load() except Exception as ex: error = f"Provider {provider.provider_id()} fails on load: {ex}" logger.error(error) raise RuntimeError(error) def db_vacuum(self) -> None: """Remove old records of all providers from database to free space.""" for provider in self.providers: try: provider.db_vacuum() except Exception as ex: error = f"Provider {provider.provider_id()} fails on db vacuum: {ex}" logger.error(error) raise RuntimeError(error) def db_compact(self) -> None: """Apply tiered compaction to all providers to reduce storage while retaining coverage.""" for provider in self.providers: try: provider.db_compact() except Exception as ex: error = f"Provider {provider.provider_id()} fails on db_compact: {ex}" logger.error(error) raise RuntimeError(error) def db_get_stats(self) -> dict: """Get comprehensive statistics about database storage for all providers. Returns: Dictionary with statistics """ db_stats = {} for provider in self.providers: try: db_stats[provider.db_namespace()] = provider.db_get_stats() except Exception as ex: error = f"Provider {provider.provider_id()} fails on db vacuum: {ex}" logger.error(error) raise RuntimeError(error) return db_stats