mirror of
https://github.com/Akkudoktor-EOS/EOS.git
synced 2025-04-17 07:55:15 +00:00
445 lines
17 KiB
Python
445 lines
17 KiB
Python
import json
|
|
import os
|
|
import signal
|
|
import time
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
|
|
import psutil
|
|
import pytest
|
|
import requests
|
|
|
|
from akkudoktoreos.server.server import get_default_host
|
|
|
|
DIR_TESTDATA = Path(__file__).absolute().parent.joinpath("testdata")
|
|
|
|
FILE_TESTDATA_EOSSERVER_CONFIG_1 = DIR_TESTDATA.joinpath("eosserver_config_1.json")
|
|
|
|
|
|
class TestServer:
|
|
def test_server_setup_for_class(self, server_setup_for_class):
|
|
"""Ensure server is started."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
result = requests.get(f"{server}/v1/config")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Get testing config
|
|
config_json = result.json()
|
|
config_folder_path = Path(config_json["general"]["config_folder_path"])
|
|
config_file_path = Path(config_json["general"]["config_file_path"])
|
|
data_folder_path = Path(config_json["general"]["data_folder_path"])
|
|
data_ouput_path = Path(config_json["general"]["data_output_path"])
|
|
# Assure we are working in test environment
|
|
assert str(config_folder_path).startswith(eos_dir)
|
|
assert str(config_file_path).startswith(eos_dir)
|
|
assert str(data_folder_path).startswith(eos_dir)
|
|
assert str(data_ouput_path).startswith(eos_dir)
|
|
|
|
def test_prediction_brightsky(self, server_setup_for_class, is_system_test):
|
|
"""Test weather prediction by BrightSky."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
result = requests.get(f"{server}/v1/config")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Get testing config
|
|
config_json = result.json()
|
|
config_folder_path = Path(config_json["general"]["config_folder_path"])
|
|
# Assure we are working in test environment
|
|
assert str(config_folder_path).startswith(eos_dir)
|
|
|
|
result = requests.put(f"{server}/v1/config/weather/provider", json="BrightSky")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Assure prediction is enabled
|
|
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
providers = result.json()
|
|
assert "BrightSky" in providers
|
|
|
|
if is_system_test:
|
|
result = requests.post(f"{server}/v1/prediction/update/BrightSky")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
data = result.json()
|
|
assert len(data["data"]) > 24
|
|
|
|
else:
|
|
pass
|
|
|
|
def test_prediction_clearoutside(self, server_setup_for_class, is_system_test):
|
|
"""Test weather prediction by ClearOutside."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
result = requests.put(f"{server}/v1/config/weather/provider", json="ClearOutside")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Assure prediction is enabled
|
|
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
providers = result.json()
|
|
assert "ClearOutside" in providers
|
|
|
|
if is_system_test:
|
|
result = requests.post(f"{server}/v1/prediction/update/ClearOutside")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
data = result.json()
|
|
assert len(data["data"]) > 24
|
|
|
|
else:
|
|
pass
|
|
|
|
def test_prediction_pvforecastakkudoktor(self, server_setup_for_class, is_system_test):
|
|
"""Test PV prediction by PVForecastAkkudoktor."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
# Reset config
|
|
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
|
|
config = json.load(fd)
|
|
config["pvforecast"]["provider"] = "PVForecastAkkudoktor"
|
|
result = requests.put(f"{server}/v1/config", json=config)
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Assure prediction is enabled
|
|
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
providers = result.json()
|
|
assert "PVForecastAkkudoktor" in providers
|
|
|
|
if is_system_test:
|
|
result = requests.post(f"{server}/v1/prediction/update/PVForecastAkkudoktor")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=pvforecast_ac_power")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
data = result.json()
|
|
assert len(data["data"]) > 24
|
|
|
|
else:
|
|
pass
|
|
|
|
def test_prediction_elecpriceakkudoktor(self, server_setup_for_class, is_system_test):
|
|
"""Test electricity price prediction by ElecPriceImport."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
# Reset config
|
|
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
|
|
config = json.load(fd)
|
|
config["elecprice"]["provider"] = "ElecPriceAkkudoktor"
|
|
result = requests.put(f"{server}/v1/config", json=config)
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Assure prediction is enabled
|
|
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
providers = result.json()
|
|
assert "ElecPriceAkkudoktor" in providers
|
|
|
|
if is_system_test:
|
|
result = requests.post(f"{server}/v1/prediction/update/ElecPriceAkkudoktor")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=elecprice_marketprice_wh")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
data = result.json()
|
|
assert len(data["data"]) > 24
|
|
|
|
else:
|
|
pass
|
|
|
|
def test_prediction_loadakkudoktor(self, server_setup_for_class, is_system_test):
|
|
"""Test load prediction by LoadAkkudoktor."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
result = requests.put(f"{server}/v1/config/load/provider", json="LoadAkkudoktor")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Assure prediction is enabled
|
|
result = requests.get(f"{server}/v1/prediction/providers?enabled=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
providers = result.json()
|
|
assert "LoadAkkudoktor" in providers
|
|
|
|
if is_system_test:
|
|
result = requests.post(f"{server}/v1/prediction/update/LoadAkkudoktor")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=load_mean")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
data = result.json()
|
|
assert len(data["data"]) > 24
|
|
|
|
else:
|
|
pass
|
|
|
|
def test_admin_cache(self, server_setup_for_class, is_system_test):
|
|
"""Test whether cache is reconstructed from cached files."""
|
|
server = server_setup_for_class["server"]
|
|
eos_dir = server_setup_for_class["eos_dir"]
|
|
|
|
result = requests.get(f"{server}/v1/admin/cache")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache = result.json()
|
|
|
|
if is_system_test:
|
|
# There should be some cache data
|
|
assert cache != {}
|
|
|
|
# Save cache
|
|
result = requests.post(f"{server}/v1/admin/cache/save")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache_saved = result.json()
|
|
assert cache_saved == cache
|
|
|
|
# Clear cache - should clear nothing as all cache files expire in the future
|
|
result = requests.post(f"{server}/v1/admin/cache/clear")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache_cleared = result.json()
|
|
assert cache_cleared == cache
|
|
|
|
# Force clear cache
|
|
result = requests.post(f"{server}/v1/admin/cache/clear?clear_all=true")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache_cleared = result.json()
|
|
assert cache_cleared == {}
|
|
|
|
# Try to load already deleted cache entries
|
|
result = requests.post(f"{server}/v1/admin/cache/load")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache_loaded = result.json()
|
|
assert cache_loaded == {}
|
|
|
|
# Cache should still be empty
|
|
result = requests.get(f"{server}/v1/admin/cache")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache = result.json()
|
|
assert cache == {}
|
|
|
|
|
|
class TestServerStartStop:
|
|
def test_server_start_eosdash(self, tmpdir):
|
|
"""Test the EOSdash server startup from EOS."""
|
|
# Do not use any fixture as this will make pytest the owner of the EOSdash port.
|
|
host = get_default_host()
|
|
if os.name == "nt":
|
|
# Windows does not provide SIGKILL
|
|
sigkill = signal.SIGTERM # type: ignore[attr-defined,unused-ignore]
|
|
else:
|
|
sigkill = signal.SIGKILL # type: ignore
|
|
port = 8503
|
|
eosdash_port = 8504
|
|
timeout = 120
|
|
|
|
server = f"http://{host}:{port}"
|
|
eosdash_server = f"http://{host}:{eosdash_port}"
|
|
eos_dir = str(tmpdir)
|
|
|
|
# Cleanup any EOSdash process left.
|
|
try:
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
pid = result.json()["pid"]
|
|
os.kill(pid, sigkill)
|
|
time.sleep(1)
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
assert result.status_code != HTTPStatus.OK
|
|
except:
|
|
pass
|
|
|
|
# Wait for EOSdash port to be freed
|
|
process_info: list[dict] = []
|
|
for retries in range(int(timeout / 3)):
|
|
process_info = []
|
|
pids: list[int] = []
|
|
for conn in psutil.net_connections(kind="inet"):
|
|
if conn.laddr.port == eosdash_port:
|
|
if conn.pid not in pids:
|
|
# Get fresh process info
|
|
process = psutil.Process(conn.pid)
|
|
pids.append(conn.pid)
|
|
process_info.append(process.as_dict(attrs=["pid", "cmdline"]))
|
|
if len(process_info) == 0:
|
|
break
|
|
time.sleep(3)
|
|
assert len(process_info) == 0
|
|
|
|
# Import after test setup to prevent creation of config file before test
|
|
from akkudoktoreos.server.eos import start_eosdash
|
|
|
|
process = start_eosdash(
|
|
host=host,
|
|
port=eosdash_port,
|
|
eos_host=host,
|
|
eos_port=port,
|
|
log_level="debug",
|
|
access_log=False,
|
|
reload=False,
|
|
eos_dir=eos_dir,
|
|
eos_config_dir=eos_dir,
|
|
)
|
|
|
|
# Assure EOSdash is up
|
|
startup = False
|
|
error = ""
|
|
for retries in range(int(timeout / 3)):
|
|
try:
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
startup = True
|
|
break
|
|
error = f"{result.status_code}, {str(result.content)}"
|
|
except Exception as ex:
|
|
error = str(ex)
|
|
time.sleep(3)
|
|
|
|
assert startup, f"Connection to {eosdash_server}/eosdash/health failed: {error}"
|
|
assert result.json()["status"] == "alive"
|
|
|
|
# Shutdown eosdash
|
|
try:
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
pid = result.json()["pid"]
|
|
os.kill(pid, signal.SIGTERM)
|
|
time.sleep(1)
|
|
result = requests.get(f"{eosdash_server}/eosdash/health", timeout=2)
|
|
assert result.status_code != HTTPStatus.OK
|
|
except:
|
|
pass
|
|
|
|
@pytest.mark.skipif(os.name == "nt", reason="Server restart not supported on Windows")
|
|
def test_server_restart(self, server_setup_for_function, is_system_test):
|
|
"""Test server restart."""
|
|
server = server_setup_for_function["server"]
|
|
eos_dir = server_setup_for_function["eos_dir"]
|
|
timeout = server_setup_for_function["timeout"]
|
|
|
|
result = requests.get(f"{server}/v1/config")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Get testing config
|
|
config_json = result.json()
|
|
config_folder_path = Path(config_json["general"]["config_folder_path"])
|
|
config_file_path = Path(config_json["general"]["config_file_path"])
|
|
data_folder_path = Path(config_json["general"]["data_folder_path"])
|
|
data_ouput_path = Path(config_json["general"]["data_output_path"])
|
|
cache_file_path = data_folder_path.joinpath(config_json["cache"]["subpath"]).joinpath(
|
|
"cachefilestore.json"
|
|
)
|
|
# Assure we are working in test environment
|
|
assert str(config_folder_path).startswith(eos_dir)
|
|
assert str(config_file_path).startswith(eos_dir)
|
|
assert str(data_folder_path).startswith(eos_dir)
|
|
assert str(data_ouput_path).startswith(eos_dir)
|
|
|
|
if is_system_test:
|
|
# Prepare cache entry and get cached data
|
|
result = requests.put(f"{server}/v1/config/weather/provider", json="BrightSky")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.post(f"{server}/v1/prediction/update/BrightSky")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
|
|
assert result.status_code == HTTPStatus.OK
|
|
data = result.json()
|
|
assert data["data"] != {}
|
|
|
|
result = requests.put(f"{server}/v1/config/file")
|
|
assert result.status_code == HTTPStatus.OK
|
|
|
|
# Save cache
|
|
result = requests.post(f"{server}/v1/admin/cache/save")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache = result.json()
|
|
|
|
assert cache_file_path.exists()
|
|
|
|
result = requests.get(f"{server}/v1/admin/cache")
|
|
assert result.status_code == HTTPStatus.OK
|
|
cache = result.json()
|
|
|
|
result = requests.get(f"{server}/v1/health")
|
|
assert result.status_code == HTTPStatus.OK
|
|
pid = result.json()["pid"]
|
|
|
|
result = requests.post(f"{server}/v1/admin/server/restart")
|
|
assert result.status_code == HTTPStatus.OK
|
|
assert "Restarting EOS.." in result.json()["message"]
|
|
new_pid = result.json()["pid"]
|
|
|
|
# Wait for server to shut down
|
|
for retries in range(10):
|
|
try:
|
|
result = requests.get(f"{server}/v1/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
pid = result.json()["pid"]
|
|
if pid == new_pid:
|
|
# Already started
|
|
break
|
|
else:
|
|
break
|
|
except Exception as ex:
|
|
break
|
|
time.sleep(3)
|
|
|
|
# Assure EOS is up again
|
|
startup = False
|
|
error = ""
|
|
for retries in range(int(timeout / 3)):
|
|
try:
|
|
result = requests.get(f"{server}/v1/health", timeout=2)
|
|
if result.status_code == HTTPStatus.OK:
|
|
startup = True
|
|
break
|
|
error = f"{result.status_code}, {str(result.content)}"
|
|
except Exception as ex:
|
|
error = str(ex)
|
|
time.sleep(3)
|
|
|
|
assert startup, f"Connection to {server}/v1/health failed: {error}"
|
|
assert result.json()["status"] == "alive"
|
|
pid = result.json()["pid"]
|
|
assert pid == new_pid
|
|
|
|
result = requests.get(f"{server}/v1/admin/cache")
|
|
assert result.status_code == HTTPStatus.OK
|
|
new_cache = result.json()
|
|
|
|
assert cache.items() <= new_cache.items()
|
|
|
|
if is_system_test:
|
|
result = requests.get(f"{server}/v1/config")
|
|
assert result.status_code == HTTPStatus.OK
|
|
assert result.json()["weather"]["provider"] == "BrightSky"
|
|
|
|
# Wait for initialisation task to have finished
|
|
time.sleep(5)
|
|
|
|
result = requests.get(f"{server}/v1/prediction/series?key=weather_temp_air")
|
|
assert result.status_code == HTTPStatus.OK
|
|
assert result.json() == data
|
|
|
|
# Shutdown the newly created server
|
|
result = requests.post(f"{server}/v1/admin/server/shutdown")
|
|
assert result.status_code == HTTPStatus.OK
|
|
assert "Stopping EOS.." in result.json()["message"]
|
|
new_pid = result.json()["pid"]
|