mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2025-04-19 08:55:15 +00:00
* Move the caching module to core. Add an in memory cache that for caching function and method results during an energy management run (optimization run). Two decorators are provided for methods and functions. * Improve the file cache store by load and save functions. Make EOS load the cache file store on startup and save it on shutdown. Add a cyclic task that cleans the cache file store from outdated cache files. * Improve startup of EOSdash by EOS Make EOS starting EOSdash adhere to path configuration given in EOS. The whole environment from EOS is now passed to EOSdash. Should also prevent test errors due to unwanted/ wrong config file creation. Both servers now provide a health endpoint that can be used to detect whether the server is running. This is also used for testing now. * Improve startup of EOS EOS now has got an energy management task that runs shortly after startup. It tries to execute energy management runs with predictions newly fetched or initialized from cached data on first run. * Improve shutdown of EOS EOS has now a shutdown task that shuts EOS down gracefully with some time delay to allow REST API requests for shutdwon or restart to be fully serviced. * Improve EMS Add energy management task for repeated energy management controlled by startup delay and interval configuration parameters. Translate EnergieManagementSystem to english EnergyManagement. * Add administration endpoints - endpoints to control caching from REST API. - endpoints to control server restart (will not work on Windows) and shutdown from REST API * Improve doc generation Use "\n" linenend convention also on Windows when generating doc files. Replace Windows specific 127.0.0.1 address by standard 0.0.0.0. * Improve test support (to be able to test caching) - Add system test option to pytest for running tests with "real" resources - Add new test fixture to start server for test class and test function - Make kill signal adapt to Windows/ Linux - Use consistently "\n" for lineends when writing text files in doc test - Fix test_logging under Windows - Fix conftest config_default_dirs test fixture under Windows From @Lasall * Improve Windows support - Use 127.0.0.1 as default config host (model defaults) and addionally redirect 0.0.0.0 to localhost on Windows (because default config file still has 0.0.0.0). - Update install/startup instructions as package installation is required atm. Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
511 lines
16 KiB
Python
511 lines
16 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from contextlib import contextmanager
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
from typing import Generator, Optional, Union
|
|
from unittest.mock import PropertyMock, patch
|
|
|
|
import pendulum
|
|
import psutil
|
|
import pytest
|
|
import requests
|
|
from xprocess import ProcessStarter, XProcess
|
|
|
|
from akkudoktoreos.config.config import ConfigEOS, get_config
|
|
from akkudoktoreos.core.logging import get_logger
|
|
from akkudoktoreos.server.server import get_default_host
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
@pytest.fixture()
|
|
def disable_debug_logging(scope="session", autouse=True):
|
|
"""Automatically disable debug logging for all tests."""
|
|
original_levels = {}
|
|
root_logger = logging.getLogger()
|
|
|
|
original_levels[root_logger] = root_logger.level
|
|
root_logger.setLevel(logging.INFO)
|
|
|
|
for logger_name, logger in logging.root.manager.loggerDict.items():
|
|
if isinstance(logger, logging.Logger):
|
|
original_levels[logger] = logger.level
|
|
if logger.level <= logging.DEBUG:
|
|
logger.setLevel(logging.INFO)
|
|
|
|
yield
|
|
|
|
for logger, level in original_levels.items():
|
|
logger.setLevel(level)
|
|
|
|
|
|
def pytest_addoption(parser):
|
|
parser.addoption(
|
|
"--full-run", action="store_true", default=False, help="Run with all optimization tests."
|
|
)
|
|
parser.addoption(
|
|
"--check-config-side-effect",
|
|
action="store_true",
|
|
default=False,
|
|
help="Verify that user config file is non-existent (will also fail if user config file exists before test run).",
|
|
)
|
|
parser.addoption(
|
|
"--system-test",
|
|
action="store_true",
|
|
default=False,
|
|
help="System test mode. Tests may access real resources, like prediction providers!",
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def is_full_run(request):
|
|
yield bool(request.config.getoption("--full-run"))
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def config_mixin(config_eos):
|
|
with patch(
|
|
"akkudoktoreos.core.coreabc.ConfigMixin.config", new_callable=PropertyMock
|
|
) as config_mixin_patch:
|
|
config_mixin_patch.return_value = config_eos
|
|
yield config_mixin_patch
|
|
|
|
|
|
@pytest.fixture
|
|
def is_system_test(request):
|
|
yield bool(request.config.getoption("--system-test"))
|
|
|
|
|
|
@pytest.fixture
|
|
def prediction_eos():
|
|
from akkudoktoreos.prediction.prediction import get_prediction
|
|
|
|
return get_prediction()
|
|
|
|
|
|
@pytest.fixture
|
|
def devices_eos(config_mixin):
|
|
from akkudoktoreos.devices.devices import get_devices
|
|
|
|
devices = get_devices()
|
|
print("devices_eos reset!")
|
|
devices.reset()
|
|
return devices
|
|
|
|
|
|
@pytest.fixture
|
|
def devices_mixin(devices_eos):
|
|
with patch(
|
|
"akkudoktoreos.core.coreabc.DevicesMixin.devices", new_callable=PropertyMock
|
|
) as devices_mixin_patch:
|
|
devices_mixin_patch.return_value = devices_eos
|
|
yield devices_mixin_patch
|
|
|
|
|
|
# Test if test has side effect of writing to system (user) config file
|
|
# Before activating, make sure that no user config file exists (e.g. ~/.config/net.akkudoktoreos.eos/EOS.config.json)
|
|
@pytest.fixture(autouse=True)
|
|
def cfg_non_existent(request):
|
|
if not bool(request.config.getoption("--check-config-side-effect")):
|
|
yield
|
|
return
|
|
|
|
# Before test
|
|
from platformdirs import user_config_dir
|
|
|
|
user_dir = user_config_dir(ConfigEOS.APP_NAME)
|
|
user_config_file = Path(user_dir).joinpath(ConfigEOS.CONFIG_FILE_NAME)
|
|
cwd_config_file = Path.cwd().joinpath(ConfigEOS.CONFIG_FILE_NAME)
|
|
assert (
|
|
not user_config_file.exists()
|
|
), f"Config file {user_config_file} exists, please delete before test!"
|
|
assert (
|
|
not cwd_config_file.exists()
|
|
), f"Config file {cwd_config_file} exists, please delete before test!"
|
|
|
|
# Yield to test
|
|
yield
|
|
|
|
# After test
|
|
assert (
|
|
not user_config_file.exists()
|
|
), f"Config file {user_config_file} created, please check test!"
|
|
assert (
|
|
not cwd_config_file.exists()
|
|
), f"Config file {cwd_config_file} created, please check test!"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def user_cwd(config_default_dirs):
|
|
with patch(
|
|
"pathlib.Path.cwd",
|
|
return_value=config_default_dirs[1],
|
|
) as user_cwd_patch:
|
|
yield user_cwd_patch
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def user_config_dir(config_default_dirs):
|
|
with patch(
|
|
"akkudoktoreos.config.config.user_config_dir",
|
|
return_value=str(config_default_dirs[0]),
|
|
) as user_dir_patch:
|
|
yield user_dir_patch
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def user_data_dir(config_default_dirs):
|
|
with patch(
|
|
"akkudoktoreos.config.config.user_data_dir",
|
|
return_value=str(config_default_dirs[-1] / "data"),
|
|
) as user_dir_patch:
|
|
yield user_dir_patch
|
|
|
|
|
|
@pytest.fixture
|
|
def config_eos(
|
|
disable_debug_logging,
|
|
user_config_dir,
|
|
user_data_dir,
|
|
user_cwd,
|
|
config_default_dirs,
|
|
monkeypatch,
|
|
) -> ConfigEOS:
|
|
"""Fixture to reset EOS config to default values."""
|
|
monkeypatch.setenv(
|
|
"EOS_CONFIG__DATA_CACHE_SUBPATH", str(config_default_dirs[-1] / "data/cache")
|
|
)
|
|
monkeypatch.setenv(
|
|
"EOS_CONFIG__DATA_OUTPUT_SUBPATH", str(config_default_dirs[-1] / "data/output")
|
|
)
|
|
config_file = config_default_dirs[0] / ConfigEOS.CONFIG_FILE_NAME
|
|
config_file_cwd = config_default_dirs[1] / ConfigEOS.CONFIG_FILE_NAME
|
|
assert not config_file.exists()
|
|
assert not config_file_cwd.exists()
|
|
config_eos = get_config()
|
|
config_eos.reset_settings()
|
|
assert config_file == config_eos.general.config_file_path
|
|
assert config_file.exists()
|
|
assert not config_file_cwd.exists()
|
|
assert config_default_dirs[-1] / "data" == config_eos.general.data_folder_path
|
|
assert config_default_dirs[-1] / "data/cache" == config_eos.cache.path()
|
|
assert config_default_dirs[-1] / "data/output" == config_eos.general.data_output_path
|
|
return config_eos
|
|
|
|
|
|
@pytest.fixture
|
|
def config_default_dirs(tmpdir):
|
|
"""Fixture that provides a list of directories to be used as config dir."""
|
|
tmp_user_home_dir = Path(tmpdir)
|
|
|
|
# Default config directory from platform user config directory
|
|
config_default_dir_user = tmp_user_home_dir / "config"
|
|
|
|
# Default config directory from current working directory
|
|
config_default_dir_cwd = tmp_user_home_dir / "cwd"
|
|
config_default_dir_cwd.mkdir()
|
|
|
|
# Default config directory from default config file
|
|
config_default_dir_default = Path(__file__).parent.parent.joinpath("src/akkudoktoreos/data")
|
|
|
|
# Default data directory from platform user data directory
|
|
data_default_dir_user = tmp_user_home_dir
|
|
|
|
return (
|
|
config_default_dir_user,
|
|
config_default_dir_cwd,
|
|
config_default_dir_default,
|
|
data_default_dir_user,
|
|
)
|
|
|
|
|
|
@contextmanager
|
|
def server_base(xprocess: XProcess) -> Generator[dict[str, Union[str, int]], None, None]:
|
|
"""Fixture to start the server with temporary EOS_DIR and default config.
|
|
|
|
Args:
|
|
xprocess (XProcess): The pytest-xprocess fixture to manage the server process.
|
|
|
|
Yields:
|
|
dict[str, str]: A dictionary containing:
|
|
- "server" (str): URL of the server.
|
|
- "eos_dir" (str): Path to the temporary EOS_DIR.
|
|
"""
|
|
host = get_default_host()
|
|
port = 8503
|
|
eosdash_port = 8504
|
|
|
|
# Port of server may be still blocked by a server usage despite the other server already
|
|
# shut down. CLOSE_WAIT, TIME_WAIT may typically take up to 120 seconds.
|
|
server_timeout = 120
|
|
|
|
server = f"http://{host}:{port}"
|
|
eosdash_server = f"http://{host}:{eosdash_port}"
|
|
eos_tmp_dir = tempfile.TemporaryDirectory()
|
|
eos_dir = str(eos_tmp_dir.name)
|
|
|
|
class Starter(ProcessStarter):
|
|
# assure server to be installed
|
|
try:
|
|
project_dir = Path(__file__).parent.parent
|
|
subprocess.run(
|
|
[sys.executable, "-c", "import", "akkudoktoreos.server.eos"],
|
|
check=True,
|
|
env=os.environ,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
cwd=project_dir,
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
subprocess.run(
|
|
[sys.executable, "-m", "pip", "install", "-e", str(project_dir)],
|
|
env=os.environ,
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
cwd=project_dir,
|
|
)
|
|
|
|
# Set environment for server run
|
|
env = os.environ.copy()
|
|
env["EOS_DIR"] = eos_dir
|
|
env["EOS_CONFIG_DIR"] = eos_dir
|
|
|
|
# command to start server process
|
|
args = [
|
|
sys.executable,
|
|
"-m",
|
|
"akkudoktoreos.server.eos",
|
|
"--host",
|
|
host,
|
|
"--port",
|
|
str(port),
|
|
]
|
|
|
|
# Will wait for 'server_timeout' seconds before timing out
|
|
timeout = server_timeout
|
|
|
|
# xprocess will now attempt to clean up upon interruptions
|
|
terminate_on_interrupt = True
|
|
|
|
# checks if our server is ready
|
|
def startup_check(self):
|
|
try:
|
|
result = requests.get(f"{server}/v1/health", timeout=2)
|
|
if result.status_code == 200:
|
|
return True
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
def cleanup_eos_eosdash():
|
|
# Cleanup any EOS process left.
|
|
if os.name == "nt":
|
|
# Windows does not provide SIGKILL
|
|
sigkill = signal.SIGTERM
|
|
else:
|
|
sigkill = signal.SIGKILL
|
|
# - Use pid on EOS health endpoint
|
|
try:
|
|
result = requests.get(f"{server}/v1/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
pid = result.json()["pid"]
|
|
os.kill(pid, sigkill)
|
|
time.sleep(1)
|
|
result = requests.get(f"{server}/v1/health", timeout=2)
|
|
assert result.status_code != HTTPStatus.OK
|
|
except:
|
|
pass
|
|
# - Use pids from processes on EOS port
|
|
for retries in range(int(server_timeout / 3)):
|
|
pids: list[int] = []
|
|
for conn in psutil.net_connections(kind="inet"):
|
|
if conn.laddr.port == port:
|
|
if conn.pid not in pids:
|
|
# Get fresh process info
|
|
try:
|
|
process = psutil.Process(conn.pid)
|
|
process_info = process.as_dict(attrs=["pid", "cmdline"])
|
|
if "akkudoktoreos.server.eos" in process_info["cmdline"]:
|
|
pids.append(conn.pid)
|
|
except:
|
|
# PID may already be dead
|
|
pass
|
|
for pid in pids:
|
|
os.kill(pid, sigkill)
|
|
if len(pids) == 0:
|
|
break
|
|
time.sleep(3)
|
|
assert len(pids) == 0
|
|
# Cleanup any EOSdash processes left.
|
|
# - Use pid on EOSdash health endpoint
|
|
try:
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
pid = result.json()["pid"]
|
|
os.kill(pid, sigkill)
|
|
time.sleep(1)
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
assert result.status_code != HTTPStatus.OK
|
|
except:
|
|
pass
|
|
# - Use pids from processes on EOSdash port
|
|
for retries in range(int(server_timeout / 3)):
|
|
pids = []
|
|
for conn in psutil.net_connections(kind="inet"):
|
|
if conn.laddr.port == eosdash_port:
|
|
if conn.pid not in pids:
|
|
# Get fresh process info
|
|
try:
|
|
process = psutil.Process(conn.pid)
|
|
process_info = process.as_dict(attrs=["pid", "cmdline"])
|
|
if "akkudoktoreos.server.eosdash" in process_info["cmdline"]:
|
|
pids.append(conn.pid)
|
|
except:
|
|
# PID may already be dead
|
|
pass
|
|
for pid in pids:
|
|
os.kill(pid, sigkill)
|
|
if len(pids) == 0:
|
|
break
|
|
time.sleep(3)
|
|
assert len(pids) == 0
|
|
|
|
# Kill all running eos and eosdash process - just to be sure
|
|
cleanup_eos_eosdash()
|
|
|
|
# Ensure there is an empty config file in the temporary EOS directory
|
|
config_file_path = Path(eos_dir).joinpath(ConfigEOS.CONFIG_FILE_NAME)
|
|
with config_file_path.open(mode="w", encoding="utf-8", newline="\n") as fd:
|
|
json.dump({}, fd)
|
|
|
|
# ensure process is running and return its logfile
|
|
pid, logfile = xprocess.ensure("eos", Starter)
|
|
logger.info(f"Started EOS ({pid}). This may take very long (up to {server_timeout} seconds).")
|
|
logger.info(f"View xprocess logfile at: {logfile}")
|
|
|
|
yield {
|
|
"server": server,
|
|
"eosdash_server": eosdash_server,
|
|
"eos_dir": eos_dir,
|
|
"timeout": server_timeout,
|
|
}
|
|
|
|
# clean up whole process tree afterwards
|
|
xprocess.getinfo("eos").terminate()
|
|
|
|
# Cleanup any EOS process left.
|
|
cleanup_eos_eosdash()
|
|
|
|
# Remove temporary EOS_DIR
|
|
eos_tmp_dir.cleanup()
|
|
|
|
|
|
@pytest.fixture(scope="class")
|
|
def server_setup_for_class(xprocess) -> Generator[dict[str, Union[str, int]], None, None]:
|
|
"""A fixture to start the server for a test class."""
|
|
with server_base(xprocess) as result:
|
|
yield result
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def server_setup_for_function(xprocess) -> Generator[dict[str, Union[str, int]], None, None]:
|
|
"""A fixture to start the server for a test function."""
|
|
with server_base(xprocess) as result:
|
|
yield result
|
|
|
|
|
|
@pytest.fixture
|
|
def server(xprocess, config_eos, config_default_dirs) -> Generator[str, None, None]:
|
|
"""Fixture to start the server.
|
|
|
|
Provides URL of the server.
|
|
"""
|
|
# create url/port info to the server
|
|
url = "http://0.0.0.0:8503"
|
|
|
|
class Starter(ProcessStarter):
|
|
# Set environment before any subprocess run, to keep custom config dir
|
|
env = os.environ.copy()
|
|
env["EOS_DIR"] = str(config_default_dirs[-1])
|
|
project_dir = config_eos.package_root_path.parent.parent
|
|
|
|
# assure server to be installed
|
|
try:
|
|
subprocess.run(
|
|
[sys.executable, "-c", "import", "akkudoktoreos.server.eos"],
|
|
check=True,
|
|
env=env,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
cwd=project_dir,
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
subprocess.run(
|
|
[sys.executable, "-m", "pip", "install", "-e", str(project_dir)],
|
|
check=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
# command to start server process
|
|
args = [sys.executable, "-m", "akkudoktoreos.server.eos"]
|
|
|
|
# will wait for xx seconds before timing out
|
|
timeout = 10
|
|
|
|
# xprocess will now attempt to clean up upon interruptions
|
|
terminate_on_interrupt = True
|
|
|
|
# checks if our server is ready
|
|
def startup_check(self):
|
|
try:
|
|
result = requests.get(f"{url}/v1/health")
|
|
if result.status_code == 200:
|
|
return True
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
# ensure process is running and return its logfile
|
|
pid, logfile = xprocess.ensure("eos", Starter)
|
|
print(f"View xprocess logfile at: {logfile}")
|
|
|
|
yield url
|
|
|
|
# clean up whole process tree afterwards
|
|
xprocess.getinfo("eos").terminate()
|
|
|
|
|
|
@pytest.fixture
|
|
def set_other_timezone():
|
|
"""Temporarily sets a timezone for Pendulum during a test.
|
|
|
|
Resets to the original timezone after the test completes.
|
|
"""
|
|
original_timezone = pendulum.local_timezone()
|
|
|
|
default_other_timezone = "Atlantic/Canary"
|
|
if default_other_timezone == original_timezone:
|
|
default_other_timezone = "Asia/Singapore"
|
|
|
|
def _set_timezone(other_timezone: Optional[str] = None) -> str:
|
|
if other_timezone is None:
|
|
other_timezone = default_other_timezone
|
|
pendulum.set_local_timezone(other_timezone)
|
|
assert pendulum.local_timezone() == other_timezone
|
|
return other_timezone
|
|
|
|
yield _set_timezone
|
|
|
|
# Restore the original timezone
|
|
pendulum.set_local_timezone(original_timezone)
|
|
assert pendulum.local_timezone() == original_timezone
|