PreCommit Fixed

This commit is contained in:
Andreas 2024-10-22 10:29:57 +02:00 committed by Andreas
parent 45a3bcdb09
commit c47f071f55
8 changed files with 216 additions and 132 deletions

View File

@ -88,7 +88,7 @@ class PVAkku:
return (self.soc_wh / self.kapazitaet_wh) * 100 return (self.soc_wh / self.kapazitaet_wh) * 100
def energie_abgeben(self, wh, hour): def energie_abgeben(self, wh, hour):
if self.discharge_array[hour] == 0 : if self.discharge_array[hour] == 0:
return 0.0, 0.0 # No energy discharge and no losses return 0.0, 0.0 # No energy discharge and no losses
# Calculate the maximum energy that can be discharged considering min_soc and efficiency # Calculate the maximum energy that can be discharged considering min_soc and efficiency
@ -122,7 +122,7 @@ class PVAkku:
if hour is not None and self.charge_array[hour] == 0: if hour is not None and self.charge_array[hour] == 0:
return 0, 0 # Charging not allowed in this hour return 0, 0 # Charging not allowed in this hour
if relative_power > 0.0: if relative_power > 0.0:
wh=self.max_ladeleistung_w*relative_power wh = self.max_ladeleistung_w * relative_power
# If no value for wh is given, use the maximum charging power # If no value for wh is given, use the maximum charging power
wh = wh if wh is not None else self.max_ladeleistung_w wh = wh if wh is not None else self.max_ladeleistung_w
@ -134,7 +134,7 @@ class PVAkku:
max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative
# The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge # The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge
effektive_lademenge = min(wh, max_possible_charge_wh) effektive_lademenge = min(wh, max_possible_charge_wh)
# Energy actually stored in the battery # Energy actually stored in the battery
geladene_menge = effektive_lademenge * self.lade_effizienz geladene_menge = effektive_lademenge * self.lade_effizienz

View File

@ -1,8 +1,10 @@
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
from akkudoktoreos.config import *
import numpy as np import numpy as np
from akkudoktoreos.config import prediction_hours
class EnergieManagementSystem: class EnergieManagementSystem:
def __init__( def __init__(
@ -23,9 +25,9 @@ class EnergieManagementSystem:
self.eauto = eauto self.eauto = eauto
self.haushaltsgeraet = haushaltsgeraet self.haushaltsgeraet = haushaltsgeraet
self.wechselrichter = wechselrichter self.wechselrichter = wechselrichter
self.ac_charge_hours = np.full(prediction_hours,0) self.ac_charge_hours = np.full(prediction_hours, 0)
self.dc_charge_hours = np.full(prediction_hours,1) self.dc_charge_hours = np.full(prediction_hours, 1)
self.ev_charge_hours = np.full(prediction_hours,0) self.ev_charge_hours = np.full(prediction_hours, 0)
def set_akku_discharge_hours(self, ds: List[int]) -> None: def set_akku_discharge_hours(self, ds: List[int]) -> None:
self.akku.set_discharge_per_hour(ds) self.akku.set_discharge_per_hour(ds)
@ -52,11 +54,11 @@ class EnergieManagementSystem:
return self.simuliere(start_stunde) return self.simuliere(start_stunde)
def simuliere(self, start_stunde: int) -> dict: def simuliere(self, start_stunde: int) -> dict:
''' """
hour: hour:
akku_soc_pro_stunde begin of the hour, initial hour state! akku_soc_pro_stunde begin of the hour, initial hour state!
last_wh_pro_stunde integral of last hour (end state) last_wh_pro_stunde integral of last hour (end state)
''' """
lastkurve_wh = self.gesamtlast lastkurve_wh = self.gesamtlast
assert ( assert (
@ -83,7 +85,7 @@ class EnergieManagementSystem:
if self.eauto: if self.eauto:
eauto_soc_pro_stunde[0] = self.eauto.ladezustand_in_prozent() eauto_soc_pro_stunde[0] = self.eauto.ladezustand_in_prozent()
for stunde in range(start_stunde , ende): for stunde in range(start_stunde, ende):
stunde_since_now = stunde - start_stunde stunde_since_now = stunde - start_stunde
# Accumulate loads and PV generation # Accumulate loads and PV generation
@ -95,8 +97,10 @@ class EnergieManagementSystem:
haushaltsgeraet_wh_pro_stunde[stunde_since_now] = ha_load haushaltsgeraet_wh_pro_stunde[stunde_since_now] = ha_load
# E-Auto handling # E-Auto handling
if self.eauto and self.ev_charge_hours[stunde]>0: if self.eauto and self.ev_charge_hours[stunde] > 0:
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(None, stunde, relative_power=self.ev_charge_hours[stunde]) geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(
None, stunde, relative_power=self.ev_charge_hours[stunde]
)
verbrauch += geladene_menge_eauto verbrauch += geladene_menge_eauto
verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto
@ -104,18 +108,20 @@ class EnergieManagementSystem:
eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent() eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent()
# Process inverter logic # Process inverter logic
erzeugung = self.pv_prognose_wh[stunde] erzeugung = self.pv_prognose_wh[stunde]
self.akku.set_charge_allowed_for_hour(self.dc_charge_hours[stunde],stunde) self.akku.set_charge_allowed_for_hour(self.dc_charge_hours[stunde], stunde)
netzeinspeisung, netzbezug, verluste, eigenverbrauch = ( netzeinspeisung, netzbezug, verluste, eigenverbrauch = (
self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde) self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
) )
# AC PV Battery Charge # AC PV Battery Charge
if self.ac_charge_hours[stunde] > 0.0: if self.ac_charge_hours[stunde] > 0.0:
self.akku.set_charge_allowed_for_hour(1,stunde) self.akku.set_charge_allowed_for_hour(1, stunde)
geladene_menge, verluste_wh = self.akku.energie_laden(None,stunde,relative_power=self.ac_charge_hours[stunde]) geladene_menge, verluste_wh = self.akku.energie_laden(
#print(stunde, " ", geladene_menge, " ",self.ac_charge_hours[stunde]," ",self.akku.ladezustand_in_prozent()) None, stunde, relative_power=self.ac_charge_hours[stunde]
)
# print(stunde, " ", geladene_menge, " ",self.ac_charge_hours[stunde]," ",self.akku.ladezustand_in_prozent())
verbrauch += geladene_menge verbrauch += geladene_menge
netzbezug +=geladene_menge netzbezug += geladene_menge
verluste_wh_pro_stunde[stunde_since_now] += verluste_wh verluste_wh_pro_stunde[stunde_since_now] += verluste_wh
netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung

View File

@ -1,6 +1,8 @@
import json import json
import numpy as np import numpy as np
class NumpyEncoder(json.JSONEncoder): class NumpyEncoder(json.JSONEncoder):
def default(self, obj): def default(self, obj):
if isinstance(obj, np.ndarray): if isinstance(obj, np.ndarray):
@ -21,4 +23,3 @@ class NumpyEncoder(json.JSONEncoder):
str: A JSON string representation of the object. str: A JSON string representation of the object.
""" """
return json.dumps(data, cls=NumpyEncoder) return json.dumps(data, cls=NumpyEncoder)

View File

@ -12,7 +12,6 @@ from akkudoktoreos.config import possible_ev_charge_currents
from akkudoktoreos.visualize import visualisiere_ergebnisse from akkudoktoreos.visualize import visualisiere_ergebnisse
class optimization_problem: class optimization_problem:
def __init__( def __init__(
self, self,
@ -37,8 +36,9 @@ class optimization_problem:
if fixed_seed is not None: if fixed_seed is not None:
random.seed(fixed_seed) random.seed(fixed_seed)
def decode_charge_discharge(
def decode_charge_discharge(self, discharge_hours_bin: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: self, discharge_hours_bin: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
""" """
Decode the input array `discharge_hours_bin` into three separate arrays for AC charging, DC charging, and discharge. Decode the input array `discharge_hours_bin` into three separate arrays for AC charging, DC charging, and discharge.
The function maps AC and DC charging values to relative power levels (0 to 1), while the discharge remains binary (0 or 1). The function maps AC and DC charging values to relative power levels (0 to 1), while the discharge remains binary (0 or 1).
@ -60,7 +60,9 @@ class optimization_problem:
discharge_hours_bin = np.array(discharge_hours_bin) discharge_hours_bin = np.array(discharge_hours_bin)
# Create ac_charge array: Only consider values between 2 and 6 (AC charging power levels), set the rest to 0 # Create ac_charge array: Only consider values between 2 and 6 (AC charging power levels), set the rest to 0
ac_charge = np.where((discharge_hours_bin >= 2) & (discharge_hours_bin <= 6), discharge_hours_bin - 1, 0) ac_charge = np.where(
(discharge_hours_bin >= 2) & (discharge_hours_bin <= 6), discharge_hours_bin - 1, 0
)
ac_charge = ac_charge / 5.0 # Normalize AC charge to range between 0 and 1 ac_charge = ac_charge / 5.0 # Normalize AC charge to range between 0 and 1
# Create dc_charge array: 7 = Not allowed (mapped to 0), 8 = Allowed (mapped to 1) # Create dc_charge array: 7 = Not allowed (mapped to 0), 8 = Allowed (mapped to 1)
@ -68,16 +70,15 @@ class optimization_problem:
if self.optimize_dc_charge: if self.optimize_dc_charge:
dc_charge = np.where(discharge_hours_bin == 8, 1, 0) dc_charge = np.where(discharge_hours_bin == 8, 1, 0)
else: else:
dc_charge = np.ones_like(discharge_hours_bin) # Set DC charge to 0 if optimization is disabled dc_charge = np.ones_like(
discharge_hours_bin
) # Set DC charge to 0 if optimization is disabled
# Create discharge array: Only consider value 1 (Discharge), set the rest to 0 (binary output) # Create discharge array: Only consider value 1 (Discharge), set the rest to 0 (binary output)
discharge = np.where(discharge_hours_bin == 1, 1, 0) discharge = np.where(discharge_hours_bin == 1, 1, 0)
return ac_charge, dc_charge, discharge return ac_charge, dc_charge, discharge
# Custom mutation function that applies type-specific mutations # Custom mutation function that applies type-specific mutations
def mutate(self, individual): def mutate(self, individual):
""" """
@ -95,10 +96,10 @@ class optimization_problem:
# Step 1: Mutate the charge/discharge states (idle, discharge, AC charge, DC charge) # Step 1: Mutate the charge/discharge states (idle, discharge, AC charge, DC charge)
# Extract the relevant part of the individual for prediction hours, which represents the charge/discharge behavior. # Extract the relevant part of the individual for prediction hours, which represents the charge/discharge behavior.
charge_discharge_part = individual[:self.prediction_hours] charge_discharge_part = individual[: self.prediction_hours]
# Apply the mutation to the charge/discharge part # Apply the mutation to the charge/discharge part
charge_discharge_mutated, = self.toolbox.mutate_charge_discharge(charge_discharge_part) (charge_discharge_mutated,) = self.toolbox.mutate_charge_discharge(charge_discharge_part)
# Ensure that no invalid states are introduced during mutation (valid values: 0-8) # Ensure that no invalid states are introduced during mutation (valid values: 0-8)
if self.optimize_dc_charge: if self.optimize_dc_charge:
@ -107,13 +108,13 @@ class optimization_problem:
charge_discharge_mutated = np.clip(charge_discharge_mutated, 0, 6) charge_discharge_mutated = np.clip(charge_discharge_mutated, 0, 6)
# Use split_charge_discharge to split the mutated array into AC charge, DC charge, and discharge components # Use split_charge_discharge to split the mutated array into AC charge, DC charge, and discharge components
#ac_charge, dc_charge, discharge = self.split_charge_discharge(charge_discharge_mutated) # ac_charge, dc_charge, discharge = self.split_charge_discharge(charge_discharge_mutated)
# Optionally: You can process the split arrays further if needed, for example, # Optionally: You can process the split arrays further if needed, for example,
# applying additional constraints or penalties, or keeping track of charging limits. # applying additional constraints or penalties, or keeping track of charging limits.
# Reassign the mutated values back to the individual # Reassign the mutated values back to the individual
individual[:self.prediction_hours] = charge_discharge_mutated individual[: self.prediction_hours] = charge_discharge_mutated
# Step 2: Mutate EV charging schedule if enabled # Step 2: Mutate EV charging schedule if enabled
if self.optimize_ev: if self.optimize_ev:
@ -121,10 +122,12 @@ class optimization_problem:
ev_charge_part = individual[self.prediction_hours : self.prediction_hours * 2] ev_charge_part = individual[self.prediction_hours : self.prediction_hours * 2]
# Apply mutation on the EV charging schedule # Apply mutation on the EV charging schedule
ev_charge_part_mutated, = self.toolbox.mutate_ev_charge_index(ev_charge_part) (ev_charge_part_mutated,) = self.toolbox.mutate_ev_charge_index(ev_charge_part)
# Ensure the EV does not charge during fixed hours (set those hours to 0) # Ensure the EV does not charge during fixed hours (set those hours to 0)
ev_charge_part_mutated[self.prediction_hours - self.fixed_eauto_hours :] = [0] * self.fixed_eauto_hours ev_charge_part_mutated[self.prediction_hours - self.fixed_eauto_hours :] = [
0
] * self.fixed_eauto_hours
# Reassign the mutated EV charging part back to the individual # Reassign the mutated EV charging part back to the individual
individual[self.prediction_hours : self.prediction_hours * 2] = ev_charge_part_mutated individual[self.prediction_hours : self.prediction_hours * 2] = ev_charge_part_mutated
@ -135,22 +138,25 @@ class optimization_problem:
appliance_part = [individual[-1]] appliance_part = [individual[-1]]
# Apply mutation on the appliance start hour # Apply mutation on the appliance start hour
appliance_part_mutated, = self.toolbox.mutate_hour(appliance_part) (appliance_part_mutated,) = self.toolbox.mutate_hour(appliance_part)
# Reassign the mutated appliance part back to the individual # Reassign the mutated appliance part back to the individual
individual[-1] = appliance_part_mutated[0] individual[-1] = appliance_part_mutated[0]
return (individual,) return (individual,)
# Method to create an individual based on the conditions # Method to create an individual based on the conditions
def create_individual(self): def create_individual(self):
# Start with discharge states for the individual # Start with discharge states for the individual
individual_components = [self.toolbox.attr_discharge_state() for _ in range(self.prediction_hours)] individual_components = [
self.toolbox.attr_discharge_state() for _ in range(self.prediction_hours)
]
# Add EV charge index values if optimize_ev is True # Add EV charge index values if optimize_ev is True
if self.optimize_ev: if self.optimize_ev:
individual_components += [self.toolbox.attr_ev_charge_index() for _ in range(self.prediction_hours)] individual_components += [
self.toolbox.attr_ev_charge_index() for _ in range(self.prediction_hours)
]
# Add the start time of the household appliance if it's being optimized # Add the start time of the household appliance if it's being optimized
if self.opti_param["haushaltsgeraete"] > 0: if self.opti_param["haushaltsgeraete"] > 0:
@ -200,30 +206,41 @@ class optimization_problem:
# Initialize toolbox with attributes and operations # Initialize toolbox with attributes and operations
self.toolbox = base.Toolbox() self.toolbox = base.Toolbox()
if self.optimize_dc_charge: if self.optimize_dc_charge:
self.toolbox.register("attr_discharge_state", random.randint, 0,8) self.toolbox.register("attr_discharge_state", random.randint, 0, 8)
else: else:
self.toolbox.register("attr_discharge_state", random.randint, 0,6) self.toolbox.register("attr_discharge_state", random.randint, 0, 6)
if self.optimize_ev: if self.optimize_ev:
self.toolbox.register("attr_ev_charge_index", random.randint, 0, len(possible_ev_charge_currents) - 1) self.toolbox.register(
"attr_ev_charge_index", random.randint, 0, len(possible_ev_charge_currents) - 1
)
self.toolbox.register("attr_int", random.randint, start_hour, 23) self.toolbox.register("attr_int", random.randint, start_hour, 23)
# Register individual creation function # Register individual creation function
self.toolbox.register("individual", self.create_individual) self.toolbox.register("individual", self.create_individual)
# Register population, mating, mutation, and selection functions # Register population, mating, mutation, and selection functions
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("mate", tools.cxTwoPoint) self.toolbox.register("mate", tools.cxTwoPoint)
#self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1) # self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
# Register separate mutation functions for each type of value: # Register separate mutation functions for each type of value:
# - Discharge state mutation (-5, 0, 1) # - Discharge state mutation (-5, 0, 1)
if self.optimize_dc_charge: if self.optimize_dc_charge:
self.toolbox.register("mutate_charge_discharge", tools.mutUniformInt, low=0, up=8, indpb=0.2) self.toolbox.register(
"mutate_charge_discharge", tools.mutUniformInt, low=0, up=8, indpb=0.2
)
else: else:
self.toolbox.register("mutate_charge_discharge", tools.mutUniformInt, low=0, up=6, indpb=0.2) self.toolbox.register(
"mutate_charge_discharge", tools.mutUniformInt, low=0, up=6, indpb=0.2
)
# - Float mutation for EV charging values # - Float mutation for EV charging values
self.toolbox.register("mutate_ev_charge_index", tools.mutUniformInt, low=0, up=len(possible_ev_charge_currents) - 1, indpb=0.2) self.toolbox.register(
"mutate_ev_charge_index",
tools.mutUniformInt,
low=0,
up=len(possible_ev_charge_currents) - 1,
indpb=0.2,
)
# - Start hour mutation for household devices # - Start hour mutation for household devices
self.toolbox.register("mutate_hour", tools.mutUniformInt, low=start_hour, up=23, indpb=0.2) self.toolbox.register("mutate_hour", tools.mutUniformInt, low=start_hour, up=23, indpb=0.2)
@ -246,8 +263,7 @@ class optimization_problem:
if self.opti_param.get("haushaltsgeraete", 0) > 0: if self.opti_param.get("haushaltsgeraete", 0) > 0:
ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour) ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour)
ac,dc,discharge = self.decode_charge_discharge(discharge_hours_bin) ac, dc, discharge = self.decode_charge_discharge(discharge_hours_bin)
ems.set_akku_discharge_hours(discharge) ems.set_akku_discharge_hours(discharge)
# Set DC charge hours only if DC optimization is enabled # Set DC charge hours only if DC optimization is enabled
@ -261,7 +277,7 @@ class optimization_problem:
] ]
ems.set_ev_charge_hours(eautocharge_hours_float) ems.set_ev_charge_hours(eautocharge_hours_float)
else: else:
ems.set_ev_charge_hours(np.full(self.prediction_hours, 0 )) ems.set_ev_charge_hours(np.full(self.prediction_hours, 0))
return ems.simuliere(start_hour) return ems.simuliere(start_hour)
def evaluate( def evaluate(
@ -304,15 +320,14 @@ class optimization_problem:
# Adjust total balance with battery value and penalties for unmet SOC # Adjust total balance with battery value and penalties for unmet SOC
restwert_akku = ems.akku.aktueller_energieinhalt() * parameter["preis_euro_pro_wh_akku"] restwert_akku = ems.akku.aktueller_energieinhalt() * parameter["preis_euro_pro_wh_akku"]
#print(ems.akku.aktueller_energieinhalt()," * ", parameter["preis_euro_pro_wh_akku"] , " ", restwert_akku, " ", gesamtbilanz) # print(ems.akku.aktueller_energieinhalt()," * ", parameter["preis_euro_pro_wh_akku"] , " ", restwert_akku, " ", gesamtbilanz)
gesamtbilanz += -restwert_akku gesamtbilanz += -restwert_akku
#print(gesamtbilanz) # print(gesamtbilanz)
if self.optimize_ev: if self.optimize_ev:
gesamtbilanz += max( gesamtbilanz += max(
0, 0,
(parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent()) * self.strafe, (parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent()) * self.strafe,
) )
return (gesamtbilanz,) return (gesamtbilanz,)
@ -333,7 +348,7 @@ class optimization_problem:
for _ in range(3): for _ in range(3):
population.insert(0, creator.Individual(start_solution)) population.insert(0, creator.Individual(start_solution))
#Run the evolutionary algorithm # Run the evolutionary algorithm
algorithms.eaMuPlusLambda( algorithms.eaMuPlusLambda(
population, population,
self.toolbox, self.toolbox,
@ -384,7 +399,7 @@ class optimization_problem:
akku.set_charge_per_hour(np.full(self.prediction_hours, 1)) akku.set_charge_per_hour(np.full(self.prediction_hours, 1))
self.optimize_ev = True self.optimize_ev = True
if parameter["eauto_min_soc"] - parameter["eauto_soc"] <0: if parameter["eauto_min_soc"] - parameter["eauto_soc"] < 0:
self.optimize_ev = False self.optimize_ev = False
eauto = PVAkku( eauto = PVAkku(
@ -426,7 +441,7 @@ class optimization_problem:
"evaluate", "evaluate",
lambda ind: self.evaluate(ind, ems, parameter, start_hour, worst_case), lambda ind: self.evaluate(ind, ems, parameter, start_hour, worst_case),
) )
start_solution, extra_data = self.optimize(parameter["start_solution"], ngen=ngen) # start_solution, extra_data = self.optimize(parameter["start_solution"], ngen=ngen) #
# Perform final evaluation on the best solution # Perform final evaluation on the best solution
o = self.evaluate_inner(start_solution, ems, start_hour) o = self.evaluate_inner(start_solution, ems, start_hour)
@ -434,7 +449,9 @@ class optimization_problem:
start_solution start_solution
) )
if self.optimize_ev: if self.optimize_ev:
eautocharge_hours_float = [possible_ev_charge_currents[i] for i in eautocharge_hours_float] eautocharge_hours_float = [
possible_ev_charge_currents[i] for i in eautocharge_hours_float
]
ac_charge, dc_charge, discharge = self.decode_charge_discharge(discharge_hours_bin) ac_charge, dc_charge, discharge = self.decode_charge_discharge(discharge_hours_bin)
# Visualize the results # Visualize the results
@ -472,7 +489,7 @@ class optimization_problem:
element_list = o[key].tolist() element_list = o[key].tolist()
# Change the first value to None # Change the first value to None
#element_list[0] = None # element_list[0] = None
# Change the NaN to None (JSON) # Change the NaN to None (JSON)
element_list = [ element_list = [
None if isinstance(x, (int, float)) and np.isnan(x) else x for x in element_list None if isinstance(x, (int, float)) and np.isnan(x) else x for x in element_list
@ -484,8 +501,8 @@ class optimization_problem:
# Return final results as a dictionary # Return final results as a dictionary
return { return {
"ac_charge": ac_charge.tolist(), "ac_charge": ac_charge.tolist(),
"dc_charge":dc_charge.tolist(), "dc_charge": dc_charge.tolist(),
"discharge_allowed":discharge.tolist(), "discharge_allowed": discharge.tolist(),
"eautocharge_hours_float": eautocharge_hours_float, "eautocharge_hours_float": eautocharge_hours_float,
"result": o, "result": o,
"eauto_obj": ems.eauto.to_dict(), "eauto_obj": ems.eauto.to_dict(),
@ -493,4 +510,3 @@ class optimization_problem:
"spuelstart": spuelstart_int, "spuelstart": spuelstart_int,
"simulation_data": o, "simulation_data": o,
} }

View File

@ -5,6 +5,7 @@ import matplotlib.pyplot as plt
import numpy as np import numpy as np
import pandas as pd import pandas as pd
class BatteryDataProcessor: class BatteryDataProcessor:
def __init__( def __init__(
self, self,
@ -115,7 +116,10 @@ class BatteryDataProcessor:
else: else:
end_point = self.data.iloc[-1] # Verwenden des letzten Datensatzes als Endpunkt end_point = self.data.iloc[-1] # Verwenden des letzten Datensatzes als Endpunkt
if not last_points_100_df.empty and start_point["timestamp"] in last_points_100_df["timestamp"].values: if (
not last_points_100_df.empty
and start_point["timestamp"] in last_points_100_df["timestamp"].values
):
initial_soc = 100 initial_soc = 100
elif start_point["timestamp"] in last_points_0_df["timestamp"].values: elif start_point["timestamp"] in last_points_0_df["timestamp"].values:
initial_soc = 0 initial_soc = 0
@ -235,7 +239,13 @@ class BatteryDataProcessor:
marker="o", marker="o",
label="100% SoC Points", label="100% SoC Points",
) )
plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_voltage'], color='red', marker='x', label='0% SoC Points') plt.scatter(
last_points_0_df["timestamp"],
last_points_0_df["battery_voltage"],
color="red",
marker="x",
label="0% SoC Points",
)
plt.xlabel("Timestamp") plt.xlabel("Timestamp")
plt.ylabel("Voltage (V)") plt.ylabel("Voltage (V)")
plt.legend() plt.legend()
@ -256,7 +266,13 @@ class BatteryDataProcessor:
marker="o", marker="o",
label="100% SoC Points", label="100% SoC Points",
) )
plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_current'], color='red', marker='x', label='0% SoC Points') plt.scatter(
last_points_0_df["timestamp"],
last_points_0_df["battery_current"],
color="red",
marker="x",
label="0% SoC Points",
)
plt.xlabel("Timestamp") plt.xlabel("Timestamp")
plt.ylabel("Current (A)") plt.ylabel("Current (A)")
plt.legend() plt.legend()
@ -284,10 +300,10 @@ if __name__ == "__main__":
# MariaDB Verbindungsdetails # MariaDB Verbindungsdetails
config = { config = {
'user': 'soc', "user": "soc",
'password': 'Rayoflight123!', "password": "Rayoflight123!",
'host': '192.168.1.135', "host": "192.168.1.135",
'database': 'sensor' "database": "sensor",
} }
# Parameter festlegen # Parameter festlegen
@ -295,7 +311,7 @@ if __name__ == "__main__":
voltage_low_threshold = 48 # 0% SoC voltage_low_threshold = 48 # 0% SoC
current_low_threshold = 2 # Niedriger Strom für beide Zustände current_low_threshold = 2 # Niedriger Strom für beide Zustände
gap = 30 # Zeitlücke in Minuten zum Gruppieren von Maxima/Minima gap = 30 # Zeitlücke in Minuten zum Gruppieren von Maxima/Minima
bat_capacity = 0.8*33 * 1000 / 48 bat_capacity = 0.8 * 33 * 1000 / 48
# Zeitpunkt X definieren # Zeitpunkt X definieren
zeitpunkt_x = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d %H:%M:%S") zeitpunkt_x = (datetime.now() - timedelta(weeks=4)).strftime("%Y-%m-%d %H:%M:%S")
@ -317,7 +333,7 @@ if __name__ == "__main__":
last_points_100_df, last_points_0_df last_points_100_df, last_points_0_df
) )
# soh_df = processor.calculate_soh(integration_results) # soh_df = processor.calculate_soh(integration_results)
#processor.update_database_with_soc(soc_df) # processor.update_database_with_soc(soc_df)
processor.plot_data(last_points_100_df, last_points_0_df, soc_df) processor.plot_data(last_points_100_df, last_points_0_df, soc_df)

View File

@ -22,20 +22,21 @@ def repeat_to_shape(array, target_shape):
class HourlyElectricityPriceForecast: class HourlyElectricityPriceForecast:
def __init__(self, source, cache_dir="cache", charges=0.000228, prediction_hours=24, cache=True): # 228 def __init__(
self, source, cache_dir="cache", charges=0.000228, prediction_hours=24, cache=True
): # 228
self.cache_dir = cache_dir self.cache_dir = cache_dir
self.cache=cache self.cache = cache
os.makedirs(self.cache_dir, exist_ok=True) os.makedirs(self.cache_dir, exist_ok=True)
self.cache_time_file = os.path.join(self.cache_dir, "cache_timestamp.txt") self.cache_time_file = os.path.join(self.cache_dir, "cache_timestamp.txt")
self.prices = self.load_data(source) self.prices = self.load_data(source)
self.charges = charges self.charges = charges
self.prediction_hours = prediction_hours self.prediction_hours = prediction_hours
def load_data(self, source): def load_data(self, source):
cache_filename = self.get_cache_filename(source) cache_filename = self.get_cache_filename(source)
if source.startswith("http"): if source.startswith("http"):
if os.path.exists(cache_filename) and not self.is_cache_expired() and self.cache==True: if os.path.exists(cache_filename) and not self.is_cache_expired() and self.cache:
print("Loading data from cache...") print("Loading data from cache...")
with open(cache_filename, "r") as file: with open(cache_filename, "r") as file:
json_data = json.load(file) json_data = json.load(file)

View File

@ -59,8 +59,6 @@ def visualisiere_ergebnisse(
plt.grid(True) plt.grid(True)
plt.legend() plt.legend()
# PV forecast # PV forecast
plt.subplot(3, 2, 3) plt.subplot(3, 2, 3)
plt.plot(hours, pv_forecast, label="PV Generation (Wh)", marker="x") plt.plot(hours, pv_forecast, label="PV Generation (Wh)", marker="x")
@ -111,19 +109,44 @@ def visualisiere_ergebnisse(
plt.subplot(3, 2, 1) plt.subplot(3, 2, 1)
# Plot with transparency (alpha) and different linestyles # Plot with transparency (alpha) and different linestyles
plt.plot( plt.plot(
hours, ergebnisse["Last_Wh_pro_Stunde"], label="Load (Wh)", marker="o", linestyle="-", alpha=0.8 hours,
ergebnisse["Last_Wh_pro_Stunde"],
label="Load (Wh)",
marker="o",
linestyle="-",
alpha=0.8,
) )
plt.plot( plt.plot(
hours, ergebnisse["Haushaltsgeraet_wh_pro_stunde"], label="Household Device (Wh)", marker="o", linestyle="--", alpha=0.8 hours,
ergebnisse["Haushaltsgeraet_wh_pro_stunde"],
label="Household Device (Wh)",
marker="o",
linestyle="--",
alpha=0.8,
) )
plt.plot( plt.plot(
hours, ergebnisse["Netzeinspeisung_Wh_pro_Stunde"], label="Grid Feed-in (Wh)", marker="x", linestyle=":", alpha=0.8 hours,
ergebnisse["Netzeinspeisung_Wh_pro_Stunde"],
label="Grid Feed-in (Wh)",
marker="x",
linestyle=":",
alpha=0.8,
) )
plt.plot( plt.plot(
hours, ergebnisse["Netzbezug_Wh_pro_Stunde"], label="Grid Consumption (Wh)", marker="^", linestyle="-.", alpha=0.8 hours,
ergebnisse["Netzbezug_Wh_pro_Stunde"],
label="Grid Consumption (Wh)",
marker="^",
linestyle="-.",
alpha=0.8,
) )
plt.plot( plt.plot(
hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^", linestyle="-", alpha=0.8 hours,
ergebnisse["Verluste_Pro_Stunde"],
label="Losses (Wh)",
marker="^",
linestyle="-",
alpha=0.8,
) )
# Title and labels # Title and labels
@ -134,8 +157,6 @@ def visualisiere_ergebnisse(
# Show legend with a higher number of columns to avoid overlap # Show legend with a higher number of columns to avoid overlap
plt.legend(ncol=2) plt.legend(ncol=2)
# Electricity prices # Electricity prices
hours_p = np.arange(0, len(strompreise)) hours_p = np.arange(0, len(strompreise))
plt.subplot(3, 2, 3) plt.subplot(3, 2, 3)
@ -164,8 +185,6 @@ def visualisiere_ergebnisse(
plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot
plt.grid(True, which="both", axis="x") # Grid for every hour plt.grid(True, which="both", axis="x") # Grid for every hour
# Plot for AC, DC charging, and Discharge status using bar charts # Plot for AC, DC charging, and Discharge status using bar charts
ax1 = plt.subplot(3, 2, 5) ax1 = plt.subplot(3, 2, 5)
hours = np.arange(0, prediction_hours) hours = np.arange(0, prediction_hours)
@ -173,10 +192,20 @@ def visualisiere_ergebnisse(
plt.bar(hours, ac, width=0.4, label="AC Charging (relative)", color="blue", alpha=0.6) plt.bar(hours, ac, width=0.4, label="AC Charging (relative)", color="blue", alpha=0.6)
# Plot DC charging as bars (relative values between 0 and 1) # Plot DC charging as bars (relative values between 0 and 1)
plt.bar(hours + 0.4, dc, width=0.4, label="DC Charging (relative)", color="green", alpha=0.6) plt.bar(
hours + 0.4, dc, width=0.4, label="DC Charging (relative)", color="green", alpha=0.6
)
# Plot Discharge as bars (0 or 1, binary values) # Plot Discharge as bars (0 or 1, binary values)
plt.bar(hours, discharge, width=0.4, label="Discharge Allowed", color="red", alpha=0.6, bottom=np.maximum(ac, dc)) plt.bar(
hours,
discharge,
width=0.4,
label="Discharge Allowed",
color="red",
alpha=0.6,
bottom=np.maximum(ac, dc),
)
# Configure the plot # Configure the plot
ax1.legend(loc="upper left") ax1.legend(loc="upper left")
@ -186,13 +215,11 @@ def visualisiere_ergebnisse(
ax1.set_title("AC/DC Charging and Discharge Overview") ax1.set_title("AC/DC Charging and Discharge Overview")
ax1.grid(True) ax1.grid(True)
if ist_dst_wechsel(datetime.datetime.now()): if ist_dst_wechsel(datetime.datetime.now()):
hours = np.arange(start_hour, prediction_hours - 1) hours = np.arange(start_hour, prediction_hours - 1)
else: else:
hours = np.arange(start_hour, prediction_hours) hours = np.arange(start_hour, prediction_hours)
pdf.savefig() # Save the current figure state to the PDF pdf.savefig() # Save the current figure state to the PDF
plt.close() # Close the current figure to free up memory plt.close() # Close the current figure to free up memory
@ -217,9 +244,17 @@ def visualisiere_ergebnisse(
) )
# Annotate costs # Annotate costs
for hour, value in enumerate(costs): for hour, value in enumerate(costs):
if value == None or np.isnan(value): if value is None or np.isnan(value):
value=0 value = 0
axs[0].annotate(f'{value:.2f}', (hour, value), textcoords="offset points", xytext=(0,5), ha='center', fontsize=8, color='red') axs[0].annotate(
f"{value:.2f}",
(hour, value),
textcoords="offset points",
xytext=(0, 5),
ha="center",
fontsize=8,
color="red",
)
# Plot revenues # Plot revenues
axs[0].plot( axs[0].plot(
@ -231,9 +266,17 @@ def visualisiere_ergebnisse(
) )
# Annotate revenues # Annotate revenues
for hour, value in enumerate(revenues): for hour, value in enumerate(revenues):
if value == None or np.isnan(value): if value is None or np.isnan(value):
value=0 value = 0
axs[0].annotate(f'{value:.2f}', (hour, value), textcoords="offset points", xytext=(0,5), ha='center', fontsize=8, color='green') axs[0].annotate(
f"{value:.2f}",
(hour, value),
textcoords="offset points",
xytext=(0, 5),
ha="center",
fontsize=8,
color="green",
)
# Title and labels # Title and labels
axs[0].set_title("Financial Balance per Hour") axs[0].set_title("Financial Balance per Hour")
@ -242,8 +285,6 @@ def visualisiere_ergebnisse(
axs[0].legend() axs[0].legend()
axs[0].grid(True) axs[0].grid(True)
# Summary of finances on the second axis (axs[1]) # Summary of finances on the second axis (axs[1])
labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"] labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"]
values = [total_costs, total_revenue, total_balance] values = [total_costs, total_revenue, total_balance]

View File

@ -15,9 +15,9 @@ from flask import Flask, jsonify, redirect, request, send_from_directory, url_fo
from akkudoktoreos.class_load import LoadForecast from akkudoktoreos.class_load import LoadForecast
from akkudoktoreos.class_load_container import Gesamtlast from akkudoktoreos.class_load_container import Gesamtlast
from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster
from akkudoktoreos.class_numpy_encoder import NumpyEncoder
from akkudoktoreos.class_optimize import optimization_problem from akkudoktoreos.class_optimize import optimization_problem
from akkudoktoreos.class_pv_forecast import PVForecast from akkudoktoreos.class_pv_forecast import PVForecast
from akkudoktoreos.class_numpy_encoder import *
from akkudoktoreos.class_strompreis import HourlyElectricityPriceForecast from akkudoktoreos.class_strompreis import HourlyElectricityPriceForecast
from akkudoktoreos.config import ( from akkudoktoreos.config import (
get_start_enddate, get_start_enddate,
@ -29,7 +29,10 @@ from akkudoktoreos.config import (
app = Flask(__name__) app = Flask(__name__)
opt_class = optimization_problem( opt_class = optimization_problem(
prediction_hours=prediction_hours, strafe=10, optimization_hours=optimization_hours, verbose=True prediction_hours=prediction_hours,
strafe=10,
optimization_hours=optimization_hours,
verbose=True,
) )
@ -62,7 +65,7 @@ def flask_strompreis():
price_forecast = HourlyElectricityPriceForecast( price_forecast = HourlyElectricityPriceForecast(
source=f"https://api.akkudoktor.net/prices?start={date_now}&end={date}", source=f"https://api.akkudoktor.net/prices?start={date_now}&end={date}",
prediction_hours=prediction_hours, prediction_hours=prediction_hours,
cache=False cache=False,
) )
specific_date_prices = price_forecast.get_price_for_daterange( specific_date_prices = price_forecast.get_price_for_daterange(
date_now, date date_now, date
@ -250,7 +253,7 @@ def flask_optimize():
# Perform optimization simulation # Perform optimization simulation
result = opt_class.optimierung_ems(parameter=parameter, start_hour=datetime.now().hour) result = opt_class.optimierung_ems(parameter=parameter, start_hour=datetime.now().hour)
#print(result) # print(result)
# convert to JSON (None accepted by dumps) # convert to JSON (None accepted by dumps)
return NumpyEncoder.dumps(result) return NumpyEncoder.dumps(result)