import json
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
from contextlib import contextmanager
from http import HTTPStatus
from pathlib import Path
from typing import Generator, Optional, Union
from unittest.mock import PropertyMock, patch

import pendulum
import psutil
import pytest
import requests
from _pytest.logging import LogCaptureFixture
from loguru import logger
from xprocess import ProcessStarter, XProcess

from akkudoktoreos.config.config import ConfigEOS, get_config
from akkudoktoreos.server.server import get_default_host

# -----------------------------------------------
# Adapt pytest logging handling to Loguru logging
# -----------------------------------------------


@pytest.fixture
def caplog(caplog: LogCaptureFixture):
    """Propagate Loguru logs to the pytest caplog handler."""
    handler_id = logger.add(
        caplog.handler,
        format="{message}",
        level=0,
        filter=lambda record: record["level"].no >= caplog.handler.level,
        enqueue=False,  # Set to 'True' if your test is spawning child processes.
    )
    yield caplog
    try:
        logger.remove(handler_id)
    except ValueError:
        # Handler may already be deleted (e.g. by a `logger.remove()` without id).
        pass


@pytest.fixture
def reportlog(pytestconfig):
    """Propagate Loguru logs to the pytest terminal reporter."""
    logging_plugin = pytestconfig.pluginmanager.getplugin("logging-plugin")
    handler_id = logger.add(logging_plugin.report_handler, format="{message}")
    yield
    try:
        logger.remove(handler_id)
    except ValueError:
        # Handler may already be deleted (e.g. by a `logger.remove()` without id).
        pass


@pytest.fixture(autouse=True)
def propagate_logs():
    """Deal with the pytest --log-cli-level command-line flag.

    This option controls the standard logging logs, not loguru ones.
    For this reason, we first install a PropagateHandler for compatibility.
    """

    class PropagateHandler(logging.Handler):
        """Forward Loguru records to the standard logger of the same name."""

        def emit(self, record):
            if logging.getLogger(record.name).isEnabledFor(record.levelno):
                logging.getLogger(record.name).handle(record)

    logger.remove()
    logger.add(PropagateHandler(), format="{message}")
    yield


# NOTE(review): `scope` and `autouse` are parameters of the fixture function, not
# arguments to `pytest.fixture()`, so they presumably have no effect on the fixture's
# scope or auto-use. The fixture is requested explicitly by `config_eos`. Confirm the
# intent before moving them into the decorator, as that would change fixture ordering.
@pytest.fixture()
def disable_debug_logging(scope="session", autouse=True):
    """Automatically disable debug logging for all tests."""
    logger.remove()  # Remove all loggers
    logger.add(sys.stderr, level="INFO")  # Only show INFO and above


# -----------------------------------------------
# Provide pytest options for specific test setups
# -----------------------------------------------


def pytest_addoption(parser):
    """Register the EOS specific pytest command line options."""
    parser.addoption(
        "--full-run", action="store_true", default=False, help="Run with all optimization tests."
    )
    parser.addoption(
        "--check-config-side-effect",
        action="store_true",
        default=False,
        help="Verify that user config file is non-existent (will also fail if user config file exists before test run).",
    )
    parser.addoption(
        "--system-test",
        action="store_true",
        default=False,
        help="System test mode. Tests may access real resources, like prediction providers!",
    )


@pytest.fixture
def is_full_run(request):
    """Tell whether the `--full-run` option was given."""
    yield bool(request.config.getoption("--full-run"))


@pytest.fixture(autouse=True)
def config_mixin(config_eos):
    """Patch `ConfigMixin.config` to return the test configuration."""
    with patch(
        "akkudoktoreos.core.coreabc.ConfigMixin.config", new_callable=PropertyMock
    ) as config_mixin_patch:
        config_mixin_patch.return_value = config_eos
        yield config_mixin_patch


@pytest.fixture
def is_system_test(request):
    """Tell whether the `--system-test` option was given."""
    yield bool(request.config.getoption("--system-test"))


@pytest.fixture
def prediction_eos():
    """Provide the object returned by `get_prediction()`."""
    from akkudoktoreos.prediction.prediction import get_prediction

    return get_prediction()


# Test if test has side effect of writing to system (user) config file
# Before activating, make sure that no user config file exists
# (e.g. ~/.config/net.akkudoktoreos.eos/EOS.config.json)
@pytest.fixture(autouse=True)
def cfg_non_existent(request):
    """Assert that a test does not create a user or cwd config file.

    Only active if the `--check-config-side-effect` option is given.
    """
    if not bool(request.config.getoption("--check-config-side-effect")):
        yield
        return

    # Before test
    from platformdirs import user_config_dir

    user_dir = user_config_dir(ConfigEOS.APP_NAME)
    user_config_file = Path(user_dir).joinpath(ConfigEOS.CONFIG_FILE_NAME)
    cwd_config_file = Path.cwd().joinpath(ConfigEOS.CONFIG_FILE_NAME)
    assert not user_config_file.exists(), (
        f"Config file {user_config_file} exists, please delete before test!"
    )
    assert not cwd_config_file.exists(), (
        f"Config file {cwd_config_file} exists, please delete before test!"
    )

    # Yield to test
    yield

    # After test
    assert not user_config_file.exists(), (
        f"Config file {user_config_file} created, please check test!"
    )
    assert not cwd_config_file.exists(), (
        f"Config file {cwd_config_file} created, please check test!"
    )


@pytest.fixture(autouse=True)
def user_cwd(config_default_dirs):
    """Patch cwd provided by module pathlib.Path.cwd."""
    with patch(
        "pathlib.Path.cwd",
        return_value=config_default_dirs[1],
    ) as user_cwd_patch:
        yield user_cwd_patch


@pytest.fixture(autouse=True)
def user_config_dir(config_default_dirs):
    """Patch user_config_dir provided by module platformdirs."""
    with patch(
        "akkudoktoreos.config.config.user_config_dir",
        return_value=str(config_default_dirs[0]),
    ) as user_dir_patch:
        yield user_dir_patch


@pytest.fixture(autouse=True)
def user_data_dir(config_default_dirs):
    """Patch user_data_dir provided by module platformdirs."""
    with patch(
        "akkudoktoreos.config.config.user_data_dir",
        return_value=str(config_default_dirs[-1] / "data"),
    ) as user_dir_patch:
        yield user_dir_patch


@pytest.fixture
def config_eos(
    disable_debug_logging,
    user_config_dir,
    user_data_dir,
    user_cwd,
    config_default_dirs,
    monkeypatch,
) -> ConfigEOS:
    """Fixture to reset EOS config to default values."""
    monkeypatch.setenv(
        "EOS_CONFIG__DATA_CACHE_SUBPATH", str(config_default_dirs[-1] / "data/cache")
    )
    monkeypatch.setenv(
        "EOS_CONFIG__DATA_OUTPUT_SUBPATH", str(config_default_dirs[-1] / "data/output")
    )
    config_file = config_default_dirs[0] / ConfigEOS.CONFIG_FILE_NAME
    config_file_cwd = config_default_dirs[1] / ConfigEOS.CONFIG_FILE_NAME
    assert not config_file.exists()
    assert not config_file_cwd.exists()

    config_eos = get_config()
    config_eos.reset_settings()
    assert config_file == config_eos.general.config_file_path
    assert config_file.exists()
    assert not config_file_cwd.exists()

    # Check user data directory paths (config_default_dirs[-1] == data_default_dir_user)
    assert config_default_dirs[-1] / "data" == config_eos.general.data_folder_path
    assert config_default_dirs[-1] / "data/cache" == config_eos.cache.path()
    assert config_default_dirs[-1] / "data/output" == config_eos.general.data_output_path
    assert config_default_dirs[-1] / "data/output/eos.log" == config_eos.logging.file_path
    return config_eos


@pytest.fixture
def config_default_dirs(tmpdir):
    """Fixture that provides a list of directories to be used as config dir."""
    tmp_user_home_dir = Path(tmpdir)

    # Default config directory from platform user config directory
    config_default_dir_user = tmp_user_home_dir / "config"

    # Default config directory from current working directory
    config_default_dir_cwd = tmp_user_home_dir / "cwd"
    config_default_dir_cwd.mkdir()

    # Default config directory from default config file
    config_default_dir_default = Path(__file__).parent.parent.joinpath("src/akkudoktoreos/data")

    # Default data directory from platform user data directory
    data_default_dir_user = tmp_user_home_dir

    return (
        config_default_dir_user,
        config_default_dir_cwd,
        config_default_dir_default,
        data_default_dir_user,
    )


# ------------------------------------
# Provide pytest EOS server management
# ------------------------------------


def _kill_via_health_endpoint(health_url: str, sigkill: int) -> None:
    """Kill the process that reports its PID at the given health endpoint.

    Best effort only: an unreachable server, an unexpected response or a failing
    kill is ignored, the port based fallback cleanup handles what is left over.
    """
    try:
        result = requests.get(health_url, timeout=2)
        if result.status_code == HTTPStatus.OK:
            pid = result.json()["pid"]
            os.kill(pid, sigkill)
            time.sleep(1)
            result = requests.get(health_url, timeout=2)
            assert result.status_code != HTTPStatus.OK
    except Exception:
        # Deliberately ignored, see docstring.
        pass


def _process_status(pid: int) -> Optional[str]:
    """Return the psutil status of the process, or None if it does not exist."""
    try:
        return psutil.Process(pid).status()
    except psutil.NoSuchProcess:
        # Process already reaped (possibly by init/systemd)
        return None


def _kill_processes_on_ports(
    ports: tuple[int, ...],
    cmdline_marker: str,
    label: str,
    sigkill: int,
    server_timeout: float,
) -> None:
    """Kill processes bound to one of the ports whose command line contains the marker.

    Scans, kills and re-checks every 3 seconds for at most `int(server_timeout / 3)`
    rounds, then asserts that every process found is gone or a zombie.
    """
    pids: list[int] = []
    for _ in range(int(server_timeout / 3)):
        for conn in psutil.net_connections(kind="inet"):
            if conn.pid is None or not conn.laddr or conn.laddr.port not in ports:
                continue
            if conn.pid in pids:
                # Already scheduled, e.g. several sockets of the same process
                continue
            try:
                process = psutil.Process(conn.pid)
                cmdline = process.as_dict(attrs=["cmdline"])["cmdline"]
                if cmdline_marker in " ".join(cmdline):
                    pids.append(conn.pid)
            except Exception:
                # Process vanished or is not inspectable - not ours to kill.
                pass

        for pid in pids:
            try:
                os.kill(pid, sigkill)
            except OSError:
                # PID may already be gone from an earlier round. A process that
                # survives for another reason is reported by the check below.
                pass

        running = any(
            status not in (None, psutil.STATUS_ZOMBIE) for status in map(_process_status, pids)
        )
        if not running:
            break
        time.sleep(3)

    # Check for processes still running (maybe zombies).
    for pid in pids:
        status = _process_status(pid)
        assert status in (None, psutil.STATUS_ZOMBIE), (
            f"Cleanup {label} expected zombie, got {status} for PID {pid}"
        )


def cleanup_eos_eosdash(
    host: str,
    port: int,
    eosdash_host: str,
    eosdash_port: int,
    server_timeout: float = 10.0,
) -> None:
    """Clean up any running EOS and EOSdash processes.

    Args:
        host (str): EOS server host (e.g., "127.0.0.1").
        port (int): Port number used by the EOS process.
        eosdash_host (str): EOSdash server host.
        eosdash_port (int): Port used by EOSdash.
        server_timeout (float): Timeout in seconds before giving up.

    Raises:
        AssertionError: If a matching process is still running after cleanup.
    """
    server = f"http://{host}:{port}"
    eosdash_server = f"http://{eosdash_host}:{eosdash_port}"

    # There is no SIGKILL on Windows
    sigkill = signal.SIGTERM if os.name == "nt" else signal.SIGKILL

    # Attempt to shut down EOS via health endpoint
    _kill_via_health_endpoint(f"{server}/v1/health", sigkill)

    # Fallback: kill processes bound to the EOS port
    _kill_processes_on_ports((port,), "akkudoktoreos.server.eos", "EOS", sigkill, server_timeout)

    # Attempt to shut down EOSdash via health endpoint
    for srv in (eosdash_server, "http://127.0.0.1:8504", "http://127.0.0.1:8555"):
        _kill_via_health_endpoint(f"{srv}/eosdash/health", sigkill)

    # Fallback: kill EOSdash processes bound to known ports
    _kill_processes_on_ports(
        (eosdash_port, 8504, 8555),
        "akkudoktoreos.server.eosdash",
        "EOSdash",
        sigkill,
        server_timeout,
    )


@contextmanager
def server_base(
    xprocess: XProcess, extra_env: Optional[dict[str, str]] = None
) -> Generator[dict[str, Union[str, int]], None, None]:
    """Context manager to start the server with temporary EOS_DIR and default config.

    The server is terminated and the temporary EOS_DIR is removed on exit, also if
    the body of the `with` statement raises.

    Args:
        xprocess (XProcess): The pytest-xprocess fixture to manage the server process.
        extra_env (Optional[dict[str, str]]): Environment variables to set before server startup.

    Yields:
        dict[str, Union[str, int]]: A dictionary containing:
            - "server" (str): URL of the server.
            - "port" (int): Port of the server.
            - "eosdash_server" (str): URL of the EOSdash server.
            - "eosdash_port" (int): Port of the EOSdash server.
            - "eos_dir" (str): Path to the temporary EOS_DIR.
            - "timeout" (int): Server startup timeout in seconds.
    """
    host = get_default_host()
    port = 8503
    server = f"http://{host}:{port}"

    # Port of server may be still blocked by a server usage despite the other server already
    # shut down. CLOSE_WAIT, TIME_WAIT may typically take up to 120 seconds.
    server_timeout = 120

    if extra_env and extra_env.get("EOS_SERVER__EOSDASH_HOST", None):
        eosdash_host = extra_env["EOS_SERVER__EOSDASH_HOST"]
    else:
        eosdash_host = host
    if extra_env and extra_env.get("EOS_SERVER__EOSDASH_PORT", None):
        eosdash_port: int = int(extra_env["EOS_SERVER__EOSDASH_PORT"])
    else:
        eosdash_port = 8504
    eosdash_server = f"http://{eosdash_host}:{eosdash_port}"

    eos_tmp_dir = tempfile.TemporaryDirectory()
    eos_dir = str(eos_tmp_dir.name)

    try:

        class Starter(ProcessStarter):
            # assure server to be installed
            project_dir = Path(__file__).parent.parent
            try:
                # Only locate the server module. This avoids executing the server
                # module's top-level code just to check for the installation.
                subprocess.run(
                    [
                        sys.executable,
                        "-c",
                        "import importlib.util, sys; "
                        "spec = importlib.util.find_spec('akkudoktoreos.server.eos'); "
                        "sys.exit(0 if spec is not None else 1)",
                    ],
                    check=True,
                    env=os.environ,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    cwd=project_dir,
                )
            except subprocess.CalledProcessError:
                subprocess.run(
                    [sys.executable, "-m", "pip", "install", "-e", str(project_dir)],
                    env=os.environ,
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    cwd=project_dir,
                )

            # Set environment for server run
            env = os.environ.copy()
            env["EOS_DIR"] = eos_dir
            env["EOS_CONFIG_DIR"] = eos_dir
            if extra_env:
                env.update(extra_env)

            # Set command to start server process
            args = [
                sys.executable,
                "-m",
                "akkudoktoreos.server.eos",
                "--host",
                host,
                "--port",
                str(port),
            ]

            # Will wait for 'server_timeout' seconds before timing out
            timeout = server_timeout

            # xprocess will now attempt to clean up upon interruptions
            terminate_on_interrupt = True

            # checks if our server is ready
            def startup_check(self):
                try:
                    response = requests.get(f"{server}/v1/health", timeout=10)
                    logger.debug(f"[xprocess] Health check: {response.status_code}")
                    if response.status_code == 200:
                        return True
                    logger.debug(f"[xprocess] Health check: {response}")
                except Exception as e:
                    logger.debug(f"[xprocess] Exception during health check: {e}")
                return False

        # Kill all running eos and eosdash process - just to be sure
        cleanup_eos_eosdash(host, port, eosdash_host, eosdash_port, server_timeout)

        # Ensure there is an empty config file in the temporary EOS directory
        config_file_path = Path(eos_dir).joinpath(ConfigEOS.CONFIG_FILE_NAME)
        with config_file_path.open(mode="w", encoding="utf-8", newline="\n") as fd:
            json.dump({}, fd)

        # ensure process is running and return its logfile
        pid, logfile = xprocess.ensure("eos", Starter)
        logger.info(
            f"Started EOS ({pid}). This may take very long (up to {server_timeout} seconds)."
        )
        logger.info(f"View xprocess logfile at: {logfile}")

        try:
            yield {
                "server": server,
                "port": port,
                "eosdash_server": eosdash_server,
                "eosdash_port": eosdash_port,
                "eos_dir": eos_dir,
                "timeout": server_timeout,
            }
        finally:
            # clean up whole process tree afterwards
            xprocess.getinfo("eos").terminate()

            # Cleanup any EOS process left.
            cleanup_eos_eosdash(host, port, eosdash_host, eosdash_port, server_timeout)
    finally:
        # Remove temporary EOS_DIR
        eos_tmp_dir.cleanup()


@pytest.fixture(scope="class")
def server_setup_for_class(request, xprocess) -> Generator[dict[str, Union[str, int]], None, None]:
    """A fixture to start the server for a test class.

    Get env vars from the test class attribute `eos_env`, if defined
    """
    extra_env = getattr(request.cls, "eos_env", None)
    with server_base(xprocess, extra_env=extra_env) as result:
        yield result


@pytest.fixture(scope="function")
def server_setup_for_function(xprocess) -> Generator[dict[str, Union[str, int]], None, None]:
    """A fixture to start the server for a test function."""
    with server_base(xprocess) as result:
        yield result


# ------------------------------
# Provide pytest timezone change
# ------------------------------


@pytest.fixture
def set_other_timezone():
    """Temporarily sets a timezone for Pendulum during a test.

    Resets to the original timezone after the test completes.
    """
    original_timezone = pendulum.local_timezone()

    default_other_timezone = "Atlantic/Canary"
    if default_other_timezone == original_timezone:
        default_other_timezone = "Asia/Singapore"

    def _set_timezone(other_timezone: Optional[str] = None) -> str:
        if other_timezone is None:
            other_timezone = default_other_timezone
        pendulum.set_local_timezone(other_timezone)
        assert pendulum.local_timezone() == other_timezone
        return other_timezone

    yield _set_timezone

    # Restore the original timezone
    pendulum.set_local_timezone(original_timezone)
    assert pendulum.local_timezone() == original_timezone