Improve caching. (#431)

* Move the caching module to core.

Add an in memory cache that for caching function and method results
during an energy management run (optimization run). Two decorators
are provided for methods and functions.

* Improve the file cache store by load and save functions.

Make EOS load the cache file store on startup and save it on shutdown.
Add a cyclic task that cleans the cache file store from outdated cache files.

* Improve startup of EOSdash by EOS

Make EOS starting EOSdash adhere to path configuration given in EOS.
The whole environment from EOS is now passed to EOSdash.
Should also prevent test errors due to unwanted/ wrong config file creation.

Both servers now provide a health endpoint that can be used to detect whether
the server is running. This is also used for testing now.

* Improve startup of EOS

EOS now has got an energy management task that runs shortly after startup.
It tries to execute energy management runs with predictions newly fetched
or initialized from cached data on first run.

* Improve shutdown of EOS

EOS has now a shutdown task that shuts EOS down gracefully with some
time delay to allow REST API requests for shutdwon or restart to be fully serviced.

* Improve EMS

Add energy management task for repeated energy management controlled by
startup delay and interval configuration parameters.
Translate EnergieManagementSystem to english EnergyManagement.

* Add administration endpoints

  - endpoints to control caching from REST API.
  - endpoints to control server restart (will not work on Windows) and shutdown from REST API

* Improve doc generation

Use "\n" linenend convention also on Windows when generating doc files.
Replace Windows specific 127.0.0.1 address by standard 0.0.0.0.

* Improve test support (to be able to test caching)

  - Add system test option to pytest for running tests with "real" resources
  - Add new test fixture to start server for test class and test function
  - Make kill signal adapt to Windows/ Linux
  - Use consistently "\n" for lineends when writing text files in  doc test
  - Fix test_logging under Windows
  - Fix conftest config_default_dirs test fixture under Windows

From @Lasall

* Improve Windows support

 - Use 127.0.0.1 as default config host (model defaults) and
   addionally redirect 0.0.0.0 to localhost on Windows (because default
   config file still has 0.0.0.0).
 - Update install/startup instructions as package installation is
   required atm.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte
2025-02-12 21:35:51 +01:00
committed by GitHub
parent 1a2cb4d37d
commit 80bfe4d0f0
54 changed files with 3661 additions and 894 deletions

View File

@@ -1,22 +1,34 @@
#!/usr/bin/env python3
import argparse
import asyncio
import json
import os
import signal
import subprocess
import sys
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated, Any, AsyncGenerator, Dict, List, Optional, Union
import httpx
import psutil
import uvicorn
from fastapi import Body, FastAPI
from fastapi import Path as FastapiPath
from fastapi import Query, Request
from fastapi.exceptions import HTTPException
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, Response
from fastapi.responses import (
FileResponse,
HTMLResponse,
JSONResponse,
RedirectResponse,
Response,
)
from akkudoktoreos.config.config import ConfigEOS, SettingsEOS, get_config
from akkudoktoreos.core.cache import CacheFileStore
from akkudoktoreos.core.ems import get_ems
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.pydantic import (
@@ -36,6 +48,8 @@ from akkudoktoreos.prediction.load import LoadCommonSettings
from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktorCommonSettings
from akkudoktoreos.prediction.prediction import PredictionCommonSettings, get_prediction
from akkudoktoreos.prediction.pvforecast import PVForecastCommonSettings
from akkudoktoreos.server.rest.tasks import repeat_every
from akkudoktoreos.server.server import get_default_host
from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration
logger = get_logger(__name__)
@@ -145,35 +159,58 @@ def create_error_page(
# ----------------------
def start_eosdash() -> subprocess.Popen:
def start_eosdash(
host: str,
port: int,
eos_host: str,
eos_port: int,
log_level: str,
access_log: bool,
reload: bool,
eos_dir: str,
eos_config_dir: str,
) -> subprocess.Popen:
"""Start the EOSdash server as a subprocess.
This function starts the EOSdash server by launching it as a subprocess. It checks if the server
is already running on the specified port and either returns the existing process or starts a new one.
Args:
host (str): The hostname for the EOSdash server.
port (int): The port for the EOSdash server.
eos_host (str): The hostname for the EOS server.
eos_port (int): The port for the EOS server.
log_level (str): The logging level for the EOSdash server.
access_log (bool): Flag to enable or disable access logging.
reload (bool): Flag to enable or disable auto-reloading.
eos_dir (str): Path to the EOS data directory.
eos_config_dir (str): Path to the EOS configuration directory.
Returns:
server_process: The process of the EOSdash server
subprocess.Popen: The process of the EOSdash server.
Raises:
RuntimeError: If the EOSdash server fails to start.
"""
eosdash_path = Path(__file__).parent.resolve().joinpath("eosdash.py")
if args is None:
# No command line arguments
host = config_eos.server.eosdash_host
port = config_eos.server.eosdash_port
eos_host = config_eos.server.host
eos_port = config_eos.server.port
log_level = "info"
access_log = False
reload = False
else:
host = args.host
port = config_eos.server.eosdash_port if config_eos.server.eosdash_port else (args.port + 1)
eos_host = args.host
eos_port = args.port
log_level = args.log_level
access_log = args.access_log
reload = args.reload
# Check if the EOSdash process is still/ already running, e.g. in case of server restart
process_info = None
for conn in psutil.net_connections(kind="inet"):
if conn.laddr.port == port:
process = psutil.Process(conn.pid)
# Get the fresh process info
process_info = process.as_dict(attrs=["pid", "cmdline"])
break
if process_info:
# Just warn
logger.warning(f"EOSdash port `{port}` still/ already in use.")
logger.warning(f"PID: `{process_info['pid']}`, CMD: `{process_info['cmdline']}`")
cmd = [
sys.executable,
str(eosdash_path),
"-m",
"akkudoktoreos.server.eosdash",
"--host",
str(host),
"--port",
@@ -189,11 +226,23 @@ def start_eosdash() -> subprocess.Popen:
"--reload",
str(reload),
]
server_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Set environment before any subprocess run, to keep custom config dir
env = os.environ.copy()
env["EOS_DIR"] = eos_dir
env["EOS_CONFIG_DIR"] = eos_config_dir
try:
server_process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
start_new_session=True,
)
except subprocess.CalledProcessError as ex:
error_msg = f"Could not start EOSdash: {ex}"
logger.error(error_msg)
raise RuntimeError(error_msg)
return server_process
@@ -203,20 +252,130 @@ def start_eosdash() -> subprocess.Popen:
# ----------------------
def cache_clear(clear_all: Optional[bool] = None) -> None:
"""Cleanup expired cache files."""
if clear_all:
CacheFileStore().clear(clear_all=True)
else:
CacheFileStore().clear(before_datetime=to_datetime())
def cache_load() -> dict:
"""Load cache from cachefilestore.json."""
return CacheFileStore().load_store()
def cache_save() -> dict:
"""Save cache to cachefilestore.json."""
return CacheFileStore().save_store()
@repeat_every(seconds=float(config_eos.cache.cleanup_interval))
def cache_cleanup_task() -> None:
"""Repeating task to clear cache from expired cache files."""
cache_clear()
@repeat_every(
seconds=10,
wait_first=config_eos.ems.startup_delay,
)
def energy_management_task() -> None:
"""Repeating task for energy management."""
ems_eos.manage_energy()
async def server_shutdown_task() -> None:
"""One-shot task for shutting down the EOS server.
This coroutine performs the following actions:
1. Ensures the cache is saved by calling the cache_save function.
2. Waits for 5 seconds to allow the EOS server to complete any ongoing tasks.
3. Gracefully shuts down the current process by sending the appropriate signal.
If running on Windows, the CTRL_C_EVENT signal is sent to terminate the process.
On other operating systems, the SIGTERM signal is used.
Finally, logs a message indicating that the EOS server has been terminated.
"""
# Assure cache is saved
cache_save()
# Give EOS time to finish some work
await asyncio.sleep(5)
# Gracefully shut down this process.
pid = psutil.Process().pid
if os.name == "nt":
os.kill(pid, signal.CTRL_C_EVENT) # type: ignore[attr-defined]
else:
os.kill(pid, signal.SIGTERM)
logger.info(f"🚀 EOS terminated, PID {pid}")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Lifespan manager for the app."""
# On startup
if config_eos.server.startup_eosdash:
try:
eosdash_process = start_eosdash()
if args is None:
# No command line arguments
host = config_eos.server.eosdash_host
port = config_eos.server.eosdash_port
eos_host = config_eos.server.host
eos_port = config_eos.server.port
log_level = "info"
access_log = False
reload = False
else:
host = args.host
port = (
config_eos.server.eosdash_port
if config_eos.server.eosdash_port
else (args.port + 1)
)
eos_host = args.host
eos_port = args.port
log_level = args.log_level
access_log = args.access_log
reload = args.reload
host = host if host else get_default_host()
port = port if port else 8504
eos_host = eos_host if eos_host else get_default_host()
eos_port = eos_port if eos_port else 8503
eos_dir = str(config_eos.general.data_folder_path)
eos_config_dir = str(config_eos.general.config_folder_path)
eosdash_process = start_eosdash(
host=host,
port=port,
eos_host=eos_host,
eos_port=eos_port,
log_level=log_level,
access_log=access_log,
reload=reload,
eos_dir=eos_dir,
eos_config_dir=eos_config_dir,
)
except Exception as e:
logger.error(f"Failed to start EOSdash server. Error: {e}")
sys.exit(1)
cache_load()
if config_eos.cache.cleanup_interval is None:
logger.warning("Cache file cleanup disabled. Set cache.cleanup_interval.")
else:
await cache_cleanup_task()
await energy_management_task()
# Handover to application
yield
# On shutdown
# nothing to do
cache_save()
app = FastAPI(
@@ -229,9 +388,9 @@ app = FastAPI(
"url": "https://www.apache.org/licenses/LICENSE-2.0.html",
},
lifespan=lifespan,
root_path=str(Path(__file__).parent),
)
server_dir = Path(__file__).parent.resolve()
@@ -239,9 +398,132 @@ class PdfResponse(FileResponse):
media_type = "application/pdf"
@app.put("/v1/config/reset", tags=["config"])
def fastapi_config_update_post() -> ConfigEOS:
"""Reset the configuration to the EOS configuration file.
@app.post("/v1/admin/cache/clear", tags=["admin"])
def fastapi_admin_cache_clear_post(clear_all: Optional[bool] = None) -> dict:
"""Clear the cache from expired data.
Deletes expired cache files.
Args:
clear_all (Optional[bool]): Delete all cached files. Default is False.
Returns:
data (dict): The management data after cleanup.
"""
try:
cache_clear(clear_all=clear_all)
data = CacheFileStore().current_store()
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error on cache clear: {e}")
return data
@app.post("/v1/admin/cache/save", tags=["admin"])
def fastapi_admin_cache_save_post() -> dict:
"""Save the current cache management data.
Returns:
data (dict): The management data that was saved.
"""
try:
data = cache_save()
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error on cache save: {e}")
return data
@app.post("/v1/admin/cache/load", tags=["admin"])
def fastapi_admin_cache_load_post() -> dict:
"""Load cache management data.
Returns:
data (dict): The management data that was loaded.
"""
try:
data = cache_save()
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error on cache load: {e}")
return data
@app.get("/v1/admin/cache", tags=["admin"])
def fastapi_admin_cache_get() -> dict:
"""Current cache management data.
Returns:
data (dict): The management data.
"""
try:
data = CacheFileStore().current_store()
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error on cache data retrieval: {e}")
return data
@app.post("/v1/admin/server/restart", tags=["admin"])
async def fastapi_admin_server_restart_post() -> dict:
"""Restart the server.
Restart EOS properly by starting a new instance before exiting the old one.
"""
logger.info("🔄 Restarting EOS...")
# Start a new EOS (Uvicorn) process
# Force a new process group to make the new process easily distinguishable from the current one
# Set environment before any subprocess run, to keep custom config dir
env = os.environ.copy()
env["EOS_DIR"] = str(config_eos.general.data_folder_path)
env["EOS_CONFIG_DIR"] = str(config_eos.general.config_folder_path)
new_process = subprocess.Popen(
[
sys.executable,
]
+ sys.argv,
env=env,
start_new_session=True,
)
logger.info(f"🚀 EOS restarted, PID {new_process.pid}")
# Gracefully shut down this process.
asyncio.create_task(server_shutdown_task())
# Will be executed because shutdown is delegated to async coroutine
return {
"message": "Restarting EOS...",
"pid": new_process.pid,
}
@app.post("/v1/admin/server/shutdown", tags=["admin"])
async def fastapi_admin_server_shutdown_post() -> dict:
"""Shutdown the server."""
logger.info("🔄 Stopping EOS...")
# Gracefully shut down this process.
asyncio.create_task(server_shutdown_task())
# Will be executed because shutdown is delegated to async coroutine
return {
"message": "Stopping EOS...",
"pid": psutil.Process().pid,
}
@app.get("/v1/health")
def fastapi_health_get(): # type: ignore
"""Health check endpoint to verify that the EOS server is alive."""
return JSONResponse(
{
"status": "alive",
"pid": psutil.Process().pid,
}
)
@app.post("/v1/config/reset", tags=["config"])
def fastapi_config_reset_post() -> ConfigEOS:
"""Reset the configuration.
Returns:
configuration (ConfigEOS): The current configuration after update.
@@ -251,7 +533,7 @@ def fastapi_config_update_post() -> ConfigEOS:
except Exception as e:
raise HTTPException(
status_code=404,
detail=f"Cannot update configuration from file '{config_eos.config_file_path}': {e}",
detail=f"Cannot reset configuration: {e}",
)
return config_eos
@@ -543,7 +825,7 @@ def fastapi_prediction_list_get(
] = None,
interval: Annotated[
Optional[str],
Query(description="Time duration for each interval."),
Query(description="Time duration for each interval. Defaults to 1 hour."),
] = None,
) -> List[Any]:
"""Get prediction for given key within given date range as value list.
@@ -580,8 +862,40 @@ def fastapi_prediction_list_get(
return prediction_list
@app.put("/v1/prediction/import/{provider_id}", tags=["prediction"])
def fastapi_prediction_import_provider(
provider_id: str = FastapiPath(..., description="Provider ID."),
data: Optional[Union[PydanticDateTimeDataFrame, PydanticDateTimeData, dict]] = None,
force_enable: Optional[bool] = None,
) -> Response:
"""Import prediction for given provider ID.
Args:
provider_id: ID of provider to update.
data: Prediction data.
force_enable: Update data even if provider is disabled.
Defaults to False.
"""
try:
provider = prediction_eos.provider_by_id(provider_id)
except ValueError:
raise HTTPException(status_code=404, detail=f"Provider '{provider_id}' not found.")
if not provider.enabled() and not force_enable:
raise HTTPException(status_code=404, detail=f"Provider '{provider_id}' not enabled.")
try:
provider.import_from_json(json_str=json.dumps(data))
provider.update_datetime = to_datetime(in_timezone=config_eos.general.timezone)
except Exception as e:
raise HTTPException(
status_code=400, detail=f"Error on import for provider '{provider_id}': {e}"
)
return Response()
@app.post("/v1/prediction/update", tags=["prediction"])
def fastapi_prediction_update(force_update: bool = False, force_enable: bool = False) -> Response:
def fastapi_prediction_update(
force_update: Optional[bool] = False, force_enable: Optional[bool] = False
) -> Response:
"""Update predictions for all providers.
Args:
@@ -593,8 +907,7 @@ def fastapi_prediction_update(force_update: bool = False, force_enable: bool = F
try:
prediction_eos.update_data(force_update=force_update, force_enable=force_enable)
except Exception as e:
raise e
# raise HTTPException(status_code=400, detail=f"Error on update of provider: {e}")
raise HTTPException(status_code=400, detail=f"Error on prediction update: {e}")
return Response()
@@ -912,34 +1225,35 @@ def site_map() -> RedirectResponse:
# Keep the proxy last to handle all requests that are not taken by the Rest API.
if config_eos.server.startup_eosdash:
@app.delete("/{path:path}", include_in_schema=False)
async def proxy_delete(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.delete("/{path:path}", include_in_schema=False)
async def proxy_delete(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.get("/{path:path}", include_in_schema=False)
async def proxy_get(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.post("/{path:path}", include_in_schema=False)
async def proxy_post(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.get("/{path:path}", include_in_schema=False)
async def proxy_get(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.put("/{path:path}", include_in_schema=False)
async def proxy_put(request: Request, path: str) -> Response:
return await proxy(request, path)
else:
@app.get("/", include_in_schema=False)
def root() -> RedirectResponse:
return RedirectResponse(url="/docs")
@app.post("/{path:path}", include_in_schema=False)
async def proxy_post(request: Request, path: str) -> Response:
return await proxy(request, path)
@app.put("/{path:path}", include_in_schema=False)
async def proxy_put(request: Request, path: str) -> Response:
return await proxy(request, path)
async def proxy(request: Request, path: str) -> Union[Response | RedirectResponse | HTMLResponse]:
if config_eos.server.eosdash_host and config_eos.server.eosdash_port:
# Make hostname Windows friendly
host = str(config_eos.server.eosdash_host)
if host == "0.0.0.0" and os.name == "nt":
host = "localhost"
if host and config_eos.server.eosdash_port:
# Proxy to EOSdash server
url = f"http://{config_eos.server.eosdash_host}:{config_eos.server.eosdash_port}/{path}"
url = f"http://{host}:{config_eos.server.eosdash_port}/{path}"
headers = dict(request.headers)
data = await request.body()
@@ -1004,6 +1318,29 @@ def run_eos(host: str, port: int, log_level: str, access_log: bool, reload: bool
# Make hostname Windows friendly
if host == "0.0.0.0" and os.name == "nt":
host = "localhost"
# Wait for EOS port to be free - e.g. in case of restart
timeout = 120 # Maximum 120 seconds to wait
process_info: list[dict] = []
for retries in range(int(timeout / 10)):
process_info = []
pids: list[int] = []
for conn in psutil.net_connections(kind="inet"):
if conn.laddr.port == port:
if conn.pid not in pids:
# Get fresh process info
process = psutil.Process(conn.pid)
pids.append(conn.pid)
process_info.append(process.as_dict(attrs=["pid", "cmdline"]))
if len(process_info) == 0:
break
logger.info(f"EOS waiting for port `{port}` ...")
time.sleep(10)
if len(process_info) > 0:
logger.warning(f"EOS port `{port}` in use.")
for info in process_info:
logger.warning(f"PID: `{info["pid"]}`, CMD: `{info["cmdline"]}`")
try:
uvicorn.run(
"akkudoktoreos.server.eos:app",
@@ -1071,8 +1408,11 @@ def main() -> None:
args = parser.parse_args()
host = args.host if args.host else get_default_host()
port = args.port if args.port else 8503
try:
run_eos(args.host, args.port, args.log_level, args.access_log, args.reload)
run_eos(host, port, args.log_level, args.access_log, args.reload)
except:
sys.exit(1)