Files
EOS/src/akkudoktoreos/optimization/genetic/genetic.py
Bobby Noelte 18b580cabe
Some checks failed
Close stale pull requests/issues / Find Stale issues and PRs (push) Has been cancelled
docker-build / platform-excludes (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
fix: ensure EV charge rates settings available
Allow charge rates for electric vehicle to be provided by the POST
optimize endpoint. Create a default value in case neither the
parameters nor the configuration provide charge rates.

This is also to allow to migrate from 0.1.0 configuration format
to actual one.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
2025-10-30 17:25:26 +01:00

959 lines
38 KiB
Python

"""Genetic algorithm."""
import random
import time
from typing import Any, Optional
import numpy as np
from deap import algorithms, base, creator, tools
from loguru import logger
from numpydantic import NDArray, Shape
from pydantic import ConfigDict, Field
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.devices.genetic.battery import Battery
from akkudoktoreos.devices.genetic.homeappliance import HomeAppliance
from akkudoktoreos.devices.genetic.inverter import Inverter
from akkudoktoreos.optimization.genetic.geneticparams import (
GeneticEnergyManagementParameters,
GeneticOptimizationParameters,
)
from akkudoktoreos.optimization.genetic.geneticsolution import (
GeneticSimulationResult,
GeneticSolution,
)
from akkudoktoreos.optimization.optimizationabc import OptimizationBase
class GeneticSimulation(PydanticBaseModel):
    """Device simulation for GENETIC optimization algorithm.

    Holds the hourly input series, the simulated devices and the per-hour
    charge schedules that `simulate()` reads. The series and devices are
    filled in by `prepare()`; the schedules are replaced via the `set_*`
    methods.
    """

    # Disable validation on assignment to speed up simulation runs.
    model_config = ConfigDict(
        validate_assignment=False,
    )

    # Hour of day at which the last `simulate()` call started.
    start_hour: int = Field(
        default=0, ge=0, le=23, description="Starting hour on day for optimizations."
    )
    optimization_hours: Optional[int] = Field(
        default=24, ge=0, description="Number of hours into the future for optimizations."
    )
    prediction_hours: Optional[int] = Field(
        default=48, ge=0, description="Number of hours into the future for predictions"
    )

    # Hourly input series.
    # NOTE(review): the descriptions below say "watts" while the attribute
    # names and the result keys use Wh - confirm the intended unit.
    load_energy_array: Optional[NDArray[Shape["*"], float]] = Field(
        default=None,
        description="An array of floats representing the total load (consumption) in watts for different time intervals.",
    )
    pv_prediction_wh: Optional[NDArray[Shape["*"], float]] = Field(
        default=None,
        description="An array of floats representing the forecasted photovoltaic output in watts for different time intervals.",
    )
    elect_price_hourly: Optional[NDArray[Shape["*"], float]] = Field(
        default=None,
        description="An array of floats representing the electricity price in euros per watt-hour for different time intervals.",
    )
    elect_revenue_per_hour_arr: Optional[NDArray[Shape["*"], float]] = Field(
        default=None,
        description="An array of floats representing the feed-in compensation in euros per watt-hour.",
    )

    # Simulated devices; `battery` is taken from the inverter in `prepare()`.
    battery: Optional[Battery] = Field(default=None, description="TBD.")
    ev: Optional[Battery] = Field(default=None, description="TBD.")
    home_appliance: Optional[HomeAppliance] = Field(default=None, description="TBD.")
    inverter: Optional[Inverter] = Field(default=None, description="TBD.")

    # Per-hour schedules, indexed by absolute hour in `simulate()`:
    # - ac_charge_hours: relative power for charging the battery from AC (0 = off)
    # - dc_charge_hours: value passed to the battery as "charge allowed" for the hour
    # - ev_charge_hours: relative power for charging the EV (0 = off)
    ac_charge_hours: Optional[NDArray[Shape["*"], float]] = Field(default=None, description="TBD")
    dc_charge_hours: Optional[NDArray[Shape["*"], float]] = Field(default=None, description="TBD")
    ev_charge_hours: Optional[NDArray[Shape["*"], float]] = Field(default=None, description="TBD")
def prepare(
    self,
    parameters: GeneticEnergyManagementParameters,
    optimization_hours: int,
    prediction_hours: int,
    ev: Optional[Battery] = None,
    home_appliance: Optional[HomeAppliance] = None,
    inverter: Optional[Inverter] = None,
) -> None:
    """Prepare simulation runs.

    Copies the hourly input series out of ``parameters``, attaches the
    devices and resets the charge schedules to their defaults (no AC
    charging, DC charging allowed everywhere, no EV charging).

    Args:
        parameters: Energy management parameters providing the load, PV
            forecast, electricity price and feed-in tariff series.
        optimization_hours: Number of hours covered by the optimization.
        prediction_hours: Number of hours covered by the predictions; also
            the length of the default charge schedules.
        ev: Electric vehicle battery, if any.
        home_appliance: Home appliance, if any.
        inverter: Inverter, if any. Its ``battery`` attribute becomes the
            simulated battery; without an inverter there is no battery.
    """
    self.optimization_hours = optimization_hours
    self.prediction_hours = prediction_hours

    # Hourly input series, always stored as float arrays.
    self.load_energy_array = np.array(parameters.gesamtlast, float)
    self.pv_prediction_wh = np.array(parameters.pv_prognose_wh, float)
    self.elect_price_hourly = np.array(parameters.strompreis_euro_pro_wh, float)

    # The feed-in tariff may be given per hour (list) or as a single value
    # that applies to every hour of the load series.
    feed_in_tariff = parameters.einspeiseverguetung_euro_pro_wh
    if isinstance(feed_in_tariff, list):
        self.elect_revenue_per_hour_arr = np.array(feed_in_tariff, float)
    else:
        self.elect_revenue_per_hour_arr = np.full(
            len(self.load_energy_array), feed_in_tariff, float
        )

    # Devices; the battery is reached through the inverter.
    self.battery = inverter.battery if inverter else None
    self.ev = ev
    self.home_appliance = home_appliance
    self.inverter = inverter

    # Default schedules until the optimizer supplies its own.
    self.ac_charge_hours = np.full(self.prediction_hours, 0.0)
    self.dc_charge_hours = np.full(self.prediction_hours, 1.0)
    self.ev_charge_hours = np.full(self.prediction_hours, 0.0)
def set_akku_discharge_hours(self, ds: np.ndarray) -> None:
    """Hand the per-hour discharge schedule to the battery, if one is attached."""
    if not self.battery:
        return
    self.battery.set_discharge_per_hour(ds)
def set_akku_ac_charge_hours(self, ds: np.ndarray) -> None:
    """Replace the per-hour AC charge schedule read by `simulate()`."""
    self.ac_charge_hours = ds
def set_akku_dc_charge_hours(self, ds: np.ndarray) -> None:
    """Replace the per-hour DC charge schedule read by `simulate()`."""
    self.dc_charge_hours = ds
def set_ev_charge_hours(self, ds: np.ndarray) -> None:
    """Replace the per-hour EV charge schedule read by `simulate()`."""
    self.ev_charge_hours = ds
def set_home_appliance_start(self, ds: int, global_start_hour: int = 0) -> None:
    """Pass the requested start time on to the home appliance, if one is attached."""
    appliance = self.home_appliance
    if not appliance:
        return
    appliance.set_starting_time(ds, global_start_hour=global_start_hour)
def reset(self) -> None:
    """Reset the EV and then the battery, skipping whichever is not attached."""
    for device in (self.ev, self.battery):
        if device:
            device.reset()
def simulate(self, start_hour: int) -> dict[str, Any]:
    """Simulate energy usage and costs from ``start_hour`` to the end of the load series.

    For every hour the home appliance and EV loads are added to the base
    load, the inverter (if any) balances production against consumption,
    and optional AC charging of the battery is added on top as grid
    consumption.

    akku_soc_pro_stunde begin of the hour, initial hour state!
    last_wh_pro_stunde integral of last hour (end state)

    Args:
        start_hour: Absolute hour index at which the simulation starts.

    Returns:
        Dictionary with one array per result series (index 0 corresponds to
        ``start_hour``; entries that were never written stay NaN) and the
        aggregated totals ``Gesamtkosten_Euro``, ``Gesamteinnahmen_Euro``,
        ``Gesamtbilanz_Euro`` (costs minus revenue) and ``Gesamt_Verluste``.

    Raises:
        ValueError: If mandatory input data is missing or the load, PV and
            price series differ in length.
    """
    # Remember start hour
    self.start_hour = start_hour

    # Check for simulation integrity
    required_attrs = (
        "load_energy_array",
        "pv_prediction_wh",
        "elect_price_hourly",
        "ev_charge_hours",
        "ac_charge_hours",
        "dc_charge_hours",
        "elect_revenue_per_hour_arr",
    )
    missing_data = [
        attr.replace("_", " ").title() for attr in required_attrs if getattr(self, attr) is None
    ]
    if missing_data:
        missing_text = ", ".join(missing_data)
        # loguru interpolates `{}` placeholders; a printf-style `%s` would
        # be logged literally and the names would be lost.
        logger.error("Mandatory data missing - {}", missing_text)
        raise ValueError(f"Mandatory data missing: {missing_text}")

    # Pre-fetch data
    load_energy_array = np.array(self.load_energy_array)
    pv_prediction_wh = np.array(self.pv_prediction_wh)
    elect_price_hourly = np.array(self.elect_price_hourly)
    ev_charge_hours = np.array(self.ev_charge_hours)
    ac_charge_hours = np.array(self.ac_charge_hours)
    dc_charge_hours = np.array(self.dc_charge_hours)
    elect_revenue_per_hour_arr = np.array(self.elect_revenue_per_hour_arr)

    # Fetch objects
    battery = self.battery
    ev = self.ev
    home_appliance = self.home_appliance
    inverter = self.inverter

    if not (len(load_energy_array) == len(pv_prediction_wh) == len(elect_price_hourly)):
        error_msg = f"Array sizes do not match: Load Curve = {len(load_energy_array)}, PV Forecast = {len(pv_prediction_wh)}, Electricity Price = {len(elect_price_hourly)}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    end_hour = len(load_energy_array)
    total_hours = end_hour - start_hour

    # Pre-allocate arrays for the results, optimized for speed
    loads_energy_per_hour = np.full((total_hours), np.nan)
    feedin_energy_per_hour = np.full((total_hours), np.nan)
    consumption_energy_per_hour = np.full((total_hours), np.nan)
    costs_per_hour = np.full((total_hours), np.nan)
    revenue_per_hour = np.full((total_hours), np.nan)
    soc_per_hour = np.full((total_hours), np.nan)
    soc_ev_per_hour = np.full((total_hours), np.nan)
    losses_wh_per_hour = np.full((total_hours), np.nan)
    home_appliance_wh_per_hour = np.full((total_hours), np.nan)
    electricity_price_per_hour = np.full((total_hours), np.nan)

    # The state of charge at the begin of each hour (including the very
    # first one) is recorded at the top of the loop, so no separate
    # "initial state" write is needed - that write failed on an empty
    # simulation window.
    for hour in range(start_hour, end_hour):
        hour_idx = hour - start_hour

        # save begin states
        if battery:
            soc_per_hour[hour_idx] = battery.current_soc_percentage()
        if ev:
            soc_ev_per_hour[hour_idx] = ev.current_soc_percentage()

        # Accumulate loads and PV generation
        consumption = load_energy_array[hour]
        losses_wh_per_hour[hour_idx] = 0.0

        # Home appliances
        if home_appliance:
            ha_load = home_appliance.get_load_for_hour(hour)
            consumption += ha_load
            home_appliance_wh_per_hour[hour_idx] = ha_load

        # EV handling
        if ev and ev_charge_hours[hour] > 0:
            loaded_energy_ev, ev_losses = ev.charge_energy(
                None, hour, relative_power=ev_charge_hours[hour]
            )
            consumption += loaded_energy_ev
            losses_wh_per_hour[hour_idx] += ev_losses

        # Process inverter logic; without an inverter nothing is exchanged
        # with the grid here and all four values stay 0.
        energy_feedin_grid_actual = energy_consumption_grid_actual = losses = eigenverbrauch = (
            0.0
        )

        hour_ac_charge = ac_charge_hours[hour]
        hour_dc_charge = dc_charge_hours[hour]
        hourly_electricity_price = elect_price_hourly[hour]
        hourly_energy_revenue = elect_revenue_per_hour_arr[hour]

        if battery:
            battery.set_charge_allowed_for_hour(hour_dc_charge, hour)

        if inverter:
            energy_produced = pv_prediction_wh[hour]
            (
                energy_feedin_grid_actual,
                energy_consumption_grid_actual,
                losses,
                eigenverbrauch,
            ) = inverter.process_energy(energy_produced, consumption, hour)

        # AC battery charge: charged energy and charging losses are added
        # to both the load and the grid consumption.
        if battery and hour_ac_charge > 0.0:
            battery.set_charge_allowed_for_hour(1, hour)
            battery_charged_energy_actual, battery_losses_actual = battery.charge_energy(
                None, hour, relative_power=hour_ac_charge
            )
            total_battery_energy = battery_charged_energy_actual + battery_losses_actual
            consumption += total_battery_energy
            energy_consumption_grid_actual += total_battery_energy
            losses_wh_per_hour[hour_idx] += battery_losses_actual

        # Update hourly arrays
        feedin_energy_per_hour[hour_idx] = energy_feedin_grid_actual
        consumption_energy_per_hour[hour_idx] = energy_consumption_grid_actual
        losses_wh_per_hour[hour_idx] += losses
        loads_energy_per_hour[hour_idx] = consumption
        electricity_price_per_hour[hour_idx] = hourly_electricity_price

        # Financial calculations
        costs_per_hour[hour_idx] = energy_consumption_grid_actual * hourly_electricity_price
        revenue_per_hour[hour_idx] = energy_feedin_grid_actual * hourly_energy_revenue

    total_cost = np.nansum(costs_per_hour)
    total_losses = np.nansum(losses_wh_per_hour)
    total_revenue = np.nansum(revenue_per_hour)

    # Prepare output dictionary
    return {
        "Last_Wh_pro_Stunde": loads_energy_per_hour,
        "Netzeinspeisung_Wh_pro_Stunde": feedin_energy_per_hour,
        "Netzbezug_Wh_pro_Stunde": consumption_energy_per_hour,
        "Kosten_Euro_pro_Stunde": costs_per_hour,
        "akku_soc_pro_stunde": soc_per_hour,
        "Einnahmen_Euro_pro_Stunde": revenue_per_hour,
        "Gesamtbilanz_Euro": total_cost - total_revenue,
        "EAuto_SoC_pro_Stunde": soc_ev_per_hour,
        "Gesamteinnahmen_Euro": total_revenue,
        "Gesamtkosten_Euro": total_cost,
        "Verluste_Pro_Stunde": losses_wh_per_hour,
        "Gesamt_Verluste": total_losses,
        "Home_appliance_wh_per_hour": home_appliance_wh_per_hour,
        "Electricity_price": electricity_price_per_hour,
    }
class GeneticOptimization(OptimizationBase):
"""GENETIC algorithm to solve energy optimization."""
def __init__(
    self,
    verbose: bool = False,
    fixed_seed: Optional[int] = None,
):
    """Set up optimizer state, seed the random generator and create the simulation.

    Args:
        verbose: Passed on to the evolutionary algorithm as its verbosity flag.
        fixed_seed: Seed for the `random` module. When omitted, a seed is
            only drawn (and remembered) if the logger reports DEBUG level.
    """
    self.opti_param: dict[str, Any] = {}

    # Hours at the end of the prediction window that lie beyond the
    # optimization horizon; EV charging is forced to 0 there.
    prediction_hours = self.config.prediction.hours
    self.fixed_eauto_hours = prediction_hours - self.config.optimization.horizon_hours

    self.ev_possible_charge_values: list[float] = [1.0]
    self.verbose = verbose
    self.fix_seed = fixed_seed
    self.optimize_ev = True
    self.optimize_dc_charge = False
    self.fitness_history: dict[str, Any] = {}

    # Without an explicit seed, draw one in debug mode so the run can be
    # reproduced afterwards.
    # NOTE(review): in loguru `logger.level` is a method, so this comparison
    # with the string "DEBUG" presumably never holds - confirm.
    if self.fix_seed is None and logger.level == "DEBUG":
        self.fix_seed = random.randint(1, 100000000000)  # noqa: S311

    if self.fix_seed is not None:
        random.seed(self.fix_seed)

    # Create Simulation
    self.simulation = GeneticSimulation()
def decode_charge_discharge(
    self, discharge_hours_bin: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Decode the input array into ac_charge, dc_charge, and discharge arrays.

    With ``len_ac = len(self.ev_possible_charge_values)`` the integer states
    are categorized as:

    * ``0 .. len_ac - 1``: idle (nothing set)
    * ``len_ac .. 2*len_ac - 1``: discharge allowed
    * ``2*len_ac .. 3*len_ac - 1``: AC charge with the charge value at
      index ``state - 2*len_ac``
    * ``3*len_ac`` / ``3*len_ac + 1``: DC charge not allowed / allowed
      (only meaningful when ``self.optimize_dc_charge`` is set)

    Args:
        discharge_hours_bin: One integer state per hour.

    Returns:
        Tuple ``(ac_charge, dc_charge, discharge)``: AC charge factor per
        hour (float), DC charge permission per hour (float, all ones when
        DC optimization is disabled) and discharge flag per hour (int).
    """
    states = np.asarray(discharge_hours_bin)
    charge_values = np.asarray(self.ev_possible_charge_values, dtype=float)
    len_ac = len(charge_values)

    discharge_mask = (states >= len_ac) & (states < 2 * len_ac)
    ac_mask = (states >= 2 * len_ac) & (states < 3 * len_ac)

    # Idle is just 0, which is the default of both arrays.
    discharge = np.zeros_like(states, dtype=int)
    discharge[discharge_mask] = 1

    ac_charge = np.zeros_like(states, dtype=float)
    ac_indices = (states[ac_mask] - 2 * len_ac).astype(int)
    ac_charge[ac_mask] = charge_values[ac_indices]

    if self.optimize_dc_charge:
        dc_allowed_state = 3 * len_ac + 1
        # Float in both branches so callers always get the same dtype.
        dc_charge = np.where(states == dc_allowed_state, 1.0, 0.0)
    else:
        dc_charge = np.ones_like(states, dtype=float)

    return ac_charge, dc_charge, discharge
def mutate(self, individual: list[int]) -> tuple[list[int]]:
    """Custom mutation function for the individual.

    The individual is mutated in place, segment by segment:

    1. charge/discharge states (first ``prediction.hours`` genes), clamped
       to the valid state range,
    2. EV charge indices (next ``prediction.hours`` genes, only when EV
       optimization is active); the last ``fixed_eauto_hours`` of them are
       forced to 0,
    3. home appliance start hour (last gene, only when an appliance is
       optimized).

    Args:
        individual: Individual to mutate.

    Returns:
        One-element tuple holding the mutated individual, as DEAP expects.
    """
    hours = self.config.prediction.hours

    # Calculate the number of states
    len_ac = len(self.ev_possible_charge_values)
    total_states = 3 * len_ac + 2 if self.optimize_dc_charge else 3 * len_ac
    highest_state = total_states - 1

    # 1. Mutating the charge_discharge part
    (charge_discharge_mutated,) = self.toolbox.mutate_charge_discharge(individual[:hours])
    # Clamp with plain Python ints; np.clip would put numpy scalars into
    # the list-based individual.
    individual[:hours] = [
        min(max(int(state), 0), highest_state) for state in charge_discharge_mutated
    ]

    # 2. Mutating the EV charge part, if active
    if self.optimize_ev:
        (ev_charge_part_mutated,) = self.toolbox.mutate_ev_charge_index(
            individual[hours : hours * 2]
        )
        ev_charge_part_mutated[hours - self.fixed_eauto_hours :] = [0] * self.fixed_eauto_hours
        individual[hours : hours * 2] = ev_charge_part_mutated

    # 3. Mutating the appliance start time, if applicable
    if self.opti_param.get("home_appliance", 0) > 0:
        (appliance_part_mutated,) = self.toolbox.mutate_hour([individual[-1]])
        individual[-1] = appliance_part_mutated[0]

    return (individual,)
# Method to create an individual based on the conditions
def create_individual(self) -> list[int]:
    """Build one random individual from the registered gene generators.

    Layout: one discharge state per prediction hour, followed by one EV
    charge index per prediction hour when EV optimization is active,
    followed by a single appliance start hour when an appliance is
    optimized.
    """
    hours = self.config.prediction.hours

    # Discharge states always come first
    genes = [self.toolbox.attr_discharge_state() for _ in range(hours)]

    # EV charge indices only when the EV is part of the optimization
    if self.optimize_ev:
        genes.extend(self.toolbox.attr_ev_charge_index() for _ in range(hours))

    # Start hour of the household appliance goes last
    if self.opti_param["home_appliance"] > 0:
        genes.append(self.toolbox.attr_int())

    return creator.Individual(genes)
def merge_individual(
    self,
    discharge_hours_bin: np.ndarray,
    eautocharge_hours_index: Optional[np.ndarray],
    washingstart_int: Optional[int],
) -> list[int]:
    """Concatenate the components of an individual into one flat list.

    Parameters:
        discharge_hours_bin (np.ndarray): Charge/discharge state per hour.
        eautocharge_hours_index (Optional[np.ndarray]): EV charge index per hour, or None.
        washingstart_int (Optional[int]): Appliance start hour, or None.

    Returns:
        list[int]: Discharge states, then (if EV optimization is active) the
        EV charge indices, then (if an appliance is optimized) the start hour.
    """
    merged = discharge_hours_bin.tolist()

    if self.optimize_ev:
        if eautocharge_hours_index is None:
            # EV optimization is active but no EV data is available: pad with zeros
            merged.extend([0] * self.config.prediction.hours)
        else:
            merged.extend(eautocharge_hours_index.tolist())

    if self.opti_param.get("home_appliance", 0) > 0:
        # An appliance without a known start time gets start hour 0
        merged.append(0 if washingstart_int is None else washingstart_int)

    return merged
def split_individual(
    self, individual: list[int]
) -> tuple[np.ndarray, Optional[np.ndarray], Optional[int]]:
    """Cut a flat individual into its components.

    Returns:
        Tuple of
        1. charge/discharge states (int NumPy array, first ``prediction.hours`` genes),
        2. EV charge indices (int NumPy array, next ``prediction.hours`` genes),
           or None when EV optimization is off,
        3. appliance start hour (last gene as int), or None when no
           appliance is optimized.
    """
    hours = self.config.prediction.hours

    discharge_hours_bin = np.array(individual[:hours], dtype=int)

    eautocharge_hours_index = None
    if self.optimize_ev:
        eautocharge_hours_index = np.array(individual[hours : hours * 2], dtype=int)

    washingstart_int = None
    if self.opti_param and self.opti_param.get("home_appliance", 0) > 0:
        washingstart_int = int(individual[-1])

    return discharge_hours_bin, eautocharge_hours_index, washingstart_int
def setup_deap_environment(self, opti_param: dict[str, Any], start_hour: int) -> None:
    """Create the DEAP fitness/individual classes and register all operators.

    Args:
        opti_param: Optimization switches; stored on the instance.
        start_hour: Lowest hour the appliance start gene may take (the
            highest is 23).
    """
    self.opti_param = opti_param

    # Drop classes left over from a previous setup before creating them anew
    for leftover in ("FitnessMin", "Individual"):
        vars(creator).pop(leftover, None)

    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    creator.create("Individual", list, fitness=creator.FitnessMin)

    toolbox = base.Toolbox()
    self.toolbox = toolbox
    register = toolbox.register

    # State space is 0 .. last_state: idle, discharge and AC charge each
    # take len_ac states; DC optimization adds two more.
    len_ac = len(self.ev_possible_charge_values)
    total_states = 3 * len_ac
    if self.optimize_dc_charge:
        total_states += 2
    last_state = total_states - 1
    last_ev_index = len_ac - 1

    # Gene generators
    register("attr_discharge_state", random.randint, 0, last_state)
    if self.optimize_ev:
        register("attr_ev_charge_index", random.randint, 0, last_ev_index)
    register("attr_int", random.randint, start_hour, 23)

    # Individuals and population
    register("individual", self.create_individual)
    register("population", tools.initRepeat, list, toolbox.individual)

    # Crossover
    register("mate", tools.cxTwoPoint)

    # Per-segment mutation operators used by the custom `mutate`
    register("mutate_charge_discharge", tools.mutUniformInt, low=0, up=last_state, indpb=0.2)
    register("mutate_ev_charge_index", tools.mutUniformInt, low=0, up=last_ev_index, indpb=0.2)
    register("mutate_hour", tools.mutUniformInt, low=start_hour, up=23, indpb=0.2)
    register("mutate", self.mutate)

    # Selection
    register("select", tools.selTournament, tournsize=3)
def evaluate_inner(self, individual: list[int]) -> dict[str, Any]:
    """Simulates the energy management system (EMS) using the provided individual solution.

    This is an internal function. It resets the simulated devices, applies
    the schedules encoded in ``individual`` and runs the simulation from the
    EMS start hour.

    Args:
        individual: Flat individual as produced by `create_individual`.

    Returns:
        The result dictionary of the simulation run.
    """
    self.simulation.reset()

    discharge_hours_bin, eautocharge_hours_index, washingstart_int = self.split_individual(
        individual
    )
    start_hour = self.ems.start_datetime.hour

    # Compare against None: start hour 0 is a valid gene value and must not
    # be dropped, otherwise the appliance keeps the start of a previously
    # evaluated individual.
    if self.opti_param.get("home_appliance", 0) > 0 and washingstart_int is not None:
        self.simulation.set_home_appliance_start(washingstart_int, global_start_hour=start_hour)

    ac, dc, discharge = self.decode_charge_discharge(discharge_hours_bin)

    self.simulation.set_akku_discharge_hours(discharge)
    # Set DC charge hours only if DC optimization is enabled
    if self.optimize_dc_charge:
        self.simulation.set_akku_dc_charge_hours(dc)
    self.simulation.set_akku_ac_charge_hours(ac)

    if eautocharge_hours_index is not None:
        # Translate charge indices into the configured charge values.
        charge_values = np.asarray(self.ev_possible_charge_values, dtype=float)
        self.simulation.set_ev_charge_hours(charge_values[eautocharge_hours_index])
    else:
        self.simulation.set_ev_charge_hours(np.full(self.config.prediction.hours, 0.0))

    # Do the simulation and return result.
    return self.simulation.simulate(start_hour)
def evaluate(
    self,
    individual: list[int],
    parameters: GeneticOptimizationParameters,
    start_hour: int,
    worst_case: bool,
) -> tuple[float]:
    """Evaluate the fitness of an individual solution based on the simulation results.

    The fitness is the total balance (costs minus revenue, sign flipped for
    ``worst_case``), reduced by the residual value of the energy left in the
    battery and increased by a penalty for an EV that misses its minimum
    SoC. As a side effect the individual may be repaired in place (EV
    charging requested while the EV is already full) and gets an
    ``extra_data`` attribute ``(balance, losses, ev_soc_miss)``.

    Args:
        individual: Individual to evaluate; must accept attribute assignment.
        parameters: Optimization parameters (EV and EMS settings are read).
        start_hour: Unused here; kept for interface compatibility.
        worst_case: Flip the sign of the balance.

    Returns:
        One-element tuple with the fitness (lower is better).

    Raises:
        ValueError: If EV optimization is active but the individual carries
            no EV charge genes.
    """
    try:
        o = self.evaluate_inner(individual)
    except Exception as e:
        # A failing simulation must not abort the whole optimization; the
        # individual just gets a high penalty. Leave a trace for debugging.
        logger.debug("Evaluation of individual failed, applying penalty: {}", e)
        return (100000.0,)

    gesamtbilanz = o["Gesamtbilanz_Euro"] * (-1.0 if worst_case else 1.0)

    discharge_hours_bin, eautocharge_hours_index, washingstart_int = self.split_individual(
        individual
    )

    # EV 100% & charge not allowed
    if self.optimize_ev:
        if eautocharge_hours_index is None:
            raise ValueError("eauto_soc_per_hour or eautocharge_hours_index is None")
        eauto_soc_per_hour = np.array(o.get("EAuto_SoC_pro_Stunde", []))

        # Compare the trailing hours both series have in common. Guard the
        # empty case: a slice `[-0:]` would select the whole array.
        min_length = min(eauto_soc_per_hour.size, eautocharge_hours_index.size)
        if min_length > 0:
            eauto_soc_per_hour_tail = eauto_soc_per_hour[-min_length:]
            # View into eautocharge_hours_index - writes go through.
            eautocharge_hours_index_tail = eautocharge_hours_index[-min_length:]

            invalid_indices = np.flatnonzero(
                (eauto_soc_per_hour_tail == 100) & (eautocharge_hours_index_tail > 0)
            )
            if invalid_indices.size > 0:
                # The first offending hour is kept, all later ones are cleared.
                eautocharge_hours_index_tail[invalid_indices[1:]] = 0
                individual[:] = self.merge_individual(
                    discharge_hours_bin, eautocharge_hours_index, washingstart_int
                )

    # NOTE: a disabled experiment that toggled the discharge state for hours
    # with an empty battery used to live here as commented-out code; see the
    # version control history if it is needed again.

    # Shortfall of the EV against its minimum SoC (0 without EV).
    if parameters.eauto and self.simulation.ev:
        ev_soc_miss = (
            parameters.eauto.min_soc_percentage - self.simulation.ev.current_soc_percentage()
        )
    else:
        ev_soc_miss = 0

    # More metrics
    individual.extra_data = (  # type: ignore[attr-defined]
        o["Gesamtbilanz_Euro"],
        o["Gesamt_Verluste"],
        ev_soc_miss,
    )

    # Adjust total balance with battery value and penalties for unmet SOC
    if self.simulation.battery:
        restwert_akku = (
            self.simulation.battery.current_energy_content()
            * parameters.ems.preis_euro_pro_wh_akku
        )
        gesamtbilanz += -restwert_akku

    if self.optimize_ev:
        try:
            penalty = self.config.optimization.genetic.penalties["ev_soc_miss"]
        except (AttributeError, KeyError, TypeError):
            # Use default
            penalty = 10
            logger.error(
                "Penalty function parameter `ev_soc_miss` not configured, using {}.", penalty
            )
        gesamtbilanz += max(0, ev_soc_miss * penalty)

    return (gesamtbilanz,)
def optimize(
    self,
    start_solution: Optional[list[float]] = None,
    ngen: int = 200,
) -> tuple[Any, dict[str, list[Any]]]:
    """Run the optimization process using a genetic algorithm.

    @TODO: optimize() ngen default (200) is different from optimierung_ems() ngen default (400).

    Args:
        start_solution: Optional known solution; ten copies of it are put
            at the front of the initial population.
        ngen: Number of generations to evolve.

    Returns:
        Tuple of the best individual found and a dictionary with the
        ``extra_data`` metrics (``bilanz``, ``verluste``, ``nebenbedingung``)
        of the individuals in the final population.
    """
    # Set the number of individuals in a generation
    try:
        individuals = self.config.optimization.genetic.individuals
    except AttributeError:
        # e.g. no genetic section configured
        individuals = None
    if individuals is None:
        individuals = 300
        logger.error("Individuals not configured. Using {}.", individuals)

    population = self.toolbox.population(n=individuals)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("min", np.min)
    stats.register("avg", np.mean)
    stats.register("max", np.max)

    logger.debug("Start optimize: {}", start_solution)

    # Insert the start solution into the population if provided
    if start_solution is not None:
        for _ in range(10):
            population.insert(0, creator.Individual(start_solution))

    # Run the evolutionary algorithm
    final_population, log = algorithms.eaMuPlusLambda(
        population,
        self.toolbox,
        mu=100,
        lambda_=150,
        cxpb=0.6,
        mutpb=0.4,
        ngen=ngen,
        stats=stats,
        halloffame=hof,
        verbose=self.verbose,
    )

    # Store fitness history
    self.fitness_history = {
        "gen": log.select("gen"),  # Generation numbers (X-axis)
        "avg": log.select("avg"),  # Average fitness for each generation (Y-axis)
        "max": log.select("max"),  # Maximum fitness for each generation (Y-axis)
        "min": log.select("min"),  # Minimum fitness for each generation (Y-axis)
    }

    # Collect the extra metrics attached by `evaluate`
    member: dict[str, list[float]] = {"bilanz": [], "verluste": [], "nebenbedingung": []}
    for ind in final_population:
        if hasattr(ind, "extra_data"):
            bilanz, verluste, nebenbedingung = ind.extra_data
            member["bilanz"].append(bilanz)
            member["verluste"].append(verluste)
            member["nebenbedingung"].append(nebenbedingung)

    return hof[0], member
def optimierung_ems(
    self,
    parameters: GeneticOptimizationParameters,
    start_hour: Optional[int] = None,
    worst_case: bool = False,
    ngen: Optional[int] = None,
) -> GeneticSolution:
    """Perform EMS (Energy Management System) optimization and visualize results.

    Builds the simulated devices from ``parameters``, prepares the
    simulation, runs the genetic algorithm and evaluates the best
    individual once more to obtain the final simulation result.

    Args:
        parameters: Optimization parameters (EMS series, battery, EV,
            inverter, dishwasher, optional start solution).
        start_hour: Start hour; defaults to, and must equal, the hour of
            the energy management start datetime.
        worst_case: Optimize for the worst case (flips the balance sign).
        ngen: Number of generations; defaults to the configured value, or
            400 when nothing is configured.

    Returns:
        The best solution found together with its simulation result.

    Raises:
        ValueError: If ``start_hour`` differs from the EMS start hour.
    """
    if start_hour is None:
        start_hour = self.ems.start_datetime.hour
    # Start hour has to be in sync with energy management
    if start_hour != self.ems.start_datetime.hour:
        raise ValueError(
            f"Start hour not synced. EMS {self.ems.start_datetime.hour} vs. GENETIC {start_hour}."
        )

    # Set the number of generations
    generations = ngen
    if generations is None:
        try:
            generations = self.config.optimization.genetic.generations
        except AttributeError:
            # e.g. no genetic section configured
            generations = None
        if generations is None:
            generations = 400
            logger.error("Generations not configured. Using {}.", generations)

    self.simulation.reset()

    # Initialize PV and EV batteries
    akku: Optional[Battery] = None
    if parameters.pv_akku:
        akku = Battery(
            parameters.pv_akku,
            prediction_hours=self.config.prediction.hours,
        )
        akku.set_charge_per_hour(np.full(self.config.prediction.hours, 1))

    eauto: Optional[Battery] = None
    if parameters.eauto:
        eauto = Battery(
            parameters.eauto,
            prediction_hours=self.config.prediction.hours,
        )
        eauto.set_charge_per_hour(np.full(self.config.prediction.hours, 1))
        # Only optimize EV charging when the EV still has to reach its minimum SoC
        self.optimize_ev = (
            parameters.eauto.min_soc_percentage - parameters.eauto.initial_soc_percentage >= 0
        )
        # Electric vehicle charge rates: request parameters first, then the
        # first configured electric vehicle, then a built-in default.
        if parameters.eauto.charge_rates is not None:
            self.ev_possible_charge_values = parameters.eauto.charge_rates
        elif (
            self.config.devices.electric_vehicles
            and self.config.devices.electric_vehicles[0]
            and self.config.devices.electric_vehicles[0].charge_rates is not None
        ):
            self.ev_possible_charge_values = self.config.devices.electric_vehicles[
                0
            ].charge_rates
        else:
            warning_msg = "No charge rates provided for electric vehicle - using default."
            logger.warning(warning_msg)
            self.ev_possible_charge_values = [
                0.0,
                0.1,
                0.2,
                0.3,
                0.4,
                0.5,
                0.6,
                0.7,
                0.8,
                0.9,
                1.0,
            ]
    else:
        self.optimize_ev = False

    # Initialize household appliance if applicable
    dishwasher = (
        HomeAppliance(
            parameters=parameters.dishwasher,
            optimization_hours=self.config.optimization.horizon_hours,
            prediction_hours=self.config.prediction.hours,
        )
        if parameters.dishwasher is not None
        else None
    )

    # Initialize the inverter and energy management system
    inverter: Optional[Inverter] = None
    if parameters.inverter:
        inverter = Inverter(
            parameters.inverter,
            battery=akku,
        )

    # Prepare device simulation
    self.simulation.prepare(
        parameters=parameters.ems,
        optimization_hours=self.config.optimization.horizon_hours,
        prediction_hours=self.config.prediction.hours,
        inverter=inverter,  # battery is part of inverter
        ev=eauto,
        home_appliance=dishwasher,
    )

    # Setup the DEAP environment and optimization process
    self.setup_deap_environment({"home_appliance": 1 if dishwasher else 0}, start_hour)
    self.toolbox.register(
        "evaluate",
        lambda ind: self.evaluate(ind, parameters, start_hour, worst_case),
    )

    start_time = time.perf_counter()
    start_solution, extra_data = self.optimize(parameters.start_solution, ngen=generations)
    elapsed_time = time.perf_counter() - start_time
    logger.debug(f"Time evaluate inner: {elapsed_time:.4f} sec.")

    # Perform final evaluation on the best solution
    simulation_result = self.evaluate_inner(start_solution)

    # Prepare results
    discharge_hours_bin, eautocharge_hours_index, washingstart_int = self.split_individual(
        start_solution
    )
    # home appliance may have chosen a different appliance start hour
    if self.simulation.home_appliance:
        washingstart_int = self.simulation.home_appliance.get_appliance_start()

    eautocharge_hours_float = (
        [self.ev_possible_charge_values[i] for i in eautocharge_hours_index]
        if eautocharge_hours_index is not None
        else None
    )

    ac_charge, dc_charge, discharge = self.decode_charge_discharge(discharge_hours_bin)

    # Visualize the results
    visualize = {
        "ac_charge": ac_charge.tolist(),
        "dc_charge": dc_charge.tolist(),
        "discharge_allowed": discharge.tolist(),
        "eautocharge_hours_float": eautocharge_hours_float,
        "result": simulation_result,
        "eauto_obj": self.simulation.ev.to_dict() if self.simulation.ev else None,
        "start_solution": start_solution,
        "spuelstart": washingstart_int,
        "extra_data": extra_data,
        "fitness_history": self.fitness_history,
        "fixed_seed": self.fix_seed,
    }
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid an import cycle - confirm.
    from akkudoktoreos.utils.visualize import prepare_visualize

    prepare_visualize(parameters, visualize, start_hour=start_hour)

    return GeneticSolution(
        **{
            "ac_charge": ac_charge,
            "dc_charge": dc_charge,
            "discharge_allowed": discharge,
            "eautocharge_hours_float": eautocharge_hours_float,
            "result": GeneticSimulationResult(**simulation_result),
            "eauto_obj": self.simulation.ev,
            "start_solution": start_solution,
            "washingstart": washingstart_int,
        }
    )