initial commit

This commit is contained in:
Normann 2025-01-07 00:07:56 +01:00 committed by Andreas
parent d2f6e9866d
commit 4c0afb6a2a

View File

@ -16,8 +16,9 @@ from pydantic import Field, ValidationError
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.elecpriceabc import ElecPriceDataRecord, ElecPriceProvider
from akkudoktoreos.utils.cacheutil import CacheFileStore, cache_in_file
from akkudoktoreos.utils.datetimeutil import compare_datetimes, to_datetime, to_duration
from akkudoktoreos.utils.cacheutil import cache_in_file
from akkudoktoreos.utils.datetimeutil import to_datetime, to_duration
from statsmodels.tsa.holtwinters import ExponentialSmoothing
logger = get_logger(__name__)
@ -65,12 +66,6 @@ class ElecPriceAkkudoktor(ElecPriceProvider):
_update_data(): Processes and updates forecast data from Akkudoktor in ElecPriceDataRecord format.
"""
elecprice_8days: NDArray[Shape["24, 8"], float] = Field(
default=np.full((24, 8), np.nan),
description="Hourly electricity prices for the last 7 days and today (€/KWh). "
"A NumPy array of 24 elements, each representing the hourly prices "
"of the last 7 days (index 0..6, Monday..Sunday) and today (index 7).",
)
elecprice_8days_weights_day_of_week: NDArray[Shape["7, 8"], float] = Field(
default=np.full((7, 8), np.nan),
description="Daily electricity price weights for the last 7 days and today. "
@ -100,54 +95,6 @@ class ElecPriceAkkudoktor(ElecPriceProvider):
raise ValueError(error_msg)
return akkudoktor_data
def historic_hours_min(self) -> int:
"""Return the minimum historic prediction hours for Akkudoktor electricity price data."""
return 5 * 7 * 24 # 5 weeks a 7 days a 24 hours
def _calculate_weighted_mean(self, day_of_week: int, hour: int) -> float:
"""Calculate the weighted mean price for given day_of_week and hour.
Args:
day_of_week (int). The day of week to calculate the mean for (0=Monday..6).
hour (int): The hour week to calculate the mean for (0..23).
Returns:
price_weihgted_mead (float): Weighted mean price for given day_of:week and hour.
"""
if np.isnan(self.elecprice_8days_weights_day_of_week[0][0]):
# Weights not initialized - do now
# Priority of day: 1=most .. 7=least
priority_of_day = np.array(
# Available Prediction days /
# M,Tu,We,Th,Fr,Sa,Su,Today/ Forecast day_of_week
[
[1, 2, 3, 4, 5, 6, 7, 1], # Monday
[3, 1, 2, 4, 5, 6, 7, 1], # Tuesday
[4, 2, 1, 3, 5, 6, 7, 1], # Wednesday
[5, 4, 2, 1, 3, 6, 7, 1], # Thursday
[5, 4, 3, 2, 1, 6, 7, 1], # Friday
[7, 6, 5, 4, 2, 1, 3, 1], # Saturday
[7, 6, 5, 4, 3, 2, 1, 1], # Sunday
]
)
# Take priorities above to decrease relevance in 2s exponential
self.elecprice_8days_weights_day_of_week = 2 / (2**priority_of_day)
# Compute the weighted mean for day_of_week and hour
prices_of_hour = self.elecprice_8days[hour]
if np.isnan(prices_of_hour).all():
# No prediction prices available for this hour - use mean value of all prices
price_weighted_mean = np.nanmean(self.elecprice_marketprice_wh_8day)
else:
weights = self.elecprice_8days_weights_day_of_week[day_of_week]
prices_of_hour_masked: NDArray[Shape["24"]] = np.ma.MaskedArray(
prices_of_hour, mask=np.isnan(prices_of_hour)
)
price_weighted_mean = np.ma.average(prices_of_hour_masked, weights=weights)
return float(price_weighted_mean)
@cache_in_file(with_ttl="1 hour")
def _request_forecast(self) -> AkkudoktorElecPrice:
"""Fetch electricity price forecast data from Akkudoktor API.
@ -162,8 +109,9 @@ class ElecPriceAkkudoktor(ElecPriceProvider):
ValueError: If the API response does not include expected `electricity price` data.
"""
source = "https://api.akkudoktor.net"
# Try to take data from 7 days back for prediction - usually only some hours back are available
date = to_datetime(self.start_datetime - to_duration("7 days"), as_string="YYYY-MM-DD")
assert self.start_datetime # mypy fix
# Try to take data from 5 weeks back for prediction
date = to_datetime(self.start_datetime - to_duration("35 days"), as_string="YYYY-MM-DD")
last_date = to_datetime(self.end_datetime, as_string="YYYY-MM-DD")
url = f"{source}/prices?start={date}&end={last_date}&tz={self.config.timezone}"
response = requests.get(url)
@ -174,6 +122,23 @@ class ElecPriceAkkudoktor(ElecPriceProvider):
self.update_datetime = to_datetime(in_timezone=self.config.timezone)
return akkudoktor_data
def cap_outliers(data, sigma=2):
mean = data.mean()
std = data.std()
lower_bound = mean - sigma * std
upper_bound = mean + sigma * std
capped_data = data.clip(lower=lower_bound, upper=upper_bound)
return capped_data
def predict_ets(
history,
seasonal_periods,
):
model = ExponentialSmoothing(
history, seasonal="add", seasonal_periods=seasonal_periods
).fit()
return model.forecast(7 * 24)
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Update forecast data in the ElecPriceDataRecord format.
@ -187,77 +152,34 @@ class ElecPriceAkkudoktor(ElecPriceProvider):
# Assumption that all lists are the same length and are ordered chronologically
# in ascending order and have the same timestamps.
values_len = len(akkudoktor_data.values)
if values_len < 1:
# Expect one value set per prediction hour
raise ValueError(
f"The forecast must have at least one dataset, "
f"but only {values_len} data sets are given in forecast data."
)
# Get cached 8day values
elecprice_cache_file = CacheFileStore().get(key="ElecPriceAkkudoktor8dayCache")
if elecprice_cache_file is None:
# Cache does not exist - create it
elecprice_cache_file = CacheFileStore().create(
key="ElecPriceAkkudoktor8dayCache",
until_datetime=to_datetime("infinity"),
suffix=".npy",
)
np.save(elecprice_cache_file, self.elecprice_8days)
elecprice_cache_file.seek(0)
self.elecprice_8days = np.load(elecprice_cache_file)
# Get elecprice_charges_kwh_kwh
charges_kwh = (
self.config.elecprice_charges_kwh if self.config.elecprice_charges_kwh else 0.0
charges_wh = (
self.config.elecprice_charges_kwh / 1000 if self.config.elecprice_charges_kwh else 0.0
)
assert self.start_datetime # mypy fix
for i in range(values_len):
original_datetime = akkudoktor_data.values[i].start
dt = to_datetime(original_datetime, in_timezone=self.config.timezone)
for akkudoktor_value in akkudoktor_data.values:
orig_datetime = to_datetime(akkudoktor_value.start, in_timezone=self.config.timezone)
akkudoktor_value = akkudoktor_data.values[i]
price_wh = (
akkudoktor_value.marketpriceEurocentPerKWh / (100 * 1000) + charges_kwh / 1000
)
# We provide prediction starting at start of day, to be compatible to old system.
if compare_datetimes(dt, self.start_datetime.start_of("day")).lt:
# forecast data is too old - older than start_datetime with time set to 00:00:00
self.elecprice_8days[dt.hour, dt.day_of_week] = price_wh
continue
self.elecprice_8days[dt.hour, 7] = price_wh
self.update_value(dt, "elecprice_marketprice_wh", price_wh)
# Update 8day cache
elecprice_cache_file.seek(0)
np.save(elecprice_cache_file, self.elecprice_8days)
# Check for new/ valid forecast data
if len(self) == 0:
# Got no valid forecast data
return
# Assure price starts at start_time
while compare_datetimes(self[0].date_time, self.start_datetime).gt:
# Repeat the mean on the 8 day array to cover the missing hours
dt = self[0].date_time.subtract(hours=1) # type: ignore
value = self._calculate_weighted_mean(dt.day_of_week, dt.hour)
price_wh = akkudoktor_value.marketpriceEurocentPerKWh / (100 * 1000) + charges_wh
record = ElecPriceDataRecord(
date_time=dt,
elecprice_marketprice_wh=value,
date_time=orig_datetime,
elecprice_marketprice_wh=price_wh,
)
self.insert(0, record)
# Assure price ends at end_time
while compare_datetimes(self[-1].date_time, self.end_datetime).lt:
# Repeat the mean on the 8 day array to cover the missing hours
dt = self[-1].date_time.add(hours=1) # type: ignore
value = self._calculate_weighted_mean(dt.day_of_week, dt.hour)
record = ElecPriceDataRecord(
date_time=dt,
elecprice_marketprice_wh=value,
)
self.append(record)
self.insert(
0, record
) # idk what happens if the date is already there. try except update?
# now we check if we have data newer than the last from the api. if so thats old prediction. we delete them all.
# now we count how many data points we have.
# if its > 800 (5 weeks) we will use EST
# elif > idk maybe 168 (1 week) we use EST without season
# elif < 168 we use a simple median
# #elif == 0 we need some static value from the config
# depending on the result we check prediction_hours and predict that many hours.
# we get the result and iterate over it to put it into ElecPriceDataRecord