Mirror of https://github.com/Akkudoktor-EOS/EOS.git, synced 2025-08-25 06:52:23 +00:00
Move Python package files to new package directories
Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
0    src/akkudoktoreos/__init__.py    Normal file
240  src/akkudoktoreos/class_akku.py  Normal file
@@ -0,0 +1,240 @@
import numpy as np


class PVAkku:
    def __init__(
        self,
        kapazitaet_wh=None,
        hours=None,
        lade_effizienz=0.88,
        entlade_effizienz=0.88,
        max_ladeleistung_w=None,
        start_soc_prozent=0,
        min_soc_prozent=0,
        max_soc_prozent=100,
    ):
        # Battery capacity in Wh
        self.kapazitaet_wh = kapazitaet_wh
        # Initial state of charge in Wh
        self.start_soc_prozent = start_soc_prozent
        self.soc_wh = (start_soc_prozent / 100) * kapazitaet_wh
        self.hours = (
            hours if hours is not None else 24
        )  # Default to 24 hours if not specified
        self.discharge_array = np.full(self.hours, 1)
        self.charge_array = np.full(self.hours, 1)
        # Charge and discharge efficiency
        self.lade_effizienz = lade_effizienz
        self.entlade_effizienz = entlade_effizienz
        self.max_ladeleistung_w = (
            max_ladeleistung_w if max_ladeleistung_w else self.kapazitaet_wh
        )
        self.min_soc_prozent = min_soc_prozent
        self.max_soc_prozent = max_soc_prozent
        # Calculate min and max SoC in Wh
        self.min_soc_wh = (self.min_soc_prozent / 100) * self.kapazitaet_wh
        self.max_soc_wh = (self.max_soc_prozent / 100) * self.kapazitaet_wh

    def to_dict(self):
        return {
            "kapazitaet_wh": self.kapazitaet_wh,
            "start_soc_prozent": self.start_soc_prozent,
            "soc_wh": self.soc_wh,
            "hours": self.hours,
            "discharge_array": self.discharge_array.tolist(),  # Convert np.array to list
            "charge_array": self.charge_array.tolist(),
            "lade_effizienz": self.lade_effizienz,
            "entlade_effizienz": self.entlade_effizienz,
            "max_ladeleistung_w": self.max_ladeleistung_w,
        }

    @classmethod
    def from_dict(cls, data):
        # Create a new object with basic data
        obj = cls(
            kapazitaet_wh=data["kapazitaet_wh"],
            hours=data["hours"],
            lade_effizienz=data["lade_effizienz"],
            entlade_effizienz=data["entlade_effizienz"],
            max_ladeleistung_w=data["max_ladeleistung_w"],
            start_soc_prozent=data["start_soc_prozent"],
        )
        # Set arrays
        obj.discharge_array = np.array(data["discharge_array"])
        obj.charge_array = np.array(data["charge_array"])
        obj.soc_wh = data[
            "soc_wh"
        ]  # Set current state of charge, which may differ from start_soc_prozent

        return obj

    def reset(self):
        self.soc_wh = (self.start_soc_prozent / 100) * self.kapazitaet_wh
        # Ensure soc_wh is within min and max limits
        self.soc_wh = min(max(self.soc_wh, self.min_soc_wh), self.max_soc_wh)

        self.discharge_array = np.full(self.hours, 1)
        self.charge_array = np.full(self.hours, 1)

    def set_discharge_per_hour(self, discharge_array):
        assert len(discharge_array) == self.hours
        self.discharge_array = np.array(discharge_array)

    def set_charge_per_hour(self, charge_array):
        assert len(charge_array) == self.hours
        self.charge_array = np.array(charge_array)

    def ladezustand_in_prozent(self):
        return (self.soc_wh / self.kapazitaet_wh) * 100

    def energie_abgeben(self, wh, hour):
        if self.discharge_array[hour] == 0:
            return 0.0, 0.0  # No energy discharge and no losses

        # Calculate the maximum energy that can be discharged considering min_soc and efficiency
        max_possible_discharge_wh = (
            self.soc_wh - self.min_soc_wh
        ) * self.entlade_effizienz
        max_possible_discharge_wh = max(
            max_possible_discharge_wh, 0.0
        )  # Ensure non-negative

        # Consider the maximum discharge power of the battery
        max_abgebbar_wh = min(max_possible_discharge_wh, self.max_ladeleistung_w)

        # The actually discharged energy cannot exceed requested energy or maximum discharge
        tatsaechlich_abgegeben_wh = min(wh, max_abgebbar_wh)

        # Calculate the actual amount withdrawn from the battery (before efficiency loss)
        if self.entlade_effizienz > 0:
            tatsaechliche_entnahme_wh = (
                tatsaechlich_abgegeben_wh / self.entlade_effizienz
            )
        else:
            tatsaechliche_entnahme_wh = 0.0

        # Update the state of charge considering the actual withdrawal
        self.soc_wh -= tatsaechliche_entnahme_wh
        # Ensure soc_wh does not go below min_soc_wh
        self.soc_wh = max(self.soc_wh, self.min_soc_wh)

        # Calculate losses due to efficiency
        verluste_wh = tatsaechliche_entnahme_wh - tatsaechlich_abgegeben_wh

        # Return the actually discharged energy and the losses
        return tatsaechlich_abgegeben_wh, verluste_wh

    def energie_laden(self, wh, hour):
        if hour is not None and self.charge_array[hour] == 0:
            return 0, 0  # Charging not allowed in this hour

        # If no value for wh is given, use the maximum charging power
        wh = wh if wh is not None else self.max_ladeleistung_w

        # Relative to the maximum charging power (between 0 and 1)
        relative_ladeleistung = self.charge_array[hour]
        effektive_ladeleistung = relative_ladeleistung * self.max_ladeleistung_w

        # Calculate the maximum energy that can be charged considering max_soc and efficiency
        if self.lade_effizienz > 0:
            max_possible_charge_wh = (
                self.max_soc_wh - self.soc_wh
            ) / self.lade_effizienz
        else:
            max_possible_charge_wh = 0.0
        max_possible_charge_wh = max(max_possible_charge_wh, 0.0)  # Ensure non-negative

        # The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge
        effektive_lademenge = min(wh, effektive_ladeleistung, max_possible_charge_wh)

        # Energy actually stored in the battery
        geladene_menge = effektive_lademenge * self.lade_effizienz

        # Update soc_wh
        self.soc_wh += geladene_menge
        # Ensure soc_wh does not exceed max_soc_wh
        self.soc_wh = min(self.soc_wh, self.max_soc_wh)

        # Calculate losses
        verluste_wh = effektive_lademenge - geladene_menge

        return geladene_menge, verluste_wh

    def aktueller_energieinhalt(self):
        """
        This method returns the current remaining energy considering efficiency.
        It accounts for both charging and discharging efficiency.
        """
        # Calculate remaining energy considering discharge efficiency
        nutzbare_energie = (self.soc_wh - self.min_soc_wh) * self.entlade_effizienz
        return max(nutzbare_energie, 0.0)


if __name__ == "__main__":
    # Test battery discharge below min_soc
    print("Test: Discharge below min_soc")
    akku = PVAkku(
        kapazitaet_wh=10000,
        hours=1,
        start_soc_prozent=50,
        min_soc_prozent=20,
        max_soc_prozent=80,
    )
    akku.reset()
    print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")

    # Try to discharge 5000 Wh
    abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
    print(f"Energy discharged: {abgegeben_wh} Wh, Losses: {verlust_wh} Wh")
    print(f"SoC after discharge: {akku.ladezustand_in_prozent()}%")
    print(f"Expected min SoC: {akku.min_soc_prozent}%")

    # Test battery charge above max_soc
    print("\nTest: Charge above max_soc")
    akku = PVAkku(
        kapazitaet_wh=10000,
        hours=1,
        start_soc_prozent=50,
        min_soc_prozent=20,
        max_soc_prozent=80,
    )
    akku.reset()
    print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")

    # Try to charge 5000 Wh
    geladen_wh, verlust_wh = akku.energie_laden(5000, 0)
    print(f"Energy charged: {geladen_wh} Wh, Losses: {verlust_wh} Wh")
    print(f"SoC after charge: {akku.ladezustand_in_prozent()}%")
    print(f"Expected max SoC: {akku.max_soc_prozent}%")

    # Test charging when battery is at max_soc
    print("\nTest: Charging when at max_soc")
    akku = PVAkku(
        kapazitaet_wh=10000,
        hours=1,
        start_soc_prozent=80,
        min_soc_prozent=20,
        max_soc_prozent=80,
    )
    akku.reset()
    print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")

    geladen_wh, verlust_wh = akku.energie_laden(5000, 0)
    print(f"Energy charged: {geladen_wh} Wh, Losses: {verlust_wh} Wh")
    print(f"SoC after charge: {akku.ladezustand_in_prozent()}%")

    # Test discharging when battery is at min_soc
    print("\nTest: Discharging when at min_soc")
    akku = PVAkku(
        kapazitaet_wh=10000,
        hours=1,
        start_soc_prozent=20,
        min_soc_prozent=20,
        max_soc_prozent=80,
    )
    akku.reset()
    print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")

    abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
    print(f"Energy discharged: {abgegeben_wh} Wh, Losses: {verlust_wh} Wh")
    print(f"SoC after discharge: {akku.ladezustand_in_prozent()}%")
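A small worked sketch of the discharge arithmetic in PVAkku.energie_abgeben, using the same values as the first self-test above (10 kWh capacity, 50% SoC, 20% min SoC, discharge efficiency 0.88); nothing beyond what the class defines is assumed.

# Sketch: requesting 5000 Wh from a 10 kWh battery at 50% SoC, min_soc 20%, eff 0.88.
soc_wh, min_soc_wh, eff = 5000.0, 2000.0, 0.88
requested_wh = 5000.0
max_possible_discharge_wh = (soc_wh - min_soc_wh) * eff           # 2640.0 Wh deliverable
abgegeben_wh = min(requested_wh, max_possible_discharge_wh)       # 2640.0 Wh actually delivered
entnahme_wh = abgegeben_wh / eff                                  # 3000.0 Wh drawn from the cells
verluste_wh = entnahme_wh - abgegeben_wh                          # 360.0 Wh efficiency losses
print(abgegeben_wh, verluste_wh, (soc_wh - entnahme_wh) / 10000 * 100)  # 2640.0 360.0 20.0 (% SoC)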
159  src/akkudoktoreos/class_ems.py  Normal file
@@ -0,0 +1,159 @@
from datetime import datetime
from typing import Dict, List, Optional, Union

import numpy as np


def replace_nan_with_none(
    data: Union[np.ndarray, dict, list, float],
) -> Union[List, dict, float, None]:
    if data is None:
        return None
    if isinstance(data, np.ndarray):
        # Use numpy vectorized approach
        return np.where(np.isnan(data), None, data).tolist()
    elif isinstance(data, dict):
        return {key: replace_nan_with_none(value) for key, value in data.items()}
    elif isinstance(data, list):
        return [replace_nan_with_none(element) for element in data]
    elif isinstance(data, (float, np.floating)) and np.isnan(data):
        return None
    else:
        return data


class EnergieManagementSystem:
    def __init__(
        self,
        pv_prognose_wh: Optional[np.ndarray] = None,
        strompreis_euro_pro_wh: Optional[np.ndarray] = None,
        einspeiseverguetung_euro_pro_wh: Optional[np.ndarray] = None,
        eauto: Optional[object] = None,
        gesamtlast: Optional[np.ndarray] = None,
        haushaltsgeraet: Optional[object] = None,
        wechselrichter: Optional[object] = None,
    ):
        self.akku = wechselrichter.akku
        self.gesamtlast = gesamtlast
        self.pv_prognose_wh = pv_prognose_wh
        self.strompreis_euro_pro_wh = strompreis_euro_pro_wh
        self.einspeiseverguetung_euro_pro_wh = einspeiseverguetung_euro_pro_wh
        self.eauto = eauto
        self.haushaltsgeraet = haushaltsgeraet
        self.wechselrichter = wechselrichter

    def set_akku_discharge_hours(self, ds: List[int]) -> None:
        self.akku.set_discharge_per_hour(ds)

    def set_eauto_charge_hours(self, ds: List[int]) -> None:
        self.eauto.set_charge_per_hour(ds)

    def set_haushaltsgeraet_start(
        self, ds: List[int], global_start_hour: int = 0
    ) -> None:
        self.haushaltsgeraet.set_startzeitpunkt(ds, global_start_hour=global_start_hour)

    def reset(self) -> None:
        self.eauto.reset()
        self.akku.reset()

    def simuliere_ab_jetzt(self) -> dict:
        jetzt = datetime.now()
        start_stunde = jetzt.hour
        return self.simuliere(start_stunde)

    def simuliere(self, start_stunde: int) -> dict:
        # Ensure arrays have the same length
        lastkurve_wh = self.gesamtlast
        assert (
            len(lastkurve_wh)
            == len(self.pv_prognose_wh)
            == len(self.strompreis_euro_pro_wh)
        ), f"Array sizes do not match: Load Curve = {len(lastkurve_wh)}, PV Forecast = {len(self.pv_prognose_wh)}, Electricity Price = {len(self.strompreis_euro_pro_wh)}"

        # Optimized total hours calculation
        ende = len(lastkurve_wh)
        total_hours = ende - start_stunde

        # Pre-allocate arrays for the results, optimized for speed
        last_wh_pro_stunde = np.zeros(total_hours)
        netzeinspeisung_wh_pro_stunde = np.zeros(total_hours)
        netzbezug_wh_pro_stunde = np.zeros(total_hours)
        kosten_euro_pro_stunde = np.zeros(total_hours)
        einnahmen_euro_pro_stunde = np.zeros(total_hours)
        akku_soc_pro_stunde = np.zeros(total_hours)
        eauto_soc_pro_stunde = np.zeros(total_hours)
        verluste_wh_pro_stunde = np.zeros(total_hours)
        haushaltsgeraet_wh_pro_stunde = np.zeros(total_hours)

        # Set initial state
        akku_soc_pro_stunde[0] = self.akku.ladezustand_in_prozent()
        if self.eauto:
            eauto_soc_pro_stunde[0] = self.eauto.ladezustand_in_prozent()

        for stunde in range(start_stunde + 1, ende):
            stunde_since_now = stunde - start_stunde

            # Accumulate loads and PV generation
            verbrauch = self.gesamtlast[stunde]

            if self.haushaltsgeraet:
                ha_load = self.haushaltsgeraet.get_last_fuer_stunde(stunde)
                verbrauch += ha_load
                haushaltsgeraet_wh_pro_stunde[stunde_since_now] = ha_load

            # E-Auto handling
            if self.eauto:
                geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(
                    None, stunde
                )
                verbrauch += geladene_menge_eauto
                verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto
                eauto_soc_pro_stunde[stunde_since_now] = (
                    self.eauto.ladezustand_in_prozent()
                )

            # Process inverter logic
            erzeugung = self.pv_prognose_wh[stunde]
            netzeinspeisung, netzbezug, verluste, eigenverbrauch = (
                self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
            )
            netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung
            netzbezug_wh_pro_stunde[stunde_since_now] = netzbezug
            verluste_wh_pro_stunde[stunde_since_now] += verluste
            last_wh_pro_stunde[stunde_since_now] = verbrauch

            # Financial calculations
            kosten_euro_pro_stunde[stunde_since_now] = (
                netzbezug * self.strompreis_euro_pro_wh[stunde]
            )
            einnahmen_euro_pro_stunde[stunde_since_now] = (
                netzeinspeisung * self.einspeiseverguetung_euro_pro_wh[stunde]
            )

            # Akku SOC tracking
            akku_soc_pro_stunde[stunde_since_now] = self.akku.ladezustand_in_prozent()

        # Total cost and return
        gesamtkosten_euro = np.sum(kosten_euro_pro_stunde) - np.sum(
            einnahmen_euro_pro_stunde
        )

        # Prepare output dictionary
        out: Dict[str, Union[np.ndarray, float]] = {
            "Last_Wh_pro_Stunde": last_wh_pro_stunde,
            "Netzeinspeisung_Wh_pro_Stunde": netzeinspeisung_wh_pro_stunde,
            "Netzbezug_Wh_pro_Stunde": netzbezug_wh_pro_stunde,
            "Kosten_Euro_pro_Stunde": kosten_euro_pro_stunde,
            "akku_soc_pro_stunde": akku_soc_pro_stunde,
            "Einnahmen_Euro_pro_Stunde": einnahmen_euro_pro_stunde,
            "Gesamtbilanz_Euro": gesamtkosten_euro,
            "E-Auto_SoC_pro_Stunde": eauto_soc_pro_stunde,
            "Gesamteinnahmen_Euro": np.sum(einnahmen_euro_pro_stunde),
            "Gesamtkosten_Euro": np.sum(kosten_euro_pro_stunde),
            "Verluste_Pro_Stunde": verluste_wh_pro_stunde,
            "Gesamt_Verluste": np.sum(verluste_wh_pro_stunde),
            "Haushaltsgeraet_wh_pro_stunde": haushaltsgeraet_wh_pro_stunde,
        }

        return replace_nan_with_none(out)
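A minimal wiring sketch for the EMS above; the import paths and all numeric values are assumptions for illustration, not part of this file.

# Sketch: battery + inverter + EMS simulated over 24 hours (assumed imports and values).
import numpy as np
from class_akku import PVAkku              # assumed import path within this package
from class_inverter import Wechselrichter  # assumed import path within this package

hours = 24
akku = PVAkku(kapazitaet_wh=10000, hours=hours, start_soc_prozent=50)
eauto = PVAkku(kapazitaet_wh=60000, hours=hours, start_soc_prozent=40)
ems = EnergieManagementSystem(
    pv_prognose_wh=np.full(hours, 1000.0),            # flat 1 kWh/h PV forecast (assumed)
    strompreis_euro_pro_wh=np.full(hours, 0.0003),    # 0.30 EUR/kWh (assumed)
    einspeiseverguetung_euro_pro_wh=np.full(hours, 0.00007),
    eauto=eauto,
    gesamtlast=np.full(hours, 500.0),                 # flat 500 Wh/h household load (assumed)
    wechselrichter=Wechselrichter(10000, akku),
)
result = ems.simuliere(start_stunde=0)
print(result["Gesamtbilanz_Euro"], len(result["Last_Wh_pro_Stunde"]))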
59  src/akkudoktoreos/class_haushaltsgeraet.py  Normal file
@@ -0,0 +1,59 @@
import numpy as np


class Haushaltsgeraet:
    def __init__(self, hours=None, verbrauch_wh=None, dauer_h=None):
        self.hours = hours  # Total duration for which the planning is done
        self.verbrauch_wh = (
            verbrauch_wh  # Total energy consumption of the device in Wh
        )
        self.dauer_h = dauer_h  # Duration of use in hours
        self.lastkurve = np.zeros(self.hours)  # Initialize the load curve with zeros

    def set_startzeitpunkt(self, start_hour, global_start_hour=0):
        """
        Sets the start time of the device and generates the corresponding load curve.
        :param start_hour: The hour at which the device should start.
        """
        self.reset()

        # Check if the duration of use is within the available time frame
        if start_hour + self.dauer_h > self.hours:
            raise ValueError("The duration of use exceeds the available time frame.")
        if start_hour < global_start_hour:
            raise ValueError("The start time is earlier than the available time frame.")

        # Calculate energy per hour based on total consumption and duration
        leistung_pro_stunde = self.verbrauch_wh / self.dauer_h  # Wh consumed in each hour of use
        # Set the power for the duration of use in the load curve array
        self.lastkurve[start_hour : start_hour + self.dauer_h] = leistung_pro_stunde

    def reset(self):
        """
        Resets the load curve.
        """
        self.lastkurve = np.zeros(self.hours)

    def get_lastkurve(self):
        """
        Returns the current load curve.
        """
        return self.lastkurve

    def get_last_fuer_stunde(self, hour):
        """
        Returns the load for a specific hour.
        :param hour: The hour for which the load is queried.
        :return: The load in Wh for the specified hour.
        """
        if hour < 0 or hour >= self.hours:
            raise ValueError("The specified hour is outside the available time frame.")

        return self.lastkurve[hour]

    def spaetestmoeglicher_startzeitpunkt(self):
        """
        Returns the latest possible start time at which the device can still run completely.
        """
        return self.hours - self.dauer_h
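A short usage sketch of the appliance load curve above; the appliance size and start hour are assumed example values.

# Sketch: a 2000 Wh appliance running for 2 h starting at hour 8 spreads its energy evenly.
geraet = Haushaltsgeraet(hours=24, verbrauch_wh=2000, dauer_h=2)
geraet.set_startzeitpunkt(8)
print(geraet.get_last_fuer_stunde(8), geraet.get_last_fuer_stunde(9))  # 1000.0 1000.0
print(geraet.spaetestmoeglicher_startzeitpunkt())                      # 22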
70  src/akkudoktoreos/class_inverter.py  Normal file
@@ -0,0 +1,70 @@
class Wechselrichter:
    def __init__(self, max_leistung_wh, akku):
        self.max_leistung_wh = (
            max_leistung_wh  # Maximum power that the inverter can handle
        )
        self.akku = akku  # Connection to a battery object

    def energie_verarbeiten(self, erzeugung, verbrauch, hour):
        verluste = 0  # Losses during processing
        netzeinspeisung = 0  # Grid feed-in
        netzbezug = 0.0  # Grid draw
        eigenverbrauch = 0.0  # Self-consumption

        if erzeugung >= verbrauch:
            if verbrauch > self.max_leistung_wh:
                # If consumption exceeds maximum inverter power
                verluste += erzeugung - self.max_leistung_wh
                restleistung_nach_verbrauch = self.max_leistung_wh - verbrauch
                netzbezug = (
                    -restleistung_nach_verbrauch
                )  # Negative indicates feeding into the grid
                eigenverbrauch = self.max_leistung_wh
            else:
                # Remaining power after consumption
                restleistung_nach_verbrauch = erzeugung - verbrauch

                # Load battery with excess energy
                geladene_energie, verluste_laden_akku = self.akku.energie_laden(
                    restleistung_nach_verbrauch, hour
                )
                rest_überschuss = restleistung_nach_verbrauch - (
                    geladene_energie + verluste_laden_akku
                )

                # Feed-in to the grid based on remaining capacity
                if rest_überschuss > self.max_leistung_wh - verbrauch:
                    netzeinspeisung = self.max_leistung_wh - verbrauch
                    verluste += rest_überschuss - netzeinspeisung
                else:
                    netzeinspeisung = rest_überschuss

                verluste += verluste_laden_akku
                eigenverbrauch = verbrauch  # Self-consumption is equal to the load

        else:
            benötigte_energie = (
                verbrauch - erzeugung
            )  # Energy needed from external sources
            max_akku_leistung = (
                self.akku.max_ladeleistung_w
            )  # Maximum battery discharge power

            # Calculate remaining AC power available
            rest_ac_leistung = max(self.max_leistung_wh - erzeugung, 0)

            # Discharge energy from the battery based on need
            if benötigte_energie < rest_ac_leistung:
                aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
                    benötigte_energie, hour
                )
            else:
                aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
                    rest_ac_leistung, hour
                )

            verluste += akku_entladeverluste  # Include losses from battery discharge
            netzbezug = benötigte_energie - aus_akku  # Energy drawn from the grid
            eigenverbrauch = erzeugung + aus_akku  # Total self-consumption

        return netzeinspeisung, netzbezug, verluste, eigenverbrauch
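A short sketch of how the inverter above routes one surplus hour and one deficit hour; the battery parameters, hourly values, and the import path are assumptions.

# Sketch (assumed values): surplus and deficit handling in Wechselrichter.energie_verarbeiten.
from class_akku import PVAkku  # assumed import path within this package

akku = PVAkku(kapazitaet_wh=10000, hours=24, start_soc_prozent=50)
wr = Wechselrichter(10000, akku)

# Surplus hour: 3000 Wh PV, 1000 Wh load -> surplus charges the battery first,
# anything the battery cannot absorb is fed into the grid.
einspeisung, bezug, verluste, eigen = wr.energie_verarbeiten(3000, 1000, hour=0)

# Deficit hour: 0 Wh PV, 1500 Wh load -> the battery covers what it can,
# the grid supplies the remainder.
einspeisung, bezug, verluste, eigen = wr.energie_verarbeiten(0, 1500, hour=1)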
110  src/akkudoktoreos/class_load.py  Normal file
@@ -0,0 +1,110 @@
from datetime import datetime

import numpy as np

# Load the .npz file when the application starts


class LoadForecast:
    def __init__(self, filepath=None, year_energy=None):
        self.filepath = filepath
        self.data = None
        self.data_year_energy = None
        self.year_energy = year_energy
        self.load_data()

    def get_daily_stats(self, date_str):
        """
        Returns the 24-hour profile with mean and standard deviation for a given date.

        :param date_str: Date as a string in the format "YYYY-MM-DD"
        :return: An array with shape (2, 24), contains means and standard deviations
        """
        # Convert the date string into a datetime object
        date = self._convert_to_datetime(date_str)

        # Calculate the day of the year (1 to 365)
        day_of_year = date.timetuple().tm_yday

        # Extract the 24-hour profile for the given date
        daily_stats = self.data_year_energy[
            day_of_year - 1
        ]  # -1 because indexing starts at 0
        return daily_stats

    def get_hourly_stats(self, date_str, hour):
        """
        Returns the mean and standard deviation for a specific hour of a given date.

        :param date_str: Date as a string in the format "YYYY-MM-DD"
        :param hour: Specific hour (0 to 23)
        :return: An array with shape (2,), contains mean and standard deviation for the specified hour
        """
        # Convert the date string into a datetime object
        date = self._convert_to_datetime(date_str)

        # Calculate the day of the year (1 to 365)
        day_of_year = date.timetuple().tm_yday

        # Extract mean and standard deviation for the given hour
        hourly_stats = self.data_year_energy[
            day_of_year - 1, :, hour
        ]  # Access the specific hour

        return hourly_stats

    def get_stats_for_date_range(self, start_date_str, end_date_str):
        """
        Returns the means and standard deviations for a date range.

        :param start_date_str: Start date as a string in the format "YYYY-MM-DD"
        :param end_date_str: End date as a string in the format "YYYY-MM-DD"
        :return: An array with aggregated data for the date range
        """
        start_date = self._convert_to_datetime(start_date_str)
        end_date = self._convert_to_datetime(end_date_str)

        start_day_of_year = start_date.timetuple().tm_yday
        end_day_of_year = end_date.timetuple().tm_yday

        # Note that in leap years, the day of the year may need adjustment
        stats_for_range = self.data_year_energy[
            start_day_of_year:end_day_of_year
        ]  # day of year is 1-based, array indexing is 0-based
        stats_for_range = stats_for_range.swapaxes(1, 0)

        stats_for_range = stats_for_range.reshape(stats_for_range.shape[0], -1)
        return stats_for_range

    def load_data(self):
        """Loads data from the specified file."""
        try:
            data = np.load(self.filepath)
            self.data = np.array(
                list(zip(data["yearly_profiles"], data["yearly_profiles_std"]))
            )
            self.data_year_energy = self.data * self.year_energy
            # pprint(self.data_year_energy)
        except FileNotFoundError:
            print(f"Error: File {self.filepath} not found.")
        except Exception as e:
            print(f"An error occurred while loading data: {e}")

    def get_price_data(self):
        """Returns price data (currently not implemented)."""
        return self.price_data

    def _convert_to_datetime(self, date_str):
        """Converts a date string to a datetime object."""
        return datetime.strptime(date_str, "%Y-%m-%d")


# Example usage of the class
if __name__ == "__main__":
    filepath = r"..\data\load_profiles.npz"  # Adjust the path to the .npz file
    lf = LoadForecast(filepath=filepath, year_energy=2000)
    specific_date_prices = lf.get_daily_stats("2024-02-16")  # Adjust date as needed
    specific_hour_stats = lf.get_hourly_stats(
        "2024-02-16", 12
    )  # Adjust date and hour as needed
    print(specific_hour_stats)
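A sketch of an .npz file that load_data() above can read; the array shapes are an assumption inferred from the indexing (day, [mean, std], hour), and the values are random placeholders.

# Sketch: building a compatible load_profiles.npz for testing (shapes inferred, values assumed).
import numpy as np

yearly_profiles = np.random.rand(365, 24)       # normalized mean load per day and hour
yearly_profiles_std = np.random.rand(365, 24)   # matching standard deviations
np.savez("load_profiles.npz", yearly_profiles=yearly_profiles, yearly_profiles_std=yearly_profiles_std)

lf = LoadForecast(filepath="load_profiles.npz", year_energy=2000)
print(lf.get_daily_stats("2024-02-16").shape)        # (2, 24)
print(lf.get_hourly_stats("2024-02-16", 12).shape)   # (2,)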
41  src/akkudoktoreos/class_load_container.py  Normal file
@@ -0,0 +1,41 @@
import numpy as np


class Gesamtlast:
    def __init__(self, prediction_hours=24):
        self.lasten = {}  # Contains names and load arrays for different sources
        self.prediction_hours = prediction_hours

    def hinzufuegen(self, name, last_array):
        """
        Adds an array of loads for a specific source.

        :param name: Name of the load source (e.g., "Household", "Heat Pump")
        :param last_array: Array of loads, where each entry corresponds to an hour
        """
        if len(last_array) != self.prediction_hours:
            raise ValueError(
                f"Total load inconsistent lengths in arrays: {name} {len(last_array)}"
            )
        self.lasten[name] = last_array

    def gesamtlast_berechnen(self):
        """
        Calculates the total load for each hour and returns an array of total loads.

        :return: Array of total loads, where each entry corresponds to an hour
        """
        if not self.lasten:
            return []

        # Assumption: All load arrays have the same length
        stunden = len(next(iter(self.lasten.values())))
        gesamtlast_array = [0] * stunden

        for last_array in self.lasten.values():
            gesamtlast_array = [
                gesamtlast + stundenlast
                for gesamtlast, stundenlast in zip(gesamtlast_array, last_array)
            ]

        return np.array(gesamtlast_array)
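A short usage sketch of the load container above; the source names and hourly values are assumed.

# Sketch: summing two load sources hour by hour.
import numpy as np

gesamtlast = Gesamtlast(prediction_hours=24)
gesamtlast.hinzufuegen("Household", np.full(24, 500.0))
gesamtlast.hinzufuegen("Heat Pump", np.full(24, 250.0))
print(gesamtlast.gesamtlast_berechnen()[:3])  # [750. 750. 750.]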
222  src/akkudoktoreos/class_load_corrector.py  Normal file
@@ -0,0 +1,222 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score


class LoadPredictionAdjuster:
    def __init__(self, measured_data, predicted_data, load_forecast):
        self.measured_data = measured_data
        self.predicted_data = predicted_data
        self.load_forecast = load_forecast
        self.merged_data = self._merge_data()
        self.train_data = None
        self.test_data = None
        self.weekday_diff = None
        self.weekend_diff = None

    def _remove_outliers(self, data, threshold=2):
        # Calculate the Z-Score of the 'Last' data
        data["Z-Score"] = np.abs(
            (data["Last"] - data["Last"].mean()) / data["Last"].std()
        )
        # Filter the data based on the threshold
        filtered_data = data[data["Z-Score"] < threshold]
        return filtered_data.drop(columns=["Z-Score"])

    def _merge_data(self):
        # Convert the time column in both DataFrames to datetime
        self.predicted_data["time"] = pd.to_datetime(self.predicted_data["time"])
        self.measured_data["time"] = pd.to_datetime(self.measured_data["time"])

        # Ensure both time columns have the same timezone
        if self.measured_data["time"].dt.tz is None:
            self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(
                "UTC"
            )

        self.predicted_data["time"] = (
            self.predicted_data["time"]
            .dt.tz_localize("UTC")
            .dt.tz_convert("Europe/Berlin")
        )
        self.measured_data["time"] = self.measured_data["time"].dt.tz_convert(
            "Europe/Berlin"
        )

        # Optionally: Remove timezone information if only working locally
        self.predicted_data["time"] = self.predicted_data["time"].dt.tz_localize(None)
        self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(None)

        # Now you can perform the merge
        merged_data = pd.merge(
            self.measured_data, self.predicted_data, on="time", how="inner"
        )
        print(merged_data)
        merged_data["Hour"] = merged_data["time"].dt.hour
        merged_data["DayOfWeek"] = merged_data["time"].dt.dayofweek
        return merged_data

    def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
        self.merged_data = self._remove_outliers(self.merged_data)
        train_end_date = self.merged_data["time"].max() - pd.Timedelta(
            weeks=test_period_weeks
        )
        train_start_date = train_end_date - pd.Timedelta(weeks=train_period_weeks)

        test_start_date = train_end_date + pd.Timedelta(hours=1)
        test_end_date = (
            test_start_date
            + pd.Timedelta(weeks=test_period_weeks)
            - pd.Timedelta(hours=1)
        )

        self.train_data = self.merged_data[
            (self.merged_data["time"] >= train_start_date)
            & (self.merged_data["time"] <= train_end_date)
        ]

        self.test_data = self.merged_data[
            (self.merged_data["time"] >= test_start_date)
            & (self.merged_data["time"] <= test_end_date)
        ]

        self.train_data["Difference"] = (
            self.train_data["Last"] - self.train_data["Last Pred"]
        )

        weekdays_train_data = self.train_data[self.train_data["DayOfWeek"] < 5]
        weekends_train_data = self.train_data[self.train_data["DayOfWeek"] >= 5]

        self.weekday_diff = (
            weekdays_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
        )
        self.weekend_diff = (
            weekends_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
        )

    def _weighted_mean_diff(self, data):
        train_end_date = self.train_data["time"].max()
        weights = 1 / (train_end_date - data["time"]).dt.days.replace(0, np.nan)
        weighted_mean = (data["Difference"] * weights).sum() / weights.sum()
        return weighted_mean

    def adjust_predictions(self):
        self.train_data["Adjusted Pred"] = self.train_data.apply(
            self._adjust_row, axis=1
        )
        self.test_data["Adjusted Pred"] = self.test_data.apply(self._adjust_row, axis=1)

    def _adjust_row(self, row):
        if row["DayOfWeek"] < 5:
            return row["Last Pred"] + self.weekday_diff.get(row["Hour"], 0)
        else:
            return row["Last Pred"] + self.weekend_diff.get(row["Hour"], 0)

    def plot_results(self):
        self._plot_data(self.train_data, "Training")
        self._plot_data(self.test_data, "Testing")

    def _plot_data(self, data, data_type):
        plt.figure(figsize=(14, 7))
        plt.plot(
            data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue"
        )
        plt.plot(
            data["time"],
            data["Last Pred"],
            label=f"Predicted Last - {data_type}",
            color="red",
            linestyle="--",
        )
        plt.plot(
            data["time"],
            data["Adjusted Pred"],
            label=f"Adjusted Predicted Last - {data_type}",
            color="green",
            linestyle=":",
        )
        plt.xlabel("Time")
        plt.ylabel("Load")
        plt.title(f"Actual vs Predicted vs Adjusted Predicted Load ({data_type} Data)")
        plt.legend()
        plt.grid(True)
        plt.show()

    def evaluate_model(self):
        mse = mean_squared_error(
            self.test_data["Last"], self.test_data["Adjusted Pred"]
        )
        r2 = r2_score(self.test_data["Last"], self.test_data["Adjusted Pred"])
        print(f"Mean Squared Error: {mse}")
        print(f"R-squared: {r2}")

    def predict_next_hours(self, hours_ahead):
        last_date = self.merged_data["time"].max()
        future_dates = [
            last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)
        ]
        future_df = pd.DataFrame({"time": future_dates})
        future_df["Hour"] = future_df["time"].dt.hour
        future_df["DayOfWeek"] = future_df["time"].dt.dayofweek
        future_df["Last Pred"] = future_df["time"].apply(self._forecast_next_hours)
        future_df["Adjusted Pred"] = future_df.apply(self._adjust_row, axis=1)
        return future_df

    def _forecast_next_hours(self, timestamp):
        date_str = timestamp.strftime("%Y-%m-%d")
        hour = timestamp.hour
        daily_forecast = self.load_forecast.get_daily_stats(date_str)
        return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan


# if __name__ == '__main__':
#     estimator = LastEstimator()
#     start_date = "2024-06-01"
#     end_date = "2024-08-01"
#     last_df = estimator.get_last(start_date, end_date)

#     selected_columns = last_df[['timestamp', 'Last']]
#     selected_columns['time'] = pd.to_datetime(selected_columns['timestamp']).dt.floor('H')
#     selected_columns['Last'] = pd.to_numeric(selected_columns['Last'], errors='coerce')

#     # Drop rows with NaN values
#     cleaned_data = selected_columns.dropna()

#     print(cleaned_data)
#     # Create an instance of LoadForecast
#     lf = LoadForecast(filepath=r'.\load_profiles.npz', year_energy=6000*1000)

#     # Initialize an empty DataFrame to hold the forecast data
#     forecast_list = []

#     # Loop through each day in the date range
#     for single_date in pd.date_range(cleaned_data['time'].min().date(), cleaned_data['time'].max().date()):
#         date_str = single_date.strftime('%Y-%m-%d')
#         daily_forecast = lf.get_daily_stats(date_str)
#         mean_values = daily_forecast[0]  # Extract the mean values
#         hours = [single_date + pd.Timedelta(hours=i) for i in range(24)]
#         daily_forecast_df = pd.DataFrame({'time': hours, 'Last Pred': mean_values})
#         forecast_list.append(daily_forecast_df)

#     # Concatenate all daily forecasts into a single DataFrame
#     forecast_df = pd.concat(forecast_list, ignore_index=True)

#     # Create an instance of the LoadPredictionAdjuster class
#     adjuster = LoadPredictionAdjuster(cleaned_data, forecast_df, lf)

#     # Calculate the weighted mean differences
#     adjuster.calculate_weighted_mean()

#     # Adjust the predictions
#     adjuster.adjust_predictions()

#     # Plot the results
#     adjuster.plot_results()

#     # Evaluate the model
#     adjuster.evaluate_model()

#     # Predict the next x hours
#     future_predictions = adjuster.predict_next_hours(48)
#     print(future_predictions)
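A sketch of the input frames the adjuster above expects and of the per-hour correction it computes; only the column names ("time", "Last", "Last Pred") come from the code, the synthetic data and sizes are assumptions.

# Sketch (assumed synthetic data): correction(hour) = sum(w_i * (Last_i - Last_Pred_i)) / sum(w_i),
# with w_i = 1 / days_before_train_end, applied separately to weekdays and weekends.
import numpy as np
import pandas as pd

times = pd.date_range("2024-06-01", periods=24 * 7 * 11, freq="h")  # 11 weeks, tz-naive
measured = pd.DataFrame({"time": times, "Last": np.random.rand(len(times)) * 1000})
predicted = pd.DataFrame({"time": times, "Last Pred": np.random.rand(len(times)) * 1000})

# load_forecast is only needed for predict_next_hours(), so None is enough here.
adjuster = LoadPredictionAdjuster(measured, predicted, load_forecast=None)
adjuster.calculate_weighted_mean()
adjuster.adjust_predictions()
adjuster.evaluate_model()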
351  src/akkudoktoreos/class_optimize.py  Normal file
@@ -0,0 +1,351 @@
import os
import random
import sys
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
from deap import algorithms, base, creator, tools

from modules.class_akku import PVAkku
from modules.class_ems import EnergieManagementSystem
from modules.class_haushaltsgeraet import Haushaltsgeraet
from modules.class_inverter import Wechselrichter
from modules.visualize import visualisiere_ergebnisse

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import moegliche_ladestroeme_in_prozent


def isfloat(num: Any) -> bool:
    """Check if a given input can be converted to float."""
    try:
        float(num)
        return True
    except ValueError:
        return False


class optimization_problem:
    def __init__(
        self,
        prediction_hours: int = 24,
        strafe: float = 10,
        optimization_hours: int = 24,
        verbose: bool = False,
        fixed_seed: Optional[int] = None,
    ):
        """Initialize the optimization problem with the required parameters."""
        self.prediction_hours = prediction_hours
        self.strafe = strafe
        self.opti_param = None
        self.fixed_eauto_hours = prediction_hours - optimization_hours
        self.possible_charge_values = moegliche_ladestroeme_in_prozent
        self.verbose = verbose
        self.fix_seed = fixed_seed

        # Set a fixed seed for random operations if provided
        if fixed_seed is not None:
            random.seed(fixed_seed)

    def split_individual(
        self, individual: List[float]
    ) -> Tuple[List[int], List[float], Optional[int]]:
        """
        Split the individual solution into its components:
        1. Discharge hours (binary),
        2. Electric vehicle charge hours (float),
        3. Dishwasher start time (integer if applicable).
        """
        discharge_hours_bin = individual[: self.prediction_hours]
        eautocharge_hours_float = individual[
            self.prediction_hours : self.prediction_hours * 2
        ]
        spuelstart_int = (
            individual[-1]
            if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0
            else None
        )
        return discharge_hours_bin, eautocharge_hours_float, spuelstart_int

    def setup_deap_environment(
        self, opti_param: Dict[str, Any], start_hour: int
    ) -> None:
        """
        Set up the DEAP environment with fitness and individual creation rules.
        """
        self.opti_param = opti_param

        # Remove existing FitnessMin and Individual classes from creator if present
        for attr in ["FitnessMin", "Individual"]:
            if attr in creator.__dict__:
                del creator.__dict__[attr]

        # Create new FitnessMin and Individual classes
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMin)

        # Initialize toolbox with attributes and operations
        self.toolbox = base.Toolbox()
        self.toolbox.register("attr_bool", random.randint, 0, 1)
        self.toolbox.register("attr_float", random.uniform, 0, 1)
        self.toolbox.register("attr_int", random.randint, start_hour, 23)

        # Register individual creation method based on household appliance parameter
        if opti_param["haushaltsgeraete"] > 0:
            self.toolbox.register(
                "individual",
                lambda: creator.Individual(
                    [self.toolbox.attr_bool() for _ in range(self.prediction_hours)]
                    + [self.toolbox.attr_float() for _ in range(self.prediction_hours)]
                    + [self.toolbox.attr_int()]
                ),
            )
        else:
            self.toolbox.register(
                "individual",
                lambda: creator.Individual(
                    [self.toolbox.attr_bool() for _ in range(self.prediction_hours)]
                    + [self.toolbox.attr_float() for _ in range(self.prediction_hours)]
                ),
            )

        # Register population, mating, mutation, and selection functions
        self.toolbox.register(
            "population", tools.initRepeat, list, self.toolbox.individual
        )
        self.toolbox.register("mate", tools.cxTwoPoint)
        self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
        self.toolbox.register("select", tools.selTournament, tournsize=3)

    def evaluate_inner(
        self, individual: List[float], ems: EnergieManagementSystem, start_hour: int
    ) -> Dict[str, Any]:
        """
        Internal evaluation function that simulates the energy management system (EMS)
        using the provided individual solution.
        """
        ems.reset()
        discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
            self.split_individual(individual)
        )
        if self.opti_param.get("haushaltsgeraete", 0) > 0:
            ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour)

        ems.set_akku_discharge_hours(discharge_hours_bin)
        eautocharge_hours_float[self.prediction_hours - self.fixed_eauto_hours :] = [
            0.0
        ] * self.fixed_eauto_hours
        ems.set_eauto_charge_hours(eautocharge_hours_float)
        return ems.simuliere(start_hour)

    def evaluate(
        self,
        individual: List[float],
        ems: EnergieManagementSystem,
        parameter: Dict[str, Any],
        start_hour: int,
        worst_case: bool,
    ) -> Tuple[float]:
        """
        Evaluate the fitness of an individual solution based on the simulation results.
        """
        try:
            o = self.evaluate_inner(individual, ems, start_hour)
        except Exception:
            return (100000.0,)  # Return a high penalty in case of an exception

        gesamtbilanz = o["Gesamtbilanz_Euro"] * (-1.0 if worst_case else 1.0)
        discharge_hours_bin, eautocharge_hours_float, _ = self.split_individual(
            individual
        )
        max_ladeleistung = np.max(moegliche_ladestroeme_in_prozent)

        # Penalty for not discharging
        gesamtbilanz += sum(
            0.01 for i in range(self.prediction_hours) if discharge_hours_bin[i] == 0.0
        )

        # Penalty for charging the electric vehicle during restricted hours
        gesamtbilanz += sum(
            self.strafe
            for i in range(
                self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours
            )
            if eautocharge_hours_float[i] != 0.0
        )

        # Penalty for exceeding maximum charge power
        gesamtbilanz += sum(
            self.strafe * 10
            for ladeleistung in eautocharge_hours_float
            if ladeleistung > max_ladeleistung
        )

        # Penalty for continuing to charge the EV once the minimum SoC target is already reached
        if parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent() <= 0.0:
            gesamtbilanz += sum(
                self.strafe
                for ladeleistung in eautocharge_hours_float
                if ladeleistung != 0.0
            )

        individual.extra_data = (
            o["Gesamtbilanz_Euro"],
            o["Gesamt_Verluste"],
            parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent(),
        )

        # Adjust total balance with battery value and penalties for unmet SOC
        restwert_akku = (
            ems.akku.aktueller_energieinhalt() * parameter["preis_euro_pro_wh_akku"]
        )
        gesamtbilanz += (
            max(
                0,
                (parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent())
                * self.strafe,
            )
            - restwert_akku
        )

        return (gesamtbilanz,)

    def optimize(
        self, start_solution: Optional[List[float]] = None
    ) -> Tuple[Any, Dict[str, List[Any]]]:
        """Run the optimization process using a genetic algorithm."""
        population = self.toolbox.population(n=300)
        hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("min", np.min)

        if self.verbose:
            print("Start optimize:", start_solution)

        # Insert the start solution into the population if provided
        if start_solution not in [None, -1]:
            for _ in range(3):
                population.insert(0, creator.Individual(start_solution))

        # Run the evolutionary algorithm
        algorithms.eaMuPlusLambda(
            population,
            self.toolbox,
            mu=100,
            lambda_=200,
            cxpb=0.5,
            mutpb=0.3,
            ngen=400,
            stats=stats,
            halloffame=hof,
            verbose=self.verbose,
        )

        member = {"bilanz": [], "verluste": [], "nebenbedingung": []}
        for ind in population:
            if hasattr(ind, "extra_data"):
                extra_value1, extra_value2, extra_value3 = ind.extra_data
                member["bilanz"].append(extra_value1)
                member["verluste"].append(extra_value2)
                member["nebenbedingung"].append(extra_value3)

        return hof[0], member

    def optimierung_ems(
        self,
        parameter: Optional[Dict[str, Any]] = None,
        start_hour: Optional[int] = None,
        worst_case: bool = False,
        startdate: Optional[Any] = None,  # startdate is not used!
    ) -> Dict[str, Any]:
        """
        Perform EMS (Energy Management System) optimization and visualize results.
        """
        einspeiseverguetung_euro_pro_wh = np.full(
            self.prediction_hours, parameter["einspeiseverguetung_euro_pro_wh"]
        )

        # Initialize PV and EV batteries
        akku = PVAkku(
            kapazitaet_wh=parameter["pv_akku_cap"],
            hours=self.prediction_hours,
            start_soc_prozent=parameter["pv_soc"],
            min_soc_prozent=parameter["min_soc_prozent"],
            max_ladeleistung_w=5000,
        )
        akku.set_charge_per_hour(np.full(self.prediction_hours, 1))

        eauto = PVAkku(
            kapazitaet_wh=parameter["eauto_cap"],
            hours=self.prediction_hours,
            lade_effizienz=parameter["eauto_charge_efficiency"],
            entlade_effizienz=1.0,
            max_ladeleistung_w=parameter["eauto_charge_power"],
            start_soc_prozent=parameter["eauto_soc"],
        )
        eauto.set_charge_per_hour(np.full(self.prediction_hours, 1))

        # Initialize household appliance if applicable
        spuelmaschine = (
            Haushaltsgeraet(
                hours=self.prediction_hours,
                verbrauch_wh=parameter["haushaltsgeraet_wh"],
                dauer_h=parameter["haushaltsgeraet_dauer"],
            )
            if parameter["haushaltsgeraet_dauer"] > 0
            else None
        )

        # Initialize the inverter and energy management system
        wr = Wechselrichter(10000, akku)
        ems = EnergieManagementSystem(
            gesamtlast=parameter["gesamtlast"],
            pv_prognose_wh=parameter["pv_forecast"],
            strompreis_euro_pro_wh=parameter["strompreis_euro_pro_wh"],
            einspeiseverguetung_euro_pro_wh=einspeiseverguetung_euro_pro_wh,
            eauto=eauto,
            haushaltsgeraet=spuelmaschine,
            wechselrichter=wr,
        )

        # Setup the DEAP environment and optimization process
        self.setup_deap_environment(
            {"haushaltsgeraete": 1 if spuelmaschine else 0}, start_hour
        )
        self.toolbox.register(
            "evaluate",
            lambda ind: self.evaluate(ind, ems, parameter, start_hour, worst_case),
        )
        start_solution, extra_data = self.optimize(parameter["start_solution"])

        # Perform final evaluation on the best solution
        o = self.evaluate_inner(start_solution, ems, start_hour)
        discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
            self.split_individual(start_solution)
        )

        # Visualize the results
        visualisiere_ergebnisse(
            parameter["gesamtlast"],
            parameter["pv_forecast"],
            parameter["strompreis_euro_pro_wh"],
            o,
            discharge_hours_bin,
            eautocharge_hours_float,
            parameter["temperature_forecast"],
            start_hour,
            self.prediction_hours,
            einspeiseverguetung_euro_pro_wh,
            extra_data=extra_data,
        )

        # Return final results as a dictionary
        return {
            "discharge_hours_bin": discharge_hours_bin,
            "eautocharge_hours_float": eautocharge_hours_float,
            "result": o,
            "eauto_obj": ems.eauto.to_dict(),
            "start_solution": start_solution,
            "spuelstart": spuelstart_int,
            "simulation_data": o,
        }
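A sketch of the parameter dictionary optimierung_ems() reads; every key below is referenced in the method above, while the example values are assumptions for illustration.

# Sketch: assembling the optimization parameters (keys from the code above, values assumed).
import numpy as np

prediction_hours = 48
parameter = {
    "pv_akku_cap": 26400, "pv_soc": 80, "min_soc_prozent": 15,
    "eauto_cap": 60000, "eauto_charge_efficiency": 0.95, "eauto_charge_power": 11040,
    "eauto_soc": 54, "eauto_min_soc": 80,
    "haushaltsgeraet_wh": 937, "haushaltsgeraet_dauer": 0,  # 0 disables the appliance
    "einspeiseverguetung_euro_pro_wh": 7e-5,
    "preis_euro_pro_wh_akku": 10e-05,
    "gesamtlast": np.full(prediction_hours, 500.0),
    "pv_forecast": np.full(prediction_hours, 1000.0),
    "strompreis_euro_pro_wh": np.full(prediction_hours, 0.0003),
    "temperature_forecast": np.full(prediction_hours, 15.0),
    "start_solution": None,
}

opt = optimization_problem(prediction_hours=prediction_hours, strafe=10, optimization_hours=24)
# Running the full genetic algorithm and visualization (requires modules.visualize and config):
# result = opt.optimierung_ems(parameter=parameter, start_hour=10)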
247  src/akkudoktoreos/class_pv_forecast.py  Normal file
@@ -0,0 +1,247 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from pprint import pprint
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import requests
|
||||
from dateutil import parser
|
||||
|
||||
|
||||
class ForecastData:
|
||||
def __init__(
|
||||
self,
|
||||
date_time,
|
||||
dc_power,
|
||||
ac_power,
|
||||
windspeed_10m=None,
|
||||
temperature=None,
|
||||
ac_power_measurement=None,
|
||||
):
|
||||
self.date_time = date_time
|
||||
self.dc_power = dc_power
|
||||
self.ac_power = ac_power
|
||||
self.windspeed_10m = windspeed_10m
|
||||
self.temperature = temperature
|
||||
self.ac_power_measurement = ac_power_measurement
|
||||
|
||||
def get_date_time(self):
|
||||
return self.date_time
|
||||
|
||||
def get_dc_power(self):
|
||||
return self.dc_power
|
||||
|
||||
def ac_power_measurement(self):
|
||||
return self.ac_power_measurement
|
||||
|
||||
def get_ac_power(self):
|
||||
if self.ac_power_measurement is not None:
|
||||
return self.ac_power_measurement
|
||||
else:
|
||||
return self.ac_power
|
||||
|
||||
def get_windspeed_10m(self):
|
||||
return self.windspeed_10m
|
||||
|
||||
def get_temperature(self):
|
||||
return self.temperature
|
||||
|
||||
|
||||
class PVForecast:
|
||||
def __init__(self, filepath=None, url=None, cache_dir="cache", prediction_hours=48):
|
||||
self.meta = {}
|
||||
self.forecast_data = []
|
||||
self.cache_dir = cache_dir
|
||||
self.prediction_hours = prediction_hours
|
||||
self.current_measurement = None
|
||||
|
||||
if not os.path.exists(self.cache_dir):
|
||||
os.makedirs(self.cache_dir)
|
||||
if filepath:
|
||||
self.load_data_from_file(filepath)
|
||||
elif url:
|
||||
self.load_data_with_caching(url)
|
||||
|
||||
if len(self.forecast_data) < self.prediction_hours:
|
||||
raise ValueError(
|
||||
f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt."
|
||||
)
|
||||
|
||||
def update_ac_power_measurement(
|
||||
self, date_time=None, ac_power_measurement=None
|
||||
) -> bool:
|
||||
found = False
|
||||
input_date_hour = date_time.replace(minute=0, second=0, microsecond=0)
|
||||
|
||||
for forecast in self.forecast_data:
|
||||
forecast_date_hour = parser.parse(forecast.date_time).replace(
|
||||
minute=0, second=0, microsecond=0
|
||||
)
|
||||
if forecast_date_hour == input_date_hour:
|
||||
forecast.ac_power_measurement = ac_power_measurement
|
||||
found = True
|
||||
break
|
||||
return found
|
||||
|
||||
def process_data(self, data):
|
||||
self.meta = data.get("meta", {})
|
||||
all_values = data.get("values", [])
|
||||
|
||||
for i in range(
|
||||
len(all_values[0])
|
||||
): # Annahme, dass alle Listen gleich lang sind
|
||||
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
|
||||
sum_ac_power = sum(values[i]["power"] for values in all_values)
|
||||
|
||||
# Zeige die ursprünglichen und berechneten Zeitstempel an
|
||||
original_datetime = all_values[0][i].get("datetime")
|
||||
# print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
|
||||
dt = datetime.strptime(original_datetime, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||||
dt = dt.replace(tzinfo=None)
|
||||
# iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
|
||||
# print()
|
||||
# Optional: 2 Stunden abziehen, um die Zeitanpassung zu testen
|
||||
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
|
||||
# print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
|
||||
|
||||
forecast = ForecastData(
|
||||
date_time=dt, # Verwende angepassten Zeitstempel
|
||||
dc_power=sum_dc_power,
|
||||
ac_power=sum_ac_power,
|
||||
windspeed_10m=all_values[0][i].get("windspeed_10m"),
|
||||
temperature=all_values[0][i].get("temperature"),
|
||||
)
|
||||
|
||||
self.forecast_data.append(forecast)
|
||||
|
||||
def load_data_from_file(self, filepath):
|
||||
with open(filepath, "r") as file:
|
||||
data = json.load(file)
|
||||
self.process_data(data)
|
||||
|
||||
def load_data_from_url(self, url):
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
pprint(data)
|
||||
self.process_data(data)
|
||||
else:
|
||||
print(
|
||||
f"Failed to load data from {url}. Status Code: {response.status_code}"
|
||||
)
|
||||
self.load_data_from_url(url)
|
||||
|
||||
def load_data_with_caching(self, url):
|
||||
date = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
cache_file = os.path.join(
|
||||
self.cache_dir, self.generate_cache_filename(url, date)
|
        )
        if os.path.exists(cache_file):
            with open(cache_file, "r") as file:
                data = json.load(file)
            print("Loading data from cache.")
        else:
            response = requests.get(url)
            if response.status_code == 200:
                data = response.json()
                with open(cache_file, "w") as file:
                    json.dump(data, file)
                print("Data fetched from URL and cached.")
            else:
                print(
                    f"Failed to load data from {url}. Status Code: {response.status_code}"
                )
                return
        self.process_data(data)

    def generate_cache_filename(self, url, date):
        cache_key = hashlib.sha256(f"{url}{date}".encode("utf-8")).hexdigest()
        return f"cache_{cache_key}.json"

    def get_forecast_data(self):
        return self.forecast_data

    def get_temperature_forecast_for_date(self, input_date_str):
        input_date = datetime.strptime(input_date_str, "%Y-%m-%d")
        daily_forecast_obj = [
            data
            for data in self.forecast_data
            if parser.parse(data.get_date_time()).date() == input_date.date()
        ]
        daily_forecast = []
        for d in daily_forecast_obj:
            daily_forecast.append(d.get_temperature())

        return np.array(daily_forecast)

    def get_pv_forecast_for_date_range(self, start_date_str, end_date_str):
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
        date_range_forecast = []

        for data in self.forecast_data:
            data_date = (
                data.get_date_time().date()
            )  # parser.parse(data.get_date_time()).date()
            if start_date <= data_date <= end_date:
                date_range_forecast.append(data)
                print(data.get_date_time(), " ", data.get_ac_power())

        ac_power_forecast = np.array(
            [data.get_ac_power() for data in date_range_forecast]
        )

        return np.array(ac_power_forecast)[: self.prediction_hours]

    def get_temperature_for_date_range(self, start_date_str, end_date_str):
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
        date_range_forecast = []

        for data in self.forecast_data:
            data_date = data.get_date_time().date()
            if start_date <= data_date <= end_date:
                date_range_forecast.append(data)

        temperature_forecast = [data.get_temperature() for data in date_range_forecast]
        return np.array(temperature_forecast)[: self.prediction_hours]

    def get_forecast_dataframe(self):
        # Convert the forecast data into a Pandas DataFrame
        data = [
            {
                "date_time": f.get_date_time(),
                "dc_power": f.get_dc_power(),
                "ac_power": f.get_ac_power(),
                "windspeed_10m": f.get_windspeed_10m(),
                "temperature": f.get_temperature(),
            }
            for f in self.forecast_data
        ]

        # Create a DataFrame
        df = pd.DataFrame(data)
        return df

    def print_ac_power_and_measurement(self):
        """Prints the DC power, AC power and the measured value for each hour."""
        for forecast in self.forecast_data:
            date_time = forecast.date_time
            print(
                f"Time: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, Measurement: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}"
            )


# Example usage of the class
if __name__ == "__main__":
    forecast = PVForecast(
        prediction_hours=24,
        url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
    )
    forecast.update_ac_power_measurement(
        date_time=datetime.now(), ac_power_measurement=1000
    )
    forecast.print_ac_power_and_measurement()
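The caching above keys each downloaded forecast on both the request URL and the date, so a new cache file is written per URL and per day. A minimal sketch of that naming scheme in isolation (the URL and date values below are illustrative only):

import hashlib

def cache_name(url: str, date: str) -> str:
    # Same idea as generate_cache_filename: hash URL + date into a stable filename.
    return f"cache_{hashlib.sha256(f'{url}{date}'.encode('utf-8')).hexdigest()}.json"

print(cache_name("https://api.akkudoktor.net/forecast?lat=52.52", "2024-03-28"))
print(cache_name("https://api.akkudoktor.net/forecast?lat=52.52", "2024-03-29"))  # different day, different file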
331
src/akkudoktoreos/class_soc_calc.py
Normal file
@@ -0,0 +1,331 @@
from datetime import datetime, timedelta

import mariadb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


class BatteryDataProcessor:
    def __init__(
        self,
        config,
        voltage_high_threshold,
        voltage_low_threshold,
        current_low_threshold,
        gap,
        battery_capacity_ah,
    ):
        self.config = config
        self.voltage_high_threshold = voltage_high_threshold
        self.voltage_low_threshold = voltage_low_threshold
        self.current_low_threshold = current_low_threshold
        self.gap = gap
        self.battery_capacity_ah = battery_capacity_ah
        self.conn = None
        self.data = None

    def connect_db(self):
        self.conn = mariadb.connect(**self.config)
        self.cursor = self.conn.cursor()

    def disconnect_db(self):
        if self.conn:
            self.cursor.close()
            self.conn.close()

    def fetch_data(self, start_time):
        query = """
            SELECT timestamp, data, topic
            FROM pip
            WHERE timestamp >= %s AND (topic = 'battery_current' OR topic = 'battery_voltage')
            ORDER BY timestamp
        """
        self.cursor.execute(query, (start_time,))
        rows = self.cursor.fetchall()
        self.data = pd.DataFrame(rows, columns=["timestamp", "data", "topic"])
        self.data["timestamp"] = pd.to_datetime(self.data["timestamp"])
        self.data["data"] = self.data["data"].astype(float)

    def process_data(self):
        self.data.drop_duplicates(subset=["timestamp", "topic"], inplace=True)

        data_pivot = self.data.pivot(index="timestamp", columns="topic", values="data")
        data_pivot = data_pivot.resample("1T").mean().interpolate()
        data_pivot.columns.name = None
        data_pivot.reset_index(inplace=True)
        self.data = data_pivot

    def group_points(self, df):
        df = df.sort_values("timestamp")
        groups = []
        group = []
        last_time = None

        for _, row in df.iterrows():
            if last_time is None or (row["timestamp"] - last_time) <= pd.Timedelta(
                minutes=self.gap
            ):
                group.append(row)
            else:
                groups.append(group)
                group = [row]
            last_time = row["timestamp"]

        if group:
            groups.append(group)

        last_points = [group[-1] for group in groups]
        return last_points

    def find_soc_points(self):
        condition_soc_100 = (
            self.data["battery_voltage"] >= self.voltage_high_threshold
        ) & (self.data["battery_current"].abs() <= self.current_low_threshold)
        condition_soc_0 = (
            self.data["battery_voltage"] <= self.voltage_low_threshold
        ) & (self.data["battery_current"].abs() <= self.current_low_threshold)

        times_soc_100_all = self.data[condition_soc_100][
            ["timestamp", "battery_voltage", "battery_current"]
        ]
        times_soc_0_all = self.data[condition_soc_0][
            ["timestamp", "battery_voltage", "battery_current"]
        ]

        last_points_100 = self.group_points(times_soc_100_all)
        last_points_0 = self.group_points(times_soc_0_all)

        last_points_100_df = pd.DataFrame(last_points_100)
        last_points_0_df = pd.DataFrame(last_points_0)

        return last_points_100_df, last_points_0_df

    def calculate_resetting_soc(self, last_points_100_df, last_points_0_df):
        soc_values = []
        integration_results = []
        reset_points = pd.concat([last_points_100_df, last_points_0_df]).sort_values(
            "timestamp"
        )

        # Initialize the SoC column
        self.data["calculated_soc"] = np.nan

        for i in range(len(reset_points)):
            start_point = reset_points.iloc[i]
            if i < len(reset_points) - 1:
                end_point = reset_points.iloc[i + 1]
            else:
                end_point = self.data.iloc[
                    -1
                ]  # Use the last record as the end point

            if start_point["timestamp"] in last_points_100_df["timestamp"].values:
                initial_soc = 100
            elif start_point["timestamp"] in last_points_0_df["timestamp"].values:
                initial_soc = 0

            cut_data = self.data[
                (self.data["timestamp"] >= start_point["timestamp"])
                & (self.data["timestamp"] <= end_point["timestamp"])
            ].copy()
            cut_data["time_diff_hours"] = (
                cut_data["timestamp"].diff().dt.total_seconds() / 3600
            )
            cut_data.dropna(subset=["time_diff_hours"], inplace=True)

            calculated_soc = initial_soc
            calculated_soc_list = [calculated_soc]
            integrated_current = 0

            for j in range(1, len(cut_data)):
                current = cut_data.iloc[j]["battery_current"]
                delta_t = cut_data.iloc[j]["time_diff_hours"]
                delta_soc = (
                    (current * delta_t) / self.battery_capacity_ah * 100
                )  # Convert to percentage

                calculated_soc += delta_soc
                calculated_soc = min(max(calculated_soc, 0), 100)  # Clip to 0-100%
                calculated_soc_list.append(calculated_soc)

                # Accumulate the integrated current
                integrated_current += current * delta_t

            cut_data["calculated_soc"] = calculated_soc_list
            soc_values.append(cut_data[["timestamp", "calculated_soc"]])

            integration_results.append(
                {
                    "start_time": start_point["timestamp"],
                    "end_time": end_point["timestamp"],
                    "integrated_current": integrated_current,
                    "start_soc": initial_soc,
                    "end_soc": calculated_soc_list[-1],
                }
            )

        soc_df = (
            pd.concat(soc_values)
            .drop_duplicates(subset=["timestamp"])
            .reset_index(drop=True)
        )
        return soc_df, integration_results

    def calculate_soh(self, integration_results):
        soh_values = []

        for result in integration_results:
            delta_soc = abs(
                result["start_soc"] - result["end_soc"]
            )  # Use the actual change in SoC
            if delta_soc > 0:  # Avoid division by zero
                effective_capacity_ah = result["integrated_current"]
                soh = (effective_capacity_ah / self.battery_capacity_ah) * 100
                soh_values.append({"timestamp": result["end_time"], "soh": soh})

        soh_df = pd.DataFrame(soh_values)
        return soh_df

    def delete_existing_soc_entries(self, soc_df):
        delete_query = """
            DELETE FROM pip
            WHERE timestamp = %s AND topic = 'calculated_soc'
        """
        timestamps = [
            (row["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),)
            for _, row in soc_df.iterrows()
            if pd.notna(row["timestamp"])
        ]

        self.cursor.executemany(delete_query, timestamps)
        self.conn.commit()

    def update_database_with_soc(self, soc_df):
        # Delete the existing entries with the same topic and date
        self.delete_existing_soc_entries(soc_df)

        # Resample `soc_df` to 5-minute intervals and compute the mean
        soc_df.set_index("timestamp", inplace=True)
        soc_df_resampled = soc_df.resample("5T").mean().dropna().reset_index()
        # soc_df_resampled['timestamp'] = soc_df_resampled['timestamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
        print(soc_df_resampled)

        # Insert the calculated SoC values into the database
        insert_query = """
            INSERT INTO pip (timestamp, data, topic)
            VALUES (%s, %s, 'calculated_soc')
        """
        for _, row in soc_df_resampled.iterrows():
            print(row)
            print(row["timestamp"])
            record = (
                row["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
                row["calculated_soc"],
            )
            try:
                self.cursor.execute(insert_query, record)
            except mariadb.OperationalError as e:
                print(f"Error inserting record {record}: {e}")

        self.conn.commit()

    def plot_data(self, last_points_100_df, last_points_0_df, soc_df):
        plt.figure(figsize=(14, 10))

        plt.subplot(4, 1, 1)
        plt.plot(
            self.data["timestamp"],
            self.data["battery_voltage"],
            label="Battery Voltage",
            color="blue",
        )
        plt.scatter(
            last_points_100_df["timestamp"],
            last_points_100_df["battery_voltage"],
            color="green",
            marker="o",
            label="100% SoC Points",
        )
        # plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_voltage'], color='red', marker='x', label='0% SoC Points')
        plt.xlabel("Timestamp")
        plt.ylabel("Voltage (V)")
        plt.legend()
        plt.title("Battery Voltage over Time")

        plt.subplot(4, 1, 2)
        plt.plot(
            self.data["timestamp"],
            self.data["battery_current"],
            label="Battery Current",
            color="orange",
        )
        plt.scatter(
            last_points_100_df["timestamp"],
            last_points_100_df["battery_current"],
            color="green",
            marker="o",
            label="100% SoC Points",
        )
        # plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_current'], color='red', marker='x', label='0% SoC Points')
        plt.xlabel("Timestamp")
        plt.ylabel("Current (A)")
        plt.legend()
        plt.title("Battery Current over Time")

        plt.subplot(4, 1, 3)
        plt.plot(
            soc_df["timestamp"], soc_df["calculated_soc"], label="SoC", color="purple"
        )
        plt.xlabel("Timestamp")
        plt.ylabel("SoC (%)")
        plt.legend()
        plt.title("State of Charge (SoC) over Time")

        # plt.subplot(4, 1, 4)
        # plt.plot(soh_df['timestamp'], soh_df['soh'], label='SoH', color='brown')
        # plt.xlabel('Timestamp')
        # plt.ylabel('SoH (%)')
        # plt.legend()
        # plt.title('State of Health (SoH) over Time')

        plt.tight_layout()
        plt.show()


if __name__ == "__main__":
    # MariaDB connection details
    config = {}

    # Set the parameters
    voltage_high_threshold = 55.4  # 100% SoC
    voltage_low_threshold = 46.5  # 0% SoC
    current_low_threshold = 2  # Low current threshold for both states
    gap = 30  # Time gap in minutes for grouping maxima/minima
    bat_capacity = 33 * 1000 / 48

    # Define the reference time X
    zeitpunkt_x = (datetime.now() - timedelta(weeks=100)).strftime("%Y-%m-%d %H:%M:%S")

    # Instantiate and use the BatteryDataProcessor
    processor = BatteryDataProcessor(
        config,
        voltage_high_threshold,
        voltage_low_threshold,
        current_low_threshold,
        gap,
        bat_capacity,
    )
    processor.connect_db()
    processor.fetch_data(zeitpunkt_x)
    processor.process_data()
    last_points_100_df, last_points_0_df = processor.find_soc_points()
    soc_df, integration_results = processor.calculate_resetting_soc(
        last_points_100_df, last_points_0_df
    )
    # soh_df = processor.calculate_soh(integration_results)
    processor.update_database_with_soc(soc_df)

    processor.plot_data(last_points_100_df, last_points_0_df, soc_df)

    processor.disconnect_db()
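Between two reset points (a detected 100 % or 0 % plateau), the SoC above is integrated by Coulomb counting: each sample adds current · Δt / capacity_ah · 100 percentage points and the running value is clipped to 0–100 %. A minimal sketch of that update rule with made-up current samples (the values are illustrative, not measured data):

import numpy as np

capacity_ah = 33 * 1000 / 48  # nominal capacity, as in the __main__ block above
currents = np.array([-10.0, -8.0, -12.0, -5.0])  # A, made-up one-minute discharge samples
delta_t = 1.0 / 60.0  # one minute expressed in hours

soc = 100.0  # start at a 100 % reset point
for current in currents:
    soc += current * delta_t / capacity_ah * 100.0
    soc = min(max(soc, 0.0), 100.0)  # clip to the physical 0-100 % range
print(soc)  # slightly below 100 after a few minutes of discharge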
28
src/akkudoktoreos/class_sommerzeit.py
Normal file
@@ -0,0 +1,28 @@
import datetime

import pytz


def ist_dst_wechsel(tag, timezone="Europe/Berlin"):
    """Checks if Daylight Saving Time (DST) starts or ends on a given day."""
    tz = pytz.timezone(timezone)
    # Get the current day and the next day
    current_day = datetime.datetime(tag.year, tag.month, tag.day)
    next_day = current_day + datetime.timedelta(days=1)

    # Localize the days in the given timezone
    current_day_localized = tz.localize(current_day, is_dst=None)
    next_day_localized = tz.localize(next_day, is_dst=None)

    # Check if the UTC offsets are different (indicating a DST change)
    dst_change = current_day_localized.dst() != next_day_localized.dst()

    return dst_change


# # Example usage
# start_date = datetime.datetime(2024, 3, 31)  # Date of the DST change
# if ist_dst_wechsel(start_date):
#     prediction_hours = 23  # Adjust to 23 hours for DST change days
# else:
#     prediction_hours = 24  # Default value for days without DST change
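The helper only compares the DST offset of a day's midnight with that of the next day's midnight, so it flags the two switch days of the year. A quick hedged check for Europe/Berlin in 2024, assuming ist_dst_wechsel from above is in scope:

import datetime

print(ist_dst_wechsel(datetime.datetime(2024, 3, 31)))   # True  - clocks go forward on this day
print(ist_dst_wechsel(datetime.datetime(2024, 10, 27)))  # True  - clocks go back on this day
print(ist_dst_wechsel(datetime.datetime(2024, 7, 1)))    # False - ordinary day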
141
src/akkudoktoreos/class_strompreis.py
Normal file
@@ -0,0 +1,141 @@
import hashlib
import json
import os
from datetime import datetime, timedelta

import numpy as np
import pytz
import requests

# Example: Converting a UTC timestamp to local time
utc_time = datetime.strptime("2024-03-28T01:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ")
utc_time = utc_time.replace(tzinfo=pytz.utc)

# Replace 'Europe/Berlin' with your own timezone
local_time = utc_time.astimezone(pytz.timezone("Europe/Berlin"))
print(local_time)


def repeat_to_shape(array, target_shape):
    # Check if the array fits the target shape
    if len(target_shape) != array.ndim:
        raise ValueError(
            "Array and target shape must have the same number of dimensions"
        )

    # Number of repetitions per dimension
    repeats = tuple(target_shape[i] // array.shape[i] for i in range(array.ndim))

    # Use np.tile to expand the array
    expanded_array = np.tile(array, repeats)
    return expanded_array


class HourlyElectricityPriceForecast:
    def __init__(
        self, source, cache_dir="cache", charges=0.000228, prediction_hours=24
    ):  # 228
        self.cache_dir = cache_dir
        os.makedirs(self.cache_dir, exist_ok=True)
        self.cache_time_file = os.path.join(self.cache_dir, "cache_timestamp.txt")
        self.prices = self.load_data(source)
        self.charges = charges
        self.prediction_hours = prediction_hours

    def load_data(self, source):
        cache_filename = self.get_cache_filename(source)
        if source.startswith("http"):
            if os.path.exists(cache_filename) and not self.is_cache_expired():
                print("Loading data from cache...")
                with open(cache_filename, "r") as file:
                    json_data = json.load(file)
            else:
                print("Loading data from the URL...")
                response = requests.get(source)
                if response.status_code == 200:
                    json_data = response.json()
                    with open(cache_filename, "w") as file:
                        json.dump(json_data, file)
                    self.update_cache_timestamp()
                else:
                    raise Exception(f"Error fetching data: {response.status_code}")
        else:
            with open(source, "r") as file:
                json_data = json.load(file)
        return json_data["values"]

    def get_cache_filename(self, url):
        hash_object = hashlib.sha256(url.encode())
        hex_dig = hash_object.hexdigest()
        return os.path.join(self.cache_dir, f"cache_{hex_dig}.json")

    def is_cache_expired(self):
        if not os.path.exists(self.cache_time_file):
            return True
        with open(self.cache_time_file, "r") as file:
            timestamp_str = file.read()
        last_cache_time = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
        return datetime.now() - last_cache_time > timedelta(hours=1)

    def update_cache_timestamp(self):
        with open(self.cache_time_file, "w") as file:
            file.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def get_price_for_date(self, date_str):
        """Returns all prices for the specified date, including the price from 00:00 of the previous day."""
        # Convert date string to datetime object
        date_obj = datetime.strptime(date_str, "%Y-%m-%d")

        # Calculate the previous day
        previous_day = date_obj - timedelta(days=1)
        previous_day_str = previous_day.strftime("%Y-%m-%d")

        # Extract the price from 00:00 of the previous day
        last_price_of_previous_day = [
            entry["marketpriceEurocentPerKWh"] + self.charges
            for entry in self.prices
            if previous_day_str in entry["end"]
        ][-1]

        # Extract all prices for the specified date
        date_prices = [
            entry["marketpriceEurocentPerKWh"] + self.charges
            for entry in self.prices
            if date_str in entry["end"]
        ]
        print(f"getPrice: {len(date_prices)}")

        # Add the last price of the previous day at the start of the list
        if len(date_prices) == 23:
            date_prices.insert(0, last_price_of_previous_day)

        return np.array(date_prices) / (1000.0 * 100.0) + self.charges

    def get_price_for_daterange(self, start_date_str, end_date_str):
        """Returns all prices between the start and end dates."""
        print(start_date_str)
        print(end_date_str)
        start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(
            tzinfo=pytz.utc
        )
        end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(
            tzinfo=pytz.utc
        )
        start_date = start_date_utc.astimezone(pytz.timezone("Europe/Berlin"))
        end_date = end_date_utc.astimezone(pytz.timezone("Europe/Berlin"))

        price_list = []

        while start_date < end_date:
            date_str = start_date.strftime("%Y-%m-%d")
            daily_prices = self.get_price_for_date(date_str)

            if daily_prices.size == 24:
                price_list.extend(daily_prices)
            start_date += timedelta(days=1)

        # If prediction hours are greater than 0, reshape the price list
        if self.prediction_hours > 0:
            price_list = repeat_to_shape(np.array(price_list), (self.prediction_hours,))

        return price_list
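repeat_to_shape only tiles the input with np.tile, so when prediction_hours exceeds the number of fetched hours the daily prices are simply repeated; if the target length is not an integer multiple of the input length, the floor division makes the result shorter than requested. A small sketch with synthetic prices, assuming repeat_to_shape from above is in scope:

import numpy as np

daily_prices = np.linspace(0.00020, 0.00030, 24)  # 24 synthetic hourly prices
two_days = repeat_to_shape(daily_prices, (48,))   # tile to 48 prediction hours
print(two_days.shape)                             # (48,)
print(np.allclose(two_days[:24], two_days[24:]))  # True: the second day is a copy of the first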
34
src/akkudoktoreos/config.py
Normal file
@@ -0,0 +1,34 @@
from datetime import datetime, timedelta

prediction_hours = 48
optimization_hours = 24
strafe = 10
moegliche_ladestroeme_in_prozent = [
    0.0,
    6.0 / 16.0,
    7.0 / 16.0,
    8.0 / 16.0,
    9.0 / 16.0,
    10.0 / 16.0,
    11.0 / 16.0,
    12.0 / 16.0,
    13.0 / 16.0,
    14.0 / 16.0,
    15.0 / 16.0,
    1.0,
]


def get_start_enddate(prediction_hours=48, startdate=None):
    ############
    # Parameter
    ############
    if startdate is None:
        date = (datetime.now().date() + timedelta(hours=prediction_hours)).strftime(
            "%Y-%m-%d"
        )
        date_now = datetime.now().strftime("%Y-%m-%d")
    else:
        date = (startdate + timedelta(hours=prediction_hours)).strftime("%Y-%m-%d")
        date_now = startdate.strftime("%Y-%m-%d")
    return date_now, date
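get_start_enddate returns two date strings: the start day and the day reached after prediction_hours; because the hours are added to a plain date, timedelta(hours=48) advances the end date by exactly two days. A short usage sketch, assuming the function above is in scope:

import datetime

date_now, date = get_start_enddate(prediction_hours=48, startdate=datetime.date(2024, 3, 28))
print(date_now, date)  # 2024-03-28 2024-03-30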
142
src/akkudoktoreos/heatpump.py
Normal file
@@ -0,0 +1,142 @@
import logging
from typing import List, Sequence


class Heatpump:
    MAX_HEATOUTPUT = 5000
    """Maximum heating power in watts"""

    BASE_HEATPOWER = 235.0
    """Base heating power value"""

    TEMPERATURE_COEFFICIENT = -11.645
    """Coefficient for temperature"""

    COP_BASE = 3.0
    """Base COP value"""

    COP_COEFFICIENT = 0.1
    """COP increase per degree"""

    def __init__(self, max_heat_output, prediction_hours):
        self.max_heat_output = max_heat_output
        self.prediction_hours = prediction_hours
        self.log = logging.getLogger(__name__)

    def __check_outside_temperature_range__(self, temp_celsius: float) -> bool:
        """Check if the temperature is within the valid range of -100 to 100 degrees Celsius.

        Args:
            temp_celsius: Temperature in degrees Celsius

        Returns:
            bool: True if in range
        """
        return temp_celsius > -100 and temp_celsius < 100

    def calculate_cop(self, outside_temperature_celsius: float) -> float:
        """Calculate the coefficient of performance (COP) based on the outside temperature.
        The supported temperature range is -100 to 100 degrees Celsius.

        Args:
            outside_temperature_celsius: Outside temperature in degrees Celsius

        Raises:
            ValueError: If the outside temperature is not within the supported range.

        Returns:
            cop: Calculated COP based on temperature
        """
        # TODO: Support for other temperature units (e.g. Fahrenheit, Kelvin)
        # Check for sensible temperature values
        if self.__check_outside_temperature_range__(outside_temperature_celsius):
            cop = self.COP_BASE + (outside_temperature_celsius * self.COP_COEFFICIENT)
            return max(cop, 1)
        else:
            err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius)"
            self.log.error(err_msg)
            raise ValueError(err_msg)

    def calculate_heating_output(self, outside_temperature_celsius: float) -> float:
        """Calculate the heating output in watts based on the outside temperature in degrees Celsius.
        The temperature must be between -100 and 100 degrees Celsius.

        Args:
            outside_temperature_celsius: Outside temperature in degrees Celsius

        Raises:
            ValueError: Raised if the outside temperature is not in the described range.

        Returns:
            heating output: Calculated heating output in watts.
        """
        if self.__check_outside_temperature_range__(outside_temperature_celsius):
            heat_output = (
                (
                    self.BASE_HEATPOWER
                    + outside_temperature_celsius * self.TEMPERATURE_COEFFICIENT
                )
                * 1000
            ) / 24.0
            return min(self.max_heat_output, heat_output)
        else:
            err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius)"
            self.log.error(err_msg)
            raise ValueError(err_msg)

    def calculate_heat_power(self, outside_temperature_celsius: float) -> float:
        """Calculate the electrical power based on the outside temperature (degrees Celsius).

        Args:
            outside_temperature_celsius: Temperature in the range -100 to 100 degrees Celsius.

        Raises:
            ValueError: Raised if the temperature is not in the described range.

        Returns:
            power: Calculated electrical power in watts.
        """
        if self.__check_outside_temperature_range__(outside_temperature_celsius):
            return (
                1164
                - 77.8 * outside_temperature_celsius
                + 1.62 * outside_temperature_celsius**2.0
            )
        else:
            err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius)"
            self.log.error(err_msg)
            raise ValueError(err_msg)

    def simulate_24h(self, temperatures: Sequence[float]) -> List[float]:
        """Simulate power data for 24 hours based on the provided temperatures."""
        power_data: List[float] = []

        if len(temperatures) != self.prediction_hours:
            raise ValueError(
                f"The temperature array must contain exactly {self.prediction_hours} entries, one for each hour of the day."
            )

        for temp in temperatures:
            power = self.calculate_heat_power(temp)
            power_data.append(power)
        return power_data


# Example usage of the class
if __name__ == "__main__":
    max_heizleistung = 5000  # 5 kW heating power
    start_innentemperatur = 15  # Initial indoor temperature
    isolationseffizienz = 0.8  # Insulation efficiency
    gewuenschte_innentemperatur = 20  # Desired indoor temperature
    wp = Heatpump(max_heizleistung, 24)  # Initialize heat pump with prediction hours

    # Print COP for various outside temperatures
    print(wp.calculate_cop(-10), " ", wp.calculate_cop(0), " ", wp.calculate_cop(10))

    # 24 hours of outside temperatures (example values)
    temperaturen = [ 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -5, -2, 5, ]  # fmt: skip

    # Calculate the 24-hour power data
    leistungsdaten = wp.simulate_24h(temperaturen)

    print(leistungsdaten)
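The electrical power curve used in calculate_heat_power is the quadratic 1164 - 77.8·T + 1.62·T², so the drawn power falls as the outside temperature rises. A quick check of a few points with the same coefficients, assuming the Heatpump class above is in scope:

hp = Heatpump(max_heat_output=5000, prediction_hours=24)
for t in (-10, 0, 10):
    print(t, round(hp.calculate_heat_power(t), 1))
# -10 °C -> 1164 + 778 + 162 = 2104.0 W
#   0 °C -> 1164.0 W
#  10 °C -> 1164 - 778 + 162 = 548.0 W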
292
src/akkudoktoreos/visualize.py
Normal file
@@ -0,0 +1,292 @@
import datetime

# Set the backend for matplotlib to Agg
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages

from modules.class_sommerzeit import ist_dst_wechsel

matplotlib.use("Agg")


def visualisiere_ergebnisse(
    gesamtlast,
    pv_forecast,
    strompreise,
    ergebnisse,
    discharge_hours,
    laden_moeglich,
    temperature,
    start_hour,
    prediction_hours,
    einspeiseverguetung_euro_pro_wh,
    filename="visualization_results.pdf",
    extra_data=None,
):
    #####################
    # 24-hour visualization
    #####################
    with PdfPages(filename) as pdf:
        # Load and PV generation
        plt.figure(figsize=(14, 14))
        plt.subplot(3, 3, 1)
        hours = np.arange(0, prediction_hours)

        gesamtlast_array = np.array(gesamtlast)
        # Plot individual loads
        plt.plot(hours, gesamtlast_array, label="Load (Wh)", marker="o")

        # Calculate and plot total load
        plt.plot(
            hours,
            gesamtlast_array,
            label="Total Load (Wh)",
            marker="o",
            linewidth=2,
            linestyle="--",
        )
        plt.xlabel("Hour")
        plt.ylabel("Load (Wh)")
        plt.title("Load Profiles")
        plt.grid(True)
        plt.legend()

        # Electricity prices
        hours_p = np.arange(0, len(strompreise))
        plt.subplot(3, 2, 2)
        plt.plot(
            hours_p,
            strompreise,
            label="Electricity Price (€/Wh)",
            color="purple",
            marker="s",
        )
        plt.title("Electricity Prices")
        plt.xlabel("Hour of the Day")
        plt.ylabel("Price (€/Wh)")
        plt.legend()
        plt.grid(True)

        # PV forecast
        plt.subplot(3, 2, 3)
        plt.plot(hours, pv_forecast, label="PV Generation (Wh)", marker="x")
        plt.title("PV Forecast")
        plt.xlabel("Hour of the Day")
        plt.ylabel("Wh")
        plt.legend()
        plt.grid(True)

        # Feed-in remuneration
        plt.subplot(3, 2, 4)
        plt.plot(
            hours,
            einspeiseverguetung_euro_pro_wh,
            label="Remuneration (€/Wh)",
            marker="x",
        )
        plt.title("Remuneration")
        plt.xlabel("Hour of the Day")
        plt.ylabel("€/Wh")
        plt.legend()
        plt.grid(True)

        # Temperature forecast
        plt.subplot(3, 2, 5)
        plt.title("Temperature Forecast (°C)")
        plt.plot(hours, temperature, label="Temperature (°C)", marker="x")
        plt.xlabel("Hour of the Day")
        plt.ylabel("°C")
        plt.legend()
        plt.grid(True)

        pdf.savefig()  # Save the current figure state to the PDF
        plt.close()  # Close the current figure to free up memory

        #####################
        # Start hour visualization
        #####################

        plt.figure(figsize=(14, 10))

        if ist_dst_wechsel(datetime.datetime.now()):
            hours = np.arange(start_hour, prediction_hours - 1)
        else:
            hours = np.arange(start_hour, prediction_hours)

        # Energy flow, grid feed-in, and grid consumption
        plt.subplot(3, 2, 1)
        plt.plot(hours, ergebnisse["Last_Wh_pro_Stunde"], label="Load (Wh)", marker="o")
        plt.plot(
            hours,
            ergebnisse["Haushaltsgeraet_wh_pro_stunde"],
            label="Household Device (Wh)",
            marker="o",
        )
        plt.plot(
            hours,
            ergebnisse["Netzeinspeisung_Wh_pro_Stunde"],
            label="Grid Feed-in (Wh)",
            marker="x",
        )
        plt.plot(
            hours,
            ergebnisse["Netzbezug_Wh_pro_Stunde"],
            label="Grid Consumption (Wh)",
            marker="^",
        )
        plt.plot(
            hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^"
        )
        plt.title("Energy Flow per Hour")
        plt.xlabel("Hour")
        plt.ylabel("Energy (Wh)")
        plt.legend()

        # State of charge for batteries
        plt.subplot(3, 2, 2)
        plt.plot(
            hours, ergebnisse["akku_soc_pro_stunde"], label="PV Battery (%)", marker="x"
        )
        plt.plot(
            hours,
            ergebnisse["E-Auto_SoC_pro_Stunde"],
            label="E-Car Battery (%)",
            marker="x",
        )
        plt.legend(
            loc="upper left", bbox_to_anchor=(1, 1)
        )  # Place legend outside the plot
        plt.grid(True, which="both", axis="x")  # Grid for every hour

        ax1 = plt.subplot(3, 2, 3)
        for hour, value in enumerate(discharge_hours):
            ax1.axvspan(
                hour,
                hour + 1,
                color="red",
                ymax=value,
                alpha=0.3,
                label="Discharge Possibility" if hour == 0 else "",
            )
        for hour, value in enumerate(laden_moeglich):
            ax1.axvspan(
                hour,
                hour + 1,
                color="green",
                ymax=value,
                alpha=0.3,
                label="Charging Possibility" if hour == 0 else "",
            )
        ax1.legend(loc="upper left")
        ax1.set_xlim(0, prediction_hours)

        pdf.savefig()  # Save the current figure state to the PDF
        plt.close()  # Close the current figure to free up memory

        # Financial overview
        fig, axs = plt.subplots(1, 2, figsize=(14, 10))  # Create a 1x2 grid of subplots
        total_costs = ergebnisse["Gesamtkosten_Euro"]
        total_revenue = ergebnisse["Gesamteinnahmen_Euro"]
        total_balance = ergebnisse["Gesamtbilanz_Euro"]
        losses = ergebnisse["Gesamt_Verluste"]

        # Costs and revenues per hour on the first axis (axs[0])
        axs[0].plot(
            hours,
            ergebnisse["Kosten_Euro_pro_Stunde"],
            label="Costs (Euro)",
            marker="o",
            color="red",
        )
        axs[0].plot(
            hours,
            ergebnisse["Einnahmen_Euro_pro_Stunde"],
            label="Revenue (Euro)",
            marker="x",
            color="green",
        )
        axs[0].set_title("Financial Balance per Hour")
        axs[0].set_xlabel("Hour")
        axs[0].set_ylabel("Euro")
        axs[0].legend()
        axs[0].grid(True)

        # Summary of finances on the second axis (axs[1])
        labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"]
        values = [total_costs, total_revenue, total_balance]
        colors = ["red" if value > 0 else "green" for value in values]
        axs[1].bar(labels, values, color=colors)
        axs[1].set_title("Financial Overview")
        axs[1].set_ylabel("Euro")

        # Second axis (ax2) for losses, shared with axs[1]
        ax2 = axs[1].twinx()
        ax2.bar("Total Losses", losses, color="blue")
        ax2.set_ylabel("Losses [Wh]", color="blue")
        ax2.tick_params(axis="y", labelcolor="blue")

        pdf.savefig()  # Save the complete figure to the PDF
        plt.close()  # Close the figure

        # Additional data visualization if provided
        if extra_data is not None:
            plt.figure(figsize=(14, 10))
            plt.subplot(1, 2, 1)
            f1 = np.array(extra_data["verluste"])
            f2 = np.array(extra_data["bilanz"])
            n1 = np.array(extra_data["nebenbedingung"])
            scatter = plt.scatter(f1, f2, c=n1, cmap="viridis")

            # Add color legend
            plt.colorbar(scatter, label="Constraint")

            pdf.savefig()  # Save the complete figure to the PDF
            plt.close()  # Close the figure

            plt.figure(figsize=(14, 10))
            filtered_losses = np.array(
                [
                    v
                    for v, n in zip(
                        extra_data["verluste"], extra_data["nebenbedingung"]
                    )
                    if n < 0.01
                ]
            )
            filtered_balance = np.array(
                [
                    b
                    for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"])
                    if n < 0.01
                ]
            )
            if filtered_losses.size != 0:
                best_loss = min(filtered_losses)
                worst_loss = max(filtered_losses)
                best_balance = min(filtered_balance)
                worst_balance = max(filtered_balance)

                data = [filtered_losses, filtered_balance]
                labels = ["Losses", "Balance"]
                # Create plots
                fig, axs = plt.subplots(
                    1, 2, figsize=(10, 6), sharey=False
                )  # Two subplots, separate y-axes

                # First violin plot for losses
                axs[0].violinplot(data[0], showmeans=True, showmedians=True)
                axs[0].set_title("Losses")
                axs[0].set_xticklabels(["Losses"])

                # Second violin plot for balance
                axs[1].violinplot(data[1], showmeans=True, showmedians=True)
                axs[1].set_title("Balance")
                axs[1].set_xticklabels(["Balance"])

                # Fine-tuning
                plt.tight_layout()

                pdf.savefig()  # Save the current figure state to the PDF
                plt.close()  # Close the figure
286
src/akkudoktoreosserver/flask_server.py
Executable file
@@ -0,0 +1,286 @@
#!/usr/bin/env python3

import os
import sys
from datetime import datetime

import matplotlib

# Sets the Matplotlib backend to 'Agg' for rendering plots in environments without a display
matplotlib.use("Agg")

import pandas as pd
from flask import Flask, jsonify, redirect, request, send_from_directory, url_for

from modules.class_load import LoadForecast
from modules.class_load_container import Gesamtlast
from modules.class_load_corrector import LoadPredictionAdjuster
from modules.class_optimize import isfloat, optimization_problem
from modules.class_pv_forecast import PVForecast
from modules.class_strompreis import HourlyElectricityPriceForecast

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import get_start_enddate, optimization_hours, prediction_hours

app = Flask(__name__)

opt_class = optimization_problem(
    prediction_hours=prediction_hours, strafe=10, optimization_hours=optimization_hours
)


@app.route("/strompreis", methods=["GET"])
def flask_strompreis():
    # Get the current date and the end date based on prediction hours
    date_now, date = get_start_enddate(
        prediction_hours, startdate=datetime.now().date()
    )
    filepath = os.path.join(
        r"test_data", r"strompreise_akkudokAPI.json"
    )  # Adjust the path to the JSON file
    price_forecast = HourlyElectricityPriceForecast(
        source=f"https://api.akkudoktor.net/prices?start={date_now}&end={date}",
        prediction_hours=prediction_hours,
    )
    specific_date_prices = price_forecast.get_price_for_daterange(
        date_now, date
    )  # Fetch prices for the specified date range
    return jsonify(specific_date_prices.tolist())


# Endpoint to handle total load calculation based on the latest measured data
@app.route("/gesamtlast", methods=["POST"])
def flask_gesamtlast():
    # Retrieve data from the JSON body
    data = request.get_json()

    # Extract year_energy and prediction_hours from the request JSON
    year_energy = float(data.get("year_energy"))
    prediction_hours = int(
        data.get("hours", 48)
    )  # Default to 48 hours if not specified

    # Measured data in JSON format
    measured_data_json = data.get("measured_data")
    measured_data = pd.DataFrame(measured_data_json)
    measured_data["time"] = pd.to_datetime(measured_data["time"])

    # Ensure datetime has timezone info for accurate calculations
    if measured_data["time"].dt.tz is None:
        measured_data["time"] = measured_data["time"].dt.tz_localize("Europe/Berlin")
    else:
        measured_data["time"] = measured_data["time"].dt.tz_convert("Europe/Berlin")

    # Remove timezone info after conversion to simplify further processing
    measured_data["time"] = measured_data["time"].dt.tz_localize(None)

    # Instantiate LoadForecast and generate forecast data
    file_path = os.path.join("data", "load_profiles.npz")
    lf = LoadForecast(filepath=file_path, year_energy=year_energy)
    forecast_list = []

    # Generate daily forecasts for the date range based on measured data
    for single_date in pd.date_range(
        measured_data["time"].min().date(), measured_data["time"].max().date()
    ):
        date_str = single_date.strftime("%Y-%m-%d")
        daily_forecast = lf.get_daily_stats(date_str)
        mean_values = daily_forecast[0]
        hours = [single_date + pd.Timedelta(hours=i) for i in range(24)]
        daily_forecast_df = pd.DataFrame({"time": hours, "Last Pred": mean_values})
        forecast_list.append(daily_forecast_df)

    # Concatenate all daily forecasts into a single DataFrame
    predicted_data = pd.concat(forecast_list, ignore_index=True)

    # Create LoadPredictionAdjuster instance to adjust the predictions based on measured data
    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, lf)
    adjuster.calculate_weighted_mean()  # Calculate weighted mean for adjustment
    adjuster.adjust_predictions()  # Adjust predictions based on measured data
    future_predictions = adjuster.predict_next_hours(
        prediction_hours
    )  # Predict future load

    # Extract household power predictions
    leistung_haushalt = future_predictions["Adjusted Pred"].values
    gesamtlast = Gesamtlast(prediction_hours=prediction_hours)
    gesamtlast.hinzufuegen(
        "Haushalt", leistung_haushalt
    )  # Add household load to total load calculation

    # Calculate the total load
    last = gesamtlast.gesamtlast_berechnen()  # Compute total load
    return jsonify(last.tolist())


@app.route("/gesamtlast_simple", methods=["GET"])
def flask_gesamtlast_simple():
    if request.method == "GET":
        year_energy = float(
            request.args.get("year_energy")
        )  # Get annual energy value from query parameters
        date_now, date = get_start_enddate(
            prediction_hours, startdate=datetime.now().date()
        )  # Get the current date and prediction end date

        ###############
        # Load Forecast
        ###############
        file_path = os.path.join("data", "load_profiles.npz")

        lf = LoadForecast(
            filepath=file_path, year_energy=year_energy
        )  # Instantiate LoadForecast with specified parameters
        leistung_haushalt = lf.get_stats_for_date_range(date_now, date)[
            0
        ]  # Get expected household load for the date range

        gesamtlast = Gesamtlast(
            prediction_hours=prediction_hours
        )  # Create Gesamtlast instance
        gesamtlast.hinzufuegen(
            "Haushalt", leistung_haushalt
        )  # Add household load to total load calculation

        # ###############
        # # WP (Heat Pump)
        # ##############
        # leistung_wp = wp.simulate_24h(temperature_forecast)  # Simulate heat pump load for 24 hours
        # gesamtlast.hinzufuegen("Heatpump", leistung_wp)  # Add heat pump load to total load calculation

        last = gesamtlast.gesamtlast_berechnen()  # Calculate total load
        print(last)  # Output total load
        return jsonify(last.tolist())  # Return total load as JSON


@app.route("/pvforecast", methods=["GET"])
def flask_pvprognose():
    if request.method == "GET":
        # Retrieve URL and AC power measurement from query parameters
        url = request.args.get("url")
        ac_power_measurement = request.args.get("ac_power_measurement")
        date_now, date = get_start_enddate(
            prediction_hours, startdate=datetime.now().date()
        )

        ###############
        # PV Forecast
        ###############
        PVforecast = PVForecast(
            prediction_hours=prediction_hours, url=url
        )  # Instantiate PVForecast with given parameters
        if isfloat(
            ac_power_measurement
        ):  # Check if the AC power measurement is a valid float
            PVforecast.update_ac_power_measurement(
                date_time=datetime.now(),
                ac_power_measurement=float(ac_power_measurement),
            )  # Update measurement

        # Get PV forecast and temperature forecast for the specified date range
        pv_forecast = PVforecast.get_pv_forecast_for_date_range(date_now, date)
        temperature_forecast = PVforecast.get_temperature_for_date_range(date_now, date)

        # Return both forecasts as a JSON response
        ret = {
            "temperature": temperature_forecast.tolist(),
            "pvpower": pv_forecast.tolist(),
        }
        return jsonify(ret)


@app.route("/optimize", methods=["POST"])
def flask_optimize():
    if request.method == "POST":
        from datetime import datetime

        # Retrieve optimization parameters from the request JSON
        parameter = request.json

        # Check for required parameters
        required_parameters = [
            "preis_euro_pro_wh_akku",
            "strompreis_euro_pro_wh",
            "gesamtlast",
            "pv_akku_cap",
            "einspeiseverguetung_euro_pro_wh",
            "pv_forecast",
            "temperature_forecast",
            "eauto_min_soc",
            "eauto_cap",
            "eauto_charge_efficiency",
            "eauto_charge_power",
            "eauto_soc",
            "pv_soc",
            "start_solution",
            "haushaltsgeraet_dauer",
            "haushaltsgeraet_wh",
        ]
        # Identify any missing parameters
        missing_params = [p for p in required_parameters if p not in parameter]
        if missing_params:
            return jsonify(
                {"error": f"Missing parameter: {', '.join(missing_params)}"}
            ), 400  # Return error for missing parameters

        # Optional minimum SoC of the PV battery; default it before running the optimization
        if "min_soc_prozent" not in parameter:
            parameter["min_soc_prozent"] = None

        # Perform optimization simulation
        result = opt_class.optimierung_ems(
            parameter=parameter, start_hour=datetime.now().hour
        )

        return jsonify(result)  # Return optimization results as JSON


@app.route("/visualisierungsergebnisse.pdf")
def get_pdf():
    # Endpoint to serve the generated PDF with visualization results
    return send_from_directory(
        "", "visualisierungsergebnisse.pdf"
    )  # Adjust the directory if needed


@app.route("/site-map")
def site_map():
    # Function to generate a site map of valid routes in the application
    def print_links(links):
        content = "<h1>Valid routes</h1><ul>"
        for link in links:
            content += f"<li><a href='{link}'>{link}</a></li>"
        content += "</ul>"
        return content

    # Check if the route has no empty parameters
    def has_no_empty_params(rule):
        defaults = rule.defaults if rule.defaults is not None else ()
        arguments = rule.arguments if rule.arguments is not None else ()
        return len(defaults) >= len(arguments)

    # Collect all valid GET routes without empty parameters
    links = []
    for rule in app.url_map.iter_rules():
        if "GET" in rule.methods and has_no_empty_params(rule):
            url = url_for(rule.endpoint, **(rule.defaults or {}))
            links.append(url)
    return print_links(sorted(links))  # Return the sorted links as HTML


@app.route("/")
def root():
    # Redirect the root URL to the site map
    return redirect("/site-map", code=302)


if __name__ == "__main__":
    try:
        # Set host and port from environment variables or defaults
        host = os.getenv("FLASK_RUN_HOST", "0.0.0.0")
        port = os.getenv("FLASK_RUN_PORT", 8503)
        app.run(debug=True, host=host, port=port)  # Run the Flask application
    except Exception as e:
        print(
            f"Could not bind to host {host}:{port}. Error: {e}"
        )  # Error handling for binding issues
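For a quick smoke test of the running server, the GET endpoints can be exercised with requests; the host and port below are the defaults from the __main__ block, and the year_energy value (assumed here to be in Wh) is only an example:

import requests

BASE = "http://localhost:8503"  # FLASK_RUN_HOST / FLASK_RUN_PORT defaults from above

# Hourly electricity prices for the configured prediction window
print(requests.get(f"{BASE}/strompreis").json()[:5])

# Simple total-load forecast for an assumed annual consumption
print(requests.get(f"{BASE}/gesamtlast_simple", params={"year_energy": 5000 * 1000}).json()[:5])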