mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-02-23 17:36:19 +00:00
1115 lines
46 KiB
Python
1115 lines
46 KiB
Python
|
|
"""Compaction tests for DataSequence and DataContainer.
|
||
|
|
|
||
|
|
These tests sit on top of the full DataSequence / DataProvider / DataContainer
|
||
|
|
stack (dataabc.py) and exercise compaction end-to-end, including the
|
||
|
|
DataContainer delegation path.
|
||
|
|
|
||
|
|
A fresh temporary SQLite database is configured for every test via the
function-scoped `configure_database` autouse fixture so that DataSequence
instances — which use the real Database singleton via DatabaseMixin — have a
working backend.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from typing import List, Optional, Type
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
import pytest
|
||
|
|
from pydantic import Field
|
||
|
|
|
||
|
|
from akkudoktoreos.core.dataabc import (
|
||
|
|
DataContainer,
|
||
|
|
DataProvider,
|
||
|
|
DataRecord,
|
||
|
|
DataSequence,
|
||
|
|
)
|
||
|
|
from akkudoktoreos.core.database import Database
|
||
|
|
from akkudoktoreos.core.databaseabc import DatabaseTimestamp
|
||
|
|
from akkudoktoreos.utils.datetimeutil import DateTime, to_datetime, to_duration
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Minimal concrete record / sequence / provider
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class EnergyRecord(DataRecord):
    """Simple numeric record for compaction testing."""

    # Both measurements are optional and default to None.
    power_w: Optional[float] = Field(
        default=None,
        json_schema_extra={"description": "Power in Watts"},
    )
    price_eur: Optional[float] = Field(
        default=None,
        json_schema_extra={"description": "Price in EUR/kWh"},
    )
|
||
|
|
|
||
|
|
|
||
|
|
class EnergySequence(DataSequence):
    # Concrete sequence of EnergyRecord entries; it does not override
    # db_compact_tiers(), so the inherited tiers apply.

    records: List[EnergyRecord] = Field(
        default_factory=list,
        json_schema_extra={"description": "List of energy records"},
    )

    @classmethod
    def record_class(cls) -> Type[EnergyRecord]:
        """Return the record type stored in this sequence."""
        return EnergyRecord

    def db_namespace(self) -> str:
        """Return the namespace string for this sequence."""
        return "energy_test"
|
||
|
|
|
||
|
|
|
||
|
|
class PriceSequence(DataSequence):
    """Price data — overrides tiers to keep 15-min resolution for 2 weeks."""

    records: List[EnergyRecord] = Field(
        default_factory=list,
        json_schema_extra={"description": "List of price records"},
    )

    @classmethod
    def record_class(cls) -> Type[EnergyRecord]:
        """Return the record type stored in this sequence."""
        return EnergyRecord

    def db_namespace(self) -> str:
        """Return the namespace string for this sequence."""
        return "price_test"

    def db_compact_tiers(self):
        """Return a single (age threshold, target interval) tier: 14 days -> 1 hour."""
        # Price data: skip first tier (already at target resolution for 2 weeks)
        age_threshold = to_duration("14 days")
        target_interval = to_duration("1 hour")
        return [(age_threshold, target_interval)]
|
||
|
|
|
||
|
|
|
||
|
|
class EnergyProvider(DataProvider):
    # Minimal always-enabled provider holding EnergyRecord entries.

    records: List[EnergyRecord] = Field(
        default_factory=list,
        json_schema_extra={"description": "List of energy records"},
    )

    @classmethod
    def record_class(cls) -> Type[EnergyRecord]:
        """Return the record type stored by this provider."""
        return EnergyRecord

    def provider_id(self) -> str:
        """Return the fixed identifier of this provider."""
        return "EnergyProvider"

    def enabled(self) -> bool:
        """Report the provider as always enabled."""
        return True

    def _update_data(self, force_update=False) -> None:
        """Do nothing; the tests insert records directly."""
        pass

    def db_namespace(self) -> str:
        """Return the provider id as the namespace string."""
        return self.provider_id()
|
||
|
|
|
||
|
|
|
||
|
|
class PriceProvider(DataProvider):
    # Minimal always-enabled provider with a single 14-day -> 1-hour tier.

    records: List[EnergyRecord] = Field(
        default_factory=list,
        json_schema_extra={"description": "List of price records"},
    )

    @classmethod
    def record_class(cls) -> Type[EnergyRecord]:
        """Return the record type stored by this provider."""
        return EnergyRecord

    def provider_id(self) -> str:
        """Return the fixed identifier of this provider."""
        return "PriceProvider"

    def enabled(self) -> bool:
        """Report the provider as always enabled."""
        return True

    def _update_data(self, force_update=False) -> None:
        """Do nothing; the tests insert records directly."""
        pass

    def db_namespace(self) -> str:
        """Return the provider id as the namespace string."""
        return self.provider_id()

    def db_compact_tiers(self):
        """Return a single (age threshold, target interval) tier: 14 days -> 1 hour."""
        age_threshold = to_duration("14 days")
        target_interval = to_duration("1 hour")
        return [(age_threshold, target_interval)]
|
||
|
|
|
||
|
|
|
||
|
|
class EnergyContainer(DataContainer):
    # Container whose provider list is supplied by each test; empty by default.
    providers: List[DataProvider] = Field(default_factory=list)
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Helpers
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
def _aligned_base(now: DateTime, interval_minutes: int = 15) -> DateTime:
|
||
|
|
"""Floor ``now`` to the nearest ``interval_minutes`` boundary.
|
||
|
|
|
||
|
|
All fixtures that feed _fill_sequence use this so that compacted timestamps
|
||
|
|
are predictably on clock-round boundaries and tests are deterministic.
|
||
|
|
"""
|
||
|
|
interval_sec = interval_minutes * 60
|
||
|
|
epoch = int(now.timestamp())
|
||
|
|
return now.subtract(seconds=epoch % interval_sec).set(microsecond=0)
|
||
|
|
|
||
|
|
|
||
|
|
def _fill_sequence(
    seq: DataSequence,
    base: DateTime,
    count: int,
    interval_minutes: int,
    power_w: float = 1000.0,
    price_eur: float = 0.25,
) -> None:
    """Insert ``count`` EnergyRecords spaced ``interval_minutes`` apart.

    ``base`` should be interval-aligned (use ``_aligned_base``) so that
    compacted bucket timestamps are deterministic across all tests.

    Record ``i`` is stamped ``base + i * interval_minutes`` and carries
    ``power_w + i`` with a constant ``price_eur``. The records are saved once
    after all of them have been inserted.
    """
    for index in range(count):
        seq.db_insert_record(
            EnergyRecord(
                date_time=base.add(minutes=index * interval_minutes),
                power_w=power_w + index,
                price_eur=price_eur,
            )
        )
    seq.db_save_records()
|
||
|
|
|
||
|
|
|
||
|
|
def _reset_singletons() -> None:
    """Reset all singleton classes used in these tests.

    DataProvider and DataSequence inherit SingletonMixin, meaning each subclass
    only ever has one instance. Without resetting between tests, state from one
    test (records, compaction metadata, monkey-patches) leaks into the next.
    """
    singleton_classes = (
        EnergySequence,
        PriceSequence,
        EnergyProvider,
        PriceProvider,
        EnergyContainer,
    )
    for singleton_cls in singleton_classes:
        try:
            singleton_cls.reset_instance()
        except Exception:
            # Best effort: a failing reset must not stop the remaining resets.
            pass
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture(autouse=True)
def configure_database(tmp_path):
    """Configure a fresh temporary SQLite database for every test.

    DataSequence uses the real Database singleton via DatabaseMixin.
    Without an open database backend, count_records() and all other DB
    operations raise RuntimeError('Database not configured').

    This fixture:
    1. Resets the Database singleton so the previous test's state is gone.
    2. Points the database config at a fresh per-test tmp_path directory.
    3. Opens a SQLite backend.
    4. Resets all sequence/provider/container singletons before and after.
    5. Tears everything down cleanly after each test.
    """
    # --- Setup: drop leftover singletons, then the Database singleton itself.
    _reset_singletons()
    Database.reset_instance()

    # Patch config to use SQLite in tmp_path
    database = Database()
    database.config.database.provider = "SQLite"
    database.config.general.data_folder_path = tmp_path
    database.open()

    yield

    # --- Teardown
    _reset_singletons()
    try:
        Database.reset_instance()
    except Exception:
        # Best effort: a failing reset must not turn a passing test into an error.
        pass
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Fixtures
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def energy_seq():
    """Fresh EnergySequence with no data."""
    empty_sequence = EnergySequence()
    return empty_sequence
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def dense_energy_seq():
    """EnergySequence with 4 weeks of 15-min records (~2688 records).

    The base timestamp is floored to a 15-min boundary so compacted bucket
    timestamps are deterministic and on clock-round marks.

    Returns a ``(sequence, now)`` tuple, where ``now`` is the UTC timestamp
    from which the four-week window was measured.
    """
    now = to_datetime().in_timezone("UTC")
    four_weeks_ago = now.subtract(weeks=4)
    base = _aligned_base(four_weeks_ago, interval_minutes=15)

    seq = EnergySequence()
    # 4 weeks * 7 days * 24 hours * 4 records per hour
    _fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)
    return seq, now
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def dense_price_seq():
    """PriceSequence with 4 weeks of 15-min records.

    The base timestamp is floored to a 15-min boundary so compacted bucket
    timestamps are deterministic and on clock-round marks.

    Returns a ``(sequence, now)`` tuple, where ``now`` is the UTC timestamp
    from which the four-week window was measured.
    """
    now = to_datetime().in_timezone("UTC")
    four_weeks_ago = now.subtract(weeks=4)
    base = _aligned_base(four_weeks_ago, interval_minutes=15)

    seq = PriceSequence()
    # 4 weeks * 7 days * 24 hours * 4 records per hour
    _fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)
    return seq, now
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def energy_container(energy_seq):
    """DataContainer with one EnergyProvider and one PriceProvider.

    Returns a ``(container, energy_provider, price_provider)`` tuple. The
    ``energy_seq`` fixture is requested but its value is not used here.
    """
    energy_provider = EnergyProvider()
    price_provider = PriceProvider()
    container = EnergyContainer(providers=[energy_provider, price_provider])
    return container, energy_provider, price_provider
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# DataSequence — tier configuration
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestDataSequenceCompactTiers:
    """Checks of the tier list returned by ``db_compact_tiers()``."""

    def test_default_tiers_two_entries(self, energy_seq):
        """The inherited default configuration has exactly two tiers."""
        assert len(energy_seq.db_compact_tiers()) == 2

    def test_default_first_tier_2h_15min(self, energy_seq):
        """First default tier: age threshold 2 hours, target interval 15 minutes."""
        age, interval = energy_seq.db_compact_tiers()[0]
        assert age.total_seconds() == 2 * 3600
        assert interval.total_seconds() == 15 * 60

    def test_default_second_tier_2weeks_1h(self, energy_seq):
        """Second default tier: age threshold 14 days, target interval 1 hour."""
        age, interval = energy_seq.db_compact_tiers()[1]
        assert age.total_seconds() == 14 * 24 * 3600
        assert interval.total_seconds() == 3600

    def test_price_sequence_overrides_to_single_tier(self):
        """PriceSequence replaces the defaults with one 14-day / 1-hour tier."""
        tiers = PriceSequence().db_compact_tiers()
        assert len(tiers) == 1
        age, interval = tiers[0]
        assert age.total_seconds() == 14 * 24 * 3600
        assert interval.total_seconds() == 3600

    def test_empty_tiers_disables_compaction(self):
        """With an empty tier list ``db_compact()`` must report zero deletions."""

        class NoCompact(EnergySequence):
            def db_compact_tiers(self):
                return []

        seq = NoCompact()
        four_weeks_ago = to_datetime().in_timezone("UTC").subtract(weeks=4)
        base = _aligned_base(four_weeks_ago, interval_minutes=15)
        _fill_sequence(seq, base, count=500, interval_minutes=15)
        assert seq.db_compact() == 0
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# DataSequence — compaction behaviour
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestDataSequenceCompact:
    """End-to-end behaviour of ``db_compact()`` on populated sequences."""

    @staticmethod
    def _avg_spacing_seconds(records, limit=10):
        """Return the mean gap in seconds between the first ``limit`` records.

        ``records`` must already be sorted by ``date_time`` and hold at least
        two entries; callers assert that before calling.
        """
        head = records[:limit]
        gaps = [
            (later.date_time - earlier.date_time).total_seconds()
            for earlier, later in zip(head, head[1:])
        ]
        return sum(gaps) / len(gaps)

    def test_empty_sequence_returns_zero(self, energy_seq):
        """Compacting a sequence without records deletes nothing."""
        assert energy_seq.db_compact() == 0

    def test_dense_data_reduces_count(self, dense_energy_seq):
        """Compacting 4 weeks of 15-min data must delete records."""
        seq, _ = dense_energy_seq
        before = seq.db_count_records()
        deleted = seq.db_compact()
        assert deleted > 0
        assert seq.db_count_records() < before

    def test_all_fields_compacted(self, dense_energy_seq):
        """Both power_w and price_eur should be present on compacted records."""
        seq, now = dense_energy_seq
        seq.db_compact()

        cutoff = now.subtract(weeks=2)
        old_records = [r for r in seq.records if r.date_time and r.date_time < cutoff]

        assert len(old_records) > 0
        for rec in old_records:
            assert rec.power_w is not None, "power_w must survive compaction"
            assert rec.price_eur is not None, "price_eur must survive compaction"

    def test_recent_records_untouched(self, dense_energy_seq):
        """Records within 2 hours of now must not be compacted."""
        seq, now = dense_energy_seq
        cutoff = now.subtract(hours=2)

        def snapshot():
            # Map timestamp -> power for every record at or after the cutoff.
            return {
                DatabaseTimestamp.from_datetime(r.date_time): r.power_w
                for r in seq.records
                if r.date_time and r.date_time >= cutoff
            }

        recent_before = snapshot()
        seq.db_compact()
        recent_after = snapshot()

        assert recent_before == recent_after

    def test_idempotent(self, dense_energy_seq):
        """A second compaction run must not change the record count."""
        seq, _ = dense_energy_seq
        seq.db_compact()
        after_first = seq.db_count_records()

        seq.db_compact()
        after_second = seq.db_count_records()

        assert after_first == after_second

    def test_price_sequence_preserves_15min_in_recent_2weeks(self, dense_price_seq):
        """PriceSequence keeps 15-min resolution for data younger than 2 weeks."""
        seq, now = dense_price_seq
        seq.db_compact()

        two_weeks_ago = now.subtract(weeks=2)
        recent_records = sorted(
            (r for r in seq.records if r.date_time and r.date_time >= two_weeks_ago),
            key=lambda r: r.date_time,
        )
        # Fail loudly instead of passing without having checked anything.
        assert len(recent_records) > 1, "Expected records younger than 2 weeks"

        # Average spacing should be ~15 min, not 60 min
        avg_spacing = self._avg_spacing_seconds(recent_records)
        assert avg_spacing <= 20 * 60, (
            f"Expected ~15min spacing in recent 2 weeks, got {avg_spacing/60:.1f} min"
        )

    def test_price_sequence_compacts_older_than_2weeks_to_1h(self, dense_price_seq):
        """PriceSequence compacts data older than 2 weeks to 1-hour resolution."""
        seq, now = dense_price_seq
        seq.db_compact()

        two_weeks_ago = now.subtract(weeks=2)
        old_records = sorted(
            (r for r in seq.records if r.date_time and r.date_time < two_weeks_ago),
            key=lambda r: r.date_time,
        )
        # Fail loudly instead of passing without having checked anything.
        assert len(old_records) > 1, "Expected records older than 2 weeks"

        avg_spacing = self._avg_spacing_seconds(old_records)
        assert avg_spacing >= 50 * 60, (
            f"Expected ~1h spacing for old price data, got {avg_spacing/60:.1f} min"
        )

    def test_compact_with_custom_tiers_argument(self, dense_energy_seq):
        """db_compact(compact_tiers=...) overrides the instance's tiers."""
        seq, _ = dense_energy_seq
        before = seq.db_count_records()

        deleted = seq.db_compact(
            compact_tiers=[(to_duration("1 day"), to_duration("1 hour"))]
        )

        assert deleted > 0
        assert seq.db_count_records() < before

    def test_compacted_timestamps_are_clock_aligned(self, dense_energy_seq):
        """All timestamps produced by compaction must sit on UTC clock boundaries.

        _db_compact_tier floors its cutoff timestamps to interval boundaries, so
        the boundary between tiers is not exactly ``now - age`` but the floored
        version of it. We compute the same floored cutoffs here.

        - Records older than floored 2-week cutoff  → multiple of 3600 s
        - Records in floored 2h..2week band         → multiple of 900 s
        - Records younger than floored 2h cutoff    → not checked here
        """
        seq, _ = dense_energy_seq
        seq.db_compact()

        # _db_compact_tier floors new_cutoff from db_max, not from wall-clock now.
        # Compute the same floored cutoffs that the implementation used.
        _, db_max_ts = seq.db_timestamp_range()
        db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
        two_weeks_cutoff_epoch = ((db_max_epoch - 14 * 24 * 3600) // 3600) * 3600
        two_hours_cutoff_epoch = ((db_max_epoch - 2 * 3600) // 900) * 900

        for rec in seq.records:
            if rec.date_time is None:
                continue
            epoch = int(rec.date_time.timestamp())
            if epoch < two_weeks_cutoff_epoch:
                assert epoch % 3600 == 0, (
                    f"Old record {rec.date_time} not on hour boundary"
                )
            elif epoch < two_hours_cutoff_epoch:
                assert epoch % 900 == 0, (
                    f"Mid record {rec.date_time} not on 15-min boundary"
                )
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# DataSequence — data integrity after compaction
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestDataSequenceCompactIntegrity:
    """Data-integrity checks on sequences after compaction."""

    @staticmethod
    def _tier_cutoff(now, age_seconds: int, interval_seconds: int):
        """Compute the floored compaction cutoff the same way _db_compact_tier does.

        _db_compact_tier floors new_cutoff_dt to the interval boundary, so
        ``newest - age_threshold`` rounded down.  Tests must use the same value
        to correctly classify which tier a record falls into.

        Returns a datetime of the same class and timezone as ``now``.
        """
        raw_epoch = int(now.subtract(seconds=age_seconds).timestamp())
        floored_epoch = (raw_epoch // interval_seconds) * interval_seconds
        return type(now).fromtimestamp(floored_epoch, tz=now.tzinfo)

    def test_constant_power_preserved(self):
        """Mean resampling of a constant must equal the constant."""
        seq = EnergySequence()
        now = to_datetime().in_timezone("UTC")
        # Use aligned base so bucket boundaries are deterministic
        base = _aligned_base(now.subtract(hours=6), interval_minutes=15)

        for i in range(6 * 60):  # 1-min records for 6 hours
            dt = base.add(minutes=i)
            seq.db_insert_record(EnergyRecord(date_time=dt, power_w=500.0, price_eur=0.30))
        seq.db_save_records()

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        cutoff = now.subtract(hours=2)
        old_records = [r for r in seq.records if r.date_time and r.date_time < cutoff]
        # Fail loudly instead of passing without having checked anything.
        assert len(old_records) > 0, "Expected records older than 2 hours"
        for rec in old_records:
            assert rec.power_w == pytest.approx(500.0, abs=1e-3)
            assert rec.price_eur == pytest.approx(0.30, abs=1e-6)

    def test_record_count_monotonically_decreases(self):
        """Each successive tier run should never increase record count."""
        seq = EnergySequence()
        now = to_datetime().in_timezone("UTC")
        base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
        _fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)

        counts = [seq.db_count_records()]
        # Run the tiers in reverse of the order db_compact_tiers() lists them.
        for age, interval in reversed(seq.db_compact_tiers()):
            seq._db_compact_tier(age, interval)
            counts.append(seq.db_count_records())

        for i in range(1, len(counts)):
            assert counts[i] <= counts[i - 1], (
                f"Record count increased from {counts[i-1]} to {counts[i]} at tier {i}"
            )

    def test_no_duplicate_timestamps_after_compaction(self, dense_energy_seq):
        """Compaction must not create duplicate timestamps."""
        seq, _ = dense_energy_seq
        seq.db_compact()

        timestamps = [
            DatabaseTimestamp.from_datetime(r.date_time)
            for r in seq.records
            if r.date_time is not None
        ]
        assert len(timestamps) == len(set(timestamps)), "Duplicate timestamps after compaction"

    def test_timestamps_remain_sorted(self, dense_energy_seq):
        """Records must remain in ascending order after compaction."""
        seq, _ = dense_energy_seq
        seq.db_compact()

        dts = [r.date_time for r in seq.records if r.date_time is not None]
        assert dts == sorted(dts)

    def test_compacted_old_timestamps_on_1h_boundaries(self, dense_energy_seq):
        """Records older than the floored 2-week cutoff must be on whole-hour UTC boundaries.

        _db_compact_tier floors new_cutoff to the interval boundary, so we must
        use the same floored cutoff to decide which records were compacted by the
        1-hour tier.  Records between the floored and raw cutoff may still be at
        15-min resolution from the previous tier.
        """
        seq, _ = dense_energy_seq
        seq.db_compact()

        # _db_compact_tier floors new_cutoff from db_max (the newest record),
        # not from wall-clock now.  Derive the same floored cutoff here.
        _, db_max_ts = seq.db_timestamp_range()
        db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
        two_weeks_cutoff_epoch = ((db_max_epoch - 14 * 24 * 3600) // 3600) * 3600
        two_weeks_cutoff_dt = DateTime.fromtimestamp(two_weeks_cutoff_epoch, tz="UTC")

        old_records = [r for r in seq.records if r.date_time and r.date_time < two_weeks_cutoff_dt]

        assert len(old_records) > 0, "Expected compacted records older than 2-week floored cutoff"
        for rec in old_records:
            epoch = int(rec.date_time.timestamp())
            assert epoch % 3600 == 0, (
                f"Old record at {rec.date_time} is not on an hour boundary"
            )

    def test_compacted_mid_timestamps_on_15min_boundaries(self):
        """Records compacted by the 15-min tier must land on 15-min UTC boundaries.

        We run _db_compact_tier directly with the 2h/15min tier on a sequence
        of 1-min records spanning 6 hours, then verify every compacted record
        sits on a :00/:15/:30/:45 UTC mark.

        The implementation computes new_cutoff as floor(newest - age, 900).
        We replicate that exact calculation to identify which records were in
        the compaction window.
        """
        seq = EnergySequence()
        now = to_datetime().in_timezone("UTC")
        base = _aligned_base(now.subtract(hours=6), interval_minutes=15)

        # 1-min records for 6 hours; newest record is at base + 359 min
        for i in range(6 * 60):
            dt = base.add(minutes=i)
            seq.db_insert_record(EnergyRecord(date_time=dt, power_w=500.0, price_eur=0.30))
        seq.db_save_records()

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # Replicate the implementation's floored cutoff exactly:
        #   newest_dt = last inserted record = base + 359min
        #   new_cutoff = floor(newest_dt - 2h, 900)
        newest_dt = base.add(minutes=6 * 60 - 1)
        raw_cutoff_epoch = int(newest_dt.subtract(hours=2).timestamp())
        window_end_epoch = (raw_cutoff_epoch // 900) * 900

        # Records before window_end_epoch must all be on 15-min boundaries
        compacted = [
            r for r in seq.records
            if r.date_time is not None
            and int(r.date_time.timestamp()) < window_end_epoch
        ]

        assert len(compacted) > 0, (
            f"Expected compacted records before window_end={window_end_epoch}; "
            f"got records at {[int(r.date_time.timestamp()) for r in seq.records if r.date_time]}"
        )
        for rec in compacted:
            assert rec.date_time is not None
            epoch = int(rec.date_time.timestamp())
            assert epoch % 900 == 0, (
                f"15-min-tier record at {rec.date_time} (epoch={epoch}) "
                f"is not on a 15-min boundary (epoch % 900 = {epoch % 900})"
            )

    def test_no_compacted_timestamps_between_boundaries(self, dense_energy_seq):
        """After compaction no record timestamp must fall between expected bucket boundaries.

        Records older than the floored 2-week cutoff (processed by the 1h tier)
        must be on hour marks.  Records in the 15-min band must be on 15-min marks.
        """
        seq, _ = dense_energy_seq
        seq.db_compact()

        # Derive floored cutoffs from db_max — same reference as the implementation.
        _, db_max_ts = seq.db_timestamp_range()
        db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
        two_weeks_cutoff_epoch = ((db_max_epoch - 14 * 24 * 3600) // 3600) * 3600
        two_hours_cutoff_epoch = ((db_max_epoch - 2 * 3600) // 900) * 900

        for rec in seq.records:
            if rec.date_time is None:
                continue
            epoch = int(rec.date_time.timestamp())
            if epoch < two_weeks_cutoff_epoch:
                assert epoch % 3600 == 0, (
                    f"Record at {rec.date_time} is not hour-aligned in 1h-tier region"
                )
            elif epoch < two_hours_cutoff_epoch:
                assert epoch % (15 * 60) == 0, (
                    f"Record at {rec.date_time} is not 15min-aligned in 15min-tier region"
                )
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# DataContainer — delegation
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestDataContainerCompact:
    """``DataContainer.db_compact()`` delegation to its providers."""

    def test_compact_delegates_to_all_providers(self, energy_container):
        """Compacting the container must reduce the record count of every provider."""
        container, ep, pp = energy_container
        now = to_datetime().in_timezone("UTC")

        # Fill both providers with 4 weeks of 15-min data
        base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
        _fill_sequence(ep, base, count=4 * 7 * 24 * 4, interval_minutes=15)
        _fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)

        ep_before = ep.db_count_records()
        pp_before = pp.db_count_records()

        container.db_compact()

        assert ep.db_count_records() < ep_before, "EnergyProvider records should be compacted"
        assert pp.db_count_records() < pp_before, "PriceProvider records should be compacted"

    def test_compact_empty_container_no_error(self):
        """A container without providers compacts without raising."""
        container = EnergyContainer(providers=[])
        container.db_compact()  # must not raise

    def test_compact_provider_tiers_respected(self, energy_container):
        """PriceProvider with single 2-week tier must not compact recent 15-min data."""
        container, _, pp = energy_container
        now = to_datetime().in_timezone("UTC")

        base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
        _fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)

        container.db_compact()

        # Price data in last 2 weeks should still be at 15-min resolution
        two_weeks_ago = now.subtract(weeks=2)
        recent = sorted(
            (r for r in pp.records if r.date_time and r.date_time >= two_weeks_ago),
            key=lambda r: r.date_time,
        )
        # Fail loudly instead of passing without having checked anything.
        assert len(recent) > 1, "Expected PriceProvider records younger than 2 weeks"
        diff = (recent[1].date_time - recent[0].date_time).total_seconds()
        assert diff <= 20 * 60, (
            f"PriceProvider recent data should be ~15min, got {diff/60:.1f} min"
        )

    def test_compact_raises_on_provider_failure(self):
        """A provider that raises during compaction must bubble up as RuntimeError.

        Monkey-patching is blocked by Pydantic v2's __setattr__ validation, so
        we use a subclass that overrides db_compact instead.
        """

        class BrokenProvider(EnergyProvider):
            def db_compact(self, *args, **kwargs):
                raise ValueError("simulated failure")

            def provider_id(self) -> str:
                # Distinct id so it doesn't collide with EnergyProvider singleton
                return "BrokenProvider"

            def db_namespace(self) -> str:
                return self.provider_id()

        bp = BrokenProvider()
        container = EnergyContainer(providers=[bp])

        with pytest.raises(RuntimeError, match="fails on db_compact"):
            container.db_compact()

    def test_compact_idempotent_on_container(self, energy_container):
        """A second container compaction must leave both record counts unchanged."""
        container, ep, pp = energy_container
        now = to_datetime().in_timezone("UTC")
        base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
        _fill_sequence(ep, base, count=4 * 7 * 24 * 4, interval_minutes=15)
        _fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)

        container.db_compact()
        ep_after_first = ep.db_count_records()
        pp_after_first = pp.db_count_records()

        container.db_compact()
        assert ep.db_count_records() == ep_after_first
        assert pp.db_count_records() == pp_after_first
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Sparse guard — DataSequence level
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
#
|
||
|
|
# The sparse guard distinguishes three cases:
|
||
|
|
#
|
||
|
|
# 1. Sparse + already aligned → skip entirely (deleted=0, count unchanged)
|
||
|
|
# 2. Sparse + misaligned → snap timestamps in place (deleted>0, but
|
||
|
|
# count stays the same or decreases if two
|
||
|
|
# records collide on the same bucket)
|
||
|
|
# 3. Sparse collision → two records snap to the same bucket; values
|
||
|
|
# are merged key-by-key; count decreases by 1
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
|
||
|
|
class TestDataSequenceSparseGuard:
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Case 1: sparse + already aligned → pure skip
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_sparse_aligned_data_not_modified(self):
    """Sparse records that already sit on interval boundaries must not be touched.

    deleted must be 0 and record count must be unchanged.
    """
    seq = EnergySequence()
    start = to_datetime().in_timezone("UTC").subtract(weeks=4)

    # Insert exactly 3 records, each floored to a whole hour (already aligned).
    for offset_days in (0, 14, 27):
        on_the_hour = start.add(days=offset_days).set(minute=0, second=0, microsecond=0)
        seq.db_insert_record(EnergyRecord(date_time=on_the_hour, power_w=100.0))
    seq.db_save_records()

    count_before = seq.db_count_records()
    deleted = seq.db_compact()

    assert deleted == 0, "Aligned sparse records must not be deleted"
    assert seq.db_count_records() == count_before, "Record count must not change"
|
||
|
|
|
||
|
|
def test_sparse_aligned_data_values_untouched(self):
    """A single hour-aligned record must keep both field values after compaction."""
    seq = EnergySequence()
    four_weeks_ago = to_datetime().in_timezone("UTC").subtract(weeks=4)
    aligned_dt = four_weeks_ago.set(minute=0, second=0, microsecond=0)

    record = EnergyRecord(date_time=aligned_dt, power_w=42.0, price_eur=0.99)
    seq.db_insert_record(record)
    seq.db_save_records()

    seq.db_compact()

    # Exactly one in-memory record may carry the original timestamp.
    survivors = [rec for rec in seq.records if rec.date_time == aligned_dt]
    assert len(survivors) == 1
    kept = survivors[0]
    assert kept.power_w == pytest.approx(42.0)
    assert kept.price_eur == pytest.approx(0.99)
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Case 2: sparse + misaligned → timestamp snapping
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
@staticmethod
def _make_snapping_seq(now, offsets_minutes, interval_minutes=10, age_minutes=30):
    """Create a saved sequence with old test records plus one fresh anchor record.

    One record is inserted per entry of ``offsets_minutes`` at
    ``base + offset`` minutes (``power_w = offset * 10``), where ``base`` is
    ``now - (age_minutes + margin)`` floored to a multiple of the interval.
    A final "anchor" record is inserted at ``now - 1 second``.

    Rationale as given by the original author (``_db_compact_tier`` itself is
    not visible here, so treat as an assumption to confirm): the compaction
    cutoff is derived from the newest stored record (db_max) rather than from
    the wall clock, so the anchor makes ``cutoff ≈ now - age_minutes``.  The
    window then spans ``age_minutes / interval_minutes`` buckets, and callers
    pass more offsets than that so the snapping path (not the pure-skip path)
    is exercised.

    Args:
        now: Reference datetime; needs ``subtract``/``set``/``timestamp``.
        offsets_minutes: Minute offsets from the floored base, one per record.
        interval_minutes: Target compaction interval in minutes.
        age_minutes: Age threshold in minutes.

    Returns:
        Tuple ``(seq, age_threshold, target_interval, record_datetimes)``.
    """
    age_threshold = to_duration(f"{age_minutes} minutes")
    target_interval = to_duration(f"{interval_minutes} minutes")
    bucket_seconds = interval_minutes * 60

    # Every test record must land before the window end, i.e.
    #   base + max(offsets) < now - age_minutes  <=>  max(offsets) < margin.
    # Two extra intervals plus one minute keep that true even after flooring.
    if offsets_minutes:
        largest_offset = max(offsets_minutes)
    else:
        largest_offset = 0
    margin_minutes = largest_offset + 2 * interval_minutes + 1

    # Floor the base onto an interval boundary so bucket arithmetic is exact.
    unaligned_base = now.subtract(minutes=age_minutes + margin_minutes).set(
        second=0, microsecond=0
    )
    overshoot = int(unaligned_base.timestamp()) % bucket_seconds
    base = unaligned_base.subtract(seconds=overshoot)

    seq = EnergySequence()
    record_datetimes = []
    for minutes in offsets_minutes:
        stamp = base.add(minutes=minutes)
        seq.db_insert_record(EnergyRecord(date_time=stamp, power_w=float(minutes * 10)))
        record_datetimes.append(stamp)

    # Newest anchor record, one second before ``now``.
    newest = now.subtract(seconds=1)
    seq.db_insert_record(EnergyRecord(date_time=newest, power_w=0.0))
    seq.db_save_records()
    return seq, age_threshold, target_interval, record_datetimes
|
||
|
|
|
||
|
|
def test_sparse_misaligned_records_are_snapped(self):
    """Sparse misaligned records must be floor-snapped onto interval boundaries.

    Uses a tight window (30 min age, 10 min interval → 3 resampled buckets)
    with 4 misaligned records so existing_count(4) > resampled_count(3) and
    the snapping path is entered deterministically.

    The expected bucket count is derived from the same ``offsets`` and
    ``interval_minutes`` that build the fixture, so the expectation cannot
    drift away from the data under test.
    """
    now = to_datetime().in_timezone("UTC")
    interval_minutes = 10
    # Offsets :03, :08, :13, :18 — all misaligned for a 10-min interval.
    offsets = [3, 8, 13, 18]
    seq, age_td, interval_td, _ = self._make_snapping_seq(
        now, offsets_minutes=offsets, interval_minutes=interval_minutes
    )

    # The anchor record added by the helper is outside the compaction window
    # and is therefore not deleted; only the test records are in-window.
    n_test_records = len(offsets)
    deleted = seq._db_compact_tier(age_td, interval_td)
    after = seq.db_count_records()

    assert deleted == n_test_records, (
        f"All {n_test_records} in-window records must be deleted (whole-window delete); "
        f"got deleted={deleted}"
    )

    # Net count after: anchor(1) + snapped buckets re-inserted.
    # Snapping is expected to use floor division:
    #   (epoch // interval_sec) * interval_sec
    # so with a 10-min interval:
    #    3 // 10 = 0 → :00
    #    8 // 10 = 0 → :00  (collision with :03)
    #   13 // 10 = 1 → :10
    #   18 // 10 = 1 → :10  (collision with :13)
    #   → 2 unique buckets
    n_snapped = len({(off // interval_minutes) * interval_minutes for off in offsets})
    assert after == 1 + n_snapped, (
        f"Expected 1 anchor + {n_snapped} snapped buckets = {1 + n_snapped} records; "
        f"got {after}"
    )
|
||
|
|
|
||
|
|
def test_sparse_misaligned_timestamps_become_aligned(self):
    """Every in-window timestamp must sit on the target interval after snapping.

    The helper's anchor record (``now - 1s``) is newer than the age threshold
    and deliberately misaligned, so records at or after the recomputed window
    end are excluded from the check.
    """
    now = to_datetime().in_timezone("UTC")
    interval_minutes = 10
    age_minutes = 30
    interval_sec = interval_minutes * 60

    seq, age_td, interval_td, _ = self._make_snapping_seq(
        now,
        offsets_minutes=[3, 8, 13, 18],
        interval_minutes=interval_minutes,
        age_minutes=age_minutes,
    )
    seq._db_compact_tier(age_td, interval_td)

    # Rebuild the window end the way the test assumes _db_compact_tier does:
    # newest record (the anchor) minus the age threshold, floored to the interval.
    newest_epoch = int(now.subtract(seconds=1).timestamp())
    cutoff_epoch = newest_epoch - age_minutes * 60
    window_end_epoch = (cutoff_epoch // interval_sec) * interval_sec

    for rec in seq.records:
        stamp = rec.date_time
        if stamp is None:
            continue
        epoch = int(stamp.timestamp())
        # Only records strictly before the window end were subject to compaction.
        if epoch < window_end_epoch:
            assert epoch % interval_sec == 0, (
                f"Snapped timestamp {stamp} (epoch={epoch}) is not on a "
                f"{interval_minutes}-min boundary (epoch % {interval_sec} = {epoch % interval_sec})"
            )
|
||
|
|
|
||
|
|
def test_sparse_misaligned_values_preserved_after_snap(self):
    """A lone misaligned record must keep its field values through ``db_compact``."""
    seq = EnergySequence()
    utc_now = to_datetime().in_timezone("UTC")

    # One record four weeks back at minute :07 — off any whole-hour boundary.
    off_boundary = utc_now.subtract(weeks=4).set(minute=7, second=0, microsecond=0)
    seq.db_insert_record(
        EnergyRecord(date_time=off_boundary, power_w=777.0, price_eur=0.55)
    )
    seq.db_save_records()

    seq.db_compact()

    # Still exactly one record, carrying the values that were stored.
    assert len(seq.records) == 1
    sole = seq.records[0]
    assert sole.power_w == pytest.approx(777.0)
    assert sole.price_eur == pytest.approx(0.55)
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Case 3: two sparse records collide on the same snapped bucket
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_sparse_collision_merges_records(self):
    """Two sparse records landing in the same bucket must merge key-by-key.

    The :03 record carries only ``power_w`` and the :04 record only
    ``price_eur``; with a 10-min interval both snap to :00.  Two further
    records (:13, :17) bring the in-window count to 4 against 3 resampled
    buckets, and a newest anchor at ``now - 1s`` is meant to put the cutoff
    at about ``now - 30min``, after all test records.
    """
    now = to_datetime().in_timezone("UTC")
    age_td = to_duration("30 minutes")
    interval_td = to_duration("10 minutes")
    interval_sec = 600

    # Start 52 minutes back and floor to the 10-min grid so that even the
    # latest test record (+17 min) precedes the floored window end.
    unaligned = now.subtract(minutes=52).set(second=0, microsecond=0)
    overshoot = int(unaligned.timestamp()) % interval_sec
    base = unaligned.subtract(seconds=overshoot)

    seq = EnergySequence()
    fixtures = [
        EnergyRecord(date_time=base.add(minutes=3), power_w=100.0, price_eur=None),
        EnergyRecord(date_time=base.add(minutes=4), power_w=None, price_eur=0.25),
        EnergyRecord(date_time=base.add(minutes=13), power_w=10.0),
        EnergyRecord(date_time=base.add(minutes=17), power_w=20.0),
        # Anchor: newest record, just before ``now``.
        EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0),
    ]
    for fixture in fixtures:
        seq.db_insert_record(fixture)
    seq.db_save_records()

    # 4 records in the window versus 3 resampled buckets → snapping path.
    seq._db_compact_tier(age_td, interval_td)

    bucket_epoch = int(base.timestamp())
    merged = [
        rec
        for rec in seq.records
        if rec.date_time is not None and int(rec.date_time.timestamp()) == bucket_epoch
    ]
    assert len(merged) == 1, "The :03 and :04 records must merge into one :00 bucket"
    combined = merged[0]
    assert combined.power_w == pytest.approx(100.0), "power_w from :03 must survive"
    assert combined.price_eur == pytest.approx(0.25), "price_eur from :04 must survive"
|
||
|
|
|
||
|
|
def test_sparse_collision_keeps_first_value_for_shared_key(self):
    """When two sparse records share a bucket and a key, the earlier value wins.

    Records at :03 (power_w=111) and :04 (power_w=222) both floor to :00 for
    a 10-min interval (3 // 10 == 0, 4 // 10 == 0).  Only those two records
    and a newest anchor at ``now - 1s`` are stored.  The merged :00 record
    must report power_w=111, the chronologically earlier value.

    NOTE(review): the original docstring says the sparse snapping path is
    taken here because existing_count(2) <= resampled_count, while sibling
    tests in this class describe needing existing_count > resampled_count —
    confirm the actual condition against ``_db_compact_tier``.
    """
    now = to_datetime().in_timezone("UTC")
    interval_sec = 600

    # 52 minutes back, floored to the 10-min grid, keeps both records before
    # the expected window end of roughly now - 30min.
    unaligned = now.subtract(minutes=52).set(second=0, microsecond=0)
    overshoot = int(unaligned.timestamp()) % interval_sec
    base = unaligned.subtract(seconds=overshoot)

    seq = EnergySequence()
    earlier = EnergyRecord(date_time=base.add(minutes=3), power_w=111.0)
    seq.db_insert_record(earlier)
    later = EnergyRecord(date_time=base.add(minutes=4), power_w=222.0)
    seq.db_insert_record(later)
    # Anchor at now - 1s so the newest stored record is approximately ``now``.
    newest = EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0)
    seq.db_insert_record(newest)
    seq.db_save_records()

    seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))

    bucket_epoch = int(base.timestamp())
    merged = [
        rec
        for rec in seq.records
        if rec.date_time is not None and int(rec.date_time.timestamp()) == bucket_epoch
    ]
    assert len(merged) == 1, ":03 and :04 must floor-snap into one :00 record"
    assert merged[0].power_w == pytest.approx(111.0), "Earlier record's value must win"
|
||
|
|
|
||
|
|
def test_sparse_collision_with_existing_aligned_record(self):
    """Snapping onto an already-aligned record must merge, not raise ValueError.

    An aligned :00 record (power_w=50, price_eur=None) and a misaligned :03
    record (power_w=None, price_eur=0.30) both map to :00.  Expected result:
    one :00 record with power_w=50 (the aligned record's value is kept) and
    price_eur=0.30 (filled in from the :03 record).
    """
    now = to_datetime().in_timezone("UTC")
    interval_sec = 600

    # 52 minutes back, floored to the 10-min grid, so every test record
    # (latest at +17 min) precedes the floored window end near now - 30min.
    unaligned = now.subtract(minutes=52).set(second=0, microsecond=0)
    overshoot = int(unaligned.timestamp()) % interval_sec
    base = unaligned.subtract(seconds=overshoot)

    seq = EnergySequence()
    fixtures = [
        EnergyRecord(date_time=base, power_w=50.0, price_eur=None),
        EnergyRecord(date_time=base.add(minutes=3), power_w=None, price_eur=0.30),
        EnergyRecord(date_time=base.add(minutes=13), power_w=10.0),
        EnergyRecord(date_time=base.add(minutes=17), power_w=20.0),
        # Anchor: newest record, just before ``now``.
        EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0),
    ]
    for fixture in fixtures:
        seq.db_insert_record(fixture)
    seq.db_save_records()

    # The call itself is part of the test: it must complete without ValueError.
    seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))

    bucket_epoch = int(base.timestamp())
    merged = [
        candidate
        for candidate in seq.records
        if candidate.date_time is not None
        and int(candidate.date_time.timestamp()) == bucket_epoch
    ]
    assert len(merged) == 1, ":00 and :03 must merge into one :00 record"
    rec = merged[0]
    assert rec.power_w == pytest.approx(50.0), "Aligned record's power_w must win"
    assert rec.price_eur == pytest.approx(0.30), ":03 record's price_eur must fill in"
    assert rec.date_time is not None
    assert int(rec.date_time.timestamp()) % interval_sec == 0
|
||
|
|
|
||
|
|
def test_sparse_no_duplicate_timestamps_after_collision(self):
    """Collision merging must leave every remaining timestamp unique.

    Records at :02, :03 and :04 all snap to :00 with a 10-min interval.  A
    fourth record at :13 makes existing_count(4) > resampled_count(3) so the
    snapping path is entered.
    """
    now = to_datetime().in_timezone("UTC")
    interval_sec = 600

    # 52 minutes back, floored to the 10-min grid, so all test records
    # precede the floored window end near now - 30min.
    unaligned = now.subtract(minutes=52).set(second=0, microsecond=0)
    overshoot = int(unaligned.timestamp()) % interval_sec
    base = unaligned.subtract(seconds=overshoot)

    seq = EnergySequence()
    # Three records that share the :00 bucket.
    for minute in (2, 3, 4):
        colliding = EnergyRecord(date_time=base.add(minutes=minute), power_w=float(minute))
        seq.db_insert_record(colliding)
    seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=13), power_w=10.0))
    # Anchor: newest record, just before ``now``.
    seq.db_insert_record(EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0))
    seq.db_save_records()

    seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))

    epochs = [
        int(rec.date_time.timestamp()) for rec in seq.records if rec.date_time is not None
    ]
    unique_epochs = set(epochs)
    assert len(epochs) == len(unique_epochs), "Duplicate timestamps after collision merge"
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Existing tier-skip tests (unchanged semantics)
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def test_hourly_data_skips_1h_tier(self):
    """Hour-aligned data at 1-hour resolution must pass the 1h tier untouched."""
    seq = EnergySequence()
    three_weeks_ago = to_datetime().in_timezone("UTC").subtract(weeks=3)
    # Hour-aligned start so every generated record sits on a clean boundary.
    start = three_weeks_ago.set(minute=0, second=0, microsecond=0)

    hours_in_three_weeks = 3 * 7 * 24
    _fill_sequence(seq, start, count=hours_in_three_weeks, interval_minutes=60)

    count_before = seq.db_count_records()
    n_deleted = seq._db_compact_tier(to_duration("14 days"), to_duration("1 hour"))

    assert n_deleted == 0
    assert seq.db_count_records() == count_before
|
||
|
|
|
||
|
|
def test_15min_data_younger_than_2weeks_skips_1h_tier(self):
    """One week of 15-min data is younger than 14 days, so the 1h tier must skip it."""
    seq = EnergySequence()
    one_week_ago = to_datetime().in_timezone("UTC").subtract(weeks=1)
    start = one_week_ago.set(minute=0, second=0, microsecond=0)

    quarter_hours_in_week = 7 * 24 * 4
    _fill_sequence(seq, start, count=quarter_hours_in_week, interval_minutes=15)

    count_before = seq.db_count_records()
    n_deleted = seq._db_compact_tier(to_duration("14 days"), to_duration("1 hour"))

    assert n_deleted == 0
    assert seq.db_count_records() == count_before
|