Improve Configuration and Prediction Usability (#220)

* Update utilities in utils submodule.
* Add base configuration modules.
* Add server base configuration modules.
* Add devices base configuration modules.
* Add optimization base configuration modules.
* Add utils base configuration modules.
* Add prediction abstract and base classes plus tests.
* Add PV forecast to prediction submodule.
   The PV forecast modules are adapted from the class_pvforecast module and
   replace it.
* Add weather forecast to prediction submodule.
   The modules provide classes and methods to retrieve, manage, and process weather forecast data
   from various sources. Includes are structured representations of weather data and utilities
   for fetching forecasts for specific locations and time ranges.
   BrightSky and ClearOutside are currently supported.
* Add electricity price forecast to prediction submodule.
* Adapt fastapi server to base config and add fasthtml server.
* Add ems to core submodule.
* Adapt genetic to config.
* Adapt visualize to config.
* Adapt common test fixtures to config.
* Add load forecast to prediction submodule.
* Add core abstract and base classes.
* Adapt single test optimization to config.
* Adapt devices to config.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte
2024-12-15 14:40:03 +01:00
committed by GitHub
parent a5e637ab4c
commit aa334d0b61
80 changed files with 29048 additions and 2451 deletions

View File

@@ -0,0 +1,11 @@
from typing import Optional
from pydantic import Field
from akkudoktoreos.config.configabc import SettingsBaseModel
class ElecPriceCommonSettings(SettingsBaseModel):
elecprice_provider: Optional[str] = Field(
"ElecPriceAkkudoktor", description="Electicity price provider id of provider to be used."
)

View File

@@ -0,0 +1,62 @@
"""Abstract and base classes for electricity price predictions.
Notes:
- Ensure appropriate API keys or configurations are set up if required by external data sources.
"""
from abc import abstractmethod
from typing import List, Optional
from pydantic import Field
from akkudoktoreos.prediction.predictionabc import PredictionProvider, PredictionRecord
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class ElecPriceDataRecord(PredictionRecord):
"""Represents a electricity price data record containing various price attributes at a specific datetime.
Attributes:
date_time (Optional[AwareDatetime]): The datetime of the record.
"""
elecprice_marketprice: Optional[float] = Field(
None, description="Electricity market price (€/KWh)"
)
class ElecPriceProvider(PredictionProvider):
"""Abstract base class for electricity price providers.
WeatherProvider is a thread-safe singleton, ensuring only one instance of this class is created.
Configuration variables:
electricity price_provider (str): Prediction provider for electricity price.
Attributes:
prediction_hours (int, optional): The number of hours into the future for which predictions are generated.
prediction_historic_hours (int, optional): The number of past hours for which historical data is retained.
latitude (float, optional): The latitude in degrees, must be within -90 to 90.
longitude (float, optional): The longitude in degrees, must be within -180 to 180.
start_datetime (datetime, optional): The starting datetime for predictions, defaults to the current datetime if unspecified.
end_datetime (datetime, computed): The datetime representing the end of the prediction range,
calculated based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The earliest datetime for retaining historical data, calculated
based on `start_datetime` and `prediction_historic_hours`.
"""
# overload
records: List[ElecPriceDataRecord] = Field(
default_factory=list, description="List of ElecPriceDataRecord records"
)
@classmethod
@abstractmethod
def provider_id(cls) -> str:
return "ElecPriceProvider"
def enabled(self) -> bool:
return self.provider_id() == self.config.elecprice_provider

View File

@@ -0,0 +1,164 @@
"""Retrieves and processes electricity price forecast data from Akkudoktor.
This module provides classes and mappings to manage electricity price data obtained from the
Akkudoktor API, including support for various electricity price attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `ElecPriceDataRecord`
format, enabling consistent access to forecasted and historical electricity price attributes.
"""
from typing import Any, List, Optional, Union
import requests
from pydantic import ValidationError
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.elecpriceabc import ElecPriceDataRecord, ElecPriceProvider
from akkudoktoreos.utils.cacheutil import cache_in_file
from akkudoktoreos.utils.datetimeutil import compare_datetimes, to_datetime
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class AkkudoktorElecPriceMeta(PydanticBaseModel):
start_timestamp: int
end_timestamp: int
start: str
end: str
class AkkudoktorElecPriceValue(PydanticBaseModel):
start_timestamp: int
end_timestamp: int
start: str
end: str
marketprice: float
unit: str
marketpriceEurocentPerKWh: float
class AkkudoktorElecPrice(PydanticBaseModel):
meta: AkkudoktorElecPriceMeta
values: List[AkkudoktorElecPriceValue]
class ElecPriceAkkudoktor(ElecPriceProvider):
"""Fetch and process electricity price forecast data from Akkudoktor.
ElecPriceAkkudoktor is a singleton-based class that retrieves electricity price forecast data
from the Akkudoktor API and maps it to `ElecPriceDataRecord` fields, applying
any necessary scaling or unit corrections. It manages the forecast over a range
of hours into the future and retains historical data.
Attributes:
prediction_hours (int, optional): Number of hours in the future for the forecast.
prediction_historic_hours (int, optional): Number of past hours for retaining data.
start_datetime (datetime, optional): Start datetime for forecasts, defaults to the current datetime.
end_datetime (datetime, computed): The forecast's end datetime, computed based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The datetime to retain historical data, computed from `start_datetime` and `prediction_historic_hours`.
Methods:
provider_id(): Returns a unique identifier for the provider.
_request_forecast(): Fetches the forecast from the Akkudoktor API.
_update_data(): Processes and updates forecast data from Akkudoktor in ElecPriceDataRecord format.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the Akkudoktor provider."""
return "Akkudoktor"
@classmethod
def _validate_data(cls, json_str: Union[bytes, Any]) -> AkkudoktorElecPrice:
"""Validate Akkudoktor Electricity Price forecast data."""
try:
akkudoktor_data = AkkudoktorElecPrice.model_validate_json(json_str)
except ValidationError as e:
error_msg = ""
for error in e.errors():
field = " -> ".join(str(x) for x in error["loc"])
message = error["msg"]
error_type = error["type"]
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
logger.error(f"Akkudoktor schema change: {error_msg}")
raise ValueError(error_msg)
return akkudoktor_data
@cache_in_file(with_ttl="1 hour")
def _request_forecast(self) -> AkkudoktorElecPrice:
"""Fetch electricity price forecast data from Akkudoktor API.
This method sends a request to Akkudoktor's API to retrieve forecast data for a specified
date range. The response data is parsed and returned as JSON for further processing.
Returns:
dict: The parsed JSON response from Akkudoktor API containing forecast data.
Raises:
ValueError: If the API response does not include expected `electricity price` data.
"""
source = "https://api.akkudoktor.net"
date = to_datetime(self.start_datetime, as_string="%Y-%m-%d")
last_date = to_datetime(self.end_datetime, as_string="%Y-%m-%d")
response = requests.get(
f"{source}/prices?date={date}&last_date={last_date}&tz={self.config.timezone}"
)
response.raise_for_status() # Raise an error for bad responses
logger.debug(f"Response from {source}: {response}")
akkudoktor_data = self._validate_data(response.content)
# We are working on fresh data (no cache), report update time
self.update_datetime = to_datetime(in_timezone=self.config.timezone)
return akkudoktor_data
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Update forecast data in the ElecPriceDataRecord format.
Retrieves data from Akkudoktor, maps each Akkudoktor field to the corresponding
`ElecPriceDataRecord` and applies any necessary scaling.
The final mapped and processed data is inserted into the sequence as `ElecPriceDataRecord`.
"""
# Get Akkudoktor electricity price data
akkudoktor_data = self._request_forecast(force_update=force_update) # type: ignore
# Assumption that all lists are the same length and are ordered chronologically
# in ascending order and have the same timestamps.
values_len = len(akkudoktor_data.values)
if values_len < 1:
# Expect one value set per prediction hour
raise ValueError(
f"The forecast must have at least one dataset, "
f"but only {values_len} data sets are given in forecast data."
)
previous_price = akkudoktor_data.values[0].marketpriceEurocentPerKWh
for i in range(values_len):
original_datetime = akkudoktor_data.values[i].start
dt = to_datetime(original_datetime, in_timezone=self.config.timezone)
if compare_datetimes(dt, self.start_datetime).le:
# forecast data is too old
previous_price = akkudoktor_data.values[i].marketpriceEurocentPerKWh
continue
record = ElecPriceDataRecord(
date_time=dt,
elecprice_marketprice=akkudoktor_data.values[i].marketpriceEurocentPerKWh,
)
self.append(record)
# Assure price starts at start_time
if compare_datetimes(self[0].date_time, self.start_datetime).gt:
record = ElecPriceDataRecord(
date_time=self.start_datetime,
elecprice_marketprice=previous_price,
)
self.insert(0, record)
# Assure price ends at end_time
if compare_datetimes(self[-1].date_time, self.end_datetime).lt:
record = ElecPriceDataRecord(
date_time=self.end_datetime,
elecprice_marketprice=self[-1].elecprice_marketprice,
)
self.append(record)
# If some of the hourly values are missing, they will be interpolated when using
# `key_to_array`.

View File

@@ -0,0 +1,68 @@
"""Retrieves elecprice forecast data from an import file.
This module provides classes and mappings to manage elecprice data obtained from
an import file, including support for various elecprice attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `ElecPriceDataRecord`
format, enabling consistent access to forecasted and historical elecprice attributes.
"""
from pathlib import Path
from typing import Optional, Union
from pydantic import Field, field_validator
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.elecpriceabc import ElecPriceProvider
from akkudoktoreos.prediction.predictionabc import PredictionImportProvider
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class ElecPriceImportCommonSettings(SettingsBaseModel):
"""Common settings for elecprice data import from file."""
elecpriceimport_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import elecprice data from."
)
elecpriceimport_json: Optional[str] = Field(
default=None,
description="JSON string, dictionary of electricity price forecast value lists.",
)
# Validators
@field_validator("elecpriceimport_file_path", mode="after")
@classmethod
def validate_elecpriceimport_file_path(
cls, value: Optional[Union[str, Path]]
) -> Optional[Path]:
if value is None:
return None
if isinstance(value, str):
value = Path(value)
"""Ensure file is available."""
value.resolve()
if not value.is_file():
raise ValueError(f"Import file path '{value}' is not a file.")
return value
class ElecPriceImport(ElecPriceProvider, PredictionImportProvider):
"""Fetch PV forecast data from import file or JSON string.
ElecPriceImport is a singleton-based class that retrieves elecprice forecast data
from a file or JSON string and maps it to `ElecPriceDataRecord` fields. It manages the forecast
over a range of hours into the future and retains historical data.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the ElecPriceImport provider."""
return "ElecPriceImport"
def _update_data(self, force_update: Optional[bool] = False) -> None:
if self.config.elecpriceimport_file_path is not None:
self.import_from_file(self.config.elecpriceimport_file_path, key_prefix="elecprice")
if self.config.elecpriceimport_json is not None:
self.import_from_json(self.config.elecpriceimport_json, key_prefix="elecprice")

View File

@@ -1,257 +0,0 @@
from datetime import datetime
from typing import Any, Dict, Optional, Union
import numpy as np
from pydantic import BaseModel, Field, field_validator, model_validator
from typing_extensions import Self
from akkudoktoreos.config import EOSConfig
from akkudoktoreos.devices.battery import PVAkku
from akkudoktoreos.devices.generic import HomeAppliance
from akkudoktoreos.devices.inverter import Wechselrichter
from akkudoktoreos.utils.utils import NumpyEncoder
class EnergieManagementSystemParameters(BaseModel):
pv_prognose_wh: list[float] = Field(
description="An array of floats representing the forecasted photovoltaic output in watts for different time intervals."
)
strompreis_euro_pro_wh: list[float] = Field(
description="An array of floats representing the electricity price in euros per watt-hour for different time intervals."
)
einspeiseverguetung_euro_pro_wh: list[float] | float = Field(
description="A float or array of floats representing the feed-in compensation in euros per watt-hour."
)
preis_euro_pro_wh_akku: float
gesamtlast: list[float] = Field(
description="An array of floats representing the total load (consumption) in watts for different time intervals."
)
@model_validator(mode="after")
def validate_list_length(self) -> Self:
pv_prognose_length = len(self.pv_prognose_wh)
if (
pv_prognose_length != len(self.strompreis_euro_pro_wh)
or pv_prognose_length != len(self.gesamtlast)
or (
isinstance(self.einspeiseverguetung_euro_pro_wh, list)
and pv_prognose_length != len(self.einspeiseverguetung_euro_pro_wh)
)
):
raise ValueError("Input lists have different lengths")
return self
class SimulationResult(BaseModel):
"""This object contains the results of the simulation and provides insights into various parameters over the entire forecast period."""
Last_Wh_pro_Stunde: list[Optional[float]] = Field(description="TBD")
EAuto_SoC_pro_Stunde: list[Optional[float]] = Field(
description="The state of charge of the EV for each hour."
)
Einnahmen_Euro_pro_Stunde: list[Optional[float]] = Field(
description="The revenue from grid feed-in or other sources in euros per hour."
)
Gesamt_Verluste: float = Field(
description="The total losses in watt-hours over the entire period."
)
Gesamtbilanz_Euro: float = Field(
description="The total balance of revenues minus costs in euros."
)
Gesamteinnahmen_Euro: float = Field(description="The total revenues in euros.")
Gesamtkosten_Euro: float = Field(description="The total costs in euros.")
Home_appliance_wh_per_hour: list[Optional[float]] = Field(
description="The energy consumption of a household appliance in watt-hours per hour."
)
Kosten_Euro_pro_Stunde: list[Optional[float]] = Field(
description="The costs in euros per hour."
)
Netzbezug_Wh_pro_Stunde: list[Optional[float]] = Field(
description="The grid energy drawn in watt-hours per hour."
)
Netzeinspeisung_Wh_pro_Stunde: list[Optional[float]] = Field(
description="The energy fed into the grid in watt-hours per hour."
)
Verluste_Pro_Stunde: list[Optional[float]] = Field(
description="The losses in watt-hours per hour."
)
akku_soc_pro_stunde: list[Optional[float]] = Field(
description="The state of charge of the battery (not the EV) in percentage per hour."
)
@field_validator(
"Last_Wh_pro_Stunde",
"Netzeinspeisung_Wh_pro_Stunde",
"akku_soc_pro_stunde",
"Netzbezug_Wh_pro_Stunde",
"Kosten_Euro_pro_Stunde",
"Einnahmen_Euro_pro_Stunde",
"EAuto_SoC_pro_Stunde",
"Verluste_Pro_Stunde",
"Home_appliance_wh_per_hour",
mode="before",
)
def convert_numpy(cls, field: Any) -> Any:
return NumpyEncoder.convert_numpy(field)[0]
class EnergieManagementSystem:
def __init__(
self,
config: EOSConfig,
parameters: EnergieManagementSystemParameters,
wechselrichter: Wechselrichter,
eauto: Optional[PVAkku] = None,
home_appliance: Optional[HomeAppliance] = None,
):
self.akku = wechselrichter.akku
self.gesamtlast = np.array(parameters.gesamtlast, float)
self.pv_prognose_wh = np.array(parameters.pv_prognose_wh, float)
self.strompreis_euro_pro_wh = np.array(parameters.strompreis_euro_pro_wh, float)
self.einspeiseverguetung_euro_pro_wh_arr = (
parameters.einspeiseverguetung_euro_pro_wh
if isinstance(parameters.einspeiseverguetung_euro_pro_wh, list)
else np.full(len(self.gesamtlast), parameters.einspeiseverguetung_euro_pro_wh, float)
)
self.eauto = eauto
self.home_appliance = home_appliance
self.wechselrichter = wechselrichter
self.ac_charge_hours = np.full(config.prediction_hours, 0)
self.dc_charge_hours = np.full(config.prediction_hours, 1)
self.ev_charge_hours = np.full(config.prediction_hours, 0)
def set_akku_discharge_hours(self, ds: np.ndarray) -> None:
self.akku.set_discharge_per_hour(ds)
def set_akku_ac_charge_hours(self, ds: np.ndarray) -> None:
self.ac_charge_hours = ds
def set_akku_dc_charge_hours(self, ds: np.ndarray) -> None:
self.dc_charge_hours = ds
def set_ev_charge_hours(self, ds: np.ndarray) -> None:
self.ev_charge_hours = ds
def set_home_appliance_start(self, start_hour: int, global_start_hour: int = 0) -> None:
assert self.home_appliance is not None
self.home_appliance.set_starting_time(start_hour, global_start_hour=global_start_hour)
def reset(self) -> None:
if self.eauto:
self.eauto.reset()
self.akku.reset()
def simuliere_ab_jetzt(self) -> dict[str, Any]:
jetzt = datetime.now()
start_stunde = jetzt.hour
return self.simuliere(start_stunde)
def simuliere(self, start_stunde: int) -> dict[str, Any]:
"""hour.
akku_soc_pro_stunde begin of the hour, initial hour state!
last_wh_pro_stunde integral of last hour (end state)
"""
lastkurve_wh = self.gesamtlast
assert (
len(lastkurve_wh) == len(self.pv_prognose_wh) == len(self.strompreis_euro_pro_wh)
), f"Array sizes do not match: Load Curve = {len(lastkurve_wh)}, PV Forecast = {len(self.pv_prognose_wh)}, Electricity Price = {len(self.strompreis_euro_pro_wh)}"
# Optimized total hours calculation
ende = len(lastkurve_wh)
total_hours = ende - start_stunde
# Pre-allocate arrays for the results, optimized for speed
last_wh_pro_stunde = np.full((total_hours), np.nan)
netzeinspeisung_wh_pro_stunde = np.full((total_hours), np.nan)
netzbezug_wh_pro_stunde = np.full((total_hours), np.nan)
kosten_euro_pro_stunde = np.full((total_hours), np.nan)
einnahmen_euro_pro_stunde = np.full((total_hours), np.nan)
akku_soc_pro_stunde = np.full((total_hours), np.nan)
eauto_soc_pro_stunde = np.full((total_hours), np.nan)
verluste_wh_pro_stunde = np.full((total_hours), np.nan)
home_appliance_wh_per_hour = np.full((total_hours), np.nan)
# Set initial state
akku_soc_pro_stunde[0] = self.akku.ladezustand_in_prozent()
if self.eauto:
eauto_soc_pro_stunde[0] = self.eauto.ladezustand_in_prozent()
for stunde in range(start_stunde, ende):
stunde_since_now = stunde - start_stunde
# Accumulate loads and PV generation
verbrauch = self.gesamtlast[stunde]
verluste_wh_pro_stunde[stunde_since_now] = 0.0
if self.home_appliance:
ha_load = self.home_appliance.get_load_for_hour(stunde)
verbrauch += ha_load
home_appliance_wh_per_hour[stunde_since_now] = ha_load
# E-Auto handling
if self.eauto and self.ev_charge_hours[stunde] > 0:
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(
None, stunde, relative_power=self.ev_charge_hours[stunde]
)
verbrauch += geladene_menge_eauto
verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto
if self.eauto:
eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent()
# Process inverter logic
erzeugung = self.pv_prognose_wh[stunde]
self.akku.set_charge_allowed_for_hour(self.dc_charge_hours[stunde], stunde)
netzeinspeisung, netzbezug, verluste, eigenverbrauch = (
self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
)
# AC PV Battery Charge
if self.ac_charge_hours[stunde] > 0.0:
self.akku.set_charge_allowed_for_hour(1, stunde)
geladene_menge, verluste_wh = self.akku.energie_laden(
None, stunde, relative_power=self.ac_charge_hours[stunde]
)
# print(stunde, " ", geladene_menge, " ",self.ac_charge_hours[stunde]," ",self.akku.ladezustand_in_prozent())
verbrauch += geladene_menge
verbrauch += verluste_wh
netzbezug += geladene_menge
netzbezug += verluste_wh
verluste_wh_pro_stunde[stunde_since_now] += verluste_wh
netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung
netzbezug_wh_pro_stunde[stunde_since_now] = netzbezug
verluste_wh_pro_stunde[stunde_since_now] += verluste
last_wh_pro_stunde[stunde_since_now] = verbrauch
# Financial calculations
kosten_euro_pro_stunde[stunde_since_now] = (
netzbezug * self.strompreis_euro_pro_wh[stunde]
)
einnahmen_euro_pro_stunde[stunde_since_now] = (
netzeinspeisung * self.einspeiseverguetung_euro_pro_wh_arr[stunde]
)
# Akku SOC tracking
akku_soc_pro_stunde[stunde_since_now] = self.akku.ladezustand_in_prozent()
# Total cost and return
gesamtkosten_euro = np.nansum(kosten_euro_pro_stunde) - np.nansum(einnahmen_euro_pro_stunde)
# Prepare output dictionary
out: Dict[str, Union[np.ndarray, float]] = {
"Last_Wh_pro_Stunde": last_wh_pro_stunde,
"Netzeinspeisung_Wh_pro_Stunde": netzeinspeisung_wh_pro_stunde,
"Netzbezug_Wh_pro_Stunde": netzbezug_wh_pro_stunde,
"Kosten_Euro_pro_Stunde": kosten_euro_pro_stunde,
"akku_soc_pro_stunde": akku_soc_pro_stunde,
"Einnahmen_Euro_pro_Stunde": einnahmen_euro_pro_stunde,
"Gesamtbilanz_Euro": gesamtkosten_euro,
"EAuto_SoC_pro_Stunde": eauto_soc_pro_stunde,
"Gesamteinnahmen_Euro": np.nansum(einnahmen_euro_pro_stunde),
"Gesamtkosten_Euro": np.nansum(kosten_euro_pro_stunde),
"Verluste_Pro_Stunde": verluste_wh_pro_stunde,
"Gesamt_Verluste": np.nansum(verluste_wh_pro_stunde),
"Home_appliance_wh_per_hour": home_appliance_wh_per_hour,
}
return out

View File

@@ -0,0 +1,61 @@
"""Load forecast module for load predictions."""
from typing import Optional, Set
from pydantic import Field, computed_field
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class LoadCommonSettings(SettingsBaseModel):
# Load 0
load0_provider: Optional[str] = Field(
default=None, description="Load provider id of provider to be used."
)
load0_name: Optional[str] = Field(default=None, description="Name of the load source.")
# Load 1
load1_provider: Optional[str] = Field(
default=None, description="Load provider id of provider to be used."
)
load1_name: Optional[str] = Field(default=None, description="Name of the load source.")
# Load 2
load2_provider: Optional[str] = Field(
default=None, description="Load provider id of provider to be used."
)
load2_name: Optional[str] = Field(default=None, description="Name of the load source.")
# Load 3
load3_provider: Optional[str] = Field(
default=None, description="Load provider id of provider to be used."
)
load3_name: Optional[str] = Field(default=None, description="Name of the load source.")
# Load 4
load4_provider: Optional[str] = Field(
default=None, description="Load provider id of provider to be used."
)
load4_name: Optional[str] = Field(default=None, description="Name of the load source.")
# Computed fields
@computed_field # type: ignore[prop-decorator]
@property
def load_count(self) -> int:
"""Maximum number of loads."""
return 5
@computed_field # type: ignore[prop-decorator]
@property
def load_providers(self) -> Set[str]:
"""Load providers."""
providers = []
for i in range(self.load_count):
load_provider_attr = f"load{i}_provider"
value = getattr(self, load_provider_attr)
if value:
providers.append(value)
return set(providers)

View File

@@ -0,0 +1,102 @@
"""Abstract and base classes for load predictions.
Notes:
- Ensure appropriate API keys or configurations are set up if required by external data sources.
"""
from abc import abstractmethod
from typing import List, Optional
from pydantic import Field, computed_field
from akkudoktoreos.prediction.predictionabc import PredictionProvider, PredictionRecord
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class LoadDataRecord(PredictionRecord):
"""Represents a load data record containing various load attributes at a specific datetime."""
load0_mean: Optional[float] = Field(default=None, description="Load 0 mean value (W)")
load0_std: Optional[float] = Field(default=None, description="Load 0 standard deviation (W)")
load1_mean: Optional[float] = Field(default=None, description="Load 1 mean value (W)")
load1_std: Optional[float] = Field(default=None, description="Load 1 standard deviation (W)")
load2_mean: Optional[float] = Field(default=None, description="Load 2 mean value (W)")
load2_std: Optional[float] = Field(default=None, description="Load 2 standard deviation (W)")
load3_mean: Optional[float] = Field(default=None, description="Load 3 mean value (W)")
load3_std: Optional[float] = Field(default=None, description="Load 3 standard deviation (W)")
load4_mean: Optional[float] = Field(default=None, description="Load 4 mean value (W)")
load4_std: Optional[float] = Field(default=None, description="Load 4 standard deviation (W)")
# Computed fields
@computed_field # type: ignore[prop-decorator]
@property
def load_total_mean(self) -> float:
"""Total load mean value (W)."""
total_mean = 0.0
for i in range(5):
load_mean_attr = f"load{i}_mean"
value = getattr(self, load_mean_attr)
if value:
total_mean += value
return total_mean
@computed_field # type: ignore[prop-decorator]
@property
def load_total_std(self) -> float:
"""Total load standard deviation (W)."""
total_std = 0.0
for i in range(5):
load_std_attr = f"load{i}_std"
value = getattr(self, load_std_attr)
if value:
total_std += value
return total_std
class LoadProvider(PredictionProvider):
"""Abstract base class for load providers.
LoadProvider is a thread-safe singleton, ensuring only one instance of this class is created.
Configuration variables:
load_provider (str): Prediction provider for load.
Attributes:
prediction_hours (int, optional): The number of hours into the future for which predictions are generated.
prediction_historic_hours (int, optional): The number of past hours for which historical data is retained.
latitude (float, optional): The latitude in degrees, must be within -90 to 90.
longitude (float, optional): The longitude in degrees, must be within -180 to 180.
start_datetime (datetime, optional): The starting datetime for predictions, defaults to the current datetime if unspecified.
end_datetime (datetime, computed): The datetime representing the end of the prediction range,
calculated based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The earliest datetime for retaining historical data, calculated
based on `start_datetime` and `prediction_historic_hours`.
"""
# overload
records: List[LoadDataRecord] = Field(
default_factory=list, description="List of LoadDataRecord records"
)
@classmethod
@abstractmethod
def provider_id(cls) -> str:
return "LoadProvider"
def enabled(self) -> bool:
logger.debug(
f"LoadProvider ID {self.provider_id()} vs. config {self.config.load_providers}"
)
return self.provider_id() == self.config.load_providers
def loads(self) -> List[str]:
"""Returns a list of key prefixes of the loads managed by this provider."""
loads_prefix = []
for i in range(self.config.load_count):
load_provider_attr = f"load{i}_provider"
value = getattr(self.config, load_provider_attr)
if value == self.provider_id():
loads_prefix.append(f"load{i}")
return loads_prefix

View File

@@ -0,0 +1,69 @@
"""Retrieves load forecast data from Akkudoktor load profiles."""
from pathlib import Path
from typing import Optional
import numpy as np
from pydantic import Field
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.loadabc import LoadProvider
from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class LoadAkkudoktorCommonSettings(SettingsBaseModel):
"""Common settings for load data import from file."""
loadakkudoktor_year_energy: Optional[float] = Field(
default=None, description="Yearly energy consumption (kWh)."
)
class LoadAkkudoktor(LoadProvider):
"""Fetch Load forecast data from Akkudoktor load profiles."""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the LoadAkkudoktor provider."""
return "LoadAkkudoktor"
def load_data(self) -> np.ndarray:
"""Loads data from the Akkudoktor load file."""
load_file = Path(__file__).parent.parent.joinpath("data/load_profiles.npz")
data_year_energy = None
try:
file_data = np.load(load_file)
profile_data = np.array(
list(zip(file_data["yearly_profiles"], file_data["yearly_profiles_std"]))
)
data_year_energy = profile_data * self.config.loadakkudoktor_year_energy
# pprint(self.data_year_energy)
except FileNotFoundError:
error_msg = f"Error: File {load_file} not found."
logger.error(error_msg)
raise FileNotFoundError(error_msg)
except Exception as e:
error_msg = f"An error occurred while loading data: {e}"
logger.error(error_msg)
raise ValueError(error_msg)
return data_year_energy
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Adds the load means and standard deviations."""
data_year_energy = self.load_data()
for load in self.loads():
attr_load_mean = f"{load}_mean"
attr_load_std = f"{load}_std"
date = self.start_datetime
for i in range(self.config.prediction_hours):
# Extract mean and standard deviation for the given day and hour
# Day indexing starts at 0, -1 because of that
hourly_stats = data_year_energy[date.day_of_year - 1, :, date.hour]
self.update_value(date, attr_load_mean, hourly_stats[0])
self.update_value(date, attr_load_std, hourly_stats[1])
date += to_duration("1 hour")
# We are working on fresh data (no cache), report update time
self.update_datetime = to_datetime(in_timezone=self.config.timezone)

View File

@@ -0,0 +1,100 @@
"""Retrieves load forecast data from an import file.
This module provides classes and mappings to manage load data obtained from
an import file, including support for various load attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `LoadDataRecord`
format, enabling consistent access to forecasted and historical load attributes.
"""
from pathlib import Path
from typing import Optional, Union
from pydantic import Field, field_validator
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.loadabc import LoadProvider
from akkudoktoreos.prediction.predictionabc import PredictionImportProvider
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class LoadImportCommonSettings(SettingsBaseModel):
"""Common settings for load data import from file."""
load0_import_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import load data from."
)
load0_import_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of load forecast value lists."
)
load1_import_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import load data from."
)
load1_import_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of load forecast value lists."
)
load2_import_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import load data from."
)
load2_import_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of load forecast value lists."
)
load3_import_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import load data from."
)
load3_import_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of load forecast value lists."
)
load4_import_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import load data from."
)
load4_import_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of load forecast value lists."
)
# Validators
@field_validator(
"load0_import_file_path",
"load1_import_file_path",
"load2_import_file_path",
"load3_import_file_path",
"load4_import_file_path",
mode="after",
)
@classmethod
def validate_loadimport_file_path(cls, value: Optional[Union[str, Path]]) -> Optional[Path]:
if value is None:
return None
if isinstance(value, str):
value = Path(value)
"""Ensure file is available."""
value.resolve()
if not value.is_file():
raise ValueError(f"Import file path '{value}' is not a file.")
return value
class LoadImport(LoadProvider, PredictionImportProvider):
"""Fetch Load data from import file or JSON string.
LoadImport is a singleton-based class that retrieves load forecast data
from a file or JSON string and maps it to `LoadDataRecord` fields. It manages the forecast
over a range of hours into the future and retains historical data.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the LoadImport provider."""
return "LoadImport"
def _update_data(self, force_update: Optional[bool] = False) -> None:
for load in self.loads():
attr_file_path = f"{load}_import_file_path"
attr_json = f"{load}_import_json"
import_file_path = getattr(self.config, attr_file_path)
if import_file_path is not None:
self.import_from_file(import_file_path, key_prefix=load)
import_json = getattr(self.config, attr_json)
if import_json is not None:
self.import_from_json(import_json, key_prefix=load)

View File

@@ -0,0 +1,173 @@
"""Prediction module for weather and photovoltaic forecasts.
This module provides a `Prediction` class to manage and update a sequence of
prediction providers. The `Prediction` class is a subclass of `PredictionContainer`
and is initialized with a set of forecast providers, such as `WeatherBrightSky`,
`WeatherClearOutside`, and `PVForecastAkkudoktor`.
Usage:
Instantiate the `Prediction` class with the required providers, maintaining
the necessary order. Then call the `update` method to refresh forecasts from
all providers in sequence.
Example:
# Create singleton prediction instance with prediction providers
from akkudoktoreos.prediction.prediction import prediction
prediction.update_data()
print("Prediction:", prediction)
Classes:
Prediction: Manages a list of forecast providers to fetch and update predictions.
Attributes:
pvforecast_akkudoktor (PVForecastAkkudoktor): Forecast provider for photovoltaic data.
weather_brightsky (WeatherBrightSky): Weather forecast provider using BrightSky.
weather_clearoutside (WeatherClearOutside): Weather forecast provider using ClearOutside.
"""
from typing import List, Optional, Union
from pydantic import Field, computed_field
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.elecpriceakkudoktor import ElecPriceAkkudoktor
from akkudoktoreos.prediction.elecpriceimport import ElecPriceImport
from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktor
from akkudoktoreos.prediction.loadimport import LoadImport
from akkudoktoreos.prediction.predictionabc import PredictionContainer
from akkudoktoreos.prediction.pvforecastakkudoktor import PVForecastAkkudoktor
from akkudoktoreos.prediction.pvforecastimport import PVForecastImport
from akkudoktoreos.prediction.weatherbrightsky import WeatherBrightSky
from akkudoktoreos.prediction.weatherclearoutside import WeatherClearOutside
from akkudoktoreos.prediction.weatherimport import WeatherImport
from akkudoktoreos.utils.datetimeutil import to_timezone
class PredictionCommonSettings(SettingsBaseModel):
"""Base configuration for prediction settings, including forecast duration, geographic location, and time zone.
This class provides configuration for prediction settings, allowing users to specify
parameters such as the forecast duration (in hours) and location (latitude and longitude).
Validators ensure each parameter is within a specified range. A computed property, `timezone`,
determines the time zone based on latitude and longitude.
Attributes:
prediction_hours (Optional[int]): Number of hours into the future for predictions.
Must be non-negative.
prediction_historic_hours (Optional[int]): Number of hours into the past for historical data.
Must be non-negative.
latitude (Optional[float]): Latitude in degrees, must be between -90 and 90.
longitude (Optional[float]): Longitude in degrees, must be between -180 and 180.
Properties:
timezone (Optional[str]): Computed time zone string based on the specified latitude
and longitude.
Validators:
validate_prediction_hours (int): Ensures `prediction_hours` is a non-negative integer.
validate_prediction_historic_hours (int): Ensures `prediction_historic_hours` is a non-negative integer.
validate_latitude (float): Ensures `latitude` is within the range -90 to 90.
validate_longitude (float): Ensures `longitude` is within the range -180 to 180.
"""
prediction_hours: Optional[int] = Field(
default=48, ge=0, description="Number of hours into the future for predictions"
)
prediction_historic_hours: Optional[int] = Field(
default=48,
ge=0,
description="Number of hours into the past for historical predictions data",
)
latitude: Optional[float] = Field(
default=None,
ge=-90.0,
le=90.0,
description="Latitude in decimal degrees, between -90 and 90, north is positive (ISO 19115) (°)",
)
longitude: Optional[float] = Field(
default=None,
ge=-180.0,
le=180.0,
description="Longitude in decimal degrees, within -180 to 180 (°)",
)
# Computed fields
@computed_field # type: ignore[prop-decorator]
@property
def timezone(self) -> Optional[str]:
"""Compute timezone based on latitude and longitude."""
if self.latitude and self.longitude:
return to_timezone(location=(self.latitude, self.longitude), as_string=True)
return None
class Prediction(PredictionContainer):
"""Prediction container to manage multiple prediction providers.
Attributes:
providers (List[Union[PVForecastAkkudoktor, WeatherBrightSky, WeatherClearOutside]]):
List of forecast provider instances, in the order they should be updated.
Providers may depend on updates from others.
"""
providers: List[
Union[
ElecPriceAkkudoktor,
ElecPriceImport,
LoadAkkudoktor,
LoadImport,
PVForecastAkkudoktor,
PVForecastImport,
WeatherBrightSky,
WeatherClearOutside,
WeatherImport,
]
] = Field(default_factory=list, description="List of prediction providers")
# Initialize forecast providers, all are singletons.
elecprice_akkudoktor = ElecPriceAkkudoktor()
elecprice_import = ElecPriceImport()
load_akkudoktor = LoadAkkudoktor()
load_import = LoadImport()
pvforecast_akkudoktor = PVForecastAkkudoktor()
pvforecast_import = PVForecastImport()
weather_brightsky = WeatherBrightSky()
weather_clearoutside = WeatherClearOutside()
weather_import = WeatherImport()
def get_prediction() -> Prediction:
"""Gets the EOS prediction data."""
# Initialize Prediction instance with providers in the required order
# Care for provider sequence as providers may rely on others to be updated before.
prediction = Prediction(
providers=[
elecprice_akkudoktor,
elecprice_import,
load_akkudoktor,
load_import,
pvforecast_akkudoktor,
pvforecast_import,
weather_brightsky,
weather_clearoutside,
weather_import,
]
)
return prediction
def main() -> None:
"""Main function to update and display predictions.
This function initializes and updates the forecast providers in sequence
according to the `Prediction` instance, then prints the updated prediction data.
"""
prediction = get_prediction()
prediction.update_data()
print(f"Prediction: {prediction}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,242 @@
"""Abstract and base classes for predictions.
This module provides classes for managing and processing prediction data in a flexible, configurable manner.
It includes classes to handle configurations, record structures, sequences, and containers for prediction data,
enabling efficient storage, retrieval, and manipulation of prediction records.
This module is designed for use in predictive modeling workflows, facilitating the organization, serialization,
and manipulation of configuration and prediction data in a clear, scalable, and structured manner.
"""
from typing import List, Optional
from pendulum import DateTime
from pydantic import Field, computed_field
from akkudoktoreos.core.dataabc import (
DataBase,
DataContainer,
DataImportProvider,
DataProvider,
DataRecord,
DataSequence,
)
from akkudoktoreos.utils.datetimeutil import to_duration
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class PredictionBase(DataBase):
"""Base class for handling prediction data.
Enables access to EOS configuration data (attribute `config`).
"""
pass
class PredictionRecord(DataRecord):
"""Base class for prediction records, enabling dynamic access to fields defined in derived classes.
Fields can be accessed and mutated both using dictionary-style access (`record['field_name']`)
and attribute-style access (`record.field_name`).
Attributes:
date_time (Optional[AwareDatetime]): Aware datetime indicating when the prediction record applies.
Configurations:
- Allows mutation after creation.
- Supports non-standard data types like `datetime`.
"""
pass
class PredictionSequence(DataSequence):
"""A managed sequence of PredictionRecord instances with list-like behavior.
The PredictionSequence class provides an ordered, mutable collection of PredictionRecord
instances, allowing list-style access for adding, deleting, and retrieving records. It also
supports advanced data operations such as JSON serialization, conversion to Pandas Series,
and sorting by timestamp.
Attributes:
records (List[PredictionRecord]): A list of PredictionRecord instances representing
individual prediction data points.
record_keys (Optional[List[str]]): A list of field names (keys) expected in each
PredictionRecord.
Note:
Derived classes have to provide their own records field with correct record type set.
Usage:
# Example of creating, adding, and using PredictionSequence
class DerivedSequence(PredictionSquence):
records: List[DerivedPredictionRecord] = Field(default_factory=list,
description="List of prediction records")
seq = DerivedSequence()
seq.insert(DerivedPredictionRecord(date_time=datetime.now(), temperature=72))
seq.insert(DerivedPredictionRecord(date_time=datetime.now(), temperature=75))
# Convert to JSON and back
json_data = seq.to_json()
new_seq = DerivedSequence.from_json(json_data)
# Convert to Pandas Series
series = seq.key_to_series('temperature')
"""
# To be overloaded by derived classes.
records: List[PredictionRecord] = Field(
default_factory=list, description="List of prediction records"
)
class PredictionStartEndKeepMixin(PredictionBase):
"""A mixin to manage start, end, and historical retention datetimes for prediction data.
The starting datetime for prediction data generation is provided by the energy management
system. Predictions cannot be computed if this value is `None`.
"""
# Computed field for end_datetime and keep_datetime
@computed_field # type: ignore[prop-decorator]
@property
def end_datetime(self) -> Optional[DateTime]:
"""Compute the end datetime based on the `start_datetime` and `prediction_hours`.
Ajusts the calculated end time if DST transitions occur within the prediction window.
Returns:
Optional[DateTime]: The calculated end datetime, or `None` if inputs are missing.
"""
if self.start_datetime and self.config.prediction_hours:
end_datetime = self.start_datetime + to_duration(
f"{self.config.prediction_hours} hours"
)
dst_change = end_datetime.offset_hours - self.start_datetime.offset_hours
logger.debug(f"Pre: {self.start_datetime}..{end_datetime}: DST change: {dst_change}")
if dst_change < 0:
end_datetime = end_datetime + to_duration(f"{abs(int(dst_change))} hours")
elif dst_change > 0:
end_datetime = end_datetime - to_duration(f"{abs(int(dst_change))} hours")
logger.debug(f"Pst: {self.start_datetime}..{end_datetime}: DST change: {dst_change}")
return end_datetime
return None
@computed_field # type: ignore[prop-decorator]
@property
def keep_datetime(self) -> Optional[DateTime]:
"""Compute the keep datetime for historical data retention.
Returns:
Optional[DateTime]: The calculated retention cutoff datetime, or `None` if inputs are missing.
"""
if self.start_datetime and self.config.prediction_historic_hours:
return self.start_datetime - to_duration(
f"{int(self.config.prediction_historic_hours)} hours"
)
return None
@computed_field # type: ignore[prop-decorator]
@property
def total_hours(self) -> Optional[int]:
"""Compute the hours from `start_datetime` to `end_datetime`.
Returns:
Optional[pendulum.period]: The duration hours, or `None` if either datetime is unavailable.
"""
end_dt = self.end_datetime
if end_dt is None:
return None
duration = end_dt - self.start_datetime
return int(duration.total_hours())
@computed_field # type: ignore[prop-decorator]
@property
def keep_hours(self) -> Optional[int]:
"""Compute the hours from `keep_datetime` to `start_datetime`.
Returns:
Optional[pendulum.period]: The duration hours, or `None` if either datetime is unavailable.
"""
keep_dt = self.keep_datetime
if keep_dt is None:
return None
duration = self.start_datetime - keep_dt
return int(duration.total_hours())
class PredictionProvider(PredictionStartEndKeepMixin, DataProvider):
"""Abstract base class for prediction providers with singleton thread-safety and configurable prediction parameters.
This class serves as a base for managing prediction data, providing an interface for derived
classes to maintain a single instance across threads. It offers attributes for managing
prediction and historical data retention.
Note:
Derived classes have to provide their own records field with correct record type set.
"""
def update_data(
self,
force_enable: Optional[bool] = False,
force_update: Optional[bool] = False,
) -> None:
"""Update prediction parameters and call the custom update function.
Updates the configuration, deletes outdated records, and performs the custom update logic.
Args:
force_enable (bool, optional): If True, forces the update even if the provider is disabled.
force_update (bool, optional): If True, forces the provider to update the data even if still cached.
"""
# Update prediction configuration
self.config.update()
# Check after configuration is updated.
if not force_enable and not self.enabled():
return
# Delete outdated records before updating
self.delete_by_datetime(end_datetime=self.keep_datetime)
# Call the custom update logic
self._update_data(force_update=force_update)
# Assure records are sorted.
self.sort_by_datetime()
class PredictionImportProvider(PredictionProvider, DataImportProvider):
"""Abstract base class for prediction providers that import prediction data.
This class is designed to handle prediction data provided in the form of a key-value dictionary.
- **Keys**: Represent identifiers from the record keys of a specific prediction.
- **Values**: Are lists of prediction values starting at a specified `start_datetime`, where
each value corresponds to a subsequent time interval (e.g., hourly).
Subclasses must implement the logic for managing prediction data based on the imported records.
"""
pass
class PredictionContainer(PredictionStartEndKeepMixin, DataContainer):
"""A container for managing multiple PredictionProvider instances.
This class enables access to data from multiple prediction providers, supporting retrieval and
aggregation of their data as Pandas Series objects. It acts as a dictionary-like structure
where each key represents a specific data field, and the value is a Pandas Series containing
combined data from all PredictionProvider instances for that key.
Note:
Derived classes have to provide their own providers field with correct provider type set.
"""
# To be overloaded by derived classes.
providers: List[PredictionProvider] = Field(
default_factory=list, description="List of prediction providers"
)

View File

@@ -1,144 +0,0 @@
import hashlib
import json
import zoneinfo
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Sequence
import numpy as np
import requests
from akkudoktoreos.config import AppConfig, SetupIncomplete
def repeat_to_shape(array: np.ndarray, target_shape: Sequence[int]) -> np.ndarray:
# Check if the array fits the target shape
if len(target_shape) != array.ndim:
raise ValueError("Array and target shape must have the same number of dimensions")
# Number of repetitions per dimension
repeats = tuple(target_shape[i] // array.shape[i] for i in range(array.ndim))
# Use np.tile to expand the array
expanded_array = np.tile(array, repeats)
return expanded_array
class HourlyElectricityPriceForecast:
def __init__(
self,
source: str | Path,
config: AppConfig,
charges: float = 0.000228,
use_cache: bool = True,
): # 228
self.cache_dir = config.working_dir / config.directories.cache
self.use_cache = use_cache
if not self.cache_dir.is_dir():
raise SetupIncomplete(f"Output path does not exist: {self.cache_dir}.")
self.cache_time_file = self.cache_dir / "cache_timestamp.txt"
self.prices = self.load_data(source)
self.charges = charges
self.prediction_hours = config.eos.prediction_hours
def load_data(self, source: str | Path) -> list[dict[str, Any]]:
cache_file = self.get_cache_file(source)
if isinstance(source, str):
if cache_file.is_file() and not self.is_cache_expired() and self.use_cache:
print("Loading data from cache...")
with cache_file.open("r") as file:
json_data = json.load(file)
else:
print("Loading data from the URL...")
response = requests.get(source)
if response.status_code == 200:
json_data = response.json()
with cache_file.open("w") as file:
json.dump(json_data, file)
self.update_cache_timestamp()
else:
raise Exception(f"Error fetching data: {response.status_code}")
elif source.is_file():
with source.open("r") as file:
json_data = json.load(file)
else:
raise ValueError(f"Input is not a valid path: {source}")
return json_data["values"]
def get_cache_file(self, url: str | Path) -> Path:
if isinstance(url, Path):
url = str(url)
hash_object = hashlib.sha256(url.encode())
hex_dig = hash_object.hexdigest()
return self.cache_dir / f"cache_{hex_dig}.json"
def is_cache_expired(self) -> bool:
if not self.cache_time_file.is_file():
return True
with self.cache_time_file.open("r") as file:
timestamp_str = file.read()
last_cache_time = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
return datetime.now() - last_cache_time > timedelta(hours=1)
def update_cache_timestamp(self) -> None:
with self.cache_time_file.open("w") as file:
file.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def get_price_for_date(self, date_str: str) -> np.ndarray:
"""Returns all prices for the specified date, including the price from 00:00 of the previous day."""
# Convert date string to datetime object
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
# Calculate the previous day
previous_day = date_obj - timedelta(days=1)
previous_day_str = previous_day.strftime("%Y-%m-%d")
# Extract the price from 00:00 of the previous day
previous_day_prices = [
entry["marketpriceEurocentPerKWh"] + self.charges
for entry in self.prices
if previous_day_str in entry["end"]
]
last_price_of_previous_day = previous_day_prices[-1] if previous_day_prices else 0
# Extract all prices for the specified date
date_prices = [
entry["marketpriceEurocentPerKWh"] + self.charges
for entry in self.prices
if date_str in entry["end"]
]
print(f"getPrice: {len(date_prices)}")
# Add the last price of the previous day at the start of the list
if len(date_prices) == 23:
date_prices.insert(0, last_price_of_previous_day)
return np.array(date_prices) / (1000.0 * 100.0) + self.charges
def get_price_for_daterange(self, start_date_str: str, end_date_str: str) -> np.ndarray:
"""Returns all prices between the start and end dates."""
print(start_date_str)
print(end_date_str)
start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
start_date = start_date_utc.astimezone(zoneinfo.ZoneInfo("Europe/Berlin"))
end_date = end_date_utc.astimezone(zoneinfo.ZoneInfo("Europe/Berlin"))
price_list: list[float] = []
while start_date < end_date:
date_str = start_date.strftime("%Y-%m-%d")
daily_prices = self.get_price_for_date(date_str)
if daily_prices.size == 24:
price_list.extend(daily_prices)
start_date += timedelta(days=1)
price_list_np = np.array(price_list)
# If prediction hours are greater than 0, reshape the price list
if self.prediction_hours > 0:
price_list_np = repeat_to_shape(price_list_np, (self.prediction_hours,))
return price_list_np

View File

@@ -1,682 +0,0 @@
"""PV Power Forecasting Module.
This module contains classes and methods to retrieve, process, and display photovoltaic (PV)
power forecast data, including temperature, windspeed, DC power, and AC power forecasts.
The module supports caching of forecast data to reduce redundant network requests and includes
functions to update AC power measurements and retrieve forecasts within a specified date range.
Classes
ForecastData: Represents a single forecast entry, including DC power, AC power,
temperature, and windspeed.
PVForecast: Retrieves, processes, and stores PV power forecast data, either from
a file or URL, with optional caching. It also provides methods to query
and update the forecast data, convert it to a DataFrame, and output key
metrics like AC power.
Example:
# Initialize PVForecast class with an URL
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405..."
)
# Update the AC power measurement for a specific date and time
forecast.update_ac_power_measurement(ac_power_measurement=1000, date_time=datetime.now())
# Print the forecast data with DC and AC power details
forecast.print_ac_power_and_measurement()
# Get the forecast data as a Pandas DataFrame
df = forecast.get_forecast_dataframe()
print(df)
Attributes:
prediction_hours (int): Number of forecast hours. Defaults to 48.
"""
import json
from datetime import date, datetime
from pathlib import Path
from typing import Any, List, Optional, Union
import numpy as np
import pandas as pd
import requests
from pydantic import BaseModel, ValidationError
from akkudoktoreos.utils.cachefilestore import cache_in_file
from akkudoktoreos.utils.datetimeutil import to_datetime
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__, logging_level="DEBUG")
class AkkudoktorForecastHorizon(BaseModel):
altitude: int
azimuthFrom: int
azimuthTo: int
class AkkudoktorForecastMeta(BaseModel):
lat: float
lon: float
power: List[int]
azimuth: List[int]
tilt: List[int]
timezone: str
albedo: float
past_days: int
inverterEfficiency: float
powerInverter: List[int]
cellCoEff: float
range: bool
horizont: List[List[AkkudoktorForecastHorizon]]
horizontString: List[str]
class AkkudoktorForecastValue(BaseModel):
datetime: str
dcPower: float
power: float
sunTilt: float
sunAzimuth: float
temperature: float
relativehumidity_2m: float
windspeed_10m: float
class AkkudoktorForecast(BaseModel):
meta: AkkudoktorForecastMeta
values: List[List[AkkudoktorForecastValue]]
def validate_pv_forecast_data(data: dict[str, Any]) -> Optional[str]:
"""Validate PV forecast data."""
try:
AkkudoktorForecast.model_validate(data)
data_type = "Akkudoktor"
except ValidationError as e:
error_msg = ""
for error in e.errors():
field = " -> ".join(str(x) for x in error["loc"])
message = error["msg"]
error_type = error["type"]
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
logger.debug(f"Validation did not succeed: {error_msg}")
return None
return data_type
class ForecastResponse(BaseModel):
temperature: list[float]
pvpower: list[float]
class ForecastData:
"""Stores forecast data for PV power and weather parameters.
Attributes:
date_time (datetime): The date and time of the forecast.
dc_power (float): The direct current (DC) power in watts.
ac_power (float): The alternating current (AC) power in watts.
windspeed_10m (float, optional): Wind speed at 10 meters altitude.
temperature (float, optional): Temperature in degrees Celsius.
ac_power_measurement (float, optional): Measured AC power.
"""
def __init__(
self,
date_time: datetime,
dc_power: float,
ac_power: float,
windspeed_10m: Optional[float] = None,
temperature: Optional[float] = None,
ac_power_measurement: Optional[float] = None,
):
"""Initializes the ForecastData instance.
Args:
date_time (datetime): The date and time of the forecast.
dc_power (float): The DC power in watts.
ac_power (float): The AC power in watts.
windspeed_10m (float, optional): Wind speed at 10 meters altitude. Defaults to None.
temperature (float, optional): Temperature in degrees Celsius. Defaults to None.
ac_power_measurement (float, optional): Measured AC power. Defaults to None.
"""
self.date_time = date_time
self.dc_power = dc_power
self.ac_power = ac_power
self.windspeed_10m = windspeed_10m
self.temperature = temperature
self.ac_power_measurement = ac_power_measurement
def get_date_time(self) -> datetime:
"""Returns the forecast date and time.
Returns:
datetime: The date and time of the forecast.
"""
return self.date_time
def get_dc_power(self) -> float:
"""Returns the DC power.
Returns:
float: DC power in watts.
"""
return self.dc_power
def get_ac_power_measurement(self) -> Optional[float]:
"""Returns the measured AC power.
It returns the measured AC power if available; otherwise None.
Returns:
float: Measured AC power in watts or None
"""
return self.ac_power_measurement
def get_ac_power(self) -> float:
"""Returns the AC power.
If a measured value is available, it returns the measured AC power;
otherwise, it returns the forecasted AC power.
Returns:
float: AC power in watts.
"""
if self.ac_power_measurement is not None:
return self.ac_power_measurement
else:
return self.ac_power
def get_windspeed_10m(self) -> Optional[float]:
"""Returns the wind speed at 10 meters altitude.
Returns:
float: Wind speed in meters per second.
"""
return self.windspeed_10m
def get_temperature(self) -> Optional[float]:
"""Returns the temperature.
Returns:
float: Temperature in degrees Celsius.
"""
return self.temperature
class PVForecast:
"""Manages PV (photovoltaic) power forecasts and weather data.
Forecast data can be loaded from different sources (in-memory data, file, or URL).
Attributes:
meta (dict): Metadata related to the forecast (e.g., source, location).
forecast_data (list): A list of forecast data points of `ForecastData` objects.
prediction_hours (int): The number of hours into the future the forecast covers.
current_measurement (Optional[float]): The current AC power measurement in watts (or None if unavailable).
data (Optional[dict]): JSON data containing the forecast information (if provided).
filepath (Optional[str]): Filepath to the forecast data file (if provided).
url (Optional[str]): URL to retrieve forecast data from an API (if provided).
_forecast_start (Optional[date]): Start datetime for the forecast period.
tz_name (Optional[str]): The time zone name of the forecast data, if applicable.
"""
def __init__(
self,
data: Optional[dict[str, Any]] = None,
filepath: Optional[str | Path] = None,
url: Optional[str] = None,
forecast_start: Union[datetime, date, str, int, float, None] = None,
prediction_hours: Optional[int] = None,
):
"""Initializes a `PVForecast` instance.
Forecast data can be loaded from in-memory `data`, a file specified by `filepath`, or
fetched from a remote `url`. If none are provided, an empty forecast will be initialized.
The `forecast_start` and `prediction_hours` parameters can be specified to control the
forecasting time period.
Use `process_data()` to fill an empty forecast later on.
Args:
data (Optional[dict]): In-memory JSON data containing forecast information. Defaults to None.
filepath (Optional[str]): Path to a local file containing forecast data in JSON format. Defaults to None.
url (Optional[str]): URL to an API providing forecast data. Defaults to None.
forecast_start (Union[datetime, date, str, int, float]): The start datetime for the forecast period.
Can be a `datetime`, `date`, `str` (formatted date), `int` (timestamp), `float`, or None. Defaults to None.
prediction_hours (Optional[int]): The number of hours to forecast into the future. Defaults to 48 hours.
Example:
forecast = PVForecast(data=my_forecast_data, forecast_start="2024-10-13", prediction_hours=72)
"""
self.meta: dict[str, Any] = {}
self.forecast_data: list[ForecastData] = []
self.current_measurement: Optional[float] = None
self.data = data
self.filepath = filepath
self.url = url
self._forecast_start: Optional[datetime] = None
if forecast_start:
self._forecast_start = to_datetime(forecast_start, to_naiv=True, to_maxtime=False)
self.prediction_hours = prediction_hours
self._tz_name = None
if self.data or self.filepath or self.url:
self.process_data(
data=self.data,
filepath=self.filepath,
url=self.url,
forecast_start=self._forecast_start,
prediction_hours=self.prediction_hours,
)
def update_ac_power_measurement(
self,
ac_power_measurement: float,
date_time: Union[datetime, date, str, int, float, None] = None,
) -> bool:
"""Updates the AC power measurement for a specific time.
Args:
date_time (datetime): The date and time of the measurement.
ac_power_measurement (float): Measured AC power.
Returns:
bool: True if a matching timestamp was found, False otherwise.
"""
found = False
input_date_hour = to_datetime(
date_time, to_timezone=self._tz_name, to_naiv=True, to_maxtime=False
).replace(minute=0, second=0, microsecond=0)
for forecast in self.forecast_data:
forecast_date_hour = to_datetime(forecast.date_time, to_naiv=True).replace(
minute=0, second=0, microsecond=0
)
if forecast_date_hour == input_date_hour:
forecast.ac_power_measurement = ac_power_measurement
found = True
logger.debug(
f"AC Power measurement updated at date {input_date_hour}: {ac_power_measurement}"
)
break
return found
def process_data(
self,
data: Optional[dict[str, Any]] = None,
filepath: Optional[str | Path] = None,
url: Optional[str] = None,
forecast_start: Union[datetime, date, str, int, float, None] = None,
prediction_hours: Optional[int] = None,
) -> None:
"""Processes the forecast data from the provided source (in-memory `data`, `filepath`, or `url`).
If `forecast_start` and `prediction_hours` are provided, they define the forecast period.
Args:
data (Optional[dict]): JSON data containing forecast values. Defaults to None.
filepath (Optional[str]): Path to a file with forecast data. Defaults to None.
url (Optional[str]): API URL to retrieve forecast data from. Defaults to None.
forecast_start (Union[datetime, date, str, int, float, None]): Start datetime of the forecast
period. Defaults to None. If given before it is cached.
prediction_hours (Optional[int]): The number of hours to forecast into the future.
Defaults to None. If given before it is cached.
Returns:
None
Raises:
FileNotFoundError: If the specified `filepath` does not exist.
ValueError: If no valid data source or data is provided.
Example:
forecast = PVForecast(
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&"
"power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&"
"power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&"
"power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&"
"power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&"
"past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&"
"timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
prediction_hours = 24,
)
"""
# Get input forecast data
if data:
pass
elif filepath:
data = self.load_data_from_file(filepath)
elif url:
data = self.load_data_from_url_with_caching(url)
elif self.data or self.filepath or self.url:
# Re-process according to previous arguments
if self.data:
data = self.data
elif self.filepath:
data = self.load_data_from_file(self.filepath)
elif self.url:
data = self.load_data_from_url_with_caching(self.url)
else:
raise NotImplementedError(
"Re-processing for None input is not implemented!"
) # Invalid path
else:
raise ValueError("No prediction input data available.")
assert data is not None # make mypy happy
# Validate input data to be of a known format
data_format = validate_pv_forecast_data(data)
if data_format != "Akkudoktor":
raise ValueError(f"Prediction input data are of unknown format: '{data_format}'.")
# Assure we have a forecast start datetime
if forecast_start is None:
forecast_start = self._forecast_start
if forecast_start is None:
forecast_start = datetime(1970, 1, 1)
# Assure we have prediction hours set
if prediction_hours is None:
prediction_hours = self.prediction_hours
if prediction_hours is None:
prediction_hours = 48
self.prediction_hours = prediction_hours
if data_format == "Akkudoktor":
# --------------------------------------------
# From here Akkudoktor PV forecast data format
# ---------------------------------------------
self.meta = data.get("meta", {})
all_values = data.get("values")
# timezone of the PV system
self._tz_name = self.meta.get("timezone", None)
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info ist not implemented!"
)
# Assumption that all lists are the same length and are ordered chronologically
# in ascending order and have the same timestamps.
values_len = len(all_values[0])
if values_len < self.prediction_hours:
# Expect one value set per prediction hour
raise ValueError(
f"The forecast must cover at least {self.prediction_hours} hours, "
f"but only {values_len} data sets are given in forecast data."
)
# Convert forecast_start to timezone of PV system and make it a naiv datetime
self._forecast_start = to_datetime(
forecast_start, to_timezone=self._tz_name, to_naiv=True
)
logger.debug(f"Forecast start set to {self._forecast_start}")
for i in range(values_len):
# Zeige die ursprünglichen und berechneten Zeitstempel an
original_datetime = all_values[0][i].get("datetime")
# print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
dt = to_datetime(original_datetime, to_timezone=self._tz_name, to_naiv=True)
# iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
# print()
# Optional: 2 Stunden abziehen, um die Zeitanpassung zu testen
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
# print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
if dt < self._forecast_start:
# forecast data are too old
continue
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
sum_ac_power = sum(values[i]["power"] for values in all_values)
forecast = ForecastData(
date_time=dt, # Verwende angepassten Zeitstempel
dc_power=sum_dc_power,
ac_power=sum_ac_power,
windspeed_10m=all_values[0][i].get("windspeed_10m"),
temperature=all_values[0][i].get("temperature"),
)
self.forecast_data.append(forecast)
if len(self.forecast_data) < self.prediction_hours:
raise ValueError(
f"The forecast must cover at least {self.prediction_hours} hours, "
f"but only {len(self.forecast_data)} hours starting from {forecast_start} "
f"were predicted."
)
# Adapt forecast start to actual value
self._forecast_start = self.forecast_data[0].get_date_time()
logger.debug(f"Forecast start adapted to {self._forecast_start}")
def load_data_from_file(self, filepath: str | Path) -> dict[str, Any]:
"""Loads forecast data from a file.
Args:
filepath (str): Path to the file containing the forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
with open(filepath, "r") as file:
data = json.load(file)
return data
def load_data_from_url(self, url: str) -> dict[str, Any]:
"""Loads forecast data from a URL.
Example:
https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m
Args:
url (str): URL of the API providing forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
response = requests.get(url)
if response.status_code == 200:
data = response.json()
else:
data = f"Failed to load data from `{url}`. Status Code: {response.status_code}"
logger.error(data)
return data
@cache_in_file() # use binary mode by default as we have python objects not text
def load_data_from_url_with_caching(self, url: str) -> dict[str, Any]:
"""Loads data from a URL or from the cache if available.
Args:
url (str): URL of the API providing forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
response = requests.get(url)
if response.status_code == 200:
data = response.json()
logger.debug(f"Data fetched from URL `{url} and cached.")
else:
data = f"Failed to load data from `{url}`. Status Code: {response.status_code}"
logger.error(data)
return data
def get_forecast_data(self) -> list[ForecastData]:
"""Returns the forecast data.
Returns:
list: List of ForecastData objects.
"""
return self.forecast_data
def get_temperature_forecast_for_date(
self, input_date: Union[datetime, date, str, int, float, None]
) -> np.ndarray:
"""Returns the temperature forecast for a specific date.
Args:
input_date (str): Date
Returns:
np.array: Array of temperature forecasts.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info ist not implemented!"
)
input_date = to_datetime(input_date, to_timezone=self._tz_name, to_naiv=True).date()
daily_forecast_obj = [
data for data in self.forecast_data if data.get_date_time().date() == input_date
]
daily_forecast = []
for d in daily_forecast_obj:
daily_forecast.append(d.get_temperature())
return np.array(daily_forecast)
def get_pv_forecast_for_date_range(
self,
start_date: Union[datetime, date, str, int, float, None],
end_date: Union[datetime, date, str, int, float, None],
) -> np.ndarray:
"""Returns the PV forecast for a date range.
Args:
start_date_str (str): Start date in the format YYYY-MM-DD.
end_date_str (str): End date in the format YYYY-MM-DD.
Returns:
pd.DataFrame: DataFrame containing the forecast data.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info ist not implemented!"
)
start_date = to_datetime(start_date, to_timezone=self._tz_name, to_naiv=True).date()
end_date = to_datetime(end_date, to_timezone=self._tz_name, to_naiv=True).date()
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
# print(data.get_date_time(), " ", data.get_ac_power())
ac_power_forecast = np.array([data.get_ac_power() for data in date_range_forecast])
return np.array(ac_power_forecast)[: self.prediction_hours]
def get_temperature_for_date_range(
self,
start_date: Union[datetime, date, str, int, float, None],
end_date: Union[datetime, date, str, int, float, None],
) -> np.ndarray:
"""Returns the temperature forecast for a given date range.
Args:
start_date (datetime | date | str | int | float | None): Start date.
end_date (datetime | date | str | int | float | None): End date.
Returns:
np.array: Array containing temperature forecasts for each hour within the date range.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info ist not implemented!"
)
start_date = to_datetime(start_date, to_timezone=self._tz_name, to_naiv=True).date()
end_date = to_datetime(end_date, to_timezone=self._tz_name, to_naiv=True).date()
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
temperature_forecast = [data.get_temperature() for data in date_range_forecast]
return np.array(temperature_forecast)[: self.prediction_hours]
def get_forecast_dataframe(self) -> pd.DataFrame:
"""Converts the forecast data into a Pandas DataFrame.
Returns:
pd.DataFrame: A DataFrame containing the forecast data with columns for date/time,
DC power, AC power, windspeed, and temperature.
"""
data = [
{
"date_time": f.get_date_time(),
"dc_power": f.get_dc_power(),
"ac_power": f.get_ac_power(),
"windspeed_10m": f.get_windspeed_10m(),
"temperature": f.get_temperature(),
}
for f in self.forecast_data
]
# Erstelle ein DataFrame
df = pd.DataFrame(data)
return df
def get_forecast_start(self) -> Optional[datetime]:
"""Return the start of the forecast data in local timezone.
Returns:
forecast_start (datetime | None): The start datetime or None if no data available.
"""
if not self._forecast_start:
return None
return to_datetime(
self._forecast_start, to_timezone=self._tz_name, to_naiv=True, to_maxtime=False
)
def report_ac_power_and_measurement(self) -> str:
"""Report DC/ AC power, and AC power measurement for each forecast hour.
For each forecast entry, the time, DC power, forecasted AC power, measured AC power
(if available), and the value returned by the `get_ac_power` method is provided.
Returns:
str: The report.
"""
rep = ""
for forecast in self.forecast_data:
date_time = forecast.date_time
dc_pow = round(forecast.dc_power, 2) if forecast.dc_power else None
ac_pow = round(forecast.ac_power, 2) if forecast.ac_power else None
ac_pow_measurement = (
round(forecast.ac_power_measurement, 2) if forecast.ac_power_measurement else None
)
get_ac_pow = round(forecast.get_ac_power(), 2) if forecast.get_ac_power() else None
rep += (
f"Date&Time: {date_time}, DC: {dc_pow}, AC: {ac_pow}, "
f"AC measured: {ac_pow_measurement}, AC GET: {get_ac_pow}"
"\n"
)
return rep
# Example of how to use the PVForecast class
if __name__ == "__main__":
"""Main execution block to demonstrate the use of the PVForecast class.
Fetches PV power forecast data from a given URL, updates the AC power measurement
for the current date/time, and prints the DC and AC power information.
"""
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&"
"power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&"
"power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&"
"power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&"
"power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&"
"past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&"
"hourly=relativehumidity_2m%2Cwindspeed_10m",
)
forecast.update_ac_power_measurement(ac_power_measurement=1000, date_time=datetime.now())
print(forecast.report_ac_power_and_measurement())

View File

@@ -0,0 +1,453 @@
"""PV forecast module for PV power predictions."""
from typing import Any, ClassVar, List, Optional
from pydantic import Field, computed_field
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class PVForecastCommonSettings(SettingsBaseModel):
# General plane parameters
# https://pvlib-python.readthedocs.io/en/stable/_modules/pvlib/iotools/pvgis.html
# Inverter Parameters
# https://pvlib-python.readthedocs.io/en/stable/_modules/pvlib/inverter.html
pvforecast_provider: Optional[str] = Field(
default=None, description="PVForecast provider id of provider to be used."
)
# pvforecast0_latitude: Optional[float] = Field(default=None, description="Latitude in decimal degrees, between -90 and 90, north is positive (ISO 19115) (°)")
# Plane 0
pvforecast0_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast0_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast0_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast0_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast0_pvtechchoice: Optional[str] = Field(
default="crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast0_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast0_loss: Optional[float] = Field(
default=None, description="Sum of PV system losses in percent"
)
pvforecast0_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast0_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast0_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast0_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast0_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast0_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast0_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast0_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast0_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
# Plane 1
pvforecast1_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast1_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast1_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast1_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast1_pvtechchoice: Optional[str] = Field(
default="crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast1_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast1_loss: Optional[float] = Field(0, description="Sum of PV system losses in percent")
pvforecast1_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast1_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast1_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast1_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast1_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast1_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast1_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast1_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast1_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
# Plane 2
pvforecast2_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast2_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast2_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast2_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast2_pvtechchoice: Optional[str] = Field(
default="crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast2_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast2_loss: Optional[float] = Field(0, description="Sum of PV system losses in percent")
pvforecast2_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast2_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast2_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast2_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast2_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast2_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast2_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast2_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast2_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
# Plane 3
pvforecast3_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast3_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast3_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast3_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast3_pvtechchoice: Optional[str] = Field(
default="crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast3_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast3_loss: Optional[float] = Field(0, description="Sum of PV system losses in percent")
pvforecast3_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast3_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast3_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast3_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast3_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast3_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast3_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast3_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast3_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
# Plane 4
pvforecast4_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast4_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast4_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast4_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast4_pvtechchoice: Optional[str] = Field(
"crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast4_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast4_loss: Optional[float] = Field(0, description="Sum of PV system losses in percent")
pvforecast4_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast4_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast4_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast4_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast4_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast4_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast4_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast4_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast4_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
# Plane 5
pvforecast5_surface_tilt: Optional[float] = Field(
default=0, description="Tilt angle from horizontal plane. Ignored for two-axis tracking."
)
pvforecast5_surface_azimuth: Optional[float] = Field(
default=180,
description="Orientation (azimuth angle) of the (fixed) plane. Clockwise from north (north=0, east=90, south=180, west=270).",
)
pvforecast5_userhorizon: Optional[List[float]] = Field(
default=None,
description="Elevation of horizon in degrees, at equally spaced azimuth clockwise from north.",
)
pvforecast5_peakpower: Optional[float] = Field(
default=None, description="Nominal power of PV system in kW."
)
pvforecast5_pvtechchoice: Optional[str] = Field(
"crystSi", description="PV technology. One of 'crystSi', 'CIS', 'CdTe', 'Unknown'."
)
pvforecast5_mountingplace: Optional[str] = Field(
default="free",
description="Type of mounting for PV system. Options are 'free' for free-standing and 'building' for building-integrated.",
)
pvforecast5_loss: Optional[float] = Field(0, description="Sum of PV system losses in percent")
pvforecast5_trackingtype: Optional[int] = Field(
default=0,
description="Type of suntracking. 0=fixed, 1=single horizontal axis aligned north-south, 2=two-axis tracking, 3=vertical axis tracking, 4=single horizontal axis aligned east-west, 5=single inclined axis aligned north-south.",
)
pvforecast5_optimal_surface_tilt: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt angle. Ignored for two-axis tracking.",
)
pvforecast5_optimalangles: Optional[bool] = Field(
default=False,
description="Calculate the optimum tilt and azimuth angles. Ignored for two-axis tracking.",
)
pvforecast5_albedo: Optional[float] = Field(
default=None,
description="Proportion of the light hitting the ground that it reflects back.",
)
pvforecast5_module_model: Optional[str] = Field(
default=None, description="Model of the PV modules of this plane."
)
pvforecast5_inverter_model: Optional[str] = Field(
default=None, description="Model of the inverter of this plane."
)
pvforecast5_inverter_paco: Optional[int] = Field(
default=None, description="AC power rating of the inverter. [W]"
)
pvforecast5_modules_per_string: Optional[str] = Field(
default=None, description="Number of the PV modules of the strings of this plane."
)
pvforecast5_strings_per_inverter: Optional[str] = Field(
default=None, description="Number of the strings of the inverter of this plane."
)
pvforecast_max_planes: ClassVar[int] = 6 # Maximum number of planes that can be set
# Computed fields
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes(self) -> List[str]:
"""Compute a list of active planes."""
active_planes = []
# Loop through pvforecast0 to pvforecast4
for i in range(self.pvforecast_max_planes):
peakpower_attr = f"pvforecast{i}_peakpower"
modules_attr = f"pvforecast{i}_modules_per_string"
# Check if either attribute is set and add to active planes
if getattr(self, peakpower_attr, None) or getattr(self, modules_attr, None):
active_planes.append(f"pvforecast{i}")
return active_planes
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes_peakpower(self) -> List[float]:
"""Compute a list of the peak power per active planes."""
planes_peakpower = []
for plane in self.pvforecast_planes:
peakpower_attr = f"{plane}_peakpower"
peakpower = getattr(self, peakpower_attr, None)
if peakpower:
planes_peakpower.append(float(peakpower))
continue
# TODO calculate peak power from modules/strings
planes_peakpower.append(float(5000))
return planes_peakpower
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes_azimuth(self) -> List[float]:
"""Compute a list of the azimuths per active planes."""
planes_azimuth = []
for plane in self.pvforecast_planes:
azimuth_attr = f"{plane}_azimuth"
azimuth = getattr(self, azimuth_attr, None)
if azimuth:
planes_azimuth.append(float(azimuth))
continue
# TODO Use default
planes_azimuth.append(float(180))
return planes_azimuth
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes_tilt(self) -> List[float]:
"""Compute a list of the tilts per active planes."""
planes_tilt = []
for plane in self.pvforecast_planes:
tilt_attr = f"{plane}_tilt"
tilt = getattr(self, tilt_attr, None)
if tilt:
planes_tilt.append(float(tilt))
continue
# TODO Use default
planes_tilt.append(float(0))
return planes_tilt
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes_userhorizon(self) -> Any:
"""Compute a list of the user horizon per active planes."""
planes_userhorizon = []
for plane in self.pvforecast_planes:
userhorizon_attr = f"{plane}_userhorizon"
userhorizon = getattr(self, userhorizon_attr, None)
if userhorizon:
planes_userhorizon.append(userhorizon)
continue
# TODO Use default
planes_userhorizon.append([float(0), float(0)])
return planes_userhorizon
@computed_field # type: ignore[prop-decorator]
@property
def pvforecast_planes_inverter_paco(self) -> Any:
"""Compute a list of the maximum power rating of the inverter per active planes."""
planes_inverter_paco = []
for plane in self.pvforecast_planes:
inverter_paco_attr = f"{plane}_inverter_paco"
inverter_paco = getattr(self, inverter_paco_attr, None)
if inverter_paco:
planes_inverter_paco.append(inverter_paco)
continue
# TODO Use default - no clipping
planes_inverter_paco.append(25000)
return planes_inverter_paco

View File

@@ -0,0 +1,59 @@
"""Abstract and base classes for pvforecast predictions.
Notes:
- Ensure appropriate API keys or configurations are set up if required by external data sources.
"""
from abc import abstractmethod
from typing import List, Optional
from pydantic import Field
from akkudoktoreos.prediction.predictionabc import PredictionProvider, PredictionRecord
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class PVForecastDataRecord(PredictionRecord):
"""Represents a pvforecast data record containing various pvforecast attributes at a specific datetime."""
pvforecast_dc_power: Optional[float] = Field(default=None, description="Total DC power (W)")
pvforecast_ac_power: Optional[float] = Field(default=None, description="Total AC power (W)")
class PVForecastProvider(PredictionProvider):
"""Abstract base class for pvforecast providers.
PVForecastProvider is a thread-safe singleton, ensuring only one instance of this class is created.
Configuration variables:
pvforecast_provider (str): Prediction provider for pvforecast.
Attributes:
prediction_hours (int, optional): The number of hours into the future for which predictions are generated.
prediction_historic_hours (int, optional): The number of past hours for which historical data is retained.
latitude (float, optional): The latitude in degrees, must be within -90 to 90.
longitude (float, optional): The longitude in degrees, must be within -180 to 180.
start_datetime (datetime, optional): The starting datetime for predictions (inlcusive), defaults to the current datetime if unspecified.
end_datetime (datetime, computed): The datetime representing the end of the prediction range (exclusive),
calculated based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The earliest datetime for retaining historical data (inclusive), calculated
based on `start_datetime` and `prediction_historic_hours`.
"""
# overload
records: List[PVForecastDataRecord] = Field(
default_factory=list, description="List of PVForecastDataRecord records"
)
@classmethod
@abstractmethod
def provider_id(cls) -> str:
return "PVForecastProvider"
def enabled(self) -> bool:
logger.debug(
f"PVForecastProvider ID {self.provider_id()} vs. config {self.config.pvforecast_provider}"
)
return self.provider_id() == self.config.pvforecast_provider

View File

@@ -0,0 +1,396 @@
"""PV Power Forecasting with Akkudoktor.
This module provides classes and methods to retrieve, process, and display photovoltaic (PV) power forecast data. It includes features for working with environmental data such as temperature, wind speed, DC power, and AC power. Data retrieval is designed to work with Akkudoktor.net, and caching is implemented to reduce redundant network requests. Additionally, the module supports management of historical data for analysis over time.
Classes:
AkkudoktorForecastHorizon: Represents details about the orientation of PV system horizons.
AkkudoktorForecastMeta: Metadata configuration for the forecast, including location, system settings, and timezone.
AkkudoktorForecastValue: Represents a single forecast data entry with information on temperature, wind speed, and solar orientation.
AkkudoktorForecast: The main container for forecast data, holding both metadata and individual forecast entries.
PVForecastAkkudoktorDataRecord: A specialized data record format for PV forecast data, including forecasted and actual AC power measurements.
PVForecastAkkudoktorSettings: Contains configuration settings for constructing the Akkudoktor forecast API URL.
PVForecastAkkudoktor: Primary class to manage PV power forecasts, handle data retrieval, caching, and integration with Akkudoktor.net.
Example:
# Set up the configuration with necessary fields for URL generation
settings_data = {
"prediction_hours": 48,
"prediction_historic_hours": 24,
"latitude": 52.52,
"longitude": 13.405,
"pvforecast_provider": "Akkudoktor",
"pvforecast0_peakpower": 5.0,
"pvforecast0_surface_azimuth": -10,
"pvforecast0_surface_tilt": 7,
"pvforecast0_userhorizon": [20, 27, 22, 20],
"pvforecast0_inverter_paco": 10000,
"pvforecast1_peakpower": 4.8,
"pvforecast1_surface_azimuth": -90,
"pvforecast1_surface_tilt": 7,
"pvforecast1_userhorizon": [30, 30, 30, 50],
"pvforecast1_inverter_paco": 10000,
}
# Create the config instance from the provided data
config = PVForecastAkkudoktorSettings(**settings_data)
# Initialize the forecast object with the generated configuration
forecast = PVForecastAkkudoktor(settings=config)
# Get an actual forecast
forecast.update_data()
# Update the AC power measurement for a specific date and time
forecast.update_value(to_datetime(None, to_maxtime=False), "pvforecastakkudoktor_ac_power_measured", 1000.0)
# Report the DC and AC power forecast along with AC measurements
print(forecast.report_ac_power_and_measurement())
Attributes:
prediction_hours (int): Number of hours into the future to forecast. Default is 48.
prediction_historic_hours (int): Number of past hours to retain for analysis. Default is 24.
latitude (float): Latitude for the forecast location.
longitude (float): Longitude for the forecast location.
start_datetime (datetime): Start time for the forecast, defaulting to current datetime.
end_datetime (datetime): Computed end datetime based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime): Computed threshold datetime for retaining historical data.
Methods:
provider_id(): Returns the unique identifier for the Akkudoktor provider.
_request_forecast(): Retrieves forecast data from the Akkudoktor API.
_update_data(): Updates forecast data within the PVForecastAkkudoktorDataRecord structure.
report_ac_power_and_measurement(): Generates a report on AC and DC power forecasts and actual measurements.
"""
from typing import Any, List, Optional, Union
import requests
from pydantic import Field, ValidationError, computed_field
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.pvforecastabc import (
PVForecastDataRecord,
PVForecastProvider,
)
from akkudoktoreos.utils.cacheutil import cache_in_file
from akkudoktoreos.utils.datetimeutil import compare_datetimes, to_datetime
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class AkkudoktorForecastHorizon(PydanticBaseModel):
altitude: int
azimuthFrom: int
azimuthTo: int
class AkkudoktorForecastMeta(PydanticBaseModel):
lat: float
lon: float
power: List[int]
azimuth: List[int]
tilt: List[int]
timezone: str
albedo: float
past_days: int
inverterEfficiency: float
powerInverter: List[int]
cellCoEff: float
range: bool
horizont: List[List[AkkudoktorForecastHorizon]]
horizontString: List[str]
class AkkudoktorForecastValue(PydanticBaseModel):
datetime: str
dcPower: float
power: float
sunTilt: float
sunAzimuth: float
temperature: float
relativehumidity_2m: float
windspeed_10m: float
class AkkudoktorForecast(PydanticBaseModel):
meta: AkkudoktorForecastMeta
values: List[List[AkkudoktorForecastValue]]
class PVForecastAkkudoktorDataRecord(PVForecastDataRecord):
"""Represents a Akkudoktor specific pvforecast data record containing various pvforecast attributes at a specific datetime."""
pvforecastakkudoktor_ac_power_measured: Optional[float] = Field(
default=None, description="Total AC power measured (W)"
)
pvforecastakkudoktor_wind_speed_10m: Optional[float] = Field(
default=None, description="Wind Speed 10m (kmph)"
)
pvforecastakkudoktor_temp_air: Optional[float] = Field(
default=None, description="Temperature (°C)"
)
# Computed fields
@computed_field # type: ignore[prop-decorator]
@property
def pvforecastakkudoktor_ac_power_any(self) -> Optional[float]:
"""Returns the AC power.
If a measured value is available, it returns the measured AC power;
otherwise, it returns the forecasted AC power.
Returns:
float: AC power in watts or None if no forecast data is available.
"""
if self.pvforecastakkudoktor_ac_power_measured is not None:
return self.pvforecastakkudoktor_ac_power_measured
else:
return self.pvforecast_ac_power
class PVForecastAkkudoktor(PVForecastProvider):
"""Fetch and process PV forecast data from akkudoktor.net.
PVForecastAkkudoktor is a singleton-based class that retrieves weather forecast data
from the PVForecastAkkudoktor API and maps it to `PVForecastDataRecord` fields, applying
any necessary scaling or unit corrections. It manages the forecast over a range
of hours into the future and retains historical data.
Attributes:
prediction_hours (int, optional): Number of hours in the future for the forecast.
prediction_historic_hours (int, optional): Number of past hours for retaining data.
latitude (float, optional): The latitude in degrees, validated to be between -90 and 90.
longitude (float, optional): The longitude in degrees, validated to be between -180 and 180.
start_datetime (datetime, optional): Start datetime for forecasts, defaults to the current datetime.
end_datetime (datetime, computed): The forecast's end datetime, computed based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The datetime to retain historical data, computed from `start_datetime` and `prediction_historic_hours`.
Methods:
provider_id(): Returns a unique identifier for the provider.
_request_forecast(): Fetches the forecast from the Akkudoktor API.
_update_data(): Processes and updates forecast data from Akkudoktor in PVForecastDataRecord format.
"""
# overload
records: List[PVForecastAkkudoktorDataRecord] = Field(
default_factory=list, description="List of PVForecastAkkudoktorDataRecord records"
)
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the Akkudoktor provider."""
return "PVForecastAkkudoktor"
@classmethod
def _validate_data(cls, json_str: Union[bytes, Any]) -> AkkudoktorForecast:
"""Validate Akkudoktor PV forecast data."""
try:
akkudoktor_data = AkkudoktorForecast.model_validate_json(json_str)
except ValidationError as e:
error_msg = ""
for error in e.errors():
field = " -> ".join(str(x) for x in error["loc"])
message = error["msg"]
error_type = error["type"]
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
logger.error(f"Akkudoktor schema change: {error_msg}")
raise ValueError(error_msg)
return akkudoktor_data
def _url(self) -> str:
"""Build akkudoktor.net API request URL."""
url = f"https://api.akkudoktor.net/forecast?lat={self.config.latitude}&lon={self.config.longitude}&"
planes_peakpower = self.config.pvforecast_planes_peakpower
planes_azimuth = self.config.pvforecast_planes_azimuth
planes_tilt = self.config.pvforecast_planes_tilt
planes_inverter_paco = self.config.pvforecast_planes_inverter_paco
planes_userhorizon = self.config.pvforecast_planes_userhorizon
for i, plane in enumerate(self.config.pvforecast_planes):
url += f"power={int(planes_peakpower[i]*1000)}&"
url += f"azimuth={int(planes_azimuth[i])}&"
url += f"tilt={int(planes_tilt[i])}&"
url += f"powerInverter={int(planes_inverter_paco[i])}&"
url += "horizont="
for horizon in planes_userhorizon[i]:
url += f"{int(horizon)},"
url = url[:-1] # remove trailing comma
url += "&"
url += "past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&"
url += f"timezone={self.config.timezone}&"
url += "hourly=relativehumidity_2m%2Cwindspeed_10m"
logger.debug(f"Akkudoktor URL: {url}")
return url
@cache_in_file(with_ttl="1 hour")
def _request_forecast(self) -> AkkudoktorForecast:
"""Fetch PV forecast data from Akkudoktor API.
This method sends a request to Akkudoktor API to retrieve forecast data
for a specified date range and location. The response data is parsed and
returned as JSON for further processing.
Returns:
dict: The parsed JSON response from Akkudoktor API containing forecast data.
Raises:
ValueError: If the API response does not include expected `meta` data.
"""
response = requests.get(self._url())
response.raise_for_status() # Raise an error for bad responses
logger.debug(f"Response from {self._url()}: {response}")
akkudoktor_data = self._validate_data(response.content)
# We are working on fresh data (no cache), report update time
self.update_datetime = to_datetime(in_timezone=self.config.timezone)
return akkudoktor_data
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Update forecast data in the PVForecastAkkudoktorDataRecord format.
Retrieves data from Akkudoktor. The processed data is inserted into the sequence as
`PVForecastAkkudoktorDataRecord`.
"""
# Assure we have something to request PV power for.
if len(self.config.pvforecast_planes) == 0:
# No planes for PV
error_msg = "Requested PV forecast, but no planes configured."
logger.error(f"Configuration error: {error_msg}")
raise ValueError(error_msg)
# Get Akkudoktor PV Forecast data for the given configuration.
akkudoktor_data = self._request_forecast(force_update=force_update) # type: ignore
# Timezone of the PV system
if self.config.timezone != akkudoktor_data.meta.timezone:
error_msg = f"Configured timezone '{self.config.timezone}' does not match Akkudoktor timezone '{akkudoktor_data.meta.timezone}'."
logger.error(f"Akkudoktor schema change: {error_msg}")
raise ValueError(error_msg)
# Assumption that all lists are the same length and are ordered chronologically
# in ascending order and have the same timestamps.
values_len = len(akkudoktor_data.values[0])
if values_len < self.config.prediction_hours:
# Expect one value set per prediction hour
error_msg = (
f"The forecast must cover at least {self.config.prediction_hours} hours, "
f"but only {values_len} data sets are given in forecast data."
)
logger.error(f"Akkudoktor schema change: {error_msg}")
raise ValueError(error_msg)
for i in range(values_len):
original_datetime = akkudoktor_data.values[0][i].datetime
dt = to_datetime(original_datetime, in_timezone=self.config.timezone)
# iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
# print()
# Optional: 2 Stunden abziehen, um die Zeitanpassung zu testen
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
# print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
if compare_datetimes(dt, self.start_datetime).lt:
# forecast data is too old
continue
sum_dc_power = sum(values[i].dcPower for values in akkudoktor_data.values)
sum_ac_power = sum(values[i].power for values in akkudoktor_data.values)
record = PVForecastAkkudoktorDataRecord(
date_time=dt, # Verwende angepassten Zeitstempel
pvforecast_dc_power=sum_dc_power,
pvforecast_ac_power=sum_ac_power,
pvforecastakkudoktor_wind_speed_10m=akkudoktor_data.values[0][i].windspeed_10m,
pvforecastakkudoktor_temp_air=akkudoktor_data.values[0][i].temperature,
)
self.append(record)
if len(self) < self.config.prediction_hours:
raise ValueError(
f"The forecast must cover at least {self.config.prediction_hours} hours, "
f"but only {len(self)} hours starting from {self.start_datetime} "
f"were predicted."
)
def report_ac_power_and_measurement(self) -> str:
"""Report DC/ AC power, and AC power measurement for each forecast hour.
For each forecast entry, the time, DC power, forecasted AC power, measured AC power
(if available), and the value returned by the `get_ac_power` method is provided.
Returns:
str: The report.
"""
rep = ""
for record in self.records:
date_time = record.date_time
dc_pow = round(record.pvforecast_dc_power, 2) if record.pvforecast_dc_power else None
ac_pow = round(record.pvforecast_ac_power, 2) if record.pvforecast_ac_power else None
ac_pow_measurement = (
round(record.pvforecastakkudoktor_ac_power_measured, 2)
if record.pvforecastakkudoktor_ac_power_measured
else None
)
ac_pow_any = (
round(record.pvforecastakkudoktor_ac_power_any, 2)
if record.pvforecastakkudoktor_ac_power_any
else None
)
rep += (
f"Date&Time: {date_time}, DC: {dc_pow}, AC: {ac_pow}, "
f"AC sampled: {ac_pow_measurement}, AC any: {ac_pow_any}"
"\n"
)
return rep
# Example of how to use the PVForecastAkkudoktor class
if __name__ == "__main__":
"""Main execution block to demonstrate the use of the PVForecastAkkudoktor class.
Sets up the forecast configuration fields, fetches PV power forecast data,
updates the AC power measurement for the current date/time, and prints
the DC and AC power information.
"""
# Set up the configuration with necessary fields for URL generation
settings_data = {
"prediction_hours": 48,
"prediction_historic_hours": 24,
"latitude": 52.52,
"longitude": 13.405,
"pvforecast_provider": "PVForecastAkkudoktor",
"pvforecast0_peakpower": 5.0,
"pvforecast0_surface_azimuth": -10,
"pvforecast0_surface_tilt": 7,
"pvforecast0_userhorizon": [20, 27, 22, 20],
"pvforecast0_inverter_paco": 10000,
"pvforecast1_peakpower": 4.8,
"pvforecast1_surface_azimuth": -90,
"pvforecast1_surface_tilt": 7,
"pvforecast1_userhorizon": [30, 30, 30, 50],
"pvforecast1_inverter_paco": 10000,
"pvforecast2_peakpower": 1.4,
"pvforecast2_surface_azimuth": -40,
"pvforecast2_surface_tilt": 60,
"pvforecast2_userhorizon": [60, 30, 0, 30],
"pvforecast2_inverter_paco": 2000,
"pvforecast3_peakpower": 1.6,
"pvforecast3_surface_azimuth": 5,
"pvforecast3_surface_tilt": 45,
"pvforecast3_userhorizon": [45, 25, 30, 60],
"pvforecast3_inverter_paco": 1400,
}
# Initialize the forecast object with the generated configuration
forecast = PVForecastAkkudoktor()
# Get an actual forecast
forecast.update_data()
# Update the AC power measurement for a specific date and time
forecast.update_value(
to_datetime(None, to_maxtime=False), "pvforecastakkudoktor_ac_power_measured", 1000.0
)
# Report the DC and AC power forecast along with AC measurements
print(forecast.report_ac_power_and_measurement())

View File

@@ -0,0 +1,69 @@
"""Retrieves pvforecast forecast data from an import file.
This module provides classes and mappings to manage pvforecast data obtained from
an import file, including support for various pvforecast attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `PVForecastDataRecord`
format, enabling consistent access to forecasted and historical pvforecast attributes.
"""
from pathlib import Path
from typing import Optional, Union
from pydantic import Field, field_validator
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.predictionabc import PredictionImportProvider
from akkudoktoreos.prediction.pvforecastabc import PVForecastProvider
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class PVForecastImportCommonSettings(SettingsBaseModel):
"""Common settings for pvforecast data import from file."""
pvforecastimport_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import pvforecast data from."
)
pvforecastimport_json: Optional[str] = Field(
default=None,
description="JSON string, dictionary of PV forecast float value lists."
"Keys are 'pvforecast_dc_power', 'pvforecast_ac_power'.",
)
# Validators
@field_validator("pvforecastimport_file_path", mode="after")
@classmethod
def validate_pvforecastimport_file_path(
cls, value: Optional[Union[str, Path]]
) -> Optional[Path]:
if value is None:
return None
if isinstance(value, str):
value = Path(value)
"""Ensure file is available."""
value.resolve()
if not value.is_file():
raise ValueError(f"Import file path '{value}' is not a file.")
return value
class PVForecastImport(PVForecastProvider, PredictionImportProvider):
"""Fetch PV forecast data from import file or JSON string.
PVForecastImport is a singleton-based class that retrieves pvforecast forecast data
from a file or JSON string and maps it to `PVForecastDataRecord` fields. It manages the forecast
over a range of hours into the future and retains historical data.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the PVForecastImport provider."""
return "PVForecastImport"
def _update_data(self, force_update: Optional[bool] = False) -> None:
if self.config.pvforecastimport_file_path is not None:
self.import_from_file(self.config.pvforecastimport_file_path, key_prefix="pvforecast")
if self.config.pvforecastimport_json is not None:
self.import_from_json(self.config.pvforecastimport_json, key_prefix="pvforecast")

View File

@@ -0,0 +1,13 @@
"""Weather forecast module for weather predictions."""
from typing import Optional
from pydantic import Field
from akkudoktoreos.config.configabc import SettingsBaseModel
class WeatherCommonSettings(SettingsBaseModel):
weather_provider: Optional[str] = Field(
default="ClearOutside", description="Weather provider id of provider to be used."
)

View File

@@ -0,0 +1,198 @@
"""Abstract and base classes for weather predictions.
Notes:
- Supported weather sources can be expanded by adding new fetch methods within the
WeatherForecast class.
- Ensure appropriate API keys or configurations are set up if required by external data sources.
"""
from abc import abstractmethod
from typing import List, Optional
import numpy as np
import pandas as pd
import pvlib
from pydantic import Field
from akkudoktoreos.prediction.predictionabc import PredictionProvider, PredictionRecord
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class WeatherDataRecord(PredictionRecord):
"""Represents a weather data record containing various weather attributes at a specific datetime.
Attributes:
date_time (Optional[AwareDatetime]): The datetime of the record.
total_clouds (Optional[float]): Total cloud cover as a percentage of the sky obscured.
low_clouds (Optional[float]): Cloud cover in the lower atmosphere (% sky obscured).
medium_clouds (Optional[float]): Cloud cover in the middle atmosphere (% sky obscured).
high_clouds (Optional[float]): Cloud cover in the upper atmosphere (% sky obscured).
visibility (Optional[float]): Horizontal visibility in meters.
fog (Optional[float]): Fog cover percentage.
precip_type (Optional[str]): Type of precipitation (e.g., "rain", "snow").
precip_prob (Optional[float]): Probability of precipitation as a percentage.
precip_amt (Optional[float]): Precipitation amount in millimeters.
preciptable_water (Optional[float]): Precipitable water in centimeters.
wind_speed (Optional[float]): Wind speed in kilometers per hour.
wind_direction (Optional[float]): Wind direction in degrees (0-360°).
frost_chance (Optional[str]): Probability of frost.
temp_air (Optional[float]): Air temperature in degrees Celsius.
feels_like (Optional[float]): Feels-like temperature in degrees Celsius.
dew_point (Optional[float]): Dew point in degrees Celsius.
relative_humidity (Optional[float]): Relative humidity in percentage.
pressure (Optional[float]): Atmospheric pressure in millibars.
ozone (Optional[float]): Ozone concentration in Dobson units.
ghi (Optional[float]): Global Horizontal Irradiance in watts per square meter (W/m²).
dni (Optional[float]): Direct Normal Irradiance in watts per square meter (W/m²).
dhi (Optional[float]): Diffuse Horizontal Irradiance in watts per square meter (W/m²).
"""
weather_total_clouds: Optional[float] = Field(
default=None, description="Total Clouds (% Sky Obscured)"
)
weather_low_clouds: Optional[float] = Field(
default=None, description="Low Clouds (% Sky Obscured)"
)
weather_medium_clouds: Optional[float] = Field(
None, description="Medium Clouds (% Sky Obscured)"
)
weather_high_clouds: Optional[float] = Field(
default=None, description="High Clouds (% Sky Obscured)"
)
weather_visibility: Optional[float] = Field(default=None, description="Visibility (m)")
weather_fog: Optional[float] = Field(default=None, description="Fog (%)")
weather_precip_type: Optional[str] = Field(default=None, description="Precipitation Type")
weather_precip_prob: Optional[float] = Field(
default=None, description="Precipitation Probability (%)"
)
weather_precip_amt: Optional[float] = Field(
default=None, description="Precipitation Amount (mm)"
)
weather_preciptable_water: Optional[float] = Field(
default=None, description="Precipitable Water (cm)"
)
weather_wind_speed: Optional[float] = Field(default=None, description="Wind Speed (kmph)")
weather_wind_direction: Optional[float] = Field(default=None, description="Wind Direction (°)")
weather_frost_chance: Optional[str] = Field(default=None, description="Chance of Frost")
weather_temp_air: Optional[float] = Field(default=None, description="Temperature (°C)")
weather_feels_like: Optional[float] = Field(default=None, description="Feels Like (°C)")
weather_dew_point: Optional[float] = Field(default=None, description="Dew Point (°C)")
weather_relative_humidity: Optional[float] = Field(
default=None, description="Relative Humidity (%)"
)
weather_pressure: Optional[float] = Field(default=None, description="Pressure (mb)")
weather_ozone: Optional[float] = Field(default=None, description="Ozone (du)")
weather_ghi: Optional[float] = Field(
default=None, description="Global Horizontal Irradiance (W/m2)"
)
weather_dni: Optional[float] = Field(
default=None, description="Direct Normal Irradiance (W/m2)"
)
weather_dhi: Optional[float] = Field(
default=None, description="Diffuse Horizontal Irradiance (W/m2)"
)
class WeatherProvider(PredictionProvider):
"""Abstract base class for weather providers.
WeatherProvider is a thread-safe singleton, ensuring only one instance of this class is created.
Configuration variables:
weather_provider (str): Prediction provider for weather.
Attributes:
prediction_hours (int, optional): The number of hours into the future for which predictions are generated.
prediction_historic_hours (int, optional): The number of past hours for which historical data is retained.
latitude (float, optional): The latitude in degrees, must be within -90 to 90.
longitude (float, optional): The longitude in degrees, must be within -180 to 180.
start_datetime (datetime, optional): The starting datetime for predictions, defaults to the current datetime if unspecified.
end_datetime (datetime, computed): The datetime representing the end of the prediction range,
calculated based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The earliest datetime for retaining historical data, calculated
based on `start_datetime` and `prediction_historic_hours`.
"""
# overload
records: List[WeatherDataRecord] = Field(
default_factory=list, description="List of WeatherDataRecord records"
)
@classmethod
@abstractmethod
def provider_id(cls) -> str:
return "WeatherProvider"
def enabled(self) -> bool:
return self.provider_id() == self.config.weather_provider
@classmethod
def estimate_irradiance_from_cloud_cover(
cls, lat: float, lon: float, cloud_cover: pd.Series, offset: int = 35
) -> tuple:
"""Estimates irradiance values (GHI, DNI, DHI) based on cloud cover.
This method estimates solar irradiance in several steps:
1. **Clear Sky GHI Calculation**: Determines the Global Horizontal Irradiance (GHI) under clear sky conditions using the Ineichen model and climatological turbidity data.
2. **Cloudy Sky GHI Estimation**: Adjusts the clear sky GHI based on the provided cloud cover percentage to estimate cloudy sky GHI.
3. **Direct Normal Irradiance (DNI) Estimation**: Uses the DISC model to estimate the DNI from the adjusted GHI.
4. **Diffuse Horizontal Irradiance (DHI) Calculation**: Computes DHI from the estimated GHI and DNI values.
Args:
lat (float): Latitude of the location for irradiance estimation.
lon (float): Longitude of the location for irradiance estimation.
cloud_cover (pd.Series): Series of cloud cover values (0-100%) indexed by datetime.
offset (Optional[sint]): Baseline for GHI adjustment as a percentage (default is 35).
Returns:
tuple: Lists of estimated irradiance values in the order of GHI, DNI, and DHI.
Note:
This method is based on the implementation from PVLib and is adapted from
https://github.com/davidusb-geek/emhass/blob/master/src/emhass/forecast.py (MIT License).
"""
# Adjust offset percentage to scaling factor
offset_fraction = offset / 100.0
# Get cloud cover datetimes
cloud_cover_times = cloud_cover.index
# Create a location object
location = pvlib.location.Location(latitude=lat, longitude=lon)
# Get solar position and clear-sky GHI using the Ineichen model
solpos = location.get_solarposition(cloud_cover_times)
clear_sky = location.get_clearsky(cloud_cover_times, model="ineichen")
# Convert cloud cover percentage to a scaling factor
cloud_cover_fraction = np.array(cloud_cover) / 100.0
# Calculate adjusted GHI with proportional offset adjustment
adjusted_ghi = clear_sky["ghi"] * (
offset_fraction + (1 - offset_fraction) * (1 - cloud_cover_fraction)
)
adjusted_ghi.fillna(0.0, inplace=True)
# Apply DISC model to estimate Direct Normal Irradiance (DNI) from adjusted GHI
disc_output = pvlib.irradiance.disc(adjusted_ghi, solpos["zenith"], cloud_cover_times)
adjusted_dni = disc_output["dni"]
adjusted_dni.fillna(0.0, inplace=True)
# Calculate Diffuse Horizontal Irradiance (DHI) as DHI = GHI - DNI * cos(zenith)
zenith_rad = np.radians(solpos["zenith"])
adjusted_dhi = adjusted_ghi - adjusted_dni * np.cos(zenith_rad)
adjusted_dhi.fillna(0.0, inplace=True)
# Return GHI, DNI, DHI lists
ghi = adjusted_ghi.to_list()
dni = adjusted_dni.to_list()
dhi = adjusted_dhi.to_list()
return ghi, dni, dhi
@classmethod
def estimate_preciptable_water(
cls, temperature: pd.Series, relative_humidity: pd.Series
) -> pd.Series:
return pvlib.atmosphere.gueymard94_pw(temperature, relative_humidity)

View File

@@ -0,0 +1,229 @@
"""Retrieves and processes weather forecast data from BrightSky.
This module provides classes and mappings to manage weather data obtained from the
BrightSky API, including support for various weather attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `WeatherDataRecord`
format, enabling consistent access to forecasted and historical weather attributes.
"""
import json
from typing import Dict, List, Optional, Tuple
import pandas as pd
import pvlib
import requests
from akkudoktoreos.prediction.weatherabc import WeatherDataRecord, WeatherProvider
from akkudoktoreos.utils.cacheutil import cache_in_file
from akkudoktoreos.utils.datetimeutil import to_datetime
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
WheaterDataBrightSkyMapping: List[Tuple[str, Optional[str], Optional[float]]] = [
# brightsky_key, description, corr_factor
("timestamp", "DateTime", None),
("precipitation", "Precipitation Amount (mm)", 1),
("pressure_msl", "Pressure (mb)", 1),
("sunshine", None, None),
("temperature", "Temperature (°C)", 1),
("wind_direction", "Wind Direction (°)", 1),
("wind_speed", "Wind Speed (kmph)", 1),
("cloud_cover", "Total Clouds (% Sky Obscured)", 1),
("dew_point", "Dew Point (°C)", 1),
("relative_humidity", "Relative Humidity (%)", 1),
("visibility", "Visibility (m)", 1),
("wind_gust_direction", None, None),
("wind_gust_speed", None, None),
("condition", None, None),
("precipitation_probability", "Precipitation Probability (%)", 1),
("precipitation_probability_6h", None, None),
("solar", "Global Horizontal Irradiance (W/m2)", 1000),
("fallback_source_ids", None, None),
("icon", None, None),
]
"""Mapping of BrightSky weather data keys to WeatherDataRecord field descriptions.
Each tuple represents a field in the BrightSky data, with:
- The BrightSky field key,
- The corresponding `WeatherDataRecord` description, if applicable,
- A correction factor for unit or value scaling.
Fields without descriptions or correction factors are mapped to `None`.
"""
class WeatherBrightSky(WeatherProvider):
"""Fetch and process weather forecast data from BrightSky.
WeatherBrightSky is a singleton-based class that retrieves weather forecast data
from the BrightSky API and maps it to `WeatherDataRecord` fields, applying
any necessary scaling or unit corrections. It manages the forecast over a range
of hours into the future and retains historical data.
Attributes:
prediction_hours (int, optional): Number of hours in the future for the forecast.
prediction_historic_hours (int, optional): Number of past hours for retaining data.
latitude (float, optional): The latitude in degrees, validated to be between -90 and 90.
longitude (float, optional): The longitude in degrees, validated to be between -180 and 180.
start_datetime (datetime, optional): Start datetime for forecasts, defaults to the current datetime.
end_datetime (datetime, computed): The forecast's end datetime, computed based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The datetime to retain historical data, computed from `start_datetime` and `prediction_historic_hours`.
Methods:
provider_id(): Returns a unique identifier for the provider.
_request_forecast(): Fetches the forecast from the BrightSky API.
_update_data(): Processes and updates forecast data from BrightSky in WeatherDataRecord format.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the BrightSky provider."""
return "BrightSky"
@cache_in_file(with_ttl="1 hour")
def _request_forecast(self) -> dict:
"""Fetch weather forecast data from BrightSky API.
This method sends a request to BrightSky's API to retrieve forecast data
for a specified date range and location. The response data is parsed and
returned as JSON for further processing.
Returns:
dict: The parsed JSON response from BrightSky API containing forecast data.
Raises:
ValueError: If the API response does not include expected `weather` data.
"""
source = "https://api.brightsky.dev"
date = to_datetime(self.start_datetime, as_string="%Y-%m-%d")
last_date = to_datetime(self.end_datetime, as_string="%Y-%m-%d")
response = requests.get(
f"{source}/weather?lat={self.config.latitude}&lon={self.config.longitude}&date={date}&last_date={last_date}&tz={self.config.timezone}"
)
response.raise_for_status() # Raise an error for bad responses
logger.debug(f"Response from {source}: {response}")
brightsky_data = json.loads(response.content)
if "weather" not in brightsky_data:
error_msg = f"BrightSky schema change. `wheather`expected to be part of BrightSky data: {brightsky_data}."
logger.error(error_msg)
raise ValueError(error_msg)
# We are working on fresh data (no cache), report update time
self.update_datetime = to_datetime(in_timezone=self.config.timezone)
return brightsky_data
def _description_to_series(self, description: str) -> pd.Series:
"""Retrieve a pandas Series corresponding to a weather data description.
This method fetches the key associated with the provided description
and retrieves the data series mapped to that key. If the description
does not correspond to a valid key, a `ValueError` is raised.
Args:
description (str): The description of the WeatherDataRecord to retrieve.
Returns:
pd.Series: The data series corresponding to the description.
Raises:
ValueError: If no key is found for the provided description.
"""
key = WeatherDataRecord.key_from_description(description)
if key is None:
error_msg = f"No WeatherDataRecord key for '{description}'"
logger.error(error_msg)
raise ValueError(error_msg)
return self.key_to_series(key)
def _description_from_series(self, description: str, data: pd.Series) -> None:
"""Update a weather data with a pandas Series based on its description.
This method fetches the key associated with the provided description
and updates the weather data with the provided data series. If the description
does not correspond to a valid key, a `ValueError` is raised.
Args:
description (str): The description of the weather data to update.
data (pd.Series): The pandas Series containing the data to update.
Raises:
ValueError: If no key is found for the provided description.
"""
key = WeatherDataRecord.key_from_description(description)
if key is None:
error_msg = f"No WeatherDataRecord key for '{description}'"
logger.error(error_msg)
raise ValueError(error_msg)
self.key_from_series(key, data)
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Update forecast data in the WeatherDataRecord format.
Retrieves data from BrightSky, maps each BrightSky field to the corresponding
`WeatherDataRecord` attribute using `WheaterDataBrightSkyMapping`, and applies
any necessary scaling. Forecast data such as cloud cover, temperature, and
humidity is further processed to estimate solar irradiance and precipitable water.
The final mapped and processed data is inserted into the sequence as `WeatherDataRecord`.
"""
# Get BrightSky weather data for the given coordinates
brightsky_data = self._request_forecast(force_update=force_update) # type: ignore
# Get key mapping from description
brightsky_key_mapping: Dict[str, Tuple[Optional[str], Optional[float]]] = {}
for brightsky_key, description, corr_factor in WheaterDataBrightSkyMapping:
if description is None:
brightsky_key_mapping[brightsky_key] = (None, None)
continue
weatherdata_key = WeatherDataRecord.key_from_description(description)
if weatherdata_key is None:
# Should not happen
error_msg = "No WeatherDataRecord key for 'description'"
logger.error(error_msg)
raise ValueError(error_msg)
brightsky_key_mapping[brightsky_key] = (weatherdata_key, corr_factor)
for brightsky_record in brightsky_data["weather"]:
weather_record = WeatherDataRecord()
for brightsky_key, item in brightsky_key_mapping.items():
key = item[0]
if key is None:
continue
value = brightsky_record[brightsky_key]
corr_factor = item[1]
if value and corr_factor:
value = value * corr_factor
setattr(weather_record, key, value)
self.insert_by_datetime(weather_record)
# Converting the cloud cover into Irradiance (GHI, DNI, DHI)
description = "Total Clouds (% Sky Obscured)"
cloud_cover = self._description_to_series(description)
ghi, dni, dhi = self.estimate_irradiance_from_cloud_cover(
self.config.latitude, self.config.longitude, cloud_cover
)
description = "Global Horizontal Irradiance (W/m2)"
ghi = pd.Series(data=ghi, index=cloud_cover.index)
self._description_from_series(description, ghi)
description = "Direct Normal Irradiance (W/m2)"
dni = pd.Series(data=dni, index=cloud_cover.index)
self._description_from_series(description, dni)
description = "Diffuse Horizontal Irradiance (W/m2)"
dhi = pd.Series(data=dhi, index=cloud_cover.index)
self._description_from_series(description, dhi)
# Add Preciptable Water (PWAT) with a PVLib method.
description = "Temperature (°C)"
temperature = self._description_to_series(description)
description = "Relative Humidity (%)"
humidity = self._description_to_series(description)
pwat = pd.Series(
data=pvlib.atmosphere.gueymard94_pw(temperature, humidity), index=temperature.index
)
description = "Preciptable Water (cm)"
self._description_from_series(description, pwat)

View File

@@ -0,0 +1,342 @@
"""Weather Forecast.
This module provides classes and methods to retrieve, manage, and process weather forecast data
from various online sources. It includes structured representations of weather data and utilities
for fetching forecasts for specific locations and time ranges. By integrating multiple data sources,
the module enables flexible access to weather information based on latitude, longitude, and
desired time periods.
Notes:
- Supported weather sources can be expanded by adding new fetch methods within the
WeatherForecast class.
- Ensure appropriate API keys or configurations are set up if required by external data sources.
"""
import re
from typing import Dict, List, Optional, Tuple
import pandas as pd
import requests
from bs4 import BeautifulSoup
from akkudoktoreos.prediction.weatherabc import WeatherDataRecord, WeatherProvider
from akkudoktoreos.utils.cacheutil import cache_in_file
from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration, to_timezone
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
WheaterDataClearOutsideMapping: List[Tuple[str, Optional[str], Optional[float]]] = [
# clearoutside_key, description, corr_factor
("DateTime", "DateTime", None),
("Total Clouds (% Sky Obscured)", "Total Clouds (% Sky Obscured)", 1),
("Low Clouds (% Sky Obscured)", "Low Clouds (% Sky Obscured)", 1),
("Medium Clouds (% Sky Obscured)", "Medium Clouds (% Sky Obscured)", 1),
("High Clouds (% Sky Obscured)", "High Clouds (% Sky Obscured)", 1),
("ISS Passover", None, None),
("Visibility (miles)", "Visibility (m)", 1609.34),
("Fog (%)", "Fog (%)", 1),
("Precipitation Type", "Precipitation Type", None),
("Precipitation Probability (%)", "Precipitation Probability (%)", 1),
("Precipitation Amount (mm)", "Precipitation Amount (mm)", 1),
("Wind Speed (mph)", "Wind Speed (kmph)", 1.60934),
("Chance of Frost", "Chance of Frost", None),
("Temperature (°C)", "Temperature (°C)", 1),
("Feels Like (°C)", "Feels Like (°C)", 1),
("Dew Point (°C)", "Dew Point (°C)", 1),
("Relative Humidity (%)", "Relative Humidity (%)", 1),
("Pressure (mb)", "Pressure (mb)", 1),
("Ozone (du)", "Ozone (du)", 1),
# Extra extraction
("Wind Direction (°)", "Wind Direction (°)", 1),
# Generated from above
("Preciptable Water (cm)", "Preciptable Water (cm)", 1),
("Global Horizontal Irradiance (W/m2)", "Global Horizontal Irradiance (W/m2)", 1),
("Direct Normal Irradiance (W/m2)", "Direct Normal Irradiance (W/m2)", 1),
("Diffuse Horizontal Irradiance (W/m2)", "Diffuse Horizontal Irradiance (W/m2)", 1),
]
"""Mapping of ClearOutside weather data keys to WeatherDataRecord field description.
A list of tuples: (ClearOutside key, field description, correction factor).
"""
class WeatherClearOutside(WeatherProvider):
"""Retrieves and processes weather forecast data from ClearOutside.
WeatherClearOutside is a thread-safe singleton, ensuring only one instance of this class is created.
Attributes:
prediction_hours (int, optional): The number of hours into the future for which predictions are generated.
prediction_historic_hours (int, optional): The number of past hours for which historical data is retained.
latitude (float, optional): The latitude in degrees, must be within -90 to 90.
longitude (float, optional): The longitude in degrees, must be within -180 to 180.
start_datetime (datetime, optional): The starting datetime for predictions, defaults to the current datetime if unspecified.
end_datetime (datetime, computed): The datetime representing the end of the prediction range,
calculated based on `start_datetime` and `prediction_hours`.
keep_datetime (datetime, computed): The earliest datetime for retaining historical data, calculated
based on `start_datetime` and `prediction_historic_hours`.
"""
@classmethod
def provider_id(cls) -> str:
return "ClearOutside"
@cache_in_file(with_ttl="1 hour")
def _request_forecast(self) -> requests.Response:
"""Requests weather forecast from ClearOutside.
Returns:
response: Weather forecast request reponse from ClearOutside.
"""
source = "https://clearoutside.com/forecast"
latitude = round(self.config.latitude, 2)
longitude = round(self.config.longitude, 2)
response = requests.get(f"{source}/{latitude}/{longitude}?desktop=true")
response.raise_for_status() # Raise an error for bad responses
logger.debug(f"Response from {source}: {response}")
# We are working on fresh data (no cache), report update time
self.update_datetime = to_datetime(in_timezone=self.config.timezone)
return response
def _update_data(self, force_update: Optional[bool] = None) -> None:
"""Scrape weather forecast data from ClearOutside's website.
This method requests weather forecast data from ClearOutside based on latitude
and longitude, then processes and structures this data for further use in analysis.
The forecast data includes a variety of weather parameters such as cloud cover, temperature,
humidity, visibility, precipitation, wind speed, and additional irradiance values
calculated using the cloud cover data.
Raises:
ValueError: If the HTML structure of ClearOutside's website changes, causing
extraction issues with forecast dates, timezone, or expected data sections.
Note:
- The function partly builds on code from https://github.com/davidusb-geek/emhass/blob/master/src/emhass/forecast.py (MIT License).
- Uses `pvlib` to estimate irradiance (GHI, DNI, DHI) based on cloud cover data.
Workflow:
1. **Retrieve Web Content**: Uses a helper method to fetch or retrieve cached ClearOutside HTML content.
2. **Extract Forecast Date and Timezone**:
- Parses the forecast's start and end dates and the UTC offset from the "Generated" header.
3. **Extract Weather Data**:
- For each day in the 7-day forecast, the function finds detailed weather parameters
and associates values for each hour.
- Parameters include cloud cover, temperature, humidity, visibility, and precipitation type, among others.
4. **Irradiance Calculation**:
- Calculates irradiance (GHI, DNI, DHI) values using cloud cover data and the `pvlib` library.
5. **Store Data**:
- Combines all hourly data into `WeatherDataRecord` objects, with keys
standardized according to `WeatherDataRecord` attributes.
"""
# Get ClearOutside web content - either from site or cached
response = self._request_forecast(force_update=force_update) # type: ignore
# Scrape the data
soup = BeautifulSoup(response.content, "html.parser")
# Find generation data
p_generated = soup.find("h2", string=lambda text: text and text.startswith("Generated:"))
if not p_generated:
error_msg = f"Clearoutside schema change. Could not get '<h2>Generated:', got {p_generated} from {str(response.content)}."
logger.error(error_msg)
raise ValueError(error_msg)
# Extract forecast start and end dates
forecast_pattern = r"Forecast: (\d{2}/\d{2}/\d{2}) to (\d{2}/\d{2}/\d{2})"
forecast_match = re.search(forecast_pattern, p_generated.get_text())
if forecast_match:
forecast_start_date = forecast_match.group(1)
forecast_end_date = forecast_match.group(2)
else:
error_msg = f"Clearoutside schema change. Could not extract forecast start and end dates from {p_generated}."
logger.error(error_msg)
raise ValueError(error_msg)
# Extract timezone offset
timezone_pattern = r"Timezone: UTC([+-]\d+)\.(\d+)"
timezone_match = re.search(timezone_pattern, p_generated.get_text())
if timezone_match:
hours = int(timezone_match.group(1))
# Convert the decimal part to minutes (e.g., .50 -> 30 minutes)
minutes = int(timezone_match.group(2)) * 6 # Multiply by 6 to convert to minutes
# Create the timezone object using offset
utc_offset = float(hours) + float(minutes) / 60.0
forecast_timezone = to_timezone(utc_offset=utc_offset)
else:
error_msg = "Clearoutside schema change. Could not extract forecast timezone."
logger.error(error_msg)
raise ValueError(error_msg)
forecast_start_datetime = to_datetime(
forecast_start_date, in_timezone=forecast_timezone, to_maxtime=False
)
# Get key mapping from description
clearoutside_key_mapping: Dict[str, Tuple[Optional[str], Optional[float]]] = {}
for clearoutside_key, description, corr_factor in WheaterDataClearOutsideMapping:
if description is None:
clearoutside_key_mapping[clearoutside_key] = (None, None)
continue
weatherdata_key = WeatherDataRecord.key_from_description(description)
if weatherdata_key is None:
# Should not happen
error_msg = f"No WeatherDataRecord key for '{description}'"
logger.error(error_msg)
raise ValueError(error_msg)
clearoutside_key_mapping[clearoutside_key] = (weatherdata_key, corr_factor)
# Find all paragraphs with id 'day_<x>'. There should be seven.
p_days = soup.find_all(id=re.compile(r"day_[0-9]"))
if len(p_days) != 7:
error_msg = f"Clearoutside schema change. Found {len(p_days)} day tables, expected 7."
logger.error(error_msg)
raise ValueError(error_msg)
# Delete all records that will be newly added
self.delete_by_datetime(start_datetime=forecast_start_datetime)
# Collect weather data, loop over all days
for day, p_day in enumerate(p_days):
# Within day_x paragraph find the details labels
p_detail_labels = p_day.find_all(class_="fc_detail_label")
detail_names = [p.get_text() for p in p_detail_labels]
# Check for schema changes
if len(detail_names) < 18:
error_msg = f"Clearoutside schema change. Unexpected number ({len(detail_names)}) of `fc_detail_label`."
logger.error(error_msg)
raise ValueError(error_msg)
for detail_name in detail_names:
if detail_name not in clearoutside_key_mapping:
warning_msg = (
f"Clearoutside schema change. Unexpected detail name {detail_name}."
)
logger.warning(warning_msg)
# Find all the paragraphs that are associated to the details.
# Beware there is one ul paragraph before that is not associated to a detail
p_detail_tables = p_day.find_all("ul")
if len(p_detail_tables) != len(detail_names) + 1:
error_msg = f"Clearoutside schema change. Unexpected number ({p_detail_tables}) of `ul` for details {len(detail_names)}. Should be one extra only."
logger.error(error_msg)
raise ValueError(error_msg)
p_detail_tables.pop(0)
# Create clearout data
clearout_data = {}
# Replace some detail names that we use differently
detail_names = [
s.replace("Wind Speed/Direction (mph)", "Wind Speed (mph)") for s in detail_names
]
# Number of detail values. On last day may be less than 24.
detail_values_count = None
# Add data values
scrape_detail_names = detail_names.copy() # do not change list during iteration!
for i, detail_name in enumerate(scrape_detail_names):
p_detail_values = p_detail_tables[i].find_all("li")
# Assure the number of values fits
p_detail_values_count = len(p_detail_values)
if (day == 6 and p_detail_values_count > 24) or (
day < 6 and p_detail_values_count != 24
):
error_msg = f"Clearoutside schema change. Unexpected number ({p_detail_values_count}) of `li` for detail `{detail_name}` data. Should be 24 or less on day 7. Table is `{p_detail_tables[i]}`."
logger.error(error_msg)
raise ValueError(error_msg)
if detail_values_count is None:
# Remember detail values count only once
detail_values_count = p_detail_values_count
if p_detail_values_count != detail_values_count:
# Value count for details differ.
error_msg = f"Clearoutside schema change. Number ({p_detail_values_count}) of `li` for detail `{detail_name}` data is different than last one {detail_values_count}. Table is `{p_detail_tables[i]}`."
logger.error(error_msg)
raise ValueError(error_msg)
# Scrape the detail values
detail_data = []
extra_detail_name = None
extra_detail_data = []
for p_detail_value in p_detail_values:
if detail_name == "Wind Speed (mph)":
# Get the usual value
value_str = p_detail_value.get_text()
# Also extract extra data
extra_detail_name = "Wind Direction (°)"
extra_value = None
match = re.search(r"(\d+)°", str(p_detail_value))
if match:
extra_value = float(match.group(1))
else:
error_msg = f"Clearoutside schema change. Can't extract direction angle from `{p_detail_value}` for detail `{extra_detail_name}`. Table is `{p_detail_tables[i]}`."
logger.error(error_msg)
raise ValueError(error_msg)
extra_detail_data.append(extra_value)
elif (
detail_name in ("Precipitation Type", "Chance of Frost")
and hasattr(p_detail_value, "title")
and p_detail_value.title
):
value_str = p_detail_value.title.string
else:
value_str = p_detail_value.get_text()
try:
value = float(value_str)
except ValueError:
value = value_str
detail_data.append(value)
clearout_data[detail_name] = detail_data
if extra_detail_name:
if extra_detail_name not in detail_names:
detail_names.append(extra_detail_name)
clearout_data[extra_detail_name] = extra_detail_data
logger.debug(f"Added extra data {extra_detail_name} with {extra_detail_data}")
# Add datetimes of the scrapped data
clearout_data["DateTime"] = [
forecast_start_datetime + to_duration(f"{day} days {i} hours")
for i in range(0, detail_values_count) # type: ignore[arg-type]
]
detail_names.append("DateTime")
# Converting the cloud cover into Irradiance (GHI, DNI, DHI)
cloud_cover = pd.Series(
data=clearout_data["Total Clouds (% Sky Obscured)"], index=clearout_data["DateTime"]
)
ghi, dni, dhi = self.estimate_irradiance_from_cloud_cover(
self.config.latitude, self.config.longitude, cloud_cover
)
# Add GHI, DNI, DHI to clearout data
clearout_data["Global Horizontal Irradiance (W/m2)"] = ghi
detail_names.append("Global Horizontal Irradiance (W/m2)")
clearout_data["Direct Normal Irradiance (W/m2)"] = dni
detail_names.append("Direct Normal Irradiance (W/m2)")
clearout_data["Diffuse Horizontal Irradiance (W/m2)"] = dhi
detail_names.append("Diffuse Horizontal Irradiance (W/m2)")
# Add Preciptable Water (PWAT) with a PVLib method.
clearout_data["Preciptable Water (cm)"] = self.estimate_preciptable_water(
pd.Series(data=clearout_data["Temperature (°C)"]),
pd.Series(data=clearout_data["Relative Humidity (%)"]),
).to_list()
detail_names.append("Preciptable Water (cm)")
# Add weather data
# Add the records from clearout
for row_index in range(0, len(clearout_data["DateTime"])):
weather_record = WeatherDataRecord()
for detail_name in detail_names:
key = clearoutside_key_mapping[detail_name][0]
if key is None:
continue
if detail_name in clearout_data:
value = clearout_data[detail_name][row_index]
corr_factor = clearoutside_key_mapping[detail_name][1]
if corr_factor:
value = value * corr_factor
setattr(weather_record, key, value)
self.insert_by_datetime(weather_record)

View File

@@ -0,0 +1,65 @@
"""Retrieves weather forecast data from an import file.
This module provides classes and mappings to manage weather data obtained from
an import file, including support for various weather attributes such as temperature,
humidity, cloud cover, and solar irradiance. The data is mapped to the `WeatherDataRecord`
format, enabling consistent access to forecasted and historical weather attributes.
"""
from pathlib import Path
from typing import Optional, Union
from pydantic import Field, field_validator
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.predictionabc import PredictionImportProvider
from akkudoktoreos.prediction.weatherabc import WeatherProvider
from akkudoktoreos.utils.logutil import get_logger
logger = get_logger(__name__)
class WeatherImportCommonSettings(SettingsBaseModel):
"""Common settings for weather data import from file."""
weatherimport_file_path: Optional[Union[str, Path]] = Field(
default=None, description="Path to the file to import weather data from."
)
weatherimport_json: Optional[str] = Field(
default=None, description="JSON string, dictionary of weather forecast value lists."
)
# Validators
@field_validator("weatherimport_file_path", mode="after")
@classmethod
def validate_weatherimport_file_path(cls, value: Optional[Union[str, Path]]) -> Optional[Path]:
if value is None:
return None
if isinstance(value, str):
value = Path(value)
"""Ensure file is available."""
value.resolve()
if not value.is_file():
raise ValueError(f"Import file path '{value}' is not a file.")
return value
class WeatherImport(WeatherProvider, PredictionImportProvider):
"""Fetch weather forecast data from import file or JSON string.
WeatherImport is a singleton-based class that retrieves weather forecast data
from a file or JSON string and maps it to `WeatherDataRecord` fields. It manages the forecast
over a range of hours into the future and retains historical data.
"""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the WeatherImport provider."""
return "WeatherImport"
def _update_data(self, force_update: Optional[bool] = False) -> None:
if self.config.weatherimport_file_path is not None:
self.import_from_file(self.config.weatherimport_file_path, key_prefix="weather")
if self.config.weatherimport_json is not None:
self.import_from_json(self.config.weatherimport_json, key_prefix="weather")