EOS/tests/test_server.py
Normann a7d58eed9a
pre-commit update and ignore changes (#461)
* pre-commit autoupdate
* type: ignore changes
* [attr-defined,unused-ignore] usage
2025-02-24 10:00:09 +01:00

445 lines
17 KiB
Python

import json
import os
import signal
import time
from http import HTTPStatus
from pathlib import Path
import psutil
import pytest
import requests
from akkudoktoreos.server.server import get_default_host
DIR_TESTDATA = Path(__file__).absolute().parent.joinpath("testdata")
FILE_TESTDATA_EOSSERVER_CONFIG_1 = DIR_TESTDATA.joinpath("eosserver_config_1.json")
class TestServer:
def test_server_setup_for_class(self, server_setup_for_class):
"""Ensure server is started."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
result = requests.get(f"{server}/v1/config")
assert result.status_code == HTTPStatus.OK
# Get testing config
config_json = result.json()
config_folder_path = Path(config_json["general"]["config_folder_path"])
config_file_path = Path(config_json["general"]["config_file_path"])
data_folder_path = Path(config_json["general"]["data_folder_path"])
data_ouput_path = Path(config_json["general"]["data_output_path"])
# Assure we are working in test environment
assert str(config_folder_path).startswith(eos_dir)
assert str(config_file_path).startswith(eos_dir)
assert str(data_folder_path).startswith(eos_dir)
assert str(data_ouput_path).startswith(eos_dir)
def test_prediction_brightsky(self, server_setup_for_class, is_system_test):
"""Test weather prediction by BrightSky."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
result = requests.get(f"{server}/v1/config")
assert result.status_code == HTTPStatus.OK
# Get testing config
config_json = result.json()
config_folder_path = Path(config_json["general"]["config_folder_path"])
# Assure we are working in test environment
assert str(config_folder_path).startswith(eos_dir)
result = requests.put(f"{server}/v1/config/weather/provider", json="BrightSky")
assert result.status_code == HTTPStatus.OK
# Assure prediction is enabled
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
assert result.status_code == HTTPStatus.OK
providers = result.json()
assert "BrightSky" in providers
if is_system_test:
result = requests.post(f"{server}/v1/prediction/update/BrightSky")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert len(data["data"]) > 24
else:
pass
def test_prediction_clearoutside(self, server_setup_for_class, is_system_test):
"""Test weather prediction by ClearOutside."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
result = requests.put(f"{server}/v1/config/weather/provider", json="ClearOutside")
assert result.status_code == HTTPStatus.OK
# Assure prediction is enabled
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
assert result.status_code == HTTPStatus.OK
providers = result.json()
assert "ClearOutside" in providers
if is_system_test:
result = requests.post(f"{server}/v1/prediction/update/ClearOutside")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert len(data["data"]) > 24
else:
pass
def test_prediction_pvforecastakkudoktor(self, server_setup_for_class, is_system_test):
"""Test PV prediction by PVForecastAkkudoktor."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
# Reset config
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
config = json.load(fd)
config["pvforecast"]["provider"] = "PVForecastAkkudoktor"
result = requests.put(f"{server}/v1/config", json=config)
assert result.status_code == HTTPStatus.OK
# Assure prediction is enabled
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
assert result.status_code == HTTPStatus.OK
providers = result.json()
assert "PVForecastAkkudoktor" in providers
if is_system_test:
result = requests.post(f"{server}/v1/prediction/update/PVForecastAkkudoktor")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=pvforecast_ac_power")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert len(data["data"]) > 24
else:
pass
def test_prediction_elecpriceakkudoktor(self, server_setup_for_class, is_system_test):
"""Test electricity price prediction by ElecPriceImport."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
# Reset config
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
config = json.load(fd)
config["elecprice"]["provider"] = "ElecPriceAkkudoktor"
result = requests.put(f"{server}/v1/config", json=config)
assert result.status_code == HTTPStatus.OK
# Assure prediction is enabled
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
assert result.status_code == HTTPStatus.OK
providers = result.json()
assert "ElecPriceAkkudoktor" in providers
if is_system_test:
result = requests.post(f"{server}/v1/prediction/update/ElecPriceAkkudoktor")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=elecprice_marketprice_wh")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert len(data["data"]) > 24
else:
pass
def test_prediction_loadakkudoktor(self, server_setup_for_class, is_system_test):
"""Test load prediction by LoadAkkudoktor."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
result = requests.put(f"{server}/v1/config/load/provider", json="LoadAkkudoktor")
assert result.status_code == HTTPStatus.OK
# Assure prediction is enabled
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
assert result.status_code == HTTPStatus.OK
providers = result.json()
assert "LoadAkkudoktor" in providers
if is_system_test:
result = requests.post(f"{server}/v1/prediction/update/LoadAkkudoktor")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=load_mean")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert len(data["data"]) > 24
else:
pass
def test_admin_cache(self, server_setup_for_class, is_system_test):
"""Test whether cache is reconstructed from cached files."""
server = server_setup_for_class["server"]
eos_dir = server_setup_for_class["eos_dir"]
result = requests.get(f"{server}/v1/admin/cache")
assert result.status_code == HTTPStatus.OK
cache = result.json()
if is_system_test:
# There should be some cache data
assert cache != {}
# Save cache
result = requests.post(f"{server}/v1/admin/cache/save")
assert result.status_code == HTTPStatus.OK
cache_saved = result.json()
assert cache_saved == cache
# Clear cache - should clear nothing as all cache files expire in the future
result = requests.post(f"{server}/v1/admin/cache/clear")
assert result.status_code == HTTPStatus.OK
cache_cleared = result.json()
assert cache_cleared == cache
# Force clear cache
result = requests.post(f"{server}/v1/admin/cache/clear?clear_all=true")
assert result.status_code == HTTPStatus.OK
cache_cleared = result.json()
assert cache_cleared == {}
# Try to load already deleted cache entries
result = requests.post(f"{server}/v1/admin/cache/load")
assert result.status_code == HTTPStatus.OK
cache_loaded = result.json()
assert cache_loaded == {}
# Cache should still be empty
result = requests.get(f"{server}/v1/admin/cache")
assert result.status_code == HTTPStatus.OK
cache = result.json()
assert cache == {}
class TestServerStartStop:
def test_server_start_eosdash(self, tmpdir):
"""Test the EOSdash server startup from EOS."""
# Do not use any fixture as this will make pytest the owner of the EOSdash port.
host = get_default_host()
if os.name == "nt":
# Windows does not provide SIGKILL
sigkill = signal.SIGTERM # type: ignore[attr-defined,unused-ignore]
else:
sigkill = signal.SIGKILL # type: ignore
port = 8503
eosdash_port = 8504
timeout = 120
server = f"http://{host}:{port}"
eosdash_server = f"http://{host}:{eosdash_port}"
eos_dir = str(tmpdir)
# Cleanup any EOSdash process left.
try:
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
if result.status_code == HTTPStatus.OK:
pid = result.json()["pid"]
os.kill(pid, sigkill)
time.sleep(1)
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
assert result.status_code != HTTPStatus.OK
except:
pass
# Wait for EOSdash port to be freed
process_info: list[dict] = []
for retries in range(int(timeout / 3)):
process_info = []
pids: list[int] = []
for conn in psutil.net_connections(kind="inet"):
if conn.laddr.port == eosdash_port:
if conn.pid not in pids:
# Get fresh process info
process = psutil.Process(conn.pid)
pids.append(conn.pid)
process_info.append(process.as_dict(attrs=["pid", "cmdline"]))
if len(process_info) == 0:
break
time.sleep(3)
assert len(process_info) == 0
# Import after test setup to prevent creation of config file before test
from akkudoktoreos.server.eos import start_eosdash
process = start_eosdash(
host=host,
port=eosdash_port,
eos_host=host,
eos_port=port,
log_level="debug",
access_log=False,
reload=False,
eos_dir=eos_dir,
eos_config_dir=eos_dir,
)
# Assure EOSdash is up
startup = False
error = ""
for retries in range(int(timeout / 3)):
try:
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
if result.status_code == HTTPStatus.OK:
startup = True
break
error = f"{result.status_code}, {str(result.content)}"
except Exception as ex:
error = str(ex)
time.sleep(3)
assert startup, f"Connection to {eosdash_server}/eosdash/health failed: {error}"
assert result.json()["status"] == "alive"
# Shutdown eosdash
try:
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
if result.status_code == HTTPStatus.OK:
pid = result.json()["pid"]
os.kill(pid, signal.SIGTERM)
time.sleep(1)
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
assert result.status_code != HTTPStatus.OK
except:
pass
@pytest.mark.skipif(os.name == "nt", reason="Server restart not supported on Windows")
def test_server_restart(self, server_setup_for_function, is_system_test):
"""Test server restart."""
server = server_setup_for_function["server"]
eos_dir = server_setup_for_function["eos_dir"]
timeout = server_setup_for_function["timeout"]
result = requests.get(f"{server}/v1/config")
assert result.status_code == HTTPStatus.OK
# Get testing config
config_json = result.json()
config_folder_path = Path(config_json["general"]["config_folder_path"])
config_file_path = Path(config_json["general"]["config_file_path"])
data_folder_path = Path(config_json["general"]["data_folder_path"])
data_ouput_path = Path(config_json["general"]["data_output_path"])
cache_file_path = data_folder_path.joinpath(config_json["cache"]["subpath"]).joinpath(
"cachefilestore.json"
)
# Assure we are working in test environment
assert str(config_folder_path).startswith(eos_dir)
assert str(config_file_path).startswith(eos_dir)
assert str(data_folder_path).startswith(eos_dir)
assert str(data_ouput_path).startswith(eos_dir)
if is_system_test:
# Prepare cache entry and get cached data
result = requests.put(f"{server}/v1/config/weather/provider", json="BrightSky")
assert result.status_code == HTTPStatus.OK
result = requests.post(f"{server}/v1/prediction/update/BrightSky")
assert result.status_code == HTTPStatus.OK
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
assert result.status_code == HTTPStatus.OK
data = result.json()
assert data["data"] != {}
result = requests.put(f"{server}/v1/config/file")
assert result.status_code == HTTPStatus.OK
# Save cache
result = requests.post(f"{server}/v1/admin/cache/save")
assert result.status_code == HTTPStatus.OK
cache = result.json()
assert cache_file_path.exists()
result = requests.get(f"{server}/v1/admin/cache")
assert result.status_code == HTTPStatus.OK
cache = result.json()
result = requests.get(f"{server}/v1/health")
assert result.status_code == HTTPStatus.OK
pid = result.json()["pid"]
result = requests.post(f"{server}/v1/admin/server/restart")
assert result.status_code == HTTPStatus.OK
assert "Restarting EOS.." in result.json()["message"]
new_pid = result.json()["pid"]
# Wait for server to shut down
for retries in range(10):
try:
result = requests.get(f"{server}/v1/health", timeout=2)
if result.status_code == HTTPStatus.OK:
pid = result.json()["pid"]
if pid == new_pid:
# Already started
break
else:
break
except Exception as ex:
break
time.sleep(3)
# Assure EOS is up again
startup = False
error = ""
for retries in range(int(timeout / 3)):
try:
result = requests.get(f"{server}/v1/health", timeout=2)
if result.status_code == HTTPStatus.OK:
startup = True
break
error = f"{result.status_code}, {str(result.content)}"
except Exception as ex:
error = str(ex)
time.sleep(3)
assert startup, f"Connection to {server}/v1/health failed: {error}"
assert result.json()["status"] == "alive"
pid = result.json()["pid"]
assert pid == new_pid
result = requests.get(f"{server}/v1/admin/cache")
assert result.status_code == HTTPStatus.OK
new_cache = result.json()
assert cache.items() <= new_cache.items()
if is_system_test:
result = requests.get(f"{server}/v1/config")
assert result.status_code == HTTPStatus.OK
assert result.json()["weather"]["provider"] == "BrightSky"
# Wait for initialisation task to have finished
time.sleep(5)
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
assert result.status_code == HTTPStatus.OK
assert result.json() == data
# Shutdown the newly created server
result = requests.post(f"{server}/v1/admin/server/shutdown")
assert result.status_code == HTTPStatus.OK
assert "Stopping EOS.." in result.json()["message"]
new_pid = result.json()["pid"]