EOS/src/akkudoktoreos/prediction/elecpriceakkudoktor.py


"""Retrieves and processes electricity price forecast data from Akkudoktor.

This module provides classes and mappings to manage electricity price data obtained from the
Akkudoktor API. Market prices are converted to a price per Wh (including configured charges)
and mapped to the `ElecPriceDataRecord` format, enabling consistent access to forecasted and
historical electricity prices.
"""

from typing import Any, List, Optional, Union

import numpy as np
import pandas as pd
import requests
from loguru import logger
from pydantic import ValidationError
from statsmodels.tsa.holtwinters import ExponentialSmoothing

from akkudoktoreos.core.cache import cache_in_file
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.elecpriceabc import ElecPriceProvider
from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration


class AkkudoktorElecPriceMeta(PydanticBaseModel):
    start_timestamp: str
    end_timestamp: str
    start: str
    end: str


class AkkudoktorElecPriceValue(PydanticBaseModel):
    start_timestamp: int
    end_timestamp: int
    start: str
    end: str
    marketprice: float
    unit: str
    marketpriceEurocentPerKWh: float


class AkkudoktorElecPrice(PydanticBaseModel):
    meta: AkkudoktorElecPriceMeta
    values: List[AkkudoktorElecPriceValue]
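
As a rough illustration of the payload shape these models describe, the sketch below parses a hand-written JSON document that uses the same field names and converts one price the way `_update_data` does. All values (including the `unit` string) are invented for illustration, `price_eur_per_wh` is a hypothetical helper name, and the charges are assumed to be given in EUR per kWh. Only the standard library is used, so this does not exercise the pydantic validation itself.

```python
import json

# Hand-written sample using the field names of AkkudoktorElecPrice.
# Every value below is invented for illustration, not real API output.
SAMPLE = """
{
  "meta": {"start_timestamp": "1735772400000", "end_timestamp": "1735776000000",
           "start": "2025-01-02T00:00+01:00", "end": "2025-01-02T01:00+01:00"},
  "values": [
    {"start_timestamp": 1735772400000, "end_timestamp": 1735776000000,
     "start": "2025-01-02T00:00+01:00", "end": "2025-01-02T01:00+01:00",
     "marketprice": 95.0, "unit": "Eur/MWh", "marketpriceEurocentPerKWh": 9.5}
  ]
}
"""


def price_eur_per_wh(ct_per_kwh: float, charges_eur_per_kwh: float = 0.0) -> float:
    """Convert ct/kWh to EUR/Wh and add the charges, scaled from kWh to Wh."""
    # 100 ct per EUR and 1000 Wh per kWh, as in `_update_data`.
    return ct_per_kwh / (100 * 1000) + charges_eur_per_kwh / 1000


data = json.loads(SAMPLE)
first = data["values"][0]
price = price_eur_per_wh(first["marketpriceEurocentPerKWh"], charges_eur_per_kwh=0.21)
```

With these made-up numbers, 9.5 ct/kWh becomes 0.000095 EUR/Wh and the 0.21 EUR/kWh charge adds 0.00021 EUR/Wh.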


class ElecPriceAkkudoktor(ElecPriceProvider):
    """Fetch and process electricity price forecast data from Akkudoktor.

    ElecPriceAkkudoktor is a singleton-based class that retrieves electricity price forecast data
    from the Akkudoktor API and maps it to `ElecPriceDataRecord` fields, applying
    any necessary scaling or unit corrections. It manages the forecast over a range
    of hours into the future and retains historical data.

    Attributes:
        hours (int, optional): Number of hours in the future for the forecast.
        historic_hours (int, optional): Number of past hours for retaining data.
        start_datetime (datetime, optional): Start datetime for forecasts, defaults to the current datetime.
        end_datetime (datetime, computed): The forecast's end datetime, computed based on `start_datetime` and `hours`.
        keep_datetime (datetime, computed): The datetime to retain historical data, computed from `start_datetime` and `historic_hours`.

    Methods:
        provider_id(): Returns a unique identifier for the provider.
        _request_forecast(): Fetches the forecast from the Akkudoktor API.
        _update_data(): Processes and updates forecast data from Akkudoktor in ElecPriceDataRecord format.
    """

    @classmethod
    def provider_id(cls) -> str:
        """Return the unique identifier for the Akkudoktor provider."""
        return "ElecPriceAkkudoktor"

    @classmethod
    def _validate_data(cls, json_str: Union[bytes, Any]) -> AkkudoktorElecPrice:
        """Validate Akkudoktor Electricity Price forecast data."""
        try:
            akkudoktor_data = AkkudoktorElecPrice.model_validate_json(json_str)
        except ValidationError as e:
            error_msg = ""
            for error in e.errors():
                field = " -> ".join(str(x) for x in error["loc"])
                message = error["msg"]
                error_type = error["type"]
                error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
            logger.error(f"Akkudoktor schema change: {error_msg}")
            raise ValueError(error_msg)
        return akkudoktor_data

    @cache_in_file(with_ttl="1 hour")
    def _request_forecast(self) -> AkkudoktorElecPrice:
        """Fetch electricity price forecast data from Akkudoktor API.

        This method sends a request to Akkudoktor's API to retrieve price data for a date range
        that starts 35 days before `start_datetime` and ends at `end_datetime`. The response is
        validated against the `AkkudoktorElecPrice` model and returned for further processing.

        Returns:
            AkkudoktorElecPrice: The validated response from the Akkudoktor API.

        Raises:
            ValueError: If the start datetime is not set or the API response does not match the
                expected electricity price schema.
            requests.HTTPError: If the API answers with an error status.

        Todo:
            - add the file cache again.
        """
        source = "https://api.akkudoktor.net"
        if not self.start_datetime:
            raise ValueError(f"Start DateTime not set: {self.start_datetime}")
        # Try to take data from 5 weeks back for prediction
        date = to_datetime(self.start_datetime - to_duration("35 days"), as_string="YYYY-MM-DD")
        last_date = to_datetime(self.end_datetime, as_string="YYYY-MM-DD")
        url = f"{source}/prices?start={date}&end={last_date}&tz={self.config.general.timezone}"
        response = requests.get(url, timeout=10)
        logger.debug(f"Response from {url}: {response}")
        response.raise_for_status()  # Raise an error for bad responses
        akkudoktor_data = self._validate_data(response.content)
        # We are working on fresh data (no cache), report update time
        self.update_datetime = to_datetime(in_timezone=self.config.general.timezone)
        return akkudoktor_data

    def _cap_outliers(self, data: np.ndarray, sigma: int = 2) -> np.ndarray:
        mean = data.mean()
        std = data.std()
        lower_bound = mean - sigma * std
        upper_bound = mean + sigma * std
        capped_data = data.clip(min=lower_bound, max=upper_bound)
        return capped_data

    def _predict_ets(self, history: np.ndarray, seasonal_periods: int, hours: int) -> np.ndarray:
        clean_history = self._cap_outliers(history)
        model = ExponentialSmoothing(
            clean_history, seasonal="add", seasonal_periods=seasonal_periods
        ).fit()
        return model.forecast(hours)

    def _predict_median(self, history: np.ndarray, hours: int) -> np.ndarray:
        clean_history = self._cap_outliers(history)
        return np.full(hours, np.median(clean_history))

    def _update_data(
        self, force_update: Optional[bool] = False
    ) -> None:  # tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Update forecast data in the ElecPriceDataRecord format.

        Retrieves data from Akkudoktor, maps each Akkudoktor field to the corresponding
        `ElecPriceDataRecord` and applies any necessary scaling.

        The final mapped and processed data is inserted into the sequence as `ElecPriceDataRecord`.
        """
        # Get Akkudoktor electricity price data
        akkudoktor_data = self._request_forecast(force_update=force_update)  # type: ignore
        if not self.start_datetime:
            raise ValueError(f"Start DateTime not set: {self.start_datetime}")

        # Assumption that all lists are the same length and are ordered chronologically
        # in ascending order and have the same timestamps.

        # Convert the configured charges from per kWh to per Wh
        charges_wh = (self.config.elecprice.charges_kwh or 0) / 1000

        # Newest datetime delivered by the API; predictions start after it.
        highest_orig_datetime = None

        series_data = pd.Series(dtype=float)  # Initialize an empty series

        for value in akkudoktor_data.values:
            orig_datetime = to_datetime(value.start, in_timezone=self.config.general.timezone)
            if highest_orig_datetime is None or orig_datetime > highest_orig_datetime:
                highest_orig_datetime = orig_datetime

            # Eurocent per kWh -> currency unit per Wh (100 ct, 1000 Wh), plus charges
            price_wh = value.marketpriceEurocentPerKWh / (100 * 1000) + charges_wh

            # Collect all values into the Pandas Series
            series_data.at[orig_datetime] = price_wh

        # Update values using key_from_series
        self.key_from_series("elecprice_marketprice_wh", series_data)

        # Generate history array for prediction
        history = self.key_to_array(
            key="elecprice_marketprice_wh", end_datetime=highest_orig_datetime, fill_method="linear"
        )

        amount_datasets = len(self.records)
        if not highest_orig_datetime:  # mypy fix
            error_msg = f"Highest original datetime not available: {highest_orig_datetime}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Some of the API data already lies in the future, so fewer hours need to be predicted.
        # If the API delivered less data, the number of hours to predict grows accordingly.
        needed_hours = int(
            self.config.prediction.hours
            - ((highest_orig_datetime - self.start_datetime).total_seconds() // 3600)
        )

        if needed_hours <= 0:
            logger.warning(
                f"No prediction needed. needed_hours={needed_hours}, hours={self.config.prediction.hours}, highest_orig_datetime {highest_orig_datetime}, start_datetime {self.start_datetime}"
            )  # this might keep data longer than self.start_datetime + self.config.prediction.hours in the records
            return

        if amount_datasets > 800:  # we do the full ets with seasons of 1 week
            prediction = self._predict_ets(history, seasonal_periods=168, hours=needed_hours)
        elif amount_datasets > 168:  # not enough data to do seasons of 1 week, but enough for 1 day
            prediction = self._predict_ets(history, seasonal_periods=24, hours=needed_hours)
        elif amount_datasets > 0:  # not enough data for ets, do median
            prediction = self._predict_median(history, hours=needed_hours)
        else:
            logger.error("No data available for prediction")
            raise ValueError("No data available")

        # write predictions into the records, update if exist.
        prediction_series = pd.Series(
            data=prediction,
            index=[
                highest_orig_datetime + to_duration(f"{i + 1} hours")
                for i in range(len(prediction))
            ],
        )
        self.key_from_series("elecprice_marketprice_wh", prediction_series)

        # history2 = self.key_to_array(key="elecprice_marketprice_wh", fill_method="linear") + 0.0002
        # return history, history2, prediction  # for debug main


"""
def visualize_predictions(
    history: np.ndarray[Any, Any],
    history2: np.ndarray[Any, Any],
    predictions: np.ndarray[Any, Any],
) -> None:
    import matplotlib.pyplot as plt

    plt.figure(figsize=(28, 14))
    plt.plot(range(len(history)), history, label="History", color="green")
    plt.plot(range(len(history2)), history2, label="History_new", color="blue")
    plt.plot(
        range(len(history), len(history) + len(predictions)),
        predictions,
        label="Predictions",
        color="red",
    )
    plt.title("Predictions ets")
    plt.xlabel("Time")
    plt.ylabel("Price")
    plt.legend()
    plt.savefig("predictions_vs_true.png")
    plt.close()


def main() -> None:
    # Initialize ElecPriceAkkudoktor with required parameters
    elec_price_akkudoktor = ElecPriceAkkudoktor()
    history, history2, predictions = elec_price_akkudoktor._update_data()
    visualize_predictions(history, history2, predictions)
    # print(history, history2, predictions)


if __name__ == "__main__":
    main()
"""
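
The capping and fallback logic of `_cap_outliers` and `_update_data` can be tried out without NumPy or statsmodels. The following is a minimal standalone sketch under stated assumptions, not the module's implementation: `cap_outliers`, `needed_hours`, and `choose_method` are hypothetical helper names, the sample history is invented, and the thresholds 800 and 168 are copied from `_update_data`.

```python
from datetime import datetime, timedelta
from statistics import fmean, median, pstdev


def cap_outliers(data: list[float], sigma: float = 2.0) -> list[float]:
    """Clip every value to mean +/- sigma * population standard deviation."""
    mean = fmean(data)
    std = pstdev(data)  # population std, like numpy's default ddof=0
    lower, upper = mean - sigma * std, mean + sigma * std
    return [min(max(x, lower), upper) for x in data]


def needed_hours(prediction_hours: int, start: datetime, newest: datetime) -> int:
    """Hours still to predict beyond the newest value delivered by the API."""
    return int(prediction_hours - (newest - start).total_seconds() // 3600)


def choose_method(record_count: int) -> str:
    """Pick a prediction method by the amount of available records."""
    if record_count > 800:
        return "ets-weekly"  # seasonal_periods=168
    if record_count > 168:
        return "ets-daily"  # seasonal_periods=24
    if record_count > 0:
        return "median"
    raise ValueError("No data available")


# Invented history: nine ordinary values and one spike.
history = [10.0] * 9 + [100.0]
capped = cap_outliers(history)

start = datetime(2025, 1, 8, 12)
hours = needed_hours(48, start, start + timedelta(hours=11))

# Median fallback: a flat forecast at the median of the capped history.
fallback = [median(capped)] * hours
```

In this example the mean is 19 and the population standard deviation is 27, so the spike of 100 is clipped to the upper bound of 73, and 37 of the 48 configured hours remain to be predicted.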