fix: eosdash startup (#915)
Some checks failed
Bump Version / Bump Version Workflow (push) Has been cancelled
docker-build / platform-excludes (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled

Adapt uvicorn log level to allowed levels.

Ensure that EOSdash is started after EOS configuration is available.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte
2026-02-28 11:31:51 +01:00
committed by GitHub
parent 3ccc25d731
commit 237af5289f
27 changed files with 497 additions and 379 deletions

View File

@@ -59,7 +59,7 @@ from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktorCommonSettings
from akkudoktoreos.prediction.pvforecast import PVForecastCommonSettings
from akkudoktoreos.server.rest.cli import cli_apply_args_to_config, cli_parse_args
from akkudoktoreos.server.rest.error import create_error_page
from akkudoktoreos.server.rest.starteosdash import run_eosdash_supervisor
from akkudoktoreos.server.rest.starteosdash import supervise_eosdash
from akkudoktoreos.server.retentionmanager import RetentionManager
from akkudoktoreos.server.server import (
drop_root_privileges,
@@ -138,18 +138,37 @@ async def server_shutdown_task() -> None:
sys.exit(0)
def config_eos_ready() -> bool:
    """Check whether the EOS configuration system is ready.

    This function can be used as an activation condition for the
    RetentionManager to delay the start of its tick loop until the
    configuration system is available.

    Returns:
        bool: ``True`` if the configuration system is ready (i.e. the value
            returned by ``get_config()`` is not ``None``), ``False``
            otherwise.
    """
    return get_config() is not None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Lifespan manager for the app."""
# On startup
eosdash_supervisor_task = asyncio.create_task(run_eosdash_supervisor())
load_eos_state()
config_eos = get_config()
# Prepare the Manager and all tasks that are handled by the manager
manager = RetentionManager(config_getter=get_config().get_nested_value, shutdown_timeout=10)
manager = RetentionManager(
config_getter=get_config().get_nested_value,
shutdown_timeout=10,
)
manager.register(
name="supervise_eosdash",
func=supervise_eosdash,
interval_attr="server/eosdash_supervise_interval_sec",
fallback_interval=5.0,
)
manager.register("cache_clear", cache_clear, interval_attr="cache/cleanup_interval")
manager.register(
"save_eos_database", save_eos_database, interval_attr="database/autosave_interval_sec"
@@ -160,7 +179,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
manager.register("manage_energy", ems_manage_energy, interval_attr="ems/interval")
# Start the manager and, through it, all repeated EOS tasks
retention_manager_task = asyncio.create_task(manager.run())
retention_manager_task = asyncio.create_task(manager.run(activation_condition=config_eos_ready))
# Handover to application
yield
@@ -168,8 +187,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# waits for any in-flight job to finish cleanly
retention_manager_task.cancel()
await asyncio.gather(retention_manager_task, return_exceptions=True)
eosdash_supervisor_task.cancel()
await asyncio.gather(eosdash_supervisor_task, return_exceptions=True)
# On shutdown
save_eos_state()
@@ -1457,13 +1474,25 @@ def run_eos() -> None:
port = 8503
wait_for_port_free(port, timeout=120, waiting_app_name="EOS")
# Normalize log_level to uvicorn log level
VALID_UVICORN_LEVELS = {"critical", "error", "warning", "info", "debug", "trace"}
uv_log_level: Optional[str] = config_eos.logging.console_level
if uv_log_level is None:
uv_log_level = "critical" # effectively disables logging
else:
uv_log_level = uv_log_level.lower()
if uv_log_level == "none":
uv_log_level = "critical" # effectively disables logging
elif uv_log_level not in VALID_UVICORN_LEVELS:
uv_log_level = "info" # fallback
try:
# Let uvicorn run the fastAPI app
uvicorn.run(
"akkudoktoreos.server.eos:app",
host=str(config_eos.server.host),
port=port,
log_level="info", # Fix log level for uvicorn to info
log_level=uv_log_level,
access_log=True, # Fix server access logging to True
reload=reload,
proxy_headers=True,

View File

@@ -119,7 +119,7 @@ config_eos.reset_settings()
# Remember parameters in config
config_eosdash = {}
# Setup EOS logging level - first to have the other logging messages logged
# Setup EOSdash logging level - first to have the other logging messages logged
# - log level
if args and args.log_level is not None:
config_eosdash["log_level"] = args.log_level.upper()
@@ -477,12 +477,20 @@ def run_eosdash() -> None:
# Wait for EOSdash port to be free - e.g. in case of restart
wait_for_port_free(config_eosdash["eosdash_port"], timeout=120, waiting_app_name="EOSdash")
# Normalize log_level to uvicorn log level
VALID_UVICORN_LEVELS = {"critical", "error", "warning", "info", "debug", "trace"}
uv_log_level = config_eosdash["log_level"].lower()
if uv_log_level == "none":
uv_log_level = "critical" # effectively disables logging
elif uv_log_level not in VALID_UVICORN_LEVELS:
uv_log_level = "info" # fallback
try:
uvicorn.run(
"akkudoktoreos.server.eosdash:app",
host=config_eosdash["eosdash_host"],
port=config_eosdash["eosdash_port"],
log_level=config_eosdash["log_level"].lower(),
log_level=uv_log_level,
access_log=config_eosdash["access_log"],
reload=config_eosdash["reload"],
proxy_headers=True,

View File

@@ -4,7 +4,7 @@ import re
import sys
import time
from pathlib import Path
from typing import Any, MutableMapping
from typing import Any, MutableMapping, Optional
from loguru import logger
@@ -44,6 +44,8 @@ EOSDASH_DROP_WARNING_INTERVAL = 5.0 # seconds
# The queue to handle dropping of EOSdash logs on overload
eosdash_log_queue: asyncio.Queue | None = None
eosdash_last_drop_warning: float = 0.0
eosdash_proc: Optional[asyncio.subprocess.Process] = None
eosdash_path = Path(__file__).parent.resolve().joinpath("eosdash.py")
async def _eosdash_log_worker() -> None:
@@ -55,15 +57,34 @@ async def _eosdash_log_worker() -> None:
while True:
item = await eosdash_log_queue.get()
if item is None:
break # shutdown signal
log_fn, args = item
if item is None:
return # clean shutdown
# Process current
try:
log_fn, args = item
log_fn(*args)
except Exception:
logger.exception("Error while emitting EOSdash log")
# Drain a burst of queued items via get_nowait() to avoid the per-item await overhead of get()
for _ in range(100):
try:
item = eosdash_log_queue.get_nowait()
except asyncio.QueueEmpty:
break
if item is None:
return # clean shutdown
try:
log_fn, args = item
log_fn(*args)
except Exception:
logger.exception("Error while emitting EOSdash log")
break
def _emit_drop_warning() -> None:
global eosdash_last_drop_warning
@@ -275,152 +296,107 @@ async def forward_stream(stream: asyncio.StreamReader, prefix: str = "") -> None
_emit_drop_warning()
# Path to eosdash
eosdash_path = Path(__file__).parent.resolve().joinpath("eosdash.py")
async def supervise_eosdash() -> None:
"""Supervise EOSdash.
Tracks internal EOSdash state and ensures only one instance is started.
On each tick, the task checks if EOSdash should be started, monitored, or restarted.
async def run_eosdash_supervisor() -> None:
"""Starts EOSdash, pipes its logs, restarts it if it crashes.
Runs forever.
Safe to run repeatedly under RetentionManager.
"""
global eosdash_log_queue, eosdash_path
global eosdash_proc, eosdash_log_queue, eosdash_path
config_eos = get_config()
while True:
await asyncio.sleep(5)
# Skip if EOSdash not configured to start
if not getattr(config_eos.server, "startup_eosdash", False):
return
if not config_eos.server.startup_eosdash:
continue
host = config_eos.server.eosdash_host
port = config_eos.server.eosdash_port
eos_host = config_eos.server.host
eos_port = config_eos.server.port
if (
config_eos.server.eosdash_host is None
or config_eos.server.eosdash_port is None
or config_eos.server.host is None
or config_eos.server.port is None
):
error_msg = (
f"Invalid configuration for EOSdash server startup.\n"
f"- server/eosdash_host: {config_eos.server.eosdash_host}\n"
f"- server/eosdash_port: {config_eos.server.eosdash_port}\n"
f"- server/host: {config_eos.server.host}\n"
f"- server/port: {config_eos.server.port}"
)
logger.error(error_msg)
continue
if host is None or port is None or eos_host is None or eos_port is None:
logger.error("EOSdash supervisor skipped: invalid configuration")
return
# Get all the parameters
host = str(config_eos.server.eosdash_host)
port = config_eos.server.eosdash_port
eos_host = str(config_eos.server.host)
eos_port = config_eos.server.port
access_log = True
reload = False
log_level = config_eos.logging.console_level if config_eos.logging.console_level else "info"
# Check host validity
try:
validate_ip_or_hostname(host)
validate_ip_or_hostname(eos_host)
except Exception as ex:
logger.error(f"EOSdash supervisor: invalid host configuration: {ex}")
return
try:
validate_ip_or_hostname(host)
validate_ip_or_hostname(eos_host)
except Exception as ex:
error_msg = f"Could not start EOSdash: {ex}"
logger.error(error_msg)
continue
if eos_host != host:
# EOSdash runs on a different server - we can not start.
error_msg = (
f"EOSdash server startup not possible on different hosts.\n"
f"- server/eosdash_host: {config_eos.server.eosdash_host}\n"
f"- server/host: {config_eos.server.host}"
)
logger.error(error_msg)
continue
# Do a one time check for port free to generate warnings if not so
wait_for_port_free(port, timeout=0, waiting_app_name="EOSdash")
cmd = [
sys.executable,
"-m",
"akkudoktoreos.server.eosdash",
"--host",
str(host),
"--port",
str(port),
"--eos-host",
str(eos_host),
"--eos-port",
str(eos_port),
"--log_level",
log_level,
"--access_log",
str(access_log),
"--reload",
str(reload),
]
# Set environment before any subprocess run, to keep custom config dir
eos_dir = str(config_eos.package_root_path)
eos_data_dir = str(config_eos.general.data_folder_path)
eos_config_dir = str(config_eos.general.config_folder_path)
env = os.environ.copy()
env["EOS_DIR"] = eos_dir
env["EOS_DATA_DIR"] = eos_data_dir
env["EOS_CONFIG_DIR"] = eos_config_dir
logger.info("Starting EOSdash subprocess...")
# Start EOSdash server
try:
proc = await asyncio.create_subprocess_exec(
*cmd, env=env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
except FileNotFoundError:
logger.error(
"Failed to start EOSdash: 'python' executable '{sys.executable}' not found."
)
continue
except PermissionError:
logger.error("Failed to start EOSdash: permission denied on 'eosdash.py'.")
continue
except asyncio.CancelledError:
logger.warning("EOSdash startup cancelled (shutdown?).")
# Only start EOSdash if not already running
if eosdash_proc is not None:
# Check if process is alive
if eosdash_proc.returncode is None:
# Still running — monitoring mode
return
except Exception as e:
logger.exception(f"Unexpected error launching EOSdash: {e}")
continue
if eosdash_log_queue is None:
# Initialize EOSdash log queue + worker once
eosdash_log_queue = asyncio.Queue(maxsize=EOSDASH_LOG_QUEUE_SIZE)
asyncio.create_task(_eosdash_log_worker())
if proc.stdout is None:
logger.error("Failed to forward EOSdash output to EOS pipe.")
else:
# Forward log
asyncio.create_task(forward_stream(proc.stdout, prefix="[EOSdash] "))
logger.warning("EOSdash subprocess exited — restarting...")
eosdash_proc = None
if proc.stderr is None:
logger.error("Failed to forward EOSdash error output to EOS pipe.")
else:
# Forward log
asyncio.create_task(forward_stream(proc.stderr, prefix="[EOSdash] "))
# Ensure port is free
wait_for_port_free(port, timeout=0, waiting_app_name="EOSdash")
# If we reach here, the subprocess started successfully
logger.info("EOSdash subprocess started successfully.")
cmd = [
sys.executable,
"-m",
"akkudoktoreos.server.eosdash",
"--host",
str(host),
"--port",
str(port),
"--eos-host",
str(eos_host),
"--eos-port",
str(eos_port),
"--log_level",
str(getattr(config_eos.logging, "console_level", "info")),
"--access_log",
"True",
"--reload",
"False",
]
# Wait for exit
try:
exit_code = await proc.wait()
logger.error(f"EOSdash exited with code {exit_code}")
env = os.environ.copy()
env.update(
{
"EOS_DIR": str(config_eos.package_root_path),
"EOS_DATA_DIR": str(config_eos.general.data_folder_path),
"EOS_CONFIG_DIR": str(config_eos.general.config_folder_path),
}
)
except asyncio.CancelledError:
logger.warning("EOSdash wait cancelled (shutdown?).")
return
logger.info("Starting EOSdash subprocess...")
except Exception as e:
logger.exception(f"Error while waiting for EOSdash to terminate: {e}")
try:
eosdash_proc = await asyncio.create_subprocess_exec(
*cmd,
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except asyncio.CancelledError:
logger.warning("EOSdash supervisor cancelled before start")
return
except Exception as e:
logger.exception(f"Failed to start EOSdash subprocess: {e}")
eosdash_proc = None
return
# Restart after a delay
logger.info("Restarting EOSdash...")
# Initialize log queue once
if eosdash_log_queue is None:
eosdash_log_queue = asyncio.Queue(maxsize=EOSDASH_LOG_QUEUE_SIZE)
asyncio.create_task(_eosdash_log_worker()) # existing worker
# Forward stdout/stderr
if eosdash_proc.stdout:
asyncio.create_task(forward_stream(eosdash_proc.stdout, prefix="[EOSdash] "))
if eosdash_proc.stderr:
asyncio.create_task(forward_stream(eosdash_proc.stderr, prefix="[EOSdash] "))
logger.info("EOSdash subprocess started successfully.")

View File

@@ -250,34 +250,121 @@ class RetentionManager:
# Tick — called by the external heartbeat loop
# ------------------------------------------------------------------
async def run(self, *, tick_interval: float = 5.0) -> None:
async def run(
self,
*,
tick_interval: float = 5.0,
activation_condition: Union[
Callable[[], bool],
Callable[[], Coroutine[Any, Any, bool]],
None,
] = None,
activation_check_interval: float = 5.0,
) -> None:
"""Run the RetentionManager tick loop indefinitely.
Calls `tick` every ``tick_interval`` seconds until the task is
cancelled (e.g. during application shutdown). On cancellation,
`shutdown` is awaited so any in-flight jobs can finish cleanly
before the loop exits.
Starts a periodic loop that calls `tick()` every ``tick_interval`` seconds
until the task is cancelled. If an ``activation_condition`` coroutine is
provided, the manager will first wait until that condition evaluates to
``True`` before entering the tick loop. The condition is polled every
``activation_check_interval`` seconds.
This allows the manager task to be created early in the application
lifecycle while deferring actual job execution until the surrounding
system is fully operational (e.g. database ready, caches warmed, leader
elected, etc.).
If the activation condition raises an exception, it is logged and treated
as not satisfied. The condition will be retried after the configured
check interval.
On cancellation (e.g. during application shutdown), the manager waits
for all in-flight jobs to complete by invoking `shutdown()` before
exiting.
Args:
tick_interval: Seconds between ticks. Defaults to ``5.0``.
tick_interval: Number of seconds between successive calls to `tick()`.
Defaults to ``5.0``.
activation_condition: Optional callable returning ``bool``. The callable
may be synchronous or asynchronous. When provided, the manager
repeatedly evaluates this callable until it returns ``True`` before
starting the tick loop. Asynchronous callables are awaited.
Defaults to ``None``.
activation_check_interval: Number of seconds to wait between
successive evaluations of ``activation_condition``. Ignored if
no activation condition is provided. Defaults to ``5.0``.
Example::
Raises:
asyncio.CancelledError: Propagated when the task is cancelled after
performing graceful shutdown of running jobs.
@asynccontextmanager
async def lifespan(app: FastAPI):
task = asyncio.create_task(manager.run())
yield
task.cancel()
await asyncio.gather(task, return_exceptions=True)
Example:
Using FastAPI lifespan to delay job execution until the
application signals readiness:
>>> from contextlib import asynccontextmanager
>>> from fastapi import FastAPI
>>>
>>> app = FastAPI()
>>> manager = RetentionManager(config_getter)
>>>
>>> async def is_system_ready() -> bool:
... return getattr(app.state, "system_ready", False)
>>>
>>> @asynccontextmanager
... async def lifespan(app: FastAPI):
... task = asyncio.create_task(
... manager.run(
... activation_condition=is_system_ready,
... activation_check_interval=5.0,
... )
... )
...
... # Application startup phase
... app.state.system_ready = False
... yield # Application is now running
...
... # Signal readiness after external initialization
... app.state.system_ready = True
...
... # Shutdown phase
... task.cancel()
... await asyncio.gather(task, return_exceptions=True)
...
>>> app = FastAPI(lifespan=lifespan)
"""
if activation_condition is not None:
logger.info("RetentionManager: waiting for startup condition...")
while True:
try:
if asyncio.iscoroutinefunction(activation_condition):
ready = await activation_condition()
else:
ready = activation_condition()
except Exception as exc:
logger.exception(
"RetentionManager: startup condition raised exception: {}", exc
)
if ready:
break
await asyncio.sleep(activation_check_interval)
logger.info("RetentionManager: startup condition satisfied")
logger.info("RetentionManager: tick loop started (interval={}s)", tick_interval)
try:
while True:
try:
await self.tick()
except Exception as exc: # noqa: BLE001
except Exception as exc:
logger.exception("RetentionManager: unhandled exception in tick: {}", exc)
await asyncio.sleep(tick_interval)
except asyncio.CancelledError:
logger.info("RetentionManager: tick loop cancelled, shutting down...")
await self.shutdown()
@@ -296,7 +383,6 @@ class RetentionManager:
Jobs that are still running from a previous tick are skipped to prevent
overlapping executions.
"""
logger.info("RetentionManager: tick")
due = [job for job in self._jobs.values() if not job.is_running and job.is_due()]
if not due:

View File

@@ -365,6 +365,15 @@ class ServerCommonSettings(SettingsBaseModel):
],
},
)
# Interval (seconds) between runs of the EOSdash supervision job that the EOS
# server registers with its RetentionManager under interval_attr
# "server/eosdash_supervise_interval_sec"; the registration uses a 5.0 s
# fallback interval when this setting is unavailable.
eosdash_supervise_interval_sec: int = Field(
    default=10,
    json_schema_extra={
        "description": "Supervision interval for EOS server to supervise EOSdash [seconds].",
        "examples": [
            10,
        ],
    },
)
@field_validator("host", "eosdash_host", mode="before")
def validate_server_host(cls, value: Optional[str]) -> Optional[str]: