diff --git a/.env b/.env index 9ee08b4..4f9e32a 100644 --- a/.env +++ b/.env @@ -11,7 +11,7 @@ DOCKER_COMPOSE_DATA_DIR=${HOME}/.local/share/net.akkudoktor.eos # ----------------------------------------------------------------------------- # Image / build # ----------------------------------------------------------------------------- -VERSION=0.2.0.dev44099868 +VERSION=0.2.0.dev84352035 PYTHON_VERSION=3.13.9 # ----------------------------------------------------------------------------- diff --git a/config.yaml b/config.yaml index 7b66fc1..d747830 100644 --- a/config.yaml +++ b/config.yaml @@ -6,7 +6,7 @@ # the root directory (no add-on folder as usual). name: "Akkudoktor-EOS" -version: "0.2.0.dev44099868" +version: "0.2.0.dev84352035" slug: "eos" description: "Akkudoktor-EOS add-on" url: "https://github.com/Akkudoktor-EOS/EOS" diff --git a/docs/_generated/configexample.md b/docs/_generated/configexample.md index 6a3baa8..1b1a09a 100644 --- a/docs/_generated/configexample.md +++ b/docs/_generated/configexample.md @@ -138,7 +138,7 @@ } }, "general": { - "version": "0.2.0.dev44099868", + "version": "0.2.0.dev84352035", "data_folder_path": null, "data_output_subpath": "output", "latitude": 52.52, diff --git a/docs/_generated/configgeneral.md b/docs/_generated/configgeneral.md index e6f8c68..8b42f74 100644 --- a/docs/_generated/configgeneral.md +++ b/docs/_generated/configgeneral.md @@ -16,7 +16,7 @@ | latitude | `EOS_GENERAL__LATITUDE` | `Optional[float]` | `rw` | `52.52` | Latitude in decimal degrees between -90 and 90. North is positive (ISO 19115) (°) | | longitude | `EOS_GENERAL__LONGITUDE` | `Optional[float]` | `rw` | `13.405` | Longitude in decimal degrees within -180 to 180 (°) | | timezone | | `Optional[str]` | `ro` | `N/A` | Computed timezone based on latitude and longitude. | -| version | `EOS_GENERAL__VERSION` | `str` | `rw` | `0.2.0.dev44099868` | Configuration file version. Used to check compatibility. | +| version | `EOS_GENERAL__VERSION` | `str` | `rw` | `0.2.0.dev84352035` | Configuration file version. Used to check compatibility. | ::: @@ -28,7 +28,7 @@ ```json { "general": { - "version": "0.2.0.dev44099868", + "version": "0.2.0.dev84352035", "data_folder_path": null, "data_output_subpath": "output", "latitude": 52.52, @@ -46,7 +46,7 @@ ```json { "general": { - "version": "0.2.0.dev44099868", + "version": "0.2.0.dev84352035", "data_folder_path": null, "data_output_subpath": "output", "latitude": 52.52, diff --git a/docs/_generated/openapi.md b/docs/_generated/openapi.md index fad29b8..da82a6b 100644 --- a/docs/_generated/openapi.md +++ b/docs/_generated/openapi.md @@ -1,6 +1,6 @@ # Akkudoktor-EOS -**Version**: `v0.2.0.dev44099868` +**Version**: `v0.2.0.dev84352035` **Description**: This project provides a comprehensive solution for simulating and optimizing an energy system based on renewable energy sources. With a focus on photovoltaic (PV) systems, battery storage (batteries), load management (consumer requirements), heat pumps, electric vehicles, and consideration of electricity price data, this system enables forecasting and optimization of energy flow and costs over a specified period. diff --git a/openapi.json b/openapi.json index d042389..3cfe8de 100644 --- a/openapi.json +++ b/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "Akkudoktor-EOS", "description": "This project provides a comprehensive solution for simulating and optimizing an energy system based on renewable energy sources. With a focus on photovoltaic (PV) systems, battery storage (batteries), load management (consumer requirements), heat pumps, electric vehicles, and consideration of electricity price data, this system enables forecasting and optimization of energy flow and costs over a specified period.", - "version": "v0.2.0.dev44099868" + "version": "v0.2.0.dev84352035" }, "paths": { "/v1/admin/cache/clear": { @@ -2525,7 +2525,7 @@ "general": { "$ref": "#/components/schemas/GeneralSettings-Output", "default": { - "version": "0.2.0.dev44099868", + "version": "0.2.0.dev84352035", "data_output_subpath": "output", "latitude": 52.52, "longitude": 13.405, @@ -4272,7 +4272,7 @@ "type": "string", "title": "Version", "description": "Configuration file version. Used to check compatibility.", - "default": "0.2.0.dev44099868" + "default": "0.2.0.dev84352035" }, "data_folder_path": { "anyOf": [ @@ -4346,7 +4346,7 @@ "type": "string", "title": "Version", "description": "Configuration file version. Used to check compatibility.", - "default": "0.2.0.dev44099868" + "default": "0.2.0.dev84352035" }, "data_folder_path": { "anyOf": [ diff --git a/src/akkudoktoreos/server/rest/starteosdash.py b/src/akkudoktoreos/server/rest/starteosdash.py index 7365412..fbb1fe7 100644 --- a/src/akkudoktoreos/server/rest/starteosdash.py +++ b/src/akkudoktoreos/server/rest/starteosdash.py @@ -2,6 +2,7 @@ import asyncio import os import re import sys +import time from pathlib import Path from typing import Any, MutableMapping @@ -15,11 +16,13 @@ from akkudoktoreos.server.server import ( config_eos = get_config() - # Loguru to HA stdout logger.add(sys.stdout, format="{time} | {level} | {message}", enqueue=True) +# Maximum bytes per line to log +EOSDASH_LOG_MAX_LINE_BYTES = 128 * 1024 # 128 kB safety cap + LOG_PATTERN = re.compile( r""" (?:(?P^\S+\s+\S+)\s*\|\s*)? # Optional timestamp @@ -36,6 +39,45 @@ LOG_PATTERN = re.compile( re.VERBOSE, ) +# Drop-on-overload settings +EOSDASH_LOG_QUEUE_SIZE = 50 +EOSDASH_DROP_WARNING_INTERVAL = 5.0 # seconds + +# The queue to handle dropping of EOSdash logs on overload +eosdash_log_queue: asyncio.Queue | None = None +eosdash_last_drop_warning: float = 0.0 + + +async def _eosdash_log_worker() -> None: + """Consumes queued log calls and emits them via Loguru.""" + if eosdash_log_queue is None: + error_msg = "EOSdash log queue not initialized" + logger.error(error_msg) + raise RuntimeError(error_msg) + + while True: + item = await eosdash_log_queue.get() + if item is None: + break # shutdown signal + + log_fn, args = item + try: + log_fn(*args) + except Exception: + logger.exception("Error while emitting EOSdash log") + + +def _emit_drop_warning() -> None: + global eosdash_last_drop_warning + + now = time.monotonic() + if now - eosdash_last_drop_warning >= EOSDASH_DROP_WARNING_INTERVAL: + eosdash_last_drop_warning = now + logger.warning("EOSdash log queue full — dropping subprocess log lines") + + +# Loguru log message patching + def patch_loguru_record( record: MutableMapping[str, Any], @@ -116,12 +158,50 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None - The function runs until ``stream`` reaches EOF. """ - while True: - line = await stream.readline() - if not line: - break # End of stream + buffer = bytearray() + + while True: + try: + chunk = await stream.readuntil(b"\n") + buffer.extend(chunk) + complete = True + + except asyncio.LimitOverrunError as e: + # Read buffered data without delimiter + chunk = await stream.readexactly(e.consumed) + buffer.extend(chunk) + complete = False + + except asyncio.IncompleteReadError as e: + buffer.extend(e.partial) + complete = False + + if not buffer: + break # true EOF + + # Enforce memory bound + truncated = False + if len(buffer) > EOSDASH_LOG_MAX_LINE_BYTES: + buffer = buffer[:EOSDASH_LOG_MAX_LINE_BYTES] + truncated = True + + # Drain until newline or EOF + try: + while True: + await stream.readuntil(b"\n") + except (asyncio.LimitOverrunError, asyncio.IncompleteReadError): + pass + + # If we don't yet have a full line, continue accumulating + if not complete and not truncated: + continue + + raw = buffer.decode(errors="replace").rstrip() + if truncated: + raw += " [TRUNCATED]" + + buffer.clear() - raw = line.decode(errors="replace").rstrip() match = LOG_PATTERN.search(raw) if match: @@ -155,15 +235,25 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None function=func_name, ) ) - - patched.log(level, f"{prefix}{message}") + if eosdash_log_queue is None: + patched.log(level, f"{prefix}{message}") + else: + try: + eosdash_log_queue.put_nowait( + ( + patched.log, + (level, f"{prefix}{message}"), + ) + ) + except asyncio.QueueFull: + _emit_drop_warning() else: # Fallback: unstructured log line file_name = "subprocess.py" file_path = f"/subprocess/{file_name}" - logger.patch( + patched = logger.patch( lambda r: patch_loguru_record( r, file_name=file_name, @@ -172,7 +262,19 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None function="", logger_name="EOSdash", ) - ).info(f"{prefix}{raw}") + ) + if eosdash_log_queue is None: + patched.info(f"{prefix}{raw}") + else: + try: + eosdash_log_queue.put_nowait( + ( + patched.info, + (f"{prefix}{raw}",), + ) + ) + except asyncio.QueueFull: + _emit_drop_warning() async def run_eosdash_supervisor() -> None: @@ -180,6 +282,8 @@ async def run_eosdash_supervisor() -> None: Runs forever. """ + global eosdash_log_queue + eosdash_path = Path(__file__).parent.resolve().joinpath("eosdash.py") while True: @@ -284,6 +388,11 @@ async def run_eosdash_supervisor() -> None: logger.exception(f"Unexpected error launching EOSdash: {e}") continue + if eosdash_log_queue is None: + # Initialize EOSdash log queue + worker once + eosdash_log_queue = asyncio.Queue(maxsize=EOSDASH_LOG_QUEUE_SIZE) + asyncio.create_task(_eosdash_log_worker()) + if proc.stdout is None: logger.error("Failed to forward EOSdash output to EOS pipe.") else: @@ -294,7 +403,7 @@ async def run_eosdash_supervisor() -> None: logger.error("Failed to forward EOSdash error output to EOS pipe.") else: # Forward log - asyncio.create_task(forward_stream(proc.stderr, prefix="[EOSdash-ERR] ")) + asyncio.create_task(forward_stream(proc.stderr, prefix="[EOSdash] ")) # If we reach here, the subprocess started successfully logger.info("EOSdash subprocess started successfully.") diff --git a/tests/test_server.py b/tests/test_server.py index 772fcca..4f885fd 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -10,6 +10,7 @@ import psutil import pytest import requests from conftest import cleanup_eos_eosdash +from loguru import logger from akkudoktoreos.core.version import __version__ from akkudoktoreos.server.server import get_default_host, wait_for_port_free @@ -46,6 +47,73 @@ class TestServer: class TestServerStartStop: + @pytest.mark.asyncio + async def test_forward_stream_truncates_very_long_line(self, monkeypatch, tmp_path): + """Test logging from EOSdash can also handle very long lines.""" + + eos_dir = tmp_path + monkeypatch.setenv("EOS_DIR", str(eos_dir)) + monkeypatch.setenv("EOS_CONFIG_DIR", str(eos_dir)) + + # Import after env vars are set + from akkudoktoreos.server.rest.starteosdash import ( + EOSDASH_LOG_MAX_LINE_BYTES, + _eosdash_log_worker, + eosdash_log_queue, + forward_stream, + ) + + # ---- Ensure queue + worker are initialized ---- + if eosdash_log_queue is None: + from akkudoktoreos.server.rest import starteosdash + + starteosdash.eosdash_log_queue = asyncio.Queue(maxsize=10) + worker_task = asyncio.create_task(_eosdash_log_worker()) + else: + worker_task = None + + long_message = "X" * (EOSDASH_LOG_MAX_LINE_BYTES + 10_000) + raw_line = f"INFO some.module:123 some_func - {long_message}\n" + raw_bytes = raw_line.encode() + + reader = asyncio.StreamReader() + reader.feed_data(raw_bytes) + reader.feed_eof() + + # ---- Capture Loguru output ---- + records = [] + + def sink(message): + records.append(message.record) + + logger_id = logger.add(sink, level="INFO") + + try: + await forward_stream(reader) + + # Allow log worker to flush queue + await asyncio.sleep(0) + + finally: + logger.remove(logger_id) + + # Clean shutdown of worker (important for pytest) + if worker_task: + from akkudoktoreos.server.rest import starteosdash + + if starteosdash.eosdash_log_queue: + starteosdash.eosdash_log_queue.put_nowait(None) + await worker_task + + # ---- Assert ---- + assert len(records) == 1, "Expected exactly one log record" + + record = records[0] + msg = record["message"] + + assert msg.endswith("[TRUNCATED]"), "Expected truncation marker" + assert len(msg) <= EOSDASH_LOG_MAX_LINE_BYTES + 20 + @pytest.mark.asyncio async def test_server_start_eosdash(self, config_eos, monkeypatch, tmp_path): """Test the EOSdash server startup from EOS.