EOSdash: Improve PV forecast configuration. (#500)

* Allow to configure planes and configuration values of planes separatedly.

Make single configuration values for planes explicitly available for configuration.
Still allows to also configure a plane by a whole plane value struct.

* Enhance admin page by file import and export of the EOS configuration

The actual EOS configuration can now be exported to the EOSdash server.
From there it can be also imported. For security reasons only import and export
from/ to a predefined directory on the EOSdash server is possible.

* Improve handling of nested value pathes in pydantic models.

Added separate methods for nested path access (get_nested_value, set_nested_value).
On value setting the missing fields along the nested path are now added automatically
and initialized with default values. Nested path access was before restricted to the
EOS configuration and is now part of the pydantic base model.

* Makefile

Add new target to run rests as CI does on Github. Improve target docs.

* Datetimeutil tests

Prolong acceptable time difference for comparison of approximately equal times in tests.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte 2025-04-05 13:08:12 +02:00 committed by GitHub
parent e6a8c0508e
commit 0bda5ba4cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1216 additions and 257 deletions

View File

@ -23,6 +23,9 @@ help:
@echo " run-dev - Run EOS development server in virtual environment (automatically reloads)."
@echo " run-dash - Run EOSdash production server in virtual environment."
@echo " run-dash-dev - Run EOSdash development server in virtual environment (automatically reloads)."
@echo " test - Run tests."
@echo " test-full - Run tests with full optimization."
@echo " test-ci - Run tests as CI does. No user config file allowed."
@echo " dist - Create distribution (in dist/)."
@echo " clean - Remove generated documentation, distribution and virtual environment."
@ -110,6 +113,11 @@ test:
@echo "Running tests..."
.venv/bin/pytest -vs --cov src --cov-report term-missing
# Target to run tests as done by CI on Github.
test-ci:
@echo "Running tests as CI..."
.venv/bin/pytest --full-run --check-config-side-effect -vs --cov src --cov-report term-missing
# Target to run all tests.
test-full:
@echo "Running all tests..."

View File

@ -552,8 +552,9 @@ Validators:
| Name | Environment Variable | Type | Read-Only | Default | Description |
| ---- | -------------------- | ---- | --------- | ------- | ----------- |
| provider | `EOS_PVFORECAST__PROVIDER` | `Optional[str]` | `rw` | `None` | PVForecast provider id of provider to be used. |
| planes | `EOS_PVFORECAST__PLANES` | `Optional[list[akkudoktoreos.prediction.pvforecast.PVForecastPlaneSetting]]` | `rw` | `None` | Plane configuration. |
| provider_settings | `EOS_PVFORECAST__PROVIDER_SETTINGS` | `Optional[akkudoktoreos.prediction.pvforecastimport.PVForecastImportCommonSettings]` | `rw` | `None` | Provider settings |
| planes | `EOS_PVFORECAST__PLANES` | `Optional[list[akkudoktoreos.prediction.pvforecast.PVForecastPlaneSetting]]` | `rw` | `None` | Plane configuration. |
| max_planes | `EOS_PVFORECAST__MAX_PLANES` | `Optional[int]` | `rw` | `0` | Maximum number of planes that can be set |
| planes_peakpower | | `List[float]` | `ro` | `N/A` | Compute a list of the peak power per active planes. |
| planes_azimuth | | `List[float]` | `ro` | `N/A` | Compute a list of the azimuths per active planes. |
| planes_tilt | | `List[float]` | `ro` | `N/A` | Compute a list of the tilts per active planes. |
@ -569,6 +570,7 @@ Validators:
{
"pvforecast": {
"provider": "PVForecastAkkudoktor",
"provider_settings": null,
"planes": [
{
"surface_tilt": 10.0,
@ -615,7 +617,7 @@ Validators:
"strings_per_inverter": 2
}
],
"provider_settings": null
"max_planes": 0
}
}
```
@ -628,6 +630,7 @@ Validators:
{
"pvforecast": {
"provider": "PVForecastAkkudoktor",
"provider_settings": null,
"planes": [
{
"surface_tilt": 10.0,
@ -674,7 +677,7 @@ Validators:
"strings_per_inverter": 2
}
],
"provider_settings": null,
"max_planes": 0,
"planes_peakpower": [
5.0,
3.5
@ -707,33 +710,6 @@ Validators:
}
```
### Common settings for pvforecast data import from file or JSON string
:::{table} pvforecast::provider_settings
:widths: 10 10 5 5 30
:align: left
| Name | Type | Read-Only | Default | Description |
| ---- | ---- | --------- | ------- | ----------- |
| import_file_path | `Union[str, pathlib.Path, NoneType]` | `rw` | `None` | Path to the file to import PV forecast data from. |
| import_json | `Optional[str]` | `rw` | `None` | JSON string, dictionary of PV forecast value lists. |
:::
#### Example Input/Output
```{eval-rst}
.. code-block:: json
{
"pvforecast": {
"provider_settings": {
"import_file_path": null,
"import_json": "{\"pvforecast_ac_power\": [0, 8.05, 352.91]}"
}
}
}
```
### PV Forecast Plane Configuration
:::{table} pvforecast::planes::list
@ -742,8 +718,8 @@ Validators:
| Name | Type | Read-Only | Default | Description |
| ---- | ---- | --------- | ------- | ----------- |
| surface_tilt | `Optional[float]` | `rw` | `None` | Tilt angle from horizontal plane. Ignored for two-axis tracking. |
| surface_azimuth | `Optional[float]` | `rw` | `None` | Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270). |
| surface_tilt | `Optional[float]` | `rw` | `30.0` | Tilt angle from horizontal plane. Ignored for two-axis tracking. |
| surface_azimuth | `Optional[float]` | `rw` | `180.0` | Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270). |
| userhorizon | `Optional[List[float]]` | `rw` | `None` | Elevation of horizon in degrees, at equally spaced azimuth clockwise from north. |
| peakpower | `Optional[float]` | `rw` | `None` | Nominal power of PV system in kW. |
| pvtechchoice | `Optional[str]` | `rw` | `crystSi` | PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'. |
@ -755,7 +731,7 @@ Validators:
| albedo | `Optional[float]` | `rw` | `None` | Proportion of the light hitting the ground that it reflects back. |
| module_model | `Optional[str]` | `rw` | `None` | Model of the PV modules of this plane. |
| inverter_model | `Optional[str]` | `rw` | `None` | Model of the inverter of this plane. |
| inverter_paco | `Optional[int]` | `rw` | `None` | AC power rating of the inverter. [W] |
| inverter_paco | `Optional[int]` | `rw` | `None` | AC power rating of the inverter [W]. |
| modules_per_string | `Optional[int]` | `rw` | `None` | Number of the PV modules of the strings of this plane. |
| strings_per_inverter | `Optional[int]` | `rw` | `None` | Number of the strings of the inverter of this plane. |
:::
@ -817,6 +793,33 @@ Validators:
}
```
### Common settings for pvforecast data import from file or JSON string
:::{table} pvforecast::provider_settings
:widths: 10 10 5 5 30
:align: left
| Name | Type | Read-Only | Default | Description |
| ---- | ---- | --------- | ------- | ----------- |
| import_file_path | `Union[str, pathlib.Path, NoneType]` | `rw` | `None` | Path to the file to import PV forecast data from. |
| import_json | `Optional[str]` | `rw` | `None` | JSON string, dictionary of PV forecast value lists. |
:::
#### Example Input/Output
```{eval-rst}
.. code-block:: json
{
"pvforecast": {
"provider_settings": {
"import_file_path": null,
"import_json": "{\"pvforecast_ac_power\": [0, 8.05, 352.91]}"
}
}
}
```
## Weather Forecast Configuration
:::{table} weather
@ -997,6 +1000,7 @@ Validators:
},
"pvforecast": {
"provider": "PVForecastAkkudoktor",
"provider_settings": null,
"planes": [
{
"surface_tilt": 10.0,
@ -1043,7 +1047,7 @@ Validators:
"strings_per_inverter": 2
}
],
"provider_settings": null
"max_planes": 0
},
"weather": {
"provider": "WeatherImport",

View File

@ -209,6 +209,7 @@
"pvforecast": {
"$ref": "#/components/schemas/PVForecastCommonSettings-Output",
"default": {
"max_planes": 0,
"planes_azimuth": [],
"planes_inverter_paco": [],
"planes_peakpower": [],
@ -1553,6 +1554,20 @@
"PVForecastCommonSettings-Input": {
"description": "PV Forecast Configuration.",
"properties": {
"max_planes": {
"anyOf": [
{
"minimum": 0.0,
"type": "integer"
},
{
"type": "null"
}
],
"default": 0,
"description": "Maximum number of planes that can be set",
"title": "Max Planes"
},
"planes": {
"anyOf": [
{
@ -1646,6 +1661,20 @@
"PVForecastCommonSettings-Output": {
"description": "PV Forecast Configuration.",
"properties": {
"max_planes": {
"anyOf": [
{
"minimum": 0.0,
"type": "integer"
},
{
"type": "null"
}
],
"default": 0,
"description": "Maximum number of planes that can be set",
"title": "Max Planes"
},
"planes": {
"anyOf": [
{
@ -1864,7 +1893,7 @@
"type": "null"
}
],
"description": "AC power rating of the inverter. [W]",
"description": "AC power rating of the inverter [W].",
"examples": [
6000,
4000
@ -2012,6 +2041,7 @@
"type": "null"
}
],
"default": 180.0,
"description": "Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
"examples": [
10.0,
@ -2028,6 +2058,7 @@
"type": "null"
}
],
"default": 30.0,
"description": "Tilt angle from horizontal plane. Ignored for two-axis tracking.",
"examples": [
10.0,

View File

@ -31,7 +31,7 @@ from akkudoktoreos.core.decorators import classproperty
from akkudoktoreos.core.emsettings import EnergyManagementCommonSettings
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.logsettings import LoggingCommonSettings
from akkudoktoreos.core.pydantic import access_nested_value, merge_models
from akkudoktoreos.core.pydantic import PydanticModelNestedValueMixin, merge_models
from akkudoktoreos.devices.settings import DevicesCommonSettings
from akkudoktoreos.measurement.measurement import MeasurementCommonSettings
from akkudoktoreos.optimization.optimization import OptimizationCommonSettings
@ -138,7 +138,7 @@ class GeneralSettings(SettingsBaseModel):
return self._config_file_path
class SettingsEOS(BaseSettings):
class SettingsEOS(BaseSettings, PydanticModelNestedValueMixin):
"""Settings for all EOS.
Used by updating the configuration with specific settings only.
@ -426,32 +426,6 @@ class ConfigEOS(SingletonMixin, SettingsEOSDefaults):
"""
self._setup()
def set_config_value(self, path: str, value: Any) -> None:
"""Set a configuration value based on the provided path.
Supports string paths (with '/' separators) or sequence paths (list/tuple).
Trims leading and trailing '/' from string paths.
Args:
path (str): The path to the configuration key (e.g., "key1/key2/key3" or key1/key2/0).
value (Any]): The value to set.
"""
access_nested_value(self, path, True, value)
def get_config_value(self, path: str) -> Any:
"""Get a configuration value based on the provided path.
Supports string paths (with '/' separators) or sequence paths (list/tuple).
Trims leading and trailing '/' from string paths.
Args:
path (str): The path to the configuration key (e.g., "key1/key2/key3" or key1/key2/0).
Returns:
Any: The retrieved value.
"""
return access_nested_value(self, path, False)
def _create_initial_config_file(self) -> None:
if self.general.config_file_path and not self.general.config_file_path.exists():
self.general.config_file_path.parent.mkdir(parents=True, exist_ok=True)

View File

@ -15,7 +15,7 @@ Key Features:
import json
import re
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type, Union
from typing import Any, Dict, List, Optional, Type, Union, get_args, get_origin
from zoneinfo import ZoneInfo
import pandas as pd
@ -51,70 +51,6 @@ def merge_models(source: BaseModel, update_dict: dict[str, Any]) -> dict[str, An
return merged_dict
def access_nested_value(
model: BaseModel, path: str, setter: bool, value: Optional[Any] = None
) -> Any:
"""Get or set a nested model value based on the provided path.
Supports string paths (with '/' separators) or sequence paths (list/tuple).
Trims leading and trailing '/' from string paths.
Args:
model (BaseModel): The model object for partial assignment.
path (str): The path to the model key (e.g., "key1/key2/key3" or key1/key2/0).
setter (bool): True to set value at path, False to return value at path.
value (Optional[Any]): The value to set.
Returns:
Any: The retrieved value if acting as a getter, or None if setting a value.
"""
path_elements = path.strip("/").split("/")
cfg: Any = model
parent: BaseModel = model
model_key: str = ""
for i, key in enumerate(path_elements):
is_final_key = i == len(path_elements) - 1
if isinstance(cfg, list):
try:
idx = int(key)
if is_final_key:
if not setter: # Getter
return cfg[idx]
else: # Setter
new_list = list(cfg)
new_list[idx] = value
# Trigger validation
setattr(parent, model_key, new_list)
else:
cfg = cfg[idx]
except ValidationError as e:
raise ValueError(f"Error updating model: {e}") from e
except (ValueError, IndexError) as e:
raise IndexError(f"Invalid list index at {path}: {key}") from e
elif isinstance(cfg, BaseModel):
parent = cfg
model_key = key
if is_final_key:
if not setter: # Getter
return getattr(cfg, key)
else: # Setter
try:
# Verification also if nested value is provided opposed to just setattr
# Will merge partial assignment
cfg = cfg.__pydantic_validator__.validate_assignment(cfg, key, value)
except Exception as e:
raise ValueError(f"Error updating model: {e}") from e
else:
cfg = getattr(cfg, key)
else:
raise KeyError(f"Key '{key}' not found in model.")
class PydanticTypeAdapterDateTime(TypeAdapter[pendulum.DateTime]):
"""Custom type adapter for Pendulum DateTime fields."""
@ -146,7 +82,333 @@ class PydanticTypeAdapterDateTime(TypeAdapter[pendulum.DateTime]):
return bool(re.match(iso8601_pattern, value))
class PydanticBaseModel(BaseModel):
class PydanticModelNestedValueMixin:
"""A mixin providing methods to get and set nested values within a Pydantic model.
The methods use a '/'-separated path to denote the nested values.
Supports handling `Optional`, `List`, and `Dict` types, ensuring correct initialization of
missing attributes.
"""
def get_nested_value(self, path: str) -> Any:
"""Retrieve a nested value from the model using a '/'-separated path.
Supports accessing nested attributes and list indices.
Args:
path (str): A '/'-separated path to the nested attribute (e.g., "key1/key2/0").
Returns:
Any: The retrieved value.
Raises:
KeyError: If a key is not found in the model.
IndexError: If a list index is out of bounds or invalid.
Example:
```python
class Address(PydanticBaseModel):
city: str
class User(PydanticBaseModel):
name: str
address: Address
user = User(name="Alice", address=Address(city="New York"))
city = user.get_nested_value("address/city")
print(city) # Output: "New York"
```
"""
path_elements = path.strip("/").split("/")
model: Any = self
for key in path_elements:
if isinstance(model, list):
try:
model = model[int(key)]
except (ValueError, IndexError) as e:
raise IndexError(f"Invalid list index at '{path}': {key}; {e}")
elif isinstance(model, BaseModel):
model = getattr(model, key)
else:
raise KeyError(f"Key '{key}' not found in model.")
return model
def set_nested_value(self, path: str, value: Any) -> None:
"""Set a nested value in the model using a '/'-separated path.
Supports modifying nested attributes and list indices while preserving Pydantic validation.
Automatically initializes missing `Optional`, `Union`, `dict`, and `list` fields if necessary.
If a missing field cannot be initialized, raises an exception.
Args:
path (str): A '/'-separated path to the nested attribute (e.g., "key1/key2/0").
value (Any): The new value to set.
Raises:
KeyError: If a key is not found in the model.
IndexError: If a list index is out of bounds or invalid.
ValueError: If a validation error occurs.
TypeError: If a missing field cannot be initialized.
Example:
```python
class Address(PydanticBaseModel):
city: Optional[str]
class User(PydanticBaseModel):
name: str
address: Optional[Address]
settings: Optional[Dict[str, Any]]
user = User(name="Alice", address=None, settings=None)
user.set_nested_value("address/city", "Los Angeles")
user.set_nested_value("settings/theme", "dark")
print(user.address.city) # Output: "Los Angeles"
print(user.settings) # Output: {'theme': 'dark'}
```
"""
path_elements = path.strip("/").split("/")
# The model we are currently working on
model: Any = self
# The model we get the type information from. It is a pydantic BaseModel
parent: BaseModel = model
# The field that provides type information for the current key
# Fields may have nested types that translates to a sequence of keys, not just one
# - my_field: Optional[list[OtherModel]] -> e.g. "myfield/0" for index 0
# parent_key = ["myfield",] ... ["myfield", "0"]
# parent_key_types = [list, OtherModel]
parent_key: list[str] = []
parent_key_types: list = []
for i, key in enumerate(path_elements):
is_final_key = i == len(path_elements) - 1
# Add current key to parent key to enable nested type tracking
parent_key.append(key)
# Get next value
next_value = None
if isinstance(model, BaseModel):
# If this is the final key, set the value
if is_final_key:
try:
model.__pydantic_validator__.validate_assignment(model, key, value)
except ValidationError as e:
raise ValueError(f"Error updating model: {e}") from e
return
# Track parent and key for possible assignment later
parent = model
parent_key = [
key,
]
parent_key_types = self._get_key_types(model, key)
# Attempt to access the next attribute, handling None values
next_value = getattr(model, key, None)
# Handle missing values (initialize dict/list/model if necessary)
if next_value is None:
next_type = parent_key_types[len(parent_key) - 1]
next_value = self._initialize_value(next_type)
if next_value is None:
raise TypeError(
f"Unable to initialize missing value for key '{key}' in path '{path}' with type {next_type} of {parent_key}:{parent_key_types}."
)
setattr(parent, key, next_value)
# pydantic may copy on validation assignment - reread to get the copied model
next_value = getattr(model, key, None)
elif isinstance(model, list):
# Handle lists (ensure index exists and modify safely)
try:
idx = int(key)
except Exception as e:
raise IndexError(
f"Invalid list index '{key}' at '{path}': key = {key}; parent = {parent}, parent_key = {parent_key}; model = {model}; {e}"
)
# Get next type from parent key type information
next_type = parent_key_types[len(parent_key) - 1]
if len(model) > idx:
next_value = model[idx]
else:
# Extend the list with default values if index is out of range
while len(model) <= idx:
next_value = self._initialize_value(next_type)
if next_value is None:
raise TypeError(
f"Unable to initialize missing value for key '{key}' in path '{path}' with type {next_type} of {parent_key}:{parent_key_types}."
)
model.append(next_value)
if is_final_key:
if (
(isinstance(next_type, type) and not isinstance(value, next_type))
or (next_type is dict and not isinstance(value, dict))
or (next_type is list and not isinstance(value, list))
):
raise TypeError(
f"Expected type {next_type} for key '{key}' in path '{path}', but got {type(value)}: {value}"
)
model[idx] = value
return
elif isinstance(model, dict):
# Handle dictionaries (auto-create missing keys)
# Get next type from parent key type information
next_type = parent_key_types[len(parent_key) - 1]
if is_final_key:
if (
(isinstance(next_type, type) and not isinstance(value, next_type))
or (next_type is dict and not isinstance(value, dict))
or (next_type is list and not isinstance(value, list))
):
raise TypeError(
f"Expected type {next_type} for key '{key}' in path '{path}', but got {type(value)}: {value}"
)
model[key] = value
return
if key not in model:
next_value = self._initialize_value(next_type)
if next_value is None:
raise TypeError(
f"Unable to initialize missing value for key '{key}' in path '{path}' with type {next_type} of {parent_key}:{parent_key_types}."
)
model[key] = next_value
else:
next_value = model[key]
else:
raise KeyError(f"Key '{key}' not found in model.")
# Move deeper
model = next_value
@staticmethod
def _get_key_types(model: Type[BaseModel], key: str) -> List[Union[Type[Any], list, dict]]:
"""Returns a list of nested types for a given Pydantic model key.
- Skips `Optional` and `Union`, using only the first non-None type.
- Skips dictionary keys and only adds value types.
- Keeps `list` and `dict` as origins.
Args:
model (Type[BaseModel]): The Pydantic model class to inspect.
key (str): The attribute name in the model.
Returns:
List[Union[Type[Any], list, dict]]: A list of extracted types, preserving `list` and `dict` origins.
Raises:
TypeError: If the key does not exist or lacks a valid type annotation.
"""
if key not in model.model_fields:
raise TypeError(f"Field '{key}' does not exist in model '{model.__name__}'.")
field_annotation = model.model_fields[key].annotation
if not field_annotation:
raise TypeError(
f"Missing type annotation for field '{key}' in model '{model.__name__}'."
)
nested_types: list[Union[Type[Any], list, dict]] = []
queue: list[Any] = [field_annotation]
while queue:
annotation = queue.pop(0)
origin = get_origin(annotation)
args = get_args(annotation)
# Handle Union (Optional[X] is treated as Union[X, None])
if origin is Union:
queue.extend(arg for arg in args if arg is not type(None))
continue
# Handle lists and dictionaries
if origin is list:
nested_types.append(list)
if args:
queue.append(args[0]) # Extract value type for list[T]
continue
if origin is dict:
nested_types.append(dict)
if len(args) == 2:
queue.append(args[1]) # Extract only the value type for dict[K, V]
continue
# If it's a BaseModel, add it to the list
if isinstance(annotation, type) and issubclass(annotation, BaseModel):
nested_types.append(annotation)
continue
# Otherwise, it's a standard type (e.g., str, int, bool, float, etc.)
nested_types.append(annotation)
return nested_types
@staticmethod
def _initialize_value(type_hint: Type[Any] | None | list[Any] | dict[Any, Any]) -> Any:
"""Initialize a missing value based on the provided type hint.
Args:
type_hint (Type[Any] | None | list[Any] | dict[Any, Any]): The type hint that determines
how the missing value should be initialized.
Returns:
Any: An instance of the expected type (e.g., list, dict, or Pydantic model), or `None`
if initialization is not possible.
Raises:
TypeError: If instantiation fails.
Example:
- For `list[str]`, returns `[]`
- For `dict[str, Any]`, returns `{}`
- For `Address` (a Pydantic model), returns a new `Address()` instance.
"""
if type_hint is None:
return None
# Handle direct instances of list or dict
if isinstance(type_hint, list):
return []
if isinstance(type_hint, dict):
return {}
origin = get_origin(type_hint)
# Handle generic list and dictionary
if origin is list:
return []
if origin is dict:
return {}
# Handle Pydantic models
if isinstance(type_hint, type) and issubclass(type_hint, BaseModel):
try:
return type_hint.model_construct()
except Exception as e:
raise TypeError(f"Failed to initialize model '{type_hint.__name__}': {e}")
# Handle standard built-in types (int, float, str, bool, etc.)
if isinstance(type_hint, type):
try:
return type_hint()
except Exception as e:
raise TypeError(f"Failed to initialize instance of '{type_hint.__name__}': {e}")
raise TypeError(f"Unsupported type hint '{type_hint}' for initialization.")
class PydanticBaseModel(BaseModel, PydanticModelNestedValueMixin):
"""Base model class with automatic serialization and deserialization of `pendulum.DateTime` fields.
This model serializes pendulum.DateTime objects to ISO 8601 strings and

View File

@ -1,6 +1,6 @@
"""PV forecast module for PV power predictions."""
from typing import Any, ClassVar, List, Optional, Self
from typing import Any, List, Optional, Self
from pydantic import Field, computed_field, field_validator, model_validator
@ -27,12 +27,12 @@ class PVForecastPlaneSetting(SettingsBaseModel):
# latitude: Optional[float] = Field(default=None, description="Latitude in decimal degrees, between -90 and 90, north is positive (ISO 19115) (°)")
surface_tilt: Optional[float] = Field(
default=None,
default=30.0,
description="Tilt angle from horizontal plane. Ignored for two-axis tracking.",
examples=[10.0, 20.0],
)
surface_azimuth: Optional[float] = Field(
default=None,
default=180.0,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
examples=[10.0, 20.0],
)
@ -81,7 +81,7 @@ class PVForecastPlaneSetting(SettingsBaseModel):
default=None, description="Model of the inverter of this plane.", examples=[None]
)
inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]", examples=[6000, 4000]
default=None, description="AC power rating of the inverter [W].", examples=[6000, 4000]
)
modules_per_string: Optional[int] = Field(
default=None,
@ -132,13 +132,21 @@ class PVForecastCommonSettings(SettingsBaseModel):
examples=["PVForecastAkkudoktor"],
)
provider_settings: Optional[PVForecastImportCommonSettings] = Field(
default=None, description="Provider settings", examples=[None]
)
planes: Optional[list[PVForecastPlaneSetting]] = Field(
default=None,
description="Plane configuration.",
examples=[get_model_structure_from_examples(PVForecastPlaneSetting, True)],
)
max_planes: ClassVar[int] = 6 # Maximum number of planes that can be set
max_planes: Optional[int] = Field(
default=0,
ge=0,
description="Maximum number of planes that can be set",
)
# Validators
@field_validator("provider", mode="after")
@ -150,18 +158,6 @@ class PVForecastCommonSettings(SettingsBaseModel):
f"Provider '{value}' is not a valid PV forecast provider: {pvforecast_providers}."
)
@field_validator("planes")
def validate_planes(
cls, planes: Optional[list[PVForecastPlaneSetting]]
) -> Optional[list[PVForecastPlaneSetting]]:
if planes is not None and len(planes) > cls.max_planes:
raise ValueError(f"Maximum number of supported planes: {cls.max_planes}.")
return planes
provider_settings: Optional[PVForecastImportCommonSettings] = Field(
default=None, description="Provider settings", examples=[None]
)
## Computed fields
@computed_field # type: ignore[prop-decorator]
@property

View File

@ -4,23 +4,40 @@ This module provides functions to generate administrative UI components
for the EOS dashboard.
"""
import json
from pathlib import Path
from typing import Any, Optional, Union
import requests
from fasthtml.common import Div
from fasthtml.common import Select
from monsterui.foundations import stringify
from monsterui.franken import (
from monsterui.franken import ( # Select, TODO: Select from FrankenUI does not work - using Select from FastHTML instead
H3,
Button,
ButtonT,
Card,
Details,
Div,
DivHStacked,
DividerLine,
Grid,
Input,
Options,
P,
Summary,
UkIcon,
)
from platformdirs import user_config_dir
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.server.dash.components import Error, Success
from akkudoktoreos.server.dash.configuration import get_nested_value
from akkudoktoreos.utils.datetimeutil import to_datetime
logger = get_logger(__name__)
# Directory to export files to, or to import files from
export_import_directory = Path(user_config_dir("net.akkudoktor.eosdash", "akkudoktor"))
def AdminButton(*c: Any, cls: Optional[Union[str, tuple]] = None, **kwargs: Any) -> Button:
@ -41,7 +58,9 @@ def AdminButton(*c: Any, cls: Optional[Union[str, tuple]] = None, **kwargs: Any)
return Button(*c, submit=False, **kwargs)
def AdminConfig(eos_host: str, eos_port: Union[str, int], data: Optional[dict]) -> Card:
def AdminConfig(
eos_host: str, eos_port: Union[str, int], data: Optional[dict], config: Optional[dict[str, Any]]
) -> tuple[str, Union[Card, list[Card]]]:
"""Creates a configuration management card with save-to-file functionality.
Args:
@ -50,13 +69,28 @@ def AdminConfig(eos_host: str, eos_port: Union[str, int], data: Optional[dict])
data (Optional[dict]): Incoming data containing action and category for processing.
Returns:
tuple[str, Card]: A tuple containing the configuration category label and the `Card` UI component.
tuple[str, Union[Card, list[Card]]]: A tuple containing the configuration category label and the `Card` UI component.
"""
server = f"http://{eos_host}:{eos_port}"
eos_hostname = "EOS server"
eosdash_hostname = "EOSdash server"
category = "configuration"
# save config file
status = (None,)
if data and data["category"] == category:
config_file_path = "<unknown>"
try:
if config:
config_file_path = get_nested_value(config, ["general", "config_file_path"])
except:
pass
# export config file
export_to_file_next_tag = to_datetime(as_string="YYYYMMDDHHmmss")
export_to_file_status = (None,)
# import config file
import_from_file_status = (None,)
if data and data.get("category", None) == category:
# This data is for us
if data["action"] == "save_to_file":
# Safe current configuration to file
@ -64,39 +98,156 @@ def AdminConfig(eos_host: str, eos_port: Union[str, int], data: Optional[dict])
result = requests.put(f"{server}/v1/config/file")
result.raise_for_status()
config_file_path = result.json()["general"]["config_file_path"]
status = P(
f"Actual config saved to {config_file_path} on {server}",
cls="text-left",
)
except requests.exceptions.HTTPError as err:
status = Success(f"Saved to '{config_file_path}' on '{eos_hostname}'")
except requests.exceptions.HTTPError as e:
detail = result.json()["detail"]
status = P(
f"Can not save actual config to file on {server}: {err}, {detail}",
cls="text-left",
status = Error(
f"Can not save actual config to file on '{eos_hostname}': {e}, {detail}"
)
except Exception as e:
status = Error(f"Can not save actual config to file on '{eos_hostname}': {e}")
elif data["action"] == "export_to_file":
# Export current configuration to file
export_to_file_tag = data.get("export_to_file_tag", export_to_file_next_tag)
export_to_file_path = export_import_directory.joinpath(
f"eos_config_{export_to_file_tag}.json"
)
try:
if not config:
raise ValueError(f"No config from '{eos_hostname}'")
export_to_file_path.parent.mkdir(parents=True, exist_ok=True)
with export_to_file_path.open("w", encoding="utf-8", newline="\n") as fd:
json.dump(config, fd, indent=4, sort_keys=True)
export_to_file_status = Success(
f"Exported to '{export_to_file_path}' on '{eosdash_hostname}'"
)
except requests.exceptions.HTTPError as e:
detail = result.json()["detail"]
export_to_file_status = Error(
f"Can not export actual config to '{export_to_file_path}' on '{eosdash_hostname}': {e}, {detail}"
)
except Exception as e:
export_to_file_status = Error(
f"Can not export actual config to '{export_to_file_path}' on '{eosdash_hostname}': {e}"
)
elif data["action"] == "import_from_file":
import_file_name = data.get("import_file_name", None)
import_from_file_pathes = list(
export_import_directory.glob("*.json")
) # expand generator object
import_file_path = None
for f in import_from_file_pathes:
if f.name == import_file_name:
import_file_path = f
if import_file_path:
try:
with import_file_path.open("r", encoding="utf-8", newline=None) as fd:
import_config = json.load(fd)
result = requests.put(f"{server}/v1/config", json=import_config)
result.raise_for_status()
import_from_file_status = Success(
f"Config imported from '{import_file_path}' on '{eosdash_hostname}'"
)
except requests.exceptions.HTTPError as e:
detail = result.json()["detail"]
import_from_file_status = Error(
f"Can not import config from '{import_file_name}' on '{eosdash_hostname}' {e}, {detail}"
)
except Exception as e:
import_from_file_status = Error(
f"Can not import config from '{import_file_name}' on '{eosdash_hostname}' {e}"
)
else:
import_from_file_status = Error(
f"Can not import config from '{import_file_name}', not found in '{export_import_directory}' on '{eosdash_hostname}'"
)
# Update for display, in case we added a new file before
import_from_file_names = [f.name for f in list(export_import_directory.glob("*.json"))]
return (
category,
Card(
Details(
Summary(
Grid(
DivHStacked(
UkIcon(icon="play"),
AdminButton(
"Save to file",
hx_post="/eosdash/admin",
hx_target="#page-content",
hx_swap="innerHTML",
hx_vals='{"category": "configuration", "action": "save_to_file"}',
[
Card(
Details(
Summary(
Grid(
DivHStacked(
UkIcon(icon="play"),
AdminButton(
"Save to file",
hx_post="/eosdash/admin",
hx_target="#page-content",
hx_swap="innerHTML",
hx_vals='{"category": "configuration", "action": "save_to_file"}',
),
P(f"'{config_file_path}' on '{eos_hostname}'"),
),
status,
),
status,
cls="list-none",
),
cls="list-none",
P(f"Safe actual configuration to '{config_file_path}' on '{eos_hostname}'."),
),
P(f"Safe actual configuration to config file on {server}."),
),
),
Card(
Details(
Summary(
Grid(
DivHStacked(
UkIcon(icon="play"),
AdminButton(
"Export to file",
hx_post="/eosdash/admin",
hx_target="#page-content",
hx_swap="innerHTML",
hx_vals='js:{"category": "configuration", "action": "export_to_file", "export_to_file_tag": document.querySelector("[name=\'chosen_export_file_tag\']").value }',
),
P("'eos_config_"),
Input(
id="export_file_tag",
name="chosen_export_file_tag",
value=export_to_file_next_tag,
),
P(".json'"),
),
export_to_file_status,
),
cls="list-none",
),
P(
f"Export actual configuration to 'eos_config_{export_to_file_next_tag}.json' on '{eosdash_hostname}'."
),
),
),
Card(
Details(
Summary(
Grid(
DivHStacked(
UkIcon(icon="play"),
AdminButton(
"Import from file",
hx_post="/eosdash/admin",
hx_target="#page-content",
hx_swap="innerHTML",
hx_vals='js:{ "category": "configuration", "action": "import_from_file", "import_file_name": document.querySelector("[name=\'selected_import_file_name\']").value }',
),
Select(
*Options(*import_from_file_names),
id="import_file_name",
name="selected_import_file_name", # Name of hidden input field with selected value
placeholder="Select file",
),
),
import_from_file_status,
),
cls="list-none",
),
P(f"Import configuration from config file on '{eosdash_hostname}'."),
),
),
],
)
@ -113,15 +264,36 @@ def Admin(eos_host: str, eos_port: Union[str, int], data: Optional[dict] = None)
Returns:
Div: A `Div` component containing the assembled admin interface.
"""
# Get current configuration from server
server = f"http://{eos_host}:{eos_port}"
try:
result = requests.get(f"{server}/v1/config")
result.raise_for_status()
config = result.json()
except requests.exceptions.HTTPError as e:
config = {}
detail = result.json()["detail"]
warning_msg = f"Can not retrieve configuration from {server}: {e}, {detail}"
logger.warning(warning_msg)
return Error(warning_msg)
except Exception as e:
warning_msg = f"Can not retrieve configuration from {server}: {e}"
logger.warning(warning_msg)
return Error(warning_msg)
rows = []
last_category = ""
for category, admin in [
AdminConfig(eos_host, eos_port, data),
AdminConfig(eos_host, eos_port, data, config),
]:
if category != last_category:
rows.append(P(category))
rows.append(H3(category))
rows.append(DividerLine())
last_category = category
rows.append(admin)
if isinstance(admin, list):
for card in admin:
rows.append(card)
else:
rows.append(admin)
return Div(*rows, cls="space-y-4")

View File

@ -1,8 +1,13 @@
from typing import Any, Optional, Union
from fasthtml.common import H1, Div, Li
from monsterui.daisy import (
Alert,
AlertT,
)
from monsterui.foundations import stringify
from monsterui.franken import (
H3,
Button,
ButtonT,
Card,
@ -68,6 +73,26 @@ def ScrollArea(
)
def Success(*c: Any) -> Alert:
return Alert(
DivLAligned(
UkIcon("check"),
P(*c),
),
cls=AlertT.success,
)
def Error(*c: Any) -> Alert:
return Alert(
DivLAligned(
UkIcon("triangle-alert"),
P(*c),
),
cls=AlertT.error,
)
def ConfigCard(
config_name: str,
config_type: str,
@ -79,7 +104,28 @@ def ConfigCard(
update_value: Optional[str],
update_open: Optional[bool],
) -> Card:
"""Creates a styled configuration card."""
"""Creates a styled configuration card for displaying configuration details.
This function generates a configuration card that is displayed in the UI with
various sections such as configuration name, type, description, default value,
current value, and error details. It supports both read-only and editable modes.
Args:
config_name (str): The name of the configuration.
config_type (str): The type of the configuration.
read_only (str): Indicates if the configuration is read-only ("rw" for read-write,
any other value indicates read-only).
value (str): The current value of the configuration.
default (str): The default value of the configuration.
description (str): A description of the configuration.
update_error (Optional[str]): The error message, if any, during the update process.
update_value (Optional[str]): The value to be updated, if different from the current value.
update_open (Optional[bool]): A flag indicating whether the update section of the card
should be initially expanded.
Returns:
Card: A styled Card component containing the configuration details.
"""
config_id = config_name.replace(".", "-")
if not update_value:
update_value = value
@ -207,7 +253,7 @@ def DashboardTabs(dashboard_items: dict[str, str]) -> Card:
dash_items = [
Li(
DashboardTrigger(
menu,
H3(menu),
hx_get=f"{path}",
hx_target="#page-content",
hx_swap="innerHTML",

View File

@ -2,17 +2,32 @@ import json
from typing import Any, Dict, List, Optional, Sequence, TypeVar, Union
import requests
from monsterui.franken import Div, DividerLine, P
from monsterui.franken import (
H3,
H4,
Card,
Details,
Div,
DividerLine,
DivLAligned,
DivRAligned,
Form,
Grid,
Input,
P,
Summary,
UkIcon,
)
from pydantic.fields import ComputedFieldInfo, FieldInfo
from pydantic_core import PydanticUndefined
from akkudoktoreos.config.config import get_config
from akkudoktoreos.config.config import ConfigEOS
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.pvforecast import PVForecastPlaneSetting
from akkudoktoreos.server.dash.components import ConfigCard
logger = get_logger(__name__)
config_eos = get_config()
T = TypeVar("T")
@ -53,10 +68,10 @@ def get_nested_value(
# Traverse the structure
current = dictionary
for key in keys:
if isinstance(current, dict) and isinstance(key, str):
current = current[key]
elif isinstance(current, list) and isinstance(key, int):
current = current[key]
if isinstance(current, dict):
current = current[str(key)]
elif isinstance(current, list):
current = current[int(key)]
else:
raise KeyError(f"Invalid key or index: {key}")
return current
@ -106,25 +121,36 @@ def resolve_nested_types(field_type: Any, parent_types: list[str]) -> list[tuple
return resolved_types
def configuration(values: dict) -> list[dict]:
def configuration(
model: type[PydanticBaseModel], values: dict, values_prefix: list[str] = []
) -> list[dict]:
"""Generate configuration details based on provided values and model metadata.
Args:
model (type[PydanticBaseModel]): The Pydantic model to extract configuration from.
values (dict): A dictionary containing the current configuration values.
values_prefix (list[str]): A list of parent type names that prefixes the model values in the values.
Returns:
List[dict]: A sorted list of configuration details, each represented as a dictionary.
list[dict]: A sorted list of configuration details, each represented as a dictionary.
"""
configs = []
inner_types: set[type[PydanticBaseModel]] = set()
for field_name, field_info in list(config_eos.model_fields.items()) + list(
config_eos.model_computed_fields.items()
for field_name, field_info in list(model.model_fields.items()) + list(
model.model_computed_fields.items()
):
def extract_nested_models(
subfield_info: Union[ComputedFieldInfo, FieldInfo], parent_types: list[str]
) -> None:
"""Extract nested models from the given subfield information.
Args:
subfield_info (Union[ComputedFieldInfo, FieldInfo]): Field metadata from Pydantic.
parent_types (list[str]): A list of parent type names for hierarchical representation.
"""
nonlocal values, values_prefix
regular_field = isinstance(subfield_info, FieldInfo)
subtype = subfield_info.annotation if regular_field else subfield_info.return_type
@ -141,9 +167,11 @@ def configuration(values: dict) -> list[dict]:
continue
config = {}
config["name"] = ".".join(parent_types)
config["value"] = str(get_nested_value(values, parent_types, "<unknown>"))
config["default"] = str(get_default_value(subfield_info, regular_field))
config["name"] = ".".join(values_prefix + parent_types)
config["value"] = json.dumps(
get_nested_value(values, values_prefix + parent_types, "<unknown>")
)
config["default"] = json.dumps(get_default_value(subfield_info, regular_field))
config["description"] = (
subfield_info.description if subfield_info.description else ""
)
@ -192,14 +220,188 @@ def get_configuration(eos_host: str, eos_port: Union[str, int]) -> list[dict]:
try:
result = requests.get(f"{server}/v1/config")
result.raise_for_status()
config = result.json()
except requests.exceptions.HTTPError as e:
config = {}
detail = result.json()["detail"]
warning_msg = f"Can not retrieve configuration from {server}: {e}, {detail}"
logger.warning(warning_msg)
return configuration({})
config = result.json()
return configuration(config)
return configuration(ConfigEOS, config)
def ConfigPlanesCard(
config_name: str,
config_type: str,
read_only: str,
value: str,
default: str,
description: str,
max_planes: int,
update_error: Optional[str],
update_value: Optional[str],
update_open: Optional[bool],
) -> Card:
"""Creates a styled configuration card for PV planes.
This function generates a configuration card that is displayed in the UI with
various sections such as configuration name, type, description, default value,
current value, and error details. It supports both read-only and editable modes.
Args:
config_name (str): The name of the PV planes configuration.
config_type (str): The type of the PV planes configuration.
read_only (str): Indicates if the PV planes configuration is read-only ("rw" for read-write,
any other value indicates read-only).
value (str): The current value of the PV planes configuration.
default (str): The default value of the PV planes configuration.
description (str): A description of the PV planes configuration.
max_planes (int): Maximum number of planes that can be set
update_error (Optional[str]): The error message, if any, during the update process.
update_value (Optional[str]): The value to be updated, if different from the current value.
update_open (Optional[bool]): A flag indicating whether the update section of the card
should be initially expanded.
Returns:
Card: A styled Card component containing the PV planes configuration details.
"""
config_id = config_name.replace(".", "-")
# Remember overall planes update status
planes_update_error = update_error
planes_update_value = update_value
if not planes_update_value:
planes_update_value = value
planes_update_open = update_open
if not planes_update_open:
planes_update_open = False
# Create EOS planes configuration
eos_planes = json.loads(value)
eos_planes_config = {
"pvforecast": {
"planes": eos_planes,
},
}
# Create cards for all planes
rows = []
for i in range(0, max_planes):
plane_config = configuration(
PVForecastPlaneSetting(),
eos_planes_config,
values_prefix=["pvforecast", "planes", str(i)],
)
plane_rows = []
plane_update_open = False
if eos_planes and len(eos_planes) > i:
plane_value = json.dumps(eos_planes[i])
else:
plane_value = json.dumps(None)
for config in plane_config:
update_error = config_update_latest.get(config["name"], {}).get("error") # type: ignore
update_value = config_update_latest.get(config["name"], {}).get("value") # type: ignore
update_open = config_update_latest.get(config["name"], {}).get("open") # type: ignore
if update_open:
planes_update_open = True
plane_update_open = True
# Make mypy happy - should never trigger
assert isinstance(update_error, (str, type(None)))
assert isinstance(update_value, (str, type(None)))
assert isinstance(update_open, (bool, type(None)))
plane_rows.append(
ConfigCard(
config["name"],
config["type"],
config["read-only"],
config["value"],
config["default"],
config["description"],
update_error,
update_value,
update_open,
)
)
rows.append(
Card(
Details(
Summary(
Grid(
Grid(
DivLAligned(
UkIcon(icon="play"),
H4(f"pvforecast.planes.{i}"),
),
DivRAligned(
P(read_only),
),
),
P(plane_value),
),
cls="list-none",
),
*plane_rows,
cls="space-y-4 gap-4",
open=plane_update_open,
),
cls="w-full",
)
)
return Card(
Details(
Summary(
Grid(
Grid(
DivLAligned(
UkIcon(icon="play"),
P(config_name),
),
DivRAligned(
P(read_only),
),
),
P(value),
),
cls="list-none",
),
Grid(
P(description),
P(config_type),
),
# Default
Grid(
DivRAligned(P("default")),
P(default),
)
if read_only == "rw"
else None,
# Set value
Grid(
DivRAligned(P("update")),
Grid(
Form(
Input(value=config_name, type="hidden", id="key"),
Input(value=planes_update_value, type="text", id="value"),
hx_put="/eosdash/configuration",
hx_target="#page-content",
hx_swap="innerHTML",
),
),
)
if read_only == "rw"
else None,
# Last error
Grid(
DivRAligned(P("update error")),
P(planes_update_error),
)
if planes_update_error
else None,
# Now come the single element configs
*rows,
cls="space-y-4 gap-4",
open=planes_update_open,
),
cls="w-full",
)
def Configuration(
@ -220,10 +422,19 @@ def Configuration(
configuration = get_configuration(eos_host, eos_port)
rows = []
last_category = ""
# find some special configuration values
max_planes = 0
for config in configuration:
if config["name"] == "pvforecast.max_planes":
try:
max_planes = int(config["value"])
except:
max_planes = 0
# build visual representation
for config in configuration:
category = config["name"].split(".")[0]
if category != last_category:
rows.append(P(category))
rows.append(H3(category))
rows.append(DividerLine())
last_category = category
update_error = config_update_latest.get(config["name"], {}).get("error")
@ -233,19 +444,39 @@ def Configuration(
assert isinstance(update_error, (str, type(None)))
assert isinstance(update_value, (str, type(None)))
assert isinstance(update_open, (bool, type(None)))
rows.append(
ConfigCard(
config["name"],
config["type"],
config["read-only"],
config["value"],
config["default"],
config["description"],
update_error,
update_value,
update_open,
if (
config["type"]
== "Optional[list[akkudoktoreos.prediction.pvforecast.PVForecastPlaneSetting]]"
):
# Special configuration for PV planes
rows.append(
ConfigPlanesCard(
config["name"],
config["type"],
config["read-only"],
config["value"],
config["default"],
config["description"],
max_planes,
update_error,
update_value,
update_open,
)
)
else:
rows.append(
ConfigCard(
config["name"],
config["type"],
config["read-only"],
config["value"],
config["default"],
config["description"],
update_error,
update_value,
update_open,
)
)
)
return Div(*rows, cls="space-y-4")
@ -272,15 +503,20 @@ def ConfigKeyUpdate(eos_host: str, eos_port: Union[str, int], key: str, value: s
data = value
error = None
result = None
config = None
try:
result = requests.put(f"{server}/v1/config/{path}", json=data)
result.raise_for_status()
response = requests.put(f"{server}/v1/config/{path}", json=data)
response.raise_for_status()
config = response.json()
except requests.exceptions.HTTPError as err:
if result:
detail = result.json()["detail"]
else:
detail = "No details"
try:
# Try to get 'detail' from the JSON response
detail = response.json().get(
"detail", f"No error details for data '{data}' '{response.text}'"
)
except ValueError:
# Response is not JSON
detail = f"No error details for data '{data}' '{response.text}'"
error = f"Can not set {key} on {server}: {err}, {detail}"
# Mark all updates as closed
for k in config_update_latest:
@ -288,12 +524,12 @@ def ConfigKeyUpdate(eos_host: str, eos_port: Union[str, int], key: str, value: s
# Remember this update as latest one
config_update_latest[key] = {
"error": error,
"result": result.json() if result else None,
"result": config,
"value": value,
"open": True,
}
if error or result is None:
if error or config is None:
# Reread configuration to be shure we display actual data
return Configuration(eos_host, eos_port)
# Use configuration already provided
return Configuration(eos_host, eos_port, configuration(result.json()))
return Configuration(eos_host, eos_port, configuration(ConfigEOS, config))

View File

@ -500,13 +500,13 @@ def fastapi_config_put_key(
configuration (ConfigEOS): The current configuration after the update.
"""
try:
config_eos.set_config_value(path, value)
except IndexError as e:
raise HTTPException(status_code=400, detail=str(e))
except KeyError as e:
raise HTTPException(status_code=404, detail=str(e))
config_eos.set_nested_value(path, value)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
trace = "".join(traceback.TracebackException.from_exception(e).format())
raise HTTPException(
status_code=400,
detail=f"Error on update of configuration '{path}','{value}': {e}\n{trace}",
)
return config_eos
@ -526,7 +526,7 @@ def fastapi_config_get_key(
value (Any): The value of the selected nested key.
"""
try:
return config_eos.get_config_value(path)
return config_eos.get_nested_value(path)
except IndexError as e:
raise HTTPException(status_code=400, detail=str(e))
except KeyError as e:

View File

@ -304,7 +304,7 @@ def test_config_common_settings_timezone_none_when_coordinates_missing():
[0.0, 0.375, 0.5, 0.625, 0.75, 0.875, 1.0],
)
],
ValueError,
TypeError,
),
# Invalid index (out of bound)
(
@ -316,7 +316,7 @@ def test_config_common_settings_timezone_none_when_coordinates_missing():
[0.0, 0.375, 0.5, 0.625, 0.75, 0.875, 1.0],
)
],
IndexError,
TypeError,
),
# Invalid index (no number)
(
@ -346,14 +346,27 @@ def test_config_common_settings_timezone_none_when_coordinates_missing():
)
def test_set_nested_key(path, value, expected, exception, config_eos):
if not exception:
config_eos.set_config_value(path, value)
config_eos.set_nested_value(path, value)
for expected_path, expected_value in expected:
assert eval(f"config_eos.{expected_path}") == expected_value
actual_value = eval(f"config_eos.{expected_path}")
assert actual_value == expected_value, (
f"Expected {expected_value} at {expected_path}, but got {actual_value}"
)
else:
with pytest.raises(exception):
config_eos.set_config_value(path, value)
for expected_path, expected_value in expected:
assert eval(f"config_eos.{expected_path}") == expected_value
try:
config_eos.set_nested_value(path, value)
for expected_path, expected_value in expected:
actual_value = eval(f"config_eos.{expected_path}")
assert actual_value == expected_value, (
f"Expected {expected_value} at {expected_path}, but got {actual_value}"
)
pytest.fail(
f"Expected exception {exception} but none was raised. Set '{expected_path}' to '{actual_value}'"
)
except Exception as e:
assert isinstance(e, exception), (
f"Expected exception {exception}, but got {type(e)}: {e}"
)
@pytest.mark.parametrize(
@ -378,10 +391,10 @@ def test_set_nested_key(path, value, expected, exception, config_eos):
)
def test_get_nested_key(path, expected_value, exception, config_eos):
if not exception:
assert config_eos.get_config_value(path) == expected_value
assert config_eos.get_nested_value(path) == expected_value
else:
with pytest.raises(exception):
config_eos.get_config_value(path)
config_eos.get_nested_value(path)
def test_merge_settings_from_dict_invalid(config_eos):

View File

@ -368,7 +368,7 @@ def test_to_datetime(
# print(f"Result: {result} tz={result.timezone}")
# print(f"Compare: {compare}")
if expected_approximately:
assert compare.time_diff < 200
assert compare.time_diff < 300
else:
assert compare.equal == True

105
tests/test_eosdashconfig.py Normal file
View File

@ -0,0 +1,105 @@
"""Test suite for the EOS Dash configuration module.
This module contains tests for utility functions related to retrieving and processing
configuration data using Pydantic models.
"""
import json
from pathlib import Path
from typing import Union
import pytest
from pydantic.fields import FieldInfo
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.pvforecast import PVForecastPlaneSetting
from akkudoktoreos.server.dash.configuration import (
configuration,
get_default_value,
get_nested_value,
resolve_nested_types,
)
DIR_TESTDATA = Path(__file__).absolute().parent.joinpath("testdata")
FILE_TESTDATA_EOSSERVER_CONFIG_1 = DIR_TESTDATA.joinpath("eosserver_config_1.json")
class SampleModel(PydanticBaseModel):
field1: str = "default_value"
field2: int = 10
class TestEOSdashConfig:
"""Test case for EOS Dash configuration utility functions.
This class tests functions for retrieving nested values, extracting default values,
resolving nested types, and generating configuration details from Pydantic models.
"""
def test_get_nested_value_from_dict(self):
"""Test retrieving a nested value from a dictionary using a sequence of keys."""
data = {"a": {"b": {"c": 42}}}
assert get_nested_value(data, ["a", "b", "c"]) == 42
assert get_nested_value(data, ["a", "x"], default="not found") == "not found"
with pytest.raises(TypeError):
get_nested_value("not_a_dict", ["a"]) # type: ignore
def test_get_nested_value_from_list(self):
"""Test retrieving a nested value from a list using a sequence of keys."""
data = {"a": {"b": {"c": [42]}}}
assert get_nested_value(data, ["a", "b", "c", 0]) == 42
assert get_nested_value(data, ["a", "b", "c", "0"]) == 42
def test_get_default_value(self):
"""Test retrieving the default value of a field based on FieldInfo metadata."""
field_info = FieldInfo(default="test_value")
assert get_default_value(field_info, True) == "test_value"
field_info_no_default = FieldInfo()
assert get_default_value(field_info_no_default, True) == ""
assert get_default_value(field_info, False) == "N/A"
def test_resolve_nested_types(self):
"""Test resolving nested types within a field, ensuring correct type extraction."""
nested_types = resolve_nested_types(Union[int, str], [])
assert (int, []) in nested_types
assert (str, []) in nested_types
def test_configuration(self):
"""Test extracting configuration details from a Pydantic model based on provided values."""
values = {"field1": "custom_value", "field2": 20}
config = configuration(SampleModel, values)
assert any(
item["name"] == "field1" and item["value"] == '"custom_value"' for item in config
)
assert any(item["name"] == "field2" and item["value"] == "20" for item in config)
def test_configuration_eos(self, config_eos):
"""Test extracting EOS configuration details from EOS config based on provided values."""
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
values = json.load(fd)
config = configuration(config_eos, values)
assert any(
item["name"] == "server.eosdash_port" and item["value"] == "8504" for item in config
)
assert any(
item["name"] == "server.eosdash_host" and item["value"] == '"0.0.0.0"'
for item in config
)
def test_configuration_pvforecast_plane_settings(self):
"""Test extracting EOS PV forecast plane configuration details from EOS config based on provided values."""
with FILE_TESTDATA_EOSSERVER_CONFIG_1.open("r", encoding="utf-8", newline=None) as fd:
values = json.load(fd)
config = configuration(
PVForecastPlaneSetting(), values, values_prefix=["pvforecast", "planes", "0"]
)
assert any(
item["name"] == "pvforecast.planes.0.surface_azimuth" and item["value"] == "-10"
for item in config
)
assert any(
item["name"] == "pvforecast.planes.0.userhorizon"
and item["value"] == "[20, 27, 22, 20]"
for item in config
)

View File

@ -138,23 +138,11 @@ def test_mixed_plane_configuration(settings):
assert settings.planes_peakpower == [5.0, 5000.0, 3.0]
def test_max_planes_limit(settings):
"""Test that the maximum number of planes is enforced."""
assert settings.max_planes == 6
# Create settings with more planes than allowed (should only recognize up to max)
plane_settings = [{"peakpower": 5.0} for _ in range(8)]
with pytest.raises(ValueError):
PVForecastCommonSettings(planes=plane_settings)
def test_invalid_plane_settings():
def test_none_plane_settings():
"""Test that optional parameters can be None for non-zero planes."""
with pytest.raises(ValueError):
PVForecastPlaneSetting(
peakpower=5.0,
albedo=None,
module_model=None,
userhorizon=None,
)
setting = PVForecastPlaneSetting(
peakpower=5.0,
albedo=None,
module_model=None,
userhorizon=None,
)

View File

@ -10,6 +10,7 @@ from akkudoktoreos.core.pydantic import (
PydanticDateTimeData,
PydanticDateTimeDataFrame,
PydanticDateTimeSeries,
PydanticModelNestedValueMixin,
)
from akkudoktoreos.utils.datetimeutil import compare_datetimes, to_datetime
@ -21,6 +22,129 @@ class PydanticTestModel(PydanticBaseModel):
optional_field: Optional[str] = Field(default=None, description="An optional field.")
class Address(PydanticBaseModel):
city: Optional[str] = None
postal_code: Optional[str] = None
class User(PydanticBaseModel):
name: str
addresses: Optional[list[Address]] = None
settings: Optional[dict[str, str]] = None
class TestPydanticModelNestedValueMixin:
"""Umbrella test class to group all test cases for `PydanticModelNestedValueMixin`."""
@pytest.fixture
def user_instance(self):
"""Fixture to initialize a sample User instance."""
return User(name="Alice", addresses=None, settings=None)
def test_get_key_types_for_simple_field(self):
"""Test _get_key_types for a simple string field."""
key_types = PydanticModelNestedValueMixin._get_key_types(User, "name")
assert key_types == [str], f"Expected [str], got {key_types}"
def test_get_key_types_for_list_of_models(self):
"""Test _get_key_types for a list of Address models."""
key_types = PydanticModelNestedValueMixin._get_key_types(User, "addresses")
assert key_types == [list, Address], f"Expected [list, Address], got {key_types}"
def test_get_key_types_for_dict_field(self):
"""Test _get_key_types for a dictionary field."""
key_types = PydanticModelNestedValueMixin._get_key_types(User, "settings")
assert key_types == [dict, str], f"Expected [dict, str], got {key_types}"
def test_get_key_types_for_optional_field(self):
"""Test _get_key_types correctly handles Optional fields."""
key_types = PydanticModelNestedValueMixin._get_key_types(Address, "city")
assert key_types == [str], f"Expected [str], got {key_types}"
def test_get_key_types_for_non_existent_field(self):
"""Test _get_key_types raises an error for non-existent field."""
with pytest.raises(TypeError):
PydanticModelNestedValueMixin._get_key_types(User, "unknown_field")
def test_set_nested_value_in_model(self, user_instance):
"""Test setting nested value in a model field (Address -> city)."""
assert user_instance.addresses is None
user_instance.set_nested_value("addresses/0/city", "New York")
assert user_instance.addresses is not None
assert user_instance.addresses[0].city == "New York", "The city should be set to 'New York'"
def test_set_nested_value_in_dict(self, user_instance):
"""Test setting nested value in a dictionary field (settings -> theme)."""
assert user_instance.settings is None
user_instance.set_nested_value("settings/theme", "dark")
assert user_instance.settings is not None
assert user_instance.settings["theme"] == "dark", "The theme should be set to 'dark'"
def test_set_nested_value_in_list(self, user_instance):
"""Test setting nested value in a list of models (addresses -> 1 -> city)."""
user_instance.set_nested_value("addresses/1/city", "Los Angeles")
# Check if the city in the second address is set correctly
assert user_instance.addresses[1].city == "Los Angeles", (
"The city at index 1 should be set to 'Los Angeles'"
)
def test_set_nested_value_in_optional_field(self, user_instance):
"""Test setting value in an Optional field (addresses)."""
user_instance.set_nested_value("addresses/0", Address(city="Chicago"))
# Check if the first address is set correctly
assert user_instance.addresses is not None
assert user_instance.addresses[0].city == "Chicago", "The city should be set to 'Chicago'"
def test_set_nested_value_with_empty_list(self):
"""Test setting value in an empty list of models."""
user = User(name="Bob", addresses=[])
user.set_nested_value("addresses/0/city", "Seattle")
assert user.addresses is not None
assert user.addresses[0].city == "Seattle", (
"The first address should have the city 'Seattle'"
)
def test_set_nested_value_with_missing_key_in_dict(self, user_instance):
"""Test setting value in a dict when the key does not exist."""
user_instance.set_nested_value("settings/language", "English")
assert user_instance.settings["language"] == "English", (
"The language setting should be 'English'"
)
def test_set_nested_value_for_non_existent_field(self):
"""Test attempting to set value for a non-existent field."""
user = User(name="John")
with pytest.raises(ValueError):
user.set_nested_value("non_existent_field", "Some Value")
def test_set_nested_value_with_invalid_type(self, user_instance):
"""Test setting value with an invalid type."""
with pytest.raises(ValueError):
user_instance.set_nested_value(
"addresses/0/city", 1234
) # city should be a string, not an integer
def test_set_nested_value_with_model_initialization(self):
"""Test setting a value in a model that should initialize a missing model."""
user = User(name="James", addresses=None)
user.set_nested_value("addresses/0/city", "Boston")
assert user.addresses is not None
assert user.addresses[0].city == "Boston", "The city should be set to 'Boston'"
assert isinstance(user.addresses[0], Address), (
"The first address should be an instance of Address"
)
class TestPydanticBaseModel:
def test_valid_pendulum_datetime(self):
dt = pendulum.now()