Mirror of https://github.com/Akkudoktor-EOS/EOS.git, synced 2026-01-26 12:36:17 +00:00
fix: logging long lines from EOSdash (#843)
Some checks failed
Bump Version / Bump Version Workflow (push) Has been cancelled
docker-build / platform-excludes (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
Close stale pull requests/issues / Find Stale issues and PRs (push) Has been cancelled
Truncate long lines when logging output from EOSdash. Rate-limit log messages from EOSdash to prevent overload. Log messages are still read and then dropped, so that EOSdash never blocks on its standard output or standard error.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
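For orientation before the diff: a minimal, self-contained sketch of the drop-on-overload pattern the commit applies, a bounded asyncio.Queue that absorbs bursts, put_nowait() that drops log calls once the queue is full, and a rate-limited warning that reports the drops. The constant names and the enqueue_log() helper here are illustrative only; the actual names and wiring are in the diff below.

import asyncio
import time

from loguru import logger

QUEUE_SIZE = 50              # illustrative; the commit uses EOSDASH_LOG_QUEUE_SIZE
DROP_WARNING_INTERVAL = 5.0  # seconds between "dropping" warnings

log_queue: asyncio.Queue = asyncio.Queue(maxsize=QUEUE_SIZE)
last_drop_warning = 0.0


async def log_worker() -> None:
    """Consume queued (log_fn, args) tuples and emit them; None shuts the worker down."""
    while True:
        item = await log_queue.get()
        if item is None:
            break
        log_fn, args = item
        log_fn(*args)


def enqueue_log(log_fn, *args) -> None:
    """Queue a log call without blocking; drop it with a throttled warning on overload."""
    global last_drop_warning
    try:
        log_queue.put_nowait((log_fn, args))
    except asyncio.QueueFull:
        now = time.monotonic()
        if now - last_drop_warning >= DROP_WARNING_INTERVAL:
            last_drop_warning = now
            logger.warning("log queue full - dropping lines")

The worker runs once as a background task (asyncio.create_task(log_worker())), mirroring how the commit starts _eosdash_log_worker() after the first successful EOSdash launch.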
@@ -2,6 +2,7 @@ import asyncio
 import os
 import re
 import sys
+import time
 from pathlib import Path
 from typing import Any, MutableMapping

@@ -15,11 +16,13 @@ from akkudoktoreos.server.server import (

 config_eos = get_config()


 # Loguru to HA stdout
 logger.add(sys.stdout, format="{time} | {level} | {message}", enqueue=True)


+# Maximum bytes per line to log
+EOSDASH_LOG_MAX_LINE_BYTES = 128 * 1024  # 128 kB safety cap

 LOG_PATTERN = re.compile(
     r"""
     (?:(?P<timestamp>^\S+\s+\S+)\s*\|\s*)?   # Optional timestamp
@@ -36,6 +39,45 @@ LOG_PATTERN = re.compile(
     re.VERBOSE,
 )

+# Drop-on-overload settings
+EOSDASH_LOG_QUEUE_SIZE = 50
+EOSDASH_DROP_WARNING_INTERVAL = 5.0  # seconds
+
+# The queue to handle dropping of EOSdash logs on overload
+eosdash_log_queue: asyncio.Queue | None = None
+eosdash_last_drop_warning: float = 0.0
+
+
+async def _eosdash_log_worker() -> None:
+    """Consumes queued log calls and emits them via Loguru."""
+    if eosdash_log_queue is None:
+        error_msg = "EOSdash log queue not initialized"
+        logger.error(error_msg)
+        raise RuntimeError(error_msg)
+
+    while True:
+        item = await eosdash_log_queue.get()
+        if item is None:
+            break  # shutdown signal
+
+        log_fn, args = item
+        try:
+            log_fn(*args)
+        except Exception:
+            logger.exception("Error while emitting EOSdash log")
+
+
+def _emit_drop_warning() -> None:
+    global eosdash_last_drop_warning
+
+    now = time.monotonic()
+    if now - eosdash_last_drop_warning >= EOSDASH_DROP_WARNING_INTERVAL:
+        eosdash_last_drop_warning = now
+        logger.warning("EOSdash log queue full — dropping subprocess log lines")
+
+
+# Loguru log message patching
+

 def patch_loguru_record(
     record: MutableMapping[str, Any],
@@ -116,12 +158,50 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None
     - The function runs until ``stream`` reaches EOF.

     """
-    while True:
-        line = await stream.readline()
-        if not line:
-            break  # End of stream
+    buffer = bytearray()
+
+    while True:
+        try:
+            chunk = await stream.readuntil(b"\n")
+            buffer.extend(chunk)
+            complete = True
+
+        except asyncio.LimitOverrunError as e:
+            # Read buffered data without delimiter
+            chunk = await stream.readexactly(e.consumed)
+            buffer.extend(chunk)
+            complete = False
+
+        except asyncio.IncompleteReadError as e:
+            buffer.extend(e.partial)
+            complete = False
+
+        if not buffer:
+            break  # true EOF
+
+        # Enforce memory bound
+        truncated = False
+        if len(buffer) > EOSDASH_LOG_MAX_LINE_BYTES:
+            buffer = buffer[:EOSDASH_LOG_MAX_LINE_BYTES]
+            truncated = True
+
+            # Drain until newline or EOF
+            try:
+                while True:
+                    await stream.readuntil(b"\n")
+            except (asyncio.LimitOverrunError, asyncio.IncompleteReadError):
+                pass
+
+        # If we don't yet have a full line, continue accumulating
+        if not complete and not truncated:
+            continue
+
+        raw = buffer.decode(errors="replace").rstrip()
+        if truncated:
+            raw += " [TRUNCATED]"
+
+        buffer.clear()

-        raw = line.decode(errors="replace").rstrip()
         match = LOG_PATTERN.search(raw)

         if match:
@@ -155,15 +235,25 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None
                     function=func_name,
                 )
             )

-            patched.log(level, f"{prefix}{message}")
+            if eosdash_log_queue is None:
+                patched.log(level, f"{prefix}{message}")
+            else:
+                try:
+                    eosdash_log_queue.put_nowait(
+                        (
+                            patched.log,
+                            (level, f"{prefix}{message}"),
+                        )
+                    )
+                except asyncio.QueueFull:
+                    _emit_drop_warning()

         else:
             # Fallback: unstructured log line
             file_name = "subprocess.py"
             file_path = f"/subprocess/{file_name}"

-            logger.patch(
+            patched = logger.patch(
                 lambda r: patch_loguru_record(
                     r,
                     file_name=file_name,
@@ -172,7 +262,19 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None
                     function="<subprocess>",
                     logger_name="EOSdash",
                 )
-            ).info(f"{prefix}{raw}")
+            )
+            if eosdash_log_queue is None:
+                patched.info(f"{prefix}{raw}")
+            else:
+                try:
+                    eosdash_log_queue.put_nowait(
+                        (
+                            patched.info,
+                            (f"{prefix}{raw}",),
+                        )
+                    )
+                except asyncio.QueueFull:
+                    _emit_drop_warning()


 async def run_eosdash_supervisor() -> None:
@@ -180,6 +282,8 @@ async def run_eosdash_supervisor() -> None:

     Runs forever.
     """
+    global eosdash_log_queue
+
     eosdash_path = Path(__file__).parent.resolve().joinpath("eosdash.py")

     while True:
@@ -284,6 +388,11 @@ async def run_eosdash_supervisor() -> None:
             logger.exception(f"Unexpected error launching EOSdash: {e}")
             continue

+        if eosdash_log_queue is None:
+            # Initialize EOSdash log queue + worker once
+            eosdash_log_queue = asyncio.Queue(maxsize=EOSDASH_LOG_QUEUE_SIZE)
+            asyncio.create_task(_eosdash_log_worker())
+
         if proc.stdout is None:
             logger.error("Failed to forward EOSdash output to EOS pipe.")
         else:
@@ -294,7 +403,7 @@ async def run_eosdash_supervisor() -> None:
             logger.error("Failed to forward EOSdash error output to EOS pipe.")
         else:
             # Forward log
-            asyncio.create_task(forward_stream(proc.stderr, prefix="[EOSdash-ERR] "))
+            asyncio.create_task(forward_stream(proc.stderr, prefix="[EOSdash] "))

         # If we reach here, the subprocess started successfully
         logger.info("EOSdash subprocess started successfully.")
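As a companion to the forward_stream() changes above, a minimal sketch of the long-line handling, assuming asyncio's default StreamReader limit (64 KiB): readuntil() raises LimitOverrunError when no newline fits within the limit, readexactly(e.consumed) pulls out what is already buffered, and a hard byte cap truncates the line. The helper name read_bounded_line() and the simplified control flow are illustrative, not the code from the commit (which additionally drains the remainder of an oversized line before emitting it).

import asyncio

MAX_LINE_BYTES = 128 * 1024  # hard cap, mirroring EOSDASH_LOG_MAX_LINE_BYTES


async def read_bounded_line(stream: asyncio.StreamReader) -> str | None:
    """Return one decoded line, capped at MAX_LINE_BYTES; None at EOF."""
    buffer = bytearray()
    truncated = False
    while True:
        try:
            # Completes only if a newline arrives within the reader's limit.
            buffer.extend(await stream.readuntil(b"\n"))
            break
        except asyncio.LimitOverrunError as e:
            # Line longer than the limit: take what is buffered and keep going.
            buffer.extend(await stream.readexactly(e.consumed))
            if len(buffer) >= MAX_LINE_BYTES:
                truncated = True
                break  # stop accumulating; the rest of the line stays unread here
        except asyncio.IncompleteReadError as e:
            buffer.extend(e.partial)  # EOF with a partial last line
            break
    if not buffer:
        return None
    text = buffer[:MAX_LINE_BYTES].decode(errors="replace").rstrip()
    return text + " [TRUNCATED]" if truncated else text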