Files
EOS/tests/test_databaseabc.py
Bobby Noelte 6498c7dc32 Add database support for measurements and historic prediction data. (#848)
The database supports backend selection, compression, incremental data load,
automatic data saving to storage, and automatic vacuum and compaction.

Make SQLite3 and LMDB database backends available.

Update tests for the new interface conventions regarding data sequences,
data containers, and data providers. This includes the measurements provider and
the prediction providers.

Add database documentation.

This change includes several bug fixes that are not directly related to the database
implementation but are necessary to keep EOS running properly and to test and
document the changes.

* fix: config eos test setup

  Make the config_eos fixture generate a new instance of the config_eos singleton.
  Use correct env names to set up the data folder path.

* fix: startup with no config

  Make cache and measurements complain about missing data path configuration but
  do not bail out.

* fix: SoC data preparation and usage for genetic optimization

  Search for SoC measurements 48 hours around the optimization start time.
  Only clamp the SoC to its maximum in the battery device simulation.

* fix: dashboard bailout on zero value solution display

  Do not use zero values to calculate the chart values adjustment for display.

* fix: openapi generation script

  Make the script also replace data_folder_path and data_output_path to hide
  real (test) environment paths.

* feat: add make repeated task function

  make_repeated_task allows wrapping a function so that it is repeated cyclically.

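As a rough illustration of the idea (not the actual EOS implementation, whose signature is not shown here), such a wrapper can be built around a stop event:

```python
import threading
from typing import Callable


def make_repeated_task(func: Callable[[], None], interval_sec: float):
    """Hypothetical sketch, not the real EOS helper.

    Return a runner that calls `func` every `interval_sec` seconds until
    the returned stop event is set.
    """
    stop = threading.Event()

    def runner() -> None:
        # Event.wait doubles as the sleep; it returns True once stop is set.
        while not stop.wait(interval_sec):
            func()

    return runner, stop
```

The runner would typically be handed to a background thread; setting the event terminates the cycle cleanly.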
* chore: removed index based data sequence access

  Index-based data sequence access does not make sense, as the sequence can be
  backed by the database. The sequence is now purely time series data.

* chore: refactor eos startup to avoid module import startup

  Avoid module-import initialization, especially of the EOS configuration.
  Config mutation, singleton initialization, logging setup, argparse parsing,
  background task definitions depending on config, and environment-dependent
  behavior are now done at function startup.

* chore: introduce retention manager

  A single long-running background task that owns the scheduling of all periodic
  server-maintenance jobs (cache cleanup, DB autosave, …).

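A minimal sketch of the concept, with hypothetical names since the real EOS retention manager is not shown here: one thread polls a job list and fires whatever is due.

```python
import threading
import time
from typing import Callable


class RetentionManager:
    """Hypothetical sketch of a single long-running maintenance loop.

    Each job is stored as [next_due, interval_sec, fn]; one background
    thread polls the list and fires whatever is due. The real EOS
    retention manager may differ.
    """

    def __init__(self) -> None:
        self._jobs: list[list] = []
        self._stop = threading.Event()

    def add_job(self, interval_sec: float, fn: Callable[[], None]) -> None:
        self._jobs.append([time.monotonic() + interval_sec, interval_sec, fn])

    def run_forever(self, tick_sec: float = 0.005) -> None:
        while not self._stop.wait(tick_sec):
            now = time.monotonic()
            for job in self._jobs:
                if now >= job[0]:
                    job[2]()               # fire the maintenance job
                    job[0] = now + job[1]  # reschedule

    def stop(self) -> None:
        self._stop.set()
```

Centralizing the scheduling in one loop avoids one thread or timer per maintenance job.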
* chore: canonicalize timezone name for UTC

  Timezone names that are semantically identical to UTC are canonicalized to UTC.

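A minimal sketch of one possible strategy, assuming the stdlib zoneinfo database; the actual EOS helper may work differently:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo


def canonicalize_utc(name: str) -> str:
    """Hypothetical sketch: map any zone with a permanently zero UTC offset
    (e.g. "Etc/UTC", "GMT", "Zulu") to the canonical name "UTC".
    """
    tz = ZoneInfo(name)
    # Probe winter and summer to rule out DST zones such as Europe/London,
    # which has a zero offset only part of the year.
    probes = (datetime(2024, 1, 1, tzinfo=tz), datetime(2024, 7, 1, tzinfo=tz))
    if all(p.utcoffset() == timedelta(0) for p in probes):
        return "UTC"
    return name
```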
* chore: extend config file migration for default value handling

  Extend the config file migration to handle values that are None or missing;
  these now invoke default value generation in the new config file. Also adapt
  the test to handle this situation.

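The fallback rule can be sketched on a flat dict (the real migration operates on the nested EOS config, so the names here are illustrative):

```python
from typing import Any


def migrate_with_defaults(old: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical sketch: keys that are missing or set to None in the old
    config fall back to the generated defaults in the new config."""
    migrated = dict(defaults)
    for key, value in old.items():
        if value is not None:
            migrated[key] = value  # explicit values win over defaults
    return migrated
```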
* chore: extend datetime util test cases

* chore: make version test check for untracked files

  Check for files that are not tracked by git. Version calculation will be
  wrong if these files are not committed.

* chore: bump pandas to 3.0.0

  Pandas 3.0 now infers the appropriate resolution (a.k.a. unit) for the
  output dtype, which may become datetime64[us] (previously it was ns). Numeric
  dtype detection is also stricter now, which requires a different approach to
  detecting numerics.

* chore: bump pydantic-settings to 2.12.0

  pydantic-settings 2.12.0 behaves differently under pytest. The tests were
  adapted and a workaround was introduced. ConfigEOS was also adapted to allow
  fine-grained initialization control, so that certain settings, such as file
  settings, can be switched off during tests.

* chore: remove scikit-learn from dependencies

  scikit-learn is not strictly necessary as long as we have scipy.

* chore: add documentation mode guarding for sphinx autosummary

  Sphinx autosummary executes functions. Prevent exceptions in pure
  documentation mode.

* chore: adapt docker-build CI workflow to stricter GitHub handling

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
2026-02-22 14:12:42 +01:00


from typing import Any, Iterator, Literal, Optional, Type, cast

import pytest
from numpydantic import NDArray, Shape
from pydantic import BaseModel, Field

from akkudoktoreos.core.databaseabc import (
    DATABASE_METADATA_KEY,
    DatabaseRecordProtocolMixin,
    DatabaseTimestamp,
    _DatabaseTimestampUnbound,
)
from akkudoktoreos.utils.datetimeutil import (
    DateTime,
    Duration,
    to_datetime,
    to_duration,
)


# ---------------------------------------------------------------------------
# Test record
# ---------------------------------------------------------------------------
class SampleRecord(BaseModel):
    date_time: Optional[DateTime] = Field(
        default=None, json_schema_extra={"description": "DateTime"}
    )
    value: Optional[float] = None

    def __getitem__(self, key: str) -> Any:
        if key == "date_time":
            return self.date_time
        if key == "value":
            return self.value
        assert key is None
        return None


# ---------------------------------------------------------------------------
# Fake database backend
# ---------------------------------------------------------------------------
class SampleDatabase:
    def __init__(self):
        self._data: dict[Optional[str], dict[bytes, bytes]] = {}
        self._metadata: Optional[bytes] = None
        self.is_open = True
        self.compression = False
        self.compression_level = 0
        self.storage_path = "/fake"

    # serialization (pass-through)
    def serialize_data(self, data: bytes) -> bytes:
        return data

    def deserialize_data(self, data: bytes) -> bytes:
        return data

    # metadata
    def set_metadata(self, metadata: Optional[bytes], *, namespace: Optional[str] = None) -> None:
        self._metadata = metadata

    def get_metadata(self, namespace: Optional[str] = None) -> Optional[bytes]:
        return self._metadata

    # write
    def save_records(
        self, records: list[tuple[bytes, bytes]], namespace: Optional[str] = None
    ) -> int:
        ns = self._data.setdefault(namespace, {})
        saved = 0
        for key, value in records:
            ns[key] = value
            saved += 1
        return saved

    def delete_records(
        self, keys: Iterator[bytes], namespace: Optional[str] = None
    ) -> int:
        ns_data = self._data.get(namespace, {})
        deleted = 0
        for key in keys:
            if key in ns_data:
                del ns_data[key]
                deleted += 1
        return deleted

    # read
    def iterate_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        namespace: Optional[str] = None,
        reverse: bool = False,
    ) -> Iterator[tuple[bytes, bytes]]:
        items = self._data.get(namespace, {})
        keys = sorted(items, reverse=reverse)
        for k in keys:
            if k == DATABASE_METADATA_KEY:
                continue
            if start_key and k < start_key:
                continue
            if end_key and k >= end_key:
                continue
            yield k, items[k]

    # stats
    def count_records(
        self,
        start_key: Optional[bytes] = None,
        end_key: Optional[bytes] = None,
        *,
        namespace: Optional[str] = None,
    ) -> int:
        items = self._data.get(namespace, {})
        count = 0
        for k in items:
            if k == DATABASE_METADATA_KEY:
                continue
            if start_key and k < start_key:
                continue
            if end_key and k >= end_key:
                continue
            count += 1
        return count

    def get_key_range(
        self, namespace: Optional[str] = None
    ) -> tuple[Optional[bytes], Optional[bytes]]:
        items = self._data.get(namespace, {})
        keys = sorted(k for k in items if k != DATABASE_METADATA_KEY)
        if not keys:
            return None, None
        return keys[0], keys[-1]

    def get_backend_stats(self, namespace: Optional[str] = None) -> dict:
        return {}

    def flush(self, namespace: Optional[str] = None) -> None:
        pass


# ---------------------------------------------------------------------------
# Concrete test sequence — minimal, no Pydantic / singleton overhead
# ---------------------------------------------------------------------------
class SampleSequence(DatabaseRecordProtocolMixin[SampleRecord]):
    """Minimal concrete implementation for unit-testing the mixin."""

    def __init__(self):
        self.records: list[SampleRecord] = []
        self._db_record_index: dict[DatabaseTimestamp, SampleRecord] = {}
        self._db_sorted_timestamps: list[DatabaseTimestamp] = []
        self._db_dirty_timestamps: set[DatabaseTimestamp] = set()
        self._db_new_timestamps: set[DatabaseTimestamp] = set()
        self._db_deleted_timestamps: set[DatabaseTimestamp] = set()
        self._db_initialized: bool = True
        self._db_storage_initialized: bool = False
        self._db_metadata: Optional[dict] = None
        self._db_loaded_range = None
        from akkudoktoreos.core.databaseabc import DatabaseRecordProtocolLoadPhase

        self._db_load_phase = DatabaseRecordProtocolLoadPhase.NONE
        self._db_version: int = 1
        self.database = SampleDatabase()
        self.config = type(
            "Cfg",
            (),
            {
                "database": type(
                    "DBCfg",
                    (),
                    {
                        "auto_save": False,
                        "compression_level": 0,
                        "autosave_interval_sec": 10,
                        "initial_load_window_h": None,
                        "keep_duration_h": None,
                    },
                )()
            },
        )()

    @classmethod
    def record_class(cls) -> Type[SampleRecord]:
        return SampleRecord

    def db_namespace(self) -> str:
        return "test"

    @property
    def record_keys_writable(self) -> list[str]:
        """Return writable field names of SampleRecord.

        Required by _db_compact_tier which iterates record_keys_writable
        to decide which fields to resample. Must match exactly what
        key_to_array accepts — only 'value' here, not 'date_time'.
        """
        return ["value"]

    # Override key_to_array for the mixin tests — the full DataSequence
    # implementation lives in dataabc.py; here we provide a minimal version
    # that resamples the single `value` field to demonstrate compaction.
    def key_to_array(
        self,
        key: str,
        start_datetime: Optional[DateTime] = None,
        end_datetime: Optional[DateTime] = None,
        interval: Optional[Duration] = None,
        fill_method: Optional[str] = None,
        dropna: Optional[bool] = True,
        boundary: Literal["strict", "context"] = "context",
        align_to_interval: bool = False,
    ) -> NDArray[Shape["*"], Any]:
        import numpy as np
        import pandas as pd

        if interval is None:
            interval = to_duration("1 hour")
        dates = []
        values = []
        for record in self.records:
            if record.date_time is None:
                continue
            ts = DatabaseTimestamp.from_datetime(record.date_time)
            if start_datetime and DatabaseTimestamp.from_datetime(start_datetime) > ts:
                continue
            if end_datetime and DatabaseTimestamp.from_datetime(end_datetime) <= ts:
                continue
            dates.append(record.date_time)
            values.append(getattr(record, key, None))
        if not dates:
            return np.array([])
        index = pd.to_datetime(dates, utc=True)
        series = pd.Series(values, index=index, dtype=float)
        freq = f"{int(interval.total_seconds())}s"
        origin = start_datetime if start_datetime else "start_day"
        resampled = series.resample(freq, origin=origin).mean().interpolate("time")
        if start_datetime is not None:
            resampled = resampled.truncate(before=start_datetime)
        if end_datetime is not None:
            resampled = resampled.truncate(after=end_datetime)
        return resampled.values


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _insert_records_every_n_minutes(
    seq: SampleSequence,
    base: DateTime,
    count: int,
    interval_minutes: int,
    value_fn=None,
) -> None:
    """Insert `count` records spaced `interval_minutes` apart starting at `base`."""
    for i in range(count):
        dt = base.add(minutes=i * interval_minutes)
        value = value_fn(i) if value_fn else float(i)
        seq.db_insert_record(SampleRecord(date_time=dt, value=value))
    seq.db_save_records()


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def seq():
    return SampleSequence()


@pytest.fixture
def seq_with_15min_data():
    """Sequence with 15-min records spanning 4 weeks, so both tiers have data."""
    s = SampleSequence()
    now = to_datetime().in_timezone("UTC")
    # 4 weeks × 7 days × 24 h × 4 records/h = 2688 records
    base = now.subtract(weeks=4)
    _insert_records_every_n_minutes(s, base, count=2688, interval_minutes=15)
    return s, now


@pytest.fixture
def seq_sparse():
    """Sequence with only 3 records spread over 4 weeks — sparse, no compaction benefit."""
    s = SampleSequence()
    now = to_datetime().in_timezone("UTC")
    base = now.subtract(weeks=4)
    for offset_days in [0, 14, 27]:
        dt = base.add(days=offset_days)
        s.db_insert_record(SampleRecord(date_time=dt, value=float(offset_days)))
    s.db_save_records()
    return s, now


# ---------------------------------------------------------------------------
# Existing tests (unchanged)
# ---------------------------------------------------------------------------
class TestDatabaseRecordProtocolMixin:
    @pytest.mark.parametrize(
        "start_str, value_count, interval_seconds",
        [
            ("2024-11-10 00:00:00", 24, 3600),
            ("2024-08-10 00:00:00", 24, 3600),
            ("2024-03-31 00:00:00", 24, 3600),
            ("2024-10-27 00:00:00", 24, 3600),
        ],
    )
    def test_db_generate_timestamps_utc_spacing(
        self, seq, start_str, value_count, interval_seconds
    ):
        start_dt = to_datetime(start_str, in_timezone="Europe/Berlin")
        assert start_dt.tz.name == "Europe/Berlin"
        db_start = DatabaseTimestamp.from_datetime(start_dt)
        generated = list(seq.db_generate_timestamps(db_start, value_count))
        assert len(generated) == value_count
        for db_dt in generated:
            dt = DatabaseTimestamp.to_datetime(db_dt)
            assert dt.tz.name == "UTC"
        assert len(generated) == len(set(generated)), "Duplicate UTC datetimes found"
        for i in range(1, len(generated)):
            last_dt = DatabaseTimestamp.to_datetime(generated[i - 1])
            current_dt = DatabaseTimestamp.to_datetime(generated[i])
            delta = (current_dt - last_dt).total_seconds()
            assert delta == interval_seconds, f"Spacing mismatch at index {i}: {delta}s"

    def test_insert_and_memory_range(self, seq):
        t0 = to_datetime()
        t1 = t0.add(hours=1)
        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))
        assert seq.records[0].date_time == t0
        assert seq.records[-1].date_time == t1
        assert len(seq.records) == 2

    def test_roundtrip_reload(self):
        seq = SampleSequence()
        t0 = to_datetime()
        t1 = t0.add(hours=1)
        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        seq.db_insert_record(SampleRecord(date_time=t1, value=2))
        assert seq.db_save_records() == 2
        db = seq.database
        seq2 = SampleSequence()
        seq2.database = db
        loaded = seq2.db_load_records()
        assert loaded == 2
        assert len(seq2.records) == 2

    def test_db_count_records(self, seq):
        t0 = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=t0, value=1))
        assert seq.db_count_records() == 1
        seq.db_save_records()
        assert seq.db_count_records() == 1

    def test_delete_range(self, seq):
        base = to_datetime()
        for i in range(5):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))
        db_start = DatabaseTimestamp.from_datetime(base.add(minutes=1))
        db_end = DatabaseTimestamp.from_datetime(base.add(minutes=4))
        deleted = seq.db_delete_records(start_timestamp=db_start, end_timestamp=db_end)
        assert deleted == 3
        assert [r.value for r in seq.records] == [0, 4]

    def test_db_count_records_memory_only_multiple(self):
        seq = SampleSequence()
        base = to_datetime()
        for i in range(3):
            seq.db_insert_record(SampleRecord(date_time=base.add(minutes=i), value=i))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_newer_than_db(self):
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=2), value=3))
        assert seq.db_count_records() == 3

    def test_db_count_records_memory_older_than_db(self):
        seq = SampleSequence()
        base = to_datetime()
        seq.db_insert_record(SampleRecord(date_time=base.add(hours=1), value=2))
        seq.db_save_records()
        seq.db_insert_record(SampleRecord(date_time=base, value=1))
        assert seq.db_count_records() == 2

    def test_db_count_records_empty_everywhere(self):
        seq = SampleSequence()
        assert seq.db_count_records() == 0

    def test_metadata_not_counted(self, seq):
        seq.database._data.setdefault("test", {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.db_count_records() == 0

    def test_key_range_excludes_metadata(self, seq):
        ns = seq.db_namespace()
        seq.database._data.setdefault(ns, {})[DATABASE_METADATA_KEY] = b"meta"
        assert seq.database.get_key_range(ns) == (None, None)


# ---------------------------------------------------------------------------
# Compaction tests
# ---------------------------------------------------------------------------
class TestCompactTiers:
    """Tests for db_compact_tiers() and the tier hook."""

    def test_default_tiers_returns_two_entries(self, seq):
        tiers = seq.db_compact_tiers()
        assert len(tiers) == 2

    def test_default_tiers_ordered_shortest_first(self, seq):
        tiers = seq.db_compact_tiers()
        ages = [t[0].total_seconds() for t in tiers]
        assert ages == sorted(ages), "Tiers must be ordered shortest age first"

    def test_default_tiers_first_is_2h_to_15min(self, seq):
        tiers = seq.db_compact_tiers()
        age_sec, interval_sec = (
            tiers[0][0].total_seconds(),
            tiers[0][1].total_seconds(),
        )
        assert age_sec == 2 * 3600
        assert interval_sec == 15 * 60

    def test_default_tiers_second_is_2weeks_to_1h(self, seq):
        tiers = seq.db_compact_tiers()
        age_sec, interval_sec = (
            tiers[1][0].total_seconds(),
            tiers[1][1].total_seconds(),
        )
        assert age_sec == 14 * 24 * 3600
        assert interval_sec == 3600

    def test_override_tiers(self):
        class CustomSeq(SampleSequence):
            def db_compact_tiers(self):
                return [(to_duration("7 days"), to_duration("1 hour"))]

        s = CustomSeq()
        tiers = s.db_compact_tiers()
        assert len(tiers) == 1
        assert tiers[0][1].total_seconds() == 3600

    def test_empty_tiers_disables_compaction(self):
        class NoCompactSeq(SampleSequence):
            def db_compact_tiers(self):
                return []

        s = NoCompactSeq()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=4)
        _insert_records_every_n_minutes(s, base, count=100, interval_minutes=15)
        deleted = s.db_compact()
        assert deleted == 0


class TestCompactState:
    """Tests for _db_get_compact_state / _db_set_compact_state."""

    def test_get_state_returns_none_when_no_metadata(self, seq):
        interval = to_duration("1 hour")
        assert seq._db_get_compact_state(interval) is None

    def test_set_and_get_state_roundtrip(self, seq):
        interval = to_duration("1 hour")
        now = to_datetime().in_timezone("UTC")
        ts = DatabaseTimestamp.from_datetime(now)
        seq._db_set_compact_state(interval, ts)
        retrieved = seq._db_get_compact_state(interval)
        assert retrieved == ts

    def test_state_is_per_tier(self, seq):
        """Different tier intervals must not overwrite each other."""
        interval_15min = to_duration("15 minutes")
        interval_1h = to_duration("1 hour")
        now = to_datetime().in_timezone("UTC")
        ts_15 = DatabaseTimestamp.from_datetime(now)
        ts_1h = DatabaseTimestamp.from_datetime(now.subtract(days=1))
        seq._db_set_compact_state(interval_15min, ts_15)
        seq._db_set_compact_state(interval_1h, ts_1h)
        assert seq._db_get_compact_state(interval_15min) == ts_15
        assert seq._db_get_compact_state(interval_1h) == ts_1h

    def test_state_persists_in_metadata(self, seq):
        """State must survive a metadata reload."""
        interval = to_duration("1 hour")
        now = to_datetime().in_timezone("UTC")
        ts = DatabaseTimestamp.from_datetime(now)
        seq._db_set_compact_state(interval, ts)
        # Reload metadata from fake DB
        seq2 = SampleSequence()
        seq2.database = seq.database
        seq2._db_metadata = seq2._db_load_metadata()
        assert seq2._db_get_compact_state(interval) == ts


class TestCompactSparseGuard:
    """The inflation guard must skip compaction when records are already sparse."""

    def test_sparse_data_aligns_but_does_not_reduce_cardinality(self, seq_sparse):
        """Sparse data must be aligned to the target interval for all records that were modified."""
        seq, _ = seq_sparse
        interval = to_duration("15 minutes")
        interval_sec = int(interval.total_seconds())
        # Snapshot original timestamps
        before_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }
        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )
        after_epochs = {
            int(r.date_time.timestamp())
            for r in seq.records
        }
        # Cardinality must not increase
        assert len(after_epochs) <= len(before_epochs)
        # Any timestamp that changed must now be aligned
        changed_epochs = after_epochs - before_epochs
        for epoch in changed_epochs:
            assert epoch % interval_sec == 0

    def test_sparse_guard_advances_cutoff(self, seq_sparse):
        """Even when skipped, the cutoff should be stored so next run skips the same window."""
        seq, _ = seq_sparse
        interval_1h = to_duration("1 hour")
        interval_15min = to_duration("15 minutes")
        seq.db_compact()
        # Both tiers should have stored a cutoff even though nothing was deleted
        assert seq._db_get_compact_state(interval_1h) is not None
        assert seq._db_get_compact_state(interval_15min) is not None

    def test_exactly_at_boundary_remains_stable(self, seq):
        now = to_datetime().in_timezone("UTC")
        interval = to_duration("1 hour")
        raw_base = now.subtract(hours=5).set(minute=0, second=0, microsecond=0)
        base = raw_base.subtract(seconds=int(raw_base.timestamp()) % 3600)
        for i in range(4):
            seq.db_insert_record(
                SampleRecord(
                    date_time=base.add(hours=i),
                    value=float(i),
                )
            )
        seq.db_insert_record(
            SampleRecord(date_time=now.subtract(seconds=1), value=0.0)
        )
        seq.db_save_records()
        before = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]
        seq._db_compact_tier(
            to_duration("30 minutes"),
            interval,
        )
        after = [
            (int(r.date_time.timestamp()), r.value)
            for r in seq.records
        ]
        assert before == after


class TestCompactTierWorker:
    """Unit tests for _db_compact_tier directly."""

    def test_empty_sequence_returns_zero(self, seq):
        age = to_duration("2 hours")
        interval = to_duration("15 minutes")
        assert seq._db_compact_tier(age, interval) == 0

    def test_all_records_too_recent_skipped(self):
        """Records within the age threshold must not be touched."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 10 records from 30 minutes ago — all within 2h threshold
        base = now.subtract(minutes=30)
        _insert_records_every_n_minutes(seq, base, count=10, interval_minutes=1)
        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        assert deleted == 0
        assert seq.db_count_records() == before

    def test_compaction_reduces_record_count(self):
        """Dense 1-min records older than 2h should be downsampled to 15-min."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Insert 1-min records for 6 hours ending 3 hours ago
        base = now.subtract(hours=9)
        _insert_records_every_n_minutes(seq, base, count=6 * 60, interval_minutes=1)
        before = seq.db_count_records()
        deleted = seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        after = seq.db_count_records()
        assert deleted > 0
        assert after < before

    def test_records_within_threshold_preserved(self):
        """Records newer than age_threshold must remain untouched after compaction."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        # Old dense records (will be compacted)
        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=4 * 60, interval_minutes=1)
        # Recent records (must not be touched) — insert 5 records in the last hour
        recent_base = now.subtract(minutes=50)
        _insert_records_every_n_minutes(seq, recent_base, count=5, interval_minutes=10)
        recent_before = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        recent_after = [
            r for r in seq.records
            if r.date_time and r.date_time >= recent_base
        ]
        assert len(recent_after) == len(recent_before)

    def test_incremental_cutoff_prevents_recompaction(self):
        """Running compaction twice must not re-compact already-compacted data."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)
        age = to_duration("2 hours")
        interval = to_duration("15 minutes")
        deleted_first = seq._db_compact_tier(age, interval)
        count_after_first = seq.db_count_records()
        deleted_second = seq._db_compact_tier(age, interval)
        count_after_second = seq.db_count_records()
        assert deleted_first > 0
        assert deleted_second == 0, "Second run must be a no-op"
        assert count_after_first == count_after_second

    def test_cutoff_stored_after_compaction(self):
        """Cutoff timestamp must be persisted after a successful compaction run."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=8)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)
        interval = to_duration("15 minutes")
        seq._db_compact_tier(to_duration("2 hours"), interval)
        assert seq._db_get_compact_state(interval) is not None


class TestDbCompact:
    """Integration tests for the public db_compact() entry point."""

    def test_compact_dense_data_both_tiers(self, seq_with_15min_data):
        """4 weeks of 15-min data should be reduced by both tiers."""
        seq, _ = seq_with_15min_data
        before = seq.db_count_records()
        total_deleted = seq.db_compact()
        after = seq.db_count_records()
        assert total_deleted > 0
        assert after < before

    def test_compact_coarsest_tier_runs_first(self, seq_with_15min_data):
        """The 1-hour tier (coarsest) must run before the 15-min tier.

        If coarsest ran last it would re-compact records the 15-min tier
        had already downsampled — verified by checking that the 1-hour
        cutoff is not later than the 15-min cutoff.
        """
        seq, _ = seq_with_15min_data
        seq.db_compact()
        cutoff_1h = seq._db_get_compact_state(to_duration("1 hour"))
        cutoff_15min = seq._db_get_compact_state(to_duration("15 minutes"))
        assert cutoff_1h is not None
        assert cutoff_15min is not None
        # The 1h tier covers older data → its cutoff must be earlier than 15min tier
        assert cutoff_1h <= cutoff_15min

    def test_compact_idempotent(self, seq_with_15min_data):
        """Running db_compact twice must not change record count."""
        seq, _ = seq_with_15min_data
        seq.db_compact()
        after_first = seq.db_count_records()
        seq.db_compact()
        after_second = seq.db_count_records()
        assert after_first == after_second

    def test_compact_empty_sequence_returns_zero(self, seq):
        assert seq.db_compact() == 0

    def test_compact_with_override_tiers(self):
        """Passing compact_tiers directly must override db_compact_tiers()."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=3)
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 4, interval_minutes=15)
        before = seq.db_count_records()
        deleted = seq.db_compact(
            compact_tiers=[(to_duration("1 day"), to_duration("1 hour"))]
        )
        assert deleted > 0
        assert seq.db_count_records() < before

    def test_compact_only_processes_new_window_on_second_call(self):
        """Second call processes only the new window, not the full history."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(weeks=3)
        # Dense 1-min data for 3 weeks
        _insert_records_every_n_minutes(seq, base, count=3 * 7 * 24 * 60, interval_minutes=1)
        seq.db_compact()
        count_after_first = seq.db_count_records()
        # Add one more day of dense data in the past (simulate new old data arriving)
        extra_base = now.subtract(weeks=3).subtract(days=1)
        _insert_records_every_n_minutes(seq, extra_base, count=24 * 60, interval_minutes=1)
        seq.db_compact()
        count_after_second = seq.db_count_records()
        # Second compact should have processed the newly added old data
        # Record count may change but should not exceed first compacted count by much
        assert count_after_second >= 0  # basic sanity


class TestCompactDataIntegrity:
    """Verify value integrity is preserved after compaction."""

    def test_constant_value_preserved(self):
        """Constant value field must survive mean-resampling unchanged."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)
        # All values = 42.0
        _insert_records_every_n_minutes(
            seq, base, count=6 * 60, interval_minutes=1, value_fn=lambda _: 42.0
        )
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        for record in seq.records:
            if record.date_time and record.date_time < now.subtract(hours=2):
                assert record.value == pytest.approx(42.0, abs=1e-6)

    def test_recent_records_not_modified(self):
        """Records newer than the age threshold must have unchanged values."""
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        old_base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, old_base, count=3 * 60, interval_minutes=1)
        # Known recent values
        recent_base = now.subtract(minutes=30)
        expected = {i * 10: float(100 + i) for i in range(3)}
        for offset, val in expected.items():
            dt = recent_base.add(minutes=offset)
            seq.db_insert_record(SampleRecord(date_time=dt, value=val))
        seq.db_save_records()
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        for record in seq.records:
            if record.date_time and record.date_time >= recent_base:
                offset = int((record.date_time - recent_base).total_seconds() / 60)
                if offset in expected:
                    assert record.value == pytest.approx(expected[offset], abs=1e-6)

    def test_compacted_timestamps_spacing(self):
        """Resampled records must be fewer than original and span the compaction window.

        Exact per-bucket spacing depends on the full DataSequence.key_to_array
        implementation (pandas resampling). The stub key_to_array in SampleSequence
        only guarantees a reduction in count — uniform spacing is verified in
        test_dataabc_compact.py against the real implementation.
        """
        seq = SampleSequence()
        now = to_datetime().in_timezone("UTC")
        base = now.subtract(hours=6)
        _insert_records_every_n_minutes(seq, base, count=5 * 60, interval_minutes=1)
        before = seq.db_count_records()
        seq._db_compact_tier(to_duration("2 hours"), to_duration("15 minutes"))
        cutoff = now.subtract(hours=2)
        compacted = sorted(
            [r for r in seq.records if r.date_time and r.date_time < cutoff],
            key=lambda r: cast(DateTime, r.date_time),
        )
        # Must have produced fewer records than the original 1-min data
        assert len(compacted) > 0, "Expected at least one compacted record"
        assert len(compacted) < before, "Compaction must reduce record count"
        # Window start is floored to interval boundary
        interval_sec = 15 * 60
        expected_window_start = DateTime.fromtimestamp(
            (int(base.timestamp()) // interval_sec) * interval_sec,
            tz="UTC",
        )
        assert compacted[0].date_time >= expected_window_start
        # Last compacted record must be before the cutoff
        assert compacted[-1].date_time < cutoff