mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-01-01 00:06:18 +00:00
Adapters for Home Assistant and NodeRED integration are added. Akkudoktor-EOS can now be run as Home Assistant add-on and standalone. As Home Assistant add-on EOS uses ingress to fully integrate the EOSdash dashboard in Home Assistant. The fix includes several bug fixes that are not directly related to the adapter implementation but are necessary to keep EOS running properly and to test and document the changes. * fix: development version scheme The development versioning scheme is adapted to fit to docker and home assistant expectations. The new scheme is x.y.z and x.y.z.dev<hash>. Hash is only digits as expected by home assistant. Development version is appended by .dev as expected by docker. * fix: use mean value in interval on resampling for array When downsampling data use the mean value of all values within the new sampling interval. * fix: default battery ev soc and appliance wh Make the genetic simulation return default values for the battery SoC, electric vehicle SoC and appliance load if these assets are not used. * fix: import json string Strip outer quotes from JSON strings on import to be compliant to json.loads() expectation. * fix: default interval definition for import data Default interval must be defined in lowercase human definition to be accepted by pendulum. * fix: clearoutside schema change * feat: add adapters for integrations Adapters for Home Assistant and NodeRED integration are added. Akkudoktor-EOS can now be run as Home Assistant add-on and standalone. As Home Assistant add-on EOS uses ingress to fully integrate the EOSdash dashboard in Home Assistant. * feat: allow eos to be started with root permissions and drop privileges Home assistant starts all add-ons with root permissions. Eos now drops root permissions if an applicable user is defined by parameter --run_as_user. The docker image defines the user eos to be used. 
* feat: make eos supervise and monitor EOSdash Eos now not only starts EOSdash but also monitors EOSdash during runtime and restarts EOSdash on fault. EOSdash logging is captured by EOS and forwarded to the EOS log to provide better visibility. * feat: add duration to string conversion Make to_duration to also return the duration as string on request. * chore: Use info logging to report missing optimization parameters In parameter preparation for automatic optimization an error was logged for missing parameters. Logging is now done using the info level. * chore: make EOSdash use the EOS data directory for file import/ export EOSdash use the EOS data directory for file import/ export by default. This allows to use the configuration import/ export function also within docker images. * chore: improve EOSdash config tab display Improve display of JSON code and add more forms for config value update. * chore: make docker image file system layout similar to home assistant Only use /data directory for persistent data. This is handled as a docker volume. The /data volume is mapped to ~/.local/share/net.akkudoktor.eos if using docker compose. * chore: add home assistant add-on development environment Add VSCode devcontainer and task definition for home assistant add-on development. * chore: improve documentation
1190 lines
49 KiB
Python
1190 lines
49 KiB
Python
from datetime import datetime, timezone
|
|
from typing import Any, ClassVar, List, Optional, Union
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import pendulum
|
|
import pytest
|
|
from pydantic import Field, ValidationError
|
|
|
|
from akkudoktoreos.config.configabc import SettingsBaseModel
|
|
from akkudoktoreos.core.dataabc import (
|
|
DataBase,
|
|
DataContainer,
|
|
DataImportProvider,
|
|
DataProvider,
|
|
DataRecord,
|
|
DataSequence,
|
|
)
|
|
from akkudoktoreos.core.ems import get_ems
|
|
from akkudoktoreos.utils.datetimeutil import compare_datetimes, to_datetime, to_duration
|
|
|
|
# Derived classes for testing
|
|
# ---------------------------
|
|
|
|
class DerivedConfig(SettingsBaseModel):
    """Settings model used by the tests; all three fields are optional and default to None."""

    env_var: Optional[int] = Field(
        default=None,
        description="Test config by environment var",
    )
    instance_field: Optional[str] = Field(
        default=None,
        description="Test config by instance field",
    )
    class_constant: Optional[int] = Field(
        default=None,
        description="Test config by class constant",
    )
|
|
|
|
|
|
class DerivedBase(DataBase):
    """Minimal `DataBase` subclass with one optional model field and one class-level constant."""

    # Regular (per-instance) model field.
    instance_field: Optional[str] = Field(
        default=None,
        description="Field Value",
    )

    # Declared as ClassVar, i.e. shared by the class rather than stored per instance.
    class_constant: ClassVar[int] = 30
|
|
|
|
|
|
class DerivedRecord(DataRecord):
    """Data record derived from the base class DataRecord.

    On top of what `DataRecord` provides, the derived record declares

    - the regular `data_value` field, and
    - the configurable, field-like data keys `dish_washer_emr`, `solar_power` and `temp`.
    """

    data_value: Optional[float] = Field(
        default=None,
        description="Data Value",
    )

    @classmethod
    def configured_data_keys(cls) -> Optional[list[str]]:
        """Return the names of the configurable field-like data keys of this record type."""
        keys = ["dish_washer_emr", "solar_power", "temp"]
        return keys
|
|
|
|
|
|
class DerivedSequence(DataSequence):
    """`DataSequence` specialisation whose records are `DerivedRecord` instances."""

    # Overrides the base declaration so the element type is DerivedRecord.
    records: List[DerivedRecord] = Field(
        default_factory=list,
        description="List of DerivedRecord records",
    )

    @classmethod
    def record_class(cls) -> Any:
        """Return the record type stored in this sequence."""
        return DerivedRecord
|
|
|
|
|
|
class DerivedDataProvider(DataProvider):
    """Concrete `DataProvider` used only by the tests.

    Two class-level flags steer and observe the provider:

    - `provider_enabled` is what `enabled()` reports.
    - `provider_updated` is switched to True by `_update_data()`.
    """

    # Overrides the base declaration so the element type is DerivedRecord.
    records: List[DerivedRecord] = Field(
        default_factory=list,
        description="List of DerivedRecord records",
    )
    provider_enabled: ClassVar[bool] = False
    provider_updated: ClassVar[bool] = False

    @classmethod
    def record_class(cls) -> Any:
        """Return the record type stored by this provider."""
        return DerivedRecord

    # --- methods implemented for test purposes ---

    def provider_id(self) -> str:
        """Return the fixed identifier of this provider."""
        return "DerivedDataProvider"

    def enabled(self) -> bool:
        """Report the current value of the `provider_enabled` flag."""
        return self.provider_enabled

    def _update_data(self, force_update: Optional[bool] = False) -> None:
        """Simulate an update by setting the class-level `provider_updated` flag.

        `force_update` is accepted but not used.
        """
        DerivedDataProvider.provider_updated = True
|
|
|
|
|
|
class DerivedDataImportProvider(DataImportProvider):
    """Concrete `DataImportProvider` used only by the tests.

    Two class-level flags steer and observe the provider:

    - `provider_enabled` is what `enabled()` reports.
    - `provider_updated` is switched to True by `_update_data()`.
    """

    # Overrides the base declaration so the element type is DerivedRecord.
    records: List[DerivedRecord] = Field(
        default_factory=list,
        description="List of DerivedRecord records",
    )
    provider_enabled: ClassVar[bool] = False
    provider_updated: ClassVar[bool] = False

    @classmethod
    def record_class(cls) -> Any:
        """Return the record type stored by this provider."""
        return DerivedRecord

    # --- methods implemented for test purposes ---

    def provider_id(self) -> str:
        """Return the fixed identifier of this provider."""
        return "DerivedDataImportProvider"

    def enabled(self) -> bool:
        """Report the current value of the `provider_enabled` flag."""
        return self.provider_enabled

    def _update_data(self, force_update: Optional[bool] = False) -> None:
        """Simulate an update by setting the class-level `provider_updated` flag.

        `force_update` is accepted but not used.
        """
        DerivedDataImportProvider.provider_updated = True
|
|
|
|
|
|
class DerivedDataContainer(DataContainer):
    """`DataContainer` specialisation that holds the test providers."""

    providers: List[Union[DerivedDataProvider, DataProvider]] = Field(
        default_factory=list,
        description="List of data providers",
    )
|
|
|
|
|
|
# Tests
|
|
# ----------
|
|
|
|
|
|
class TestDataBase:
    """Tests for behaviour inherited from `DataBase`."""

    @pytest.fixture
    def base(self):
        """Provide a `DerivedBase` built with default values only."""
        return DerivedBase()

    def test_get_config_value_key_error(self, base):
        """Reading an unknown attribute from `base.config` raises AttributeError."""
        with pytest.raises(AttributeError):
            _ = base.config.non_existent_key
|
|
|
|
|
|
class TestDataRecord:
    """Tests for mapping-style and attribute-style access on `DerivedRecord`.

    `DerivedRecord` has the regular fields `date_time` and `data_value` plus the
    configured, field-like data keys `dish_washer_emr`, `solar_power` and `temp`.
    """

    def create_test_record(self, date, value):
        """Helper function to create a test DataRecord."""
        return DerivedRecord(date_time=date, data_value=value)

    @pytest.fixture
    def record(self):
        """Fixture to create a sample DerivedRecord with some data set.

        Two of the three configured data keys are set; `temp` is left unset on purpose.
        """
        rec = DerivedRecord(date_time=None, data_value=10.0)
        rec.configured_data = {"dish_washer_emr": 123.0, "solar_power": 456.0}
        return rec

    def test_getitem(self):
        """Test that a regular field can be read with `record[key]`."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        assert record["data_value"] == 10.0

    def test_setitem(self):
        """Test that a regular field can be written with `record[key] = value`."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        record["data_value"] = 20.0
        assert record.data_value == 20.0

    def test_delitem(self):
        """Test that deleting a regular field by key resets it to None."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        record.data_value = 20.0
        del record["data_value"]
        assert record.data_value is None

    def test_len(self):
        """Test that `len(record)` counts regular fields and configured data keys."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        record.date_time = None
        record.data_value = 20.0
        assert len(record) == 5  # 2 regular fields + 3 configured data "fields"

    def test_to_dict(self):
        """Test that `to_dict()` / `from_dict()` round-trip a record."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        record.data_value = 20.0
        record_dict = record.to_dict()
        assert "data_value" in record_dict
        assert record_dict["data_value"] == 20.0
        record2 = DerivedRecord.from_dict(record_dict)
        assert record2.model_dump() == record.model_dump()

    def test_to_json(self):
        """Test that `to_json()` / `from_json()` round-trip a record."""
        record = self.create_test_record(datetime(2024, 1, 3, tzinfo=timezone.utc), 10.0)
        record.data_value = 20.0
        json_str = record.to_json()
        assert "data_value" in json_str
        assert "20.0" in json_str
        record2 = DerivedRecord.from_json(json_str)
        assert record2.model_dump() == record.model_dump()

    def test_record_keys_includes_configured_data_keys(self, record):
        """Ensure record_keys includes all configured data keys."""
        assert set(record.record_keys()) >= set(record.configured_data_keys())

    def test_record_keys_writable_includes_configured_data_keys(self, record):
        """Ensure record_keys_writable includes all configured data keys."""
        assert set(record.record_keys_writable()) >= set(record.configured_data_keys())

    def test_getitem_existing_field(self, record):
        """Test that __getitem__ returns a value for an existing native field."""
        record.date_time = "2024-01-01T00:00:00+00:00"
        assert record["date_time"] is not None

    def test_getitem_existing_configured_data(self, record):
        """Test that __getitem__ retrieves existing configured data values."""
        assert record["dish_washer_emr"] == 123.0
        assert record["solar_power"] == 456.0

    def test_getitem_missing_configured_data_returns_none(self, record):
        """Test that __getitem__ returns None for missing but known configured data keys."""
        assert record["temp"] is None

    def test_getitem_raises_keyerror(self, record):
        """Test that __getitem__ raises KeyError for completely unknown keys."""
        with pytest.raises(KeyError):
            _ = record["nonexistent"]

    def test_setitem_field(self, record):
        """Test setting a native field using __setitem__."""
        record["date_time"] = "2025-01-01T12:00:00+00:00"
        assert str(record.date_time).startswith("2025-01-01")

    def test_setitem_configured_data(self, record):
        """Test setting a known configured data key using __setitem__."""
        record["temp"] = 25.5
        assert record.configured_data["temp"] == 25.5

    def test_setitem_invalid_key_raises(self, record):
        """Test that __setitem__ raises KeyError for unknown keys."""
        with pytest.raises(KeyError):
            record["unknown_key"] = 123

    def test_delitem_field(self, record):
        """Test deleting a native field using __delitem__."""
        record["date_time"] = "2025-01-01T12:00:00+00:00"
        del record["date_time"]
        assert record.date_time is None

    def test_delitem_configured_data(self, record):
        """Test deleting a known configured data key using __delitem__."""
        del record["solar_power"]
        assert "solar_power" not in record.configured_data

    def test_delitem_unknown_raises(self, record):
        """Test that __delitem__ raises KeyError for unknown keys."""
        with pytest.raises(KeyError):
            del record["nonexistent"]

    def test_attribute_get_existing_field(self, record):
        """Test accessing a native field via attribute."""
        record.date_time = "2025-01-01T12:00:00+00:00"
        assert record.date_time is not None

    def test_attribute_get_existing_configured_data(self, record):
        """Test accessing an existing configured data value via attribute."""
        assert record.dish_washer_emr == 123.0

    def test_attribute_get_missing_configured_data(self, record):
        """Test accessing a missing but known configured data key returns None."""
        assert record.temp is None

    def test_attribute_get_invalid_raises(self, record):
        """Test accessing an unknown attribute raises AttributeError."""
        with pytest.raises(AttributeError):
            _ = record.nonexistent

    def test_attribute_set_existing_field(self, record):
        """Test setting a native field via attribute."""
        record.date_time = "2025-06-25T12:00:00+00:00"
        assert record.date_time is not None

    def test_attribute_set_existing_configured_data(self, record):
        """Test setting a known configured data key via attribute."""
        record.temp = 99.9
        assert record.configured_data["temp"] == 99.9

    def test_attribute_set_invalid_raises(self, record):
        """Test setting an unknown attribute raises AttributeError."""
        with pytest.raises(AttributeError):
            record.invalid = 123

    def test_delattr_field(self, record):
        """Test deleting a native field via attribute."""
        record.date_time = "2025-06-25T12:00:00+00:00"
        del record.date_time
        assert record.date_time is None

    def test_delattr_configured_data(self, record):
        """Test deleting a known configured data key via attribute."""
        record.temp = 88.0
        del record.temp
        assert "temp" not in record.configured_data

    def test_delattr_ignored_missing_configured_data_key(self, record):
        """Test deleting a known configured data key that was never set is a no-op."""
        del record.temp
        assert "temp" not in record.configured_data

    def test_len_and_iter(self, record):
        """Test that __len__ and __iter__ behave as expected."""
        keys = list(iter(record))
        assert set(record.record_keys_writable()) == set(keys)
        assert len(record) == len(keys)

    def test_in_operator_includes_configured_data(self, record):
        """Test that 'in' operator includes configured data keys."""
        assert "dish_washer_emr" in record
        assert "temp" in record  # known key, even if not yet set
        assert "nonexistent" not in record

    def test_hasattr_behavior(self, record):
        """Test that hasattr returns True for fields and known configured data keys."""
        assert hasattr(record, "date_time")
        assert hasattr(record, "dish_washer_emr")
        assert hasattr(record, "temp")  # allowed, even if not yet set
        assert not hasattr(record, "nonexistent")

    def test_model_validate_roundtrip(self, record):
        """Test that a DerivedRecord can be dumped and revalidated."""
        dumped = record.model_dump()
        restored = DerivedRecord.model_validate(dumped)
        assert restored.dish_washer_emr == 123.0
        assert restored.solar_power == 456.0
        assert restored.temp is None  # not set

    def test_copy_preserves_configured_data(self, record):
        """Test that copying preserves configured data values."""
        record.temp = 22.2
        copied = record.model_copy()
        assert copied.dish_washer_emr == 123.0
        assert copied.temp == 22.2
        assert copied is not record

    def test_equality_includes_configured_data(self, record):
        """Test that equality includes the `configured_data` content."""
        other = record.model_copy()
        assert record == other

    def test_inequality_differs_with_configured_data(self, record):
        """Test that records with different configured data are not equal."""
        # Deep copy so that changing the copy's dict does not touch the original.
        other = record.model_copy(deep=True)
        # Modify one configured data value in the copy
        other.configured_data["dish_washer_emr"] = 999.9
        assert record != other

    def test_in_operator_for_configured_data_and_fields(self, record):
        """Ensure 'in' works for both fields and configured data keys."""
        assert "dish_washer_emr" in record
        assert "solar_power" in record
        assert "date_time" in record  # standard field
        assert "temp" in record  # allowed but not yet set
        assert "unknown" not in record

    def test_hasattr_equivalence_to_getattr(self, record):
        """hasattr should return True for all valid fields and configured data keys."""
        assert hasattr(record, "dish_washer_emr")
        assert hasattr(record, "temp")
        assert hasattr(record, "date_time")
        assert not hasattr(record, "nonexistent")

    def test_dir_includes_configured_data_keys(self, record):
        """`dir(record)` should include configured data keys for introspection.

        It shall not include the internal `configured_data` attribute.
        """
        keys = dir(record)
        # The check previously used the name "configured datas"; a name with a space
        # can never be an attribute, so that assertion could not fail.
        # NOTE(review): assumes DataRecord.__dir__ hides `configured_data` - confirm.
        assert "configured_data" not in keys
        for key in record.configured_data_keys():
            assert key in keys

    def test_init_configured_field_like_data_applies_before_model_init(self):
        """Test that keys returned by `configured_data_keys()` end up in `configured_data` at init time."""
        record = DerivedRecord(
            date_time="2024-01-03T00:00:00+00:00",
            data_value=42.0,
            dish_washer_emr=111.1,
            solar_power=222.2,
            temp=333.3,  # `temp` is one of the configured keys of DerivedRecord
        )

        assert record.data_value == 42.0
        assert record.configured_data == {
            "dish_washer_emr": 111.1,
            "solar_power": 222.2,
            "temp": 333.3,
        }
|
|
|
|
|
|
class TestDataSequence:
|
|
@pytest.fixture
def sequence(self):
    """Provide a freshly created `DerivedSequence` that holds no records."""
    empty = DerivedSequence()
    assert len(empty) == 0
    return empty
|
|
|
|
@pytest.fixture
def sequence2(self):
    """Provide a `DerivedSequence` holding two records, dated 1970-01-01 and 1971-01-01.

    Each record's data value equals its year.
    """
    filled = DerivedSequence()
    for year in (1970, 1971):
        filled.append(self.create_test_record(datetime(year, 1, 1), year))
    assert len(filled) == 2
    return filled
|
|
|
|
def create_test_record(self, date, value):
    """Return a `DerivedRecord` with `date` as its date_time and `value` as its data_value."""
    record = DerivedRecord(date_time=date, data_value=value)
    return record
|
|
|
|
# Test cases
|
|
def test_getitem(self, sequence):
    """A record inserted by datetime can be fetched by index as a `DerivedRecord`."""
    assert len(sequence) == 0
    sequence.insert_by_datetime(self.create_test_record("2024-01-01 00:00:00", 0))
    first = sequence[0]
    assert isinstance(first, DerivedRecord)
|
|
|
|
def test_setitem(self, sequence2):
    """Assigning to an index replaces the record stored at that position."""
    stamp = datetime(2024, 1, 3, tzinfo=timezone.utc)
    sequence2[0] = self.create_test_record(stamp, 1)
    assert sequence2[0].date_time == stamp
|
|
|
|
def test_set_record_at_index(self, sequence2):
    """Replacing both positions keeps the length at two and stores the new records."""
    aware_stamp = datetime(2024, 1, 3, tzinfo=timezone.utc)
    new_last = self.create_test_record(aware_stamp, 1)
    new_first = self.create_test_record(datetime(2023, 11, 5), 0.8)

    sequence2[1] = new_last
    assert sequence2[1].date_time == aware_stamp

    sequence2[0] = new_first
    assert len(sequence2) == 2
    assert sequence2[0] == new_first
|
|
|
|
def test_insert_duplicate_date_record(self, sequence):
    """Inserting two records with the same date_time leaves one record with the later value."""
    same_day = datetime(2023, 11, 5)
    older = self.create_test_record(same_day, 0.8)
    newer = self.create_test_record(same_day, 0.9)  # same date as `older`
    for rec in (older, newer):
        sequence.insert_by_datetime(rec)
    assert len(sequence) == 1
    # The stored record is expected to carry the value inserted last.
    assert sequence[0].data_value == 0.9
|
|
|
|
def test_sort_by_datetime_ascending(self, sequence):
    """Sorting without arguments orders records by ascending date_time."""
    samples = ((11, 0.7), (10, 0.8), (12, 0.9))  # (month of 2024, data value)
    unsorted = [
        self.create_test_record(pendulum.datetime(2024, month, 1), value)
        for month, value in samples
    ]
    for position, rec in enumerate(unsorted):
        sequence.insert(position, rec)

    sequence.sort_by_datetime()

    actual = [rec.date_time for rec in sequence.records]
    expected = [pendulum.datetime(2024, month, 1) for month in (10, 11, 12)]
    for position, expected_date in enumerate(expected):
        assert compare_datetimes(actual[position], expected_date).equal
|
|
|
|
def test_sort_by_datetime_descending(self, sequence):
    """Sorting with `reverse=True` orders records by descending date_time."""
    samples = ((11, 0.7), (10, 0.8), (12, 0.9))  # (month of 2024, data value)
    unsorted = [
        self.create_test_record(pendulum.datetime(2024, month, 1), value)
        for month, value in samples
    ]
    for position, rec in enumerate(unsorted):
        sequence.insert(position, rec)

    sequence.sort_by_datetime(reverse=True)

    actual = [rec.date_time for rec in sequence.records]
    expected = [pendulum.datetime(2024, month, 1) for month in (12, 11, 10)]
    for position, expected_date in enumerate(expected):
        assert compare_datetimes(actual[position], expected_date).equal
|
|
|
|
def test_sort_by_datetime_with_none(self, sequence):
    """Records whose date_time is None come first in an ascending sort."""
    samples = ((11, 0.7), (10, 0.8), (12, 0.9))  # (month of 2024, data value)
    unsorted = [
        self.create_test_record(pendulum.datetime(2024, month, 1), value)
        for month, value in samples
    ]
    for position, rec in enumerate(unsorted):
        sequence.insert(position, rec)

    # Remove the timestamp of the third (December) record.
    sequence.records[2].date_time = None
    assert sequence.records[2].date_time is None

    sequence.sort_by_datetime()

    actual = [rec.date_time for rec in sequence.records]
    expected = [
        None,  # None values should come first
        pendulum.datetime(2024, 10, 1),
        pendulum.datetime(2024, 11, 1),
    ]
    for position, expected_date in enumerate(expected):
        if expected_date is not None:
            assert compare_datetimes(actual[position], expected_date).equal
        else:
            assert actual[position] is None
|
|
|
|
def test_sort_by_datetime_error_on_uncomparable(self, sequence):
    """Assigning an unparsable string to date_time raises a ValidationError."""
    samples = ((11, 0.7), (12, 0.9), (10, 0.8))  # (month of 2024, data value)
    unsorted = [
        self.create_test_record(pendulum.datetime(2024, month, 1), value)
        for month, value in samples
    ]
    for position, rec in enumerate(unsorted):
        sequence.insert(position, rec)

    expected_message = "Date string not_a_datetime does not match any known formats."
    with pytest.raises(ValidationError, match=expected_message):
        sequence.records[2].date_time = "not_a_datetime"  # Invalid date_time
        # NOTE(review): when the assignment above raises as expected, this call is
        # never reached - confirm whether the sort is still meant to be exercised.
        sequence.sort_by_datetime()
|
|
|
|
def test_key_to_series(self, sequence):
    """`key_to_series` returns a pandas Series indexed by the records' date_time."""
    sequence.append(self.create_test_record(datetime(2023, 11, 6), 0.8))
    result = sequence.key_to_series("data_value")
    assert isinstance(result, pd.Series)
    lookup = to_datetime(datetime(2023, 11, 6))
    assert result[lookup] == 0.8
|
|
|
|
def test_key_from_series(self, sequence):
    """`key_from_series` creates one record per Series entry."""
    index = pd.to_datetime([datetime(2023, 11, 5), datetime(2023, 11, 6)])
    source = pd.Series(data=[0.8, 0.9], index=index)
    sequence.key_from_series("data_value", source)
    assert len(sequence) == 2
    for position, expected in enumerate((0.8, 0.9)):
        assert sequence[position].data_value == expected
|
|
|
|
def test_key_to_array(self, sequence):
    """`key_to_array` resamples two daily records into a three-element daily array."""
    interval = to_duration("1 day")
    start_datetime = to_datetime("2023-11-6")
    last_datetime = to_datetime("2023-11-8")
    end_datetime = to_datetime("2023-11-9")

    # Each record's value is the day of month of its timestamp (6.0 and 8.0).
    for stamp in (start_datetime, last_datetime):
        sequence.insert_by_datetime(self.create_test_record(stamp, float(stamp.day)))
    assert sequence[0].data_value == 6.0
    assert sequence[1].data_value == 8.0

    as_series = sequence.key_to_series(
        key="data_value",
        start_datetime=start_datetime,
        end_datetime=end_datetime,
    )
    assert len(as_series) == 2
    assert as_series[to_datetime("2023-11-6")] == 6
    assert as_series[to_datetime("2023-11-8")] == 8

    as_array = sequence.key_to_array(
        key="data_value",
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        interval=interval,
    )
    assert isinstance(as_array, np.ndarray)
    assert len(as_array) == 3
    # The middle element (Nov 7) has no record of its own; 7 is the expected filled value.
    for position, expected in enumerate((start_datetime.day, 7, last_datetime.day)):
        assert as_array[position] == expected
|
|
|
|
def test_key_to_array_linear_interpolation(self, sequence):
    """With `fill_method="linear"` the missing hour is interpolated between its neighbours."""
    interval = to_duration("1 hour")
    # Records at 00:00 and 02:00 leave a gap at 01:00.
    for hour, value in ((0, 0.8), (2, 1.0)):
        sequence.insert_by_datetime(
            self.create_test_record(pendulum.datetime(2023, 11, 6, hour), value)
        )

    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 3),
        interval=interval,
        fill_method="linear",
    )
    assert len(result) == 3
    # Index 1 is the interpolated value.
    for position, expected in enumerate((0.8, 0.9, 1.0)):
        assert result[position] == expected
|
|
|
|
def test_key_to_array_ffill(self, sequence):
    """With `fill_method="ffill"` the missing hour takes the preceding value."""
    interval = to_duration("1 hour")
    # Records at 00:00 and 02:00 leave a gap at 01:00.
    for hour, value in ((0, 0.8), (2, 1.0)):
        sequence.insert_by_datetime(
            self.create_test_record(pendulum.datetime(2023, 11, 6, hour), value)
        )

    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 3),
        interval=interval,
        fill_method="ffill",
    )
    assert len(result) == 3
    # Index 1 is the forward-filled value.
    for position, expected in enumerate((0.8, 0.8, 1.0)):
        assert result[position] == expected
|
|
|
|
def test_key_to_array_ffill_one_value(self, sequence):
    """Test key_to_array with forward filling when only a single value is available.

    The only record sits at 02:00 inside a 00:00-04:00 window sampled hourly, so the
    result has four slots: two before the record, the record itself and one after it.
    """
    interval = to_duration("1 hour")
    record1 = self.create_test_record(pendulum.datetime(2023, 11, 6, 2), 1.0)
    sequence.insert_by_datetime(record1)

    array = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 4),
        interval=interval,
        fill_method="ffill",
    )
    assert len(array) == 4
    assert array[0] == 1.0  # Backward-filled value
    assert array[1] == 1.0  # Backward-filled value
    assert array[2] == 1.0
    # This previously re-checked index 2, leaving the forward-filled slot unverified.
    assert array[3] == 1.0  # Forward-filled value
|
|
|
|
def test_key_to_array_bfill(self, sequence):
    """With `fill_method="bfill"` the missing hour takes the following value."""
    interval = to_duration("1 hour")
    # Records at 00:00 and 02:00 leave a gap at 01:00.
    for hour, value in ((0, 0.8), (2, 1.0)):
        sequence.insert_by_datetime(
            self.create_test_record(pendulum.datetime(2023, 11, 6, hour), value)
        )

    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 3),
        interval=interval,
        fill_method="bfill",
    )
    assert len(result) == 3
    # Index 1 is the backward-filled value.
    for position, expected in enumerate((0.8, 1.0, 1.0)):
        assert result[position] == expected
|
|
|
|
def test_key_to_array_with_truncation(self, sequence):
    """A record before the requested window still contributes to the first array value."""
    interval = to_duration("1 hour")
    before_window = self.create_test_record(pendulum.datetime(2023, 11, 5, 23), 0.8)
    inside_window = self.create_test_record(pendulum.datetime(2023, 11, 6, 1), 1.0)
    for rec in (before_window, inside_window):
        sequence.insert_by_datetime(rec)

    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 2),
        interval=interval,
    )
    assert len(result) == 2
    # Index 0 is interpolated using the record from the previous day.
    for position, expected in enumerate((0.9, 1.0)):
        assert result[position] == expected
|
|
|
|
def test_key_to_array_with_none(self, sequence):
    """For an empty sequence `key_to_array` returns an ndarray whose elements all equal None."""
    window_start = pendulum.datetime(2023, 11, 6)
    window_end = pendulum.datetime(2023, 11, 6, 3)
    result = sequence.key_to_array(
        key="data_value",
        start_datetime=window_start,
        end_datetime=window_end,
        interval=to_duration("1 hour"),
    )
    assert isinstance(result, np.ndarray)
    # `== None` is deliberate: it is an element-wise NumPy comparison, `is None` would not be.
    assert np.all(result == None)
|
|
|
|
def test_key_to_array_with_one(self, sequence):
    """A single record before the window fills every slot of the resulting array."""
    interval = to_duration("1 hour")
    sequence.insert_by_datetime(
        self.create_test_record(pendulum.datetime(2023, 11, 5, 23), 0.8)
    )

    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6),
        end_datetime=pendulum.datetime(2023, 11, 6, 2),
        interval=interval,
    )
    assert len(result) == 2
    # Both slots carry the value of the only record (dated the previous day).
    for position in (0, 1):
        assert result[position] == 0.8
|
|
|
|
def test_key_to_array_invalid_fill_method(self, sequence):
    """An unknown `fill_method` makes `key_to_array` raise ValueError."""
    interval = to_duration("1 hour")
    sequence.insert_by_datetime(
        self.create_test_record(pendulum.datetime(2023, 11, 6, 0), 0.8)
    )

    call_arguments = {
        "key": "data_value",
        "start_datetime": pendulum.datetime(2023, 11, 6),
        "end_datetime": pendulum.datetime(2023, 11, 6, 1),
        "interval": interval,
        "fill_method": "invalid",
    }
    with pytest.raises(ValueError, match="Unsupported fill method: invalid"):
        sequence.key_to_array(**call_arguments)
|
|
|
|
def test_key_to_array_resample_mean(self, sequence):
    """Several numeric values inside one target interval are reduced to their mean."""
    interval = to_duration("1 hour")

    # Four values, 15 minutes apart, all inside the hour starting at 00:00.
    quarter_hour_samples = ((0, 1.0), (15, 2.0), (30, 3.0), (45, 4.0))
    quarter_hour_records = [
        self.create_test_record(pendulum.datetime(2023, 11, 6, 0, minute), value)
        for minute, value in quarter_hour_samples
    ]
    for rec in quarter_hour_records:
        sequence.insert_by_datetime(rec)

    # Resample to one hourly slot.
    result = sequence.key_to_array(
        key="data_value",
        start_datetime=pendulum.datetime(2023, 11, 6, 0),
        end_datetime=pendulum.datetime(2023, 11, 6, 1),
        interval=interval,
    )

    assert isinstance(result, np.ndarray)
    assert len(result) == 1  # one interval: 0:00-1:00
    # Mean of the four samples: (1+2+3+4)/4 = 2.5
    assert result[0] == pytest.approx(2.5)
|
|
|
|
def test_to_datetimeindex(self, sequence2):
    """`to_datetimeindex` returns a pandas DatetimeIndex following the record order."""
    front_records = (
        self.create_test_record(datetime(2023, 11, 5), 0.8),
        self.create_test_record(datetime(2023, 11, 6), 0.9),
    )
    # Place the new records in front of the two records the fixture already holds.
    for position, rec in enumerate(front_records):
        sequence2.insert(position, rec)

    index = sequence2.to_datetimeindex()
    assert isinstance(index, pd.DatetimeIndex)
    assert index[0] == to_datetime(datetime(2023, 11, 5))
    assert index[1] == to_datetime(datetime(2023, 11, 6))
|
|
|
|
def test_delete_by_datetime_range(self, sequence):
    """Deleting a start/end range removes the record at the start and keeps the one at the end."""
    for day, value in ((5, 0.8), (6, 0.9), (7, 1.0)):
        sequence.append(self.create_test_record(datetime(2023, 11, day), value))
    assert len(sequence) == 3

    sequence.delete_by_datetime(
        start_datetime=datetime(2023, 11, 6),
        end_datetime=datetime(2023, 11, 7),
    )

    assert len(sequence) == 2
    for position, day in enumerate((5, 7)):
        assert sequence[position].date_time == to_datetime(datetime(2023, 11, day))
|
|
|
|
def test_delete_by_datetime_start(self, sequence):
    """Delete with only a start bound; the earlier record is expected to remain."""
    kept_day = datetime(2023, 11, 5)
    removed_day = datetime(2023, 11, 6)
    kept = self.create_test_record(kept_day, 0.8)
    removed = self.create_test_record(removed_day, 0.9)
    sequence.append(kept)
    sequence.append(removed)
    assert len(sequence) == 2

    sequence.delete_by_datetime(start_datetime=removed_day)

    assert len(sequence) == 1
    assert sequence[0].date_time == to_datetime(kept_day)
|
|
|
|
def test_delete_by_datetime_end(self, sequence):
    """Delete with only an end bound; the record at the bound is expected to remain."""
    earlier_day = datetime(2023, 11, 5)
    bound_day = datetime(2023, 11, 6)
    earlier = self.create_test_record(earlier_day, 0.8)
    at_bound = self.create_test_record(bound_day, 0.9)
    sequence.append(earlier)
    sequence.append(at_bound)
    assert len(sequence) == 2

    sequence.delete_by_datetime(end_datetime=bound_day)

    assert len(sequence) == 1
    assert sequence[0].date_time == to_datetime(bound_day)
|
|
|
|
def test_filter_by_datetime(self, sequence):
    """Filter from a start datetime and expect only the record at that datetime."""
    before_start = datetime(2023, 11, 5)
    at_start = datetime(2023, 11, 6)
    older = self.create_test_record(before_start, 0.8)
    newer = self.create_test_record(at_start, 0.9)
    sequence.append(older)
    sequence.append(newer)

    remaining = sequence.filter_by_datetime(start_datetime=at_start)

    assert len(remaining) == 1
    assert remaining[0].date_time == to_datetime(at_start)
|
|
|
|
def test_to_dict(self, sequence):
    """Round-trip a one-record sequence through to_dict() and from_dict()."""
    sequence.append(self.create_test_record(datetime(2023, 11, 6), 0.8))

    as_dict = sequence.to_dict()
    assert isinstance(as_dict, dict)

    # The rebuilt sequence must dump to the same model data as the source.
    rebuilt = sequence.from_dict(as_dict)
    assert rebuilt.model_dump() == sequence.model_dump()
|
|
|
|
def test_to_json(self, sequence):
    """Serialize a one-record sequence and look for its date and value in the JSON text."""
    sequence.append(self.create_test_record(datetime(2023, 11, 6), 0.8))

    serialized = sequence.to_json()

    assert isinstance(serialized, str)
    assert "2023-11-06" in serialized
    assert ": 0.8" in serialized
|
|
|
|
def test_from_json(self, sequence, sequence2):
    """Rebuild a sequence from sequence2's JSON and compare length and first record."""
    serialized = sequence2.to_json()

    restored = sequence.from_json(serialized)

    assert len(restored) == len(sequence2)
    first_restored = restored[0]
    assert first_restored.date_time == sequence2[0].date_time
    assert restored[0].data_value == sequence2[0].data_value
|
|
|
|
def test_key_to_value_exact_match(self, sequence):
    """Query key_to_value at the exact timestamp of the only record."""
    moment = datetime(2023, 11, 5)
    sequence.append(self.create_test_record(moment, 0.75))

    found = sequence.key_to_value("data_value", moment)

    assert found == 0.75
|
|
|
|
def test_key_to_value_nearest(self, sequence):
    """Query between two records and expect the value of the one closer in time."""
    earlier = self.create_test_record(datetime(2023, 11, 5, 12), 0.6)
    later = self.create_test_record(datetime(2023, 11, 6, 12), 0.9)
    for record in (earlier, later):
        sequence.append(record)

    # 10:00 on the 6th is two hours before the later record.
    query_time = datetime(2023, 11, 6, 10)
    found = sequence.key_to_value("data_value", query_time)

    assert found == 0.9
|
|
|
|
def test_key_to_value_nearest_after(self, sequence):
    """Query shortly before the later of two records and expect the later value."""
    morning = self.create_test_record(datetime(2023, 11, 5, 10), 0.7)
    afternoon = self.create_test_record(datetime(2023, 11, 5, 15), 0.8)
    for record in (morning, afternoon):
        sequence.append(record)

    # 14:00 is one hour before the afternoon record and four after the morning one.
    query_time = datetime(2023, 11, 5, 14)
    found = sequence.key_to_value("data_value", query_time)

    assert found == 0.8
|
|
|
|
def test_key_to_value_empty_sequence(self, sequence):
    """Expect key_to_value to return None when no record was added."""
    query_time = datetime(2023, 11, 5)

    found = sequence.key_to_value("data_value", query_time)

    assert found is None
|
|
|
|
def test_key_to_value_missing_key(self, sequence):
    """Expect None when the only record stores None as its data_value."""
    moment = datetime(2023, 11, 5)
    sequence.append(self.create_test_record(moment, None))

    found = sequence.key_to_value("data_value", moment)

    assert found is None
|
|
|
|
def test_key_to_value_multiple_records_with_none(self, sequence):
    """Expect the record holding None to be passed over in favour of the valued one."""
    without_value = self.create_test_record(datetime(2023, 11, 5), None)
    with_value = self.create_test_record(datetime(2023, 11, 6), 1.0)
    for record in (without_value, with_value):
        sequence.append(record)

    query_time = datetime(2023, 11, 5, 12)
    found = sequence.key_to_value("data_value", query_time)

    assert found == 1.0
|
|
|
|
def test_key_to_dict(self, sequence):
    """Check key_to_dict() maps each record's string timestamp to its value."""
    first_day = datetime(2023, 11, 5)
    second_day = datetime(2023, 11, 6)
    records = [
        self.create_test_record(first_day, 0.8),
        self.create_test_record(second_day, 0.9),
    ]
    for record in records:
        sequence.append(record)

    mapping = sequence.key_to_dict("data_value")

    assert isinstance(mapping, dict)
    # Keys are looked up by the string form produced by to_datetime(..., as_string=True).
    assert mapping[to_datetime(first_day, as_string=True)] == 0.8
    assert mapping[to_datetime(second_day, as_string=True)] == 0.9
|
|
|
|
def test_key_to_lists(self, sequence):
    """Check key_to_lists() returns the timestamps and the values as two lists."""
    first_day = datetime(2023, 11, 5)
    second_day = datetime(2023, 11, 6)
    records = [
        self.create_test_record(first_day, 0.8),
        self.create_test_record(second_day, 0.9),
    ]
    for record in records:
        sequence.append(record)

    timestamps, readings = sequence.key_to_lists("data_value")

    expected_timestamps = [to_datetime(first_day), to_datetime(second_day)]
    assert timestamps == expected_timestamps
    assert readings == [0.8, 0.9]
|
|
|
|
def test_to_dataframe_full_data(self, sequence):
    """Convert an unfiltered sequence of three records into a DataFrame."""
    samples = (
        ("2024-01-01T12:00:00Z", 10),
        ("2024-01-01T13:00:00Z", 20),
        ("2024-01-01T14:00:00Z", 30),
    )
    records = [self.create_test_record(stamp, value) for stamp, value in samples]
    for record in records:
        sequence.append(record)

    frame = sequence.to_dataframe()

    # Validate DataFrame structure
    assert isinstance(frame, pd.DataFrame)
    assert not frame.empty
    assert len(frame) == 3  # All records should be included
    assert "data_value" in frame.columns
|
|
|
|
def test_to_dataframe_with_filter(self, sequence):
    """Restrict to_dataframe() to a datetime window that holds a single record."""
    samples = (
        ("2024-01-01T12:00:00Z", 10),
        ("2024-01-01T13:00:00Z", 20),
        ("2024-01-01T14:00:00Z", 30),
    )
    records = [self.create_test_record(stamp, value) for stamp, value in samples]
    for record in records:
        sequence.append(record)

    window_start = to_datetime("2024-01-01T12:30:00Z")
    window_end = to_datetime("2024-01-01T14:00:00Z")

    frame = sequence.to_dataframe(start_datetime=window_start, end_datetime=window_end)

    assert isinstance(frame, pd.DataFrame)
    assert not frame.empty
    assert len(frame) == 1  # Only one record should match the range
    assert frame.index[0] == pd.Timestamp("2024-01-01T13:00:00Z")
|
|
|
|
def test_to_dataframe_no_matching_records(self, sequence):
    """Use a datetime window after every record and expect an empty DataFrame."""
    samples = (
        ("2024-01-01T12:00:00Z", 10),
        ("2024-01-01T13:00:00Z", 20),
    )
    records = [self.create_test_record(stamp, value) for stamp, value in samples]
    for record in records:
        sequence.append(record)

    window_start = to_datetime("2024-01-01T14:00:00Z")  # Start time after all records
    window_end = to_datetime("2024-01-01T15:00:00Z")

    frame = sequence.to_dataframe(start_datetime=window_start, end_datetime=window_end)

    assert isinstance(frame, pd.DataFrame)
    assert frame.empty  # No records should match
|
|
|
|
def test_to_dataframe_empty_sequence(self, sequence):
    """Convert a DataSequence built with an empty record list.

    The ``sequence`` fixture is requested but not used; a new DataSequence
    is constructed instead.
    """
    empty_sequence = DataSequence(records=[])

    frame = empty_sequence.to_dataframe()

    assert isinstance(frame, pd.DataFrame)
    assert frame.empty  # Should return an empty DataFrame
|
|
|
|
def test_to_dataframe_no_start_datetime(self, sequence):
    """Pass only end_datetime and expect just the record before that bound."""
    samples = (
        ("2024-01-01T12:00:00Z", 10),
        ("2024-01-01T13:00:00Z", 20),
        ("2024-01-01T14:00:00Z", 30),
    )
    records = [self.create_test_record(stamp, value) for stamp, value in samples]
    for record in records:
        sequence.append(record)

    upper_bound = to_datetime("2024-01-01T13:00:00Z")  # Include only first record

    frame = sequence.to_dataframe(end_datetime=upper_bound)

    assert isinstance(frame, pd.DataFrame)
    assert not frame.empty
    assert len(frame) == 1
    assert frame.index[0] == pd.Timestamp("2024-01-01T12:00:00Z")
|
|
|
|
def test_to_dataframe_no_end_datetime(self, sequence):
    """Pass only start_datetime and expect the records from that bound onwards."""
    samples = (
        ("2024-01-01T12:00:00Z", 10),
        ("2024-01-01T13:00:00Z", 20),
        ("2024-01-01T14:00:00Z", 30),
    )
    records = [self.create_test_record(stamp, value) for stamp, value in samples]
    for record in records:
        sequence.append(record)

    lower_bound = to_datetime("2024-01-01T13:00:00Z")  # Include last two records

    frame = sequence.to_dataframe(start_datetime=lower_bound)

    assert isinstance(frame, pd.DataFrame)
    assert not frame.empty
    assert len(frame) == 2
    assert frame.index[0] == pd.Timestamp("2024-01-01T13:00:00Z")
|
|
|
|
|
|
class TestDataProvider:
    """Tests that drive provider behaviour through DerivedDataProvider."""

    # Fixtures and helper functions

    @pytest.fixture
    def provider(self):
        """Fixture returning a DerivedDataProvider with its class flags reset.

        Sets ``provider_enabled`` to True and ``provider_updated`` to False on
        the DerivedDataProvider class before instantiating it.
        """
        DerivedDataProvider.provider_enabled = True
        DerivedDataProvider.provider_updated = False
        instance = DerivedDataProvider()
        return instance

    @pytest.fixture
    def sample_start_datetime(self):
        """Fixture returning 2024-11-01 12:00 passed through to_datetime()."""
        naive_start = datetime(2024, 11, 1, 12, 0)
        return to_datetime(naive_start)

    def create_test_record(self, date, value):
        """Build a DerivedRecord for ``date`` carrying ``value`` as data_value."""
        record = DerivedRecord(date_time=date, data_value=value)
        return record

    # Tests

    def test_singleton_behavior(self, provider):
        """A second DerivedDataProvider() call must return the fixture's instance."""
        another = DerivedDataProvider()
        message = "Singleton pattern is not enforced; instances are not the same."
        assert provider is another, message

    def test_update_method_with_defaults(self, provider, sample_start_datetime, monkeypatch):
        """Call update_data() without arguments after setting the EMS start datetime."""
        get_ems().set_start_datetime(sample_start_datetime)

        provider.update_data()

        assert provider.ems_start_datetime == sample_start_datetime

    def test_update_method_force_enable(self, provider, monkeypatch):
        """Check update_data(force_enable=True) runs although the provider is disabled."""
        # Switch the class flags so the provider reports itself as disabled.
        DerivedDataProvider.provider_enabled = False
        DerivedDataProvider.provider_updated = False

        provider.update_data(force_enable=True)

        still_enabled = provider.enabled()
        assert still_enabled is False, "Provider should be disabled, but enabled() is True."
        was_run = DerivedDataProvider.provider_updated
        assert was_run is True, "Provider should have been executed, but was not."

    def test_delete_by_datetime(self, provider, sample_start_datetime):
        """Delete a datetime window around the start and check the record outside it stays."""
        three_hours_before = sample_start_datetime - to_duration("3 hours")
        one_hour_before = sample_start_datetime - to_duration("1 hour")
        one_hour_after = sample_start_datetime + to_duration("1 hour")
        # Add records to the provider for deletion testing
        provider.records = [
            self.create_test_record(three_hours_before, 1),
            self.create_test_record(one_hour_before, 2),
            self.create_test_record(one_hour_after, 3),
        ]

        window_start = sample_start_datetime - to_duration("2 hours")
        window_end = sample_start_datetime + to_duration("2 hours")
        provider.delete_by_datetime(start_datetime=window_start, end_datetime=window_end)

        remaining = len(provider.records)
        assert remaining == 1, "Only one record should remain after deletion by datetime."
        expected_time = sample_start_datetime - to_duration("3 hours")
        assert provider.records[0].date_time == expected_time, "Unexpected record remains."
|
|
|
|
|
|
class TestDataImportProvider:
    """Tests for import_datetimes() of DerivedDataImportProvider."""

    # Shared parametrization for both import_datetimes tests:
    # (start_datetime, value_count, expected_mapping_count).
    _DST_CASES = [
        ("2024-11-10 00:00:00", 24, 24),  # No DST in Germany
        ("2024-08-10 00:00:00", 24, 24),  # DST in Germany
        ("2024-03-31 00:00:00", 24, 23),  # DST change in Germany (23 hours/ day)
        ("2024-10-27 00:00:00", 24, 25),  # DST change in Germany (25 hours/ day)
    ]

    # Fixtures and helper functions

    @pytest.fixture
    def provider(self):
        """Fixture returning a DerivedDataImportProvider with its class flags reset.

        Sets ``provider_enabled`` to True and ``provider_updated`` to False on
        the DerivedDataImportProvider class before instantiating it.
        """
        DerivedDataImportProvider.provider_enabled = True
        DerivedDataImportProvider.provider_updated = False
        return DerivedDataImportProvider()

    @pytest.mark.parametrize(
        "start_datetime, value_count, expected_mapping_count",
        _DST_CASES,
    )
    def test_import_datetimes(self, provider, start_datetime, value_count, expected_mapping_count):
        """Check the number of mappings import_datetimes() returns for a Berlin start time."""
        start_datetime = to_datetime(start_datetime, in_timezone="Europe/Berlin")

        value_datetime_mapping = provider.import_datetimes(start_datetime, value_count)

        assert len(value_datetime_mapping) == expected_mapping_count

    @pytest.mark.parametrize(
        "start_datetime, value_count, expected_mapping_count",
        _DST_CASES,
    )
    def test_import_datetimes_utc(
        self, set_other_timezone, provider, start_datetime, value_count, expected_mapping_count
    ):
        """Repeat the mapping-count check after calling set_other_timezone("Etc/UTC").

        The start datetime is still created in Europe/Berlin, which the test
        asserts before calling import_datetimes().
        """
        # NOTE(review): set_other_timezone is a fixture defined outside this file;
        # presumably it switches the configured timezone - confirm in conftest.
        # Its return value is not needed here.
        set_other_timezone("Etc/UTC")
        start_datetime = to_datetime(start_datetime, in_timezone="Europe/Berlin")
        assert start_datetime.timezone.name == "Europe/Berlin"

        value_datetime_mapping = provider.import_datetimes(start_datetime, value_count)

        assert len(value_datetime_mapping) == expected_mapping_count
|
|
|
|
|
|
class TestDataContainer:
|
|
# Fixture and helpers
|
|
@pytest.fixture
def container(self):
    """Fixture returning a newly constructed DerivedDataContainer."""
    return DerivedDataContainer()
|
|
|
|
@pytest.fixture
def container_with_providers(self):
    """Fixture returning a container that holds one provider with three records.

    The DerivedDataProvider is cleared and then filled with records for
    2023-11-05, 2023-11-06 and 2023-11-07 carrying the values 1, 2 and 3.
    The container's provider list is cleared before the provider is added.
    """
    records = [
        self.create_test_record(datetime(2023, 11, 5), 1),
        self.create_test_record(datetime(2023, 11, 6), 2),
        self.create_test_record(datetime(2023, 11, 7), 3),
    ]

    # Start from an empty provider, then load the three records.
    data_provider = DerivedDataProvider()
    data_provider.clear()
    assert len(data_provider) == 0
    for record in records:
        data_provider.append(record)
    assert len(data_provider) == 3

    # Start from an empty provider list, then register the provider.
    data_container = DerivedDataContainer()
    data_container.providers.clear()
    assert len(data_container.providers) == 0
    data_container.providers.append(data_provider)
    assert len(data_container.providers) == 1

    return data_container
|
|
|
|
def create_test_record(self, date, value):
    """Build a DerivedRecord for ``date`` carrying ``value`` as data_value."""
    record = DerivedRecord(date_time=date, data_value=value)
    return record
|
|
|
|
def test_append_provider(self, container):
    """Append a provider to a container without providers and check it is stored."""
    assert len(container.providers) == 0

    added = DerivedDataProvider()
    container.providers.append(added)

    assert len(container.providers) == 1
    stored = container.providers[0]
    assert isinstance(stored, DerivedDataProvider)
|
|
|
|
@pytest.mark.skip(reason="type check not implemented")
def test_append_provider_invalid_type(self, container):
    """Expect a ValueError when a non-provider is appended (currently skipped)."""
    expected_error = pytest.raises(ValueError, match="must be an instance of DataProvider")
    with expected_error:
        container.providers.append("not_a_provider")
|
|
|
|
def test_getitem_existing_key(self, container_with_providers):
    """Read the "data_value" key from the container and check the returned series."""
    assert len(container_with_providers.providers) == 1

    # check all keys are available (don't care for position)
    for expected_key in ("data_value", "date_time"):
        assert expected_key in list(container_with_providers.keys())

    values = container_with_providers["data_value"]

    assert isinstance(values, pd.Series)
    assert values.name == "data_value"
    assert values.tolist() == [1.0, 2.0, 3.0]
|
|
|
|
def test_getitem_non_existing_key(self, container_with_providers):
    """Expect a KeyError when reading a key the container does not know."""
    expected_message = "No data found for key 'non_existent_key'"
    with pytest.raises(KeyError, match=expected_message):
        container_with_providers["non_existent_key"]
|
|
|
|
def test_setitem_existing_key(self, container_with_providers):
    """Write modified values back under "data_value" and read them again."""
    updated = container_with_providers["data_value"]
    updated[:] = [4, 5, 6]

    container_with_providers["data_value"] = updated

    read_back = container_with_providers["data_value"]
    assert read_back.name == "data_value"
    assert read_back.tolist() == [4, 5, 6]
|
|
|
|
def test_setitem_invalid_value(self, container_with_providers):
    """Expect a ValueError when a plain string is assigned instead of a series."""
    expected_message = "Value must be an instance of pd.Series"
    with pytest.raises(ValueError, match=expected_message):
        container_with_providers["test_key"] = "not_a_series"
|
|
|
|
def test_setitem_non_existing_key(self, container_with_providers):
    """Expect a KeyError when a series is assigned under an unknown key."""
    unknown_series = pd.Series([4, 5, 6], name="non_existent_key")
    expected_message = "Key 'non_existent_key' not found"
    with pytest.raises(KeyError, match=expected_message):
        container_with_providers["non_existent_key"] = unknown_series
|
|
|
|
def test_delitem_existing_key(self, container_with_providers):
    """Delete "data_value" and expect a following read to give an empty series."""
    del container_with_providers["data_value"]

    after_delete = container_with_providers["data_value"]

    assert after_delete.name == "data_value"
    assert after_delete.tolist() == []
|
|
|
|
def test_delitem_non_existing_key(self, container_with_providers):
    """Expect a KeyError when deleting a key the container does not know."""
    expected_message = "Key 'non_existent_key' not found"
    with pytest.raises(KeyError, match=expected_message):
        del container_with_providers["non_existent_key"]
|
|
|
|
def test_len(self, container_with_providers):
    """Check the length reported by the fixture container."""
    reported_length = len(container_with_providers)
    assert reported_length == 5
|
|
|
|
def test_repr(self, container_with_providers):
    """Check repr() names the container class and mentions its provider class."""
    text = repr(container_with_providers)

    assert text.startswith("DerivedDataContainer(")
    assert "DerivedDataProvider" in text
|
|
|
|
def test_to_json(self, container_with_providers):
    """Round-trip the container through JSON and compare it with the source."""
    serialized = container_with_providers.to_json()

    restored = DerivedDataContainer.from_json(serialized)

    assert restored == container_with_providers
|
|
|
|
def test_from_json(self, container_with_providers):
    """Rebuild a container from JSON and compare its single provider with the source."""
    serialized = container_with_providers.to_json()

    restored = DerivedDataContainer.from_json(serialized)

    assert isinstance(restored, DerivedDataContainer)
    assert len(restored.providers) == 1
    original_provider = container_with_providers.providers[0]
    assert restored.providers[0] == original_provider
|
|
|
|
def test_provider_by_id(self, container_with_providers):
    """Look up the provider by the id "DerivedDataProvider" and check its type."""
    found = container_with_providers.provider_by_id("DerivedDataProvider")

    assert isinstance(found, DerivedDataProvider)
|