EOS/modules/class_optimize.py
drbacke c17d535cef Revert "restructure and optimization of class_optimize.py"
This reverts commit 1f0145582542d32367792a308c2125165afcd6d4.
2024-09-30 08:27:07 +02:00

418 lines
18 KiB
Python

from flask import Flask, jsonify, request
import numpy as np
from modules.class_load import *
from modules.class_ems import *
from modules.class_pv_forecast import *
from modules.class_akku import *
from modules.class_heatpump import *
from modules.class_load_container import *
from modules.class_inverter import *
from modules.class_sommerzeit import *
from modules.visualize import *
from modules.class_haushaltsgeraet import *
import os
from flask import Flask, send_from_directory
from pprint import pprint
import matplotlib
matplotlib.use('Agg') # Setzt das Backend auf Agg
import matplotlib.pyplot as plt
import string
from datetime import datetime
from deap import base, creator, tools, algorithms
import numpy as np
import random
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import *
def isfloat(num):
try:
float(num)
return True
except:
return False
def differential_evolution(population, toolbox, cxpb, mutpb, ngen, stats=None, halloffame=None, verbose=__debug__):
"""Differential Evolution Algorithm"""
# Evaluate the entire population
fitnesses = list(map(toolbox.evaluate, population))
for ind, fit in zip(population, fitnesses):
ind.fitness.values = fit
if halloffame is not None:
halloffame.update(population)
logbook = tools.Logbook()
logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])
for gen in range(ngen):
# Generate the next generation by mutation and recombination
for i, target in enumerate(population):
a, b, c = random.sample([ind for ind in population if ind != target], 3)
mutant = toolbox.clone(a)
for k in range(len(mutant)):
mutant[k] = c[k] + mutpb * (a[k] - b[k]) # Mutation step
if random.random() < cxpb: # Recombination step
mutant[k] = target[k]
# Evaluate the mutant
mutant.fitness.values = toolbox.evaluate(mutant)
# Replace if mutant is better
if mutant.fitness > target.fitness:
population[i] = mutant
# Update hall of fame
if halloffame is not None:
halloffame.update(population)
# Gather all the fitnesses in one list and print the stats
record = stats.compile(population) if stats else {}
logbook.record(gen=gen, nevals=len(population), **record)
if verbose:
print(logbook.stream)
return population, logbook
class optimization_problem:
def __init__(self, prediction_hours=24, strafe = 10, optimization_hours= 24):
self.prediction_hours = prediction_hours#
self.strafe = strafe
self.opti_param = None
self.fixed_eauto_hours = prediction_hours-optimization_hours
self.possible_charge_values = moegliche_ladestroeme_in_prozent
def split_individual(self, individual):
"""
Teilt das gegebene Individuum in die verschiedenen Parameter auf:
- Entladeparameter (discharge_hours_bin)
- Ladeparameter (eautocharge_hours_float)
- Haushaltsgeräte (spuelstart_int, falls vorhanden)
"""
# Extrahiere die Entlade- und Ladeparameter direkt aus dem Individuum
discharge_hours_bin = individual[:self.prediction_hours] # Erste 24 Werte sind Bool (Entladen)
eautocharge_hours_float = individual[self.prediction_hours:self.prediction_hours * 2] # Nächste 24 Werte sind Float (Laden)
spuelstart_int = None
if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0:
spuelstart_int = individual[-1] # Letzter Wert ist Startzeit für Haushaltsgerät
return discharge_hours_bin, eautocharge_hours_float, spuelstart_int
def setup_deap_environment(self,opti_param, start_hour):
self.opti_param = opti_param
if "FitnessMin" in creator.__dict__:
del creator.FitnessMin
if "Individual" in creator.__dict__:
del creator.Individual
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)
# PARAMETER
self.toolbox = base.Toolbox()
self.toolbox.register("attr_bool", random.randint, 0, 1)
self.toolbox.register("attr_float", random.uniform, 0, 1) # Für kontinuierliche Werte zwischen 0 und 1 (z.B. für E-Auto-Ladeleistung)
#self.toolbox.register("attr_choice", random.choice, self.possible_charge_values) # Für diskrete Ladeströme
self.toolbox.register("attr_int", random.randint, start_hour, 23)
###################
# Haushaltsgeraete
#print("Haushalt:",opti_param["haushaltsgeraete"])
if opti_param["haushaltsgeraete"]>0:
def create_individual():
attrs = [self.toolbox.attr_bool() for _ in range(self.prediction_hours)] # 24 Bool-Werte für Entladen
attrs += [self.toolbox.attr_float() for _ in range(self.prediction_hours)] # 24 Float-Werte für Laden
attrs.append(self.toolbox.attr_int()) # Haushaltsgerät-Startzeit
return creator.Individual(attrs)
else:
def create_individual():
attrs = [self.toolbox.attr_bool() for _ in range(self.prediction_hours)] # 24 Bool-Werte für Entladen
attrs += [self.toolbox.attr_float() for _ in range(self.prediction_hours)] # 24 Float-Werte für Laden
return creator.Individual(attrs)
self.toolbox.register("individual", create_individual)#tools.initCycle, creator.Individual, (self.toolbox.attr_bool,self.toolbox.attr_bool), n=self.prediction_hours+1)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("mate", tools.cxTwoPoint)
self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
#self.toolbox.register("mutate", mutate_choice, self.possible_charge_values, indpb=0.1)
#self.toolbox.register("mutate", tools.mutUniformInt, low=0, up=len(self.possible_charge_values)-1, indpb=0.1)
self.toolbox.register("select", tools.selTournament, tournsize=3)
def evaluate_inner(self,individual, ems,start_hour):
ems.reset()
#print("Spuel:",self.opti_param)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(individual)
# Haushaltsgeraete
if self.opti_param["haushaltsgeraete"]>0:
ems.set_haushaltsgeraet_start(spuelstart_int,global_start_hour=start_hour)
#discharge_hours_bin = np.full(self.prediction_hours,0)
ems.set_akku_discharge_hours(discharge_hours_bin)
# Setze die festen Werte für die letzten x Stunden
for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours):
eautocharge_hours_float[i] = 0.0 # Setze die letzten x Stunden auf einen festen Wert (oder vorgegebenen Wert)
#print(eautocharge_hours_float)
ems.set_eauto_charge_hours(eautocharge_hours_float)
o = ems.simuliere(start_hour)
return o
# Fitness-Funktion (muss Ihre EnergieManagementSystem-Logik integrieren)
def evaluate(self,individual,ems,parameter,start_hour,worst_case):
try:
o = self.evaluate_inner(individual,ems,start_hour)
except:
return (100000.0,)
gesamtbilanz = o["Gesamtbilanz_Euro"]
if worst_case:
gesamtbilanz = gesamtbilanz * -1.0
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(individual)
max_ladeleistung = np.max(moegliche_ladestroeme_in_prozent)
strafe_überschreitung = 0.0
# Ladeleistung überschritten?
for ladeleistung in eautocharge_hours_float:
if ladeleistung > max_ladeleistung:
# Berechne die Überschreitung
überschreitung = ladeleistung - max_ladeleistung
# Füge eine Strafe hinzu (z.B. 10 Einheiten Strafe pro Prozentpunkt Überschreitung)
strafe_überschreitung += self.strafe * 10 # Hier ist die Strafe proportional zur Überschreitung
# Für jeden Discharge 0, eine kleine Strafe von 1 Cent, da die Lastvertelung noch fehlt. Also wenn es egal ist, soll er den Akku entladen lassen
for i in range(0, self.prediction_hours):
if discharge_hours_bin[i] == 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += 0.01 # Bestrafe den Optimierer
# E-Auto nur die ersten self.fixed_eauto_hours
for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours):
if eautocharge_hours_float[i] != 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += self.strafe # Bestrafe den Optimierer
# Überprüfung, ob der Mindest-SoC erreicht wird
final_soc = ems.eauto.ladezustand_in_prozent() # Nimmt den SoC am Ende des Optimierungszeitraums
if (parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()) <= 0.0:
#print (parameter['eauto_min_soc']," " ,ems.eauto.ladezustand_in_prozent()," ",(parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()))
for i in range(0, self.prediction_hours):
if eautocharge_hours_float[i] != 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += self.strafe # Bestrafe den Optimierer
eauto_roi = (parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent())
individual.extra_data = (o["Gesamtbilanz_Euro"],o["Gesamt_Verluste"], eauto_roi )
restenergie_akku = ems.akku.aktueller_energieinhalt()
restwert_akku = restenergie_akku*parameter["preis_euro_pro_wh_akku"]
# print(restenergie_akku)
# print(parameter["preis_euro_pro_wh_akku"])
# print(restwert_akku)
# print()
strafe = 0.0
strafe = max(0,(parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()) * self.strafe )
gesamtbilanz += strafe - restwert_akku + strafe_überschreitung
#gesamtbilanz += o["Gesamt_Verluste"]/10000.0
return (gesamtbilanz,)
# Genetischer Algorithmus
def optimize(self,start_solution=None):
population = self.toolbox.population(n=300)
hof = tools.HallOfFame(1)
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("min", np.min)
stats.register("max", np.max)
print("Start:",start_solution)
if start_solution is not None and start_solution != -1:
population.insert(0, creator.Individual(start_solution))
population.insert(1, creator.Individual(start_solution))
population.insert(2, creator.Individual(start_solution))
algorithms.eaMuPlusLambda(population, self.toolbox, mu=100, lambda_=200, cxpb=0.5, mutpb=0.3, ngen=400, stats=stats, halloffame=hof, verbose=True)
#algorithms.eaSimple(population, self.toolbox, cxpb=0.3, mutpb=0.3, ngen=200, stats=stats, halloffame=hof, verbose=True)
#algorithms.eaMuCommaLambda(population, self.toolbox, mu=100, lambda_=200, cxpb=0.2, mutpb=0.4, ngen=300, stats=stats, halloffame=hof, verbose=True)
#population, log = differential_evolution(population, self.toolbox, cxpb=0.2, mutpb=0.5, ngen=200, stats=stats, halloffame=hof, verbose=True)
member = {"bilanz":[],"verluste":[],"nebenbedingung":[]}
for ind in population:
if hasattr(ind, 'extra_data'):
extra_value1, extra_value2,extra_value3 = ind.extra_data
member["bilanz"].append(extra_value1)
member["verluste"].append(extra_value2)
member["nebenbedingung"].append(extra_value3)
return hof[0], member
def optimierung_ems(self,parameter=None, start_hour=None,worst_case=False, startdate=None):
############
# Parameter
############
if startdate == None:
date = (datetime.now().date() + timedelta(hours = self.prediction_hours)).strftime("%Y-%m-%d")
date_now = datetime.now().strftime("%Y-%m-%d")
else:
date = (startdate + timedelta(hours = self.prediction_hours)).strftime("%Y-%m-%d")
date_now = startdate.strftime("%Y-%m-%d")
#print("Start_date:",date_now)
akku_size = parameter['pv_akku_cap'] # Wh
einspeiseverguetung_euro_pro_wh = np.full(self.prediction_hours, parameter["einspeiseverguetung_euro_pro_wh"]) #= # € / Wh 7/(1000.0*100.0)
discharge_array = np.full(self.prediction_hours,1) #np.array([1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]) #
akku = PVAkku(kapazitaet_wh=akku_size,hours=self.prediction_hours,start_soc_prozent=parameter["pv_soc"], max_ladeleistung_w=5000)
akku.set_charge_per_hour(discharge_array)
laden_moeglich = np.full(self.prediction_hours,1) # np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0])
eauto = PVAkku(kapazitaet_wh=parameter["eauto_cap"], hours=self.prediction_hours, lade_effizienz=parameter["eauto_charge_efficiency"], entlade_effizienz=1.0, max_ladeleistung_w=parameter["eauto_charge_power"] ,start_soc_prozent=parameter["eauto_soc"])
eauto.set_charge_per_hour(laden_moeglich)
min_soc_eauto = parameter['eauto_min_soc']
start_params = parameter['start_solution']
###############
# spuelmaschine
##############
print(parameter)
if parameter["haushaltsgeraet_dauer"] >0:
spuelmaschine = Haushaltsgeraet(hours=self.prediction_hours, verbrauch_kwh=parameter["haushaltsgeraet_wh"], dauer_h=parameter["haushaltsgeraet_dauer"])
spuelmaschine.set_startzeitpunkt(start_hour) # Startet jetzt
else:
spuelmaschine = None
###############
# PV Forecast
###############
#PVforecast = PVForecast(filepath=os.path.join(r'test_data', r'pvprognose.json'))
# PVforecast = PVForecast(prediction_hours = self.prediction_hours, url=pv_forecast_url)
# #print("PVPOWER",parameter['pvpowernow'])
# if isfloat(parameter['pvpowernow']):
# PVforecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=float(parameter['pvpowernow']))
# #PVforecast.print_ac_power_and_measurement()
pv_forecast = parameter['pv_forecast'] #PVforecast.get_pv_forecast_for_date_range(date_now,date) #get_forecast_for_date(date)
temperature_forecast = parameter['temperature_forecast'] #PVforecast.get_temperature_for_date_range(date_now,date)
###############
# Strompreise
###############
specific_date_prices = parameter["strompreis_euro_pro_wh"]
print(specific_date_prices)
#print("https://api.akkudoktor.net/prices?start="+date_now+"&end="+date)
wr = Wechselrichter(10000, akku)
ems = EnergieManagementSystem(gesamtlast = parameter["gesamtlast"], pv_prognose_wh=pv_forecast, strompreis_euro_pro_wh=specific_date_prices, einspeiseverguetung_euro_pro_wh=einspeiseverguetung_euro_pro_wh, eauto=eauto, haushaltsgeraet=spuelmaschine,wechselrichter=wr)
o = ems.simuliere(start_hour)
###############
# Optimizer Init
##############
opti_param = {}
opti_param["haushaltsgeraete"] = 0
if spuelmaschine != None:
opti_param["haushaltsgeraete"] = 1
self.setup_deap_environment(opti_param, start_hour)
def evaluate_wrapper(individual):
return self.evaluate(individual, ems, parameter,start_hour,worst_case)
self.toolbox.register("evaluate", evaluate_wrapper)
start_solution, extra_data = self.optimize(start_params)
best_solution = start_solution
o = self.evaluate_inner(best_solution, ems,start_hour)
eauto = ems.eauto.to_dict()
spuelstart_int = None
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(best_solution)
print(parameter)
print(best_solution)
visualisiere_ergebnisse(parameter["gesamtlast"], pv_forecast, specific_date_prices, o,discharge_hours_bin,eautocharge_hours_float , temperature_forecast, start_hour, self.prediction_hours,einspeiseverguetung_euro_pro_wh,extra_data=extra_data)
os.system("cp visualisierungsergebnisse.pdf ~/")
# 'Eigenverbrauch_Wh_pro_Stunde': eigenverbrauch_wh_pro_stunde,
# 'Netzeinspeisung_Wh_pro_Stunde': netzeinspeisung_wh_pro_stunde,
# 'Netzbezug_Wh_pro_Stunde': netzbezug_wh_pro_stunde,
# 'Kosten_Euro_pro_Stunde': kosten_euro_pro_stunde,
# 'akku_soc_pro_stunde': akku_soc_pro_stunde,
# 'Einnahmen_Euro_pro_Stunde': einnahmen_euro_pro_stunde,
# 'Gesamtbilanz_Euro': gesamtkosten_euro,
# 'E-Auto_SoC_pro_Stunde':eauto_soc_pro_stunde,
# 'Gesamteinnahmen_Euro': sum(einnahmen_euro_pro_stunde),
# 'Gesamtkosten_Euro': sum(kosten_euro_pro_stunde),
# "Verluste_Pro_Stunde":verluste_wh_pro_stunde,
# "Gesamt_Verluste":sum(verluste_wh_pro_stunde),
# "Haushaltsgeraet_wh_pro_stunde":haushaltsgeraet_wh_pro_stunde
#print(eauto)
return {"discharge_hours_bin":discharge_hours_bin, "eautocharge_hours_float":eautocharge_hours_float ,"result":o ,"eauto_obj":eauto,"start_solution":best_solution,"spuelstart":spuelstart_int,"simulation_data":o}