fix: genetic optimizer charge rates, SoC accuracy, and minor bugfixes (#949)
Some checks failed
Bump Version / Bump Version Workflow (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
docker-build / platform-excludes (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled

* record battery SOC at the start of the interval for accurate display

* map battery charge rates to GeneticOptimizationParameters and update related logic

  Instead of using the EV charge rates, the battery now has its own charge rate defined
  in the GeneticOptimizationParameters to better separate those entities.

* separate raw gene values from SOC-clamped op factors

  The genetic_*_factor columns in the OptimizationSolution dataframe now
  always carry the raw gene values (optimizer intent), while the
  battery1_*_op_mode / battery1_*_op_factor columns and FRBCInstruction
  operation_mode_factor reflect SOC-clamped effective values that can
  actually be executed given the battery's state of charge at each hour.

  Adds GeneticSolution._soc_clamped_operation_factors():
    - AC charge factor: proportionally scaled down when battery headroom
      (max_soc - current_soc) is less than what the commanded factor
      would store in one hour; zeroed when battery is full.
    - DC charge factor: zeroed when battery is at or above max SOC.
    - Discharge: blocked when SOC is at or below min SOC.

  Both optimization_solution() and energy_management_plan() pass the
  clamped values to _battery_operation_from_solution(), so the plan
  instructions the HEMS receives reflect physically achievable targets.

* ensure max AC charge power is only used if defined

* update fixtures for PV suffix + SOC-clamp algorithm changes

* handle None case for home appliance start hour in simulation

  if the hour is zero, the the home appliance wont start

* update handling of invalid charge indices in autocharge hours

  Seems to be a bug, since all invalid indexes needs to be invalidated (also the first one)

* remove double code in simulation preparation and improve comments

* update version

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
Co-authored-by: Christopher Nadler <christopher.nadler@gmail.com>
This commit is contained in:
Bobby Noelte
2026-03-17 12:41:15 +01:00
committed by GitHub
parent 71e5abce88
commit 779395cef5
11 changed files with 1415 additions and 1192 deletions

View File

@@ -113,8 +113,14 @@ class GeneticSimulation(PydanticBaseModel):
home_appliance: Optional[HomeAppliance] = None,
inverter: Optional[Inverter] = None,
) -> None:
"""Prepare simulation runs.
Populate internal arrays and device references used during simulation.
"""
self.optimization_hours = optimization_hours
self.prediction_hours = prediction_hours
# Load arrays from provided EMS parameters
self.load_energy_array = np.array(parameters.gesamtlast, float)
self.pv_prediction_wh = np.array(parameters.pv_prognose_wh, float)
self.elect_price_hourly = np.array(parameters.strompreis_euro_pro_wh, float)
@@ -125,6 +131,8 @@ class GeneticSimulation(PydanticBaseModel):
len(self.load_energy_array), parameters.einspeiseverguetung_euro_pro_wh, float
)
)
# Associate devices
if inverter:
self.battery = inverter.battery
else:
@@ -132,23 +140,14 @@ class GeneticSimulation(PydanticBaseModel):
self.ev = ev
self.home_appliance = home_appliance
self.inverter = inverter
# Initialize per-hour action arrays for the prediction horizon
self.ac_charge_hours = np.full(self.prediction_hours, 0.0)
self.dc_charge_hours = np.full(self.prediction_hours, 0.0)
self.bat_discharge_hours = np.full(self.prediction_hours, 0.0)
self.ev_charge_hours = np.full(self.prediction_hours, 0.0)
self.ev_discharge_hours = np.full(self.prediction_hours, 0.0)
self.home_appliance_start_hour = None
"""Prepare simulation runs."""
self.load_energy_array = np.array(parameters.gesamtlast, float)
self.pv_prediction_wh = np.array(parameters.pv_prognose_wh, float)
self.elect_price_hourly = np.array(parameters.strompreis_euro_pro_wh, float)
self.elect_revenue_per_hour_arr = (
parameters.einspeiseverguetung_euro_pro_wh
if isinstance(parameters.einspeiseverguetung_euro_pro_wh, list)
else np.full(
len(self.load_energy_array), parameters.einspeiseverguetung_euro_pro_wh, float
)
)
def reset(self) -> None:
if self.ev:
@@ -299,7 +298,7 @@ class GeneticSimulation(PydanticBaseModel):
# Default return if no electric vehicle is available
soc_ev_per_hour = np.full((total_hours), 0)
if home_appliance_fast and self.home_appliance_start_hour:
if home_appliance_fast and self.home_appliance_start_hour is not None:
home_appliance_enabled = True
# Pre-allocate arrays for the results, optimized for speed
home_appliance_wh_per_hour = np.full((total_hours), np.nan)
@@ -335,6 +334,13 @@ class GeneticSimulation(PydanticBaseModel):
consumption += loaded_energy_ev
losses_wh_per_hour[hour_idx] += verluste_eauto
# Save battery SOC before inverter processing = true begin-of-interval state.
# Must be recorded here (before DC charge/discharge) so the displayed SOC at
# timestamp T reflects what the battery actually had at the START of interval T,
# not the post-DC result. Consistent with the EV SOC convention above.
if battery_fast:
soc_per_hour[hour_idx] = battery_fast.current_soc_percentage()
# Process inverter logic
energy_feedin_grid_actual = energy_consumption_grid_actual = losses = eigenverbrauch = (
0.0
@@ -351,7 +357,6 @@ class GeneticSimulation(PydanticBaseModel):
# AC PV Battery Charge
if battery_fast:
soc_per_hour[hour_idx] = battery_fast.current_soc_percentage() # save begin state
hour_ac_charge = ac_charge_hours_fast[hour]
if hour_ac_charge > 0.0 and ac_charging_possible:
# Cap charge factor by max_ac_charge_power_w if set
@@ -436,6 +441,9 @@ class GeneticOptimization(OptimizationBase):
self.config.prediction.hours - self.config.optimization.horizon_hours
)
self.ev_possible_charge_values: list[float] = [1.0]
# Separate charge-level list for battery AC charging (independent of EV rates).
# Populated from parameters.pv_akku.charge_rates in optimierung_ems.
self.bat_possible_charge_values: list[float] = [1.0]
self.verbose = verbose
self.fix_seed = fixed_seed
self.optimize_ev = True
@@ -457,29 +465,31 @@ class GeneticOptimization(OptimizationBase):
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Decode the input array into ac_charge, dc_charge, and discharge arrays."""
discharge_hours_bin_np = np.array(discharge_hours_bin)
len_ac = len(self.ev_possible_charge_values)
# Battery AC charge uses its own charge-level list (bat_possible_charge_values).
len_bat = len(self.bat_possible_charge_values)
# Categorization:
# Idle: 0 .. len_ac-1
# Discharge: len_ac .. 2*len_ac - 1
# AC Charge: 2*len_ac .. 3*len_ac - 1
# DC optional: 3*len_ac (not allowed), 3*len_ac + 1 (allowed)
# Categorization (using battery charge levels):
# Idle: 0 .. len_bat-1
# Discharge: len_bat .. 2*len_bat - 1
# AC Charge: 2*len_bat .. 3*len_bat - 1 (maps to bat_possible_charge_values)
# DC optional: 3*len_bat (not allowed), 3*len_bat + 1 (allowed)
# Idle has no charge, Discharge has binary 1, AC Charge has corresponding values
# Idle states
idle_mask = (discharge_hours_bin_np >= 0) & (discharge_hours_bin_np < len_ac)
idle_mask = (discharge_hours_bin_np >= 0) & (discharge_hours_bin_np < len_bat)
# Discharge states
discharge_mask = (discharge_hours_bin_np >= len_ac) & (discharge_hours_bin_np < 2 * len_ac)
discharge_mask = (discharge_hours_bin_np >= len_bat) & (
discharge_hours_bin_np < 2 * len_bat
)
# AC states
ac_mask = (discharge_hours_bin_np >= 2 * len_ac) & (discharge_hours_bin_np < 3 * len_ac)
ac_indices = (discharge_hours_bin_np[ac_mask] - 2 * len_ac).astype(int)
ac_mask = (discharge_hours_bin_np >= 2 * len_bat) & (discharge_hours_bin_np < 3 * len_bat)
ac_indices = (discharge_hours_bin_np[ac_mask] - 2 * len_bat).astype(int)
# DC states (if enabled)
if self.optimize_dc_charge:
dc_not_allowed_state = 3 * len_ac
dc_allowed_state = 3 * len_ac + 1
dc_not_allowed_state = 3 * len_bat
dc_allowed_state = 3 * len_bat + 1
dc_charge = np.where(discharge_hours_bin_np == dc_allowed_state, 1, 0)
else:
dc_charge = np.ones_like(discharge_hours_bin_np, dtype=float)
@@ -489,7 +499,7 @@ class GeneticOptimization(OptimizationBase):
discharge[discharge_mask] = 1 # Set Discharge states to 1
ac_charge = np.zeros_like(discharge_hours_bin_np, dtype=float)
ac_charge[ac_mask] = [self.ev_possible_charge_values[i] for i in ac_indices]
ac_charge[ac_mask] = [self.bat_possible_charge_values[i] for i in ac_indices]
# Idle is just 0, already default.
@@ -497,12 +507,12 @@ class GeneticOptimization(OptimizationBase):
def mutate(self, individual: list[int]) -> tuple[list[int]]:
"""Custom mutation function for the individual."""
# Calculate the number of states
len_ac = len(self.ev_possible_charge_values)
# Calculate the number of states using battery charge levels
len_bat = len(self.bat_possible_charge_values)
if self.optimize_dc_charge:
total_states = 3 * len_ac + 2
total_states = 3 * len_bat + 2
else:
total_states = 3 * len_ac
total_states = 3 * len_bat
# 1. Mutating the charge_discharge part
charge_discharge_part = individual[: self.config.prediction.hours]
@@ -633,30 +643,30 @@ class GeneticOptimization(OptimizationBase):
creator.create("Individual", list, fitness=creator.FitnessMin)
self.toolbox = base.Toolbox()
len_ac = len(self.ev_possible_charge_values)
# Battery state space uses bat_possible_charge_values; EV index space uses ev_possible_charge_values.
len_bat = len(self.bat_possible_charge_values)
len_ev = len(self.ev_possible_charge_values)
# Total number of states without DC:
# Idle: len_ac states
# Discharge: len_ac states
# AC-Charge: len_ac states
# Total without DC: 3 * len_ac
# With DC: + 2 states
# Total battery/discharge states:
# Idle: len_bat states
# Discharge: len_bat states
# AC-Charge: len_bat states (maps to bat_possible_charge_values)
# With DC: + 2 additional states
if self.optimize_dc_charge:
total_states = 3 * len_ac + 2
total_states = 3 * len_bat + 2
else:
total_states = 3 * len_ac
total_states = 3 * len_bat
# State space: 0 .. (total_states - 1)
self.toolbox.register("attr_discharge_state", random.randint, 0, total_states - 1)
# EV attributes
# EV attributes (separate index space)
if self.optimize_ev:
self.toolbox.register(
"attr_ev_charge_index",
random.randint,
0,
len_ac - 1,
len_ev - 1,
)
# Household appliance start time
@@ -666,17 +676,17 @@ class GeneticOptimization(OptimizationBase):
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("mate", tools.cxTwoPoint)
# Mutation operator for charge/discharge states
# Mutation operator for battery charge/discharge states
self.toolbox.register(
"mutate_charge_discharge", tools.mutUniformInt, low=0, up=total_states - 1, indpb=0.2
)
# Mutation operator for EV states
# Mutation operator for EV states (separate index space)
self.toolbox.register(
"mutate_ev_charge_index",
tools.mutUniformInt,
low=0,
up=len_ac - 1,
up=len_ev - 1,
indpb=0.2,
)
@@ -800,7 +810,7 @@ class GeneticOptimization(OptimizationBase):
if np.any(invalid_charge_mask):
invalid_indices = np.where(invalid_charge_mask)[0]
if len(invalid_indices) > 1:
eautocharge_hours_index_tail[invalid_indices[1:]] = 0
eautocharge_hours_index_tail[invalid_indices] = 0
eautocharge_hours_index[-min_length:] = eautocharge_hours_index_tail.tolist()
@@ -1127,6 +1137,25 @@ class GeneticOptimization(OptimizationBase):
else:
self.optimize_ev = False
# Battery AC charge rates — use the battery's configured charge_rates so the
# optimizer can select partial AC charge power (e.g. 10 %, 50 %, 100 %) instead
# of always forcing full power. Falls back to [1.0] when not configured.
if parameters.pv_akku and parameters.pv_akku.charge_rates:
self.bat_possible_charge_values = [
r for r in parameters.pv_akku.charge_rates if r > 0.0
] or [1.0]
elif (
self.config.devices.batteries
and self.config.devices.batteries[0]
and self.config.devices.batteries[0].charge_rates
):
self.bat_possible_charge_values = [
r for r in self.config.devices.batteries[0].charge_rates if r > 0.0
] or [1.0]
else:
self.bat_possible_charge_values = [1.0]
logger.debug("Battery AC charge levels: {}", self.bat_possible_charge_values)
# Initialize household appliance if applicable
dishwasher = (
HomeAppliance(

View File

@@ -407,6 +407,7 @@ class GeneticOptimizationParameters(
max_charge_power_w=battery_config.max_charge_power_w,
min_soc_percentage=battery_config.min_soc_percentage,
max_soc_percentage=battery_config.max_soc_percentage,
charge_rates=battery_config.charge_rates,
)
except:
logger.info(

View File

@@ -262,6 +262,67 @@ class GeneticSolution(ConfigMixin, GeneticParametersBaseModel):
# Fallback → safe idle
return BatteryOperationMode.IDLE, 1.0
def _soc_clamped_operation_factors(
self,
ac_charge: float,
dc_charge: float,
discharge_allowed: bool,
soc_pct: float,
) -> tuple[float, float, bool]:
"""Clamp raw genetic gene values by the battery's actual SOC at that hour.
The raw gene values represent the optimizer's *intent* and are stored
verbatim in the ``genetic_*`` solution columns. This method derives
the *effective* values that can physically be executed given the
battery's state of charge, used for the ``battery1_*_op_*`` columns
and for ``energy_management_plan`` instructions.
Clamping rules:
- AC charge factor: scaled down proportionally when the battery
headroom (max_soc current_soc) is smaller than what the
commanded factor would store in one hour. Set to 0 when full.
- DC charge factor (PV): zeroed when battery is at or above max SOC
(the inverter curtails automatically, but this makes intent clear).
- Discharge: blocked when SOC is at or below min SOC.
"""
bat_list = self.config.devices.batteries
if not bat_list:
return ac_charge, dc_charge, discharge_allowed
bat = bat_list[0]
min_soc = float(bat.min_soc_percentage)
max_soc = float(bat.max_soc_percentage)
capacity_wh = float(bat.capacity_wh)
ch_eff = float(bat.charging_efficiency)
headroom_wh = max(0.0, (max_soc - soc_pct) / 100.0 * capacity_wh)
# --- AC charge: scale to available headroom ---
effective_ac = ac_charge
if effective_ac > 0.0:
if headroom_wh <= 0.0:
effective_ac = 0.0
else:
inv_list = self.config.devices.inverters
ac_to_dc_eff = float(inv_list[0].ac_to_dc_efficiency) if inv_list else 1.0
max_ac_cp_w = (
float(inv_list[0].max_ac_charge_power_w)
if inv_list and inv_list[0].max_ac_charge_power_w is not None
else float(bat.max_charge_power_w)
)
max_dc_per_h_wh = effective_ac * max_ac_cp_w * ac_to_dc_eff * ch_eff
if max_dc_per_h_wh > headroom_wh:
effective_ac = effective_ac * (headroom_wh / max_dc_per_h_wh)
# --- DC charge (PV): zero when battery is full ---
effective_dc = dc_charge
if effective_dc > 0.0 and headroom_wh <= 0.0:
effective_dc = 0.0
# --- Discharge: block at min SOC ---
effective_dis = discharge_allowed and (soc_pct > min_soc)
return effective_ac, effective_dc, effective_dis
def optimization_solution(self) -> OptimizationSolution:
"""Provide the genetic solution as a general optimization solution.
@@ -334,12 +395,26 @@ class GeneticSolution(ConfigMixin, GeneticParametersBaseModel):
ac_charge_hour = self.ac_charge[hour_idx]
dc_charge_hour = self.dc_charge[hour_idx]
discharge_allowed_hour = bool(self.discharge_allowed[hour_idx])
operation_mode, operation_mode_factor = self._battery_operation_from_solution(
ac_charge_hour, dc_charge_hour, discharge_allowed_hour
)
# Raw genetic gene values — optimizer intent, stored verbatim
operation["genetic_ac_charge_factor"].append(ac_charge_hour)
operation["genetic_dc_charge_factor"].append(dc_charge_hour)
operation["genetic_discharge_allowed_factor"].append(discharge_allowed_hour)
operation["genetic_discharge_allowed_factor"].append(float(discharge_allowed_hour))
# SOC-clamped effective values — what can physically be executed at
# this hour given the expected battery state of charge.
result_idx = hour_idx - start_day_hour
soc_h_pct = (
self.result.akku_soc_pro_stunde[result_idx]
if result_idx < len(self.result.akku_soc_pro_stunde)
else 0.0
)
eff_ac, eff_dc, eff_dis = self._soc_clamped_operation_factors(
ac_charge_hour, dc_charge_hour, discharge_allowed_hour, soc_h_pct
)
operation_mode, operation_mode_factor = self._battery_operation_from_solution(
eff_ac, eff_dc, eff_dis
)
for mode in BatteryOperationMode:
mode_key = f"battery1_{mode.lower()}_op_mode"
factor_key = f"battery1_{mode.lower()}_op_factor"
@@ -561,10 +636,24 @@ class GeneticSolution(ConfigMixin, GeneticParametersBaseModel):
for hour_idx, rate in enumerate(self.ac_charge):
if hour_idx < start_day_hour:
continue
operation_mode, operation_mode_factor = self._battery_operation_from_solution(
# Derive SOC-clamped effective factors so that FRBCInstruction
# operation_mode_factor reflects what can physically be executed,
# while the raw genetic gene values are preserved in the solution
# dataframe (genetic_*_factor columns).
result_idx = hour_idx - start_day_hour
soc_h_pct = (
self.result.akku_soc_pro_stunde[result_idx]
if result_idx < len(self.result.akku_soc_pro_stunde)
else 0.0
)
eff_ac, eff_dc, eff_dis = self._soc_clamped_operation_factors(
self.ac_charge[hour_idx],
self.dc_charge[hour_idx],
bool(self.discharge_allowed[hour_idx]),
soc_h_pct,
)
operation_mode, operation_mode_factor = self._battery_operation_from_solution(
eff_ac, eff_dc, eff_dis
)
if (
operation_mode == last_operation_mode