Apply isort and ruff code style

This commit is contained in:
Michael Osthege 2024-10-03 11:05:44 +02:00 committed by Andreas
parent 05a3c1a5bb
commit a4d178d250
23 changed files with 1787 additions and 866 deletions

View File

@ -1,28 +1,38 @@
from datetime import datetime, timedelta
prediction_hours=48
optimization_hours=24
strafe=10
moegliche_ladestroeme_in_prozent = [0.0 ,6.0/16.0, 7.0/16.0, 8.0/16.0, 9.0/16.0, 10.0/16.0, 11.0/16.0, 12.0/16.0, 13.0/16.0, 14.0/16.0, 15.0/16.0, 1.0 ]
prediction_hours = 48
optimization_hours = 24
strafe = 10
moegliche_ladestroeme_in_prozent = [
0.0,
6.0 / 16.0,
7.0 / 16.0,
8.0 / 16.0,
9.0 / 16.0,
10.0 / 16.0,
11.0 / 16.0,
12.0 / 16.0,
13.0 / 16.0,
14.0 / 16.0,
15.0 / 16.0,
1.0,
]
# Optional
db_config = {
'user': 'eos',
'password': 'eos',
'host': '127.0.0.1',
'database': 'eos'
}
db_config = {"user": "eos", "password": "eos", "host": "127.0.0.1", "database": "eos"}
def get_start_enddate(prediction_hours=48,startdate=None):
############
# Parameter
############
if startdate == None:
date = (datetime.now().date() + timedelta(hours = prediction_hours)).strftime("%Y-%m-%d")
date_now = datetime.now().strftime("%Y-%m-%d")
else:
date = (startdate + timedelta(hours = prediction_hours)).strftime("%Y-%m-%d")
date_now = startdate.strftime("%Y-%m-%d")
return date_now,date
def get_start_enddate(prediction_hours=48, startdate=None):
############
# Parameter
############
if startdate == None:
date = (datetime.now().date() + timedelta(hours=prediction_hours)).strftime(
"%Y-%m-%d"
)
date_now = datetime.now().strftime("%Y-%m-%d")
else:
date = (startdate + timedelta(hours=prediction_hours)).strftime("%Y-%m-%d")
date_now = startdate.strftime("%Y-%m-%d")
return date_now, date

View File

@ -1,19 +1,18 @@
#!/usr/bin/env python3
import os
import random
from pprint import pprint
import matplotlib
matplotlib.use('Agg') # Sets the Matplotlib backend to 'Agg' for rendering plots in environments without a display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from deap import base, creator, tools, algorithms
from flask import Flask, jsonify, request, redirect, send_from_directory, url_for
matplotlib.use(
"Agg"
) # Sets the Matplotlib backend to 'Agg' for rendering plots in environments without a display
from datetime import datetime, timedelta
import pandas as pd
from config import *
from flask import Flask, jsonify, redirect, request, send_from_directory, url_for
from modules.class_akku import *
from modules.class_ems import *
from modules.class_heatpump import *
@ -26,10 +25,12 @@ from modules.class_soc_calc import *
from modules.class_sommerzeit import *
from modules.class_strompreis import *
from modules.visualize import *
from datetime import datetime, timedelta
app = Flask(__name__)
opt_class = optimization_problem(prediction_hours=prediction_hours, strafe=10, optimization_hours=optimization_hours)
opt_class = optimization_problem(
prediction_hours=prediction_hours, strafe=10, optimization_hours=optimization_hours
)
# @app.route('/last_correction', methods=['GET'])
# def flask_last_correction():
@ -53,7 +54,8 @@ opt_class = optimization_problem(prediction_hours=prediction_hours, strafe=10, o
# print(last)
# return jsonify(last.tolist())
@app.route('/soc', methods=['GET'])
@app.route("/soc", methods=["GET"])
def flask_soc():
# MariaDB connection details
config = db_config
@ -66,15 +68,24 @@ def flask_soc():
bat_capacity = 33 * 1000 / 48 # Battery capacity in watt-hours
# Define the reference time point (3 weeks ago)
zeitpunkt_x = (datetime.now() - timedelta(weeks=3)).strftime('%Y-%m-%d %H:%M:%S')
zeitpunkt_x = (datetime.now() - timedelta(weeks=3)).strftime("%Y-%m-%d %H:%M:%S")
# Instantiate BatteryDataProcessor and perform calculations
processor = BatteryDataProcessor(config, voltage_high_threshold, voltage_low_threshold, current_low_threshold, gap, bat_capacity)
processor = BatteryDataProcessor(
config,
voltage_high_threshold,
voltage_low_threshold,
current_low_threshold,
gap,
bat_capacity,
)
processor.connect_db()
processor.fetch_data(zeitpunkt_x)
processor.process_data()
last_points_100_df, last_points_0_df = processor.find_soc_points()
soc_df, integration_results = processor.calculate_resetting_soc(last_points_100_df, last_points_0_df)
soc_df, integration_results = processor.calculate_resetting_soc(
last_points_100_df, last_points_0_df
)
# soh_df = processor.calculate_soh(integration_results) # Optional State of Health calculation
processor.update_database_with_soc(soc_df) # Update database with SOC data
# processor.plot_data(last_points_100_df, last_points_0_df, soc_df) # Optional data visualization
@ -82,50 +93,65 @@ def flask_soc():
return jsonify("Done")
@app.route('/strompreis', methods=['GET'])
@app.route("/strompreis", methods=["GET"])
def flask_strompreis():
# Get the current date and the end date based on prediction hours
date_now, date = get_start_enddate(prediction_hours, startdate=datetime.now().date())
filepath = os.path.join(r'test_data', r'strompreise_akkudokAPI.json') # Adjust the path to the JSON file
price_forecast = HourlyElectricityPriceForecast(source=f"https://api.akkudoktor.net/prices?start={date_now}&end={date}", prediction_hours=prediction_hours)
specific_date_prices = price_forecast.get_price_for_daterange(date_now, date) # Fetch prices for the specified date range
date_now, date = get_start_enddate(
prediction_hours, startdate=datetime.now().date()
)
filepath = os.path.join(
r"test_data", r"strompreise_akkudokAPI.json"
) # Adjust the path to the JSON file
price_forecast = HourlyElectricityPriceForecast(
source=f"https://api.akkudoktor.net/prices?start={date_now}&end={date}",
prediction_hours=prediction_hours,
)
specific_date_prices = price_forecast.get_price_for_daterange(
date_now, date
) # Fetch prices for the specified date range
return jsonify(specific_date_prices.tolist())
# Endpoint to handle total load calculation based on the latest measured data
@app.route('/gesamtlast', methods=['POST'])
@app.route("/gesamtlast", methods=["POST"])
def flask_gesamtlast():
# Retrieve data from the JSON body
data = request.get_json()
# Extract year_energy and prediction_hours from the request JSON
year_energy = float(data.get("year_energy"))
prediction_hours = int(data.get("hours", 48)) # Default to 48 hours if not specified
prediction_hours = int(
data.get("hours", 48)
) # Default to 48 hours if not specified
# Measured data in JSON format
measured_data_json = data.get("measured_data")
measured_data = pd.DataFrame(measured_data_json)
measured_data['time'] = pd.to_datetime(measured_data['time'])
measured_data["time"] = pd.to_datetime(measured_data["time"])
# Ensure datetime has timezone info for accurate calculations
if measured_data['time'].dt.tz is None:
measured_data['time'] = measured_data['time'].dt.tz_localize('Europe/Berlin')
if measured_data["time"].dt.tz is None:
measured_data["time"] = measured_data["time"].dt.tz_localize("Europe/Berlin")
else:
measured_data['time'] = measured_data['time'].dt.tz_convert('Europe/Berlin')
measured_data["time"] = measured_data["time"].dt.tz_convert("Europe/Berlin")
# Remove timezone info after conversion to simplify further processing
measured_data['time'] = measured_data['time'].dt.tz_localize(None)
measured_data["time"] = measured_data["time"].dt.tz_localize(None)
# Instantiate LoadForecast and generate forecast data
lf = LoadForecast(filepath=r'load_profiles.npz', year_energy=year_energy)
lf = LoadForecast(filepath=r"load_profiles.npz", year_energy=year_energy)
forecast_list = []
# Generate daily forecasts for the date range based on measured data
for single_date in pd.date_range(measured_data['time'].min().date(), measured_data['time'].max().date()):
date_str = single_date.strftime('%Y-%m-%d')
for single_date in pd.date_range(
measured_data["time"].min().date(), measured_data["time"].max().date()
):
date_str = single_date.strftime("%Y-%m-%d")
daily_forecast = lf.get_daily_stats(date_str)
mean_values = daily_forecast[0]
hours = [single_date + pd.Timedelta(hours=i) for i in range(24)]
daily_forecast_df = pd.DataFrame({'time': hours, 'Last Pred': mean_values})
daily_forecast_df = pd.DataFrame({"time": hours, "Last Pred": mean_values})
forecast_list.append(daily_forecast_df)
# Concatenate all daily forecasts into a single DataFrame
@ -135,17 +161,22 @@ def flask_gesamtlast():
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, lf)
adjuster.calculate_weighted_mean() # Calculate weighted mean for adjustment
adjuster.adjust_predictions() # Adjust predictions based on measured data
future_predictions = adjuster.predict_next_hours(prediction_hours) # Predict future load
future_predictions = adjuster.predict_next_hours(
prediction_hours
) # Predict future load
# Extract household power predictions
leistung_haushalt = future_predictions['Adjusted Pred'].values
leistung_haushalt = future_predictions["Adjusted Pred"].values
gesamtlast = Gesamtlast(prediction_hours=prediction_hours)
gesamtlast.hinzufuegen("Haushalt", leistung_haushalt) # Add household load to total load calculation
gesamtlast.hinzufuegen(
"Haushalt", leistung_haushalt
) # Add household load to total load calculation
# Calculate the total load
last = gesamtlast.gesamtlast_berechnen() # Compute total load
return jsonify(last.tolist())
# @app.route('/gesamtlast', methods=['GET'])
# def flask_gesamtlast():
# if request.method == 'GET':
@ -207,20 +238,33 @@ def flask_gesamtlast():
# print(last) # Output total load
# return jsonify(last.tolist()) # Return total load as JSON
@app.route('/gesamtlast_simple', methods=['GET'])
@app.route("/gesamtlast_simple", methods=["GET"])
def flask_gesamtlast_simple():
if request.method == 'GET':
year_energy = float(request.args.get("year_energy")) # Get annual energy value from query parameters
date_now, date = get_start_enddate(prediction_hours, startdate=datetime.now().date()) # Get the current date and prediction end date
if request.method == "GET":
year_energy = float(
request.args.get("year_energy")
) # Get annual energy value from query parameters
date_now, date = get_start_enddate(
prediction_hours, startdate=datetime.now().date()
) # Get the current date and prediction end date
###############
# Load Forecast
###############
lf = LoadForecast(filepath=r'load_profiles.npz', year_energy=year_energy) # Instantiate LoadForecast with specified parameters
leistung_haushalt = lf.get_stats_for_date_range(date_now, date)[0] # Get expected household load for the date range
lf = LoadForecast(
filepath=r"load_profiles.npz", year_energy=year_energy
) # Instantiate LoadForecast with specified parameters
leistung_haushalt = lf.get_stats_for_date_range(date_now, date)[
0
] # Get expected household load for the date range
gesamtlast = Gesamtlast(prediction_hours=prediction_hours) # Create Gesamtlast instance
gesamtlast.hinzufuegen("Haushalt", leistung_haushalt) # Add household load to total load calculation
gesamtlast = Gesamtlast(
prediction_hours=prediction_hours
) # Create Gesamtlast instance
gesamtlast.hinzufuegen(
"Haushalt", leistung_haushalt
) # Add household load to total load calculation
# ###############
# # WP (Heat Pump)
@ -233,56 +277,91 @@ def flask_gesamtlast_simple():
return jsonify(last.tolist()) # Return total load as JSON
@app.route('/pvforecast', methods=['GET'])
@app.route("/pvforecast", methods=["GET"])
def flask_pvprognose():
if request.method == 'GET':
if request.method == "GET":
# Retrieve URL and AC power measurement from query parameters
url = request.args.get("url")
ac_power_measurement = request.args.get("ac_power_measurement")
date_now, date = get_start_enddate(prediction_hours, startdate=datetime.now().date())
date_now, date = get_start_enddate(
prediction_hours, startdate=datetime.now().date()
)
###############
# PV Forecast
###############
PVforecast = PVForecast(prediction_hours=prediction_hours, url=url) # Instantiate PVForecast with given parameters
if isfloat(ac_power_measurement): # Check if the AC power measurement is a valid float
PVforecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=float(ac_power_measurement)) # Update measurement
PVforecast = PVForecast(
prediction_hours=prediction_hours, url=url
) # Instantiate PVForecast with given parameters
if isfloat(
ac_power_measurement
): # Check if the AC power measurement is a valid float
PVforecast.update_ac_power_measurement(
date_time=datetime.now(),
ac_power_measurement=float(ac_power_measurement),
) # Update measurement
# Get PV forecast and temperature forecast for the specified date range
pv_forecast = PVforecast.get_pv_forecast_for_date_range(date_now, date)
temperature_forecast = PVforecast.get_temperature_for_date_range(date_now, date)
# Return both forecasts as a JSON response
ret = {"temperature": temperature_forecast.tolist(), "pvpower": pv_forecast.tolist()}
ret = {
"temperature": temperature_forecast.tolist(),
"pvpower": pv_forecast.tolist(),
}
return jsonify(ret)
@app.route('/optimize', methods=['POST'])
@app.route("/optimize", methods=["POST"])
def flask_optimize():
if request.method == 'POST':
if request.method == "POST":
from datetime import datetime
# Retrieve optimization parameters from the request JSON
parameter = request.json
# Check for required parameters
required_parameters = ['preis_euro_pro_wh_akku', 'strompreis_euro_pro_wh', "gesamtlast", 'pv_akku_cap',
"einspeiseverguetung_euro_pro_wh", 'pv_forecast', 'temperature_forecast',
'eauto_min_soc', "eauto_cap", "eauto_charge_efficiency", "eauto_charge_power",
"eauto_soc", "pv_soc", "start_solution", "haushaltsgeraet_dauer",
"haushaltsgeraet_wh"]
required_parameters = [
"preis_euro_pro_wh_akku",
"strompreis_euro_pro_wh",
"gesamtlast",
"pv_akku_cap",
"einspeiseverguetung_euro_pro_wh",
"pv_forecast",
"temperature_forecast",
"eauto_min_soc",
"eauto_cap",
"eauto_charge_efficiency",
"eauto_charge_power",
"eauto_soc",
"pv_soc",
"start_solution",
"haushaltsgeraet_dauer",
"haushaltsgeraet_wh",
]
# Identify any missing parameters
missing_params = [p for p in required_parameters if p not in parameter]
if missing_params:
return jsonify({"error": f"Missing parameter: {', '.join(missing_params)}"}), 400 # Return error for missing parameters
return jsonify(
{"error": f"Missing parameter: {', '.join(missing_params)}"}
), 400 # Return error for missing parameters
# Perform optimization simulation
result = opt_class.optimierung_ems(parameter=parameter, start_hour=datetime.now().hour)
result = opt_class.optimierung_ems(
parameter=parameter, start_hour=datetime.now().hour
)
return jsonify(result) # Return optimization results as JSON
@app.route('/visualisierungsergebnisse.pdf')
@app.route("/visualisierungsergebnisse.pdf")
def get_pdf():
# Endpoint to serve the generated PDF with visualization results
return send_from_directory('', 'visualisierungsergebnisse.pdf') # Adjust the directory if needed
return send_from_directory(
"", "visualisierungsergebnisse.pdf"
) # Adjust the directory if needed
@app.route("/site-map")
def site_map():
@ -299,28 +378,32 @@ def site_map():
defaults = rule.defaults if rule.defaults is not None else ()
arguments = rule.arguments if rule.arguments is not None else ()
return len(defaults) >= len(arguments)
# Collect all valid GET routes without empty parameters
links = []
for rule in app.url_map.iter_rules():
if "GET" in rule.methods and has_no_empty_params(rule):
url = url_for(rule.endpoint, **(rule.defaults or {}))
links.append(url)
return print_links(sorted(links))# Return the sorted links as HTML
return print_links(sorted(links)) # Return the sorted links as HTML
@app.route('/')
@app.route("/")
def root():
# Redirect the root URL to the site map
return redirect("/site-map", code=302)
if __name__ == '__main__':
if __name__ == "__main__":
try:
# Set host and port from environment variables or defaults
host = os.getenv("FLASK_RUN_HOST", "0.0.0.0")
port = os.getenv("FLASK_RUN_PORT", 8503)
app.run(debug=True, host=host, port=port) # Run the Flask application
except Exception as e:
print(f"Could not bind to host {host}:{port}. Error: {e}") # Error handling for binding issues
print(
f"Could not bind to host {host}:{port}. Error: {e}"
) # Error handling for binding issues
# PV Forecast:
# object {

View File

@ -1,27 +1,40 @@
import numpy as np
class PVAkku:
def __init__(self, kapazitaet_wh=None, hours=None, lade_effizienz=0.88, entlade_effizienz=0.88,
max_ladeleistung_w=None, start_soc_prozent=0, min_soc_prozent=0, max_soc_prozent=100):
def __init__(
self,
kapazitaet_wh=None,
hours=None,
lade_effizienz=0.88,
entlade_effizienz=0.88,
max_ladeleistung_w=None,
start_soc_prozent=0,
min_soc_prozent=0,
max_soc_prozent=100,
):
# Battery capacity in Wh
self.kapazitaet_wh = kapazitaet_wh
# Initial state of charge in Wh
self.start_soc_prozent = start_soc_prozent
self.soc_wh = (start_soc_prozent / 100) * kapazitaet_wh
self.hours = hours if hours is not None else 24 # Default to 24 hours if not specified
self.hours = (
hours if hours is not None else 24
) # Default to 24 hours if not specified
self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 1)
# Charge and discharge efficiency
self.lade_effizienz = lade_effizienz
self.entlade_effizienz = entlade_effizienz
self.max_ladeleistung_w = max_ladeleistung_w if max_ladeleistung_w else self.kapazitaet_wh
self.max_ladeleistung_w = (
max_ladeleistung_w if max_ladeleistung_w else self.kapazitaet_wh
)
self.min_soc_prozent = min_soc_prozent
self.max_soc_prozent = max_soc_prozent
# Calculate min and max SoC in Wh
self.min_soc_wh = (self.min_soc_prozent / 100) * self.kapazitaet_wh
self.max_soc_wh = (self.max_soc_prozent / 100) * self.kapazitaet_wh
def to_dict(self):
return {
"kapazitaet_wh": self.kapazitaet_wh,
@ -32,7 +45,7 @@ class PVAkku:
"charge_array": self.charge_array.tolist(),
"lade_effizienz": self.lade_effizienz,
"entlade_effizienz": self.entlade_effizienz,
"max_ladeleistung_w": self.max_ladeleistung_w
"max_ladeleistung_w": self.max_ladeleistung_w,
}
@classmethod
@ -44,12 +57,14 @@ class PVAkku:
lade_effizienz=data["lade_effizienz"],
entlade_effizienz=data["entlade_effizienz"],
max_ladeleistung_w=data["max_ladeleistung_w"],
start_soc_prozent=data["start_soc_prozent"]
start_soc_prozent=data["start_soc_prozent"],
)
# Set arrays
obj.discharge_array = np.array(data["discharge_array"])
obj.charge_array = np.array(data["charge_array"])
obj.soc_wh = data["soc_wh"] # Set current state of charge, which may differ from start_soc_prozent
obj.soc_wh = data[
"soc_wh"
] # Set current state of charge, which may differ from start_soc_prozent
return obj
@ -77,8 +92,12 @@ class PVAkku:
return 0.0, 0.0 # No energy discharge and no losses
# Calculate the maximum energy that can be discharged considering min_soc and efficiency
max_possible_discharge_wh = (self.soc_wh - self.min_soc_wh) * self.entlade_effizienz
max_possible_discharge_wh = max(max_possible_discharge_wh, 0.0) # Ensure non-negative
max_possible_discharge_wh = (
self.soc_wh - self.min_soc_wh
) * self.entlade_effizienz
max_possible_discharge_wh = max(
max_possible_discharge_wh, 0.0
) # Ensure non-negative
# Consider the maximum discharge power of the battery
max_abgebbar_wh = min(max_possible_discharge_wh, self.max_ladeleistung_w)
@ -88,7 +107,9 @@ class PVAkku:
# Calculate the actual amount withdrawn from the battery (before efficiency loss)
if self.entlade_effizienz > 0:
tatsaechliche_entnahme_wh = tatsaechlich_abgegeben_wh / self.entlade_effizienz
tatsaechliche_entnahme_wh = (
tatsaechlich_abgegeben_wh / self.entlade_effizienz
)
else:
tatsaechliche_entnahme_wh = 0.0
@ -103,7 +124,6 @@ class PVAkku:
# Return the actually discharged energy and the losses
return tatsaechlich_abgegeben_wh, verluste_wh
def energie_laden(self, wh, hour):
if hour is not None and self.charge_array[hour] == 0:
return 0, 0 # Charging not allowed in this hour
@ -117,7 +137,9 @@ class PVAkku:
# Calculate the maximum energy that can be charged considering max_soc and efficiency
if self.lade_effizienz > 0:
max_possible_charge_wh = (self.max_soc_wh - self.soc_wh) / self.lade_effizienz
max_possible_charge_wh = (
self.max_soc_wh - self.soc_wh
) / self.lade_effizienz
else:
max_possible_charge_wh = 0.0
max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative
@ -138,7 +160,6 @@ class PVAkku:
return geladene_menge, verluste_wh
def aktueller_energieinhalt(self):
"""
This method returns the current remaining energy considering efficiency.
@ -149,11 +170,16 @@ class PVAkku:
return max(nutzbare_energie, 0.0)
if __name__ == '__main__':
if __name__ == "__main__":
# Test battery discharge below min_soc
print("Test: Discharge below min_soc")
akku = PVAkku(kapazitaet_wh=10000, hours=1, start_soc_prozent=50, min_soc_prozent=20, max_soc_prozent=80)
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=50,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
@ -165,7 +191,13 @@ if __name__ == '__main__':
# Test battery charge above max_soc
print("\nTest: Charge above max_soc")
akku = PVAkku(kapazitaet_wh=10000, hours=1, start_soc_prozent=50, min_soc_prozent=20, max_soc_prozent=80)
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=50,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
@ -177,7 +209,13 @@ if __name__ == '__main__':
# Test charging when battery is at max_soc
print("\nTest: Charging when at max_soc")
akku = PVAkku(kapazitaet_wh=10000, hours=1, start_soc_prozent=80, min_soc_prozent=20, max_soc_prozent=80)
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=80,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
@ -187,7 +225,13 @@ if __name__ == '__main__':
# Test discharging when battery is at min_soc
print("\nTest: Discharging when at min_soc")
akku = PVAkku(kapazitaet_wh=10000, hours=1, start_soc_prozent=20, min_soc_prozent=20, max_soc_prozent=80)
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=20,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")

View File

@ -1,7 +1,7 @@
from datetime import datetime
from pprint import pprint
import numpy as np
import modules.class_akku as PVAkku
def replace_nan_with_none(data):
if isinstance(data, dict):
@ -18,22 +18,31 @@ def replace_nan_with_none(data):
return data
class EnergieManagementSystem:
def __init__(self, pv_prognose_wh=None, strompreis_euro_pro_wh=None, einspeiseverguetung_euro_pro_wh=None, eauto=None, gesamtlast=None, haushaltsgeraet=None, wechselrichter=None):
def __init__(
self,
pv_prognose_wh=None,
strompreis_euro_pro_wh=None,
einspeiseverguetung_euro_pro_wh=None,
eauto=None,
gesamtlast=None,
haushaltsgeraet=None,
wechselrichter=None,
):
self.akku = wechselrichter.akku
#self.lastkurve_wh = lastkurve_wh
# self.lastkurve_wh = lastkurve_wh
self.gesamtlast = gesamtlast
self.pv_prognose_wh = pv_prognose_wh
self.strompreis_euro_pro_wh = strompreis_euro_pro_wh # Strompreis in Cent pro Wh
self.einspeiseverguetung_euro_pro_wh = einspeiseverguetung_euro_pro_wh # Einspeisevergütung in Cent pro Wh
self.strompreis_euro_pro_wh = (
strompreis_euro_pro_wh # Strompreis in Cent pro Wh
)
self.einspeiseverguetung_euro_pro_wh = (
einspeiseverguetung_euro_pro_wh # Einspeisevergütung in Cent pro Wh
)
self.eauto = eauto
self.haushaltsgeraet = haushaltsgeraet
self.wechselrichter = wechselrichter
def set_akku_discharge_hours(self, ds):
self.akku.set_discharge_per_hour(ds)
@ -41,7 +50,7 @@ class EnergieManagementSystem:
self.eauto.set_charge_per_hour(ds)
def set_haushaltsgeraet_start(self, ds, global_start_hour=0):
self.haushaltsgeraet.set_startzeitpunkt(ds,global_start_hour=global_start_hour)
self.haushaltsgeraet.set_startzeitpunkt(ds, global_start_hour=global_start_hour)
def reset(self):
self.eauto.reset()
@ -58,19 +67,22 @@ class EnergieManagementSystem:
# Beginne die Simulation ab der aktuellen Stunde und führe sie für die berechnete Dauer aus
return self.simuliere(start_stunde)
def simuliere(self, start_stunde):
lastkurve_wh = self.gesamtlast
# Anzahl der Stunden berechnen
assert len(lastkurve_wh) == len(self.pv_prognose_wh) == len(self.strompreis_euro_pro_wh), f"Arraygrößen stimmen nicht überein: Lastkurve = {len(lastkurve_wh)}, PV-Prognose = {len(self.pv_prognose_wh)}, Strompreis = {len(self.strompreis_euro_pro_wh)}"
assert (
len(lastkurve_wh)
== len(self.pv_prognose_wh)
== len(self.strompreis_euro_pro_wh)
), f"Arraygrößen stimmen nicht überein: Lastkurve = {len(lastkurve_wh)}, PV-Prognose = {len(self.pv_prognose_wh)}, Strompreis = {len(self.strompreis_euro_pro_wh)}"
ende = min( len(lastkurve_wh),len(self.pv_prognose_wh), len(self.strompreis_euro_pro_wh))
ende = min(
len(lastkurve_wh),
len(self.pv_prognose_wh),
len(self.strompreis_euro_pro_wh),
)
total_hours = ende-start_stunde
total_hours = ende - start_stunde
# Initialisierung der Arrays mit NaN-Werten
last_wh_pro_stunde = np.full(total_hours, np.nan)
@ -88,13 +100,14 @@ class EnergieManagementSystem:
if self.eauto:
eauto_soc_pro_stunde[start_stunde] = self.eauto.ladezustand_in_prozent()
for stunde in range(start_stunde + 1, ende):
stunde_since_now = stunde-start_stunde
#print(stunde_since_now)
stunde_since_now = stunde - start_stunde
# print(stunde_since_now)
# Anfangszustände
akku_soc_start = self.akku.ladezustand_in_prozent()
eauto_soc_start = self.eauto.ladezustand_in_prozent() if self.eauto else None
eauto_soc_start = (
self.eauto.ladezustand_in_prozent() if self.eauto else None
)
# Verbrauch und zusätzliche Lasten bestimmen
verbrauch = self.gesamtlast[stunde]
@ -115,13 +128,19 @@ class EnergieManagementSystem:
# E-Auto-Verbrauch bestimmen
if self.eauto:
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(None, stunde)
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(
None, stunde
)
verbrauch += geladene_menge_eauto
verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto
eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent()
eauto_soc_pro_stunde[stunde_since_now] = (
self.eauto.ladezustand_in_prozent()
)
# Wechselrichter-Logik
netzeinspeisung, netzbezug, verluste, eigenverbrauch = self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
netzeinspeisung, netzbezug, verluste, eigenverbrauch = (
self.wechselrichter.energie_verarbeiten(erzeugung, verbrauch, stunde)
)
# Ergebnisse speichern
netzeinspeisung_wh_pro_stunde[stunde_since_now] = netzeinspeisung
@ -130,30 +149,33 @@ class EnergieManagementSystem:
last_wh_pro_stunde[stunde_since_now] = verbrauch
# Finanzen berechnen
kosten_euro_pro_stunde[stunde_since_now] = netzbezug * strompreis
einnahmen_euro_pro_stunde[stunde_since_now] = netzeinspeisung * self.einspeiseverguetung_euro_pro_wh[stunde]
einnahmen_euro_pro_stunde[stunde_since_now] = (
netzeinspeisung * self.einspeiseverguetung_euro_pro_wh[stunde]
)
# Letzter Akkuzustand speichern
akku_soc_pro_stunde[stunde_since_now] = self.akku.ladezustand_in_prozent()
# Gesamtkosten berechnen
gesamtkosten_euro = np.nansum(kosten_euro_pro_stunde) - np.nansum(einnahmen_euro_pro_stunde)
gesamtkosten_euro = np.nansum(kosten_euro_pro_stunde) - np.nansum(
einnahmen_euro_pro_stunde
)
out = {
'Last_Wh_pro_Stunde': last_wh_pro_stunde,
'Netzeinspeisung_Wh_pro_Stunde': netzeinspeisung_wh_pro_stunde,
'Netzbezug_Wh_pro_Stunde': netzbezug_wh_pro_stunde,
'Kosten_Euro_pro_Stunde': kosten_euro_pro_stunde,
'akku_soc_pro_stunde': akku_soc_pro_stunde,
'Einnahmen_Euro_pro_Stunde': einnahmen_euro_pro_stunde,
'Gesamtbilanz_Euro': gesamtkosten_euro,
'E-Auto_SoC_pro_Stunde': eauto_soc_pro_stunde,
'Gesamteinnahmen_Euro': np.nansum(einnahmen_euro_pro_stunde),
'Gesamtkosten_Euro': np.nansum(kosten_euro_pro_stunde),
"Last_Wh_pro_Stunde": last_wh_pro_stunde,
"Netzeinspeisung_Wh_pro_Stunde": netzeinspeisung_wh_pro_stunde,
"Netzbezug_Wh_pro_Stunde": netzbezug_wh_pro_stunde,
"Kosten_Euro_pro_Stunde": kosten_euro_pro_stunde,
"akku_soc_pro_stunde": akku_soc_pro_stunde,
"Einnahmen_Euro_pro_Stunde": einnahmen_euro_pro_stunde,
"Gesamtbilanz_Euro": gesamtkosten_euro,
"E-Auto_SoC_pro_Stunde": eauto_soc_pro_stunde,
"Gesamteinnahmen_Euro": np.nansum(einnahmen_euro_pro_stunde),
"Gesamtkosten_Euro": np.nansum(kosten_euro_pro_stunde),
"Verluste_Pro_Stunde": verluste_wh_pro_stunde,
"Gesamt_Verluste": np.nansum(verluste_wh_pro_stunde),
"Haushaltsgeraet_wh_pro_stunde": haushaltsgeraet_wh_pro_stunde
"Haushaltsgeraet_wh_pro_stunde": haushaltsgeraet_wh_pro_stunde,
}
out = replace_nan_with_none(out)
return out

View File

@ -1,9 +1,12 @@
import numpy as np
class Haushaltsgeraet:
def __init__(self, hours=None, verbrauch_kwh=None, dauer_h=None):
self.hours = hours # Total duration for which the planning is done
self.verbrauch_kwh = verbrauch_kwh # Total energy consumption of the device in kWh
self.verbrauch_kwh = (
verbrauch_kwh # Total energy consumption of the device in kWh
)
self.dauer_h = dauer_h # Duration of use in hours
self.lastkurve = np.zeros(self.hours) # Initialize the load curve with zeros
@ -21,10 +24,10 @@ class Haushaltsgeraet:
raise ValueError("The start time is earlier than the available time frame.")
# Calculate power per hour based on total consumption and duration
leistung_pro_stunde = (self.verbrauch_kwh / self.dauer_h) # Convert to watt-hours
leistung_pro_stunde = self.verbrauch_kwh / self.dauer_h # Convert to watt-hours
# Set the power for the duration of use in the load curve array
self.lastkurve[start_hour:start_hour + self.dauer_h] = leistung_pro_stunde
self.lastkurve[start_hour : start_hour + self.dauer_h] = leistung_pro_stunde
def reset(self):
"""

View File

@ -1,4 +1,3 @@
class Heatpump:
MAX_HEIZLEISTUNG = 5000 # Maximum heating power in watts
BASE_HEIZLEISTUNG = 235.0 # Base heating power value
@ -17,19 +16,26 @@ class Heatpump:
def heizleistung_berechnen(self, aussentemperatur):
"""Calculate heating power based on outside temperature."""
heizleistung = ((self.BASE_HEIZLEISTUNG + aussentemperatur * self.TEMPERATURE_COEFFICIENT) * 1000) / 24.0
heizleistung = (
(self.BASE_HEIZLEISTUNG + aussentemperatur * self.TEMPERATURE_COEFFICIENT)
* 1000
) / 24.0
return min(self.max_heizleistung, heizleistung)
def elektrische_leistung_berechnen(self, aussentemperatur):
"""Calculate electrical power based on outside temperature."""
return 1164 - 77.8 * aussentemperatur + 1.62 * aussentemperatur ** 2.0
return 1164 - 77.8 * aussentemperatur + 1.62 * aussentemperatur**2.0
def simulate_24h(self, temperaturen):
"""Simulate power data for 24 hours based on provided temperatures."""
leistungsdaten = []
if len(temperaturen) != self.prediction_hours:
raise ValueError("The temperature array must contain exactly " + str(self.prediction_hours) + " entries, one for each hour of the day.")
raise ValueError(
"The temperature array must contain exactly "
+ str(self.prediction_hours)
+ " entries, one for each hour of the day."
)
for temp in temperaturen:
elektrische_leistung = self.elektrische_leistung_berechnen(temp)
@ -38,7 +44,7 @@ class Heatpump:
# Example usage of the class
if __name__ == '__main__':
if __name__ == "__main__":
max_heizleistung = 5000 # 5 kW heating power
start_innentemperatur = 15 # Initial indoor temperature
isolationseffizienz = 0.8 # Insulation efficiency
@ -49,7 +55,32 @@ if __name__ == '__main__':
print(wp.cop_berechnen(-10), " ", wp.cop_berechnen(0), " ", wp.cop_berechnen(10))
# 24 hours of outside temperatures (example values)
temperaturen = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -5, -2, 5]
temperaturen = [
10,
9,
8,
7,
6,
5,
4,
3,
2,
1,
0,
-1,
-2,
-3,
-4,
-5,
-6,
-7,
-8,
-9,
-10,
-5,
-2,
5,
]
# Calculate the 24-hour power data
leistungsdaten = wp.simulate_24h(temperaturen)

View File

@ -1,6 +1,8 @@
class Wechselrichter:
def __init__(self, max_leistung_wh, akku):
self.max_leistung_wh = max_leistung_wh # Maximum power that the inverter can handle
self.max_leistung_wh = (
max_leistung_wh # Maximum power that the inverter can handle
)
self.akku = akku # Connection to a battery object
def energie_verarbeiten(self, erzeugung, verbrauch, hour):
@ -14,15 +16,21 @@ class Wechselrichter:
# If consumption exceeds maximum inverter power
verluste += erzeugung - self.max_leistung_wh
restleistung_nach_verbrauch = self.max_leistung_wh - verbrauch
netzbezug = -restleistung_nach_verbrauch # Negative indicates feeding into the grid
netzbezug = (
-restleistung_nach_verbrauch
) # Negative indicates feeding into the grid
eigenverbrauch = self.max_leistung_wh
else:
# Remaining power after consumption
restleistung_nach_verbrauch = erzeugung - verbrauch
# Load battery with excess energy
geladene_energie, verluste_laden_akku = self.akku.energie_laden(restleistung_nach_verbrauch, hour)
rest_überschuss = restleistung_nach_verbrauch - (geladene_energie + verluste_laden_akku)
geladene_energie, verluste_laden_akku = self.akku.energie_laden(
restleistung_nach_verbrauch, hour
)
rest_überschuss = restleistung_nach_verbrauch - (
geladene_energie + verluste_laden_akku
)
# Feed-in to the grid based on remaining capacity
if rest_überschuss > self.max_leistung_wh - verbrauch:
@ -35,17 +43,25 @@ class Wechselrichter:
eigenverbrauch = verbrauch # Self-consumption is equal to the load
else:
benötigte_energie = verbrauch - erzeugung # Energy needed from external sources
max_akku_leistung = self.akku.max_ladeleistung_w # Maximum battery discharge power
benötigte_energie = (
verbrauch - erzeugung
) # Energy needed from external sources
max_akku_leistung = (
self.akku.max_ladeleistung_w
) # Maximum battery discharge power
# Calculate remaining AC power available
rest_ac_leistung = max(self.max_leistung_wh - erzeugung, 0)
# Discharge energy from the battery based on need
if benötigte_energie < rest_ac_leistung:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(benötigte_energie, hour)
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
benötigte_energie, hour
)
else:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(rest_ac_leistung, hour)
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
rest_ac_leistung, hour
)
verluste += akku_entladeverluste # Include losses from battery discharge
netzbezug = benötigte_energie - aus_akku # Energy drawn from the grid

View File

@ -1,9 +1,10 @@
import numpy as np
from datetime import datetime
from pprint import pprint
import numpy as np
# Load the .npz file when the application starts
class LoadForecast:
def __init__(self, filepath=None, year_energy=None):
self.filepath = filepath
@ -26,7 +27,9 @@ class LoadForecast:
day_of_year = date.timetuple().tm_yday
# Extract the 24-hour profile for the given date
daily_stats = self.data_year_energy[day_of_year - 1] # -1 because indexing starts at 0
daily_stats = self.data_year_energy[
day_of_year - 1
] # -1 because indexing starts at 0
return daily_stats
def get_hourly_stats(self, date_str, hour):
@ -44,7 +47,9 @@ class LoadForecast:
day_of_year = date.timetuple().tm_yday
# Extract mean and standard deviation for the given hour
hourly_stats = self.data_year_energy[day_of_year - 1, :, hour] # Access the specific hour
hourly_stats = self.data_year_energy[
day_of_year - 1, :, hour
] # Access the specific hour
return hourly_stats
@ -63,7 +68,9 @@ class LoadForecast:
end_day_of_year = end_date.timetuple().tm_yday
# Note that in leap years, the day of the year may need adjustment
stats_for_range = self.data_year_energy[start_day_of_year:end_day_of_year] # -1 because indexing starts at 0
stats_for_range = self.data_year_energy[
start_day_of_year:end_day_of_year
] # -1 because indexing starts at 0
stats_for_range = stats_for_range.swapaxes(1, 0)
stats_for_range = stats_for_range.reshape(stats_for_range.shape[0], -1)
@ -73,7 +80,9 @@ class LoadForecast:
"""Loads data from the specified file."""
try:
data = np.load(self.filepath)
self.data = np.array(list(zip(data["yearly_profiles"], data["yearly_profiles_std"])))
self.data = np.array(
list(zip(data["yearly_profiles"], data["yearly_profiles_std"]))
)
self.data_year_energy = self.data * self.year_energy
# pprint(self.data_year_energy)
except FileNotFoundError:
@ -89,10 +98,13 @@ class LoadForecast:
"""Converts a date string to a datetime object."""
return datetime.strptime(date_str, "%Y-%m-%d")
# Example usage of the class
if __name__ == '__main__':
filepath = r'..\load_profiles.npz' # Adjust the path to the .npz file
if __name__ == "__main__":
filepath = r"..\load_profiles.npz" # Adjust the path to the .npz file
lf = LoadForecast(filepath=filepath, year_energy=2000)
specific_date_prices = lf.get_daily_stats('2024-02-16') # Adjust date as needed
specific_hour_stats = lf.get_hourly_stats('2024-02-16', 12) # Adjust date and hour as needed
specific_date_prices = lf.get_daily_stats("2024-02-16") # Adjust date as needed
specific_hour_stats = lf.get_hourly_stats(
"2024-02-16", 12
) # Adjust date and hour as needed
print(specific_hour_stats)

View File

@ -1,5 +1,5 @@
import numpy as np
from pprint import pprint
class Gesamtlast:
def __init__(self, prediction_hours=24):
@ -14,7 +14,9 @@ class Gesamtlast:
:param last_array: Array of loads, where each entry corresponds to an hour
"""
if len(last_array) != self.prediction_hours:
raise ValueError(f"Total load inconsistent lengths in arrays: {name} {len(last_array)}")
raise ValueError(
f"Total load inconsistent lengths in arrays: {name} {len(last_array)}"
)
self.lasten[name] = last_array
def gesamtlast_berechnen(self):
@ -31,6 +33,9 @@ class Gesamtlast:
gesamtlast_array = [0] * stunden
for last_array in self.lasten.values():
gesamtlast_array = [gesamtlast + stundenlast for gesamtlast, stundenlast in zip(gesamtlast_array, last_array)]
gesamtlast_array = [
gesamtlast + stundenlast
for gesamtlast, stundenlast in zip(gesamtlast_array, last_array)
]
return np.array(gesamtlast_array)

View File

@ -1,14 +1,11 @@
import json
import os
import sys
from datetime import datetime, timedelta, timezone
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, r2_score
import mariadb
# from sklearn.model_selection import train_test_split, GridSearchCV
# from sklearn.ensemble import GradientBoostingRegressor
# from xgboost import XGBRegressor
@ -22,6 +19,7 @@ import mariadb
# Add the parent directory to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import *
from modules.class_load import *
@ -38,117 +36,159 @@ class LoadPredictionAdjuster:
def _remove_outliers(self, data, threshold=2):
# Calculate the Z-Score of the 'Last' data
data['Z-Score'] = np.abs((data['Last'] - data['Last'].mean()) / data['Last'].std())
data["Z-Score"] = np.abs(
(data["Last"] - data["Last"].mean()) / data["Last"].std()
)
# Filter the data based on the threshold
filtered_data = data[data['Z-Score'] < threshold]
return filtered_data.drop(columns=['Z-Score'])
filtered_data = data[data["Z-Score"] < threshold]
return filtered_data.drop(columns=["Z-Score"])
def _merge_data(self):
# Convert the time column in both DataFrames to datetime
self.predicted_data['time'] = pd.to_datetime(self.predicted_data['time'])
self.measured_data['time'] = pd.to_datetime(self.measured_data['time'])
self.predicted_data["time"] = pd.to_datetime(self.predicted_data["time"])
self.measured_data["time"] = pd.to_datetime(self.measured_data["time"])
# Ensure both time columns have the same timezone
if self.measured_data['time'].dt.tz is None:
self.measured_data['time'] = self.measured_data['time'].dt.tz_localize('UTC')
if self.measured_data["time"].dt.tz is None:
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(
"UTC"
)
self.predicted_data['time'] = (
self.predicted_data['time'].dt.tz_localize('UTC')
.dt.tz_convert('Europe/Berlin')
self.predicted_data["time"] = (
self.predicted_data["time"]
.dt.tz_localize("UTC")
.dt.tz_convert("Europe/Berlin")
)
self.measured_data["time"] = self.measured_data["time"].dt.tz_convert(
"Europe/Berlin"
)
self.measured_data['time'] = self.measured_data['time'].dt.tz_convert('Europe/Berlin')
# Optionally: Remove timezone information if only working locally
self.predicted_data['time'] = self.predicted_data['time'].dt.tz_localize(None)
self.measured_data['time'] = self.measured_data['time'].dt.tz_localize(None)
self.predicted_data["time"] = self.predicted_data["time"].dt.tz_localize(None)
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(None)
# Now you can perform the merge
merged_data = pd.merge(self.measured_data, self.predicted_data, on='time', how='inner')
merged_data = pd.merge(
self.measured_data, self.predicted_data, on="time", how="inner"
)
print(merged_data)
merged_data['Hour'] = merged_data['time'].dt.hour
merged_data['DayOfWeek'] = merged_data['time'].dt.dayofweek
merged_data["Hour"] = merged_data["time"].dt.hour
merged_data["DayOfWeek"] = merged_data["time"].dt.dayofweek
return merged_data
def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
self.merged_data = self._remove_outliers(self.merged_data)
train_end_date = self.merged_data['time'].max() - pd.Timedelta(weeks=test_period_weeks)
train_end_date = self.merged_data["time"].max() - pd.Timedelta(
weeks=test_period_weeks
)
train_start_date = train_end_date - pd.Timedelta(weeks=train_period_weeks)
test_start_date = train_end_date + pd.Timedelta(hours=1)
test_end_date = test_start_date + pd.Timedelta(weeks=test_period_weeks) - pd.Timedelta(hours=1)
test_end_date = (
test_start_date
+ pd.Timedelta(weeks=test_period_weeks)
- pd.Timedelta(hours=1)
)
self.train_data = self.merged_data[
(self.merged_data['time'] >= train_start_date) &
(self.merged_data['time'] <= train_end_date)
(self.merged_data["time"] >= train_start_date)
& (self.merged_data["time"] <= train_end_date)
]
self.test_data = self.merged_data[
(self.merged_data['time'] >= test_start_date) &
(self.merged_data['time'] <= test_end_date)
(self.merged_data["time"] >= test_start_date)
& (self.merged_data["time"] <= test_end_date)
]
self.train_data['Difference'] = self.train_data['Last'] - self.train_data['Last Pred']
self.train_data["Difference"] = (
self.train_data["Last"] - self.train_data["Last Pred"]
)
weekdays_train_data = self.train_data[self.train_data['DayOfWeek'] < 5]
weekends_train_data = self.train_data[self.train_data['DayOfWeek'] >= 5]
weekdays_train_data = self.train_data[self.train_data["DayOfWeek"] < 5]
weekends_train_data = self.train_data[self.train_data["DayOfWeek"] >= 5]
self.weekday_diff = weekdays_train_data.groupby('Hour').apply(self._weighted_mean_diff).dropna()
self.weekend_diff = weekends_train_data.groupby('Hour').apply(self._weighted_mean_diff).dropna()
self.weekday_diff = (
weekdays_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
)
self.weekend_diff = (
weekends_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
)
def _weighted_mean_diff(self, data):
train_end_date = self.train_data['time'].max()
weights = 1 / (train_end_date - data['time']).dt.days.replace(0, np.nan)
weighted_mean = (data['Difference'] * weights).sum() / weights.sum()
train_end_date = self.train_data["time"].max()
weights = 1 / (train_end_date - data["time"]).dt.days.replace(0, np.nan)
weighted_mean = (data["Difference"] * weights).sum() / weights.sum()
return weighted_mean
def adjust_predictions(self):
self.train_data['Adjusted Pred'] = self.train_data.apply(self._adjust_row, axis=1)
self.test_data['Adjusted Pred'] = self.test_data.apply(self._adjust_row, axis=1)
self.train_data["Adjusted Pred"] = self.train_data.apply(
self._adjust_row, axis=1
)
self.test_data["Adjusted Pred"] = self.test_data.apply(self._adjust_row, axis=1)
def _adjust_row(self, row):
if row['DayOfWeek'] < 5:
return row['Last Pred'] + self.weekday_diff.get(row['Hour'], 0)
if row["DayOfWeek"] < 5:
return row["Last Pred"] + self.weekday_diff.get(row["Hour"], 0)
else:
return row['Last Pred'] + self.weekend_diff.get(row['Hour'], 0)
return row["Last Pred"] + self.weekend_diff.get(row["Hour"], 0)
def plot_results(self):
self._plot_data(self.train_data, 'Training')
self._plot_data(self.test_data, 'Testing')
self._plot_data(self.train_data, "Training")
self._plot_data(self.test_data, "Testing")
def _plot_data(self, data, data_type):
plt.figure(figsize=(14, 7))
plt.plot(data['time'], data['Last'], label=f'Actual Last - {data_type}', color='blue')
plt.plot(data['time'], data['Last Pred'], label=f'Predicted Last - {data_type}', color='red', linestyle='--')
plt.plot(data['time'], data['Adjusted Pred'], label=f'Adjusted Predicted Last - {data_type}', color='green', linestyle=':')
plt.xlabel('Time')
plt.ylabel('Load')
plt.title(f'Actual vs Predicted vs Adjusted Predicted Load ({data_type} Data)')
plt.plot(
data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue"
)
plt.plot(
data["time"],
data["Last Pred"],
label=f"Predicted Last - {data_type}",
color="red",
linestyle="--",
)
plt.plot(
data["time"],
data["Adjusted Pred"],
label=f"Adjusted Predicted Last - {data_type}",
color="green",
linestyle=":",
)
plt.xlabel("Time")
plt.ylabel("Load")
plt.title(f"Actual vs Predicted vs Adjusted Predicted Load ({data_type} Data)")
plt.legend()
plt.grid(True)
plt.show()
def evaluate_model(self):
mse = mean_squared_error(self.test_data['Last'], self.test_data['Adjusted Pred'])
r2 = r2_score(self.test_data['Last'], self.test_data['Adjusted Pred'])
print(f'Mean Squared Error: {mse}')
print(f'R-squared: {r2}')
mse = mean_squared_error(
self.test_data["Last"], self.test_data["Adjusted Pred"]
)
r2 = r2_score(self.test_data["Last"], self.test_data["Adjusted Pred"])
print(f"Mean Squared Error: {mse}")
print(f"R-squared: {r2}")
def predict_next_hours(self, hours_ahead):
last_date = self.merged_data['time'].max()
future_dates = [last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)]
future_df = pd.DataFrame({'time': future_dates})
future_df['Hour'] = future_df['time'].dt.hour
future_df['DayOfWeek'] = future_df['time'].dt.dayofweek
future_df['Last Pred'] = future_df['time'].apply(self._forecast_next_hours)
future_df['Adjusted Pred'] = future_df.apply(self._adjust_row, axis=1)
last_date = self.merged_data["time"].max()
future_dates = [
last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)
]
future_df = pd.DataFrame({"time": future_dates})
future_df["Hour"] = future_df["time"].dt.hour
future_df["DayOfWeek"] = future_df["time"].dt.dayofweek
future_df["Last Pred"] = future_df["time"].apply(self._forecast_next_hours)
future_df["Adjusted Pred"] = future_df.apply(self._adjust_row, axis=1)
return future_df
def _forecast_next_hours(self, timestamp):
date_str = timestamp.strftime('%Y-%m-%d')
date_str = timestamp.strftime("%Y-%m-%d")
hour = timestamp.hour
daily_forecast = self.load_forecast.get_daily_stats(date_str)
return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan
# if __name__ == '__main__':
# estimator = LastEstimator()
# start_date = "2024-06-01"

View File

@ -1,31 +1,29 @@
from flask import Flask, jsonify, request
import numpy as np
from modules.class_load import *
from modules.class_ems import *
from modules.class_pv_forecast import *
from modules.class_akku import *
import os
import matplotlib
import numpy as np
from modules.class_akku import *
from modules.class_ems import *
from modules.class_haushaltsgeraet import *
from modules.class_heatpump import *
from modules.class_load_container import *
from modules.class_inverter import *
from modules.class_load import *
from modules.class_load_container import *
from modules.class_pv_forecast import *
from modules.class_sommerzeit import *
from modules.visualize import *
from modules.class_haushaltsgeraet import *
import os
from flask import Flask, send_from_directory
from pprint import pprint
import matplotlib
matplotlib.use('Agg') # Setzt das Backend auf Agg
import matplotlib.pyplot as plt
import string
from datetime import datetime
from deap import base, creator, tools, algorithms
import numpy as np
matplotlib.use("Agg") # Setzt das Backend auf Agg
import random
import os
from datetime import datetime
from deap import algorithms, base, creator, tools
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import *
def isfloat(num):
try:
float(num)
@ -33,7 +31,17 @@ def isfloat(num):
except:
return False
def differential_evolution(population, toolbox, cxpb, mutpb, ngen, stats=None, halloffame=None, verbose=__debug__):
def differential_evolution(
population,
toolbox,
cxpb,
mutpb,
ngen,
stats=None,
halloffame=None,
verbose=__debug__,
):
"""Differential Evolution Algorithm"""
# Evaluate the entire population
@ -45,7 +53,7 @@ def differential_evolution(population, toolbox, cxpb, mutpb, ngen, stats=None, h
halloffame.update(population)
logbook = tools.Logbook()
logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])
logbook.header = ["gen", "nevals"] + (stats.fields if stats else [])
for gen in range(ngen):
# Generate the next generation by mutation and recombination
@ -77,16 +85,14 @@ def differential_evolution(population, toolbox, cxpb, mutpb, ngen, stats=None, h
return population, logbook
class optimization_problem:
def __init__(self, prediction_hours=24, strafe = 10, optimization_hours= 24):
self.prediction_hours = prediction_hours#
def __init__(self, prediction_hours=24, strafe=10, optimization_hours=24):
self.prediction_hours = prediction_hours #
self.strafe = strafe
self.opti_param = None
self.fixed_eauto_hours = prediction_hours-optimization_hours
self.fixed_eauto_hours = prediction_hours - optimization_hours
self.possible_charge_values = moegliche_ladestroeme_in_prozent
def split_individual(self, individual):
"""
Teilt das gegebene Individuum in die verschiedenen Parameter auf:
@ -95,24 +101,28 @@ class optimization_problem:
- Haushaltsgeräte (spuelstart_int, falls vorhanden)
"""
# Extrahiere die Entlade- und Ladeparameter direkt aus dem Individuum
discharge_hours_bin = individual[:self.prediction_hours] # Erste 24 Werte sind Bool (Entladen)
eautocharge_hours_float = individual[self.prediction_hours:self.prediction_hours * 2] # Nächste 24 Werte sind Float (Laden)
discharge_hours_bin = individual[
: self.prediction_hours
] # Erste 24 Werte sind Bool (Entladen)
eautocharge_hours_float = individual[
self.prediction_hours : self.prediction_hours * 2
] # Nächste 24 Werte sind Float (Laden)
spuelstart_int = None
if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0:
spuelstart_int = individual[-1] # Letzter Wert ist Startzeit für Haushaltsgerät
spuelstart_int = individual[
-1
] # Letzter Wert ist Startzeit für Haushaltsgerät
return discharge_hours_bin, eautocharge_hours_float, spuelstart_int
def setup_deap_environment(self,opti_param, start_hour):
def setup_deap_environment(self, opti_param, start_hour):
self.opti_param = opti_param
if "FitnessMin" in creator.__dict__:
del creator.FitnessMin
del creator.FitnessMin
if "Individual" in creator.__dict__:
del creator.Individual
del creator.Individual
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)
@ -120,145 +130,168 @@ class optimization_problem:
# PARAMETER
self.toolbox = base.Toolbox()
self.toolbox.register("attr_bool", random.randint, 0, 1)
self.toolbox.register("attr_float", random.uniform, 0, 1) # Für kontinuierliche Werte zwischen 0 und 1 (z.B. für E-Auto-Ladeleistung)
#self.toolbox.register("attr_choice", random.choice, self.possible_charge_values) # Für diskrete Ladeströme
self.toolbox.register(
"attr_float", random.uniform, 0, 1
) # Für kontinuierliche Werte zwischen 0 und 1 (z.B. für E-Auto-Ladeleistung)
# self.toolbox.register("attr_choice", random.choice, self.possible_charge_values) # Für diskrete Ladeströme
self.toolbox.register("attr_int", random.randint, start_hour, 23)
###################
# Haushaltsgeraete
#print("Haushalt:",opti_param["haushaltsgeraete"])
if opti_param["haushaltsgeraete"]>0:
def create_individual():
attrs = [self.toolbox.attr_bool() for _ in range(self.prediction_hours)] # 24 Bool-Werte für Entladen
attrs += [self.toolbox.attr_float() for _ in range(self.prediction_hours)] # 24 Float-Werte für Laden
attrs.append(self.toolbox.attr_int()) # Haushaltsgerät-Startzeit
return creator.Individual(attrs)
# print("Haushalt:",opti_param["haushaltsgeraete"])
if opti_param["haushaltsgeraete"] > 0:
def create_individual():
attrs = [
self.toolbox.attr_bool() for _ in range(self.prediction_hours)
] # 24 Bool-Werte für Entladen
attrs += [
self.toolbox.attr_float() for _ in range(self.prediction_hours)
] # 24 Float-Werte für Laden
attrs.append(self.toolbox.attr_int()) # Haushaltsgerät-Startzeit
return creator.Individual(attrs)
else:
def create_individual():
attrs = [self.toolbox.attr_bool() for _ in range(self.prediction_hours)] # 24 Bool-Werte für Entladen
attrs += [self.toolbox.attr_float() for _ in range(self.prediction_hours)] # 24 Float-Werte für Laden
return creator.Individual(attrs)
def create_individual():
attrs = [
self.toolbox.attr_bool() for _ in range(self.prediction_hours)
] # 24 Bool-Werte für Entladen
attrs += [
self.toolbox.attr_float() for _ in range(self.prediction_hours)
] # 24 Float-Werte für Laden
return creator.Individual(attrs)
self.toolbox.register("individual", create_individual)#tools.initCycle, creator.Individual, (self.toolbox.attr_bool,self.toolbox.attr_bool), n=self.prediction_hours+1)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register(
"individual", create_individual
) # tools.initCycle, creator.Individual, (self.toolbox.attr_bool,self.toolbox.attr_bool), n=self.prediction_hours+1)
self.toolbox.register(
"population", tools.initRepeat, list, self.toolbox.individual
)
self.toolbox.register("mate", tools.cxTwoPoint)
self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
#self.toolbox.register("mutate", mutate_choice, self.possible_charge_values, indpb=0.1)
#self.toolbox.register("mutate", tools.mutUniformInt, low=0, up=len(self.possible_charge_values)-1, indpb=0.1)
# self.toolbox.register("mutate", mutate_choice, self.possible_charge_values, indpb=0.1)
# self.toolbox.register("mutate", tools.mutUniformInt, low=0, up=len(self.possible_charge_values)-1, indpb=0.1)
self.toolbox.register("select", tools.selTournament, tournsize=3)
def evaluate_inner(self,individual, ems,start_hour):
def evaluate_inner(self, individual, ems, start_hour):
ems.reset()
#print("Spuel:",self.opti_param)
# print("Spuel:",self.opti_param)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(individual)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
self.split_individual(individual)
)
# Haushaltsgeraete
if self.opti_param["haushaltsgeraete"]>0:
ems.set_haushaltsgeraet_start(spuelstart_int,global_start_hour=start_hour)
if self.opti_param["haushaltsgeraete"] > 0:
ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour)
#discharge_hours_bin = np.full(self.prediction_hours,0)
# discharge_hours_bin = np.full(self.prediction_hours,0)
ems.set_akku_discharge_hours(discharge_hours_bin)
# Setze die festen Werte für die letzten x Stunden
for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours):
eautocharge_hours_float[i] = 0.0 # Setze die letzten x Stunden auf einen festen Wert (oder vorgegebenen Wert)
for i in range(
self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours
):
eautocharge_hours_float[i] = (
0.0 # Setze die letzten x Stunden auf einen festen Wert (oder vorgegebenen Wert)
)
#print(eautocharge_hours_float)
# print(eautocharge_hours_float)
ems.set_eauto_charge_hours(eautocharge_hours_float)
o = ems.simuliere(start_hour)
return o
# Fitness-Funktion (muss Ihre EnergieManagementSystem-Logik integrieren)
def evaluate(self,individual,ems,parameter,start_hour,worst_case):
def evaluate(self, individual, ems, parameter, start_hour, worst_case):
try:
o = self.evaluate_inner(individual,ems,start_hour)
o = self.evaluate_inner(individual, ems, start_hour)
except:
return (100000.0,)
return (100000.0,)
gesamtbilanz = o["Gesamtbilanz_Euro"]
if worst_case:
gesamtbilanz = gesamtbilanz * -1.0
gesamtbilanz = gesamtbilanz * -1.0
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(individual)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
self.split_individual(individual)
)
max_ladeleistung = np.max(moegliche_ladestroeme_in_prozent)
strafe_überschreitung = 0.0
# Ladeleistung überschritten?
for ladeleistung in eautocharge_hours_float:
if ladeleistung > max_ladeleistung:
# Berechne die Überschreitung
überschreitung = ladeleistung - max_ladeleistung
# Füge eine Strafe hinzu (z.B. 10 Einheiten Strafe pro Prozentpunkt Überschreitung)
strafe_überschreitung += self.strafe * 10 # Hier ist die Strafe proportional zur Überschreitung
if ladeleistung > max_ladeleistung:
# Berechne die Überschreitung
überschreitung = ladeleistung - max_ladeleistung
# Füge eine Strafe hinzu (z.B. 10 Einheiten Strafe pro Prozentpunkt Überschreitung)
strafe_überschreitung += (
self.strafe * 10
) # Hier ist die Strafe proportional zur Überschreitung
# Für jeden Discharge 0, eine kleine Strafe von 1 Cent, da die Lastvertelung noch fehlt. Also wenn es egal ist, soll er den Akku entladen lassen
for i in range(0, self.prediction_hours):
if discharge_hours_bin[i] == 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
if (
discharge_hours_bin[i] == 0.0
): # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += 0.01 # Bestrafe den Optimierer
# E-Auto nur die ersten self.fixed_eauto_hours
for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours):
if eautocharge_hours_float[i] != 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
for i in range(
self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours
):
if (
eautocharge_hours_float[i] != 0.0
): # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += self.strafe # Bestrafe den Optimierer
# Überprüfung, ob der Mindest-SoC erreicht wird
final_soc = ems.eauto.ladezustand_in_prozent() # Nimmt den SoC am Ende des Optimierungszeitraums
final_soc = (
ems.eauto.ladezustand_in_prozent()
) # Nimmt den SoC am Ende des Optimierungszeitraums
if (parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()) <= 0.0:
#print (parameter['eauto_min_soc']," " ,ems.eauto.ladezustand_in_prozent()," ",(parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()))
for i in range(0, self.prediction_hours):
if eautocharge_hours_float[i] != 0.0: # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += self.strafe # Bestrafe den Optimierer
eauto_roi = (parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent())
individual.extra_data = (o["Gesamtbilanz_Euro"],o["Gesamt_Verluste"], eauto_roi )
if (parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent()) <= 0.0:
# print (parameter['eauto_min_soc']," " ,ems.eauto.ladezustand_in_prozent()," ",(parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()))
for i in range(0, self.prediction_hours):
if (
eautocharge_hours_float[i] != 0.0
): # Wenn die letzten x Stunden von einem festen Wert abweichen
gesamtbilanz += self.strafe # Bestrafe den Optimierer
eauto_roi = parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent()
individual.extra_data = (
o["Gesamtbilanz_Euro"],
o["Gesamt_Verluste"],
eauto_roi,
)
restenergie_akku = ems.akku.aktueller_energieinhalt()
restwert_akku = restenergie_akku*parameter["preis_euro_pro_wh_akku"]
restwert_akku = restenergie_akku * parameter["preis_euro_pro_wh_akku"]
# print(restenergie_akku)
# print(parameter["preis_euro_pro_wh_akku"])
# print(restwert_akku)
# print()
strafe = 0.0
strafe = max(0,(parameter['eauto_min_soc']-ems.eauto.ladezustand_in_prozent()) * self.strafe )
gesamtbilanz += strafe - restwert_akku + strafe_überschreitung
#gesamtbilanz += o["Gesamt_Verluste"]/10000.0
strafe = max(
0,
(parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent())
* self.strafe,
)
gesamtbilanz += strafe - restwert_akku + strafe_überschreitung
# gesamtbilanz += o["Gesamt_Verluste"]/10000.0
return (gesamtbilanz,)
# Genetischer Algorithmus
def optimize(self,start_solution=None):
def optimize(self, start_solution=None):
population = self.toolbox.population(n=300)
hof = tools.HallOfFame(1)
@ -267,104 +300,136 @@ class optimization_problem:
stats.register("min", np.min)
stats.register("max", np.max)
print("Start:",start_solution)
print("Start:", start_solution)
if start_solution is not None and start_solution != -1:
population.insert(0, creator.Individual(start_solution))
population.insert(1, creator.Individual(start_solution))
population.insert(2, creator.Individual(start_solution))
population.insert(0, creator.Individual(start_solution))
population.insert(1, creator.Individual(start_solution))
population.insert(2, creator.Individual(start_solution))
algorithms.eaMuPlusLambda(population, self.toolbox, mu=100, lambda_=200, cxpb=0.5, mutpb=0.3, ngen=400, stats=stats, halloffame=hof, verbose=True)
#algorithms.eaSimple(population, self.toolbox, cxpb=0.3, mutpb=0.3, ngen=200, stats=stats, halloffame=hof, verbose=True)
#algorithms.eaMuCommaLambda(population, self.toolbox, mu=100, lambda_=200, cxpb=0.2, mutpb=0.4, ngen=300, stats=stats, halloffame=hof, verbose=True)
#population, log = differential_evolution(population, self.toolbox, cxpb=0.2, mutpb=0.5, ngen=200, stats=stats, halloffame=hof, verbose=True)
algorithms.eaMuPlusLambda(
population,
self.toolbox,
mu=100,
lambda_=200,
cxpb=0.5,
mutpb=0.3,
ngen=400,
stats=stats,
halloffame=hof,
verbose=True,
)
# algorithms.eaSimple(population, self.toolbox, cxpb=0.3, mutpb=0.3, ngen=200, stats=stats, halloffame=hof, verbose=True)
# algorithms.eaMuCommaLambda(population, self.toolbox, mu=100, lambda_=200, cxpb=0.2, mutpb=0.4, ngen=300, stats=stats, halloffame=hof, verbose=True)
# population, log = differential_evolution(population, self.toolbox, cxpb=0.2, mutpb=0.5, ngen=200, stats=stats, halloffame=hof, verbose=True)
member = {"bilanz":[],"verluste":[],"nebenbedingung":[]}
member = {"bilanz": [], "verluste": [], "nebenbedingung": []}
for ind in population:
if hasattr(ind, 'extra_data'):
extra_value1, extra_value2,extra_value3 = ind.extra_data
member["bilanz"].append(extra_value1)
member["verluste"].append(extra_value2)
member["nebenbedingung"].append(extra_value3)
if hasattr(ind, "extra_data"):
extra_value1, extra_value2, extra_value3 = ind.extra_data
member["bilanz"].append(extra_value1)
member["verluste"].append(extra_value2)
member["nebenbedingung"].append(extra_value3)
return hof[0], member
def optimierung_ems(self,parameter=None, start_hour=None,worst_case=False, startdate=None):
def optimierung_ems(
self, parameter=None, start_hour=None, worst_case=False, startdate=None
):
############
# Parameter
############
if startdate == None:
date = (datetime.now().date() + timedelta(hours = self.prediction_hours)).strftime("%Y-%m-%d")
date_now = datetime.now().strftime("%Y-%m-%d")
date = (
datetime.now().date() + timedelta(hours=self.prediction_hours)
).strftime("%Y-%m-%d")
date_now = datetime.now().strftime("%Y-%m-%d")
else:
date = (startdate + timedelta(hours = self.prediction_hours)).strftime("%Y-%m-%d")
date_now = startdate.strftime("%Y-%m-%d")
#print("Start_date:",date_now)
date = (startdate + timedelta(hours=self.prediction_hours)).strftime(
"%Y-%m-%d"
)
date_now = startdate.strftime("%Y-%m-%d")
# print("Start_date:",date_now)
akku_size = parameter['pv_akku_cap'] # Wh
akku_size = parameter["pv_akku_cap"] # Wh
einspeiseverguetung_euro_pro_wh = np.full(self.prediction_hours, parameter["einspeiseverguetung_euro_pro_wh"]) #= # € / Wh 7/(1000.0*100.0)
discharge_array = np.full(self.prediction_hours,1) #np.array([1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]) #
akku = PVAkku(kapazitaet_wh=akku_size,hours=self.prediction_hours,start_soc_prozent=parameter["pv_soc"], max_ladeleistung_w=5000)
einspeiseverguetung_euro_pro_wh = np.full(
self.prediction_hours, parameter["einspeiseverguetung_euro_pro_wh"]
) # = # € / Wh 7/(1000.0*100.0)
discharge_array = np.full(
self.prediction_hours, 1
) # np.array([1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0]) #
akku = PVAkku(
kapazitaet_wh=akku_size,
hours=self.prediction_hours,
start_soc_prozent=parameter["pv_soc"],
max_ladeleistung_w=5000,
)
akku.set_charge_per_hour(discharge_array)
laden_moeglich = np.full(self.prediction_hours,1) # np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0])
eauto = PVAkku(kapazitaet_wh=parameter["eauto_cap"], hours=self.prediction_hours, lade_effizienz=parameter["eauto_charge_efficiency"], entlade_effizienz=1.0, max_ladeleistung_w=parameter["eauto_charge_power"] ,start_soc_prozent=parameter["eauto_soc"])
laden_moeglich = np.full(
self.prediction_hours, 1
) # np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0])
eauto = PVAkku(
kapazitaet_wh=parameter["eauto_cap"],
hours=self.prediction_hours,
lade_effizienz=parameter["eauto_charge_efficiency"],
entlade_effizienz=1.0,
max_ladeleistung_w=parameter["eauto_charge_power"],
start_soc_prozent=parameter["eauto_soc"],
)
eauto.set_charge_per_hour(laden_moeglich)
min_soc_eauto = parameter['eauto_min_soc']
start_params = parameter['start_solution']
min_soc_eauto = parameter["eauto_min_soc"]
start_params = parameter["start_solution"]
###############
# spuelmaschine
##############
print(parameter)
if parameter["haushaltsgeraet_dauer"] >0:
spuelmaschine = Haushaltsgeraet(hours=self.prediction_hours, verbrauch_kwh=parameter["haushaltsgeraet_wh"], dauer_h=parameter["haushaltsgeraet_dauer"])
spuelmaschine.set_startzeitpunkt(start_hour) # Startet jetzt
if parameter["haushaltsgeraet_dauer"] > 0:
spuelmaschine = Haushaltsgeraet(
hours=self.prediction_hours,
verbrauch_kwh=parameter["haushaltsgeraet_wh"],
dauer_h=parameter["haushaltsgeraet_dauer"],
)
spuelmaschine.set_startzeitpunkt(start_hour) # Startet jetzt
else:
spuelmaschine = None
spuelmaschine = None
###############
# PV Forecast
###############
#PVforecast = PVForecast(filepath=os.path.join(r'test_data', r'pvprognose.json'))
# PVforecast = PVForecast(filepath=os.path.join(r'test_data', r'pvprognose.json'))
# PVforecast = PVForecast(prediction_hours = self.prediction_hours, url=pv_forecast_url)
# #print("PVPOWER",parameter['pvpowernow'])
# if isfloat(parameter['pvpowernow']):
# PVforecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=float(parameter['pvpowernow']))
# #PVforecast.print_ac_power_and_measurement()
pv_forecast = parameter['pv_forecast'] #PVforecast.get_pv_forecast_for_date_range(date_now,date) #get_forecast_for_date(date)
temperature_forecast = parameter['temperature_forecast'] #PVforecast.get_temperature_for_date_range(date_now,date)
# PVforecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=float(parameter['pvpowernow']))
# #PVforecast.print_ac_power_and_measurement()
pv_forecast = parameter[
"pv_forecast"
] # PVforecast.get_pv_forecast_for_date_range(date_now,date) #get_forecast_for_date(date)
temperature_forecast = parameter[
"temperature_forecast"
] # PVforecast.get_temperature_for_date_range(date_now,date)
###############
# Strompreise
###############
specific_date_prices = parameter["strompreis_euro_pro_wh"]
print(specific_date_prices)
#print("https://api.akkudoktor.net/prices?start="+date_now+"&end="+date)
# print("https://api.akkudoktor.net/prices?start="+date_now+"&end="+date)
wr = Wechselrichter(10000, akku)
ems = EnergieManagementSystem(gesamtlast = parameter["gesamtlast"], pv_prognose_wh=pv_forecast, strompreis_euro_pro_wh=specific_date_prices, einspeiseverguetung_euro_pro_wh=einspeiseverguetung_euro_pro_wh, eauto=eauto, haushaltsgeraet=spuelmaschine,wechselrichter=wr)
ems = EnergieManagementSystem(
gesamtlast=parameter["gesamtlast"],
pv_prognose_wh=pv_forecast,
strompreis_euro_pro_wh=specific_date_prices,
einspeiseverguetung_euro_pro_wh=einspeiseverguetung_euro_pro_wh,
eauto=eauto,
haushaltsgeraet=spuelmaschine,
wechselrichter=wr,
)
o = ems.simuliere(start_hour)
###############
@ -373,45 +438,61 @@ class optimization_problem:
opti_param = {}
opti_param["haushaltsgeraete"] = 0
if spuelmaschine != None:
opti_param["haushaltsgeraete"] = 1
opti_param["haushaltsgeraete"] = 1
self.setup_deap_environment(opti_param, start_hour)
def evaluate_wrapper(individual):
return self.evaluate(individual, ems, parameter,start_hour,worst_case)
return self.evaluate(individual, ems, parameter, start_hour, worst_case)
self.toolbox.register("evaluate", evaluate_wrapper)
start_solution, extra_data = self.optimize(start_params)
best_solution = start_solution
o = self.evaluate_inner(best_solution, ems,start_hour)
o = self.evaluate_inner(best_solution, ems, start_hour)
eauto = ems.eauto.to_dict()
spuelstart_int = None
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(best_solution)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
self.split_individual(best_solution)
)
print(parameter)
print(best_solution)
visualisiere_ergebnisse(parameter["gesamtlast"], pv_forecast, specific_date_prices, o,discharge_hours_bin,eautocharge_hours_float , temperature_forecast, start_hour, self.prediction_hours,einspeiseverguetung_euro_pro_wh,extra_data=extra_data)
visualisiere_ergebnisse(
parameter["gesamtlast"],
pv_forecast,
specific_date_prices,
o,
discharge_hours_bin,
eautocharge_hours_float,
temperature_forecast,
start_hour,
self.prediction_hours,
einspeiseverguetung_euro_pro_wh,
extra_data=extra_data,
)
os.system("cp visualisierungsergebnisse.pdf ~/")
# 'Eigenverbrauch_Wh_pro_Stunde': eigenverbrauch_wh_pro_stunde,
# 'Netzeinspeisung_Wh_pro_Stunde': netzeinspeisung_wh_pro_stunde,
# 'Netzbezug_Wh_pro_Stunde': netzbezug_wh_pro_stunde,
# 'Kosten_Euro_pro_Stunde': kosten_euro_pro_stunde,
# 'akku_soc_pro_stunde': akku_soc_pro_stunde,
# 'Einnahmen_Euro_pro_Stunde': einnahmen_euro_pro_stunde,
# 'Gesamtbilanz_Euro': gesamtkosten_euro,
# 'E-Auto_SoC_pro_Stunde':eauto_soc_pro_stunde,
# 'Gesamteinnahmen_Euro': sum(einnahmen_euro_pro_stunde),
# 'Gesamtkosten_Euro': sum(kosten_euro_pro_stunde),
# "Verluste_Pro_Stunde":verluste_wh_pro_stunde,
# "Gesamt_Verluste":sum(verluste_wh_pro_stunde),
# "Haushaltsgeraet_wh_pro_stunde":haushaltsgeraet_wh_pro_stunde
#print(eauto)
return {"discharge_hours_bin":discharge_hours_bin, "eautocharge_hours_float":eautocharge_hours_float ,"result":o ,"eauto_obj":eauto,"start_solution":best_solution,"spuelstart":spuelstart_int,"simulation_data":o}
# 'Eigenverbrauch_Wh_pro_Stunde': eigenverbrauch_wh_pro_stunde,
# 'Netzeinspeisung_Wh_pro_Stunde': netzeinspeisung_wh_pro_stunde,
# 'Netzbezug_Wh_pro_Stunde': netzbezug_wh_pro_stunde,
# 'Kosten_Euro_pro_Stunde': kosten_euro_pro_stunde,
# 'akku_soc_pro_stunde': akku_soc_pro_stunde,
# 'Einnahmen_Euro_pro_Stunde': einnahmen_euro_pro_stunde,
# 'Gesamtbilanz_Euro': gesamtkosten_euro,
# 'E-Auto_SoC_pro_Stunde':eauto_soc_pro_stunde,
# 'Gesamteinnahmen_Euro': sum(einnahmen_euro_pro_stunde),
# 'Gesamtkosten_Euro': sum(kosten_euro_pro_stunde),
# "Verluste_Pro_Stunde":verluste_wh_pro_stunde,
# "Gesamt_Verluste":sum(verluste_wh_pro_stunde),
# "Haushaltsgeraet_wh_pro_stunde":haushaltsgeraet_wh_pro_stunde
# print(eauto)
return {
"discharge_hours_bin": discharge_hours_bin,
"eautocharge_hours_float": eautocharge_hours_float,
"result": o,
"eauto_obj": eauto,
"start_solution": best_solution,
"spuelstart": spuelstart_int,
"simulation_data": o,
}

View File

@ -1,15 +1,25 @@
from flask import Flask, jsonify, request
import numpy as np
from datetime import datetime, timedelta
import hashlib
import json
import os
from datetime import datetime
from pprint import pprint
import json, sys, os
import requests, hashlib
from dateutil import parser
import numpy as np
import pandas as pd
import requests
from dateutil import parser
class ForecastData:
def __init__(self, date_time, dc_power, ac_power, windspeed_10m=None, temperature=None, ac_power_measurement=None):
def __init__(
self,
date_time,
dc_power,
ac_power,
windspeed_10m=None,
temperature=None,
ac_power_measurement=None,
):
self.date_time = date_time
self.dc_power = dc_power
self.ac_power = ac_power
@ -38,8 +48,9 @@ class ForecastData:
def get_temperature(self):
return self.temperature
class PVForecast:
def __init__(self, filepath=None, url=None, cache_dir='cache', prediction_hours=48):
def __init__(self, filepath=None, url=None, cache_dir="cache", prediction_hours=48):
self.meta = {}
self.forecast_data = []
self.cache_dir = cache_dir
@ -54,51 +65,56 @@ class PVForecast:
self.load_data_with_caching(url)
if len(self.forecast_data) < self.prediction_hours:
raise ValueError(f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt.")
raise ValueError(
f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt."
)
def update_ac_power_measurement(self, date_time=None, ac_power_measurement=None):
found = False
input_date_hour = date_time.replace(minute=0, second=0, microsecond=0)
for forecast in self.forecast_data:
forecast_date_hour = parser.parse(forecast.date_time).replace(minute=0, second=0, microsecond=0)
forecast_date_hour = parser.parse(forecast.date_time).replace(
minute=0, second=0, microsecond=0
)
if forecast_date_hour == input_date_hour:
forecast.ac_power_measurement = ac_power_measurement
found = True
break
def process_data(self, data):
self.meta = data.get('meta', {})
all_values = data.get('values', [])
self.meta = data.get("meta", {})
all_values = data.get("values", [])
for i in range(len(all_values[0])): # Annahme, dass alle Listen gleich lang sind
sum_dc_power = sum(values[i]['dcPower'] for values in all_values)
sum_ac_power = sum(values[i]['power'] for values in all_values)
for i in range(
len(all_values[0])
): # Annahme, dass alle Listen gleich lang sind
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
sum_ac_power = sum(values[i]["power"] for values in all_values)
# Zeige die ursprünglichen und berechneten Zeitstempel an
original_datetime = all_values[0][i].get('datetime')
#print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
original_datetime = all_values[0][i].get("datetime")
# print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
dt = datetime.strptime(original_datetime, "%Y-%m-%dT%H:%M:%S.%f%z")
dt = dt.replace(tzinfo=None)
#iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
#print()
# iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
# print()
# Optional: 2 Stunden abziehen, um die Zeitanpassung zu testen
#adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
#print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
# print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
forecast = ForecastData(
date_time=dt, # Verwende angepassten Zeitstempel
dc_power=sum_dc_power,
ac_power=sum_ac_power,
windspeed_10m=all_values[0][i].get('windspeed_10m'),
temperature=all_values[0][i].get('temperature')
windspeed_10m=all_values[0][i].get("windspeed_10m"),
temperature=all_values[0][i].get("temperature"),
)
self.forecast_data.append(forecast)
def load_data_from_file(self, filepath):
with open(filepath, 'r') as file:
with open(filepath, "r") as file:
data = json.load(file)
self.process_data(data)
@ -109,31 +125,37 @@ class PVForecast:
pprint(data)
self.process_data(data)
else:
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
self.load_data_from_url(url)
def load_data_with_caching(self, url):
date = datetime.now().strftime("%Y-%m-%d")
cache_file = os.path.join(self.cache_dir, self.generate_cache_filename(url, date))
cache_file = os.path.join(
self.cache_dir, self.generate_cache_filename(url, date)
)
if os.path.exists(cache_file):
with open(cache_file, 'r') as file:
with open(cache_file, "r") as file:
data = json.load(file)
print("Loading data from cache.")
else:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
with open(cache_file, 'w') as file:
with open(cache_file, "w") as file:
json.dump(data, file)
print("Data fetched from URL and cached.")
else:
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
return
self.process_data(data)
def generate_cache_filename(self, url, date):
cache_key = hashlib.sha256(f"{url}{date}".encode('utf-8')).hexdigest()
cache_key = hashlib.sha256(f"{url}{date}".encode("utf-8")).hexdigest()
return f"cache_{cache_key}.json"
def get_forecast_data(self):
@ -141,7 +163,11 @@ class PVForecast:
def get_temperature_forecast_for_date(self, input_date_str):
input_date = datetime.strptime(input_date_str, "%Y-%m-%d")
daily_forecast_obj = [data for data in self.forecast_data if parser.parse(data.get_date_time()).date() == input_date.date()]
daily_forecast_obj = [
data
for data in self.forecast_data
if parser.parse(data.get_date_time()).date() == input_date.date()
]
daily_forecast = []
for d in daily_forecast_obj:
daily_forecast.append(d.get_temperature())
@ -154,14 +180,18 @@ class PVForecast:
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()#parser.parse(data.get_date_time()).date()
data_date = (
data.get_date_time().date()
) # parser.parse(data.get_date_time()).date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
print(data.get_date_time()," ",data.get_ac_power())
print(data.get_date_time(), " ", data.get_ac_power())
ac_power_forecast = np.array([data.get_ac_power() for data in date_range_forecast])
ac_power_forecast = np.array(
[data.get_ac_power() for data in date_range_forecast]
)
return np.array(ac_power_forecast)[:self.prediction_hours]
return np.array(ac_power_forecast)[: self.prediction_hours]
def get_temperature_for_date_range(self, start_date_str, end_date_str):
start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
@ -174,31 +204,41 @@ class PVForecast:
date_range_forecast.append(data)
temperature_forecast = [data.get_temperature() for data in date_range_forecast]
return np.array(temperature_forecast)[:self.prediction_hours]
return np.array(temperature_forecast)[: self.prediction_hours]
def get_forecast_dataframe(self):
# Wandelt die Vorhersagedaten in ein Pandas DataFrame um
data = [{
'date_time': f.get_date_time(),
'dc_power': f.get_dc_power(),
'ac_power': f.get_ac_power(),
'windspeed_10m': f.get_windspeed_10m(),
'temperature': f.get_temperature()
} for f in self.forecast_data]
data = [
{
"date_time": f.get_date_time(),
"dc_power": f.get_dc_power(),
"ac_power": f.get_ac_power(),
"windspeed_10m": f.get_windspeed_10m(),
"temperature": f.get_temperature(),
}
for f in self.forecast_data
]
# Erstelle ein DataFrame
df = pd.DataFrame(data)
return df
def print_ac_power_and_measurement(self):
"""Druckt die DC-Leistung und den Messwert für jede Stunde."""
for forecast in self.forecast_data:
date_time = forecast.date_time
print(f"Zeit: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, Messwert: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}")
print(
f"Zeit: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, Messwert: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}"
)
# Beispiel für die Verwendung der Klasse
if __name__ == '__main__':
forecast = PVForecast(prediction_hours=24, url="https://api.akkudoktor.net/forecast?lat=50.8588&lon=7.3747&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m")
forecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=1000)
if __name__ == "__main__":
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=50.8588&lon=7.3747&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
)
forecast.update_ac_power_measurement(
date_time=datetime.now(), ac_power_measurement=1000
)
forecast.print_ac_power_and_measurement()

View File

@ -1,13 +1,21 @@
import mariadb
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import simpson
from datetime import datetime, timedelta
import mariadb
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
class BatteryDataProcessor:
def __init__(self, config, voltage_high_threshold, voltage_low_threshold, current_low_threshold, gap, battery_capacity_ah):
def __init__(
self,
config,
voltage_high_threshold,
voltage_low_threshold,
current_low_threshold,
gap,
battery_capacity_ah,
):
self.config = config
self.voltage_high_threshold = voltage_high_threshold
self.voltage_low_threshold = voltage_low_threshold
@ -35,32 +43,34 @@ class BatteryDataProcessor:
"""
self.cursor.execute(query, (start_time,))
rows = self.cursor.fetchall()
self.data = pd.DataFrame(rows, columns=['timestamp', 'data', 'topic'])
self.data['timestamp'] = pd.to_datetime(self.data['timestamp'])
self.data['data'] = self.data['data'].astype(float)
self.data = pd.DataFrame(rows, columns=["timestamp", "data", "topic"])
self.data["timestamp"] = pd.to_datetime(self.data["timestamp"])
self.data["data"] = self.data["data"].astype(float)
def process_data(self):
self.data.drop_duplicates(subset=['timestamp', 'topic'], inplace=True)
self.data.drop_duplicates(subset=["timestamp", "topic"], inplace=True)
data_pivot = self.data.pivot(index='timestamp', columns='topic', values='data')
data_pivot = data_pivot.resample('1T').mean().interpolate()
data_pivot = self.data.pivot(index="timestamp", columns="topic", values="data")
data_pivot = data_pivot.resample("1T").mean().interpolate()
data_pivot.columns.name = None
data_pivot.reset_index(inplace=True)
self.data = data_pivot
def group_points(self, df):
df = df.sort_values('timestamp')
df = df.sort_values("timestamp")
groups = []
group = []
last_time = None
for _, row in df.iterrows():
if last_time is None or (row['timestamp'] - last_time) <= pd.Timedelta(minutes=self.gap):
if last_time is None or (row["timestamp"] - last_time) <= pd.Timedelta(
minutes=self.gap
):
group.append(row)
else:
groups.append(group)
group = [row]
last_time = row['timestamp']
last_time = row["timestamp"]
if group:
groups.append(group)
@ -69,11 +79,19 @@ class BatteryDataProcessor:
return last_points
def find_soc_points(self):
condition_soc_100 = (self.data['battery_voltage'] >= self.voltage_high_threshold) & (self.data['battery_current'].abs() <= self.current_low_threshold)
condition_soc_0 = (self.data['battery_voltage'] <= self.voltage_low_threshold) & (self.data['battery_current'].abs() <= self.current_low_threshold)
condition_soc_100 = (
self.data["battery_voltage"] >= self.voltage_high_threshold
) & (self.data["battery_current"].abs() <= self.current_low_threshold)
condition_soc_0 = (
self.data["battery_voltage"] <= self.voltage_low_threshold
) & (self.data["battery_current"].abs() <= self.current_low_threshold)
times_soc_100_all = self.data[condition_soc_100][['timestamp', 'battery_voltage', 'battery_current']]
times_soc_0_all = self.data[condition_soc_0][['timestamp', 'battery_voltage', 'battery_current']]
times_soc_100_all = self.data[condition_soc_100][
["timestamp", "battery_voltage", "battery_current"]
]
times_soc_0_all = self.data[condition_soc_0][
["timestamp", "battery_voltage", "battery_current"]
]
last_points_100 = self.group_points(times_soc_100_all)
last_points_0 = self.group_points(times_soc_0_all)
@ -86,35 +104,46 @@ class BatteryDataProcessor:
def calculate_resetting_soc(self, last_points_100_df, last_points_0_df):
soc_values = []
integration_results = []
reset_points = pd.concat([last_points_100_df, last_points_0_df]).sort_values('timestamp')
reset_points = pd.concat([last_points_100_df, last_points_0_df]).sort_values(
"timestamp"
)
# Initialisieren der SoC-Liste
self.data['calculated_soc'] = np.nan
self.data["calculated_soc"] = np.nan
for i in range(len(reset_points)):
start_point = reset_points.iloc[i]
if i < len(reset_points) - 1:
end_point = reset_points.iloc[i + 1]
else:
end_point = self.data.iloc[-1] # Verwenden des letzten Datensatzes als Endpunkt
end_point = self.data.iloc[
-1
] # Verwenden des letzten Datensatzes als Endpunkt
if start_point['timestamp'] in last_points_100_df['timestamp'].values:
if start_point["timestamp"] in last_points_100_df["timestamp"].values:
initial_soc = 100
elif start_point['timestamp'] in last_points_0_df['timestamp'].values:
elif start_point["timestamp"] in last_points_0_df["timestamp"].values:
initial_soc = 0
cut_data = self.data[(self.data['timestamp'] >= start_point['timestamp']) & (self.data['timestamp'] <= end_point['timestamp'])].copy()
cut_data['time_diff_hours'] = cut_data['timestamp'].diff().dt.total_seconds() / 3600
cut_data.dropna(subset=['time_diff_hours'], inplace=True)
cut_data = self.data[
(self.data["timestamp"] >= start_point["timestamp"])
& (self.data["timestamp"] <= end_point["timestamp"])
].copy()
cut_data["time_diff_hours"] = (
cut_data["timestamp"].diff().dt.total_seconds() / 3600
)
cut_data.dropna(subset=["time_diff_hours"], inplace=True)
calculated_soc = initial_soc
calculated_soc_list = [calculated_soc]
integrated_current = 0
for j in range(1, len(cut_data)):
current = cut_data.iloc[j]['battery_current']
delta_t = cut_data.iloc[j]['time_diff_hours']
delta_soc = (current * delta_t) / self.battery_capacity_ah * 100 # Convert to percentage
current = cut_data.iloc[j]["battery_current"]
delta_t = cut_data.iloc[j]["time_diff_hours"]
delta_soc = (
(current * delta_t) / self.battery_capacity_ah * 100
) # Convert to percentage
calculated_soc += delta_soc
calculated_soc = min(max(calculated_soc, 0), 100) # Clip to 0-100%
@ -123,41 +152,51 @@ class BatteryDataProcessor:
# Integration des Stroms aufaddieren
integrated_current += current * delta_t
cut_data['calculated_soc'] = calculated_soc_list
soc_values.append(cut_data[['timestamp', 'calculated_soc']])
cut_data["calculated_soc"] = calculated_soc_list
soc_values.append(cut_data[["timestamp", "calculated_soc"]])
integration_results.append({
'start_time': start_point['timestamp'],
'end_time': end_point['timestamp'],
'integrated_current': integrated_current,
'start_soc': initial_soc,
'end_soc': calculated_soc_list[-1]
})
integration_results.append(
{
"start_time": start_point["timestamp"],
"end_time": end_point["timestamp"],
"integrated_current": integrated_current,
"start_soc": initial_soc,
"end_soc": calculated_soc_list[-1],
}
)
soc_df = pd.concat(soc_values).drop_duplicates(subset=['timestamp']).reset_index(drop=True)
soc_df = (
pd.concat(soc_values)
.drop_duplicates(subset=["timestamp"])
.reset_index(drop=True)
)
return soc_df, integration_results
def calculate_soh(self, integration_results):
soh_values = []
for result in integration_results:
delta_soc = abs(result['start_soc'] - result['end_soc']) # Use the actual change in SoC
delta_soc = abs(
result["start_soc"] - result["end_soc"]
) # Use the actual change in SoC
if delta_soc > 0: # Avoid division by zero
effective_capacity_ah = result['integrated_current']
effective_capacity_ah = result["integrated_current"]
soh = (effective_capacity_ah / self.battery_capacity_ah) * 100
soh_values.append({'timestamp': result['end_time'], 'soh': soh})
soh_values.append({"timestamp": result["end_time"], "soh": soh})
soh_df = pd.DataFrame(soh_values)
return soh_df
def delete_existing_soc_entries(self, soc_df):
delete_query = """
DELETE FROM pip
WHERE timestamp = %s AND topic = 'calculated_soc'
"""
timestamps = [(row['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),) for _, row in soc_df.iterrows() if pd.notna(row['timestamp'])]
timestamps = [
(row["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),)
for _, row in soc_df.iterrows()
if pd.notna(row["timestamp"])
]
self.cursor.executemany(delete_query, timestamps)
self.conn.commit()
@ -167,9 +206,9 @@ class BatteryDataProcessor:
self.delete_existing_soc_entries(soc_df)
# Resample `soc_df` auf 5-Minuten-Intervalle und berechnen des Mittelwerts
soc_df.set_index('timestamp', inplace=True)
soc_df_resampled = soc_df.resample('5T').mean().dropna().reset_index()
#soc_df_resampled['timestamp'] = soc_df_resampled['timestamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
soc_df.set_index("timestamp", inplace=True)
soc_df_resampled = soc_df.resample("5T").mean().dropna().reset_index()
# soc_df_resampled['timestamp'] = soc_df_resampled['timestamp'].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S'))
print(soc_df_resampled)
# Einfügen der berechneten SoC-Werte in die Datenbank
@ -179,8 +218,11 @@ class BatteryDataProcessor:
"""
for _, row in soc_df_resampled.iterrows():
print(row)
print(row['timestamp'])
record = (row['timestamp'].strftime('%Y-%m-%d %H:%M:%S'), row['calculated_soc'])
print(row["timestamp"])
record = (
row["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
row["calculated_soc"],
)
try:
self.cursor.execute(insert_query, record)
except mariadb.OperationalError as e:
@ -188,38 +230,57 @@ class BatteryDataProcessor:
self.conn.commit()
def plot_data(self, last_points_100_df, last_points_0_df, soc_df):
plt.figure(figsize=(14, 10))
plt.subplot(4, 1, 1)
plt.plot(self.data['timestamp'], self.data['battery_voltage'], label='Battery Voltage', color='blue')
plt.scatter(last_points_100_df['timestamp'], last_points_100_df['battery_voltage'], color='green', marker='o', label='100% SoC Points')
#plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_voltage'], color='red', marker='x', label='0% SoC Points')
plt.xlabel('Timestamp')
plt.ylabel('Voltage (V)')
plt.plot(
self.data["timestamp"],
self.data["battery_voltage"],
label="Battery Voltage",
color="blue",
)
plt.scatter(
last_points_100_df["timestamp"],
last_points_100_df["battery_voltage"],
color="green",
marker="o",
label="100% SoC Points",
)
# plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_voltage'], color='red', marker='x', label='0% SoC Points')
plt.xlabel("Timestamp")
plt.ylabel("Voltage (V)")
plt.legend()
plt.title('Battery Voltage over Time')
plt.title("Battery Voltage over Time")
plt.subplot(4, 1, 2)
plt.plot(self.data['timestamp'], self.data['battery_current'], label='Battery Current', color='orange')
plt.scatter(last_points_100_df['timestamp'], last_points_100_df['battery_current'], color='green', marker='o', label='100% SoC Points')
#plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_current'], color='red', marker='x', label='0% SoC Points')
plt.xlabel('Timestamp')
plt.ylabel('Current (A)')
plt.plot(
self.data["timestamp"],
self.data["battery_current"],
label="Battery Current",
color="orange",
)
plt.scatter(
last_points_100_df["timestamp"],
last_points_100_df["battery_current"],
color="green",
marker="o",
label="100% SoC Points",
)
# plt.scatter(last_points_0_df['timestamp'], last_points_0_df['battery_current'], color='red', marker='x', label='0% SoC Points')
plt.xlabel("Timestamp")
plt.ylabel("Current (A)")
plt.legend()
plt.title('Battery Current over Time')
plt.title("Battery Current over Time")
plt.subplot(4, 1, 3)
plt.plot(soc_df['timestamp'], soc_df['calculated_soc'], label='SoC', color='purple')
plt.xlabel('Timestamp')
plt.ylabel('SoC (%)')
plt.plot(
soc_df["timestamp"], soc_df["calculated_soc"], label="SoC", color="purple"
)
plt.xlabel("Timestamp")
plt.ylabel("SoC (%)")
plt.legend()
plt.title('State of Charge (SoC) over Time')
plt.title("State of Charge (SoC) over Time")
# plt.subplot(4, 1, 4)
# plt.plot(soh_df['timestamp'], soh_df['soh'], label='SoH', color='brown')
@ -228,16 +289,13 @@ class BatteryDataProcessor:
# plt.legend()
# plt.title('State of Health (SoH) over Time')
plt.tight_layout()
plt.show()
if __name__ == '__main__':
if __name__ == "__main__":
# MariaDB Verbindungsdetails
# Parameter festlegen
voltage_high_threshold = 55.4 # 100% SoC
voltage_low_threshold = 46.5 # 0% SoC
@ -246,26 +304,27 @@ if __name__ == '__main__':
bat_capacity = 33 * 1000 / 48
# Zeitpunkt X definieren
zeitpunkt_x = (datetime.now() - timedelta(weeks=100)).strftime('%Y-%m-%d %H:%M:%S')
zeitpunkt_x = (datetime.now() - timedelta(weeks=100)).strftime("%Y-%m-%d %H:%M:%S")
# BatteryDataProcessor instanziieren und verwenden
processor = BatteryDataProcessor(config, voltage_high_threshold, voltage_low_threshold, current_low_threshold, gap,bat_capacity)
processor = BatteryDataProcessor(
config,
voltage_high_threshold,
voltage_low_threshold,
current_low_threshold,
gap,
bat_capacity,
)
processor.connect_db()
processor.fetch_data(zeitpunkt_x)
processor.process_data()
last_points_100_df, last_points_0_df = processor.find_soc_points()
soc_df, integration_results = processor.calculate_resetting_soc(last_points_100_df, last_points_0_df)
#soh_df = processor.calculate_soh(integration_results)
soc_df, integration_results = processor.calculate_resetting_soc(
last_points_100_df, last_points_0_df
)
# soh_df = processor.calculate_soh(integration_results)
processor.update_database_with_soc(soc_df)
processor.plot_data(last_points_100_df, last_points_0_df, soc_df)
processor.disconnect_db()

View File

@ -1,6 +1,8 @@
import datetime
import pytz
def ist_dst_wechsel(tag, timezone="Europe/Berlin"):
"""Checks if Daylight Saving Time (DST) starts or ends on a given day."""
tz = pytz.timezone(timezone)
@ -17,9 +19,10 @@ def ist_dst_wechsel(tag, timezone="Europe/Berlin"):
return dst_change
# # Example usage
# start_date = datetime.datetime(2024, 3, 31) # Date of the DST change
# if ist_dst_wechsel(start_date):
# prediction_hours = 23 # Adjust to 23 hours for DST change days
# prediction_hours = 23 # Adjust to 23 hours for DST change days
# else:
# prediction_hours = 24 # Default value for days without DST change
# prediction_hours = 24 # Default value for days without DST change

View File

@ -1,23 +1,27 @@
import hashlib
import json
import os
import hashlib
import requests
from datetime import datetime, timedelta
import numpy as np
import pytz
import requests
# Example: Converting a UTC timestamp to local time
utc_time = datetime.strptime('2024-03-28T01:00:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ')
utc_time = datetime.strptime("2024-03-28T01:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ")
utc_time = utc_time.replace(tzinfo=pytz.utc)
# Replace 'Europe/Berlin' with your own timezone
local_time = utc_time.astimezone(pytz.timezone('Europe/Berlin'))
local_time = utc_time.astimezone(pytz.timezone("Europe/Berlin"))
print(local_time)
def repeat_to_shape(array, target_shape):
# Check if the array fits the target shape
if len(target_shape) != array.ndim:
raise ValueError("Array and target shape must have the same number of dimensions")
raise ValueError(
"Array and target shape must have the same number of dimensions"
)
# Number of repetitions per dimension
repeats = tuple(target_shape[i] // array.shape[i] for i in range(array.ndim))
@ -26,36 +30,39 @@ def repeat_to_shape(array, target_shape):
expanded_array = np.tile(array, repeats)
return expanded_array
class HourlyElectricityPriceForecast:
def __init__(self, source, cache_dir='cache', charges=0.000228, prediction_hours=24): # 228
def __init__(
self, source, cache_dir="cache", charges=0.000228, prediction_hours=24
): # 228
self.cache_dir = cache_dir
os.makedirs(self.cache_dir, exist_ok=True)
self.cache_time_file = os.path.join(self.cache_dir, 'cache_timestamp.txt')
self.cache_time_file = os.path.join(self.cache_dir, "cache_timestamp.txt")
self.prices = self.load_data(source)
self.charges = charges
self.prediction_hours = prediction_hours
def load_data(self, source):
cache_filename = self.get_cache_filename(source)
if source.startswith('http'):
if source.startswith("http"):
if os.path.exists(cache_filename) and not self.is_cache_expired():
print("Loading data from cache...")
with open(cache_filename, 'r') as file:
with open(cache_filename, "r") as file:
json_data = json.load(file)
else:
print("Loading data from the URL...")
response = requests.get(source)
if response.status_code == 200:
json_data = response.json()
with open(cache_filename, 'w') as file:
with open(cache_filename, "w") as file:
json.dump(json_data, file)
self.update_cache_timestamp()
else:
raise Exception(f"Error fetching data: {response.status_code}")
else:
with open(source, 'r') as file:
with open(source, "r") as file:
json_data = json.load(file)
return json_data['values']
return json_data["values"]
def get_cache_filename(self, url):
hash_object = hashlib.sha256(url.encode())
@ -65,34 +72,36 @@ class HourlyElectricityPriceForecast:
def is_cache_expired(self):
if not os.path.exists(self.cache_time_file):
return True
with open(self.cache_time_file, 'r') as file:
with open(self.cache_time_file, "r") as file:
timestamp_str = file.read()
last_cache_time = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
last_cache_time = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
return datetime.now() - last_cache_time > timedelta(hours=1)
def update_cache_timestamp(self):
with open(self.cache_time_file, 'w') as file:
file.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
with open(self.cache_time_file, "w") as file:
file.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def get_price_for_date(self, date_str):
"""Returns all prices for the specified date, including the price from 00:00 of the previous day."""
# Convert date string to datetime object
date_obj = datetime.strptime(date_str, '%Y-%m-%d')
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
# Calculate the previous day
previous_day = date_obj - timedelta(days=1)
previous_day_str = previous_day.strftime('%Y-%m-%d')
previous_day_str = previous_day.strftime("%Y-%m-%d")
# Extract the price from 00:00 of the previous day
last_price_of_previous_day = [
entry["marketpriceEurocentPerKWh"] + self.charges
for entry in self.prices if previous_day_str in entry['end']
for entry in self.prices
if previous_day_str in entry["end"]
][-1]
# Extract all prices for the specified date
date_prices = [
entry["marketpriceEurocentPerKWh"] + self.charges
for entry in self.prices if date_str in entry['end']
for entry in self.prices
if date_str in entry["end"]
]
print(f"getPrice: {len(date_prices)}")
@ -106,10 +115,14 @@ class HourlyElectricityPriceForecast:
"""Returns all prices between the start and end dates."""
print(start_date_str)
print(end_date_str)
start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=pytz.utc)
end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(tzinfo=pytz.utc)
start_date = start_date_utc.astimezone(pytz.timezone('Europe/Berlin'))
end_date = end_date_utc.astimezone(pytz.timezone('Europe/Berlin'))
start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(
tzinfo=pytz.utc
)
end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(
tzinfo=pytz.utc
)
start_date = start_date_utc.astimezone(pytz.timezone("Europe/Berlin"))
end_date = end_date_utc.astimezone(pytz.timezone("Europe/Berlin"))
price_list = []

View File

@ -1,21 +1,34 @@
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from datetime import datetime
from modules.class_sommerzeit import * # Ensure this matches the actual import path
from modules.class_load_container import Gesamtlast # Ensure this matches the actual import path
# Set the backend for matplotlib to Agg
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, discharge_hours, laden_moeglich, temperature, start_hour, prediction_hours, einspeiseverguetung_euro_pro_wh, filename="visualization_results.pdf", extra_data=None):
from modules.class_sommerzeit import * # Ensure this matches the actual import path
matplotlib.use("Agg")
def visualisiere_ergebnisse(
gesamtlast,
pv_forecast,
strompreise,
ergebnisse,
discharge_hours,
laden_moeglich,
temperature,
start_hour,
prediction_hours,
einspeiseverguetung_euro_pro_wh,
filename="visualization_results.pdf",
extra_data=None,
):
#####################
# 24-hour visualization
#####################
with PdfPages(filename) as pdf:
# Load and PV generation
plt.figure(figsize=(14, 14))
plt.subplot(3, 3, 1)
@ -23,50 +36,68 @@ def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, di
gesamtlast_array = np.array(gesamtlast)
# Plot individual loads
plt.plot(hours, gesamtlast_array, label='Load (Wh)', marker='o')
plt.plot(hours, gesamtlast_array, label="Load (Wh)", marker="o")
# Calculate and plot total load
plt.plot(hours, gesamtlast_array, label='Total Load (Wh)', marker='o', linewidth=2, linestyle='--')
plt.xlabel('Hour')
plt.ylabel('Load (Wh)')
plt.title('Load Profiles')
plt.plot(
hours,
gesamtlast_array,
label="Total Load (Wh)",
marker="o",
linewidth=2,
linestyle="--",
)
plt.xlabel("Hour")
plt.ylabel("Load (Wh)")
plt.title("Load Profiles")
plt.grid(True)
plt.legend()
# Electricity prices
hours_p = np.arange(0, len(strompreise))
plt.subplot(3, 2, 2)
plt.plot(hours_p, strompreise, label='Electricity Price (€/Wh)', color='purple', marker='s')
plt.title('Electricity Prices')
plt.xlabel('Hour of the Day')
plt.ylabel('Price (€/Wh)')
plt.plot(
hours_p,
strompreise,
label="Electricity Price (€/Wh)",
color="purple",
marker="s",
)
plt.title("Electricity Prices")
plt.xlabel("Hour of the Day")
plt.ylabel("Price (€/Wh)")
plt.legend()
plt.grid(True)
# PV forecast
plt.subplot(3, 2, 3)
plt.plot(hours, pv_forecast, label='PV Generation (Wh)', marker='x')
plt.title('PV Forecast')
plt.xlabel('Hour of the Day')
plt.ylabel('Wh')
plt.plot(hours, pv_forecast, label="PV Generation (Wh)", marker="x")
plt.title("PV Forecast")
plt.xlabel("Hour of the Day")
plt.ylabel("Wh")
plt.legend()
plt.grid(True)
# Feed-in remuneration
plt.subplot(3, 2, 4)
plt.plot(hours, einspeiseverguetung_euro_pro_wh, label='Remuneration (€/Wh)', marker='x')
plt.title('Remuneration')
plt.xlabel('Hour of the Day')
plt.ylabel('€/Wh')
plt.plot(
hours,
einspeiseverguetung_euro_pro_wh,
label="Remuneration (€/Wh)",
marker="x",
)
plt.title("Remuneration")
plt.xlabel("Hour of the Day")
plt.ylabel("€/Wh")
plt.legend()
plt.grid(True)
# Temperature forecast
plt.subplot(3, 2, 5)
plt.title('Temperature Forecast (°C)')
plt.plot(hours, temperature, label='Temperature (°C)', marker='x')
plt.xlabel('Hour of the Day')
plt.ylabel('°C')
plt.title("Temperature Forecast (°C)")
plt.plot(hours, temperature, label="Temperature (°C)", marker="x")
plt.xlabel("Hour of the Day")
plt.ylabel("°C")
plt.legend()
plt.grid(True)
@ -86,29 +117,69 @@ def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, di
# Energy flow, grid feed-in, and grid consumption
plt.subplot(3, 2, 1)
plt.plot(hours, ergebnisse['Last_Wh_pro_Stunde'], label='Load (Wh)', marker='o')
plt.plot(hours, ergebnisse['Haushaltsgeraet_wh_pro_stunde'], label='Household Device (Wh)', marker='o')
plt.plot(hours, ergebnisse['Netzeinspeisung_Wh_pro_Stunde'], label='Grid Feed-in (Wh)', marker='x')
plt.plot(hours, ergebnisse['Netzbezug_Wh_pro_Stunde'], label='Grid Consumption (Wh)', marker='^')
plt.plot(hours, ergebnisse['Verluste_Pro_Stunde'], label='Losses (Wh)', marker='^')
plt.title('Energy Flow per Hour')
plt.xlabel('Hour')
plt.ylabel('Energy (Wh)')
plt.plot(hours, ergebnisse["Last_Wh_pro_Stunde"], label="Load (Wh)", marker="o")
plt.plot(
hours,
ergebnisse["Haushaltsgeraet_wh_pro_stunde"],
label="Household Device (Wh)",
marker="o",
)
plt.plot(
hours,
ergebnisse["Netzeinspeisung_Wh_pro_Stunde"],
label="Grid Feed-in (Wh)",
marker="x",
)
plt.plot(
hours,
ergebnisse["Netzbezug_Wh_pro_Stunde"],
label="Grid Consumption (Wh)",
marker="^",
)
plt.plot(
hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^"
)
plt.title("Energy Flow per Hour")
plt.xlabel("Hour")
plt.ylabel("Energy (Wh)")
plt.legend()
# State of charge for batteries
plt.subplot(3, 2, 2)
plt.plot(hours, ergebnisse['akku_soc_pro_stunde'], label='PV Battery (%)', marker='x')
plt.plot(hours, ergebnisse['E-Auto_SoC_pro_Stunde'], label='E-Car Battery (%)', marker='x')
plt.legend(loc='upper left', bbox_to_anchor=(1, 1)) # Place legend outside the plot
plt.grid(True, which='both', axis='x') # Grid for every hour
plt.plot(
hours, ergebnisse["akku_soc_pro_stunde"], label="PV Battery (%)", marker="x"
)
plt.plot(
hours,
ergebnisse["E-Auto_SoC_pro_Stunde"],
label="E-Car Battery (%)",
marker="x",
)
plt.legend(
loc="upper left", bbox_to_anchor=(1, 1)
) # Place legend outside the plot
plt.grid(True, which="both", axis="x") # Grid for every hour
ax1 = plt.subplot(3, 2, 3)
for hour, value in enumerate(discharge_hours):
ax1.axvspan(hour, hour + 1, color='red', ymax=value, alpha=0.3, label='Discharge Possibility' if hour == 0 else "")
ax1.axvspan(
hour,
hour + 1,
color="red",
ymax=value,
alpha=0.3,
label="Discharge Possibility" if hour == 0 else "",
)
for hour, value in enumerate(laden_moeglich):
ax1.axvspan(hour, hour + 1, color='green', ymax=value, alpha=0.3, label='Charging Possibility' if hour == 0 else "")
ax1.legend(loc='upper left')
ax1.axvspan(
hour,
hour + 1,
color="green",
ymax=value,
alpha=0.3,
label="Charging Possibility" if hour == 0 else "",
)
ax1.legend(loc="upper left")
ax1.set_xlim(0, prediction_hours)
pdf.savefig() # Save the current figure state to the PDF
@ -116,33 +187,45 @@ def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, di
# Financial overview
fig, axs = plt.subplots(1, 2, figsize=(14, 10)) # Create a 1x2 grid of subplots
total_costs = ergebnisse['Gesamtkosten_Euro']
total_revenue = ergebnisse['Gesamteinnahmen_Euro']
total_balance = ergebnisse['Gesamtbilanz_Euro']
losses = ergebnisse['Gesamt_Verluste']
total_costs = ergebnisse["Gesamtkosten_Euro"]
total_revenue = ergebnisse["Gesamteinnahmen_Euro"]
total_balance = ergebnisse["Gesamtbilanz_Euro"]
losses = ergebnisse["Gesamt_Verluste"]
# Costs and revenues per hour on the first axis (axs[0])
axs[0].plot(hours, ergebnisse['Kosten_Euro_pro_Stunde'], label='Costs (Euro)', marker='o', color='red')
axs[0].plot(hours, ergebnisse['Einnahmen_Euro_pro_Stunde'], label='Revenue (Euro)', marker='x', color='green')
axs[0].set_title('Financial Balance per Hour')
axs[0].set_xlabel('Hour')
axs[0].set_ylabel('Euro')
axs[0].plot(
hours,
ergebnisse["Kosten_Euro_pro_Stunde"],
label="Costs (Euro)",
marker="o",
color="red",
)
axs[0].plot(
hours,
ergebnisse["Einnahmen_Euro_pro_Stunde"],
label="Revenue (Euro)",
marker="x",
color="green",
)
axs[0].set_title("Financial Balance per Hour")
axs[0].set_xlabel("Hour")
axs[0].set_ylabel("Euro")
axs[0].legend()
axs[0].grid(True)
# Summary of finances on the second axis (axs[1])
labels = ['Total Costs [€]', 'Total Revenue [€]', 'Total Balance [€]']
labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"]
values = [total_costs, total_revenue, total_balance]
colors = ['red' if value > 0 else 'green' for value in values]
colors = ["red" if value > 0 else "green" for value in values]
axs[1].bar(labels, values, color=colors)
axs[1].set_title('Financial Overview')
axs[1].set_ylabel('Euro')
axs[1].set_title("Financial Overview")
axs[1].set_ylabel("Euro")
# Second axis (ax2) for losses, shared with axs[1]
ax2 = axs[1].twinx()
ax2.bar('Total Losses', losses, color='blue')
ax2.set_ylabel('Losses [Wh]', color='blue')
ax2.tick_params(axis='y', labelcolor='blue')
ax2.bar("Total Losses", losses, color="blue")
ax2.set_ylabel("Losses [Wh]", color="blue")
ax2.tick_params(axis="y", labelcolor="blue")
pdf.savefig() # Save the complete figure to the PDF
plt.close() # Close the figure
@ -154,17 +237,31 @@ def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, di
f1 = np.array(extra_data["verluste"])
f2 = np.array(extra_data["bilanz"])
n1 = np.array(extra_data["nebenbedingung"])
scatter = plt.scatter(f1, f2, c=n1, cmap='viridis')
scatter = plt.scatter(f1, f2, c=n1, cmap="viridis")
# Add color legend
plt.colorbar(scatter, label='Constraint')
plt.colorbar(scatter, label="Constraint")
pdf.savefig() # Save the complete figure to the PDF
plt.close() # Close the figure
plt.figure(figsize=(14, 10))
filtered_losses = np.array([v for v, n in zip(extra_data["verluste"], extra_data["nebenbedingung"]) if n < 0.01])
filtered_balance = np.array([b for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"]) if n < 0.01])
filtered_losses = np.array(
[
v
for v, n in zip(
extra_data["verluste"], extra_data["nebenbedingung"]
)
if n < 0.01
]
)
filtered_balance = np.array(
[
b
for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"])
if n < 0.01
]
)
best_loss = min(filtered_losses)
worst_loss = max(filtered_losses)
@ -172,19 +269,21 @@ def visualisiere_ergebnisse(gesamtlast, pv_forecast, strompreise, ergebnisse, di
worst_balance = max(filtered_balance)
data = [filtered_losses, filtered_balance]
labels = ['Losses', 'Balance']
labels = ["Losses", "Balance"]
# Create plots
fig, axs = plt.subplots(1, 2, figsize=(10, 6), sharey=False) # Two subplots, separate y-axes
fig, axs = plt.subplots(
1, 2, figsize=(10, 6), sharey=False
) # Two subplots, separate y-axes
# First violin plot for losses
axs[0].violinplot(data[0], showmeans=True, showmedians=True)
axs[0].set_title('Losses')
axs[0].set_xticklabels(['Losses'])
axs[0].set_title("Losses")
axs[0].set_xticklabels(["Losses"])
# Second violin plot for balance
axs[1].violinplot(data[1], showmeans=True, showmedians=True)
axs[1].set_title('Balance')
axs[1].set_xticklabels(['Balance'])
axs[1].set_title("Balance")
axs[1].set_xticklabels(["Balance"])
# Fine-tuning
plt.tight_layout()

View File

@ -1,35 +1,43 @@
from flask import Flask, jsonify, request
import numpy as np
from datetime import datetime
import modules.class_load as cl
from pprint import pprint
from flask import Flask, jsonify, request
import modules.class_load as cl
app = Flask(__name__)
# Constants
DATE_FORMAT = '%Y-%m-%d'
DATE_FORMAT = "%Y-%m-%d"
EXPECTED_ARRAY_SHAPE = (2, 24)
FILEPATH = r'.\load_profiles.npz'
FILEPATH = r".\load_profiles.npz"
def get_load_forecast(year_energy):
"""Initialize LoadForecast with the given year_energy."""
return cl.LoadForecast(filepath=FILEPATH, year_energy=float(year_energy))
def validate_date(date_str):
"""Validate the date string and return a datetime object."""
try:
return datetime.strptime(date_str, DATE_FORMAT)
except ValueError:
raise ValueError("Date is not in the correct format. Expected format: YYYY-MM-DD.")
raise ValueError(
"Date is not in the correct format. Expected format: YYYY-MM-DD."
)
@app.route('/getdata', methods=['GET'])
@app.route("/getdata", methods=["GET"])
def get_data():
# Retrieve the date and year_energy from query parameters
date_str = request.args.get('date')
year_energy = request.args.get('year_energy')
date_str = request.args.get("date")
year_energy = request.args.get("year_energy")
if not date_str or not year_energy:
return jsonify({"error": "Missing 'date' or 'year_energy' query parameter."}), 400
return jsonify(
{"error": "Missing 'date' or 'year_energy' query parameter."}
), 400
try:
# Validate and convert the date
@ -54,5 +62,6 @@ def get_data():
# Return a generic error message for unexpected errors
return jsonify({"error": "An unexpected error occurred."}), 500
if __name__ == '__main__':
if __name__ == "__main__":
app.run(debug=True)

382
test.py
View File

@ -1,10 +1,6 @@
#!/usr/bin/env python3
import numpy as np
from datetime import datetime
from pprint import pprint
import matplotlib.pyplot as plt
from deap import base, creator, tools, algorithms
# Import necessary modules from the project
from modules.class_optimize import optimization_problem
@ -14,75 +10,355 @@ start_hour = 10
# PV Forecast (in W)
pv_forecast = [
0, 0, 0, 0, 0, 0, 0, 8.05, 352.91, 728.51, 930.28, 1043.25,
1106.74, 1161.69, 6018.82, 5519.07, 3969.88, 3017.96, 1943.07,
1007.17, 319.67, 7.88, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5.04, 335.59,
705.32, 1121.12, 1604.79, 2157.38, 1433.25, 5718.49, 4553.96,
3027.55, 2574.46, 1720.4, 963.4, 383.3, 0, 0, 0
0,
0,
0,
0,
0,
0,
0,
8.05,
352.91,
728.51,
930.28,
1043.25,
1106.74,
1161.69,
6018.82,
5519.07,
3969.88,
3017.96,
1943.07,
1007.17,
319.67,
7.88,
0,
0,
0,
0,
0,
0,
0,
0,
0,
5.04,
335.59,
705.32,
1121.12,
1604.79,
2157.38,
1433.25,
5718.49,
4553.96,
3027.55,
2574.46,
1720.4,
963.4,
383.3,
0,
0,
0,
]
# Temperature Forecast (in degree C)
temperature_forecast = [
18.3, 17.8, 16.9, 16.2, 15.6, 15.1, 14.6, 14.2, 14.3, 14.8, 15.7,
16.7, 17.4, 18.0, 18.6, 19.2, 19.1, 18.7, 18.5, 17.7, 16.2, 14.6,
13.6, 13.0, 12.6, 12.2, 11.7, 11.6, 11.3, 11.0, 10.7, 10.2, 11.4,
14.4, 16.4, 18.3, 19.5, 20.7, 21.9, 22.7, 23.1, 23.1, 22.8, 21.8,
20.2, 19.1, 18.0, 17.4
18.3,
17.8,
16.9,
16.2,
15.6,
15.1,
14.6,
14.2,
14.3,
14.8,
15.7,
16.7,
17.4,
18.0,
18.6,
19.2,
19.1,
18.7,
18.5,
17.7,
16.2,
14.6,
13.6,
13.0,
12.6,
12.2,
11.7,
11.6,
11.3,
11.0,
10.7,
10.2,
11.4,
14.4,
16.4,
18.3,
19.5,
20.7,
21.9,
22.7,
23.1,
23.1,
22.8,
21.8,
20.2,
19.1,
18.0,
17.4,
]
# Electricity Price (in Euro per Wh)
strompreis_euro_pro_wh = [
0.0003384, 0.0003318, 0.0003284, 0.0003283, 0.0003289, 0.0003334,
0.0003290, 0.0003302, 0.0003042, 0.0002430, 0.0002280, 0.0002212,
0.0002093, 0.0001879, 0.0001838, 0.0002004, 0.0002198, 0.0002270,
0.0002997, 0.0003195, 0.0003081, 0.0002969, 0.0002921, 0.0002780,
0.0003384, 0.0003318, 0.0003284, 0.0003283, 0.0003289, 0.0003334,
0.0003290, 0.0003302, 0.0003042, 0.0002430, 0.0002280, 0.0002212,
0.0002093, 0.0001879, 0.0001838, 0.0002004, 0.0002198, 0.0002270,
0.0002997, 0.0003195, 0.0003081, 0.0002969, 0.0002921, 0.0002780
0.0003384,
0.0003318,
0.0003284,
0.0003283,
0.0003289,
0.0003334,
0.0003290,
0.0003302,
0.0003042,
0.0002430,
0.0002280,
0.0002212,
0.0002093,
0.0001879,
0.0001838,
0.0002004,
0.0002198,
0.0002270,
0.0002997,
0.0003195,
0.0003081,
0.0002969,
0.0002921,
0.0002780,
0.0003384,
0.0003318,
0.0003284,
0.0003283,
0.0003289,
0.0003334,
0.0003290,
0.0003302,
0.0003042,
0.0002430,
0.0002280,
0.0002212,
0.0002093,
0.0001879,
0.0001838,
0.0002004,
0.0002198,
0.0002270,
0.0002997,
0.0003195,
0.0003081,
0.0002969,
0.0002921,
0.0002780,
]
# Overall System Load (in W)
gesamtlast = [
676.71, 876.19, 527.13, 468.88, 531.38, 517.95, 483.15, 472.28,
1011.68, 995.00, 1053.07, 1063.91, 1320.56, 1132.03, 1163.67,
1176.82, 1216.22, 1103.78, 1129.12, 1178.71, 1050.98, 988.56,
912.38, 704.61, 516.37, 868.05, 694.34, 608.79, 556.31, 488.89,
506.91, 804.89, 1141.98, 1056.97, 992.46, 1155.99, 827.01,
1257.98, 1232.67, 871.26, 860.88, 1158.03, 1222.72, 1221.04,
949.99, 987.01, 733.99, 592.97
676.71,
876.19,
527.13,
468.88,
531.38,
517.95,
483.15,
472.28,
1011.68,
995.00,
1053.07,
1063.91,
1320.56,
1132.03,
1163.67,
1176.82,
1216.22,
1103.78,
1129.12,
1178.71,
1050.98,
988.56,
912.38,
704.61,
516.37,
868.05,
694.34,
608.79,
556.31,
488.89,
506.91,
804.89,
1141.98,
1056.97,
992.46,
1155.99,
827.01,
1257.98,
1232.67,
871.26,
860.88,
1158.03,
1222.72,
1221.04,
949.99,
987.01,
733.99,
592.97,
]
# Start Solution (binary)
start_solution = [
1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0,
1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0,
1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0,
1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
1,
1,
1,
1,
0,
1,
0,
0,
1,
1,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
0,
0,
0,
0,
0,
0,
1,
0,
0,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
0,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
1,
]
# Define parameters for the optimization problem
parameter = {
"preis_euro_pro_wh_akku": 10e-05, # Cost of storing energy in battery (per Wh)
'pv_soc': 80, # Initial state of charge (SOC) of PV battery (%)
'pv_akku_cap': 26400, # Battery capacity (in Wh)
'year_energy': 4100000, # Yearly energy consumption (in Wh)
'einspeiseverguetung_euro_pro_wh': 7e-05, # Feed-in tariff for exporting electricity (per Wh)
'max_heizleistung': 1000, # Maximum heating power (in W)
"gesamtlast": gesamtlast, # Overall load on the system
'pv_forecast': pv_forecast, # PV generation forecast (48 hours)
"temperature_forecast": temperature_forecast, # Temperature forecast (48 hours)
"strompreis_euro_pro_wh": strompreis_euro_pro_wh, # Electricity price forecast (48 hours)
'eauto_min_soc': 0, # Minimum SOC for electric car
'eauto_cap': 60000, # Electric car battery capacity (Wh)
'eauto_charge_efficiency': 0.95, # Charging efficiency of the electric car
'eauto_charge_power': 11040, # Charging power of the electric car (W)
'eauto_soc': 54, # Current SOC of the electric car (%)
'pvpowernow': 211.137503624, # Current PV power generation (W)
'start_solution': start_solution, # Initial solution for the optimization
'haushaltsgeraet_wh': 937, # Household appliance consumption (Wh)
'haushaltsgeraet_dauer': 0 # Duration of appliance usage (hours)
# Cost of storing energy in battery (per Wh)
"preis_euro_pro_wh_akku": 10e-05,
# Initial state of charge (SOC) of PV battery (%)
"pv_soc": 80,
# Battery capacity (in Wh)
"pv_akku_cap": 26400,
# Yearly energy consumption (in Wh)
"year_energy": 4100000,
# Feed-in tariff for exporting electricity (per Wh)
"einspeiseverguetung_euro_pro_wh": 7e-05,
# Maximum heating power (in W)
"max_heizleistung": 1000,
# Overall load on the system
"gesamtlast": gesamtlast,
# PV generation forecast (48 hours)
"pv_forecast": pv_forecast,
# Temperature forecast (48 hours)
"temperature_forecast": temperature_forecast,
# Electricity price forecast (48 hours)
"strompreis_euro_pro_wh": strompreis_euro_pro_wh,
# Minimum SOC for electric car
"eauto_min_soc": 0,
# Electric car battery capacity (Wh)
"eauto_cap": 60000,
# Charging efficiency of the electric car
"eauto_charge_efficiency": 0.95,
# Charging power of the electric car (W)
"eauto_charge_power": 11040,
# Current SOC of the electric car (%)
"eauto_soc": 54,
# Current PV power generation (W)
"pvpowernow": 211.137503624,
# Initial solution for the optimization
"start_solution": start_solution,
# Household appliance consumption (Wh)
"haushaltsgeraet_wh": 937,
# Duration of appliance usage (hours)
"haushaltsgeraet_dauer": 0,
}
# Initialize the optimization problem

View File

@ -1 +0,0 @@
from test_heatpump import heatpump

View File

@ -1,9 +1,9 @@
import unittest
import numpy as np
from modules.class_akku import PVAkku
class TestPVAkku(unittest.TestCase):
class TestPVAkku(unittest.TestCase):
def setUp(self):
# Initializing common parameters for tests
self.kapazitaet_wh = 10000 # 10,000 Wh capacity
@ -13,54 +13,129 @@ class TestPVAkku(unittest.TestCase):
self.max_soc_prozent = 80 # Maximum SoC is 80%
def test_initial_state_of_charge(self):
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=50, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
self.assertEqual(akku.ladezustand_in_prozent(), 50.0, "Initial SoC should be 50%")
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=50,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
self.assertEqual(
akku.ladezustand_in_prozent(), 50.0, "Initial SoC should be 50%"
)
def test_discharge_below_min_soc(self):
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=50, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=50,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
akku.reset()
# Try to discharge more energy than available above min_soc
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0) # Try to discharge 5000 Wh
abgegeben_wh, verlust_wh = akku.energie_abgeben(
5000, 0
) # Try to discharge 5000 Wh
expected_soc = self.min_soc_prozent # SoC should not drop below min_soc
self.assertEqual(akku.ladezustand_in_prozent(), expected_soc, "SoC should not drop below min_soc after discharge")
self.assertEqual(abgegeben_wh, 2640.0, "The energy discharged should be limited by min_soc")
self.assertEqual(
akku.ladezustand_in_prozent(),
expected_soc,
"SoC should not drop below min_soc after discharge",
)
self.assertEqual(
abgegeben_wh, 2640.0, "The energy discharged should be limited by min_soc"
)
def test_charge_above_max_soc(self):
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=50, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=50,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
akku.reset()
# Try to charge more energy than available up to max_soc
geladen_wh, verlust_wh = akku.energie_laden(5000, 0) # Try to charge 5000 Wh
expected_soc = self.max_soc_prozent # SoC should not exceed max_soc
self.assertEqual(akku.ladezustand_in_prozent(), expected_soc, "SoC should not exceed max_soc after charge")
self.assertEqual(geladen_wh, 3000.0, "The energy charged should be limited by max_soc")
self.assertEqual(
akku.ladezustand_in_prozent(),
expected_soc,
"SoC should not exceed max_soc after charge",
)
self.assertEqual(
geladen_wh, 3000.0, "The energy charged should be limited by max_soc"
)
def test_charging_at_max_soc(self):
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=80, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=80,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
akku.reset()
# Try to charge when SoC is already at max_soc
geladen_wh, verlust_wh = akku.energie_laden(5000, 0)
self.assertEqual(geladen_wh, 0.0, "No energy should be charged when at max_soc")
self.assertEqual(akku.ladezustand_in_prozent(), self.max_soc_prozent, "SoC should remain at max_soc")
self.assertEqual(
akku.ladezustand_in_prozent(),
self.max_soc_prozent,
"SoC should remain at max_soc",
)
def test_discharging_at_min_soc(self):
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=20, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=20,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
akku.reset()
# Try to discharge when SoC is already at min_soc
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
self.assertEqual(abgegeben_wh, 0.0, "No energy should be discharged when at min_soc")
self.assertEqual(akku.ladezustand_in_prozent(), self.min_soc_prozent, "SoC should remain at min_soc")
self.assertEqual(
abgegeben_wh, 0.0, "No energy should be discharged when at min_soc"
)
self.assertEqual(
akku.ladezustand_in_prozent(),
self.min_soc_prozent,
"SoC should remain at min_soc",
)
def test_soc_limits(self):
# Test to ensure that SoC never exceeds max_soc or drops below min_soc
akku = PVAkku(self.kapazitaet_wh, hours=1, start_soc_prozent=50, min_soc_prozent=self.min_soc_prozent, max_soc_prozent=self.max_soc_prozent)
akku = PVAkku(
self.kapazitaet_wh,
hours=1,
start_soc_prozent=50,
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
akku.reset()
akku.soc_wh = (self.max_soc_prozent / 100) * self.kapazitaet_wh + 1000 # Manually set SoC above max limit
akku.soc_wh = (
self.max_soc_prozent / 100
) * self.kapazitaet_wh + 1000 # Manually set SoC above max limit
akku.soc_wh = min(akku.soc_wh, akku.max_soc_wh)
self.assertLessEqual(akku.ladezustand_in_prozent(), self.max_soc_prozent, "SoC should not exceed max_soc")
self.assertLessEqual(
akku.ladezustand_in_prozent(),
self.max_soc_prozent,
"SoC should not exceed max_soc",
)
akku.soc_wh = (self.min_soc_prozent / 100) * self.kapazitaet_wh - 1000 # Manually set SoC below min limit
akku.soc_wh = (
self.min_soc_prozent / 100
) * self.kapazitaet_wh - 1000 # Manually set SoC below min limit
akku.soc_wh = max(akku.soc_wh, akku.min_soc_wh)
self.assertGreaterEqual(akku.ladezustand_in_prozent(), self.min_soc_prozent, "SoC should not drop below min_soc")
self.assertGreaterEqual(
akku.ladezustand_in_prozent(),
self.min_soc_prozent,
"SoC should not drop below min_soc",
)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

View File

@ -1,13 +1,14 @@
import pytest
from modules.class_heatpump import Heatpump
@pytest.fixture(scope='function')
@pytest.fixture(scope="function")
def heatpump() -> Heatpump:
""" Heatpump with 5 kw heating power and 24 h prediction
"""
"""Heatpump with 5 kw heating power and 24 h prediction"""
return Heatpump(5000, 24)
class TestHeatpump:
def test_cop(self, heatpump):
"""Testing calculate COP for variouse outside temperatures"""