mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2026-02-26 19:06:20 +00:00
Some checks failed
Bump Version / Bump Version Workflow (push) Has been cancelled
docker-build / platform-excludes (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled
Start up the retention manager for asynchronous tasks. Gracefully handle exceptions in these tasks and in their configuration. Remove tasks.py, as repeated tasks are now handled by the retention manager. When running on GitHub, only the version date file is checked; the development tag is merely a label, so any date set during development suffices. The test_doc test is also skipped on GitHub Actions.
895 lines
31 KiB
Python
895 lines
31 KiB
Python
from typing import Any, Iterator, Literal, Optional, Type, cast
|
||
|
||
import pytest
|
||
from numpydantic import NDArray, Shape
|
||
from pydantic import BaseModel, Field
|
||
|
||
from akkudoktoreos.core.databaseabc import (
|
||
DATABASE_METADATA_KEY,
|
||
DatabaseRecordProtocolMixin,
|
||
DatabaseTimestamp,
|
||
_DatabaseTimestampUnbound,
|
||
)
|
||
from akkudoktoreos.utils.datetimeutil import (
|
||
DateTime,
|
||
Duration,
|
||
to_datetime,
|
||
to_duration,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Test record
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleRecord(BaseModel):
    """Minimal record model used to exercise the database mixin.

    Provides the two fields the mixin reads (``date_time`` and ``value``)
    plus mapping-style access via ``__getitem__``.
    """

    date_time: Optional[DateTime] = Field(
        default=None, json_schema_extra={"description": "DateTime"}
    )
    value: Optional[float] = None

    def __getitem__(self, key: str) -> Any:
        """Return the field named *key*; ``None`` for a ``None`` key.

        Raises:
            KeyError: If *key* names a field this record does not have.
        """
        if key == "date_time":
            return self.date_time
        if key == "value":
            return self.value
        if key is None:
            # Callers may probe with a None key; treat it as "no field".
            return None
        # Previously `assert key is None`, which is stripped under `python -O`
        # and would then silently return None for unknown keys. Raise KeyError
        # to honor the mapping protocol and surface typos in field names.
        raise KeyError(key)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fake database backend
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleDatabase:
    """In-memory stand-in for the real database backend.

    Records live in per-namespace ``dict[bytes, bytes]`` maps; a single
    metadata blob is shared across namespaces (sufficient for these tests).
    Serialization is a pass-through so tests can inspect raw values.

    NOTE(review): range filters treat a falsy ``start_key``/``end_key``
    (e.g. ``b""``) the same as ``None`` — matches the original behavior.
    """

    def __init__(self):
        self._data: dict[Optional[str], dict[bytes, bytes]] = {}
        self._metadata: Optional[bytes] = None
        self.is_open = True
        self.compression = False
        self.compression_level = 0
        self.storage_path = "/fake"

    # serialization (pass-through)

    def serialize_data(self, data: bytes) -> bytes:
        """Return *data* unchanged — the fake performs no serialization."""
        return data

    def deserialize_data(self, data: bytes) -> bytes:
        """Return *data* unchanged — the fake performs no deserialization."""
        return data

    # metadata

    def set_metadata(self, metadata: Optional[bytes], *, namespace: Optional[str] = None) -> None:
        """Store the metadata blob (*namespace* is ignored by the fake)."""
        self._metadata = metadata

    def get_metadata(self, namespace: Optional[str] = None) -> Optional[bytes]:
        """Return the stored metadata blob (*namespace* is ignored by the fake)."""
        return self._metadata

    # write

    def save_records(
        self, records: list[tuple[bytes, bytes]], namespace: Optional[str] = None
    ) -> int:
        """Upsert key/value pairs into *namespace*; return the pair count."""
        bucket = self._data.setdefault(namespace, {})
        bucket.update(records)
        # One "save" per supplied pair, duplicates included — same count the
        # original per-pair loop produced.
        return len(records)

    def delete_records(
        self, keys: Iterator[bytes], namespace: Optional[str] = None
    ) -> int:
        """Delete the given keys from *namespace*; return how many existed."""
        bucket = self._data.get(namespace, {})
        removed = 0
        for record_key in keys:
            if record_key in bucket:
                del bucket[record_key]
                removed += 1
        return removed

    # read

    def iterate_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        namespace: Optional[str] = None,
        reverse: bool = False,
    ) -> Iterator[tuple[bytes, bytes]]:
        """Yield (key, value) pairs in sorted key order.

        Honors the half-open range [start_key, end_key) and always skips
        the metadata sentinel entry.
        """
        bucket = self._data.get(namespace, {})
        for record_key in sorted(bucket, reverse=reverse):
            if record_key == DATABASE_METADATA_KEY:
                continue
            in_range = (not start_key or record_key >= start_key) and (
                not end_key or record_key < end_key
            )
            if in_range:
                yield record_key, bucket[record_key]

    # stats

    def count_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        *,
        namespace: Optional[str] = None,
    ) -> int:
        """Count non-metadata records within the half-open key range."""
        bucket = self._data.get(namespace, {})
        return sum(
            1
            for record_key in bucket
            if record_key != DATABASE_METADATA_KEY
            and (not start_key or record_key >= start_key)
            and (not end_key or record_key < end_key)
        )

    def get_key_range(
        self, namespace: Optional[str] = None
    ) -> tuple[Optional[bytes], Optional[bytes]]:
        """Return (smallest, largest) data key, or (None, None) when empty."""
        data_keys = [
            record_key
            for record_key in self._data.get(namespace, {})
            if record_key != DATABASE_METADATA_KEY
        ]
        if not data_keys:
            return None, None
        return min(data_keys), max(data_keys)

    def get_backend_stats(self, namespace: Optional[str] = None) -> dict:
        """Return backend statistics — always empty for the fake."""
        return {}

    def flush(self, namespace: Optional[str] = None) -> None:
        """No-op: the fake has no write buffer to flush."""
        pass
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Concrete test sequence — minimal, no Pydantic / singleton overhead
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class SampleSequence(DatabaseRecordProtocolMixin[SampleRecord]):
    """Minimal concrete implementation for unit-testing the mixin."""

    def __init__(self):
        # Hand-initialize the in-memory store plus the bookkeeping attributes
        # the mixin expects, bypassing the mixin's own (Pydantic/singleton)
        # initialization path.
        self.records: list[SampleRecord] = []
        self._db_record_index: dict[DatabaseTimestamp, SampleRecord] = {}
        self._db_sorted_timestamps: list[DatabaseTimestamp] = []
        self._db_dirty_timestamps: set[DatabaseTimestamp] = set()
        self._db_new_timestamps: set[DatabaseTimestamp] = set()
        self._db_deleted_timestamps: set[DatabaseTimestamp] = set()
        self._db_initialized: bool = True
        self._db_storage_initialized: bool = False
        self._db_metadata: Optional[dict] = None
        self._db_loaded_range = None
        # Local import — NOTE(review): presumably to avoid an import cycle
        # with databaseabc at module load time; confirm.
        from akkudoktoreos.core.databaseabc import DatabaseRecordProtocolLoadPhase

        self._db_load_phase = DatabaseRecordProtocolLoadPhase.NONE
        self._db_version: int = 1

        # Fake backend plus a throwaway config object exposing only the
        # attributes the mixin reads (auto_save, compression_level, ...).
        self.database = SampleDatabase()
        self.config = type(
            "Cfg",
            (),
            {
                "database": type(
                    "DBCfg",
                    (),
                    {
                        "auto_save": False,
                        "compression_level": 0,
                        "autosave_interval_sec": 10,
                        "initial_load_window_h": None,
                        "keep_duration_h": None,
                    },
                )()
            },
        )()

    @classmethod
    def record_class(cls) -> Type[SampleRecord]:
        """Return the record model class managed by this sequence."""
        return SampleRecord

    def db_namespace(self) -> str:
        """Return the backend namespace used for all records of this sequence."""
        return "test"

    @property
    def record_keys_writable(self) -> list[str]:
        """Return writable field names of SampleRecord.

        Required by _db_compact_tier which iterates record_keys_writable
        to decide which fields to resample. Must match exactly what
        key_to_array accepts — only 'value' here, not 'date_time'.
        """
        return ["value"]

    # Override key_to_array for the mixin tests — the full DataSequence
    # implementation lives in dataabc.py; here we provide a minimal version
    # that resamples the single `value` field to demonstrate compaction.
    def key_to_array(
        self,
        key: str,
        start_datetime: Optional[DateTime] = None,
        end_datetime: Optional[DateTime] = None,
        interval: Optional[Duration] = None,
        fill_method: Optional[str] = None,
        dropna: Optional[bool] = True,
        boundary: Literal["strict", "context"] = "context",
        align_to_interval: bool = False,
    ) -> NDArray[Shape["*"], Any]:
        """Resample field *key* over [start_datetime, end_datetime) at *interval*.

        Returns the mean-resampled, time-interpolated values as a 1-D array,
        or an empty array when no record falls inside the window. Defaults
        to a 1-hour interval. The fill_method/dropna/boundary/
        align_to_interval parameters are accepted for interface
        compatibility but ignored by this stub.
        """
        import numpy as np
        import pandas as pd

        if interval is None:
            interval = to_duration("1 hour")

        # Collect (datetime, value) pairs inside the half-open window.
        dates = []
        values = []
        for record in self.records:
            if record.date_time is None:
                continue
            ts = DatabaseTimestamp.from_datetime(record.date_time)
            if start_datetime and DatabaseTimestamp.from_datetime(start_datetime) > ts:
                continue
            if end_datetime and DatabaseTimestamp.from_datetime(end_datetime) <= ts:
                continue
            dates.append(record.date_time)
            values.append(getattr(record, key, None))

        if not dates:
            return np.array([])

        index = pd.to_datetime(dates, utc=True)
        series = pd.Series(values, index=index, dtype=float)
        freq = f"{int(interval.total_seconds())}s"
        # Anchor resample buckets at the window start when given, else at
        # midnight of the first day (pandas "start_day" origin).
        origin = start_datetime if start_datetime else "start_day"
        resampled = series.resample(freq, origin=origin).mean().interpolate("time")

        # Trim interpolated edge buckets back to the requested window.
        if start_datetime is not None:
            resampled = resampled.truncate(before=start_datetime)
        if end_datetime is not None:
            resampled = resampled.truncate(after=end_datetime)

        return resampled.values
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _insert_records_every_n_minutes(
    seq: SampleSequence,
    base: DateTime,
    count: int,
    interval_minutes: int,
    value_fn=None,
) -> None:
    """Insert `count` records spaced `interval_minutes` apart starting at `base`.

    Values default to the record index as a float; pass `value_fn(index)`
    to customize. All records are persisted with a single save at the end.
    """
    for index in range(count):
        record_time = base.add(minutes=index * interval_minutes)
        if value_fn:
            record_value = value_fn(index)
        else:
            record_value = float(index)
        seq.db_insert_record(SampleRecord(date_time=record_time, value=record_value))
    seq.db_save_records()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fixtures
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@pytest.fixture
def seq():
    """Return a fresh, empty SampleSequence backed by its own fake database."""
    return SampleSequence()
|
||
|
||
|
||
@pytest.fixture
def seq_with_15min_data():
    """Sequence with 15-min records spanning 4 weeks, so both tiers have data."""
    sequence = SampleSequence()
    utc_now = to_datetime().in_timezone("UTC")
    # 4 weeks × 7 days × 24 h × 4 records/h = 2688 records
    _insert_records_every_n_minutes(
        sequence, utc_now.subtract(weeks=4), count=2688, interval_minutes=15
    )
    return sequence, utc_now
|
||
|
||
|
||
@pytest.fixture
def seq_sparse():
    """Sequence with only 3 records spread over 4 weeks — sparse, no compaction benefit."""
    sequence = SampleSequence()
    utc_now = to_datetime().in_timezone("UTC")
    window_start = utc_now.subtract(weeks=4)
    # Three isolated records at days 0, 14 and 27 of the window; each value
    # encodes its day offset.
    for day_offset in (0, 14, 27):
        record = SampleRecord(
            date_time=window_start.add(days=day_offset), value=float(day_offset)
        )
        sequence.db_insert_record(record)
    sequence.db_save_records()
    return sequence, utc_now
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Existing tests (unchanged)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestDatabaseRecordProtocolMixin:
    """Core mixin behavior: timestamp generation, insert, save/load, count, delete."""

    @pytest.mark.parametrize(
        "start_str, value_count, interval_seconds",
        [
            ("2024-11-10 00:00:00", 24, 3600),
            ("2024-08-10 00:00:00", 24, 3600),
            # 2024-03-31 / 2024-10-27 are the EU DST transition dates —
            # spacing must stay uniform when measured in UTC.
            ("2024-03-31 00:00:00", 24, 3600),
            ("2024-10-27 00:00:00", 24, 3600),
        ],
    )
    def test_db_generate_timestamps_utc_spacing(
        self, seq, start_str, value_count, interval_seconds
    ):
        """Generated timestamps are UTC, unique, and evenly spaced."""
        start_dt = to_datetime(start_str, in_timezone="Europe/Berlin")
        assert start_dt.tz.name == "Europe/Berlin"

        db_start = DatabaseTimestamp.from_datetime(start_dt)
        generated = list(seq.db_generate_timestamps(db_start, value_count))

        assert len(generated) == value_count

        # Every generated timestamp converts back to a UTC datetime.
        for db_dt in generated:
            dt = DatabaseTimestamp.to_datetime(db_dt)
            assert dt.tz.name == "UTC"

        assert len(generated) == len(set(generated)), "Duplicate UTC datetimes found"

        # Consecutive timestamps must be exactly interval_seconds apart.
        for i in range(1, len(generated)):
            last_dt = DatabaseTimestamp.to_datetime(generated[i - 1])
            current_dt = DatabaseTimestamp.to_datetime(generated[i])
            delta = (current_dt - last_dt).total_seconds()
            assert delta == interval_seconds, f"Spacing mismatch at index {i}: {delta}s"

    def test_insert_and_memory_range(self, seq):
        """Inserted records are held in memory in chronological order."""
        t0 = to_datetime()
        t1 = t0.add(hours=1)

        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))

        assert seq.records[0].date_time == t0
        assert seq.records[-1].date_time == t1
        assert len(seq.records) == 2

    def test_roundtrip_reload(self):
        """Records saved by one sequence are loadable by a fresh one sharing the backend."""
        seq = SampleSequence()
        t0 = to_datetime()
        t1 = t0.add(hours=1)

        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))
        assert seq.db_save_records() == 2

        # New sequence pointed at the same fake DB must see both records.
        db = seq.database
        seq2 = SampleSequence()
        seq2.database = db
        loaded = seq2.db_load_records()

        assert loaded == 2
        assert len(seq2.records) == 2

    def test_db_count_records(self, seq):
        """Count is correct both before and after persisting to the backend."""
        t0 = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        assert seq.db_count_records() == 1
        seq.db_save_records()
        assert seq.db_count_records() == 1

    def test_delete_range(self, seq):
        """db_delete_records removes the half-open [start, end) timestamp range."""
        base = to_datetime()
        for i in range(5):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))

        # Delete minutes 1..3 (end bound at minute 4 is exclusive).
        db_start = DatabaseTimestamp.from_datetime(base.add(minutes=1))
        db_end = DatabaseTimestamp.from_datetime(base.add(minutes=4))
        deleted = seq.db_delete_records(start_timestamp=db_start, end_timestamp=db_end)

        assert deleted == 3
        assert [r.value for r in seq.records] == [0, 4]

    def test_db_count_records_memory_only_multiple(self):
        """Unsaved in-memory records are included in the count."""
        seq = SampleSequence()
        base = to_datetime()
        for i in range(3):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_newer_than_db(self):
        """Counting merges persisted records with newer in-memory ones."""
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=2), value=3))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_older_than_db(self):
        """Counting merges persisted records with older in-memory ones."""
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        assert seq.db_count_records() == 2

    def test_db_count_records_empty_everywhere(self):
        """Empty memory and empty backend count as zero."""
        seq = SampleSequence()
        assert seq.db_count_records() == 0

    def test_metadata_not_counted(self, seq):
        """The metadata sentinel entry in the backend is excluded from counts."""
        seq.database._data.setdefault("test", {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.db_count_records() == 0

    def test_key_range_excludes_metadata(self, seq):
        """get_key_range reports (None, None) when only metadata is stored."""
        ns = seq.db_namespace()
        seq.database._data.setdefault(ns, {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.database.get_key_range(ns) == (None, None)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Compaction tests
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestCompactTiers:
    """Tests for db_compact_tiers() and the tier hook."""

    def test_default_tiers_returns_two_entries(self, seq):
        """The default configuration defines exactly two compaction tiers."""
        assert len(seq.db_compact_tiers()) == 2

    def test_default_tiers_ordered_shortest_first(self, seq):
        """Tier ages come back ascending."""
        tier_ages = [age.total_seconds() for age, _ in seq.db_compact_tiers()]
        assert tier_ages == sorted(tier_ages), "Tiers must be ordered shortest age first"

    def test_default_tiers_first_is_2h_to_15min(self, seq):
        """First tier: records older than 2 h downsample to 15-min buckets."""
        first_age, first_interval = seq.db_compact_tiers()[0]
        assert first_age.total_seconds() == 2 * 3600
        assert first_interval.total_seconds() == 15 * 60

    def test_default_tiers_second_is_2weeks_to_1h(self, seq):
        """Second tier: records older than 2 weeks downsample to 1-h buckets."""
        second_age, second_interval = seq.db_compact_tiers()[1]
        assert second_age.total_seconds() == 14 * 24 * 3600
        assert second_interval.total_seconds() == 3600

    def test_override_tiers(self):
        """A subclass may replace the tier table wholesale."""

        class CustomSeq(SampleSequence):
            def db_compact_tiers(self):
                return [(to_duration("7 days"), to_duration("1 hour"))]

        custom = CustomSeq()
        custom_tiers = custom.db_compact_tiers()
        assert len(custom_tiers) == 1
        assert custom_tiers[0][1].total_seconds() == 3600

    def test_empty_tiers_disables_compaction(self):
        """An empty tier table turns db_compact() into a no-op."""

        class NoCompactSeq(SampleSequence):
            def db_compact_tiers(self):
                return []

        no_compact = NoCompactSeq()
        utc_now = to_datetime().in_timezone("UTC")
        _insert_records_every_n_minutes(
            no_compact, utc_now.subtract(weeks=4), count=100, interval_minutes=15
        )

        assert no_compact.db_compact() == 0
|
||
|
||
|
||
class TestCompactState:
    """Tests for _db_get_compact_state / _db_set_compact_state."""

    def test_get_state_returns_none_when_no_metadata(self, seq):
        """Without stored metadata there is no cutoff for any tier."""
        assert seq._db_get_compact_state(to_duration("1 hour")) is None

    def test_set_and_get_state_roundtrip(self, seq):
        """A stored cutoff timestamp reads back unchanged."""
        tier_interval = to_duration("1 hour")
        stamp = DatabaseTimestamp.from_datetime(to_datetime().in_timezone("UTC"))

        seq._db_set_compact_state(tier_interval, stamp)

        assert seq._db_get_compact_state(tier_interval) == stamp

    def test_state_is_per_tier(self, seq):
        """Different tier intervals must not overwrite each other."""
        fine_interval = to_duration("15 minutes")
        coarse_interval = to_duration("1 hour")

        utc_now = to_datetime().in_timezone("UTC")
        fine_stamp = DatabaseTimestamp.from_datetime(utc_now)
        coarse_stamp = DatabaseTimestamp.from_datetime(utc_now.subtract(days=1))

        seq._db_set_compact_state(fine_interval, fine_stamp)
        seq._db_set_compact_state(coarse_interval, coarse_stamp)

        assert seq._db_get_compact_state(fine_interval) == fine_stamp
        assert seq._db_get_compact_state(coarse_interval) == coarse_stamp

    def test_state_persists_in_metadata(self, seq):
        """State must survive a metadata reload."""
        tier_interval = to_duration("1 hour")
        stamp = DatabaseTimestamp.from_datetime(to_datetime().in_timezone("UTC"))

        seq._db_set_compact_state(tier_interval, stamp)

        # A fresh sequence on the same fake DB must see the stored cutoff
        # after reloading metadata.
        reloaded = SampleSequence()
        reloaded.database = seq.database
        reloaded._db_metadata = reloaded._db_load_metadata()

        assert reloaded._db_get_compact_state(tier_interval) == stamp
|
||
|
||
|
||
class TestCompactSparseGuard:
    """The inflation guard must skip compaction when records are already sparse."""

    def test_sparse_data_aligns_but_does_not_reduce_cardinality(self, seq_sparse):
        """Sparse data must be aligned to the target interval for all records that were modified."""
        seq, _ = seq_sparse

        interval = to_duration("15 minutes")
        interval_sec = int(interval.total_seconds())

        # Snapshot original timestamps
        before_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }

        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )

        after_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }

        # Cardinality must not increase
        assert len(after_epochs) <= len(before_epochs)

        # Any timestamp that changed must now be aligned
        changed_epochs = after_epochs - before_epochs

        for epoch in changed_epochs:
            assert epoch % interval_sec == 0

    def test_sparse_guard_advances_cutoff(self, seq_sparse):
        """Even when skipped, the cutoff should be stored so next run skips the same window."""
        seq, _ = seq_sparse
        interval_1h = to_duration("1 hour")
        interval_15min = to_duration("15 minutes")

        seq.db_compact()

        # Both tiers should have stored a cutoff even though nothing was deleted
        assert seq._db_get_compact_state(interval_1h) is not None
        assert seq._db_get_compact_state(interval_15min) is not None

    def test_exactly_at_boundary_remains_stable(self, seq):
        """Records already on exact interval boundaries must not be moved or revalued."""
        now = to_datetime().in_timezone("UTC")
        interval = to_duration("1 hour")

        # Build a base that sits exactly on an hour boundary in epoch terms:
        # zero the sub-hour fields, then strip any residual epoch offset.
        raw_base = now.subtract(hours=5).set(minute=0, second=0, microsecond=0)
        base = raw_base.subtract(seconds=int(raw_base.timestamp()) % 3600)

        # Four hourly records, each already aligned to the 1-h grid.
        for i in range(4):
            seq.db_insert_record(
                SampleRecord(
                    date_time=base.add(hours=i),
                    value=float(i),
                )
            )

        # One recent record just inside the age threshold, so compaction has
        # a "recent" anchor and only considers the old aligned records.
        seq.db_insert_record(
            SampleRecord(date_time=now.subtract(seconds=1), value=0.0)
        )
        seq.db_save_records()

        before = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]

        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )

        after = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]

        # Already-aligned data must come through compaction byte-identical.
        assert before == after
|
||
|
||
|
||
class TestCompactTierWorker:
    """Unit tests for _db_compact_tier directly."""

    def test_empty_sequence_returns_zero(self, seq):
        """Compacting an empty sequence deletes nothing."""
        age = to_duration("2 hours")
        interval = to_duration("15 minutes")
        assert seq._db_compact_tier(age, interval) == 0

    def test_all_records_too_recent_skipped(self):
        """Records within the age threshold must not be touched."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 10 records from 30 minutes ago — all within 2h threshold
        base = now.subtract(minutes=30)
        _insert_records_every_n_minutes(seq, base, count=10, interval_minutes=1)

        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        assert deleted == 0
        assert seq.db_count_records() == before

    def test_compaction_reduces_record_count(self):
        """Dense 1-min records older than 2h should be downsampled to 15-min."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 1-min records for 6 hours ending 3 hours ago
        base = now.subtract(hours=9)
        _insert_records_every_n_minutes(seq, base, count=6 * 60, interval_minutes=1)

        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        after = seq.db_count_records()
        assert deleted > 0
        assert after < before

    def test_records_within_threshold_preserved(self):
        """Records newer than age_threshold must remain untouched after compaction."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")

        # Old dense records (will be compacted)
        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=4 * 60, interval_minutes=1)

        # Recent records (must not be touched) — insert 5 records in the last hour
        recent_base = now.subtract(minutes=50)
        _insert_records_every_n_minutes(seq, recent_base, count=5, interval_minutes=10)

        recent_before = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # The recent slice must be exactly as numerous as before compaction.
        recent_after = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]
        assert len(recent_after) == len(recent_before)

    def test_incremental_cutoff_prevents_recompaction(self):
        """Running compaction twice must not re-compact already-compacted data."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        age = to_duration("2 hours")
        interval = to_duration("15 minutes")

        deleted_first = seq._db_compact_tier(age, interval)
        count_after_first = seq.db_count_records()

        deleted_second = seq._db_compact_tier(age, interval)
        count_after_second = seq.db_count_records()

        assert deleted_first > 0
        assert deleted_second == 0, "Second run must be a no-op"
        assert count_after_first == count_after_second

    def test_cutoff_stored_after_compaction(self):
        """Cutoff timestamp must be persisted after a successful compaction run."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        interval = to_duration("15 minutes")
        seq._db_compact_tier(to_duration("2 hours"), interval)

        assert seq._db_get_compact_state(interval) is not None
|
||
|
||
|
||
class TestDbCompact:
    """Integration tests for the public db_compact() entry point."""

    def test_compact_dense_data_both_tiers(self, seq_with_15min_data):
        """4 weeks of 15-min data should be reduced by both tiers."""
        seq, _ = seq_with_15min_data
        before = seq.db_count_records()

        total_deleted = seq.db_compact()

        after = seq.db_count_records()
        assert total_deleted > 0
        assert after < before

    def test_compact_coarsest_tier_runs_first(self, seq_with_15min_data):
        """The 1-hour tier (coarsest) must run before the 15-min tier.

        If coarsest ran last it would re-compact records the 15-min tier
        had already downsampled — verified by checking that the 1-hour
        cutoff is not later than the 15-min cutoff.
        """
        seq, _ = seq_with_15min_data
        seq.db_compact()

        cutoff_1h = seq._db_get_compact_state(to_duration("1 hour"))
        cutoff_15min = seq._db_get_compact_state(to_duration("15 minutes"))

        assert cutoff_1h is not None
        assert cutoff_15min is not None
        # The 1h tier covers older data → its cutoff must be earlier than 15min tier
        assert cutoff_1h <= cutoff_15min

    def test_compact_idempotent(self, seq_with_15min_data):
        """Running db_compact twice must not change record count."""
        seq, _ = seq_with_15min_data
        seq.db_compact()
        after_first = seq.db_count_records()

        seq.db_compact()
        after_second = seq.db_count_records()

        assert after_first == after_second

    def test_compact_empty_sequence_returns_zero(self, seq):
        """Compacting an empty sequence is a safe no-op."""
        assert seq.db_compact() == 0

    def test_compact_with_override_tiers(self):
        """Passing compact_tiers directly must override db_compact_tiers()."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=3)
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 4, interval_minutes=15)

        before = seq.db_count_records()
        deleted = seq.db_compact(
            compact_tiers=[(to_duration("1 day"), to_duration("1 hour"))]
        )

        assert deleted > 0
        assert seq.db_count_records() < before

    def test_compact_only_processes_new_window_on_second_call(self):
        """Second call processes only the new window, not the full history."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Floor to the minute to avoid sub-minute microseconds causing duplicate
        # timestamps when interval arithmetic lands exactly on `base`.
        now_floored = now.set(second=0, microsecond=0)
        base = now_floored.subtract(weeks=3)
        # Dense 1-min data for 3 weeks
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 60, interval_minutes=1)

        seq.db_compact()
        count_after_first = seq.db_count_records()

        # Start 2 days before `base` and insert only 1 day worth of records,
        # so the window [extra_base, extra_base + 1439min] stays entirely
        # before `base - 1day` and never collides with compacted timestamps
        # that were snapped to clean hour/15-min boundaries inside the original range.
        extra_base = now_floored.subtract(weeks=3).subtract(days=2)
        _insert_records_every_n_minutes(seq, extra_base, count=24 * 60, interval_minutes=1)

        seq.db_compact()
        count_after_second = seq.db_count_records()

        # Second compact should have processed the newly added old data
        # Record count may change but should not exceed first compacted count by much
        # NOTE(review): `>= 0` is vacuous — db_count_records cannot go negative.
        # Consider asserting a real bound, e.g. relative to count_after_first.
        assert count_after_second >= 0  # basic sanity
|
||
|
||
|
||
class TestCompactDataIntegrity:
    """Verify value integrity is preserved after compaction."""

    def test_constant_value_preserved(self):
        """Constant value field must survive mean-resampling unchanged."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)

        # All values = 42.0
        _insert_records_every_n_minutes(
            seq, base, count=6 * 60, interval_minutes=1, value_fn=lambda _: 42.0
        )

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # Mean of a constant is the constant; interpolation cannot change it.
        for record in seq.records:
            if record.date_time and record.date_time < now.subtract(hours=2):
                assert record.value == pytest.approx(42.0, abs=1e-6)

    def test_recent_records_not_modified(self):
        """Records newer than the age threshold must have unchanged values."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")

        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=3 * 60, interval_minutes=1)

        # Known recent values: minute offset -> expected value (100, 101, 102
        # at offsets 0, 10, 20).
        recent_base = now.subtract(minutes=30)
        expected = {i * 10: float(100 + i) for i in range(3)}
        for offset, val in expected.items():
            dt = recent_base.add(minutes=offset)
            seq.db_insert_record(SampleRecord(date_time=dt, value=val))
        seq.db_save_records()

        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        # Every surviving recent record must carry its original value.
        for record in seq.records:
            if record.date_time and record.date_time >= recent_base:
                offset = int((record.date_time - recent_base).total_seconds() / 60)
                if offset in expected:
                    assert record.value == pytest.approx(expected[offset], abs=1e-6)

    def test_compacted_timestamps_spacing(self):
        """Resampled records must be fewer than original and span the compaction window.

        Exact per-bucket spacing depends on the full DataSequence.key_to_array
        implementation (pandas resampling). The stub key_to_array in SampleSequence
        only guarantees a reduction in count — uniform spacing is verified in
        test_dataabc_compact.py against the real implementation.
        """
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)

        before = seq.db_count_records()
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))

        cutoff = now.subtract(hours=2)
        compacted = sorted(
            [r for r in seq.records if r.date_time and r.date_time < cutoff],
            key=lambda r: cast(DateTime, r.date_time),
        )

        # Must have produced fewer records than the original 1-min data
        assert len(compacted) > 0, "Expected at least one compacted record"
        assert len(compacted) < before, "Compaction must reduce record count"

        # Window start is floored to interval boundary
        interval_sec = 15 * 60
        expected_window_start = DateTime.fromtimestamp(
            (int(base.timestamp()) // interval_sec) * interval_sec,
            tz="UTC",
        )
        assert compacted[0].date_time >= expected_window_start

        # Last compacted record must be before the cutoff
        assert compacted[-1].date_time < cutoff
|