Apply isort and ruff code style

This commit is contained in:
Michael Osthege
2024-10-03 11:05:44 +02:00
committed by Andreas
parent bbaaacaca0
commit 3045d53bd6
23 changed files with 1787 additions and 866 deletions

View File

@@ -1,15 +1,25 @@
from flask import Flask, jsonify, request
import numpy as np
from datetime import datetime, timedelta
import hashlib
import json
import os
from datetime import datetime
from pprint import pprint
import json, sys, os
import requests, hashlib
from dateutil import parser
import numpy as np
import pandas as pd
import requests
from dateutil import parser
class ForecastData:
def __init__(self, date_time, dc_power, ac_power, windspeed_10m=None, temperature=None, ac_power_measurement=None):
def __init__(
self,
date_time,
dc_power,
ac_power,
windspeed_10m=None,
temperature=None,
ac_power_measurement=None,
):
self.date_time = date_time
self.dc_power = dc_power
self.ac_power = ac_power
@@ -38,8 +48,9 @@ class ForecastData:
def get_temperature(self):
return self.temperature
class PVForecast:
def __init__(self, filepath=None, url=None, cache_dir='cache', prediction_hours=48):
def __init__(self, filepath=None, url=None, cache_dir="cache", prediction_hours=48):
self.meta = {}
self.forecast_data = []
self.cache_dir = cache_dir
@@ -54,51 +65,56 @@ class PVForecast:
self.load_data_with_caching(url)
if len(self.forecast_data) < self.prediction_hours:
raise ValueError(f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt.")
raise ValueError(
f"Die Vorhersage muss mindestens {self.prediction_hours} Stunden umfassen, aber es wurden nur {len(self.forecast_data)} Stunden vorhergesagt."
)
def update_ac_power_measurement(self, date_time=None, ac_power_measurement=None):
found = False
input_date_hour = date_time.replace(minute=0, second=0, microsecond=0)
for forecast in self.forecast_data:
forecast_date_hour = parser.parse(forecast.date_time).replace(minute=0, second=0, microsecond=0)
forecast_date_hour = parser.parse(forecast.date_time).replace(
minute=0, second=0, microsecond=0
)
if forecast_date_hour == input_date_hour:
forecast.ac_power_measurement = ac_power_measurement
found = True
break
def process_data(self, data):
self.meta = data.get('meta', {})
all_values = data.get('values', [])
for i in range(len(all_values[0])): # Annahme, dass alle Listen gleich lang sind
sum_dc_power = sum(values[i]['dcPower'] for values in all_values)
sum_ac_power = sum(values[i]['power'] for values in all_values)
self.meta = data.get("meta", {})
all_values = data.get("values", [])
for i in range(
len(all_values[0])
): # Annahme, dass alle Listen gleich lang sind
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
sum_ac_power = sum(values[i]["power"] for values in all_values)
# Zeige die ursprünglichen und berechneten Zeitstempel an
original_datetime = all_values[0][i].get('datetime')
#print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
original_datetime = all_values[0][i].get("datetime")
# print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
dt = datetime.strptime(original_datetime, "%Y-%m-%dT%H:%M:%S.%f%z")
dt = dt.replace(tzinfo=None)
#iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
#print()
# iso_datetime = parser.parse(original_datetime).isoformat() # Konvertiere zu ISO-Format
# print()
# Optional: 2 Stunden abziehen, um die Zeitanpassung zu testen
#adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
#print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
# print(f"Angepasste Zeitstempel: {adjusted_datetime.isoformat()}")
forecast = ForecastData(
date_time=dt, # Verwende angepassten Zeitstempel
dc_power=sum_dc_power,
ac_power=sum_ac_power,
windspeed_10m=all_values[0][i].get('windspeed_10m'),
temperature=all_values[0][i].get('temperature')
windspeed_10m=all_values[0][i].get("windspeed_10m"),
temperature=all_values[0][i].get("temperature"),
)
self.forecast_data.append(forecast)
def load_data_from_file(self, filepath):
with open(filepath, 'r') as file:
with open(filepath, "r") as file:
data = json.load(file)
self.process_data(data)
@@ -109,31 +125,37 @@ class PVForecast:
pprint(data)
self.process_data(data)
else:
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
self.load_data_from_url(url)
def load_data_with_caching(self, url):
date = datetime.now().strftime("%Y-%m-%d")
cache_file = os.path.join(self.cache_dir, self.generate_cache_filename(url, date))
cache_file = os.path.join(
self.cache_dir, self.generate_cache_filename(url, date)
)
if os.path.exists(cache_file):
with open(cache_file, 'r') as file:
with open(cache_file, "r") as file:
data = json.load(file)
print("Loading data from cache.")
else:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
with open(cache_file, 'w') as file:
with open(cache_file, "w") as file:
json.dump(data, file)
print("Data fetched from URL and cached.")
else:
print(f"Failed to load data from {url}. Status Code: {response.status_code}")
print(
f"Failed to load data from {url}. Status Code: {response.status_code}"
)
return
self.process_data(data)
def generate_cache_filename(self, url, date):
cache_key = hashlib.sha256(f"{url}{date}".encode('utf-8')).hexdigest()
cache_key = hashlib.sha256(f"{url}{date}".encode("utf-8")).hexdigest()
return f"cache_{cache_key}.json"
def get_forecast_data(self):
@@ -141,11 +163,15 @@ class PVForecast:
def get_temperature_forecast_for_date(self, input_date_str):
input_date = datetime.strptime(input_date_str, "%Y-%m-%d")
daily_forecast_obj = [data for data in self.forecast_data if parser.parse(data.get_date_time()).date() == input_date.date()]
daily_forecast_obj = [
data
for data in self.forecast_data
if parser.parse(data.get_date_time()).date() == input_date.date()
]
daily_forecast = []
for d in daily_forecast_obj:
daily_forecast.append(d.get_temperature())
return np.array(daily_forecast)
def get_pv_forecast_for_date_range(self, start_date_str, end_date_str):
@@ -154,51 +180,65 @@ class PVForecast:
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()#parser.parse(data.get_date_time()).date()
data_date = (
data.get_date_time().date()
) # parser.parse(data.get_date_time()).date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
print(data.get_date_time()," ",data.get_ac_power())
ac_power_forecast = np.array([data.get_ac_power() for data in date_range_forecast])
print(data.get_date_time(), " ", data.get_ac_power())
ac_power_forecast = np.array(
[data.get_ac_power() for data in date_range_forecast]
)
return np.array(ac_power_forecast)[: self.prediction_hours]
return np.array(ac_power_forecast)[:self.prediction_hours]
def get_temperature_for_date_range(self, start_date_str, end_date_str):
start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()
end_date = datetime.strptime(end_date_str, "%Y-%m-%d").date()
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
temperature_forecast = [data.get_temperature() for data in date_range_forecast]
return np.array(temperature_forecast)[:self.prediction_hours]
return np.array(temperature_forecast)[: self.prediction_hours]
def get_forecast_dataframe(self):
# Wandelt die Vorhersagedaten in ein Pandas DataFrame um
data = [{
'date_time': f.get_date_time(),
'dc_power': f.get_dc_power(),
'ac_power': f.get_ac_power(),
'windspeed_10m': f.get_windspeed_10m(),
'temperature': f.get_temperature()
} for f in self.forecast_data]
data = [
{
"date_time": f.get_date_time(),
"dc_power": f.get_dc_power(),
"ac_power": f.get_ac_power(),
"windspeed_10m": f.get_windspeed_10m(),
"temperature": f.get_temperature(),
}
for f in self.forecast_data
]
# Erstelle ein DataFrame
df = pd.DataFrame(data)
return df
def print_ac_power_and_measurement(self):
"""Druckt die DC-Leistung und den Messwert für jede Stunde."""
for forecast in self.forecast_data:
date_time = forecast.date_time
print(f"Zeit: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, Messwert: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}")
print(
f"Zeit: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, Messwert: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}"
)
# Beispiel für die Verwendung der Klasse
if __name__ == '__main__':
forecast = PVForecast(prediction_hours=24, url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m")
forecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=1000)
if __name__ == "__main__":
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
)
forecast.update_ac_power_measurement(
date_time=datetime.now(), ac_power_measurement=1000
)
forecast.print_ac_power_and_measurement()