mirror of
				https://github.com/Akkudoktor-EOS/EOS.git
				synced 2025-10-31 06:46:20 +00:00 
			
		
		
		
	Add new electricity price provider: Energy-Charts #381 (#590)
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				docker-build / platform-excludes (push) Has been cancelled
				
			
		
			
				
	
				pre-commit / pre-commit (push) Has been cancelled
				
			
		
			
				
	
				Run Pytest on Pull Request / test (push) Has been cancelled
				
			
		
			
				
	
				docker-build / build (push) Has been cancelled
				
			
		
			
				
	
				docker-build / merge (push) Has been cancelled
				
			
		
			
				
	
				Close stale pull requests/issues / Find Stale issues and PRs (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	docker-build / platform-excludes (push) Has been cancelled
				
			pre-commit / pre-commit (push) Has been cancelled
				
			Run Pytest on Pull Request / test (push) Has been cancelled
				
			docker-build / build (push) Has been cancelled
				
			docker-build / merge (push) Has been cancelled
				
			Close stale pull requests/issues / Find Stale issues and PRs (push) Has been cancelled
				
			* feat(ElecPriceEnergyCharts): Add new electricity price provider: Energy-Charts * feat(ElecPriceEnergyCharts): update data only if needed * test(elecpriceforecast): add test for energycharts * docs(predictions.md): add ElecPriceEnergyCharts Provider Signed-off-by: redmoon2711 <redmoon2711@gmx.de>
This commit is contained in:
		
							
								
								
									
										262
									
								
								src/akkudoktoreos/prediction/elecpriceenergycharts.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										262
									
								
								src/akkudoktoreos/prediction/elecpriceenergycharts.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,262 @@ | ||||
| """Retrieves and processes electricity price forecast data from Energy-Charts. | ||||
|  | ||||
| This module provides classes and mappings to manage electricity price data obtained from the | ||||
| Energy-Charts API, including support for various electricity price attributes such as temperature, | ||||
| humidity, cloud cover, and solar irradiance. The data is mapped to the `ElecPriceDataRecord` | ||||
| format, enabling consistent access to forecasted and historical electricity price attributes. | ||||
| """ | ||||
|  | ||||
| from datetime import datetime | ||||
| from typing import Any, List, Optional, Union | ||||
|  | ||||
| import numpy as np | ||||
| import pandas as pd | ||||
| import requests | ||||
| from loguru import logger | ||||
| from pydantic import ValidationError | ||||
| from statsmodels.tsa.holtwinters import ExponentialSmoothing | ||||
|  | ||||
| from akkudoktoreos.core.cache import cache_in_file | ||||
| from akkudoktoreos.core.pydantic import PydanticBaseModel | ||||
| from akkudoktoreos.prediction.elecpriceabc import ElecPriceProvider | ||||
| from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration | ||||
|  | ||||
|  | ||||
| class EnergyChartsElecPrice(PydanticBaseModel): | ||||
|     license_info: str | ||||
|     unix_seconds: List[int] | ||||
|     price: List[float] | ||||
|     unit: str | ||||
|     deprecated: bool | ||||
|  | ||||
|  | ||||
| class ElecPriceEnergyCharts(ElecPriceProvider): | ||||
|     """Fetch and process electricity price forecast data from Energy-Charts. | ||||
|  | ||||
|     ElecPriceEnergyCharts is a singleton-based class that retrieves electricity price forecast data | ||||
|     from the Energy-Charts API and maps it to `ElecPriceDataRecord` fields, applying | ||||
|     any necessary scaling or unit corrections. It manages the forecast over a range | ||||
|     of hours into the future and retains historical data. | ||||
|  | ||||
|     Attributes: | ||||
|         hours (int, optional): Number of hours in the future for the forecast. | ||||
|         historic_hours (int, optional): Number of past hours for retaining data. | ||||
|         start_datetime (datetime, optional): Start datetime for forecasts, defaults to the current datetime. | ||||
|         end_datetime (datetime, computed): The forecast's end datetime, computed based on `start_datetime` and `hours`. | ||||
|         keep_datetime (datetime, computed): The datetime to retain historical data, computed from `start_datetime` and `historic_hours`. | ||||
|  | ||||
|     Methods: | ||||
|         provider_id(): Returns a unique identifier for the provider. | ||||
|         _request_forecast(): Fetches the forecast from the Energy-Charts API. | ||||
|         _update_data(): Processes and updates forecast data from Energy-Charts in ElecPriceDataRecord format. | ||||
|     """ | ||||
|  | ||||
|     highest_orig_datetime: Optional[datetime] = None | ||||
|  | ||||
|     @classmethod | ||||
|     def provider_id(cls) -> str: | ||||
|         """Return the unique identifier for the Energy-Charts provider.""" | ||||
|         return "ElecPriceEnergyCharts" | ||||
|  | ||||
|     @classmethod | ||||
|     def _validate_data(cls, json_str: Union[bytes, Any]) -> EnergyChartsElecPrice: | ||||
|         """Validate Energy-Charts Electricity Price forecast data.""" | ||||
|         try: | ||||
|             energy_charts_data = EnergyChartsElecPrice.model_validate_json(json_str) | ||||
|         except ValidationError as e: | ||||
|             error_msg = "" | ||||
|             for error in e.errors(): | ||||
|                 field = " -> ".join(str(x) for x in error["loc"]) | ||||
|                 message = error["msg"] | ||||
|                 error_type = error["type"] | ||||
|                 error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n" | ||||
|             logger.error(f"Energy-Charts schema change: {error_msg}") | ||||
|             raise ValueError(error_msg) | ||||
|         return energy_charts_data | ||||
|  | ||||
|     @cache_in_file(with_ttl="1 hour") | ||||
|     def _request_forecast(self, start_date: Optional[str] = None) -> EnergyChartsElecPrice: | ||||
|         """Fetch electricity price forecast data from Energy-Charts API. | ||||
|  | ||||
|         This method sends a request to Energy-Charts API to retrieve forecast data for a specified | ||||
|         date range. The response data is parsed and returned as JSON for further processing. | ||||
|  | ||||
|         Returns: | ||||
|             dict: The parsed JSON response from Energy-Charts API containing forecast data. | ||||
|  | ||||
|         Raises: | ||||
|             ValueError: If the API response does not include expected `electricity price` data. | ||||
|         """ | ||||
|         source = "https://api.energy-charts.info" | ||||
|         if start_date is None: | ||||
|             # Try to take data from 5 weeks back for prediction | ||||
|             start_date = to_datetime( | ||||
|                 self.start_datetime - to_duration("35 days"), as_string="YYYY-MM-DD" | ||||
|             ) | ||||
|  | ||||
|         last_date = to_datetime(self.end_datetime, as_string="YYYY-MM-DD") | ||||
|         url = f"{source}/price?bzn=DE-LU&start={start_date}&end={last_date}" | ||||
|         response = requests.get(url, timeout=30) | ||||
|         logger.debug(f"Response from {url}: {response}") | ||||
|         response.raise_for_status()  # Raise an error for bad responses | ||||
|         energy_charts_data = self._validate_data(response.content) | ||||
|         # We are working on fresh data (no cache), report update time | ||||
|         self.update_datetime = to_datetime(in_timezone=self.config.general.timezone) | ||||
|         return energy_charts_data | ||||
|  | ||||
|     def _parse_data(self, energy_charts_data: EnergyChartsElecPrice) -> pd.Series: | ||||
|         # Assumption that all lists are the same length and are ordered chronologically | ||||
|         # in ascending order and have the same timestamps. | ||||
|  | ||||
|         # Get charges_kwh in wh | ||||
|         charges_wh = (self.config.elecprice.charges_kwh or 0) / 1000 | ||||
|  | ||||
|         # Initialize | ||||
|         highest_orig_datetime = None  # newest datetime from the api after that we want to update. | ||||
|         series_data = pd.Series(dtype=float)  # Initialize an empty series | ||||
|  | ||||
|         # Iterate over timestamps and prices together | ||||
|         for unix_sec, price_eur_per_mwh in zip( | ||||
|             energy_charts_data.unix_seconds, energy_charts_data.price | ||||
|         ): | ||||
|             orig_datetime = to_datetime(unix_sec, in_timezone=self.config.general.timezone) | ||||
|  | ||||
|             # Track the latest datetime | ||||
|             if highest_orig_datetime is None or orig_datetime > highest_orig_datetime: | ||||
|                 highest_orig_datetime = orig_datetime | ||||
|  | ||||
|             # Convert EUR/MWh to EUR/Wh, apply charges and VAT if charges > 0 | ||||
|             if charges_wh > 0: | ||||
|                 price_wh = ((price_eur_per_mwh / 1_000_000) + charges_wh) * 1.19 | ||||
|             else: | ||||
|                 price_wh = price_eur_per_mwh / 1_000_000 | ||||
|  | ||||
|             # Store in series | ||||
|             series_data.at[orig_datetime] = price_wh | ||||
|  | ||||
|         return series_data | ||||
|  | ||||
|     def _cap_outliers(self, data: np.ndarray, sigma: int = 2) -> np.ndarray: | ||||
|         mean = data.mean() | ||||
|         std = data.std() | ||||
|         lower_bound = mean - sigma * std | ||||
|         upper_bound = mean + sigma * std | ||||
|         capped_data = data.clip(min=lower_bound, max=upper_bound) | ||||
|         return capped_data | ||||
|  | ||||
|     def _predict_ets(self, history: np.ndarray, seasonal_periods: int, hours: int) -> np.ndarray: | ||||
|         clean_history = self._cap_outliers(history) | ||||
|         model = ExponentialSmoothing( | ||||
|             clean_history, seasonal="add", seasonal_periods=seasonal_periods | ||||
|         ).fit() | ||||
|         return model.forecast(hours) | ||||
|  | ||||
|     def _predict_median(self, history: np.ndarray, hours: int) -> np.ndarray: | ||||
|         clean_history = self._cap_outliers(history) | ||||
|         return np.full(hours, np.median(clean_history)) | ||||
|  | ||||
|     def _update_data( | ||||
|         self, force_update: Optional[bool] = False | ||||
|     ) -> None:  # tuple[np.ndarray, np.ndarray, np.ndarray]: | ||||
|         """Update forecast data in the ElecPriceDataRecord format. | ||||
|  | ||||
|         Retrieves data from Energy-Charts, maps each Energy-Charts field to the corresponding | ||||
|         `ElecPriceDataRecord` and applies any necessary scaling. | ||||
|  | ||||
|         The final mapped and processed data is inserted into the sequence as `ElecPriceDataRecord`. | ||||
|         """ | ||||
|         # New prices are available every day at 14:00 | ||||
|         now = pd.Timestamp.now(tz=self.config.general.timezone) | ||||
|         midnight = now.normalize() | ||||
|         hours_ahead = 23 if now.time() < pd.Timestamp("14:00").time() else 47 | ||||
|         end = midnight + pd.Timedelta(hours=hours_ahead) | ||||
|  | ||||
|         if not self.start_datetime: | ||||
|             raise ValueError(f"Start DateTime not set: {self.start_datetime}") | ||||
|  | ||||
|         # Determine if update is needed and how many days | ||||
|         past_days = 35 | ||||
|         if self.highest_orig_datetime: | ||||
|             # Try to get lowest datetime in history | ||||
|             try: | ||||
|                 history = self.key_to_array( | ||||
|                     key="elecprice_marketprice_wh", | ||||
|                     start_datetime=self.start_datetime, | ||||
|                     fill_method="linear", | ||||
|                 ) | ||||
|                 # If start_datetime lower then history | ||||
|                 if history.index.min() <= self.start_datetime: | ||||
|                     past_days = 1 | ||||
|             except AttributeError as e: | ||||
|                 logger.error(f"Energy-Charts no Index in history: {e}") | ||||
|  | ||||
|             needs_update = end > self.highest_orig_datetime | ||||
|         else: | ||||
|             needs_update = True | ||||
|  | ||||
|         if needs_update: | ||||
|             logger.info( | ||||
|                 f"Update ElecPriceEnergyCharts is needed, last update:{self.highest_orig_datetime}" | ||||
|             ) | ||||
|             # Set Start_date try to take data from 5 weeks back for prediction | ||||
|             start_date = to_datetime( | ||||
|                 self.start_datetime - to_duration(f"{past_days} days"), as_string="YYYY-MM-DD" | ||||
|             ) | ||||
|             # Get Energy-Charts electricity price data | ||||
|             energy_charts_data = self._request_forecast( | ||||
|                 start_date=start_date, force_update=force_update | ||||
|             )  # type: ignore | ||||
|  | ||||
|             # Parse and store data | ||||
|             series_data = self._parse_data(energy_charts_data) | ||||
|             self.highest_orig_datetime = series_data.index.max() | ||||
|             self.key_from_series("elecprice_marketprice_wh", series_data) | ||||
|         else: | ||||
|             logger.info( | ||||
|                 f"No update ElecPriceEnergyCharts is needed last update:{self.highest_orig_datetime}" | ||||
|             ) | ||||
|  | ||||
|         # Generate history array for prediction | ||||
|         history = self.key_to_array( | ||||
|             key="elecprice_marketprice_wh", | ||||
|             end_datetime=self.highest_orig_datetime, | ||||
|             fill_method="linear", | ||||
|         ) | ||||
|  | ||||
|         amount_datasets = len(self.records) | ||||
|         if not self.highest_orig_datetime:  # mypy fix | ||||
|             error_msg = f"Highest original datetime not available: {self.highest_orig_datetime}" | ||||
|             logger.error(error_msg) | ||||
|             raise ValueError(error_msg) | ||||
|  | ||||
|         # some of our data is already in the future, so we need to predict less. If we got less data we increase the prediction hours | ||||
|         needed_hours = int( | ||||
|             self.config.prediction.hours | ||||
|             - ((self.highest_orig_datetime - self.start_datetime).total_seconds() // 3600) | ||||
|         ) | ||||
|  | ||||
|         if needed_hours <= 0: | ||||
|             logger.warning( | ||||
|                 f"No prediction needed. needed_hours={needed_hours}, hours={self.config.prediction.hours},highest_orig_datetime {self.highest_orig_datetime}, start_datetime {self.start_datetime}" | ||||
|             )  # this might keep data longer than self.start_datetime + self.config.prediction.hours in the records | ||||
|             return | ||||
|  | ||||
|         if amount_datasets > 800:  # we do the full ets with seasons of 1 week | ||||
|             prediction = self._predict_ets(history, seasonal_periods=168, hours=needed_hours) | ||||
|         elif amount_datasets > 168:  # not enough data to do seasons of 1 week, but enough for 1 day | ||||
|             prediction = self._predict_ets(history, seasonal_periods=24, hours=needed_hours) | ||||
|         elif amount_datasets > 0:  # not enough data for ets, do median | ||||
|             prediction = self._predict_median(history, hours=needed_hours) | ||||
|         else: | ||||
|             logger.error("No data available for prediction") | ||||
|             raise ValueError("No data available") | ||||
|  | ||||
|         # write predictions into the records, update if exist. | ||||
|         prediction_series = pd.Series( | ||||
|             data=prediction, | ||||
|             index=[ | ||||
|                 self.highest_orig_datetime + to_duration(f"{i + 1} hours") | ||||
|                 for i in range(len(prediction)) | ||||
|             ], | ||||
|         ) | ||||
|         self.key_from_series("elecprice_marketprice_wh", prediction_series) | ||||
| @@ -32,6 +32,7 @@ from pydantic import Field | ||||
|  | ||||
| from akkudoktoreos.config.configabc import SettingsBaseModel | ||||
| from akkudoktoreos.prediction.elecpriceakkudoktor import ElecPriceAkkudoktor | ||||
| from akkudoktoreos.prediction.elecpriceenergycharts import ElecPriceEnergyCharts | ||||
| from akkudoktoreos.prediction.elecpriceimport import ElecPriceImport | ||||
| from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktor | ||||
| from akkudoktoreos.prediction.loadimport import LoadImport | ||||
| @@ -83,6 +84,7 @@ class Prediction(PredictionContainer): | ||||
|     providers: List[ | ||||
|         Union[ | ||||
|             ElecPriceAkkudoktor, | ||||
|             ElecPriceEnergyCharts, | ||||
|             ElecPriceImport, | ||||
|             LoadAkkudoktor, | ||||
|             LoadImport, | ||||
| @@ -97,6 +99,7 @@ class Prediction(PredictionContainer): | ||||
|  | ||||
| # Initialize forecast providers, all are singletons. | ||||
| elecprice_akkudoktor = ElecPriceAkkudoktor() | ||||
| elecprice_energy_charts = ElecPriceEnergyCharts() | ||||
| elecprice_import = ElecPriceImport() | ||||
| load_akkudoktor = LoadAkkudoktor() | ||||
| load_import = LoadImport() | ||||
| @@ -114,6 +117,7 @@ def get_prediction() -> Prediction: | ||||
|     prediction = Prediction( | ||||
|         providers=[ | ||||
|             elecprice_akkudoktor, | ||||
|             elecprice_energy_charts, | ||||
|             elecprice_import, | ||||
|             load_akkudoktor, | ||||
|             load_import, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user