Ruff format

This commit is contained in:
Chris 2024-10-10 15:00:32 +02:00 committed by Andreas
parent b8e7612bde
commit 141257f514
19 changed files with 101 additions and 286 deletions

View File

@ -18,17 +18,13 @@ class PVAkku:
# Initial state of charge in Wh
self.start_soc_prozent = start_soc_prozent
self.soc_wh = (start_soc_prozent / 100) * kapazitaet_wh
self.hours = (
hours if hours is not None else 24
) # Default to 24 hours if not specified
self.hours = hours if hours is not None else 24 # Default to 24 hours if not specified
self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 1)
# Charge and discharge efficiency
self.lade_effizienz = lade_effizienz
self.entlade_effizienz = entlade_effizienz
self.max_ladeleistung_w = (
max_ladeleistung_w if max_ladeleistung_w else self.kapazitaet_wh
)
self.max_ladeleistung_w = max_ladeleistung_w if max_ladeleistung_w else self.kapazitaet_wh
self.min_soc_prozent = min_soc_prozent
self.max_soc_prozent = max_soc_prozent
# Calculate min and max SoC in Wh
@ -92,12 +88,8 @@ class PVAkku:
return 0.0, 0.0 # No energy discharge and no losses
# Calculate the maximum energy that can be discharged considering min_soc and efficiency
max_possible_discharge_wh = (
self.soc_wh - self.min_soc_wh
) * self.entlade_effizienz
max_possible_discharge_wh = max(
max_possible_discharge_wh, 0.0
) # Ensure non-negative
max_possible_discharge_wh = (self.soc_wh - self.min_soc_wh) * self.entlade_effizienz
max_possible_discharge_wh = max(max_possible_discharge_wh, 0.0) # Ensure non-negative
# Consider the maximum discharge power of the battery
max_abgebbar_wh = min(max_possible_discharge_wh, self.max_ladeleistung_w)
@ -107,9 +99,7 @@ class PVAkku:
# Calculate the actual amount withdrawn from the battery (before efficiency loss)
if self.entlade_effizienz > 0:
tatsaechliche_entnahme_wh = (
tatsaechlich_abgegeben_wh / self.entlade_effizienz
)
tatsaechliche_entnahme_wh = tatsaechlich_abgegeben_wh / self.entlade_effizienz
else:
tatsaechliche_entnahme_wh = 0.0
@ -137,9 +127,7 @@ class PVAkku:
# Calculate the maximum energy that can be charged considering max_soc and efficiency
if self.lade_effizienz > 0:
max_possible_charge_wh = (
self.max_soc_wh - self.soc_wh
) / self.lade_effizienz
max_possible_charge_wh = (self.max_soc_wh - self.soc_wh) / self.lade_effizienz
else:
max_possible_charge_wh = 0.0
max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative

View File

@ -30,9 +30,7 @@ class EnergieManagementSystem:
def set_eauto_charge_hours(self, ds: List[int]) -> None:
self.eauto.set_charge_per_hour(ds)
def set_haushaltsgeraet_start(
self, ds: List[int], global_start_hour: int = 0
) -> None:
def set_haushaltsgeraet_start(self, ds: List[int], global_start_hour: int = 0) -> None:
self.haushaltsgeraet.set_startzeitpunkt(ds, global_start_hour=global_start_hour)
def reset(self) -> None:
@ -48,9 +46,7 @@ class EnergieManagementSystem:
# Ensure arrays have the same length
lastkurve_wh = self.gesamtlast
assert (
len(lastkurve_wh)
== len(self.pv_prognose_wh)
== len(self.strompreis_euro_pro_wh)
len(lastkurve_wh) == len(self.pv_prognose_wh) == len(self.strompreis_euro_pro_wh)
), f"Array sizes do not match: Load Curve = {len(lastkurve_wh)}, PV Forecast = {len(self.pv_prognose_wh)}, Electricity Price = {len(self.strompreis_euro_pro_wh)}"
# Optimized total hours calculation
@ -86,14 +82,10 @@ class EnergieManagementSystem:
# E-Auto handling
if self.eauto:
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(
None, stunde
)
geladene_menge_eauto, verluste_eauto = self.eauto.energie_laden(None, stunde)
verbrauch += geladene_menge_eauto
verluste_wh_pro_stunde[stunde_since_now] += verluste_eauto
eauto_soc_pro_stunde[stunde_since_now] = (
self.eauto.ladezustand_in_prozent()
)
eauto_soc_pro_stunde[stunde_since_now] = self.eauto.ladezustand_in_prozent()
# Process inverter logic
erzeugung = self.pv_prognose_wh[stunde]
@ -117,9 +109,7 @@ class EnergieManagementSystem:
akku_soc_pro_stunde[stunde_since_now] = self.akku.ladezustand_in_prozent()
# Total cost and return
gesamtkosten_euro = np.nansum(kosten_euro_pro_stunde) - np.nansum(
einnahmen_euro_pro_stunde
)
gesamtkosten_euro = np.nansum(kosten_euro_pro_stunde) - np.nansum(einnahmen_euro_pro_stunde)
# Prepare output dictionary
out: Dict[str, Union[np.ndarray, float]] = {

View File

@ -4,9 +4,7 @@ import numpy as np
class Haushaltsgeraet:
def __init__(self, hours=None, verbrauch_wh=None, dauer_h=None):
self.hours = hours # Total duration for which the planning is done
self.verbrauch_wh = (
verbrauch_wh # Total energy consumption of the device in kWh
)
self.verbrauch_wh = verbrauch_wh # Total energy consumption of the device in kWh
self.dauer_h = dauer_h # Duration of use in hours
self.lastkurve = np.zeros(self.hours) # Initialize the load curve with zeros

View File

@ -1,8 +1,6 @@
class Wechselrichter:
def __init__(self, max_leistung_wh, akku):
self.max_leistung_wh = (
max_leistung_wh # Maximum power that the inverter can handle
)
self.max_leistung_wh = max_leistung_wh # Maximum power that the inverter can handle
self.akku = akku # Connection to a battery object
def energie_verarbeiten(self, erzeugung, verbrauch, hour):
@ -16,9 +14,7 @@ class Wechselrichter:
# If consumption exceeds maximum inverter power
verluste += erzeugung - self.max_leistung_wh
restleistung_nach_verbrauch = self.max_leistung_wh - verbrauch
netzbezug = (
-restleistung_nach_verbrauch
) # Negative indicates feeding into the grid
netzbezug = -restleistung_nach_verbrauch # Negative indicates feeding into the grid
eigenverbrauch = self.max_leistung_wh
else:
# Remaining power after consumption
@ -43,25 +39,17 @@ class Wechselrichter:
eigenverbrauch = verbrauch # Self-consumption is equal to the load
else:
benötigte_energie = (
verbrauch - erzeugung
) # Energy needed from external sources
max_akku_leistung = (
self.akku.max_ladeleistung_w
) # Maximum battery discharge power
benötigte_energie = verbrauch - erzeugung # Energy needed from external sources
max_akku_leistung = self.akku.max_ladeleistung_w # Maximum battery discharge power
# Calculate remaining AC power available
rest_ac_leistung = max(self.max_leistung_wh - erzeugung, 0)
# Discharge energy from the battery based on need
if benötigte_energie < rest_ac_leistung:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
benötigte_energie, hour
)
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(benötigte_energie, hour)
else:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(
rest_ac_leistung, hour
)
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(rest_ac_leistung, hour)
verluste += akku_entladeverluste # Include losses from battery discharge
netzbezug = benötigte_energie - aus_akku # Energy drawn from the grid

View File

@ -27,9 +27,7 @@ class LoadForecast:
day_of_year = date.timetuple().tm_yday
# Extract the 24-hour profile for the given date
daily_stats = self.data_year_energy[
day_of_year - 1
] # -1 because indexing starts at 0
daily_stats = self.data_year_energy[day_of_year - 1] # -1 because indexing starts at 0
return daily_stats
def get_hourly_stats(self, date_str, hour):
@ -47,9 +45,7 @@ class LoadForecast:
day_of_year = date.timetuple().tm_yday
# Extract mean and standard deviation for the given hour
hourly_stats = self.data_year_energy[
day_of_year - 1, :, hour
] # Access the specific hour
hourly_stats = self.data_year_energy[day_of_year - 1, :, hour] # Access the specific hour
return hourly_stats
@ -80,9 +76,7 @@ class LoadForecast:
"""Loads data from the specified file."""
try:
data = np.load(self.filepath)
self.data = np.array(
list(zip(data["yearly_profiles"], data["yearly_profiles_std"]))
)
self.data = np.array(list(zip(data["yearly_profiles"], data["yearly_profiles_std"])))
self.data_year_energy = self.data * self.year_energy
# pprint(self.data_year_energy)
except FileNotFoundError:
@ -104,7 +98,5 @@ if __name__ == "__main__":
filepath = r"..\data\load_profiles.npz" # Adjust the path to the .npz file
lf = LoadForecast(filepath=filepath, year_energy=2000)
specific_date_prices = lf.get_daily_stats("2024-02-16") # Adjust date as needed
specific_hour_stats = lf.get_hourly_stats(
"2024-02-16", 12
) # Adjust date and hour as needed
specific_hour_stats = lf.get_hourly_stats("2024-02-16", 12) # Adjust date and hour as needed
print(specific_hour_stats)

View File

@ -14,9 +14,7 @@ class Gesamtlast:
:param last_array: Array of loads, where each entry corresponds to an hour
"""
if len(last_array) != self.prediction_hours:
raise ValueError(
f"Total load inconsistent lengths in arrays: {name} {len(last_array)}"
)
raise ValueError(f"Total load inconsistent lengths in arrays: {name} {len(last_array)}")
self.lasten[name] = last_array
def gesamtlast_berechnen(self):

View File

@ -17,9 +17,7 @@ class LoadPredictionAdjuster:
def _remove_outliers(self, data, threshold=2):
# Calculate the Z-Score of the 'Last' data
data["Z-Score"] = np.abs(
(data["Last"] - data["Last"].mean()) / data["Last"].std()
)
data["Z-Score"] = np.abs((data["Last"] - data["Last"].mean()) / data["Last"].std())
# Filter the data based on the threshold
filtered_data = data[data["Z-Score"] < threshold]
return filtered_data.drop(columns=["Z-Score"])
@ -31,27 +29,19 @@ class LoadPredictionAdjuster:
# Ensure both time columns have the same timezone
if self.measured_data["time"].dt.tz is None:
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(
"UTC"
)
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize("UTC")
self.predicted_data["time"] = (
self.predicted_data["time"]
.dt.tz_localize("UTC")
.dt.tz_convert("Europe/Berlin")
)
self.measured_data["time"] = self.measured_data["time"].dt.tz_convert(
"Europe/Berlin"
self.predicted_data["time"].dt.tz_localize("UTC").dt.tz_convert("Europe/Berlin")
)
self.measured_data["time"] = self.measured_data["time"].dt.tz_convert("Europe/Berlin")
# Optionally: Remove timezone information if only working locally
self.predicted_data["time"] = self.predicted_data["time"].dt.tz_localize(None)
self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(None)
# Now you can perform the merge
merged_data = pd.merge(
self.measured_data, self.predicted_data, on="time", how="inner"
)
merged_data = pd.merge(self.measured_data, self.predicted_data, on="time", how="inner")
print(merged_data)
merged_data["Hour"] = merged_data["time"].dt.hour
merged_data["DayOfWeek"] = merged_data["time"].dt.dayofweek
@ -59,16 +49,12 @@ class LoadPredictionAdjuster:
def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
self.merged_data = self._remove_outliers(self.merged_data)
train_end_date = self.merged_data["time"].max() - pd.Timedelta(
weeks=test_period_weeks
)
train_end_date = self.merged_data["time"].max() - pd.Timedelta(weeks=test_period_weeks)
train_start_date = train_end_date - pd.Timedelta(weeks=train_period_weeks)
test_start_date = train_end_date + pd.Timedelta(hours=1)
test_end_date = (
test_start_date
+ pd.Timedelta(weeks=test_period_weeks)
- pd.Timedelta(hours=1)
test_start_date + pd.Timedelta(weeks=test_period_weeks) - pd.Timedelta(hours=1)
)
self.train_data = self.merged_data[
@ -81,9 +67,7 @@ class LoadPredictionAdjuster:
& (self.merged_data["time"] <= test_end_date)
]
self.train_data["Difference"] = (
self.train_data["Last"] - self.train_data["Last Pred"]
)
self.train_data["Difference"] = self.train_data["Last"] - self.train_data["Last Pred"]
weekdays_train_data = self.train_data[self.train_data["DayOfWeek"] < 5]
weekends_train_data = self.train_data[self.train_data["DayOfWeek"] >= 5]
@ -102,9 +86,7 @@ class LoadPredictionAdjuster:
return weighted_mean
def adjust_predictions(self):
self.train_data["Adjusted Pred"] = self.train_data.apply(
self._adjust_row, axis=1
)
self.train_data["Adjusted Pred"] = self.train_data.apply(self._adjust_row, axis=1)
self.test_data["Adjusted Pred"] = self.test_data.apply(self._adjust_row, axis=1)
def _adjust_row(self, row):
@ -119,9 +101,7 @@ class LoadPredictionAdjuster:
def _plot_data(self, data, data_type):
plt.figure(figsize=(14, 7))
plt.plot(
data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue"
)
plt.plot(data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue")
plt.plot(
data["time"],
data["Last Pred"],
@ -144,18 +124,14 @@ class LoadPredictionAdjuster:
plt.show()
def evaluate_model(self):
mse = mean_squared_error(
self.test_data["Last"], self.test_data["Adjusted Pred"]
)
mse = mean_squared_error(self.test_data["Last"], self.test_data["Adjusted Pred"])
r2 = r2_score(self.test_data["Last"], self.test_data["Adjusted Pred"])
print(f"Mean Squared Error: {mse}")
print(f"R-squared: {r2}")
def predict_next_hours(self, hours_ahead):
last_date = self.merged_data["time"].max()
future_dates = [
last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)
]
future_dates = [last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)]
future_df = pd.DataFrame({"time": future_dates})
future_df["Hour"] = future_df["time"].dt.hour
future_df["DayOfWeek"] = future_df["time"].dt.dayofweek

View File

@ -44,9 +44,7 @@ class optimization_problem:
3. Dishwasher start time (integer if applicable).
"""
discharge_hours_bin = individual[: self.prediction_hours]
eautocharge_hours_float = individual[
self.prediction_hours : self.prediction_hours * 2
]
eautocharge_hours_float = individual[self.prediction_hours : self.prediction_hours * 2]
spuelstart_int = (
individual[-1]
if self.opti_param and self.opti_param.get("haushaltsgeraete", 0) > 0
@ -54,9 +52,7 @@ class optimization_problem:
)
return discharge_hours_bin, eautocharge_hours_float, spuelstart_int
def setup_deap_environment(
self, opti_param: Dict[str, Any], start_hour: int
) -> None:
def setup_deap_environment(self, opti_param: Dict[str, Any], start_hour: int) -> None:
"""
Set up the DEAP environment with fitness and individual creation rules.
"""
@ -97,9 +93,7 @@ class optimization_problem:
)
# Register population, mating, mutation, and selection functions
self.toolbox.register(
"population", tools.initRepeat, list, self.toolbox.individual
)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("mate", tools.cxTwoPoint)
self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.1)
self.toolbox.register("select", tools.selTournament, tournsize=3)
@ -112,8 +106,8 @@ class optimization_problem:
using the provided individual solution.
"""
ems.reset()
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
self.split_individual(individual)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(
individual
)
if self.opti_param.get("haushaltsgeraete", 0) > 0:
ems.set_haushaltsgeraet_start(spuelstart_int, global_start_hour=start_hour)
@ -142,9 +136,7 @@ class optimization_problem:
return (100000.0,) # Return a high penalty in case of an exception
gesamtbilanz = o["Gesamtbilanz_Euro"] * (-1.0 if worst_case else 1.0)
discharge_hours_bin, eautocharge_hours_float, _ = self.split_individual(
individual
)
discharge_hours_bin, eautocharge_hours_float, _ = self.split_individual(individual)
max_ladeleistung = np.max(moegliche_ladestroeme_in_prozent)
# Penalty for not discharging
@ -155,9 +147,7 @@ class optimization_problem:
# Penalty for charging the electric vehicle during restricted hours
gesamtbilanz += sum(
self.strafe
for i in range(
self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours
)
for i in range(self.prediction_hours - self.fixed_eauto_hours, self.prediction_hours)
if eautocharge_hours_float[i] != 0.0
)
@ -171,9 +161,7 @@ class optimization_problem:
# Penalty for not meeting the minimum SOC (State of Charge) requirement
if parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent() <= 0.0:
gesamtbilanz += sum(
self.strafe
for ladeleistung in eautocharge_hours_float
if ladeleistung != 0.0
self.strafe for ladeleistung in eautocharge_hours_float if ladeleistung != 0.0
)
individual.extra_data = (
@ -183,14 +171,11 @@ class optimization_problem:
)
# Adjust total balance with battery value and penalties for unmet SOC
restwert_akku = (
ems.akku.aktueller_energieinhalt() * parameter["preis_euro_pro_wh_akku"]
)
restwert_akku = ems.akku.aktueller_energieinhalt() * parameter["preis_euro_pro_wh_akku"]
gesamtbilanz += (
max(
0,
(parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent())
* self.strafe,
(parameter["eauto_min_soc"] - ems.eauto.ladezustand_in_prozent()) * self.strafe,
)
- restwert_akku
)
@ -298,21 +283,17 @@ class optimization_problem:
)
# Setup the DEAP environment and optimization process
self.setup_deap_environment(
{"haushaltsgeraete": 1 if spuelmaschine else 0}, start_hour
)
self.setup_deap_environment({"haushaltsgeraete": 1 if spuelmaschine else 0}, start_hour)
self.toolbox.register(
"evaluate",
lambda ind: self.evaluate(ind, ems, parameter, start_hour, worst_case),
)
start_solution, extra_data = self.optimize(
parameter["start_solution"], ngen=ngen
)
start_solution, extra_data = self.optimize(parameter["start_solution"], ngen=ngen)
# Perform final evaluation on the best solution
o = self.evaluate_inner(start_solution, ems, start_hour)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = (
self.split_individual(start_solution)
discharge_hours_bin, eautocharge_hours_float, spuelstart_int = self.split_individual(
start_solution
)
# Visualize the results
@ -352,8 +333,7 @@ class optimization_problem:
element_list[0] = None
# Change the NaN to None (JSON)
element_list = [
None if isinstance(x, (int, float)) and np.isnan(x) else x
for x in element_list
None if isinstance(x, (int, float)) and np.isnan(x) else x for x in element_list
]
# Assign the modified list back to the dictionary

View File

@ -69,9 +69,7 @@ class PVForecast:
f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt."
)
def update_ac_power_measurement(
self, date_time=None, ac_power_measurement=None
) -> bool:
def update_ac_power_measurement(self, date_time=None, ac_power_measurement=None) -> bool:
found = False
input_date_hour = date_time.replace(minute=0, second=0, microsecond=0)
@ -89,9 +87,7 @@ class PVForecast:
self.meta = data.get("meta", {})
all_values = data.get("values", [])
for i in range(
len(all_values[0])
): # Annahme, dass alle Listen gleich lang sind
for i in range(len(all_values[0])): # Annahme, dass alle Listen gleich lang sind
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
sum_ac_power = sum(values[i]["power"] for values in all_values)
@ -128,17 +124,13 @@ class PVForecast:
pprint(data)
self.process_data(data)
else:
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
self.load_data_from_url(url)
def load_data_with_caching(self, url):
date = datetime.now().strftime("%Y-%m-%d")
cache_file = os.path.join(
self.cache_dir, self.generate_cache_filename(url, date)
)
cache_file = os.path.join(self.cache_dir, self.generate_cache_filename(url, date))
if os.path.exists(cache_file):
with open(cache_file, "r") as file:
data = json.load(file)
@ -151,9 +143,7 @@ class PVForecast:
json.dump(data, file)
print("Data fetched from URL and cached.")
else:
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
return
self.process_data(data)
@ -183,16 +173,12 @@ class PVForecast:
date_range_forecast = []
for data in self.forecast_data:
data_date = (
data.get_date_time().date()
) # parser.parse(data.get_date_time()).date()
data_date = data.get_date_time().date() # parser.parse(data.get_date_time()).date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
print(data.get_date_time(), " ", data.get_ac_power())
ac_power_forecast = np.array(
[data.get_ac_power() for data in date_range_forecast]
)
ac_power_forecast = np.array([data.get_ac_power() for data in date_range_forecast])
return np.array(ac_power_forecast)[: self.prediction_hours]
@ -241,7 +227,5 @@ if __name__ == "__main__":
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=50.8588&lon=7.3747&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
)
forecast.update_ac_power_measurement(
date_time=datetime.now(), ac_power_measurement=1000
)
forecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=1000)
forecast.print_ac_power_and_measurement()

View File

@ -79,12 +79,12 @@ class BatteryDataProcessor:
return last_points
def find_soc_points(self):
condition_soc_100 = (
self.data["battery_voltage"] >= self.voltage_high_threshold
) & (self.data["battery_current"].abs() <= self.current_low_threshold)
condition_soc_0 = (
self.data["battery_voltage"] <= self.voltage_low_threshold
) & (self.data["battery_current"].abs() <= self.current_low_threshold)
condition_soc_100 = (self.data["battery_voltage"] >= self.voltage_high_threshold) & (
self.data["battery_current"].abs() <= self.current_low_threshold
)
condition_soc_0 = (self.data["battery_voltage"] <= self.voltage_low_threshold) & (
self.data["battery_current"].abs() <= self.current_low_threshold
)
times_soc_100_all = self.data[condition_soc_100][
["timestamp", "battery_voltage", "battery_current"]
@ -104,9 +104,7 @@ class BatteryDataProcessor:
def calculate_resetting_soc(self, last_points_100_df, last_points_0_df):
soc_values = []
integration_results = []
reset_points = pd.concat([last_points_100_df, last_points_0_df]).sort_values(
"timestamp"
)
reset_points = pd.concat([last_points_100_df, last_points_0_df]).sort_values("timestamp")
# Initialisieren der SoC-Liste
self.data["calculated_soc"] = np.nan
@ -116,9 +114,7 @@ class BatteryDataProcessor:
if i < len(reset_points) - 1:
end_point = reset_points.iloc[i + 1]
else:
end_point = self.data.iloc[
-1
] # Verwenden des letzten Datensatzes als Endpunkt
end_point = self.data.iloc[-1] # Verwenden des letzten Datensatzes als Endpunkt
if start_point["timestamp"] in last_points_100_df["timestamp"].values:
initial_soc = 100
@ -129,9 +125,7 @@ class BatteryDataProcessor:
(self.data["timestamp"] >= start_point["timestamp"])
& (self.data["timestamp"] <= end_point["timestamp"])
].copy()
cut_data["time_diff_hours"] = (
cut_data["timestamp"].diff().dt.total_seconds() / 3600
)
cut_data["time_diff_hours"] = cut_data["timestamp"].diff().dt.total_seconds() / 3600
cut_data.dropna(subset=["time_diff_hours"], inplace=True)
calculated_soc = initial_soc
@ -165,20 +159,14 @@ class BatteryDataProcessor:
}
)
soc_df = (
pd.concat(soc_values)
.drop_duplicates(subset=["timestamp"])
.reset_index(drop=True)
)
soc_df = pd.concat(soc_values).drop_duplicates(subset=["timestamp"]).reset_index(drop=True)
return soc_df, integration_results
def calculate_soh(self, integration_results):
soh_values = []
for result in integration_results:
delta_soc = abs(
result["start_soc"] - result["end_soc"]
) # Use the actual change in SoC
delta_soc = abs(result["start_soc"] - result["end_soc"]) # Use the actual change in SoC
if delta_soc > 0: # Avoid division by zero
effective_capacity_ah = result["integrated_current"]
soh = (effective_capacity_ah / self.battery_capacity_ah) * 100
@ -274,9 +262,7 @@ class BatteryDataProcessor:
plt.title("Battery Current over Time")
plt.subplot(4, 1, 3)
plt.plot(
soc_df["timestamp"], soc_df["calculated_soc"], label="SoC", color="purple"
)
plt.plot(soc_df["timestamp"], soc_df["calculated_soc"], label="SoC", color="purple")
plt.xlabel("Timestamp")
plt.ylabel("SoC (%)")
plt.legend()

View File

@ -10,9 +10,7 @@ def ist_dst_wechsel(tag: datetime.datetime, timezone="Europe/Berlin") -> bool:
next_day = current_day + datetime.timedelta(days=1)
# Check if the UTC offsets are different (indicating a DST change)
dst_change = (
current_day.replace(tzinfo=tz).dst() != next_day.replace(tzinfo=tz).dst()
)
dst_change = current_day.replace(tzinfo=tz).dst() != next_day.replace(tzinfo=tz).dst()
return dst_change

View File

@ -11,9 +11,7 @@ import requests
def repeat_to_shape(array, target_shape):
# Check if the array fits the target shape
if len(target_shape) != array.ndim:
raise ValueError(
"Array and target shape must have the same number of dimensions"
)
raise ValueError("Array and target shape must have the same number of dimensions")
# Number of repetitions per dimension
repeats = tuple(target_shape[i] // array.shape[i] for i in range(array.ndim))
@ -24,9 +22,7 @@ def repeat_to_shape(array, target_shape):
class HourlyElectricityPriceForecast:
def __init__(
self, source, cache_dir="cache", charges=0.000228, prediction_hours=24
): # 228
def __init__(self, source, cache_dir="cache", charges=0.000228, prediction_hours=24): # 228
self.cache_dir = cache_dir
os.makedirs(self.cache_dir, exist_ok=True)
self.cache_time_file = os.path.join(self.cache_dir, "cache_timestamp.txt")
@ -107,12 +103,8 @@ class HourlyElectricityPriceForecast:
"""Returns all prices between the start and end dates."""
print(start_date_str)
print(end_date_str)
start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(
tzinfo=timezone.utc
)
end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(
tzinfo=timezone.utc
)
start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
start_date = start_date_utc.astimezone(zoneinfo.ZoneInfo("Europe/Berlin"))
end_date = end_date_utc.astimezone(zoneinfo.ZoneInfo("Europe/Berlin"))

View File

@ -26,9 +26,7 @@ def get_start_enddate(prediction_hours=48, startdate=None):
# Parameter
############
if startdate is None:
date = (datetime.now().date() + timedelta(hours=prediction_hours)).strftime(
"%Y-%m-%d"
)
date = (datetime.now().date() + timedelta(hours=prediction_hours)).strftime("%Y-%m-%d")
date_now = datetime.now().strftime("%Y-%m-%d")
else:
date = (startdate + timedelta(hours=prediction_hours)).strftime("%Y-%m-%d")

View File

@ -72,10 +72,7 @@ class Heatpump:
"""
if self.__check_outside_temperature_range__(outside_temperature_celsius):
heat_output = (
(
self.BASE_HEATPOWER
+ outside_temperature_celsius * self.TEMPERATURE_COEFFICIENT
)
(self.BASE_HEATPOWER + outside_temperature_celsius * self.TEMPERATURE_COEFFICIENT)
* 1000
) / 24.0
return min(self.max_heat_output, heat_output)
@ -98,9 +95,7 @@ class Heatpump:
"""
if self.__check_outside_temperature_range__(outside_temperature_celsius):
return (
1164
- 77.8 * outside_temperature_celsius
+ 1.62 * outside_temperature_celsius**2.0
1164 - 77.8 * outside_temperature_celsius + 1.62 * outside_temperature_celsius**2.0
)
else:
err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius) "

View File

@ -141,9 +141,7 @@ def visualisiere_ergebnisse(
label="Grid Consumption (Wh)",
marker="^",
)
plt.plot(
hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^"
)
plt.plot(hours, ergebnisse["Verluste_Pro_Stunde"], label="Losses (Wh)", marker="^")
plt.title("Energy Flow per Hour")
plt.xlabel("Hour")
plt.ylabel("Energy (Wh)")
@ -151,18 +149,14 @@ def visualisiere_ergebnisse(
# State of charge for batteries
plt.subplot(3, 2, 2)
plt.plot(
hours, ergebnisse["akku_soc_pro_stunde"], label="PV Battery (%)", marker="x"
)
plt.plot(hours, ergebnisse["akku_soc_pro_stunde"], label="PV Battery (%)", marker="x")
plt.plot(
hours,
ergebnisse["E-Auto_SoC_pro_Stunde"],
label="E-Car Battery (%)",
marker="x",
)
plt.legend(
loc="upper left", bbox_to_anchor=(1, 1)
) # Place legend outside the plot
plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot
plt.grid(True, which="both", axis="x") # Grid for every hour
ax1 = plt.subplot(3, 2, 3)
@ -254,18 +248,12 @@ def visualisiere_ergebnisse(
filtered_losses = np.array(
[
v
for v, n in zip(
extra_data["verluste"], extra_data["nebenbedingung"]
)
for v, n in zip(extra_data["verluste"], extra_data["nebenbedingung"])
if n < 0.01
]
)
filtered_balance = np.array(
[
b
for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"])
if n < 0.01
]
[b for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"]) if n < 0.01]
)
if filtered_losses.size != 0:
best_loss = min(filtered_losses)
@ -281,15 +269,11 @@ def visualisiere_ergebnisse(
) # Two subplots, separate y-axes
# First violin plot for losses
axs[0].violinplot(
data[0], positions=[1], showmeans=True, showmedians=True
)
axs[0].violinplot(data[0], positions=[1], showmeans=True, showmedians=True)
axs[1].set(title="Losses", xticks=[1], xticklabels=["Losses"])
# Second violin plot for balance
axs[1].violinplot(
data[1], positions=[1], showmeans=True, showmedians=True
)
axs[1].violinplot(data[1], positions=[1], showmeans=True, showmedians=True)
axs[1].set(title="Balance", xticks=[1], xticklabels=["Balance"])
# Fine-tuning

View File

@ -54,9 +54,7 @@ def isfloat(num: Any) -> TypeGuard[float]:
@app.route("/strompreis", methods=["GET"])
def flask_strompreis():
# Get the current date and the end date based on prediction hours
date_now, date = get_start_enddate(
prediction_hours, startdate=datetime.now().date()
)
date_now, date = get_start_enddate(prediction_hours, startdate=datetime.now().date())
filepath = os.path.join(
r"test_data", r"strompreise_akkudokAPI.json"
) # Adjust the path to the JSON file
@ -78,9 +76,7 @@ def flask_gesamtlast():
# Extract year_energy and prediction_hours from the request JSON
year_energy = float(data.get("year_energy"))
prediction_hours = int(
data.get("hours", 48)
) # Default to 48 hours if not specified
prediction_hours = int(data.get("hours", 48)) # Default to 48 hours if not specified
# Measured data in JSON format
measured_data_json = data.get("measured_data")
@ -119,9 +115,7 @@ def flask_gesamtlast():
adjuster = LoadPredictionAdjuster(measured_data, predicted_data, lf)
adjuster.calculate_weighted_mean() # Calculate weighted mean for adjustment
adjuster.adjust_predictions() # Adjust predictions based on measured data
future_predictions = adjuster.predict_next_hours(
prediction_hours
) # Predict future load
future_predictions = adjuster.predict_next_hours(prediction_hours) # Predict future load
# Extract household power predictions
leistung_haushalt = future_predictions["Adjusted Pred"].values
@ -160,9 +154,7 @@ def flask_gesamtlast_simple():
0
] # Get expected household load for the date range
gesamtlast = Gesamtlast(
prediction_hours=prediction_hours
) # Create Gesamtlast instance
gesamtlast = Gesamtlast(prediction_hours=prediction_hours) # Create Gesamtlast instance
gesamtlast.hinzufuegen(
"Haushalt", leistung_haushalt
) # Add household load to total load calculation
@ -184,9 +176,7 @@ def flask_pvprognose():
# Retrieve URL and AC power measurement from query parameters
url = request.args.get("url")
ac_power_measurement = request.args.get("ac_power_measurement")
date_now, date = get_start_enddate(
prediction_hours, startdate=datetime.now().date()
)
date_now, date = get_start_enddate(prediction_hours, startdate=datetime.now().date())
###############
# PV Forecast
@ -194,9 +184,7 @@ def flask_pvprognose():
PVforecast = PVForecast(
prediction_hours=prediction_hours, url=url
) # Instantiate PVForecast with given parameters
if isfloat(
ac_power_measurement
): # Check if the AC power measurement is a valid float
if isfloat(ac_power_measurement): # Check if the AC power measurement is a valid float
PVforecast.update_ac_power_measurement(
date_time=datetime.now(),
ac_power_measurement=float(ac_power_measurement),
@ -259,9 +247,7 @@ def flask_optimize():
parameter["min_soc_prozent"] = None
# Perform optimization simulation
result = opt_class.optimierung_ems(
parameter=parameter, start_hour=datetime.now().hour
)
result = opt_class.optimierung_ems(parameter=parameter, start_hour=datetime.now().hour)
print(result)
# convert to JSON (None accepted by dumps)
return jsonify(result)

View File

@ -20,9 +20,7 @@ class TestPVAkku(unittest.TestCase):
min_soc_prozent=self.min_soc_prozent,
max_soc_prozent=self.max_soc_prozent,
)
self.assertEqual(
akku.ladezustand_in_prozent(), 50.0, "Initial SoC should be 50%"
)
self.assertEqual(akku.ladezustand_in_prozent(), 50.0, "Initial SoC should be 50%")
def test_discharge_below_min_soc(self):
akku = PVAkku(
@ -34,18 +32,14 @@ class TestPVAkku(unittest.TestCase):
)
akku.reset()
# Try to discharge more energy than available above min_soc
abgegeben_wh, verlust_wh = akku.energie_abgeben(
5000, 0
) # Try to discharge 5000 Wh
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0) # Try to discharge 5000 Wh
expected_soc = self.min_soc_prozent # SoC should not drop below min_soc
self.assertEqual(
akku.ladezustand_in_prozent(),
expected_soc,
"SoC should not drop below min_soc after discharge",
)
self.assertEqual(
abgegeben_wh, 2640.0, "The energy discharged should be limited by min_soc"
)
self.assertEqual(abgegeben_wh, 2640.0, "The energy discharged should be limited by min_soc")
def test_charge_above_max_soc(self):
akku = PVAkku(
@ -64,9 +58,7 @@ class TestPVAkku(unittest.TestCase):
expected_soc,
"SoC should not exceed max_soc after charge",
)
self.assertEqual(
geladen_wh, 3000.0, "The energy charged should be limited by max_soc"
)
self.assertEqual(geladen_wh, 3000.0, "The energy charged should be limited by max_soc")
def test_charging_at_max_soc(self):
akku = PVAkku(
@ -97,9 +89,7 @@ class TestPVAkku(unittest.TestCase):
akku.reset()
# Try to discharge when SoC is already at min_soc
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
self.assertEqual(
abgegeben_wh, 0.0, "No energy should be discharged when at min_soc"
)
self.assertEqual(abgegeben_wh, 0.0, "No energy should be discharged when at min_soc")
self.assertEqual(
akku.ladezustand_in_prozent(),
self.min_soc_prozent,

View File

@ -18,9 +18,7 @@ def create_ems_instance():
Fixture to create an EnergieManagementSystem instance with given test parameters.
"""
# Initialize the battery and the inverter
akku = PVAkku(
kapazitaet_wh=5000, start_soc_prozent=80, hours=48, min_soc_prozent=10
)
akku = PVAkku(kapazitaet_wh=5000, start_soc_prozent=80, hours=48, min_soc_prozent=10)
akku.reset()
wechselrichter = Wechselrichter(10000, akku)
@ -33,9 +31,7 @@ def create_ems_instance():
home_appliance.set_startzeitpunkt(2)
# Example initialization of electric car battery
eauto = PVAkku(
kapazitaet_wh=26400, start_soc_prozent=10, hours=48, min_soc_prozent=10
)
eauto = PVAkku(kapazitaet_wh=26400, start_soc_prozent=10, hours=48, min_soc_prozent=10)
# Parameters based on previous example data
pv_prognose_wh = [

View File

@ -9,9 +9,7 @@ from akkudoktoreos.config import output_dir
DIR_TESTDATA = Path(__file__).parent / "testdata"
@pytest.mark.parametrize(
"fn_in, fn_out", [("optimize_input_1.json", "optimize_result_1.json")]
)
@pytest.mark.parametrize("fn_in, fn_out", [("optimize_input_1.json", "optimize_result_1.json")])
def test_optimize(fn_in, fn_out):
# Load input and output data
with open(DIR_TESTDATA / fn_in, "r") as f_in:
@ -26,9 +24,7 @@ def test_optimize(fn_in, fn_out):
start_hour = 10
# Call the optimization function
ergebnis = opt_class.optimierung_ems(
parameter=input_data, start_hour=start_hour, ngen=3
)
ergebnis = opt_class.optimierung_ems(parameter=input_data, start_hour=start_hour, ngen=3)
# Assert that the output contains all expected entries.
# This does not assert that the optimization always gives the same result!