mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-02-23 17:36:19 +00:00
The database supports backend selection, compression, incremental data load, automatic data saving to storage, automatic vacuum and compaction. Make SQLite3 and LMDB database backends available. Update tests for new interface conventions regarding data sequences, data containers, data providers. This includes the measurements provider and the prediction providers. Add database documentation. The fix includes several bug fixes that are not directly related to the database implementation but are necessary to keep EOS running properly and to test and document the changes. * fix: config eos test setup Make the config_eos fixture generate a new instance of the config_eos singleton. Use correct env names to setup data folder path. * fix: startup with no config Make cache and measurements complain about missing data path configuration but do not bail out. * fix: soc data preparation and usage for genetic optimization. Search for soc measurements 48 hours around the optimization start time. Only clamp soc to maximum in battery device simulation. * fix: dashboard bailout on zero value solution display Do not use zero values to calculate the chart values adjustment for display. * fix: openapi generation script Make the script also replace data_folder_path and data_output_path to hide real (test) environment paths. * feat: add make repeated task function make_repeated_task allows to wrap a function to be repeated cyclically. * chore: removed index based data sequence access Index based data sequence access does not make sense as the sequence can be backed by the database. The sequence is now purely time series data. * chore: refactor eos startup to avoid module import startup Avoid module import initialisation especially of the EOS configuration. Config mutation, singleton initialization, logging setup, argparse parsing, background task definitions depending on config and environment-dependent behavior is now done at function startup.
* chore: introduce retention manager A single long-running background task that owns the scheduling of all periodic server-maintenance jobs (cache cleanup, DB autosave, …) * chore: canonicalize timezone name for UTC Timezone names that are semantically identical to UTC are canonicalized to UTC. * chore: extend config file migration for default value handling Extend the config file migration handling values None or nonexistent values that will invoke a default value generation in the new config file. Also adapt test to handle this situation. * chore: extend datetime util test cases * chore: make version test check for untracked files Check for files that are not tracked by git. Version calculation will be wrong if these files will not be committed. * chore: bump pandas to 3.0.0 Pandas 3.0 now performs inference on the appropriate resolution (a.k.a. unit) for the output dtype which may become datetime64[us] (before it was ns). Also numeric dtype detection is now more strict which needs a different detection for numerics. * chore: bump pydantic-settings to 2.12.0 pydantic-settings 2.12.0 under pytest creates a different behaviour. The tests were adapted and a workaround was introduced. Also ConfigEOS was adapted to allow for fine-grained initialization control to be able to switch off certain settings such as file settings during test. * chore: remove sci learn kit from dependencies The sci learn kit is not strictly necessary as long as we have scipy. * chore: add documentation mode guarding for sphinx autosummary Sphinx autosummary executes functions. Prevent exceptions in case of pure doc mode. * chore: adapt docker-build CI workflow to stricter GitHub handling Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
889 lines
31 KiB
Python
889 lines
31 KiB
Python
from typing import Any, Iterator, Literal, Optional, Type, cast
|
||
|
||
import pytest
|
||
from numpydantic import NDArray, Shape
|
||
from pydantic import BaseModel, Field
|
||
|
||
from akkudoktoreos.core.databaseabc import (
|
||
DATABASE_METADATA_KEY,
|
||
DatabaseRecordProtocolMixin,
|
||
DatabaseTimestamp,
|
||
_DatabaseTimestampUnbound,
|
||
)
|
||
from akkudoktoreos.utils.datetimeutil import (
|
||
DateTime,
|
||
Duration,
|
||
to_datetime,
|
||
to_duration,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Test record
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleRecord(BaseModel):
    """Minimal record model used by the mixin tests.

    Carries the two fields the database mixin works with, plus a
    dict-style ``__getitem__`` so fields can be read by name.
    """

    date_time: Optional[DateTime] = Field(
        default=None, json_schema_extra={"description": "DateTime"}
    )
    value: Optional[float] = None

    def __getitem__(self, key: str) -> Any:
        """Return the field named *key*; a ``None`` key yields ``None``.

        Raises:
            KeyError: If *key* is not ``None`` and names no known field.
        """
        # Explicit validation instead of the previous `assert key is None`:
        # asserts are stripped under `python -O`, which would then silently
        # return None for arbitrary keys. KeyError matches the mapping
        # protocol for unknown keys.
        if key in ("date_time", "value"):
            return getattr(self, key)
        if key is None:
            return None
        raise KeyError(key)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fake database backend
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleDatabase:
    """In-memory fake of the database backend protocol.

    Records live in a ``namespace -> {key: value}`` dict of raw bytes and
    metadata is kept per namespace. Serialization is a pass-through so
    tests can inspect stored bytes directly.
    """

    def __init__(self) -> None:
        # Record store: namespace -> mapping of raw key bytes to value bytes.
        self._data: dict[Optional[str], dict[bytes, bytes]] = {}
        # Metadata per namespace. Fix: the previous single shared slot
        # ignored the namespace argument and leaked metadata across
        # namespaces.
        self._metadata: dict[Optional[str], Optional[bytes]] = {}
        self.is_open = True
        self.compression = False
        self.compression_level = 0
        self.storage_path = "/fake"

    # serialization (pass-through)

    def serialize_data(self, data: bytes) -> bytes:
        """Identity serialization — tests operate on raw bytes."""
        return data

    def deserialize_data(self, data: bytes) -> bytes:
        """Identity deserialization."""
        return data

    # metadata

    def set_metadata(self, metadata: Optional[bytes], *, namespace: Optional[str] = None) -> None:
        """Store *metadata* for *namespace* (``None`` clears it)."""
        self._metadata[namespace] = metadata

    def get_metadata(self, namespace: Optional[str] = None) -> Optional[bytes]:
        """Return the metadata stored for *namespace*, or ``None``."""
        return self._metadata.get(namespace)

    # write

    def save_records(
        self, records: list[tuple[bytes, bytes]], namespace: Optional[str] = None
    ) -> int:
        """Upsert *records* into *namespace*; return the number written."""
        ns = self._data.setdefault(namespace, {})
        for key, value in records:
            ns[key] = value
        # Every entry is written (upsert), so the count is len(records).
        return len(records)

    def delete_records(
        self, keys: Iterator[bytes], namespace: Optional[str] = None
    ) -> int:
        """Delete *keys* from *namespace*; return how many actually existed."""
        ns_data = self._data.get(namespace, {})
        deleted = 0
        for key in keys:
            if key in ns_data:
                del ns_data[key]
                deleted += 1
        return deleted

    # read

    def iterate_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        namespace: Optional[str] = None,
        reverse: bool = False,
    ) -> Iterator[tuple[bytes, bytes]]:
        """Yield ``(key, value)`` pairs in key order.

        The reserved metadata key is never yielded. *start_key* is
        inclusive and *end_key* exclusive.
        """
        items = self._data.get(namespace, {})
        for k in sorted(items, reverse=reverse):
            if k == DATABASE_METADATA_KEY:
                continue
            # Compare against None explicitly so an empty b"" bound is
            # still honoured (fix: previous truthiness check skipped it).
            if start_key is not None and k < start_key:
                continue
            if end_key is not None and k >= end_key:
                continue
            yield k, items[k]

    # stats

    def count_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        *,
        namespace: Optional[str] = None,
    ) -> int:
        """Count records in the half-open key range, excluding metadata."""
        items = self._data.get(namespace, {})
        count = 0
        for k in items:
            if k == DATABASE_METADATA_KEY:
                continue
            if start_key is not None and k < start_key:
                continue
            if end_key is not None and k >= end_key:
                continue
            count += 1
        return count

    def get_key_range(
        self, namespace: Optional[str] = None
    ) -> tuple[Optional[bytes], Optional[bytes]]:
        """Return ``(min_key, max_key)``, or ``(None, None)`` when empty."""
        items = self._data.get(namespace, {})
        keys = sorted(k for k in items if k != DATABASE_METADATA_KEY)
        if not keys:
            return None, None
        return keys[0], keys[-1]

    def get_backend_stats(self, namespace: Optional[str] = None) -> dict:
        """No backend statistics for the fake."""
        return {}

    def flush(self, namespace: Optional[str] = None) -> None:
        """Nothing to flush — everything lives in memory."""
        pass
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Concrete test sequence — minimal, no Pydantic / singleton overhead
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleSequence(DatabaseRecordProtocolMixin[SampleRecord]):
    """Minimal concrete implementation for unit-testing the mixin."""

    def __init__(self):
        # NOTE(review): the `_db_*` attribute names below mirror the
        # private state the DatabaseRecordProtocolMixin expects to find on
        # instances; the exact contract lives in databaseabc.py — confirm
        # there before renaming or reordering anything here.
        self.records: list[SampleRecord] = []
        self._db_record_index: dict[DatabaseTimestamp, SampleRecord] = {}
        self._db_sorted_timestamps: list[DatabaseTimestamp] = []
        self._db_dirty_timestamps: set[DatabaseTimestamp] = set()
        self._db_new_timestamps: set[DatabaseTimestamp] = set()
        self._db_deleted_timestamps: set[DatabaseTimestamp] = set()
        self._db_initialized: bool = True
        self._db_storage_initialized: bool = False
        self._db_metadata: Optional[dict] = None
        self._db_loaded_range = None
        # Imported lazily so the enum is only needed when a sequence is built.
        from akkudoktoreos.core.databaseabc import DatabaseRecordProtocolLoadPhase
        self._db_load_phase = DatabaseRecordProtocolLoadPhase.NONE
        self._db_version: int = 1

        # Fake backend plus a minimal stand-in for the EOS config object,
        # built via type() with only the attributes the mixin reads.
        self.database = SampleDatabase()
        self.config = type(
            "Cfg",
            (),
            {
                "database": type(
                    "DBCfg",
                    (),
                    {
                        "auto_save": False,
                        "compression_level": 0,
                        "autosave_interval_sec": 10,
                        "initial_load_window_h": None,
                        "keep_duration_h": None,
                    },
                )()
            },
        )()

    @classmethod
    def record_class(cls) -> Type[SampleRecord]:
        """Record model handled by this sequence."""
        return SampleRecord

    def db_namespace(self) -> str:
        """Namespace used in the fake backend."""
        return "test"

    @property
    def record_keys_writable(self) -> list[str]:
        """Return writable field names of SampleRecord.

        Required by _db_compact_tier which iterates record_keys_writable
        to decide which fields to resample. Must match exactly what
        key_to_array accepts — only 'value' here, not 'date_time'.
        """
        return ["value"]

    # Override key_to_array for the mixin tests — the full DataSequence
    # implementation lives in dataabc.py; here we provide a minimal version
    # that resamples the single `value` field to demonstrate compaction.
    def key_to_array(
        self,
        key: str,
        start_datetime: Optional[DateTime] = None,
        end_datetime: Optional[DateTime] = None,
        interval: Optional[Duration] = None,
        fill_method: Optional[str] = None,
        dropna: Optional[bool] = True,
        boundary: Literal["strict", "context"] = "context",
        align_to_interval: bool = False,
    ) -> NDArray[Shape["*"], Any]:
        # fill_method, dropna, boundary and align_to_interval are accepted
        # only for signature compatibility; this stub ignores them.
        import numpy as np
        import pandas as pd

        if interval is None:
            interval = to_duration("1 hour")

        # Collect (datetime, value) pairs inside the half-open window
        # [start_datetime, end_datetime).
        dates = []
        values = []
        for record in self.records:
            if record.date_time is None:
                continue
            ts = DatabaseTimestamp.from_datetime(record.date_time)
            if start_datetime and DatabaseTimestamp.from_datetime(start_datetime) > ts:
                continue
            if end_datetime and DatabaseTimestamp.from_datetime(end_datetime) <= ts:
                continue
            dates.append(record.date_time)
            values.append(getattr(record, key, None))

        if not dates:
            return np.array([])

        index = pd.to_datetime(dates, utc=True)
        series = pd.Series(values, index=index, dtype=float)
        freq = f"{int(interval.total_seconds())}s"
        # Anchor resampling buckets at the window start when given,
        # otherwise at the day boundary.
        origin = start_datetime if start_datetime else "start_day"
        resampled = series.resample(freq, origin=origin).mean().interpolate("time")

        if start_datetime is not None:
            resampled = resampled.truncate(before=start_datetime)
        if end_datetime is not None:
            resampled = resampled.truncate(after=end_datetime)

        return resampled.values
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _insert_records_every_n_minutes(
    seq: SampleSequence,
    base: DateTime,
    count: int,
    interval_minutes: int,
    value_fn=None,
) -> None:
    """Insert `count` records spaced `interval_minutes` apart starting at `base`.

    Values default to the record index as a float; `value_fn(i)` overrides
    that. All records are persisted with a single save at the end.
    """
    for index in range(count):
        stamp = base.add(minutes=index * interval_minutes)
        if value_fn is None:
            val = float(index)
        else:
            val = value_fn(index)
        seq.db_insert_record(SampleRecord(date_time=stamp, value=val))
    seq.db_save_records()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fixtures
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.fixture
def seq():
    """Provide a fresh, empty SampleSequence backed by its own fake database."""
    return SampleSequence()
|
||
|
||
|
||
@pytest.fixture
def seq_with_15min_data():
    """Sequence with 15-min records spanning 4 weeks, so both tiers have data."""
    sequence = SampleSequence()
    current = to_datetime().in_timezone("UTC")
    # 4 weeks * 7 days * 24 h * 4 records/h = 2688 records
    start = current.subtract(weeks=4)
    _insert_records_every_n_minutes(sequence, start, count=2688, interval_minutes=15)
    return sequence, current
|
||
|
||
|
||
@pytest.fixture
def seq_sparse():
    """Sequence with only 3 records spread over 4 weeks — sparse, no compaction benefit."""
    sequence = SampleSequence()
    current = to_datetime().in_timezone("UTC")
    start = current.subtract(weeks=4)
    for day_offset in (0, 14, 27):
        record = SampleRecord(
            date_time=start.add(days=day_offset), value=float(day_offset)
        )
        sequence.db_insert_record(record)
    sequence.db_save_records()
    return sequence, current
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Existing tests (unchanged)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestDatabaseRecordProtocolMixin:
    """Basic insert/save/load/count/delete behaviour of the mixin."""

    @pytest.mark.parametrize(
        "start_str, value_count, interval_seconds",
        [
            ("2024-11-10 00:00:00", 24, 3600),
            ("2024-08-10 00:00:00", 24, 3600),
            # The last two dates sit on Europe/Berlin DST transitions —
            # spacing must stay uniform in UTC regardless.
            ("2024-03-31 00:00:00", 24, 3600),
            ("2024-10-27 00:00:00", 24, 3600),
        ],
    )
    def test_db_generate_timestamps_utc_spacing(
        self, seq, start_str, value_count, interval_seconds
    ):
        """Generated timestamps are unique, UTC, and evenly spaced."""
        start_dt = to_datetime(start_str, in_timezone="Europe/Berlin")
        assert start_dt.tz.name == "Europe/Berlin"

        db_start = DatabaseTimestamp.from_datetime(start_dt)
        generated = list(seq.db_generate_timestamps(db_start, value_count))

        assert len(generated) == value_count

        for db_dt in generated:
            dt = DatabaseTimestamp.to_datetime(db_dt)
            assert dt.tz.name == "UTC"

        assert len(generated) == len(set(generated)), "Duplicate UTC datetimes found"

        for i in range(1, len(generated)):
            last_dt = DatabaseTimestamp.to_datetime(generated[i - 1])
            current_dt = DatabaseTimestamp.to_datetime(generated[i])
            delta = (current_dt - last_dt).total_seconds()
            assert delta == interval_seconds, f"Spacing mismatch at index {i}: {delta}s"

    def test_insert_and_memory_range(self, seq):
        """Inserted records are kept in memory in chronological order."""
        t0 = to_datetime()
        t1 = t0.add(hours=1)

        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))

        assert seq.records[0].date_time == t0
        assert seq.records[-1].date_time == t1
        assert len(seq.records) == 2

    def test_roundtrip_reload(self):
        """Records saved through one sequence can be loaded by another."""
        seq = SampleSequence()
        t0 = to_datetime()
        t1 = t0.add(hours=1)

        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))
        assert seq.db_save_records() == 2

        # Share the fake backend with a brand-new sequence and reload.
        db = seq.database
        seq2 = SampleSequence()
        seq2.database = db
        loaded = seq2.db_load_records()

        assert loaded == 2
        assert len(seq2.records) == 2

    def test_db_count_records(self, seq):
        """Count is identical before and after persisting to the backend."""
        t0 = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        assert seq.db_count_records() == 1
        seq.db_save_records()
        assert seq.db_count_records() == 1

    def test_delete_range(self, seq):
        """Deletion uses a half-open [start, end) timestamp range."""
        base = to_datetime()
        for i in range(5):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))

        db_start = DatabaseTimestamp.from_datetime(base.add(minutes=1))
        db_end = DatabaseTimestamp.from_datetime(base.add(minutes=4))
        deleted = seq.db_delete_records(start_timestamp=db_start, end_timestamp=db_end)

        # Minutes 1..3 removed; the boundary record at minute 4 survives.
        assert deleted == 3
        assert [r.value for r in seq.records] == [0, 4]

    def test_db_count_records_memory_only_multiple(self):
        """Unsaved in-memory records are included in the count."""
        seq = SampleSequence()
        base = to_datetime()
        for i in range(3):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_newer_than_db(self):
        """Memory records newer than the persisted range count exactly once."""
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=2), value=3))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_older_than_db(self):
        """Memory records older than the persisted range count exactly once."""
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        assert seq.db_count_records() == 2

    def test_db_count_records_empty_everywhere(self):
        """An empty sequence with an empty backend counts zero."""
        seq = SampleSequence()
        assert seq.db_count_records() == 0

    def test_metadata_not_counted(self, seq):
        """The reserved metadata key must not appear in record counts."""
        seq.database._data.setdefault("test", {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.db_count_records() == 0

    def test_key_range_excludes_metadata(self, seq):
        """The reserved metadata key must not influence the key range."""
        ns = seq.db_namespace()
        seq.database._data.setdefault(ns, {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.database.get_key_range(ns) == (None, None)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Compaction tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestCompactTiers:
    """Tests for db_compact_tiers() and the tier hook."""

    def test_default_tiers_returns_two_entries(self, seq):
        assert len(seq.db_compact_tiers()) == 2

    def test_default_tiers_ordered_shortest_first(self, seq):
        ages = [tier[0].total_seconds() for tier in seq.db_compact_tiers()]
        assert ages == sorted(ages), "Tiers must be ordered shortest age first"

    def test_default_tiers_first_is_2h_to_15min(self, seq):
        first_tier = seq.db_compact_tiers()[0]
        assert first_tier[0].total_seconds() == 2 * 3600
        assert first_tier[1].total_seconds() == 15 * 60

    def test_default_tiers_second_is_2weeks_to_1h(self, seq):
        second_tier = seq.db_compact_tiers()[1]
        assert second_tier[0].total_seconds() == 14 * 24 * 3600
        assert second_tier[1].total_seconds() == 3600

    def test_override_tiers(self):
        class CustomSeq(SampleSequence):
            def db_compact_tiers(self):
                return [(to_duration("7 days"), to_duration("1 hour"))]

        custom = CustomSeq()
        tiers = custom.db_compact_tiers()
        assert len(tiers) == 1
        assert tiers[0][1].total_seconds() == 3600

    def test_empty_tiers_disables_compaction(self):
        class NoCompactSeq(SampleSequence):
            def db_compact_tiers(self):
                return []

        sequence = NoCompactSeq()
        current = to_datetime().in_timezone("UTC")
        start = current.subtract(weeks=4)
        _insert_records_every_n_minutes(sequence, start, count=100, interval_minutes=15)

        # With no tiers, compaction must be a no-op.
        assert sequence.db_compact() == 0
|
||
|
||
|
||
class TestCompactState:
    """Tests for _db_get_compact_state / _db_set_compact_state."""

    def test_get_state_returns_none_when_no_metadata(self, seq):
        assert seq._db_get_compact_state(to_duration("1 hour")) is None

    def test_set_and_get_state_roundtrip(self, seq):
        tier_interval = to_duration("1 hour")
        stamp = DatabaseTimestamp.from_datetime(to_datetime().in_timezone("UTC"))

        seq._db_set_compact_state(tier_interval, stamp)

        assert seq._db_get_compact_state(tier_interval) == stamp

    def test_state_is_per_tier(self, seq):
        """Different tier intervals must not overwrite each other."""
        short_tier = to_duration("15 minutes")
        long_tier = to_duration("1 hour")

        current = to_datetime().in_timezone("UTC")
        stamp_short = DatabaseTimestamp.from_datetime(current)
        stamp_long = DatabaseTimestamp.from_datetime(current.subtract(days=1))

        seq._db_set_compact_state(short_tier, stamp_short)
        seq._db_set_compact_state(long_tier, stamp_long)

        assert seq._db_get_compact_state(short_tier) == stamp_short
        assert seq._db_get_compact_state(long_tier) == stamp_long

    def test_state_persists_in_metadata(self, seq):
        """State must survive a metadata reload."""
        tier_interval = to_duration("1 hour")
        stamp = DatabaseTimestamp.from_datetime(to_datetime().in_timezone("UTC"))

        seq._db_set_compact_state(tier_interval, stamp)

        # Reload metadata from fake DB
        reloaded = SampleSequence()
        reloaded.database = seq.database
        reloaded._db_metadata = reloaded._db_load_metadata()

        assert reloaded._db_get_compact_state(tier_interval) == stamp
|
||
|
||
|
||
class TestCompactSparseGuard:
    """The inflation guard must skip compaction when records are already sparse."""

    def test_sparse_data_aligns_but_does_not_reduce_cardinality(self, seq_sparse):
        """Sparse data must be aligned to the target interval for all records that were modified."""
        seq, _ = seq_sparse

        interval = to_duration("15 minutes")
        interval_sec = int(interval.total_seconds())

        # Snapshot original timestamps
        before_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }

        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )

        after_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }

        # Cardinality must not increase
        assert len(after_epochs) <= len(before_epochs)

        # Any timestamp that changed must now be aligned
        changed_epochs = after_epochs - before_epochs

        for epoch in changed_epochs:
            # Aligned means the epoch is an exact multiple of the interval.
            assert epoch % interval_sec == 0

    def test_sparse_guard_advances_cutoff(self, seq_sparse):
        """Even when skipped, the cutoff should be stored so next run skips the same window."""
        seq, _ = seq_sparse
        interval_1h = to_duration("1 hour")
        interval_15min = to_duration("15 minutes")

        seq.db_compact()

        # Both tiers should have stored a cutoff even though nothing was deleted
        assert seq._db_get_compact_state(interval_1h) is not None
        assert seq._db_get_compact_state(interval_15min) is not None

    def test_exactly_at_boundary_remains_stable(self, seq):
        """Records already on exact interval boundaries survive compaction unchanged."""
        now = to_datetime().in_timezone("UTC")
        interval = to_duration("1 hour")

        # Build a base that is exactly on an hour boundary in epoch terms
        # (the extra subtract guards against sub-hour timezone offsets).
        raw_base = now.subtract(hours=5).set(minute=0, second=0, microsecond=0)
        base = raw_base.subtract(seconds=int(raw_base.timestamp()) % 3600)

        for i in range(4):
            seq.db_insert_record(
                SampleRecord(
                    date_time=base.add(hours=i),
                    value=float(i),
                )
            )

        # A fresh record just before "now" keeps the newest data recent.
        seq.db_insert_record(
            SampleRecord(date_time=now.subtract(seconds=1), value=0.0)
        )
        seq.db_save_records()

        before = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]

        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )

        after = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]

        # Already-aligned records must be completely untouched.
        assert before == after
|
||
|
||
|
||
class TestCompactTierWorker:
    """Unit tests for _db_compact_tier directly."""

    def test_empty_sequence_returns_zero(self, seq):
        """An empty sequence compacts to nothing and deletes nothing."""
        age = to_duration("2 hours")
        interval = to_duration("15 minutes")
        assert seq._db_compact_tier(age, interval) == 0

    def test_all_records_too_recent_skipped(self):
        """Records within the age threshold must not be touched."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 10 records from 30 minutes ago — all within 2h threshold
        base = now.subtract(minutes=30)
        _insert_records_every_n_minutes(seq, base, count=10, interval_minutes=1)

        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        assert deleted == 0
        assert seq.db_count_records() == before

    def test_compaction_reduces_record_count(self):
        """Dense 1-min records older than 2h should be downsampled to 15-min."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 1-min records for 6 hours ending 3 hours ago
        base = now.subtract(hours=9)
        _insert_records_every_n_minutes(seq, base, count=6 * 60, interval_minutes=1)

        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        after = seq.db_count_records()
        assert deleted > 0
        assert after < before

    def test_records_within_threshold_preserved(self):
        """Records newer than age_threshold must remain untouched after compaction."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")

        # Old dense records (will be compacted)
        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=4 * 60, interval_minutes=1)

        # Recent records (must not be touched) — insert 5 records in the last hour
        recent_base = now.subtract(minutes=50)
        _insert_records_every_n_minutes(seq, recent_base, count=5, interval_minutes=10)

        recent_before = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        recent_after = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]
        assert len(recent_after) == len(recent_before)

    def test_incremental_cutoff_prevents_recompaction(self):
        """Running compaction twice must not re-compact already-compacted data."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        age = to_duration("2 hours")
        interval = to_duration("15 minutes")

        deleted_first = seq._db_compact_tier(age, interval)
        count_after_first = seq.db_count_records()

        deleted_second = seq._db_compact_tier(age, interval)
        count_after_second = seq.db_count_records()

        assert deleted_first > 0
        assert deleted_second == 0, "Second run must be a no-op"
        assert count_after_first == count_after_second

    def test_cutoff_stored_after_compaction(self):
        """Cutoff timestamp must be persisted after a successful compaction run."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        interval = to_duration("15 minutes")
        seq._db_compact_tier(to_duration("2 hours"), interval)

        assert seq._db_get_compact_state(interval) is not None
|
||
|
||
|
||
class TestDbCompact:
    """Integration tests for the public db_compact() entry point."""

    def test_compact_dense_data_both_tiers(self, seq_with_15min_data):
        """4 weeks of 15-min data should be reduced by both tiers."""
        seq, _ = seq_with_15min_data
        before = seq.db_count_records()

        total_deleted = seq.db_compact()

        after = seq.db_count_records()
        assert total_deleted > 0
        assert after < before

    def test_compact_coarsest_tier_runs_first(self, seq_with_15min_data):
        """The 1-hour tier (coarsest) must run before the 15-min tier.

        If coarsest ran last it would re-compact records the 15-min tier
        had already downsampled — verified by checking that the 1-hour
        cutoff is not later than the 15-min cutoff.
        """
        seq, _ = seq_with_15min_data
        seq.db_compact()

        cutoff_1h = seq._db_get_compact_state(to_duration("1 hour"))
        cutoff_15min = seq._db_get_compact_state(to_duration("15 minutes"))

        assert cutoff_1h is not None
        assert cutoff_15min is not None
        # The 1h tier covers older data → its cutoff must be earlier than 15min tier
        assert cutoff_1h <= cutoff_15min

    def test_compact_idempotent(self, seq_with_15min_data):
        """Running db_compact twice must not change record count."""
        seq, _ = seq_with_15min_data
        seq.db_compact()
        after_first = seq.db_count_records()

        seq.db_compact()
        after_second = seq.db_count_records()

        assert after_first == after_second

    def test_compact_empty_sequence_returns_zero(self, seq):
        """Compacting an empty sequence is a safe no-op."""
        assert seq.db_compact() == 0

    def test_compact_with_override_tiers(self):
        """Passing compact_tiers directly must override db_compact_tiers()."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=3)
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 4, interval_minutes=15)

        before = seq.db_count_records()
        deleted = seq.db_compact(
            compact_tiers=[(to_duration("1 day"), to_duration("1 hour"))]
        )

        assert deleted > 0
        assert seq.db_count_records() < before

    def test_compact_only_processes_new_window_on_second_call(self):
        """Second call processes only the new window, not the full history."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=3)
        # Dense 1-min data for 3 weeks
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 60, interval_minutes=1)

        seq.db_compact()
        count_after_first = seq.db_count_records()

        # Add one more day of dense data in the past (simulate new old data arriving)
        extra_base = now.subtract(weeks=3).subtract(days=1)
        _insert_records_every_n_minutes(seq, extra_base, count=24 * 60, interval_minutes=1)

        seq.db_compact()
        count_after_second = seq.db_count_records()

        # Second compact should have processed the newly added old data
        # Record count may change but should not exceed first compacted count by much
        assert count_after_second >= 0  # basic sanity
|
||
|
||
|
||
class TestCompactDataIntegrity:
    """Verify value integrity is preserved after compaction."""

    def test_constant_value_preserved(self):
        """Constant value field must survive mean-resampling unchanged."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)

        # All values = 42.0
        _insert_records_every_n_minutes(
            seq, base, count=6 * 60, interval_minutes=1, value_fn=lambda _: 42.0
        )

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # The mean of a constant series is the constant itself.
        for record in seq.records:
            if record.date_time and record.date_time < now.subtract(hours=2):
                assert record.value == pytest.approx(42.0, abs=1e-6)

    def test_recent_records_not_modified(self):
        """Records newer than the age threshold must have unchanged values."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")

        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=3 * 60, interval_minutes=1)

        # Known recent values
        recent_base = now.subtract(minutes=30)
        expected = {i * 10: float(100 + i) for i in range(3)}
        for offset, val in expected.items():
            dt = recent_base.add(minutes=offset)
            seq.db_insert_record(SampleRecord(date_time=dt, value=val))
        seq.db_save_records()

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # Match records back to their insertion offset and check values.
        for record in seq.records:
            if record.date_time and record.date_time >= recent_base:
                offset = int((record.date_time - recent_base).total_seconds() / 60)
                if offset in expected:
                    assert record.value == pytest.approx(expected[offset], abs=1e-6)

    def test_compacted_timestamps_spacing(self):
        """Resampled records must be fewer than original and span the compaction window.

        Exact per-bucket spacing depends on the full DataSequence.key_to_array
        implementation (pandas resampling). The stub key_to_array in SampleSequence
        only guarantees a reduction in count — uniform spacing is verified in
        test_dataabc_compact.py against the real implementation.
        """
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        before = seq.db_count_records()
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        cutoff = now.subtract(hours=2)
        compacted = sorted(
            [r for r in seq.records if r.date_time and r.date_time < cutoff],
            key=lambda r: cast(DateTime, r.date_time),
        )

        # Must have produced fewer records than the original 1-min data
        assert len(compacted) > 0, "Expected at least one compacted record"
        assert len(compacted) < before, "Compaction must reduce record count"

        # Window start is floored to interval boundary
        interval_sec = 15 * 60
        expected_window_start = DateTime.fromtimestamp(
            (int(base.timestamp()) // interval_sec) * interval_sec,
            tz="UTC",
        )
        assert compacted[0].date_time >= expected_window_start

        # Last compacted record must be before the cutoff
        assert compacted[-1].date_time < cutoff
|