chore: eosdash improve plan display (#739)
Some checks failed
docker-build / platform-excludes (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled
Close stale pull requests/issues / Find Stale issues and PRs (push) Has been cancelled

* chore: improve plan solution display

Add genetic optimization results to general solution provided by EOSdash plan display.

Add total results.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

* fix: genetic battery and home appliance device simulation

Fix genetic solution to make ac_charge, dc_charge, discharge, ev_charge or
home appliance start time reflect what the simulation was doing. Sometimes
the simulation decided to charge less or to start the appliance at another
time and this was not brought back to e.g. ac_charge.

Make home appliance simulation activate time window for the next day if it can not be
run today.

Improve simulation speed.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

---------

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte
2025-11-08 15:42:18 +01:00
committed by GitHub
parent c50cdd95cb
commit 3599088dce
18 changed files with 1769 additions and 1345 deletions

View File

@@ -1,7 +1,8 @@
from typing import Any, Optional
from typing import Any, Iterator, Optional
import numpy as np
from akkudoktoreos.devices.devices import BATTERY_DEFAULT_CHARGE_RATES
from akkudoktoreos.optimization.genetic.geneticdevices import (
BaseBatteryParameters,
SolarPanelBatteryParameters,
@@ -17,12 +18,20 @@ class Battery:
self._setup()
def _setup(self) -> None:
"""Sets up the battery parameters based on configuration or provided parameters."""
"""Sets up the battery parameters based on provided parameters."""
self.capacity_wh = self.parameters.capacity_wh
self.initial_soc_percentage = self.parameters.initial_soc_percentage
self.charging_efficiency = self.parameters.charging_efficiency
self.discharging_efficiency = self.parameters.discharging_efficiency
# Charge rates, in case of None use default
self.charge_rates = BATTERY_DEFAULT_CHARGE_RATES
if self.parameters.charge_rates:
charge_rates = np.array(self.parameters.charge_rates, dtype=float)
charge_rates = np.unique(charge_rates)
charge_rates.sort()
self.charge_rates = charge_rates
# Only assign for storage battery
self.min_soc_percentage = (
self.parameters.min_soc_percentage
@@ -36,12 +45,30 @@ class Battery:
self.max_charge_power_w = self.parameters.max_charge_power_w
else:
self.max_charge_power_w = self.capacity_wh # TODO this should not be equal capacity_wh
self.discharge_array = np.full(self.prediction_hours, 1)
self.charge_array = np.full(self.prediction_hours, 1)
self.discharge_array = np.full(self.prediction_hours, 0)
self.charge_array = np.full(self.prediction_hours, 0)
self.soc_wh = (self.initial_soc_percentage / 100) * self.capacity_wh
self.min_soc_wh = (self.min_soc_percentage / 100) * self.capacity_wh
self.max_soc_wh = (self.max_soc_percentage / 100) * self.capacity_wh
def _lower_charge_rates_desc(self, start_rate: float) -> Iterator[float]:
"""Yield all charge rates lower than a given rate in descending order.
Args:
charge_rates (np.ndarray): Sorted 1D array of available charge rates.
start_rate (float): The reference charge rate.
Yields:
float: Charge rates lower than `start_rate`, in descending order.
"""
charge_rates_fast = self.charge_rates
# Find the insertion index for start_rate (left-most position)
idx = np.searchsorted(charge_rates_fast, start_rate, side="left")
# Yield values before idx in reverse (descending)
return (charge_rates_fast[j] for j in range(idx - 1, -1, -1))
def to_dict(self) -> dict[str, Any]:
"""Converts the object to a dictionary representation."""
return {
@@ -61,8 +88,8 @@ class Battery:
"""Resets the battery state to its initial values."""
self.soc_wh = (self.initial_soc_percentage / 100) * self.capacity_wh
self.soc_wh = min(max(self.soc_wh, self.min_soc_wh), self.max_soc_wh)
self.discharge_array = np.full(self.prediction_hours, 1)
self.charge_array = np.full(self.prediction_hours, 1)
self.discharge_array = np.full(self.prediction_hours, 0)
self.charge_array = np.full(self.prediction_hours, 0)
def set_discharge_per_hour(self, discharge_array: np.ndarray) -> None:
"""Sets the discharge values for each hour."""
@@ -80,70 +107,172 @@ class Battery:
)
self.charge_array = np.array(charge_array)
def set_charge_allowed_for_hour(self, charge: float, hour: int) -> None:
"""Sets the charge for a specific hour."""
if hour >= self.prediction_hours:
raise ValueError(
f"Hour {hour} is out of range. Must be less than {self.prediction_hours}."
)
self.charge_array[hour] = charge
def current_soc_percentage(self) -> float:
"""Calculates the current state of charge in percentage."""
return (self.soc_wh / self.capacity_wh) * 100
def discharge_energy(self, wh: float, hour: int) -> tuple[float, float]:
"""Discharges energy from the battery."""
"""Discharge energy from the battery.
Discharge is limited by:
* Requested delivered energy
* Remaining energy above minimum SoC
* Maximum discharge power
* Discharge efficiency
Args:
wh (float): Requested delivered energy in watt-hours.
hour (int): Time index. If `self.discharge_array[hour] == 0`,
no discharge occurs.
Returns:
tuple[float, float]:
delivered_wh (float): Actual delivered energy [Wh].
losses_wh (float): Conversion losses [Wh].
"""
if self.discharge_array[hour] == 0:
return 0.0, 0.0
max_possible_discharge_wh = (self.soc_wh - self.min_soc_wh) * self.discharging_efficiency
max_possible_discharge_wh = max(max_possible_discharge_wh, 0.0)
# Raw extractable energy above minimum SoC
raw_available_wh = max(self.soc_wh - self.min_soc_wh, 0.0)
max_possible_discharge_wh = min(
max_possible_discharge_wh, self.max_charge_power_w
) # TODO make a new cfg variable max_discharge_power_w
# Maximum raw discharge due to power limit
max_raw_wh = self.max_charge_power_w # TODO rename to max_discharge_power_w
actual_discharge_wh = min(wh, max_possible_discharge_wh)
actual_withdrawal_wh = (
actual_discharge_wh / self.discharging_efficiency
if self.discharging_efficiency > 0
else 0.0
)
# Actual raw withdrawal (internal)
raw_withdrawal_wh = min(raw_available_wh, max_raw_wh)
self.soc_wh -= actual_withdrawal_wh
# Convert raw to delivered
max_deliverable_wh = raw_withdrawal_wh * self.discharging_efficiency
# Cap by requested delivered energy
delivered_wh = min(wh, max_deliverable_wh)
# Effective raw withdrawal based on what is delivered
raw_used_wh = delivered_wh / self.discharging_efficiency
# Update SoC
self.soc_wh -= raw_used_wh
self.soc_wh = max(self.soc_wh, self.min_soc_wh)
losses_wh = actual_withdrawal_wh - actual_discharge_wh
return actual_discharge_wh, losses_wh
# Losses
losses_wh = raw_used_wh - delivered_wh
return delivered_wh, losses_wh
def charge_energy(
self, wh: Optional[float], hour: int, relative_power: float = 0.0
self,
wh: Optional[float],
hour: int,
charge_factor: float = 0.0,
) -> tuple[float, float]:
"""Charges energy into the battery."""
"""Charge energy into the battery.
Two **exclusive** modes:
Mode 1:
- `wh is not None` and `charge_factor == 0`
→ The raw requested charge energy is `wh` (pre-efficiency).
→ If remaining capacity is insufficient, charging is automatically limited.
→ No exception is raised due to capacity limits.
Mode 2:
- `wh is None` and `charge_factor > 0`
→ The raw requested energy is `max_charge_power_w * charge_factor`.
→ If the request exceeds remaining capacity, the algorithm tries to
find a lower charge_factor that is compatible. If such a charge factor
exists, this hours charge_factor is replaced.
→ If no charge factor can accommodate charging, the request is ignored
(`(0.0, 0.0)` is returned) and a penalty is applied elsewhere.
Charging is constrained by:
• Available SoC headroom (max_soc_wh soc_wh)
• max_charge_power_w
• charging_efficiency
Args:
wh (float | None):
Requested raw energy [Wh] before efficiency.
Must be provided only for Mode 1 (charge_factor must be 0).
hour (int):
Time index. If charging is disabled at this hour (charge_array[hour] == 0),
returns `(0.0, 0.0)`.
charge_factor (float):
Fraction (01) of max charge power.
Must be >0 only in Mode 2 (`wh is None`).
Returns:
tuple[float, float]:
stored_wh : float
Energy stored after efficiency [Wh].
losses_wh : float
Conversion losses [Wh].
Raises:
ValueError:
- If the mode is ambiguous (neither Mode 1 nor Mode 2).
- If the final new SoC would exceed capacity_wh.
Notes:
stored_wh = raw_input_wh * charging_efficiency
losses_wh = raw_input_wh stored_wh
"""
# Charging allowed in this hour?
if hour is not None and self.charge_array[hour] == 0:
return 0.0, 0.0 # Charging not allowed in this hour
return 0.0, 0.0
if relative_power > 0.0:
wh = self.max_charge_power_w * relative_power
# Provide fast (3x..5x) local read access (vs. self.xxx) for repetitive read access
soc_wh_fast = self.soc_wh
max_charge_power_w_fast = self.max_charge_power_w
charging_efficiency_fast = self.charging_efficiency
wh = wh if wh is not None else self.max_charge_power_w
# Decide mode & determine raw_request_wh and raw_charge_wh
if wh is not None and charge_factor == 0.0: # mode 1
raw_request_wh = wh
raw_charge_wh = max(self.max_soc_wh - soc_wh_fast, 0.0) / charging_efficiency_fast
elif wh is None and charge_factor > 0.0: # mode 2
raw_request_wh = max_charge_power_w_fast * charge_factor
raw_charge_wh = max(self.max_soc_wh - soc_wh_fast, 0.0) / charging_efficiency_fast
if raw_request_wh > raw_charge_wh:
# Use a lower charge factor
lower_charge_factors = self._lower_charge_rates_desc(charge_factor)
for charge_factor in lower_charge_factors:
raw_request_wh = max_charge_power_w_fast * charge_factor
if raw_request_wh <= raw_charge_wh:
self.charge_array[hour] = charge_factor
break
if raw_request_wh > raw_charge_wh:
# ignore request - penalty for missing SoC will be applied
self.charge_array[hour] = 0
return 0.0, 0.0
else:
raise ValueError(
f"{self.parameters.device_id}: charge_energy must be called either "
"with wh != None and charge_factor == 0, or with wh == None and charge_factor > 0."
)
max_possible_charge_wh = (
(self.max_soc_wh - self.soc_wh) / self.charging_efficiency
if self.charging_efficiency > 0
else 0.0
)
max_possible_charge_wh = max(max_possible_charge_wh, 0.0)
# Remaining capacity
max_raw_wh = min(raw_charge_wh, max_charge_power_w_fast)
effective_charge_wh = min(wh, max_possible_charge_wh)
charged_wh = effective_charge_wh * self.charging_efficiency
# Actual raw intake
raw_input_wh = raw_request_wh if raw_request_wh < max_raw_wh else max_raw_wh
self.soc_wh += charged_wh
self.soc_wh = min(self.soc_wh, self.max_soc_wh)
# Apply efficiency
stored_wh = raw_input_wh * charging_efficiency_fast
new_soc = soc_wh_fast + stored_wh
losses_wh = effective_charge_wh - charged_wh
return charged_wh, losses_wh
if new_soc > self.capacity_wh:
raise ValueError(
f"{self.parameters.device_id}: SoC {new_soc} Wh exceeds capacity {self.capacity_wh} Wh"
)
self.soc_wh = new_soc
losses_wh = raw_input_wh - stored_wh
return stored_wh, losses_wh
def current_energy_content(self) -> float:
"""Returns the current usable energy in the battery."""

View File

@@ -1,5 +1,3 @@
from typing import Optional
import numpy as np
from akkudoktoreos.optimization.genetic.geneticdevices import HomeApplianceParameters
@@ -28,7 +26,6 @@ class HomeAppliance:
self.load_curve = np.zeros(self.prediction_hours) # Initialize the load curve with zeros
self.duration_h = self.parameters.duration_h
self.consumption_wh = self.parameters.consumption_wh
self.appliance_start: Optional[int] = None
# setup possible start times
if self.parameters.time_windows is None:
self.parameters.time_windows = TimeWindowSequence(
@@ -59,33 +56,32 @@ class HomeAppliance:
else:
self.start_latest = 23
def set_starting_time(self, start_hour: int, global_start_hour: int = 0) -> None:
def set_starting_time(self, start_hour: int, global_start_hour: int = 0) -> int:
"""Sets the start time of the device and generates the corresponding load curve.
:param start_hour: The hour at which the device should start.
"""
self.reset_load_curve()
# Check if the duration of use is within the available time windows
if not self.start_allowed[start_hour]:
# No available time window to start home appliance
# Use the earliest one
start_hour = self.start_earliest
# It is not allowed (by the time windows) to start the application at this time
if global_start_hour <= self.start_latest:
# There is a time window left to start the appliance. Use it
start_hour = self.start_latest
else:
# There is no time window left to run the application
# Set the start into tomorrow
start_hour = self.start_earliest + 24
# Check if it is possibility to start the appliance
if start_hour < global_start_hour:
# Start is before current time
# Use the latest one
start_hour = self.start_latest
self.reset_load_curve()
# Calculate power per hour based on total consumption and duration
power_per_hour = self.consumption_wh / self.duration_h # Convert to watt-hours
# Set the power for the duration of use in the load curve array
self.load_curve[start_hour : start_hour + self.duration_h] = power_per_hour
if start_hour < len(self.load_curve):
end_hour = min(start_hour + self.duration_h, self.prediction_hours)
self.load_curve[start_hour:end_hour] = power_per_hour
# Set the selected start hour
self.appliance_start = start_hour
return start_hour
def reset_load_curve(self) -> None:
"""Resets the load curve."""
@@ -107,6 +103,3 @@ class HomeAppliance:
)
return self.load_curve[hour]
def get_appliance_start(self) -> Optional[int]:
return self.appliance_start