mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-02-23 17:36:19 +00:00
The database supports backend selection, compression, incremental data load, automatic data saving to storage, automatic vaccum and compaction. Make SQLite3 and LMDB database backends available. Update tests for new interface conventions regarding data sequences, data containers, data providers. This includes the measurements provider and the prediction providers. Add database documentation. The fix includes several bug fixes that are not directly related to the database implementation but are necessary to keep EOS running properly and to test and document the changes. * fix: config eos test setup Make the config_eos fixture generate a new instance of the config_eos singleton. Use correct env names to setup data folder path. * fix: startup with no config Make cache and measurements complain about missing data path configuration but do not bail out. * fix: soc data preparation and usage for genetic optimization. Search for soc measurments 48 hours around the optimization start time. Only clamp soc to maximum in battery device simulation. * fix: dashboard bailout on zero value solution display Do not use zero values to calculate the chart values adjustment for display. * fix: openapi generation script Make the script also replace data_folder_path and data_output_path to hide real (test) environment pathes. * feat: add make repeated task function make_repeated_task allows to wrap a function to be repeated cyclically. * chore: removed index based data sequence access Index based data sequence access does not make sense as the sequence can be backed by the database. The sequence is now purely time series data. * chore: refactor eos startup to avoid module import startup Avoid module import initialisation expecially of the EOS configuration. Config mutation, singleton initialization, logging setup, argparse parsing, background task definitions depending on config and environment-dependent behavior is now done at function startup. 
* chore: introduce retention manager A single long-running background task that owns the scheduling of all periodic server-maintenance jobs (cache cleanup, DB autosave, …) * chore: canonicalize timezone name for UTC Timezone names that are semantically identical to UTC are canonicalized to UTC. * chore: extend config file migration for default value handling Extend the config file migration handling values None or nonexisting values that will invoke a default value generation in the new config file. Also adapt test to handle this situation. * chore: extend datetime util test cases * chore: make version test check for untracked files Check for files that are not tracked by git. Version calculation will be wrong if these files will not be commited. * chore: bump pandas to 3.0.0 Pandas 3.0 now performs inference on the appropriate resolution (a.k.a. unit) for the output dtype which may become datetime64[us] (before it was ns). Also numeric dtype detection is now more strict which needs a different detection for numerics. * chore: bump pydantic-settings to 2.12.0 pydantic-settings 2.12.0 under pytest creates a different behaviour. The tests were adapted and a workaround was introduced. Also ConfigEOS was adapted to allow for fine grain initialization control to be able to switch off certain settings such as file settings during test. * chore: remove sci learn kit from dependencies The sci learn kit is not strictly necessary as long as we have scipy. * chore: add documentation mode guarding for sphinx autosummary Sphinx autosummary excecutes functions. Prevent exceptions in case of pure doc mode. * chore: adapt docker-build CI workflow to stricter GitHub handling Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
1115 lines
46 KiB
Python
1115 lines
46 KiB
Python
"""Compaction tests for DataSequence and DataContainer.
|
|
|
|
These tests sit on top of the full DataSequence / DataProvider / DataContainer
|
|
stack (dataabc.py) and exercise compaction end-to-end, including the
|
|
DataContainer delegation path.
|
|
|
|
A temporary SQLite database is configured for the entire test session via the
|
|
`configure_database` autouse fixture so that DataSequence instances — which
|
|
use the real Database singleton via DatabaseMixin — have a working backend.
|
|
"""
|
|
|
|
from typing import List, Optional, Type
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from pydantic import Field
|
|
|
|
from akkudoktoreos.core.dataabc import (
|
|
DataContainer,
|
|
DataProvider,
|
|
DataRecord,
|
|
DataSequence,
|
|
)
|
|
from akkudoktoreos.core.database import Database
|
|
from akkudoktoreos.core.databaseabc import DatabaseTimestamp
|
|
from akkudoktoreos.utils.datetimeutil import DateTime, to_datetime, to_duration
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Minimal concrete record / sequence / provider
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class EnergyRecord(DataRecord):
|
|
"""Simple numeric record for compaction testing."""
|
|
|
|
power_w: Optional[float] = Field(
|
|
default=None, json_schema_extra={"description": "Power in Watts"}
|
|
)
|
|
price_eur: Optional[float] = Field(
|
|
default=None, json_schema_extra={"description": "Price in EUR/kWh"}
|
|
)
|
|
|
|
|
|
class EnergySequence(DataSequence):
|
|
records: List[EnergyRecord] = Field(
|
|
default_factory=list,
|
|
json_schema_extra={"description": "List of energy records"},
|
|
)
|
|
|
|
@classmethod
|
|
def record_class(cls) -> Type[EnergyRecord]:
|
|
return EnergyRecord
|
|
|
|
def db_namespace(self) -> str:
|
|
return "energy_test"
|
|
|
|
|
|
class PriceSequence(DataSequence):
|
|
"""Price data — overrides tiers to keep 15-min resolution for 2 weeks."""
|
|
|
|
records: List[EnergyRecord] = Field(
|
|
default_factory=list,
|
|
json_schema_extra={"description": "List of price records"},
|
|
)
|
|
|
|
@classmethod
|
|
def record_class(cls) -> Type[EnergyRecord]:
|
|
return EnergyRecord
|
|
|
|
def db_namespace(self) -> str:
|
|
return "price_test"
|
|
|
|
def db_compact_tiers(self):
|
|
# Price data: skip first tier (already at target resolution for 2 weeks)
|
|
return [(to_duration("14 days"), to_duration("1 hour"))]
|
|
|
|
|
|
class EnergyProvider(DataProvider):
|
|
records: List[EnergyRecord] = Field(
|
|
default_factory=list,
|
|
json_schema_extra={"description": "List of energy records"},
|
|
)
|
|
|
|
@classmethod
|
|
def record_class(cls) -> Type[EnergyRecord]:
|
|
return EnergyRecord
|
|
|
|
def provider_id(self) -> str:
|
|
return "EnergyProvider"
|
|
|
|
def enabled(self) -> bool:
|
|
return True
|
|
|
|
def _update_data(self, force_update=False) -> None:
|
|
pass
|
|
|
|
def db_namespace(self) -> str:
|
|
return self.provider_id()
|
|
|
|
|
|
class PriceProvider(DataProvider):
|
|
records: List[EnergyRecord] = Field(
|
|
default_factory=list,
|
|
json_schema_extra={"description": "List of price records"},
|
|
)
|
|
|
|
@classmethod
|
|
def record_class(cls) -> Type[EnergyRecord]:
|
|
return EnergyRecord
|
|
|
|
def provider_id(self) -> str:
|
|
return "PriceProvider"
|
|
|
|
def enabled(self) -> bool:
|
|
return True
|
|
|
|
def _update_data(self, force_update=False) -> None:
|
|
pass
|
|
|
|
def db_namespace(self) -> str:
|
|
return self.provider_id()
|
|
|
|
def db_compact_tiers(self):
|
|
return [(to_duration("14 days"), to_duration("1 hour"))]
|
|
|
|
|
|
class EnergyContainer(DataContainer):
|
|
providers: List[DataProvider] = Field(default_factory=list)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _aligned_base(now: DateTime, interval_minutes: int = 15) -> DateTime:
|
|
"""Floor ``now`` to the nearest ``interval_minutes`` boundary.
|
|
|
|
All fixtures that feed _fill_sequence use this so that compacted timestamps
|
|
are predictably on clock-round boundaries and tests are deterministic.
|
|
"""
|
|
interval_sec = interval_minutes * 60
|
|
epoch = int(now.timestamp())
|
|
return now.subtract(seconds=epoch % interval_sec).set(microsecond=0)
|
|
|
|
|
|
def _fill_sequence(
|
|
seq: DataSequence,
|
|
base: DateTime,
|
|
count: int,
|
|
interval_minutes: int,
|
|
power_w: float = 1000.0,
|
|
price_eur: float = 0.25,
|
|
) -> None:
|
|
"""Insert ``count`` EnergyRecords spaced ``interval_minutes`` apart.
|
|
|
|
``base`` should be interval-aligned (use ``_aligned_base``) so that
|
|
compacted bucket timestamps are deterministic across all tests.
|
|
"""
|
|
for i in range(count):
|
|
dt = base.add(minutes=i * interval_minutes)
|
|
rec = EnergyRecord(date_time=dt, power_w=power_w + i, price_eur=price_eur)
|
|
seq.db_insert_record(rec)
|
|
seq.db_save_records()
|
|
|
|
|
|
def _reset_singletons() -> None:
|
|
"""Reset all singleton classes used in these tests.
|
|
|
|
DataProvider and DataSequence inherit SingletonMixin, meaning each subclass
|
|
only ever has one instance. Without resetting between tests, state from one
|
|
test (records, compaction metadata, monkey-patches) leaks into the next.
|
|
"""
|
|
for cls in (EnergySequence, PriceSequence, EnergyProvider, PriceProvider, EnergyContainer):
|
|
try:
|
|
cls.reset_instance()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def configure_database(tmp_path):
|
|
"""Configure a fresh temporary SQLite database for every test.
|
|
|
|
DataSequence uses the real Database singleton via DatabaseMixin.
|
|
Without an open database backend, count_records() and all other DB
|
|
operations raise RuntimeError('Database not configured').
|
|
|
|
This fixture:
|
|
1. Resets the Database singleton so the previous test's state is gone.
|
|
2. Points the database config at a fresh per-test tmp_path directory.
|
|
3. Opens a SQLite backend.
|
|
4. Resets all sequence/provider/container singletons before and after.
|
|
5. Tears everything down cleanly after each test.
|
|
"""
|
|
_reset_singletons()
|
|
|
|
# Reset the Database singleton itself
|
|
Database.reset_instance()
|
|
|
|
# Patch config to use SQLite in tmp_path
|
|
db = Database()
|
|
db.config.database.provider = "SQLite"
|
|
db.config.general.data_folder_path = tmp_path
|
|
db.open()
|
|
|
|
yield
|
|
|
|
# Teardown
|
|
_reset_singletons()
|
|
try:
|
|
Database.reset_instance()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def energy_seq():
|
|
"""Fresh EnergySequence with no data."""
|
|
return EnergySequence()
|
|
|
|
|
|
@pytest.fixture
|
|
def dense_energy_seq():
|
|
"""EnergySequence with 4 weeks of 15-min records (~2688 records).
|
|
|
|
The base timestamp is floored to a 15-min boundary so compacted bucket
|
|
timestamps are deterministic and on clock-round marks.
|
|
"""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
return seq, now
|
|
|
|
|
|
@pytest.fixture
|
|
def dense_price_seq():
|
|
"""PriceSequence with 4 weeks of 15-min records.
|
|
|
|
The base timestamp is floored to a 15-min boundary so compacted bucket
|
|
timestamps are deterministic and on clock-round marks.
|
|
"""
|
|
seq = PriceSequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
return seq, now
|
|
|
|
|
|
@pytest.fixture
|
|
def energy_container(energy_seq):
|
|
"""DataContainer with one EnergyProvider and one PriceProvider."""
|
|
ep = EnergyProvider()
|
|
pp = PriceProvider()
|
|
container = EnergyContainer(providers=[ep, pp])
|
|
return container, ep, pp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DataSequence — tier configuration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDataSequenceCompactTiers:
|
|
|
|
def test_default_tiers_two_entries(self, energy_seq):
|
|
tiers = energy_seq.db_compact_tiers()
|
|
assert len(tiers) == 2
|
|
|
|
def test_default_first_tier_2h_15min(self, energy_seq):
|
|
tiers = energy_seq.db_compact_tiers()
|
|
age_sec = tiers[0][0].total_seconds()
|
|
interval_sec = tiers[0][1].total_seconds()
|
|
assert age_sec == 2 * 3600
|
|
assert interval_sec == 15 * 60
|
|
|
|
def test_default_second_tier_2weeks_1h(self, energy_seq):
|
|
tiers = energy_seq.db_compact_tiers()
|
|
age_sec = tiers[1][0].total_seconds()
|
|
interval_sec = tiers[1][1].total_seconds()
|
|
assert age_sec == 14 * 24 * 3600
|
|
assert interval_sec == 3600
|
|
|
|
def test_price_sequence_overrides_to_single_tier(self):
|
|
seq = PriceSequence()
|
|
tiers = seq.db_compact_tiers()
|
|
assert len(tiers) == 1
|
|
assert tiers[0][0].total_seconds() == 14 * 24 * 3600
|
|
assert tiers[0][1].total_seconds() == 3600
|
|
|
|
def test_empty_tiers_disables_compaction(self):
|
|
class NoCompact(EnergySequence):
|
|
def db_compact_tiers(self):
|
|
return []
|
|
|
|
seq = NoCompact()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(seq, base, count=500, interval_minutes=15)
|
|
assert seq.db_compact() == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DataSequence — compaction behaviour
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDataSequenceCompact:
|
|
|
|
def test_empty_sequence_returns_zero(self, energy_seq):
|
|
assert energy_seq.db_compact() == 0
|
|
|
|
def test_dense_data_reduces_count(self, dense_energy_seq):
|
|
seq, _ = dense_energy_seq
|
|
before = seq.db_count_records()
|
|
deleted = seq.db_compact()
|
|
assert deleted > 0
|
|
assert seq.db_count_records() < before
|
|
|
|
def test_all_fields_compacted(self, dense_energy_seq):
|
|
"""Both power_w and price_eur should be present on compacted records."""
|
|
seq, now = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
cutoff = now.subtract(weeks=2)
|
|
old_records = [r for r in seq.records if r.date_time and r.date_time < cutoff]
|
|
|
|
assert len(old_records) > 0
|
|
for rec in old_records:
|
|
assert rec.power_w is not None, "power_w must survive compaction"
|
|
assert rec.price_eur is not None, "price_eur must survive compaction"
|
|
|
|
def test_recent_records_untouched(self, dense_energy_seq):
|
|
"""Records within 2 hours of now must not be compacted."""
|
|
seq, now = dense_energy_seq
|
|
cutoff = now.subtract(hours=2)
|
|
|
|
# Snapshot recent values
|
|
recent_before = {
|
|
DatabaseTimestamp.from_datetime(r.date_time): r.power_w
|
|
for r in seq.records
|
|
if r.date_time and r.date_time >= cutoff
|
|
}
|
|
|
|
seq.db_compact()
|
|
|
|
recent_after = {
|
|
DatabaseTimestamp.from_datetime(r.date_time): r.power_w
|
|
for r in seq.records
|
|
if r.date_time and r.date_time >= cutoff
|
|
}
|
|
|
|
assert recent_before == recent_after
|
|
|
|
def test_idempotent(self, dense_energy_seq):
|
|
seq, _ = dense_energy_seq
|
|
seq.db_compact()
|
|
after_first = seq.db_count_records()
|
|
|
|
seq.db_compact()
|
|
after_second = seq.db_count_records()
|
|
|
|
assert after_first == after_second
|
|
|
|
def test_price_sequence_preserves_15min_in_recent_2weeks(self, dense_price_seq):
|
|
"""PriceSequence keeps 15-min resolution for data younger than 2 weeks."""
|
|
seq, now = dense_price_seq
|
|
seq.db_compact()
|
|
|
|
two_weeks_ago = now.subtract(weeks=2)
|
|
recent_records = [
|
|
r for r in seq.records
|
|
if r.date_time and r.date_time >= two_weeks_ago
|
|
]
|
|
# Should still have ~4 records per hour = 15-min resolution
|
|
if len(recent_records) > 1:
|
|
diffs = []
|
|
sorted_recs = sorted(recent_records, key=lambda r: r.date_time)
|
|
for i in range(1, min(len(sorted_recs), 10)):
|
|
diff = (sorted_recs[i].date_time - sorted_recs[i - 1].date_time).total_seconds()
|
|
diffs.append(diff)
|
|
# Average spacing should be ~15 min, not 60 min
|
|
avg_spacing = sum(diffs) / len(diffs)
|
|
assert avg_spacing <= 20 * 60, (
|
|
f"Expected ~15min spacing in recent 2 weeks, got {avg_spacing/60:.1f} min"
|
|
)
|
|
|
|
def test_price_sequence_compacts_older_than_2weeks_to_1h(self, dense_price_seq):
|
|
"""PriceSequence compacts data older than 2 weeks to 1-hour resolution."""
|
|
seq, now = dense_price_seq
|
|
seq.db_compact()
|
|
|
|
two_weeks_ago = now.subtract(weeks=2)
|
|
old_records = sorted(
|
|
[r for r in seq.records if r.date_time and r.date_time < two_weeks_ago],
|
|
key=lambda r: r.date_time,
|
|
)
|
|
|
|
if len(old_records) > 1:
|
|
diffs = []
|
|
for i in range(1, min(len(old_records), 10)):
|
|
diff = (old_records[i].date_time - old_records[i - 1].date_time).total_seconds()
|
|
diffs.append(diff)
|
|
avg_spacing = sum(diffs) / len(diffs)
|
|
assert avg_spacing >= 50 * 60, (
|
|
f"Expected ~1h spacing for old price data, got {avg_spacing/60:.1f} min"
|
|
)
|
|
|
|
def test_compact_with_custom_tiers_argument(self, dense_energy_seq):
|
|
"""db_compact(compact_tiers=...) overrides the instance's tiers."""
|
|
seq, _ = dense_energy_seq
|
|
before = seq.db_count_records()
|
|
|
|
deleted = seq.db_compact(
|
|
compact_tiers=[(to_duration("1 day"), to_duration("1 hour"))]
|
|
)
|
|
|
|
assert deleted > 0
|
|
assert seq.db_count_records() < before
|
|
|
|
def test_compacted_timestamps_are_clock_aligned(self, dense_energy_seq):
|
|
"""All timestamps produced by compaction must sit on UTC clock boundaries.
|
|
|
|
_db_compact_tier floors its cutoff timestamps to interval boundaries, so
|
|
the boundary between tiers is not exactly ``now - age`` but the floored
|
|
version of it. We compute the same floored cutoffs here.
|
|
|
|
- Records older than floored 2-week cutoff → multiple of 3600 s
|
|
- Records in floored 2h..2week band → multiple of 900 s
|
|
- Records younger than floored 2h cutoff → unchanged
|
|
"""
|
|
seq, now = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
# _db_compact_tier floors new_cutoff from db_max, not from wall-clock now.
|
|
# Compute the same floored cutoffs that the implementation used.
|
|
_, db_max_ts = seq.db_timestamp_range()
|
|
# DatabaseTimestamp already imported at top of file
|
|
db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
|
|
two_weeks_cutoff_epoch = ((db_max_epoch - 14*24*3600) // 3600) * 3600
|
|
two_hours_cutoff_epoch = ((db_max_epoch - 2*3600) // 900) * 900
|
|
|
|
for rec in seq.records:
|
|
if rec.date_time is None:
|
|
continue
|
|
epoch = int(rec.date_time.timestamp())
|
|
if epoch < two_weeks_cutoff_epoch:
|
|
assert epoch % 3600 == 0, (
|
|
f"Old record {rec.date_time} not on hour boundary"
|
|
)
|
|
elif epoch < two_hours_cutoff_epoch:
|
|
assert epoch % 900 == 0, (
|
|
f"Mid record {rec.date_time} not on 15-min boundary"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DataSequence — data integrity after compaction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDataSequenceCompactIntegrity:
|
|
|
|
@staticmethod
|
|
def _tier_cutoff(now, age_seconds: int, interval_seconds: int):
|
|
"""Compute the floored compaction cutoff the same way _db_compact_tier does.
|
|
|
|
_db_compact_tier floors new_cutoff_dt to the interval boundary, so
|
|
``newest - age_threshold`` rounded down. Tests must use the same value
|
|
to correctly classify which tier a record falls into.
|
|
"""
|
|
import math
|
|
raw_epoch = int(now.subtract(seconds=age_seconds).timestamp())
|
|
floored_epoch = (raw_epoch // interval_seconds) * interval_seconds
|
|
return now.__class__.fromtimestamp(floored_epoch, tz=now.tzinfo)
|
|
|
|
def test_constant_power_preserved(self):
|
|
"""Mean resampling of a constant must equal the constant."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
# Use aligned base so bucket boundaries are deterministic
|
|
base = _aligned_base(now.subtract(hours=6), interval_minutes=15)
|
|
|
|
for i in range(6 * 60): # 1-min records for 6 hours
|
|
dt = base.add(minutes=i)
|
|
seq.db_insert_record(EnergyRecord(date_time=dt, power_w=500.0, price_eur=0.30))
|
|
seq.db_save_records()
|
|
|
|
seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
|
|
|
|
cutoff = now.subtract(hours=2)
|
|
for rec in seq.records:
|
|
if rec.date_time and rec.date_time < cutoff:
|
|
assert rec.power_w == pytest.approx(500.0, abs=1e-3)
|
|
assert rec.price_eur == pytest.approx(0.30, abs=1e-6)
|
|
|
|
def test_record_count_monotonically_decreases(self):
|
|
"""Each successive tier run should never increase record count."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(seq, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
|
|
counts = [seq.db_count_records()]
|
|
for age, interval in reversed(seq.db_compact_tiers()):
|
|
seq._db_compact_tier(age, interval)
|
|
counts.append(seq.db_count_records())
|
|
|
|
for i in range(1, len(counts)):
|
|
assert counts[i] <= counts[i - 1], (
|
|
f"Record count increased from {counts[i-1]} to {counts[i]} at tier {i}"
|
|
)
|
|
|
|
def test_no_duplicate_timestamps_after_compaction(self, dense_energy_seq):
|
|
"""Compaction must not create duplicate timestamps."""
|
|
seq, _ = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
timestamps = [
|
|
DatabaseTimestamp.from_datetime(r.date_time)
|
|
for r in seq.records
|
|
if r.date_time is not None
|
|
]
|
|
assert len(timestamps) == len(set(timestamps)), "Duplicate timestamps after compaction"
|
|
|
|
def test_timestamps_remain_sorted(self, dense_energy_seq):
|
|
"""Records must remain in ascending order after compaction."""
|
|
seq, _ = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
dts = [r.date_time for r in seq.records if r.date_time is not None]
|
|
assert dts == sorted(dts)
|
|
|
|
def test_compacted_old_timestamps_on_1h_boundaries(self, dense_energy_seq):
|
|
"""Records older than the floored 2-week cutoff must be on whole-hour UTC boundaries.
|
|
|
|
_db_compact_tier floors new_cutoff to the interval boundary, so we must
|
|
use the same floored cutoff to decide which records were compacted by the
|
|
1-hour tier. Records between the floored and raw cutoff may still be at
|
|
15-min resolution from the previous tier.
|
|
"""
|
|
seq, now = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
# _db_compact_tier floors new_cutoff from db_max (the newest record),
|
|
# not from wall-clock now. Derive the same floored cutoff here.
|
|
_, db_max_ts = seq.db_timestamp_range()
|
|
# DatabaseTimestamp already imported at top of file
|
|
db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
|
|
two_weeks_cutoff_epoch = ((db_max_epoch - 14*24*3600) // 3600) * 3600
|
|
two_weeks_cutoff_dt = DateTime.fromtimestamp(two_weeks_cutoff_epoch, tz="UTC")
|
|
|
|
old_records = [r for r in seq.records if r.date_time and r.date_time < two_weeks_cutoff_dt]
|
|
|
|
assert len(old_records) > 0, "Expected compacted records older than 2-week floored cutoff"
|
|
for rec in old_records:
|
|
epoch = int(rec.date_time.timestamp())
|
|
assert epoch % 3600 == 0, (
|
|
f"Old record at {rec.date_time} is not on an hour boundary"
|
|
)
|
|
|
|
def test_compacted_mid_timestamps_on_15min_boundaries(self):
|
|
"""Records compacted by the 15-min tier must land on 15-min UTC boundaries.
|
|
|
|
We run _db_compact_tier directly with the 2h/15min tier on a sequence
|
|
of 1-min records spanning 6 hours, then verify every compacted record
|
|
sits on a :00/:15/:30/:45 UTC mark.
|
|
|
|
The implementation computes new_cutoff as floor(newest - age, 900).
|
|
We replicate that exact calculation to identify which records were in
|
|
the compaction window.
|
|
"""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(hours=6), interval_minutes=15)
|
|
|
|
# 1-min records for 6 hours; newest record is at base + 359 min
|
|
for i in range(6 * 60):
|
|
dt = base.add(minutes=i)
|
|
seq.db_insert_record(EnergyRecord(date_time=dt, power_w=500.0, price_eur=0.30))
|
|
seq.db_save_records()
|
|
|
|
seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
|
|
|
|
# Replicate the implementation's floored cutoff exactly:
|
|
# newest_dt = last inserted record = base + 359min
|
|
# new_cutoff = floor(newest_dt - 2h, 900)
|
|
newest_dt = base.add(minutes=6 * 60 - 1)
|
|
raw_cutoff_epoch = int(newest_dt.subtract(hours=2).timestamp())
|
|
window_end_epoch = (raw_cutoff_epoch // 900) * 900
|
|
|
|
# Records before window_end_epoch must all be on 15-min boundaries
|
|
compacted = [
|
|
r for r in seq.records
|
|
if r.date_time is not None
|
|
and int(r.date_time.timestamp()) < window_end_epoch
|
|
]
|
|
|
|
assert len(compacted) > 0, (
|
|
f"Expected compacted records before window_end={window_end_epoch}; "
|
|
f"got records at {[int(r.date_time.timestamp()) for r in seq.records if r.date_time]}"
|
|
)
|
|
for rec in compacted:
|
|
assert rec.date_time is not None
|
|
epoch = int(rec.date_time.timestamp())
|
|
assert epoch % 900 == 0, (
|
|
f"15-min-tier record at {rec.date_time} (epoch={epoch}) "
|
|
f"is not on a 15-min boundary (epoch % 900 = {epoch % 900})"
|
|
)
|
|
|
|
def test_no_compacted_timestamps_between_boundaries(self, dense_energy_seq):
|
|
"""After compaction no record timestamp must fall between expected bucket boundaries.
|
|
|
|
Records older than the floored 2-week cutoff (processed by the 1h tier)
|
|
must be on hour marks. Records in the 15-min band must be on 15-min marks.
|
|
"""
|
|
seq, now = dense_energy_seq
|
|
seq.db_compact()
|
|
|
|
# Derive floored cutoffs from db_max — same reference as the implementation.
|
|
_, db_max_ts = seq.db_timestamp_range()
|
|
# DatabaseTimestamp already imported at top of file
|
|
db_max_epoch = int(DatabaseTimestamp.to_datetime(db_max_ts).timestamp())
|
|
two_weeks_cutoff_epoch = ((db_max_epoch - 14*24*3600) // 3600) * 3600
|
|
two_hours_cutoff_epoch = ((db_max_epoch - 2*3600) // 900) * 900
|
|
|
|
for rec in seq.records:
|
|
if rec.date_time is None:
|
|
continue
|
|
epoch = int(rec.date_time.timestamp())
|
|
if epoch < two_weeks_cutoff_epoch:
|
|
assert epoch % 3600 == 0, (
|
|
f"Record at {rec.date_time} is not hour-aligned in 1h-tier region"
|
|
)
|
|
elif epoch < two_hours_cutoff_epoch:
|
|
assert epoch % (15 * 60) == 0, (
|
|
f"Record at {rec.date_time} is not 15min-aligned in 15min-tier region"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DataContainer — delegation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDataContainerCompact:
|
|
|
|
def test_compact_delegates_to_all_providers(self, energy_container):
|
|
container, ep, pp = energy_container
|
|
now = to_datetime().in_timezone("UTC")
|
|
|
|
# Fill both providers with 4 weeks of 15-min data
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(ep, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
_fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
|
|
ep_before = ep.db_count_records()
|
|
pp_before = pp.db_count_records()
|
|
|
|
container.db_compact()
|
|
|
|
assert ep.db_count_records() < ep_before, "EnergyProvider records should be compacted"
|
|
assert pp.db_count_records() < pp_before, "PriceProvider records should be compacted"
|
|
|
|
def test_compact_empty_container_no_error(self):
|
|
container = EnergyContainer(providers=[])
|
|
container.db_compact() # must not raise
|
|
|
|
def test_compact_provider_tiers_respected(self, energy_container):
|
|
"""PriceProvider with single 2-week tier must not compact recent 15-min data."""
|
|
container, ep, pp = energy_container
|
|
now = to_datetime().in_timezone("UTC")
|
|
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
|
|
container.db_compact()
|
|
|
|
# Price data in last 2 weeks should still be at 15-min resolution
|
|
two_weeks_ago = now.subtract(weeks=2)
|
|
recent = sorted(
|
|
[r for r in pp.records if r.date_time and r.date_time >= two_weeks_ago],
|
|
key=lambda r: r.date_time,
|
|
)
|
|
if len(recent) > 1:
|
|
diff = (recent[1].date_time - recent[0].date_time).total_seconds()
|
|
assert diff <= 20 * 60, (
|
|
f"PriceProvider recent data should be ~15min, got {diff/60:.1f} min"
|
|
)
|
|
|
|
def test_compact_raises_on_provider_failure(self):
|
|
"""A provider that raises during compaction must bubble up as RuntimeError.
|
|
|
|
Monkey-patching is blocked by Pydantic v2's __setattr__ validation, so
|
|
we use a subclass that overrides db_compact instead.
|
|
"""
|
|
class BrokenProvider(EnergyProvider):
|
|
def db_compact(self, *args, **kwargs):
|
|
raise ValueError("simulated failure")
|
|
|
|
def provider_id(self) -> str:
|
|
# Distinct id so it doesn't collide with EnergyProvider singleton
|
|
return "BrokenProvider"
|
|
|
|
def db_namespace(self) -> str:
|
|
return self.provider_id()
|
|
|
|
bp = BrokenProvider()
|
|
container = EnergyContainer(providers=[bp])
|
|
|
|
with pytest.raises(RuntimeError, match="fails on db_compact"):
|
|
container.db_compact()
|
|
|
|
def test_compact_idempotent_on_container(self, energy_container):
|
|
container, ep, pp = energy_container
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = _aligned_base(now.subtract(weeks=4), interval_minutes=15)
|
|
_fill_sequence(ep, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
_fill_sequence(pp, base, count=4 * 7 * 24 * 4, interval_minutes=15)
|
|
|
|
container.db_compact()
|
|
ep_after_first = ep.db_count_records()
|
|
pp_after_first = pp.db_count_records()
|
|
|
|
container.db_compact()
|
|
assert ep.db_count_records() == ep_after_first
|
|
assert pp.db_count_records() == pp_after_first
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sparse guard — DataSequence level
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# The sparse guard distinguishes three cases:
|
|
#
|
|
# 1. Sparse + already aligned → skip entirely (deleted=0, count unchanged)
|
|
# 2. Sparse + misaligned → snap timestamps in place (deleted>0, but
|
|
# count stays the same or decreases if two
|
|
# records collide on the same bucket)
|
|
# 3. Sparse collision → two records snap to the same bucket; values
|
|
# are merged key-by-key; count decreases by 1
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDataSequenceSparseGuard:
|
|
|
|
# ------------------------------------------------------------------
|
|
# Case 1: sparse + already aligned → pure skip
|
|
# ------------------------------------------------------------------
|
|
|
|
def test_sparse_aligned_data_not_modified(self):
|
|
"""Sparse records that already sit on interval boundaries must not be touched.
|
|
|
|
deleted must be 0 and record count must be unchanged.
|
|
"""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = now.subtract(weeks=4)
|
|
|
|
# Insert exactly 3 records, each snapped to a whole hour (aligned)
|
|
for offset_days in [0, 14, 27]:
|
|
raw = base.add(days=offset_days)
|
|
# Floor to nearest hour boundary so timestamp is already aligned
|
|
aligned = raw.set(minute=0, second=0, microsecond=0)
|
|
seq.db_insert_record(EnergyRecord(date_time=aligned, power_w=100.0))
|
|
seq.db_save_records()
|
|
|
|
before = seq.db_count_records()
|
|
deleted = seq.db_compact()
|
|
|
|
assert deleted == 0, "Aligned sparse records must not be deleted"
|
|
assert seq.db_count_records() == before, "Record count must not change"
|
|
|
|
def test_sparse_aligned_data_values_untouched(self):
|
|
"""Values of aligned sparse records must be preserved exactly."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = now.subtract(weeks=4).set(minute=0, second=0, microsecond=0)
|
|
|
|
seq.db_insert_record(EnergyRecord(date_time=base, power_w=42.0, price_eur=0.99))
|
|
seq.db_save_records()
|
|
|
|
seq.db_compact()
|
|
|
|
remaining = [r for r in seq.records if r.date_time == base]
|
|
assert len(remaining) == 1
|
|
assert remaining[0].power_w == pytest.approx(42.0)
|
|
assert remaining[0].price_eur == pytest.approx(0.99)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Case 2: sparse + misaligned → timestamp snapping
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _make_snapping_seq(now, offsets_minutes, interval_minutes=10, age_minutes=30):
|
|
"""Build a sequence guaranteed to enter the sparse-snapping path.
|
|
|
|
Key insight: _db_compact_tier measures age_threshold from db_max (the
|
|
newest record in the database), not from wall-clock now. We therefore
|
|
insert a "newest anchor" record 1 second before now so that
|
|
db_max ≈ now, making cutoff = db_max - age_threshold ≈ now - age_minutes.
|
|
|
|
The test records are placed at now - (age_minutes + margin) + offset,
|
|
which puts them clearly before the cutoff and inside the compaction window.
|
|
|
|
resampled_count = age_minutes / interval_minutes (the window width in
|
|
buckets). We require len(offsets_minutes) > resampled_count so the
|
|
snapping path is entered rather than the pure-skip path.
|
|
|
|
Returns (seq, age_threshold, target_interval, record_datetimes).
|
|
"""
|
|
age_td = to_duration(f"{age_minutes} minutes")
|
|
interval_td = to_duration(f"{interval_minutes} minutes")
|
|
interval_sec = interval_minutes * 60
|
|
|
|
# Margin must be larger than the maximum offset so that ALL test records
|
|
# land before window_end = floor(now - age_minutes, interval_sec).
|
|
# We need: base + max(offsets) < now - age_minutes
|
|
# => now - (age_minutes + margin) + max(offsets) < now - age_minutes
|
|
# => max(offsets) < margin
|
|
# Use margin = max(offsets_minutes) + 2*interval_minutes + 1 (generous).
|
|
max_offset = max(offsets_minutes) if offsets_minutes else 0
|
|
margin = max_offset + 2 * interval_minutes + 1
|
|
|
|
# Floor base to interval boundary so snapping arithmetic is exact
|
|
raw_base = now.subtract(minutes=age_minutes + margin).set(second=0, microsecond=0)
|
|
base_epoch = int(raw_base.timestamp())
|
|
base = raw_base.subtract(seconds=base_epoch % interval_sec)
|
|
|
|
seq = EnergySequence()
|
|
dts = []
|
|
for off in offsets_minutes:
|
|
dt = base.add(minutes=off)
|
|
seq.db_insert_record(EnergyRecord(date_time=dt, power_w=float(off * 10)))
|
|
dts.append(dt)
|
|
|
|
# Newest anchor: makes db_max ≈ now so cutoff = now - age_threshold
|
|
anchor = now.subtract(seconds=1)
|
|
seq.db_insert_record(EnergyRecord(date_time=anchor, power_w=0.0))
|
|
seq.db_save_records()
|
|
return seq, age_td, interval_td, dts
|
|
|
|
def test_sparse_misaligned_records_are_snapped(self):
|
|
"""Sparse misaligned records must be moved to the nearest boundary.
|
|
|
|
Uses a tight window (30 min age, 10 min interval → 3 resampled buckets)
|
|
with 4 misaligned records so existing_count(4) > resampled_count(3) and
|
|
the snapping path is entered deterministically.
|
|
"""
|
|
now = to_datetime().in_timezone("UTC")
|
|
# 4 records at :03, :08, :13, :18 — all misaligned for a 10-min interval
|
|
seq, age_td, interval_td, _ = self._make_snapping_seq(
|
|
now, offsets_minutes=[3, 8, 13, 18]
|
|
)
|
|
# before includes the anchor record which is NOT in the compaction window
|
|
# and therefore NOT deleted. Only the 4 test records are in-window.
|
|
n_test_records = len([3, 8, 13, 18]) # offsets_minutes
|
|
deleted = seq._db_compact_tier(age_td, interval_td)
|
|
after = seq.db_count_records()
|
|
|
|
assert deleted == n_test_records, (
|
|
f"All {n_test_records} in-window records must be deleted (whole-window delete); "
|
|
f"got deleted={deleted}"
|
|
)
|
|
# Net count after: anchor(1) + snapped buckets re-inserted.
|
|
# Implementation uses FLOOR division: (epoch // interval_sec) * interval_sec
|
|
# offsets [3,8,13,18] with interval=10min map to buckets:
|
|
# 3 // 10 = 0 → :00
|
|
# 8 // 10 = 0 → :00 (collision with :03)
|
|
# 13 // 10 = 1 → :10
|
|
# 18 // 10 = 1 → :10 (collision with :13)
|
|
# → 2 unique buckets
|
|
interval_minutes = 10
|
|
n_snapped = len({(off // interval_minutes) * interval_minutes for off in [3, 8, 13, 18]})
|
|
assert after == 1 + n_snapped, (
|
|
f"Expected 1 anchor + {n_snapped} snapped buckets = {1 + n_snapped} records; "
|
|
f"got {after}"
|
|
)
|
|
|
|
    def test_sparse_misaligned_timestamps_become_aligned(self):
        """After snapping, in-window timestamps must be on the target interval boundary.

        The anchor record lives outside the compaction window (it is younger than
        age_threshold) and is intentionally misaligned — it must NOT be checked.
        """
        now = to_datetime().in_timezone("UTC")
        interval_minutes = 10
        age_minutes = 30
        # 4 misaligned records (:03, :08, :13, :18) > 3 resampled buckets,
        # so the snapping path is taken (see _make_snapping_seq).
        seq, age_td, interval_td, dts = self._make_snapping_seq(
            now, offsets_minutes=[3, 8, 13, 18], interval_minutes=interval_minutes,
            age_minutes=age_minutes,
        )
        seq._db_compact_tier(age_td, interval_td)

        # Compute window_end the same way _db_compact_tier does
        # (anchor is db_max; raw_cutoff = anchor - age_threshold ≈ now - 30min).
        # The anchor timestamp is re-derived here; `now` is the same value that
        # _make_snapping_seq used, so this matches the inserted anchor exactly.
        anchor_epoch = int(now.subtract(seconds=1).timestamp())
        raw_cutoff_epoch = anchor_epoch - age_minutes * 60
        # Floor the raw cutoff onto the interval grid, mirroring the implementation.
        window_end_epoch = (raw_cutoff_epoch // (interval_minutes * 60)) * (interval_minutes * 60)

        interval_sec = interval_minutes * 60
        for rec in seq.records:
            if rec.date_time is None:
                continue
            epoch = int(rec.date_time.timestamp())
            if epoch >= window_end_epoch:
                continue  # anchor or other post-cutoff record — not compacted
            # Every record left inside the window must sit exactly on a
            # 10-minute boundary after snapping.
            assert epoch % interval_sec == 0, (
                f"Snapped timestamp {rec.date_time} (epoch={epoch}) is not on a "
                f"{interval_minutes}-min boundary (epoch % {interval_sec} = {epoch % interval_sec})"
            )
|
|
|
|
def test_sparse_misaligned_values_preserved_after_snap(self):
|
|
"""Snapping must not alter the field values of sparse records."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
# Single misaligned record, old enough for both tiers
|
|
dt = now.subtract(weeks=4).set(minute=7, second=0, microsecond=0)
|
|
seq.db_insert_record(EnergyRecord(date_time=dt, power_w=777.0, price_eur=0.55))
|
|
seq.db_save_records()
|
|
|
|
seq.db_compact()
|
|
|
|
# Exactly one record must remain and its values must be unchanged
|
|
assert len(seq.records) == 1
|
|
assert seq.records[0].power_w == pytest.approx(777.0)
|
|
assert seq.records[0].price_eur == pytest.approx(0.55)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Case 3: two sparse records collide on the same snapped bucket
|
|
# ------------------------------------------------------------------
|
|
|
|
def test_sparse_collision_merges_records(self):
|
|
"""Two sparse records that snap to the same bucket must be merged.
|
|
|
|
Records at :03 and :04 both round to :00 with a 10-min interval.
|
|
With 4 test records and resampled_count=3, the snapping path is entered.
|
|
A newest-anchor record at now-1s pushes db_max ≈ now so the compaction
|
|
cutoff lands at now-30min, which is after all test records.
|
|
"""
|
|
now = to_datetime().in_timezone("UTC")
|
|
age_td = to_duration("30 minutes")
|
|
interval_td = to_duration("10 minutes")
|
|
interval_sec = 600
|
|
# Place test records 41+ min ago so they are before cutoff = now - 30min
|
|
# base must be far enough back that all records (+17min max) land before
|
|
# window_end = floor(now - 30min, 600). Use now - 52min.
|
|
raw_base = now.subtract(minutes=52).set(second=0, microsecond=0)
|
|
base = raw_base.subtract(seconds=int(raw_base.timestamp()) % interval_sec)
|
|
|
|
seq = EnergySequence()
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=3),
|
|
power_w=100.0, price_eur=None))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=4),
|
|
power_w=None, price_eur=0.25))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=13), power_w=10.0))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=17), power_w=20.0))
|
|
# Anchor: makes db_max ≈ now → cutoff = now - 30min (after all test records)
|
|
seq.db_insert_record(EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0))
|
|
seq.db_save_records()
|
|
|
|
# existing_count in window = 4, resampled_count = 3 → snapping path
|
|
seq._db_compact_tier(age_td, interval_td)
|
|
|
|
snapped_epoch = int(base.timestamp())
|
|
snapped = [
|
|
r for r in seq.records
|
|
if r.date_time is not None and int(r.date_time.timestamp()) == snapped_epoch
|
|
]
|
|
assert len(snapped) == 1, "The :03 and :04 records must merge into one :00 bucket"
|
|
assert snapped[0].power_w == pytest.approx(100.0), "power_w from :03 must survive"
|
|
assert snapped[0].price_eur == pytest.approx(0.25), "price_eur from :04 must survive"
|
|
|
|
def test_sparse_collision_keeps_first_value_for_shared_key(self):
|
|
"""When two sparse records floor to the same bucket, the earlier value wins.
|
|
|
|
Two records at :03 (power_w=111) and :04 (power_w=222) both floor to :00
|
|
with a 10-min interval (floor division: 3//10=0, 4//10=0).
|
|
existing_count(2) <= resampled_count for the ~22-min window, so the sparse
|
|
snapping path is taken rather than full resampling. The merged record at
|
|
:00 must carry power_w=111 because the chronologically earlier record wins.
|
|
"""
|
|
now = to_datetime().in_timezone("UTC")
|
|
interval_sec = 600
|
|
# Place both records 52 min ago so they are before window_end ≈ now - 30min.
|
|
# Only 2 test records → existing_count(2) <= resampled_count → sparse path.
|
|
raw_base = now.subtract(minutes=52).set(second=0, microsecond=0)
|
|
base = raw_base.subtract(seconds=int(raw_base.timestamp()) % interval_sec)
|
|
seq = EnergySequence()
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=3), power_w=111.0))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=4), power_w=222.0))
|
|
# Anchor at now-1s: makes db_max ≈ now so cutoff = now - 30min
|
|
seq.db_insert_record(EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0))
|
|
seq.db_save_records()
|
|
seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))
|
|
snapped_epoch = int(base.timestamp())
|
|
snapped = [
|
|
r for r in seq.records
|
|
if r.date_time is not None and int(r.date_time.timestamp()) == snapped_epoch
|
|
]
|
|
assert len(snapped) == 1, ":03 and :04 must floor-snap into one :00 record"
|
|
assert snapped[0].power_w == pytest.approx(111.0), "Earlier record's value must win"
|
|
|
|
def test_sparse_collision_with_existing_aligned_record(self):
|
|
"""A misaligned record that snaps onto an already-aligned record must merge
|
|
into it without raising ValueError. The aligned record's existing values win.
|
|
|
|
:00 (aligned, power_w=50, price_eur=None) and :03 (misaligned,
|
|
power_w=None, price_eur=0.30) both map to :00. Result: power_w=50
|
|
(aligned wins) and price_eur=0.30 (filled from :03).
|
|
"""
|
|
now = to_datetime().in_timezone("UTC")
|
|
interval_sec = 600
|
|
# base must be far enough back that all records (+17min max) land before
|
|
# window_end = floor(now - 30min, 600). Use now - 52min.
|
|
raw_base = now.subtract(minutes=52).set(second=0, microsecond=0)
|
|
base = raw_base.subtract(seconds=int(raw_base.timestamp()) % interval_sec)
|
|
|
|
seq = EnergySequence()
|
|
seq.db_insert_record(EnergyRecord(date_time=base,
|
|
power_w=50.0, price_eur=None))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=3),
|
|
power_w=None, price_eur=0.30))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=13), power_w=10.0))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=17), power_w=20.0))
|
|
# Anchor: db_max ≈ now → cutoff = now - 30min, after all test records
|
|
seq.db_insert_record(EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0))
|
|
seq.db_save_records()
|
|
|
|
# Must not raise ValueError
|
|
seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))
|
|
|
|
snapped_epoch = int(base.timestamp())
|
|
snapped = [
|
|
r for r in seq.records
|
|
if r.date_time is not None and int(r.date_time.timestamp()) == snapped_epoch
|
|
]
|
|
assert len(snapped) == 1, ":00 and :03 must merge into one :00 record"
|
|
rec = snapped[0]
|
|
assert rec.power_w == pytest.approx(50.0), "Aligned record's power_w must win"
|
|
assert rec.price_eur == pytest.approx(0.30), ":03 record's price_eur must fill in"
|
|
assert rec.date_time is not None
|
|
assert int(rec.date_time.timestamp()) % interval_sec == 0
|
|
|
|
def test_sparse_no_duplicate_timestamps_after_collision(self):
|
|
"""After collision merging, no duplicate timestamps must remain.
|
|
|
|
Three records at :02, :03, :04 all round to :00 with a 10-min interval.
|
|
Together with a record at :13 this gives existing_count(4) >
|
|
resampled_count(3) so the snapping path is entered.
|
|
"""
|
|
now = to_datetime().in_timezone("UTC")
|
|
interval_sec = 600
|
|
# base must be far enough back that all records (+17min max) land before
|
|
# window_end = floor(now - 30min, 600). Use now - 52min.
|
|
raw_base = now.subtract(minutes=52).set(second=0, microsecond=0)
|
|
base = raw_base.subtract(seconds=int(raw_base.timestamp()) % interval_sec)
|
|
|
|
seq = EnergySequence()
|
|
for offset_min in [2, 3, 4]: # all snap to :00
|
|
seq.db_insert_record(EnergyRecord(
|
|
date_time=base.add(minutes=offset_min), power_w=float(offset_min)
|
|
))
|
|
seq.db_insert_record(EnergyRecord(date_time=base.add(minutes=13), power_w=10.0))
|
|
# Anchor: db_max ≈ now → cutoff = now - 30min, after all test records
|
|
seq.db_insert_record(EnergyRecord(date_time=now.subtract(seconds=1), power_w=0.0))
|
|
seq.db_save_records()
|
|
|
|
seq._db_compact_tier(to_duration("30 minutes"), to_duration("10 minutes"))
|
|
|
|
timestamps = [
|
|
int(r.date_time.timestamp())
|
|
for r in seq.records
|
|
if r.date_time is not None
|
|
]
|
|
assert len(timestamps) == len(set(timestamps)), "Duplicate timestamps after collision merge"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Existing tier-skip tests (unchanged semantics)
|
|
# ------------------------------------------------------------------
|
|
|
|
def test_hourly_data_skips_1h_tier(self):
|
|
"""Data already at 1-hour resolution and aligned must not be re-compacted."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
# Use an hour-aligned base so records are on clean boundaries
|
|
base = now.subtract(weeks=3).set(minute=0, second=0, microsecond=0)
|
|
|
|
_fill_sequence(seq, base, count=3 * 7 * 24, interval_minutes=60)
|
|
|
|
before = seq.db_count_records()
|
|
deleted = seq._db_compact_tier(to_duration("14 days"), to_duration("1 hour"))
|
|
|
|
assert deleted == 0
|
|
assert seq.db_count_records() == before
|
|
|
|
def test_15min_data_younger_than_2weeks_skips_1h_tier(self):
|
|
"""15-min data between 2h and 2weeks old must NOT be compacted by the 1h tier."""
|
|
seq = EnergySequence()
|
|
now = to_datetime().in_timezone("UTC")
|
|
base = now.subtract(weeks=1).set(minute=0, second=0, microsecond=0)
|
|
_fill_sequence(seq, base, count=7 * 24 * 4, interval_minutes=15)
|
|
|
|
before = seq.db_count_records()
|
|
deleted = seq._db_compact_tier(to_duration("14 days"), to_duration("1 hour"))
|
|
|
|
assert deleted == 0
|
|
assert seq.db_count_records() == before
|