Data prefetch for ems (#418)
Some checks failed
docker-build / platform-excludes (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled

* Pre-fetch data

* maintanance and extend tests

* comment clean up

* nansum usage (to be save)
This commit is contained in:
Normann 2025-01-26 18:29:26 +01:00 committed by GitHub
parent 774cfd8b65
commit 1a2da7636b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 171 additions and 106 deletions

View File

@ -1,4 +1,4 @@
from typing import Any, ClassVar, Dict, Optional, Union
from typing import Any, ClassVar, Optional
import numpy as np
from numpydantic import NDArray, Shape
@ -186,7 +186,7 @@ class EnergieManagementSystem(SingletonMixin, ConfigMixin, PredictionMixin, Pyda
len(self.load_energy_array), parameters.einspeiseverguetung_euro_pro_wh, float
)
)
if inverter is not None:
if inverter:
self.battery = inverter.battery
else:
self.battery = None
@ -198,7 +198,7 @@ class EnergieManagementSystem(SingletonMixin, ConfigMixin, PredictionMixin, Pyda
self.ev_charge_hours = np.full(self.config.prediction_hours, 0.0)
def set_akku_discharge_hours(self, ds: np.ndarray) -> None:
if self.battery is not None:
if self.battery:
self.battery.set_discharge_per_hour(ds)
def set_akku_ac_charge_hours(self, ds: np.ndarray) -> None:
@ -211,7 +211,7 @@ class EnergieManagementSystem(SingletonMixin, ConfigMixin, PredictionMixin, Pyda
self.ev_charge_hours = ds
def set_home_appliance_start(self, ds: int, global_start_hour: int = 0) -> None:
if self.home_appliance is not None:
if self.home_appliance:
self.home_appliance.set_starting_time(ds, global_start_hour=global_start_hour)
def reset(self) -> None:
@ -276,53 +276,50 @@ class EnergieManagementSystem(SingletonMixin, ConfigMixin, PredictionMixin, Pyda
return self.simulate(start_hour)
def simulate(self, start_hour: int) -> dict[str, Any]:
"""hour.
"""Simulate energy usage and costs for the given start hour.
akku_soc_pro_stunde begin of the hour, initial hour state!
last_wh_pro_stunde integral of last hour (end state)
last_wh_pro_stunde integral of last hour (end state)
"""
# Check for simulation integrity
missing_data = []
if self.load_energy_array is None:
missing_data.append("Load Curve")
if self.pv_prediction_wh is None:
missing_data.append("PV Forecast")
if self.elect_price_hourly is None:
missing_data.append("Electricity Price")
if self.ev_charge_hours is None:
missing_data.append("EV Charge Hours")
if self.ac_charge_hours is None:
missing_data.append("AC Charge Hours")
if self.dc_charge_hours is None:
missing_data.append("DC Charge Hours")
if self.elect_revenue_per_hour_arr is None:
missing_data.append("Feed-in Tariff")
required_attrs = [
"load_energy_array",
"pv_prediction_wh",
"elect_price_hourly",
"ev_charge_hours",
"ac_charge_hours",
"dc_charge_hours",
"elect_revenue_per_hour_arr",
]
missing_data = [
attr.replace("_", " ").title() for attr in required_attrs if getattr(self, attr) is None
]
if missing_data:
error_msg = "Mandatory data missing - " + ", ".join(missing_data)
logger.error(error_msg)
raise ValueError(error_msg)
else:
# make mypy happy
assert self.load_energy_array is not None
assert self.pv_prediction_wh is not None
assert self.elect_price_hourly is not None
assert self.ev_charge_hours is not None
assert self.ac_charge_hours is not None
assert self.dc_charge_hours is not None
assert self.elect_revenue_per_hour_arr is not None
logger.error("Mandatory data missing - %s", ", ".join(missing_data))
raise ValueError(f"Mandatory data missing: {', '.join(missing_data)}")
load_energy_array = self.load_energy_array
# Pre-fetch data
load_energy_array = np.array(self.load_energy_array)
pv_prediction_wh = np.array(self.pv_prediction_wh)
elect_price_hourly = np.array(self.elect_price_hourly)
ev_charge_hours = np.array(self.ev_charge_hours)
ac_charge_hours = np.array(self.ac_charge_hours)
dc_charge_hours = np.array(self.dc_charge_hours)
elect_revenue_per_hour_arr = np.array(self.elect_revenue_per_hour_arr)
if not (
len(load_energy_array) == len(self.pv_prediction_wh) == len(self.elect_price_hourly)
):
error_msg = f"Array sizes do not match: Load Curve = {len(load_energy_array)}, PV Forecast = {len(self.pv_prediction_wh)}, Electricity Price = {len(self.elect_price_hourly)}"
# Fetch objects
battery = self.battery
assert battery # to please mypy
ev = self.ev
home_appliance = self.home_appliance
inverter = self.inverter
if not (len(load_energy_array) == len(pv_prediction_wh) == len(elect_price_hourly)):
error_msg = f"Array sizes do not match: Load Curve = {len(load_energy_array)}, PV Forecast = {len(pv_prediction_wh)}, Electricity Price = {len(elect_price_hourly)}"
logger.error(error_msg)
raise ValueError(error_msg)
# Optimized total hours calculation
end_hour = len(load_energy_array)
total_hours = end_hour - start_hour
@ -332,116 +329,110 @@ class EnergieManagementSystem(SingletonMixin, ConfigMixin, PredictionMixin, Pyda
consumption_energy_per_hour = np.full((total_hours), np.nan)
costs_per_hour = np.full((total_hours), np.nan)
revenue_per_hour = np.full((total_hours), np.nan)
soc_per_hour = np.full((total_hours), np.nan) # Hour End State
soc_per_hour = np.full((total_hours), np.nan)
soc_ev_per_hour = np.full((total_hours), np.nan)
losses_wh_per_hour = np.full((total_hours), np.nan)
home_appliance_wh_per_hour = np.full((total_hours), np.nan)
electricity_price_per_hour = np.full((total_hours), np.nan)
# Set initial state
if self.battery:
soc_per_hour[0] = self.battery.current_soc_percentage()
if self.ev:
soc_ev_per_hour[0] = self.ev.current_soc_percentage()
soc_per_hour[0] = battery.current_soc_percentage()
if ev:
soc_ev_per_hour[0] = ev.current_soc_percentage()
for hour in range(start_hour, end_hour):
hour_since_now = hour - start_hour
hour_idx = hour - start_hour
# save begin states
if self.battery:
soc_per_hour[hour_since_now] = self.battery.current_soc_percentage()
else:
soc_per_hour[hour_since_now] = 0.0
if self.ev:
soc_ev_per_hour[hour_since_now] = self.ev.current_soc_percentage()
soc_per_hour[hour_idx] = battery.current_soc_percentage()
if ev:
soc_ev_per_hour[hour_idx] = ev.current_soc_percentage()
# Accumulate loads and PV generation
consumption = self.load_energy_array[hour]
losses_wh_per_hour[hour_since_now] = 0.0
consumption = load_energy_array[hour]
losses_wh_per_hour[hour_idx] = 0.0
# Home appliances
if self.home_appliance:
ha_load = self.home_appliance.get_load_for_hour(hour)
if home_appliance:
ha_load = home_appliance.get_load_for_hour(hour)
consumption += ha_load
home_appliance_wh_per_hour[hour_since_now] = ha_load
home_appliance_wh_per_hour[hour_idx] = ha_load
# E-Auto handling
if self.ev:
if self.ev_charge_hours[hour] > 0:
loaded_energy_ev, verluste_eauto = self.ev.charge_energy(
None, hour, relative_power=self.ev_charge_hours[hour]
)
consumption += loaded_energy_ev
losses_wh_per_hour[hour_since_now] += verluste_eauto
if ev and ev_charge_hours[hour] > 0:
loaded_energy_ev, verluste_eauto = ev.charge_energy(
None, hour, relative_power=ev_charge_hours[hour]
)
consumption += loaded_energy_ev
losses_wh_per_hour[hour_idx] += verluste_eauto
# Process inverter logic
energy_feedin_grid_actual, energy_consumption_grid_actual, losses, eigenverbrauch = (
0.0,
0.0,
0.0,
0.0,
energy_feedin_grid_actual = energy_consumption_grid_actual = losses = eigenverbrauch = (
0.0
)
if self.battery:
self.battery.set_charge_allowed_for_hour(self.dc_charge_hours[hour], hour)
if self.inverter:
energy_produced = self.pv_prediction_wh[hour]
hour_ac_charge = ac_charge_hours[hour]
hour_dc_charge = dc_charge_hours[hour]
hourly_electricity_price = elect_price_hourly[hour]
hourly_energy_revenue = elect_revenue_per_hour_arr[hour]
battery.set_charge_allowed_for_hour(hour_dc_charge, hour)
if inverter:
energy_produced = pv_prediction_wh[hour]
(
energy_feedin_grid_actual,
energy_consumption_grid_actual,
losses,
eigenverbrauch,
) = self.inverter.process_energy(energy_produced, consumption, hour)
) = inverter.process_energy(energy_produced, consumption, hour)
# AC PV Battery Charge
if self.battery and self.ac_charge_hours[hour] > 0.0:
self.battery.set_charge_allowed_for_hour(1, hour)
battery_charged_energy_actual, battery_losses_actual = self.battery.charge_energy(
None, hour, relative_power=self.ac_charge_hours[hour]
if hour_ac_charge > 0.0:
battery.set_charge_allowed_for_hour(1, hour)
battery_charged_energy_actual, battery_losses_actual = battery.charge_energy(
None, hour, relative_power=hour_ac_charge
)
# print(hour, " ", battery_charged_energy_actual, " ",self.ac_charge_hours[hour]," ",self.battery.current_soc_percentage())
consumption += battery_charged_energy_actual
consumption += battery_losses_actual
energy_consumption_grid_actual += battery_charged_energy_actual
energy_consumption_grid_actual += battery_losses_actual
losses_wh_per_hour[hour_since_now] += battery_losses_actual
feedin_energy_per_hour[hour_since_now] = energy_feedin_grid_actual
consumption_energy_per_hour[hour_since_now] = energy_consumption_grid_actual
losses_wh_per_hour[hour_since_now] += losses
loads_energy_per_hour[hour_since_now] = consumption
electricity_price_per_hour[hour_since_now] = self.elect_price_hourly[hour]
total_battery_energy = battery_charged_energy_actual + battery_losses_actual
consumption += total_battery_energy
energy_consumption_grid_actual += total_battery_energy
losses_wh_per_hour[hour_idx] += battery_losses_actual
# Update hourly arrays
feedin_energy_per_hour[hour_idx] = energy_feedin_grid_actual
consumption_energy_per_hour[hour_idx] = energy_consumption_grid_actual
losses_wh_per_hour[hour_idx] += losses
loads_energy_per_hour[hour_idx] = consumption
electricity_price_per_hour[hour_idx] = hourly_electricity_price
# Financial calculations
costs_per_hour[hour_since_now] = (
energy_consumption_grid_actual * self.elect_price_hourly[hour]
)
revenue_per_hour[hour_since_now] = (
energy_feedin_grid_actual * self.elect_revenue_per_hour_arr[hour]
)
costs_per_hour[hour_idx] = energy_consumption_grid_actual * hourly_electricity_price
revenue_per_hour[hour_idx] = energy_feedin_grid_actual * hourly_energy_revenue
# Total cost and return
gesamtkosten_euro = np.nansum(costs_per_hour) - np.nansum(revenue_per_hour)
total_cost = np.nansum(costs_per_hour)
total_losses = np.nansum(losses_wh_per_hour)
total_revenue = np.nansum(revenue_per_hour)
# Prepare output dictionary
out: Dict[str, Union[np.ndarray, float]] = {
return {
"Last_Wh_pro_Stunde": loads_energy_per_hour,
"Netzeinspeisung_Wh_pro_Stunde": feedin_energy_per_hour,
"Netzbezug_Wh_pro_Stunde": consumption_energy_per_hour,
"Kosten_Euro_pro_Stunde": costs_per_hour,
"akku_soc_pro_stunde": soc_per_hour,
"Einnahmen_Euro_pro_Stunde": revenue_per_hour,
"Gesamtbilanz_Euro": gesamtkosten_euro,
"Gesamtbilanz_Euro": total_cost - total_revenue,
"EAuto_SoC_pro_Stunde": soc_ev_per_hour,
"Gesamteinnahmen_Euro": np.nansum(revenue_per_hour),
"Gesamtkosten_Euro": np.nansum(costs_per_hour),
"Gesamteinnahmen_Euro": total_revenue,
"Gesamtkosten_Euro": total_cost,
"Verluste_Pro_Stunde": losses_wh_per_hour,
"Gesamt_Verluste": np.nansum(losses_wh_per_hour),
"Gesamt_Verluste": total_losses,
"Home_appliance_wh_per_hour": home_appliance_wh_per_hour,
"Electricity_price": electricity_price_per_hour,
}
return out
# Initialize the Energy Management System, it is a singleton.
ems = EnergieManagementSystem()

View File

@ -6,6 +6,7 @@ import pytest
from akkudoktoreos.core.ems import (
EnergieManagementSystem,
EnergieManagementSystemParameters,
SimulationResult,
get_ems,
)
from akkudoktoreos.devices.battery import (
@ -182,6 +183,7 @@ def test_simulation(create_ems_instance):
# Assertions to validate results
assert result is not None, "Result should not be None"
assert isinstance(result, dict), "Result should be a dictionary"
assert SimulationResult(**result) is not None
assert "Last_Wh_pro_Stunde" in result, "Result should contain 'Last_Wh_pro_Stunde'"
"""
@ -240,7 +242,7 @@ def test_simulation(create_ems_instance):
assert (
abs(result["Netzeinspeisung_Wh_pro_Stunde"][10] - 3946.93) < 1e-3
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 4000."
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 3946.93."
assert (
abs(result["Netzeinspeisung_Wh_pro_Stunde"][11] - 0.0) < 1e-3
@ -251,6 +253,78 @@ def test_simulation(create_ems_instance):
), "'akku_soc_pro_stunde[20]' should be 10."
assert (
abs(result["Last_Wh_pro_Stunde"][20] - 6050.98) < 1e-3
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 0.0."
), "'Last_Wh_pro_Stunde[20]' should be 6050.98."
print("All tests passed successfully.")
def test_set_parameters(create_ems_instance):
"""Test the set_parameters method of EnergieManagementSystem."""
ems = create_ems_instance
# Check if parameters are set correctly
assert ems.load_energy_array is not None, "load_energy_array should not be None"
assert ems.pv_prediction_wh is not None, "pv_prediction_wh should not be None"
assert ems.elect_price_hourly is not None, "elect_price_hourly should not be None"
assert (
ems.elect_revenue_per_hour_arr is not None
), "elect_revenue_per_hour_arr should not be None"
def test_set_akku_discharge_hours(create_ems_instance):
"""Test the set_akku_discharge_hours method of EnergieManagementSystem."""
ems = create_ems_instance
discharge_hours = np.full(ems.config.prediction_hours, 1.0)
ems.set_akku_discharge_hours(discharge_hours)
assert np.array_equal(
ems.battery.discharge_array, discharge_hours
), "Discharge hours should be set correctly"
def test_set_akku_ac_charge_hours(create_ems_instance):
"""Test the set_akku_ac_charge_hours method of EnergieManagementSystem."""
ems = create_ems_instance
ac_charge_hours = np.full(ems.config.prediction_hours, 1.0)
ems.set_akku_ac_charge_hours(ac_charge_hours)
assert np.array_equal(
ems.ac_charge_hours, ac_charge_hours
), "AC charge hours should be set correctly"
def test_set_akku_dc_charge_hours(create_ems_instance):
"""Test the set_akku_dc_charge_hours method of EnergieManagementSystem."""
ems = create_ems_instance
dc_charge_hours = np.full(ems.config.prediction_hours, 1.0)
ems.set_akku_dc_charge_hours(dc_charge_hours)
assert np.array_equal(
ems.dc_charge_hours, dc_charge_hours
), "DC charge hours should be set correctly"
def test_set_ev_charge_hours(create_ems_instance):
"""Test the set_ev_charge_hours method of EnergieManagementSystem."""
ems = create_ems_instance
ev_charge_hours = np.full(ems.config.prediction_hours, 1.0)
ems.set_ev_charge_hours(ev_charge_hours)
assert np.array_equal(
ems.ev_charge_hours, ev_charge_hours
), "EV charge hours should be set correctly"
def test_reset(create_ems_instance):
"""Test the reset method of EnergieManagementSystem."""
ems = create_ems_instance
ems.reset()
assert ems.ev.current_soc_percentage() == 100, "EV SOC should be reset to initial value"
assert (
ems.battery.current_soc_percentage() == 80
), "Battery SOC should be reset to initial value"
def test_simulate_start_now(create_ems_instance):
"""Test the simulate_start_now method of EnergieManagementSystem."""
ems = create_ems_instance
result = ems.simulate_start_now()
assert result is not None, "Result should not be None"
assert isinstance(result, dict), "Result should be a dictionary"
assert "Last_Wh_pro_Stunde" in result, "Result should contain 'Last_Wh_pro_Stunde'"