Structure code in logically separated submodules (#188)

This commit is contained in:
Michael Osthege
2024-11-19 21:47:43 +01:00
committed by GitHub
parent a1cc30f33d
commit 22f72e2f13
31 changed files with 75 additions and 84 deletions

View File

View File

@@ -0,0 +1,270 @@
from typing import Optional
import numpy as np
from pydantic import BaseModel, Field
def max_ladeleistung_w_field(default=None):
return Field(
default,
gt=0,
description="An integer representing the charging power of the battery in watts.",
)
def start_soc_prozent_field(description: str):
return Field(0, ge=0, le=100, description=description)
class BaseAkkuParameters(BaseModel):
kapazitaet_wh: int = Field(
gt=0, description="An integer representing the capacity of the battery in watt-hours."
)
lade_effizienz: float = Field(
0.88, gt=0, le=1, description="A float representing the charging efficiency of the battery."
)
entlade_effizienz: float = Field(0.88, gt=0, le=1)
max_ladeleistung_w: Optional[float] = max_ladeleistung_w_field()
start_soc_prozent: int = start_soc_prozent_field(
"An integer representing the state of charge of the battery at the **start** of the current hour (not the current state)."
)
min_soc_prozent: int = Field(
0,
ge=0,
le=100,
description="An integer representing the minimum state of charge (SOC) of the battery in percentage.",
)
max_soc_prozent: int = Field(100, ge=0, le=100)
class PVAkkuParameters(BaseAkkuParameters):
max_ladeleistung_w: Optional[float] = max_ladeleistung_w_field(5000)
class EAutoParameters(BaseAkkuParameters):
entlade_effizienz: float = 1.0
start_soc_prozent: int = start_soc_prozent_field(
"An integer representing the current state of charge (SOC) of the battery in percentage."
)
class PVAkku:
def __init__(self, parameters: BaseAkkuParameters, hours: int = 24):
# Battery capacity in Wh
self.kapazitaet_wh = parameters.kapazitaet_wh
# Initial state of charge in Wh
self.start_soc_prozent = parameters.start_soc_prozent
self.soc_wh = (parameters.start_soc_prozent / 100) * parameters.kapazitaet_wh
self.hours = hours
self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 1)
# Charge and discharge efficiency
self.lade_effizienz = parameters.lade_effizienz
self.entlade_effizienz = parameters.entlade_effizienz
self.max_ladeleistung_w = (
parameters.max_ladeleistung_w if parameters.max_ladeleistung_w else self.kapazitaet_wh
)
# Only assign for storage battery
self.min_soc_prozent = (
parameters.min_soc_prozent if isinstance(parameters, PVAkkuParameters) else 0
)
self.max_soc_prozent = parameters.max_soc_prozent
# Calculate min and max SoC in Wh
self.min_soc_wh = (self.min_soc_prozent / 100) * self.kapazitaet_wh
self.max_soc_wh = (self.max_soc_prozent / 100) * self.kapazitaet_wh
def to_dict(self):
return {
"kapazitaet_wh": self.kapazitaet_wh,
"start_soc_prozent": self.start_soc_prozent,
"soc_wh": self.soc_wh,
"hours": self.hours,
"discharge_array": self.discharge_array.tolist(), # Convert np.array to list
"charge_array": self.charge_array.tolist(),
"lade_effizienz": self.lade_effizienz,
"entlade_effizienz": self.entlade_effizienz,
"max_ladeleistung_w": self.max_ladeleistung_w,
}
@classmethod
def from_dict(cls, data):
# Create a new object with basic data
obj = cls(
kapazitaet_wh=data["kapazitaet_wh"],
hours=data["hours"],
lade_effizienz=data["lade_effizienz"],
entlade_effizienz=data["entlade_effizienz"],
max_ladeleistung_w=data["max_ladeleistung_w"],
start_soc_prozent=data["start_soc_prozent"],
)
# Set arrays
obj.discharge_array = np.array(data["discharge_array"])
obj.charge_array = np.array(data["charge_array"])
obj.soc_wh = data[
"soc_wh"
] # Set current state of charge, which may differ from start_soc_prozent
return obj
def reset(self):
self.soc_wh = (self.start_soc_prozent / 100) * self.kapazitaet_wh
# Ensure soc_wh is within min and max limits
self.soc_wh = min(max(self.soc_wh, self.min_soc_wh), self.max_soc_wh)
self.discharge_array = np.full(self.hours, 1)
self.charge_array = np.full(self.hours, 1)
def set_discharge_per_hour(self, discharge_array):
assert len(discharge_array) == self.hours
self.discharge_array = np.array(discharge_array)
def set_charge_per_hour(self, charge_array):
assert len(charge_array) == self.hours
self.charge_array = np.array(charge_array)
def set_charge_allowed_for_hour(self, charge, hour):
assert hour < self.hours
self.charge_array[hour] = charge
def ladezustand_in_prozent(self):
return (self.soc_wh / self.kapazitaet_wh) * 100
def energie_abgeben(self, wh, hour):
if self.discharge_array[hour] == 0:
return 0.0, 0.0 # No energy discharge and no losses
# Calculate the maximum energy that can be discharged considering min_soc and efficiency
max_possible_discharge_wh = (self.soc_wh - self.min_soc_wh) * self.entlade_effizienz
max_possible_discharge_wh = max(max_possible_discharge_wh, 0.0) # Ensure non-negative
# Consider the maximum discharge power of the battery
max_abgebbar_wh = min(max_possible_discharge_wh, self.max_ladeleistung_w)
# The actually discharged energy cannot exceed requested energy or maximum discharge
tatsaechlich_abgegeben_wh = min(wh, max_abgebbar_wh)
# Calculate the actual amount withdrawn from the battery (before efficiency loss)
if self.entlade_effizienz > 0:
tatsaechliche_entnahme_wh = tatsaechlich_abgegeben_wh / self.entlade_effizienz
else:
tatsaechliche_entnahme_wh = 0.0
# Update the state of charge considering the actual withdrawal
self.soc_wh -= tatsaechliche_entnahme_wh
# Ensure soc_wh does not go below min_soc_wh
self.soc_wh = max(self.soc_wh, self.min_soc_wh)
# Calculate losses due to efficiency
verluste_wh = tatsaechliche_entnahme_wh - tatsaechlich_abgegeben_wh
# Return the actually discharged energy and the losses
return tatsaechlich_abgegeben_wh, verluste_wh
def energie_laden(self, wh, hour, relative_power=0.0):
if hour is not None and self.charge_array[hour] == 0:
return 0, 0 # Charging not allowed in this hour
if relative_power > 0.0:
wh = self.max_ladeleistung_w * relative_power
# If no value for wh is given, use the maximum charging power
wh = wh if wh is not None else self.max_ladeleistung_w
# Calculate the maximum energy that can be charged considering max_soc and efficiency
if self.lade_effizienz > 0:
max_possible_charge_wh = (self.max_soc_wh - self.soc_wh) / self.lade_effizienz
else:
max_possible_charge_wh = 0.0
max_possible_charge_wh = max(max_possible_charge_wh, 0.0) # Ensure non-negative
# The actually charged energy cannot exceed requested energy, charging power, or maximum possible charge
effektive_lademenge = min(wh, max_possible_charge_wh)
# Energy actually stored in the battery
geladene_menge = effektive_lademenge * self.lade_effizienz
# Update soc_wh
self.soc_wh += geladene_menge
# Ensure soc_wh does not exceed max_soc_wh
self.soc_wh = min(self.soc_wh, self.max_soc_wh)
# Calculate losses
verluste_wh = effektive_lademenge - geladene_menge
return geladene_menge, verluste_wh
def aktueller_energieinhalt(self):
"""This method returns the current remaining energy considering efficiency.
It accounts for both charging and discharging efficiency.
"""
# Calculate remaining energy considering discharge efficiency
nutzbare_energie = (self.soc_wh - self.min_soc_wh) * self.entlade_effizienz
return max(nutzbare_energie, 0.0)
if __name__ == "__main__":
# Test battery discharge below min_soc
print("Test: Discharge below min_soc")
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=50,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
# Try to discharge 5000 Wh
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
print(f"Energy discharged: {abgegeben_wh} Wh, Losses: {verlust_wh} Wh")
print(f"SoC after discharge: {akku.ladezustand_in_prozent()}%")
print(f"Expected min SoC: {akku.min_soc_prozent}%")
# Test battery charge above max_soc
print("\nTest: Charge above max_soc")
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=50,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
# Try to charge 5000 Wh
geladen_wh, verlust_wh = akku.energie_laden(5000, 0)
print(f"Energy charged: {geladen_wh} Wh, Losses: {verlust_wh} Wh")
print(f"SoC after charge: {akku.ladezustand_in_prozent()}%")
print(f"Expected max SoC: {akku.max_soc_prozent}%")
# Test charging when battery is at max_soc
print("\nTest: Charging when at max_soc")
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=80,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
geladen_wh, verlust_wh = akku.energie_laden(5000, 0)
print(f"Energy charged: {geladen_wh} Wh, Losses: {verlust_wh} Wh")
print(f"SoC after charge: {akku.ladezustand_in_prozent()}%")
# Test discharging when battery is at min_soc
print("\nTest: Discharging when at min_soc")
akku = PVAkku(
kapazitaet_wh=10000,
hours=1,
start_soc_prozent=20,
min_soc_prozent=20,
max_soc_prozent=80,
)
akku.reset()
print(f"Initial SoC: {akku.ladezustand_in_prozent()}%")
abgegeben_wh, verlust_wh = akku.energie_abgeben(5000, 0)
print(f"Energy discharged: {abgegeben_wh} Wh, Losses: {verlust_wh} Wh")
print(f"SoC after discharge: {akku.ladezustand_in_prozent()}%")

View File

@@ -0,0 +1,64 @@
import numpy as np
from pydantic import BaseModel, Field
class HaushaltsgeraetParameters(BaseModel):
verbrauch_wh: int = Field(
gt=0,
description="An integer representing the energy consumption of a household device in watt-hours.",
)
dauer_h: int = Field(
gt=0,
description="An integer representing the usage duration of a household device in hours.",
)
class Haushaltsgeraet:
def __init__(self, parameters: HaushaltsgeraetParameters, hours=24):
self.hours = hours # Total duration for which the planning is done
self.verbrauch_wh = (
parameters.verbrauch_wh # Total energy consumption of the device in kWh
)
self.dauer_h = parameters.dauer_h # Duration of use in hours
self.lastkurve = np.zeros(self.hours) # Initialize the load curve with zeros
def set_startzeitpunkt(self, start_hour, global_start_hour=0):
"""Sets the start time of the device and generates the corresponding load curve.
:param start_hour: The hour at which the device should start.
"""
self.reset()
# Check if the duration of use is within the available time frame
if start_hour + self.dauer_h > self.hours:
raise ValueError("The duration of use exceeds the available time frame.")
if start_hour < global_start_hour:
raise ValueError("The start time is earlier than the available time frame.")
# Calculate power per hour based on total consumption and duration
leistung_pro_stunde = self.verbrauch_wh / self.dauer_h # Convert to watt-hours
# Set the power for the duration of use in the load curve array
self.lastkurve[start_hour : start_hour + self.dauer_h] = leistung_pro_stunde
def reset(self):
"""Resets the load curve."""
self.lastkurve = np.zeros(self.hours)
def get_lastkurve(self):
"""Returns the current load curve."""
return self.lastkurve
def get_last_fuer_stunde(self, hour):
"""Returns the load for a specific hour.
:param hour: The hour for which the load is queried.
:return: The load in watts for the specified hour.
"""
if hour < 0 or hour >= self.hours:
raise ValueError("The specified hour is outside the available time frame.")
return self.lastkurve[hour]
def spaetestmoeglicher_startzeitpunkt(self):
"""Returns the latest possible start time at which the device can still run completely."""
return self.hours - self.dauer_h

View File

@@ -0,0 +1,139 @@
import logging
from typing import List, Sequence
class Heatpump:
MAX_HEATOUTPUT = 5000
"""Maximum heating power in watts"""
BASE_HEATPOWER = 235.0
"""Base heating power value"""
TEMPERATURE_COEFFICIENT = -11.645
"""Coefficient for temperature"""
COP_BASE = 3.0
"""Base COP value"""
COP_COEFFICIENT = 0.1
"""COP increase per degree"""
def __init__(self, max_heat_output, prediction_hours):
self.max_heat_output = max_heat_output
self.prediction_hours = prediction_hours
self.log = logging.getLogger(__name__)
def __check_outside_temperature_range__(self, temp_celsius: float) -> bool:
"""Check if temperature is in valid range between -100 and 100 degree Celsius.
Args:
temp_celsius: Temperature in degree Celsius
Returns:
bool: True if in range
"""
return temp_celsius > -100 and temp_celsius < 100
def calculate_cop(self, outside_temperature_celsius: float) -> float:
"""Calculate the coefficient of performance (COP) based on outside temperature.
Supported temperate range -100 degree Celsius to 100 degree Celsius.
Args:
outside_temperature_celsius: Outside temperature in degree Celsius
Raise:
ValueError: If outside temperature isn't range.
Return:
cop: Calculated COP based on temperature
"""
# TODO: Support for other temperature units (e.g Fahrenheit, Kelvin)
# Check for sensible temperature values
if self.__check_outside_temperature_range__(outside_temperature_celsius):
cop = self.COP_BASE + (outside_temperature_celsius * self.COP_COEFFICIENT)
return max(cop, 1)
else:
err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius) "
self.log.error(err_msg)
raise ValueError(err_msg)
def calculate_heating_output(self, outside_temperature_celsius: float) -> float:
"""Calculate the heating output in Watts based on outside temperature in degree Celsius.
Temperature range must be between -100 and 100 degree Celsius.
Args:
outside_temperature_celsius: Outside temperature in degree Celsius
Raises:
ValueError: Raised if outside temperature isn't in described range.
Returns:
heating output: Calculated heating output in Watts.
"""
if self.__check_outside_temperature_range__(outside_temperature_celsius):
heat_output = (
(self.BASE_HEATPOWER + outside_temperature_celsius * self.TEMPERATURE_COEFFICIENT)
* 1000
) / 24.0
return min(self.max_heat_output, heat_output)
else:
err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius) "
self.log.error(err_msg)
raise ValueError(err_msg)
def calculate_heat_power(self, outside_temperature_celsius: float) -> float:
"""Calculate electrical power based on outside temperature (degree Celsius).
Args:
outside_temperature_celsius: Temperature in range -100 to 100 degree Celsius.
Raises:
ValueError: Raised if temperature isn't in described range
Returns:
power: Calculated electrical power in Watt.
"""
if self.__check_outside_temperature_range__(outside_temperature_celsius):
return (
1164 - 77.8 * outside_temperature_celsius + 1.62 * outside_temperature_celsius**2.0
)
else:
err_msg = f"Outside temperature '{outside_temperature_celsius}' not in range (min: -100 Celsius, max: 100 Celsius) "
self.log.error(err_msg)
raise ValueError(err_msg)
def simulate_24h(self, temperatures: Sequence[float]) -> List[float]:
"""Simulate power data for 24 hours based on provided temperatures."""
power_data: List[float] = []
if len(temperatures) != self.prediction_hours:
raise ValueError(
f"The temperature array must contain exactly {self.prediction_hours} entries, one for each hour of the day."
)
for temp in temperatures:
power = self.calculate_heat_power(temp)
power_data.append(power)
return power_data
# Example usage of the class
if __name__ == "__main__":
max_heizleistung = 5000 # 5 kW heating power
start_innentemperatur = 15 # Initial indoor temperature
isolationseffizienz = 0.8 # Insulation efficiency
gewuenschte_innentemperatur = 20 # Desired indoor temperature
wp = Heatpump(max_heizleistung, 24) # Initialize heat pump with prediction hours
# Print COP for various outside temperatures
print(wp.calculate_cop(-10), " ", wp.calculate_cop(0), " ", wp.calculate_cop(10))
# 24 hours of outside temperatures (example values)
temperaturen = [ 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -5, -2, 5, ] # fmt: skip
# Calculate the 24-hour power data
leistungsdaten = wp.simulate_24h(temperaturen)
print(leistungsdaten)

View File

@@ -0,0 +1,69 @@
from pydantic import BaseModel, Field
from akkudoktoreos.devices.battery import PVAkku
class WechselrichterParameters(BaseModel):
max_leistung_wh: float = Field(10000, gt=0)
class Wechselrichter:
def __init__(self, parameters: WechselrichterParameters, akku: PVAkku):
self.max_leistung_wh = (
parameters.max_leistung_wh # Maximum power that the inverter can handle
)
self.akku = akku # Connection to a battery object
def energie_verarbeiten(self, erzeugung, verbrauch, hour):
verluste = 0 # Losses during processing
netzeinspeisung = 0 # Grid feed-in
netzbezug = 0.0 # Grid draw
eigenverbrauch = 0.0 # Self-consumption
if erzeugung >= verbrauch:
if verbrauch > self.max_leistung_wh:
# If consumption exceeds maximum inverter power
verluste += erzeugung - self.max_leistung_wh
restleistung_nach_verbrauch = self.max_leistung_wh - verbrauch
netzbezug = -restleistung_nach_verbrauch # Negative indicates feeding into the grid
eigenverbrauch = self.max_leistung_wh
else:
# Remaining power after consumption
restleistung_nach_verbrauch = erzeugung - verbrauch
# Load battery with excess energy
geladene_energie, verluste_laden_akku = self.akku.energie_laden(
restleistung_nach_verbrauch, hour
)
rest_überschuss = restleistung_nach_verbrauch - (
geladene_energie + verluste_laden_akku
)
# Feed-in to the grid based on remaining capacity
if rest_überschuss > self.max_leistung_wh - verbrauch:
netzeinspeisung = self.max_leistung_wh - verbrauch
verluste += rest_überschuss - netzeinspeisung
else:
netzeinspeisung = rest_überschuss
verluste += verluste_laden_akku
eigenverbrauch = verbrauch # Self-consumption is equal to the load
else:
benötigte_energie = verbrauch - erzeugung # Energy needed from external sources
max_akku_leistung = self.akku.max_ladeleistung_w # Maximum battery discharge power
# Calculate remaining AC power available
rest_ac_leistung = max(self.max_leistung_wh - erzeugung, 0)
# Discharge energy from the battery based on need
if benötigte_energie < rest_ac_leistung:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(benötigte_energie, hour)
else:
aus_akku, akku_entladeverluste = self.akku.energie_abgeben(rest_ac_leistung, hour)
verluste += akku_entladeverluste # Include losses from battery discharge
netzbezug = benötigte_energie - aus_akku # Energy drawn from the grid
eigenverbrauch = erzeugung + aus_akku # Total self-consumption
return netzeinspeisung, netzbezug, verluste, eigenverbrauch