class_ems: AC / DC Charging

class_optimize: Timing Bugs fixed
class_numpy_encoder: JSON Encoder with Numpy support
visualize: AC / DC / Discharge
test_class_ems_2: New Test for AC / DC charging decision
This commit is contained in:
Andreas 2024-10-16 15:40:04 +02:00 committed by Andreas
parent cafed7eaca
commit 87ec02a90e
8 changed files with 585 additions and 240 deletions

View File

@ -5,60 +5,63 @@ import time
# Import necessary modules from the project # Import necessary modules from the project
from akkudoktoreos.class_optimize import optimization_problem from akkudoktoreos.class_optimize import optimization_problem
from akkudoktoreos.visualize import *
start_hour = 10 from akkudoktoreos.class_numpy_encoder import *
start_hour = 0
# PV Forecast (in W) # PV Forecast (in W)
pv_forecast = [ pv_forecast = np.zeros(48)
0, pv_forecast[12] = 5000
0, # [
0, # 0,
0, # 0,
0, # 0,
0, # 0,
0, # 0,
8.05, # 0,
352.91, # 0,
728.51, # 8.05,
930.28, # 352.91,
1043.25, # 728.51,
1106.74, # 930.28,
1161.69, # 1043.25,
6018.82, # 1106.74,
5519.07, # 1161.69,
3969.88, # 1018.82,
3017.96, # 1519.07,
1943.07, # 1969.88,
1007.17, # 1017.96,
319.67, # 1043.07,
7.88, # 1007.17,
0, # 319.67,
0, # 7.88,
0, # 0,
0, # 0,
0, # 0,
0, # 0,
0, # 0,
0, # 0,
0, # 0,
5.04, # 0,
335.59, # 0,
705.32, # 5.04,
1121.12, # 335.59,
1604.79, # 705.32,
2157.38, # 1121.12,
1433.25, # 1604.79,
5718.49, # 2157.38,
4553.96, # 1433.25,
3027.55, # 5718.49,
2574.46, # 4553.96,
1720.4, # 3027.55,
963.4, # 2574.46,
383.3, # 1720.4,
0, # 963.4,
0, # 383.3,
0, # 0,
] # 0,
# 0,
# ]
# Temperature Forecast (in degree C) # Temperature Forecast (in degree C)
temperature_forecast = [ temperature_forecast = [
@ -113,56 +116,60 @@ temperature_forecast = [
] ]
# Electricity Price (in Euro per Wh) # Electricity Price (in Euro per Wh)
strompreis_euro_pro_wh = [ strompreis_euro_pro_wh = np.full(48, 0.001)
0.0003384, strompreis_euro_pro_wh [0:10] = 0.00001
0.0003318, strompreis_euro_pro_wh [11:15] = 0.00005
0.0003284, strompreis_euro_pro_wh [20] = 0.00001
0.0003283, # [
0.0003289, # 0.0000384,
0.0003334, # 0.0000318,
0.0003290, # 0.0000284,
0.0003302, # 0.0008283,
0.0003042, # 0.0008289,
0.0002430, # 0.0008334,
0.0002280, # 0.0008290,
0.0002212, # 0.0003302,
0.0002093, # 0.0003042,
0.0001879, # 0.0002430,
0.0001838, # 0.0002280,
0.0002004, # 0.0002212,
0.0002198, # 0.0002093,
0.0002270, # 0.0001879,
0.0002997, # 0.0001838,
0.0003195, # 0.0002004,
0.0003081, # 0.0002198,
0.0002969, # 0.0002270,
0.0002921, # 0.0002997,
0.0002780, # 0.0003195,
0.0003384, # 0.0003081,
0.0003318, # 0.0002969,
0.0003284, # 0.0002921,
0.0003283, # 0.0002780,
0.0003289, # 0.0003384,
0.0003334, # 0.0003318,
0.0003290, # 0.0003284,
0.0003302, # 0.0003283,
0.0003042, # 0.0003289,
0.0002430, # 0.0003334,
0.0002280, # 0.0003290,
0.0002212, # 0.0003302,
0.0002093, # 0.0003042,
0.0001879, # 0.0002430,
0.0001838, # 0.0002280,
0.0002004, # 0.0002212,
0.0002198, # 0.0002093,
0.0002270, # 0.0001879,
0.0002997, # 0.0001838,
0.0003195, # 0.0002004,
0.0003081, # 0.0002198,
0.0002969, # 0.0002270,
0.0002921, # 0.0002997,
0.0002780, # 0.0003195,
] # 0.0003081,
# 0.0002969,
# 0.0002921,
# 0.0002780,
# ]
# Overall System Load (in W) # Overall System Load (in W)
gesamtlast = [ gesamtlast = [
@ -221,10 +228,10 @@ start_solution = None
# Define parameters for the optimization problem # Define parameters for the optimization problem
parameter = { parameter = {
# Cost of storing energy in battery (per Wh) # Value of energy in battery (per Wh)
"preis_euro_pro_wh_akku": 10e-05, "preis_euro_pro_wh_akku": 0e-05,
# Initial state of charge (SOC) of PV battery (%) # Initial state of charge (SOC) of PV battery (%)
"pv_soc": 80, "pv_soc": 15,
# Battery capacity (in Wh) # Battery capacity (in Wh)
"pv_akku_cap": 26400, "pv_akku_cap": 26400,
# Yearly energy consumption (in Wh) # Yearly energy consumption (in Wh)
@ -242,7 +249,7 @@ parameter = {
# Electricity price forecast (48 hours) # Electricity price forecast (48 hours)
"strompreis_euro_pro_wh": strompreis_euro_pro_wh, "strompreis_euro_pro_wh": strompreis_euro_pro_wh,
# Minimum SOC for electric car # Minimum SOC for electric car
"eauto_min_soc": 20, "eauto_min_soc": 50,
# Electric car battery capacity (Wh) # Electric car battery capacity (Wh)
"eauto_cap": 60000, "eauto_cap": 60000,
# Charging efficiency of the electric car # Charging efficiency of the electric car
@ -250,7 +257,7 @@ parameter = {
# Charging power of the electric car (W) # Charging power of the electric car (W)
"eauto_charge_power": 11040, "eauto_charge_power": 11040,
# Current SOC of the electric car (%) # Current SOC of the electric car (%)
"eauto_soc": 5, "eauto_soc": 15,
# Current PV power generation (W) # Current PV power generation (W)
"pvpowernow": 211.137503624, "pvpowernow": 211.137503624,
# Initial solution for the optimization # Initial solution for the optimization
@ -258,7 +265,7 @@ parameter = {
# Household appliance consumption (Wh) # Household appliance consumption (Wh)
"haushaltsgeraet_wh": 5000, "haushaltsgeraet_wh": 5000,
# Duration of appliance usage (hours) # Duration of appliance usage (hours)
"haushaltsgeraet_dauer": 2, "haushaltsgeraet_dauer": 0,
# Minimum Soc PV Battery # Minimum Soc PV Battery
"min_soc_prozent": 15, "min_soc_prozent": 15,
} }
@ -282,8 +289,25 @@ elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time:.4f} seconds") print(f"Elapsed time: {elapsed_time:.4f} seconds")
# Print or visualize the result
# pprint(ergebnis)
#json_data = json.dumps(ergebnis) ac_charge, dc_charge, discharge = (ergebnis["ac_charge"],ergebnis["dc_charge"],ergebnis["discharge_allowed"])
#print(json_data)
visualisiere_ergebnisse(
gesamtlast,
pv_forecast,
strompreis_euro_pro_wh,
ergebnis["result"],
ac_charge,
dc_charge,
discharge,
temperature_forecast,
start_hour,
48,
np.full(48, parameter["einspeiseverguetung_euro_pro_wh"]),
filename="visualization_results.pdf",
extra_data=None,
)
json_data = NumpyEncoder.dumps(ergebnis)
print(json_data)

View File

@ -20,7 +20,7 @@ class PVAkku:
self.soc_wh = (start_soc_prozent / 100) * kapazitaet_wh self.soc_wh = (start_soc_prozent / 100) * kapazitaet_wh
self.hours = hours if hours is not None else 24 # Default to 24 hours if not specified self.hours = hours if hours is not None else 24 # Default to 24 hours if not specified
self.discharge_array = np.full(self.hours, 1) self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 0) self.charge_array = np.full(self.hours, 1)
# Charge and discharge efficiency # Charge and discharge efficiency
self.lade_effizienz = lade_effizienz self.lade_effizienz = lade_effizienz
self.entlade_effizienz = entlade_effizienz self.entlade_effizienz = entlade_effizienz
@ -70,26 +70,19 @@ class PVAkku:
self.soc_wh = min(max(self.soc_wh, self.min_soc_wh), self.max_soc_wh) self.soc_wh = min(max(self.soc_wh, self.min_soc_wh), self.max_soc_wh)
self.discharge_array = np.full(self.hours, 1) self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 0) self.charge_array = np.full(self.hours, 1)
def set_discharge_per_hour(self, discharge_array): def set_discharge_per_hour(self, discharge_array):
assert len(discharge_array) == self.hours assert len(discharge_array) == self.hours
self.discharge_array = np.array(discharge_array) self.discharge_array = np.array(discharge_array)
# Ensure no simultaneous charging and discharging in the same hour using NumPy mask
conflict_mask = (self.charge_array > 0) & (self.discharge_array > 0)
# Prioritize discharge by setting charge to 0 where both are > 0
self.charge_array[conflict_mask] = 0
def set_charge_per_hour(self, charge_array): def set_charge_per_hour(self, charge_array):
assert len(charge_array) == self.hours assert len(charge_array) == self.hours
self.charge_array = np.array(charge_array) self.charge_array = np.array(charge_array)
# Ensure no simultaneous charging and discharging in the same hour using NumPy mask def set_charge_allowed_for_hour(self, charge, hour):
conflict_mask = (self.charge_array > 0) & (self.discharge_array > 0) assert hour < self.hours
# Prioritize discharge by setting charge to 0 where both are > 0 self.charge_array[hour] = charge
self.discharge_array[conflict_mask] = 0
def ladezustand_in_prozent(self): def ladezustand_in_prozent(self):
return (self.soc_wh / self.kapazitaet_wh) * 100 return (self.soc_wh / self.kapazitaet_wh) * 100
@ -125,17 +118,14 @@ class PVAkku:
# Return the actually discharged energy and the losses # Return the actually discharged energy and the losses
return tatsaechlich_abgegeben_wh, verluste_wh return tatsaechlich_abgegeben_wh, verluste_wh
def energie_laden(self, wh, hour): def energie_laden(self, wh, hour, relative_power=0.0):
if hour is not None and self.charge_array[hour] == 0: if hour is not None and self.charge_array[hour] == 0:
return 0, 0 # Charging not allowed in this hour return 0, 0 # Charging not allowed in this hour
if relative_power > 0.0:
wh=self.max_ladeleistung_w*relative_power
# If no value for wh is given, use the maximum charging power # If no value for wh is given, use the maximum charging power
wh = wh if wh is not None else self.max_ladeleistung_w wh = wh if wh is not None else self.max_ladeleistung_w
# Relative to the maximum charging power (between 0 and 1)
relative_ladeleistung = self.charge_array[hour]
effektive_ladeleistung = relative_ladeleistung * self.max_ladeleistung_w
# Calculate the maximum energy that can be charged considering max_soc and efficiency # Calculate the maximum energy that can be charged considering max_soc and efficiency
if self.lade_effizienz > 0: if self.lade_effizienz > 0:
max_possible_charge_wh = (self.max_soc_wh - self.soc_wh) / self.lade_effizienz max_possible_charge_wh = (self.max_soc_wh - self.soc_wh) / self.lade_effizienz
@ -144,8 +134,8 @@ class PVAkku:
max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative
# The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge # The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge
effektive_lademenge = min(wh, effektive_ladeleistung, max_possible_charge_wh) effektive_lademenge = min(wh, max_possible_charge_wh)
# Energy actually stored in the battery # Energy actually stored in the battery
geladene_menge = effektive_lademenge * self.lade_effizienz geladene_menge = effektive_lademenge * self.lade_effizienz
@ -153,10 +143,9 @@ class PVAkku:
self.soc_wh += geladene_menge self.soc_wh += geladene_menge
# Ensure soc_wh does not exceed max_soc_wh # Ensure soc_wh does not exceed max_soc_wh
self.soc_wh = min(self.soc_wh, self.max_soc_wh) self.soc_wh = min(self.soc_wh, self.max_soc_wh)
# Calculate losses # Calculate losses
verluste_wh = effektive_lademenge - geladene_menge verluste_wh = effektive_lademenge - geladene_menge
return geladene_menge, verluste_wh return geladene_menge, verluste_wh
def aktueller_energieinhalt(self): def aktueller_energieinhalt(self):

View File

@ -1,6 +1,6 @@
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
from akkudoktoreos.config import *
import numpy as np import numpy as np
@ -23,12 +23,17 @@ class EnergieManagementSystem:
self.eauto = eauto self.eauto = eauto
self.haushaltsgeraet = haushaltsgeraet self.haushaltsgeraet = haushaltsgeraet
self.wechselrichter = wechselrichter self.wechselrichter = wechselrichter
self.ac_charge_hours = np.full(prediction_hours,0)
self.dc_charge_hours = np.full(prediction_hours,1)
def set_akku_discharge_hours(self, ds: List[int]) -> None: def set_akku_discharge_hours(self, ds: List[int]) -> None:
self.akku.set_discharge_per_hour(ds) self.akku.set_discharge_per_hour(ds)
def set_akku_charge_hours(self, ds: List[int]) -> None: def set_akku_ac_charge_hours(self, ds: np.ndarray) -> None:
self.akku.set_charge_per_hour(ds) self.ac_charge_hours = ds
def set_akku_dc_charge_hours(self, ds: np.ndarray) -> None:
self.dc_charge_hours = ds
def set_eauto_charge_hours(self, ds: List[int]) -> None: def set_eauto_charge_hours(self, ds: List[int]) -> None:
self.eauto.set_charge_per_hour(ds) self.eauto.set_charge_per_hour(ds)
@ -46,7 +51,12 @@ class EnergieManagementSystem:
return self.simuliere(start_stunde) return self.simuliere(start_stunde)
def simuliere(self, start_stunde: int) -> dict: def simuliere(self, start_stunde: int) -> dict:
# Ensure arrays have the same length '''
hour:
akku_soc_pro_stunde begin of the hour, initial hour state!
last_wh_pro_stunde integral of last hour (end state)
'''
lastkurve_wh = self.gesamtlast lastkurve_wh = self.gesamtlast
assert ( assert (
len(lastkurve_wh) == len(self.pv_prognose_wh) == len(self.strompreis_euro_pro_wh) len(lastkurve_wh) == len(self.pv_prognose_wh) == len(self.strompreis_euro_pro_wh)
@ -91,16 +101,21 @@ class EnergieManagementSystem:
eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent() eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent()
# AC PV Battery Charge # AC PV Battery Charge
if self.akku.charge_array[stunde] > 0.0: if self.ac_charge_hours[stunde] > 0.0:
geladene_menge, verluste_wh = self.akku.energie_laden(None,stunde) self.akku.set_charge_allowed_for_hour(self.ac_charge_hours[stunde],stunde)
geladene_menge, verluste_wh = self.akku.energie_laden(None,stunde,relative_power=self.ac_charge_hours[stunde])
verbrauch += geladene_menge verbrauch += geladene_menge
verluste_wh_pro_stunde[stunde_since_now] += verluste_wh verluste_wh_pro_stunde[stunde_since_now] += verluste_wh
# Process inverter logic # Process inverter logic
erzeugung = self.pv_prognose_wh[stunde] erzeugung = self.pv_prognose_wh[stunde]
self.akku.set_charge_allowed_for_hour(self.dc_charge_hours[stunde],stunde)
netzeinspeisung, netzbezug, verluste, eigenverbrauch = ( netzeinspeisung, netzbezug, verluste, eigenverbrauch = (
self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde) self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
) )
netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung
netzbezug_wh_pro_stunde[stunde_since_now] = netzbezug netzbezug_wh_pro_stunde[stunde_since_now] = netzbezug
verluste_wh_pro_stunde[stunde_since_now] += verluste verluste_wh_pro_stunde[stunde_since_now] += verluste
@ -136,4 +151,5 @@ class EnergieManagementSystem:
"Gesamt_Verluste": np.nansum(verluste_wh_pro_stunde), "Gesamt_Verluste": np.nansum(verluste_wh_pro_stunde),
"Haushaltsgeraet_wh_pro_stunde": haushaltsgeraet_wh_pro_stunde, "Haushaltsgeraet_wh_pro_stunde": haushaltsgeraet_wh_pro_stunde,
} }
return out return out

View File

@ -0,0 +1,24 @@
import json
import numpy as np
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist() # Convert NumPy arrays to lists
if isinstance(obj, np.generic):
return obj.item() # Convert NumPy scalars to native Python types
return super(NumpyEncoder, self).default(obj)
@staticmethod
def dumps(data):
"""
Static method to serialize a Python object into a JSON string using NumpyEncoder.
Args:
data: The Python object to serialize.
Returns:
str: A JSON string representation of the object.
"""
return json.dumps(data, cls=NumpyEncoder)

View File

@ -12,6 +12,7 @@ from akkudoktoreos.config import possible_ev_charge_currents
from akkudoktoreos.visualize import visualisiere_ergebnisse from akkudoktoreos.visualize import visualisiere_ergebnisse
class optimization_problem: class optimization_problem:
def __init__( def __init__(
self, self,
@ -35,53 +36,104 @@ class optimization_problem:
if fixed_seed is not None: if fixed_seed is not None:
random.seed(fixed_seed) random.seed(fixed_seed)
def split_charge_discharge(self, discharge_hours_bin: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
def decode_charge_discharge(self, discharge_hours_bin: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
""" """
Split the input array `discharge_hours_bin` into two separate arrays: Decode the input array `discharge_hours_bin` into three separate arrays for AC charging, DC charging, and discharge.
- `charge`: Contains only the negative values from `discharge_hours_bin` (charging values). The function maps AC and DC charging values to relative power levels (0 to 1), while the discharge remains binary (0 or 1).
- `discharge`: Contains only the positive values from `discharge_hours_bin` (discharging values).
Parameters: Parameters:
- discharge_hours_bin (np.ndarray): Input array with both positive and negative values. - discharge_hours_bin (np.ndarray): Input array with integer values representing the different states.
The states are:
0: No action ("idle")
1: Discharge ("discharge")
2-6: AC charging with different power levels ("ac_charge")
7-8: DC charging Dissallowed/allowed ("dc_charge")
Returns: Returns:
- charge (np.ndarray): Array with negative values from `discharge_hours_bin`, other values set to 0. - ac_charge (np.ndarray): Array with AC charging values as relative power (0-1), other values set to 0.
- discharge (np.ndarray): Array with positive values from `discharge_hours_bin`, other values set to 0. - dc_charge (np.ndarray): Array with DC charging values as relative power (0-1), other values set to 0.
- discharge (np.ndarray): Array with discharge values (1 for discharge, 0 otherwise).
""" """
# Convert the input list to a NumPy array, if it's not already # Convert the input list to a NumPy array, if it's not already
discharge_hours_bin = np.array(discharge_hours_bin) discharge_hours_bin = np.array(discharge_hours_bin)
# Create charge array: Keep only negative values, set the rest to 0
charge = -np.where(discharge_hours_bin < 0, discharge_hours_bin, 0)
charge = charge / np.max(charge)
# Create discharge array: Keep only positive values, set the rest to 0 # Create ac_charge array: Only consider values between 2 and 6 (AC charging power levels), set the rest to 0
discharge = np.where(discharge_hours_bin > 0, discharge_hours_bin, 0) ac_charge = np.where((discharge_hours_bin >= 2) & (discharge_hours_bin <= 6), discharge_hours_bin - 1, 0)
ac_charge = ac_charge / 5.0 # Normalize AC charge to range between 0 and 1
# Create dc_charge array: 7 = Not allowed (mapped to 0), 8 = Allowed (mapped to 1)
dc_charge = np.where(discharge_hours_bin == 8, 1, 0)
# Create discharge array: Only consider value 1 (Discharge), set the rest to 0 (binary output)
discharge = np.where(discharge_hours_bin == 1, 1, 0)
return ac_charge, dc_charge, discharge
return charge, discharge
# Custom mutation function that applies type-specific mutations # Custom mutation function that applies type-specific mutations
def mutate(self,individual): def mutate(self, individual):
# Mutate the discharge state genes (-1, 0, 1) """
individual[:self.prediction_hours], = self.toolbox.mutate_discharge( Custom mutation function for the individual. This function mutates different parts of the individual:
individual[:self.prediction_hours] - Mutates the discharge and charge states (AC, DC, idle) using the split_charge_discharge method.
) - Mutates the EV charging schedule if EV optimization is enabled.
- Mutates appliance start times if household appliances are part of the optimization.
Parameters:
- individual (list): The individual being mutated, which includes different optimization parameters.
Returns:
- (tuple): The mutated individual as a tuple (required by DEAP).
"""
# Step 1: Mutate the charge/discharge states (idle, discharge, AC charge, DC charge)
# Extract the relevant part of the individual for prediction hours, which represents the charge/discharge behavior.
charge_discharge_part = individual[:self.prediction_hours]
# Apply the mutation to the charge/discharge part
charge_discharge_mutated, = self.toolbox.mutate_charge_discharge(charge_discharge_part)
# Ensure that no invalid states are introduced during mutation (valid values: 0-8)
charge_discharge_mutated = np.clip(charge_discharge_mutated, 0, 8)
# Use split_charge_discharge to split the mutated array into AC charge, DC charge, and discharge components
#ac_charge, dc_charge, discharge = self.split_charge_discharge(charge_discharge_mutated)
# Optionally: You can process the split arrays further if needed, for example,
# applying additional constraints or penalties, or keeping track of charging limits.
# Reassign the mutated values back to the individual
individual[:self.prediction_hours] = charge_discharge_mutated
# Step 2: Mutate EV charging schedule if enabled
if self.optimize_ev: if self.optimize_ev:
# Mutate the EV charging indices # Extract the relevant part for EV charging schedule
ev_charge_part = individual[self.prediction_hours : self.prediction_hours * 2] ev_charge_part = individual[self.prediction_hours : self.prediction_hours * 2]
# Apply mutation on the EV charging schedule
ev_charge_part_mutated, = self.toolbox.mutate_ev_charge_index(ev_charge_part) ev_charge_part_mutated, = self.toolbox.mutate_ev_charge_index(ev_charge_part)
# Ensure the EV does not charge during fixed hours (set those hours to 0)
ev_charge_part_mutated[self.prediction_hours - self.fixed_eauto_hours :] = [0] * self.fixed_eauto_hours ev_charge_part_mutated[self.prediction_hours - self.fixed_eauto_hours :] = [0] * self.fixed_eauto_hours
# Reassign the mutated EV charging part back to the individual
individual[self.prediction_hours : self.prediction_hours * 2] = ev_charge_part_mutated individual[self.prediction_hours : self.prediction_hours * 2] = ev_charge_part_mutated
# Mutate the appliance start hour if present # Step 3: Mutate appliance start times if household appliances are part of the optimization
if self.opti_param["haushaltsgeraete"] > 0: if self.opti_param["haushaltsgeraete"] > 0:
# Extract the appliance part (typically a single value for the start hour)
appliance_part = [individual[-1]] appliance_part = [individual[-1]]
# Apply mutation on the appliance start hour
appliance_part_mutated, = self.toolbox.mutate_hour(appliance_part) appliance_part_mutated, = self.toolbox.mutate_hour(appliance_part)
# Reassign the mutated appliance part back to the individual
individual[-1] = appliance_part_mutated[0] individual[-1] = appliance_part_mutated[0]
return (individual,) return (individual,)
# Method to create an individual based on the conditions # Method to create an individual based on the conditions
def create_individual(self): def create_individual(self):
# Start with discharge states for the individual # Start with discharge states for the individual
@ -106,8 +158,14 @@ class optimization_problem:
2. Electric vehicle charge hours (possible_charge_values), 2. Electric vehicle charge hours (possible_charge_values),
3. Dishwasher start time (integer if applicable). 3. Dishwasher start time (integer if applicable).
""" """
discharge_hours_bin = individual[: self.prediction_hours] discharge_hours_bin = individual[: self.prediction_hours]
eautocharge_hours_float = individual[self.prediction_hours : self.prediction_hours * 2] eautocharge_hours_float = (
individual[self.prediction_hours : self.prediction_hours * 2]
if self.optimize_ev
else None
)
spuelstart_int = ( spuelstart_int = (
individual[-1] individual[-1]
if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0 if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0
@ -132,13 +190,12 @@ class optimization_problem:
# Initialize toolbox with attributes and operations # Initialize toolbox with attributes and operations
self.toolbox = base.Toolbox() self.toolbox = base.Toolbox()
self.toolbox.register("attr_discharge_state", random.randint, -5, 1) self.toolbox.register("attr_discharge_state", random.randint, 0,11)
if self.optimize_ev: if self.optimize_ev:
self.toolbox.register("attr_ev_charge_index", random.randint, 0, len(possible_ev_charge_currents) - 1) self.toolbox.register("attr_ev_charge_index", random.randint, 0, len(possible_ev_charge_currents) - 1)
self.toolbox.register("attr_int", random.randint, start_hour, 23) self.toolbox.register("attr_int", random.randint, start_hour, 23)
# Register individual creation function # Register individual creation function
self.toolbox.register("individual", self.create_individual) self.toolbox.register("individual", self.create_individual)
@ -148,7 +205,7 @@ class optimization_problem:
#self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1) #self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
# Register separate mutation functions for each type of value: # Register separate mutation functions for each type of value:
# - Discharge state mutation (-5, 0, 1) # - Discharge state mutation (-5, 0, 1)
self.toolbox.register("mutate_discharge", tools.mutUniformInt, low=-5, up=1, indpb=0.1) self.toolbox.register("mutate_charge_discharge", tools.mutUniformInt, low=0, up=8, indpb=0.1)
# - Float mutation for EV charging values # - Float mutation for EV charging values
self.toolbox.register("mutate_ev_charge_index", tools.mutUniformInt, low=0, up=len(possible_ev_charge_currents) - 1, indpb=0.1) self.toolbox.register("mutate_ev_charge_index", tools.mutUniformInt, low=0, up=len(possible_ev_charge_currents) - 1, indpb=0.1)
# - Start hour mutation for household devices # - Start hour mutation for household devices
@ -173,11 +230,12 @@ class optimization_problem:
if self.opti_param.get("haushaltsgeraete", 0) > 0: if self.opti_param.get("haushaltsgeraete", 0) > 0:
ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour) ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour)
charge, discharge = self.split_charge_discharge(discharge_hours_bin) ac,dc,discharge = self.decode_charge_discharge(discharge_hours_bin)
ems.set_akku_discharge_hours(discharge) ems.set_akku_discharge_hours(discharge)
ems.set_akku_charge_hours(charge) ems.set_akku_dc_charge_hours(dc)
ems.set_akku_ac_charge_hours(ac)
if self.optimize_ev: if self.optimize_ev:
eautocharge_hours_float = [ eautocharge_hours_float = [
@ -212,13 +270,6 @@ class optimization_problem:
0.01 for i in range(self.prediction_hours) if discharge_hours_bin[i] == 0.0 0.01 for i in range(self.prediction_hours) if discharge_hours_bin[i] == 0.0
) )
# Penalty for charging the electric vehicle during restricted hours
# gesamtbilanz += sum(
# self.strafe
# for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours)
# if eautocharge_hours_float[i] != 0.0
# )
# Penalty for not meeting the minimum SOC (State of Charge) requirement # Penalty for not meeting the minimum SOC (State of Charge) requirement
if parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent() <= 0.0: if parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent() <= 0.0:
gesamtbilanz += sum( gesamtbilanz += sum(
@ -266,8 +317,8 @@ class optimization_problem:
self.toolbox, self.toolbox,
mu=100, mu=100,
lambda_=150, lambda_=150,
cxpb=0.5, cxpb=0.7,
mutpb=0.5, mutpb=0.3,
ngen=ngen, ngen=ngen,
stats=stats, stats=stats,
halloffame=hof, halloffame=hof,
@ -361,14 +412,18 @@ class optimization_problem:
start_solution start_solution
) )
ac_charge, dc_charge, discharge = self.decode_charge_discharge(discharge_hours_bin)
# Visualize the results # Visualize the results
visualisiere_ergebnisse( visualisiere_ergebnisse(
parameter["gesamtlast"], parameter["gesamtlast"],
parameter["pv_forecast"], parameter["pv_forecast"],
parameter["strompreis_euro_pro_wh"], parameter["strompreis_euro_pro_wh"],
o, o,
discharge_hours_bin, ac_charge,
eautocharge_hours_float, dc_charge,
discharge,
parameter["temperature_forecast"], parameter["temperature_forecast"],
start_hour, start_hour,
self.prediction_hours, self.prediction_hours,
@ -395,7 +450,7 @@ class optimization_problem:
element_list = o[key].tolist() element_list = o[key].tolist()
# Change the first value to None # Change the first value to None
element_list[0] = None #element_list[0] = None
# Change the NaN to None (JSON) # Change the NaN to None (JSON)
element_list = [ element_list = [
None if isinstance(x, (int, float)) and np.isnan(x) else x for x in element_list None if isinstance(x, (int, float)) and np.isnan(x) else x for x in element_list
@ -406,7 +461,9 @@ class optimization_problem:
# Return final results as a dictionary # Return final results as a dictionary
return { return {
"discharge_hours_bin": discharge_hours_bin, "ac_charge": ac_charge.tolist(),
"dc_charge":dc_charge.tolist(),
"discharge_allowed":discharge.tolist(),
"eautocharge_hours_float": eautocharge_hours_float, "eautocharge_hours_float": eautocharge_hours_float,
"result": o, "result": o,
"eauto_obj": ems.eauto.to_dict(), "eauto_obj": ems.eauto.to_dict(),

View File

@ -18,8 +18,9 @@ def visualisiere_ergebnisse(
pv_forecast, pv_forecast,
strompreise, strompreise,
ergebnisse, ergebnisse,
discharge_hours, ac, # AC charging allowed
laden_moeglich, dc, # DC charging allowed
discharge, # Discharge allowed
temperature, temperature,
start_hour, start_hour,
prediction_hours, prediction_hours,
@ -58,21 +59,7 @@ def visualisiere_ergebnisse(
plt.grid(True) plt.grid(True)
plt.legend() plt.legend()
# Electricity prices
hours_p = np.arange(0, len(strompreise))
plt.subplot(3, 2, 2)
plt.plot(
hours_p,
strompreise,
label="Electricity Price (€/Wh)",
color="purple",
marker="s",
)
plt.title("Electricity Prices")
plt.xlabel("Hour of the Day")
plt.ylabel("Price (€/Wh)")
plt.legend()
plt.grid(True)
# PV forecast # PV forecast
plt.subplot(3, 2, 3) plt.subplot(3, 2, 3)
@ -122,30 +109,48 @@ def visualisiere_ergebnisse(
# Energy flow, grid feed-in, and grid consumption # Energy flow, grid feed-in, and grid consumption
plt.subplot(3, 2, 1) plt.subplot(3, 2, 1)
plt.plot(hours, ergebnisse["Last_Wh_pro_Stunde"], label="Load (Wh)", marker="o") # Plot with transparency (alpha) and different linestyles
plt.plot( plt.plot(
hours, hours, ergebnisse["Last_Wh_pro_Stunde"], label="Load (Wh)", marker="o", linestyle="-", alpha=0.8
ergebnisse["Haushaltsgeraet_wh_pro_stunde"],
label="Household Device (Wh)",
marker="o",
) )
plt.plot( plt.plot(
hours, hours, ergebnisse["Haushaltsgeraet_wh_pro_stunde"], label="Household Device (Wh)", marker="o", linestyle="--", alpha=0.8
ergebnisse["Netzeinspeisung_Wh_pro_Stunde"],
label="Grid Feed-in (Wh)",
marker="x",
) )
plt.plot( plt.plot(
hours, hours, ergebnisse["Netzeinspeisung_Wh_pro_Stunde"], label="Grid Feed-in (Wh)", marker="x", linestyle=":", alpha=0.8
ergebnisse["Netzbezug_Wh_pro_Stunde"],
label="Grid Consumption (Wh)",
marker="^",
) )
plt.plot(hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^") plt.plot(
hours, ergebnisse["Netzbezug_Wh_pro_Stunde"], label="Grid Consumption (Wh)", marker="^", linestyle="-.", alpha=0.8
)
plt.plot(
hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^", linestyle="-", alpha=0.8
)
# Title and labels
plt.title("Energy Flow per Hour") plt.title("Energy Flow per Hour")
plt.xlabel("Hour") plt.xlabel("Hour")
plt.ylabel("Energy (Wh)") plt.ylabel("Energy (Wh)")
# Show legend with a higher number of columns to avoid overlap
plt.legend(ncol=2)
# Electricity prices
hours_p = np.arange(0, len(strompreise))
plt.subplot(3, 2, 3)
plt.plot(
hours_p,
strompreise,
label="Electricity Price (€/Wh)",
color="purple",
marker="s",
)
plt.title("Electricity Prices")
plt.xlabel("Hour of the Day")
plt.ylabel("Price (€/Wh)")
plt.legend() plt.legend()
plt.grid(True)
# State of charge for batteries # State of charge for batteries
plt.subplot(3, 2, 2) plt.subplot(3, 2, 2)
@ -159,42 +164,30 @@ def visualisiere_ergebnisse(
plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot
plt.grid(True, which="both", axis="x") # Grid for every hour plt.grid(True, which="both", axis="x") # Grid for every hour
ax1 = plt.subplot(3, 2, 3) # Plot for AC, DC charging, and Discharge status using bar charts
# Plot charge and discharge values ax1 = plt.subplot(3, 2, 5)
for hour, value in enumerate(discharge_hours):
# Determine color and label based on the value
if value > 0: # Positive values (discharge)
color = "red"
label = "Discharge" if hour == 0 else ""
elif value < 0: # Negative values (charge)
color = "blue"
label = "Charge" if hour == 0 else ""
else: # Plot AC charging as bars (relative values between 0 and 1)
continue # Skip zero values plt.bar(hours, ac, width=0.4, label="AC Charging (relative)", color="blue", alpha=0.6)
# Create colored areas with `axvspan` # Plot DC charging as bars (relative values between 0 and 1)
ax1.axvspan( plt.bar(hours + 0.4, dc, width=0.4, label="DC Charging (relative)", color="green", alpha=0.6)
hour, # Start of the hour
hour + 1, # End of the hour
ymin=0, # Lower bound
ymax=abs(value) / 5 if value < 0 else value, # Adjust height based on the value
color=color,
alpha=0.3,
label=label
)
# Plot Discharge as bars (0 or 1, binary values)
plt.bar(hours, discharge, width=0.4, label="Discharge Allowed", color="red", alpha=0.6, bottom=np.maximum(ac, dc))
# Configure the plot # Configure the plot
ax1.legend(loc="upper left") ax1.legend(loc="upper left")
ax1.set_xlim(0, prediction_hours) ax1.set_xlim(0, prediction_hours)
ax1.set_xlabel("Hour") ax1.set_xlabel("Hour")
ax1.set_ylabel("Charge/Discharge Level") ax1.set_ylabel("Relative Power (0-1) / Discharge (0 or 1)")
ax1.set_title("Charge and Discharge Hours Overview") ax1.set_title("AC/DC Charging and Discharge Overview")
ax1.grid(True) ax1.grid(True)
pdf.savefig() # Save the current figure state to the PDF pdf.savefig() # Save the current figure state to the PDF
plt.close() # Close the current figure to free up memory plt.close() # Close the current figure to free up memory
@ -219,7 +212,6 @@ def visualisiere_ergebnisse(
) )
# Annotate costs # Annotate costs
for hour, value in enumerate(costs): for hour, value in enumerate(costs):
print(hour, " ", value)
if value == None or np.isnan(value): if value == None or np.isnan(value):
value=0 value=0
axs[0].annotate(f'{value:.2f}', (hour, value), textcoords="offset points", xytext=(0,5), ha='center', fontsize=8, color='red') axs[0].annotate(f'{value:.2f}', (hour, value), textcoords="offset points", xytext=(0,5), ha='center', fontsize=8, color='red')

View File

@ -5,6 +5,7 @@ from akkudoktoreos.class_akku import PVAkku
from akkudoktoreos.class_ems import EnergieManagementSystem from akkudoktoreos.class_ems import EnergieManagementSystem
from akkudoktoreos.class_haushaltsgeraet import Haushaltsgeraet from akkudoktoreos.class_haushaltsgeraet import Haushaltsgeraet
from akkudoktoreos.class_inverter import Wechselrichter # Example import from akkudoktoreos.class_inverter import Wechselrichter # Example import
from akkudoktoreos.visualize import *
prediction_hours = 48 prediction_hours = 48
optimization_hours = 24 optimization_hours = 24
@ -32,7 +33,7 @@ def create_ems_instance():
# Example initialization of electric car battery # Example initialization of electric car battery
eauto = PVAkku(kapazitaet_wh=26400, start_soc_prozent=10, hours=48, min_soc_prozent=10) eauto = PVAkku(kapazitaet_wh=26400, start_soc_prozent=10, hours=48, min_soc_prozent=10)
eauto.set_charge_per_hour(np.full(48,1))
# Parameters based on previous example data # Parameters based on previous example data
pv_prognose_wh = [ pv_prognose_wh = [
0, 0,
@ -199,6 +200,9 @@ def create_ems_instance():
haushaltsgeraet=home_appliance, haushaltsgeraet=home_appliance,
wechselrichter=wechselrichter, wechselrichter=wechselrichter,
) )
return ems return ems
@ -212,6 +216,21 @@ def test_simulation(create_ems_instance):
result = ems.simuliere(start_stunde=start_hour) result = ems.simuliere(start_stunde=start_hour)
visualisiere_ergebnisse(
ems.gesamtlast,
ems.pv_prognose_wh,
ems.strompreis_euro_pro_wh,
result,
ems.akku.discharge_array+ems.akku.charge_array,
None,
ems.pv_prognose_wh,
start_hour,
48,
np.full(48, 0.0),
filename="visualization_results.pdf",
extra_data=None,
)
# Assertions to validate results # Assertions to validate results
assert result is not None, "Result should not be None" assert result is not None, "Result should not be None"
assert isinstance(result, dict), "Result should be a dictionary" assert isinstance(result, dict), "Result should be a dictionary"

224
tests/test_class_ems_2.py Normal file
View File

@ -0,0 +1,224 @@
import numpy as np
import pytest
from akkudoktoreos.visualize import *
from akkudoktoreos.class_akku import PVAkku
from akkudoktoreos.class_ems import EnergieManagementSystem
from akkudoktoreos.class_haushaltsgeraet import Haushaltsgeraet
from akkudoktoreos.class_inverter import Wechselrichter # Example import
prediction_hours = 48
optimization_hours = 24
start_hour = 0
# Example initialization of necessary components
@pytest.fixture
def create_ems_instance():
"""
Fixture to create an EnergieManagementSystem instance with given test parameters.
"""
# Initialize the battery and the inverter
akku = PVAkku(kapazitaet_wh=5000, start_soc_prozent=80, hours=48, min_soc_prozent=10)
akku.reset()
wechselrichter = Wechselrichter(10000, akku)
# Household device (currently not used, set to None)
home_appliance = Haushaltsgeraet(
hours=prediction_hours,
verbrauch_wh=2000,
dauer_h=2,
)
home_appliance.set_startzeitpunkt(2)
# Example initialization of electric car battery
eauto = PVAkku(kapazitaet_wh=26400, start_soc_prozent=100, hours=48, min_soc_prozent=100)
# Parameters based on previous example data
pv_prognose_wh = np.full(prediction_hours, 0)
pv_prognose_wh[10] = 5000.0
pv_prognose_wh[11] = 5000.0
strompreis_euro_pro_wh = np.full(48, 0.001)
strompreis_euro_pro_wh [0:10] = 0.00001
strompreis_euro_pro_wh [11:15] = 0.00005
strompreis_euro_pro_wh [20] = 0.00001
einspeiseverguetung_euro_pro_wh = [0.00007] * len(strompreis_euro_pro_wh)
gesamtlast = [
676.71,
876.19,
527.13,
468.88,
531.38,
517.95,
483.15,
472.28,
1011.68,
995.00,
1053.07,
1063.91,
1320.56,
1132.03,
1163.67,
1176.82,
1216.22,
1103.78,
1129.12,
1178.71,
1050.98,
988.56,
912.38,
704.61,
516.37,
868.05,
694.34,
608.79,
556.31,
488.89,
506.91,
804.89,
1141.98,
1056.97,
992.46,
1155.99,
827.01,
1257.98,
1232.67,
871.26,
860.88,
1158.03,
1222.72,
1221.04,
949.99,
987.01,
733.99,
592.97,
]
# Initialize the energy management system with the respective parameters
ems = EnergieManagementSystem(
pv_prognose_wh=pv_prognose_wh,
strompreis_euro_pro_wh=strompreis_euro_pro_wh,
einspeiseverguetung_euro_pro_wh=einspeiseverguetung_euro_pro_wh,
eauto=eauto,
gesamtlast=gesamtlast,
haushaltsgeraet=home_appliance,
wechselrichter=wechselrichter,
)
ac= np.full(prediction_hours,0)
ac[20] = 1
ems.set_akku_ac_charge_hours(ac)
dc= np.full(prediction_hours,0)
dc[11] = 1
ems.set_akku_dc_charge_hours(dc)
return ems
def test_simulation(create_ems_instance):
"""
Test the EnergieManagementSystem simulation method.
"""
ems = create_ems_instance
# Simulate starting from hour 0 (this value can be adjusted)
result = ems.simuliere(start_stunde=start_hour)
# --- Pls do not remove! ---
# visualisiere_ergebnisse(
# ems.gesamtlast,
# ems.pv_prognose_wh,
# ems.strompreis_euro_pro_wh,
# result,
# ems.akku.discharge_array+ems.akku.charge_array,
# None,
# ems.pv_prognose_wh,
# start_hour,
# 48,
# np.full(48, 0.0),
# filename="visualization_results.pdf",
# extra_data=None,
# )
# Assertions to validate results
assert result is not None, "Result should not be None"
assert isinstance(result, dict), "Result should be a dictionary"
assert "Last_Wh_pro_Stunde" in result, "Result should contain 'Last_Wh_pro_Stunde'"
"""
Check the result of the simulation based on expected values.
"""
# Example result returned from the simulation (used for assertions)
assert result is not None, "Result should not be None."
# Check that the result is a dictionary
assert isinstance(result, dict), "Result should be a dictionary."
# Verify that the expected keys are present in the result
expected_keys = [
"Last_Wh_pro_Stunde",
"Netzeinspeisung_Wh_pro_Stunde",
"Netzbezug_Wh_pro_Stunde",
"Kosten_Euro_pro_Stunde",
"akku_soc_pro_stunde",
"Einnahmen_Euro_pro_Stunde",
"Gesamtbilanz_Euro",
"E-Auto_SoC_pro_Stunde",
"Gesamteinnahmen_Euro",
"Gesamtkosten_Euro",
"Verluste_Pro_Stunde",
"Gesamt_Verluste",
"Haushaltsgeraet_wh_pro_stunde",
]
for key in expected_keys:
assert key in result, f"The key '{key}' should be present in the result."
# Check the length of the main arrays
assert (
len(result["Last_Wh_pro_Stunde"]) == 48
), "The length of 'Last_Wh_pro_Stunde' should be 48."
assert (
len(result["Netzeinspeisung_Wh_pro_Stunde"]) == 48
), "The length of 'Netzeinspeisung_Wh_pro_Stunde' should be 48."
assert (
len(result["Netzbezug_Wh_pro_Stunde"]) == 48
), "The length of 'Netzbezug_Wh_pro_Stunde' should be 48."
assert (
len(result["Kosten_Euro_pro_Stunde"]) == 48
), "The length of 'Kosten_Euro_pro_Stunde' should be 48."
assert (
len(result["akku_soc_pro_stunde"]) == 48
), "The length of 'akku_soc_pro_stunde' should be 48."
# Verfify DC and AC Charge Bins
assert (
abs(result["akku_soc_pro_stunde"][10] - 10.0) < 1e-5
), "'akku_soc_pro_stunde[10]' should be 10."
assert (
abs(result["akku_soc_pro_stunde"][11] -79.275184) < 1e-5
), "'akku_soc_pro_stunde[11]' should be 79.275184."
assert (
abs(result["Netzeinspeisung_Wh_pro_Stunde"][10] - 3946.93) < 1e-3
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 4000."
assert (
abs(result["Netzeinspeisung_Wh_pro_Stunde"][11] - 0.0) < 1e-3
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 0.0."
assert (
abs(result["akku_soc_pro_stunde"][20] - 98
) < 1e-5
), "'akku_soc_pro_stunde[11]' should be 98."
assert (
abs(result["Last_Wh_pro_Stunde"][20] - 5450.98) < 1e-3
), "'Netzeinspeisung_Wh_pro_Stunde[11]' should be 0.0."
print("All tests passed successfully.")