mirror of
				https://github.com/Akkudoktor-EOS/EOS.git
				synced 2025-11-04 00:36:21 +00:00 
			
		
		
		
	Update class_strompreis.py
initial clean up, translations, os.makedirs improved
This commit is contained in:
		@@ -1,64 +1,61 @@
 | 
			
		||||
import json
 | 
			
		||||
from datetime import datetime, timedelta, timezone
 | 
			
		||||
import os
 | 
			
		||||
import hashlib
 | 
			
		||||
import requests
 | 
			
		||||
from datetime import datetime, timedelta
 | 
			
		||||
import numpy as np
 | 
			
		||||
import json, os
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
import hashlib, requests
 | 
			
		||||
import pytz
 | 
			
		||||
 | 
			
		||||
# Beispiel: Umwandlung eines UTC-Zeitstempels in lokale Zeit
 | 
			
		||||
# Example: Converting a UTC timestamp to local time
 | 
			
		||||
utc_time = datetime.strptime('2024-03-28T01:00:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ')
 | 
			
		||||
utc_time = utc_time.replace(tzinfo=pytz.utc)
 | 
			
		||||
 | 
			
		||||
# Ersetzen Sie 'Europe/Berlin' mit Ihrer eigenen Zeitzone
 | 
			
		||||
# Replace 'Europe/Berlin' with your own timezone
 | 
			
		||||
local_time = utc_time.astimezone(pytz.timezone('Europe/Berlin'))
 | 
			
		||||
print(local_time)
 | 
			
		||||
 | 
			
		||||
def repeat_to_shape(array, target_shape):
 | 
			
		||||
    # Prüfen , ob das Array in die Zielgröße passt
 | 
			
		||||
    # Check if the array fits the target shape
 | 
			
		||||
    if len(target_shape) != array.ndim:
 | 
			
		||||
        raise ValueError("Array and target shape must have the same number of dimensions")
 | 
			
		||||
 | 
			
		||||
    # die Anzahl der Wiederholungen pro Dimension
 | 
			
		||||
    # Number of repetitions per dimension
 | 
			
		||||
    repeats = tuple(target_shape[i] // array.shape[i] for i in range(array.ndim))
 | 
			
		||||
 | 
			
		||||
    #  np.tile, um das Array zu erweitern
 | 
			
		||||
    # Use np.tile to expand the array
 | 
			
		||||
    expanded_array = np.tile(array, repeats)
 | 
			
		||||
    return expanded_array
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class HourlyElectricityPriceForecast:
 | 
			
		||||
    def __init__(self, source, cache_dir='cache', abgaben=0.000228, prediction_hours=24): #228
 | 
			
		||||
    def __init__(self, source, cache_dir='cache', charges=0.000228, prediction_hours=24):  # 228
 | 
			
		||||
        self.cache_dir = cache_dir
 | 
			
		||||
        if not os.path.exists(self.cache_dir):
 | 
			
		||||
            os.makedirs(self.cache_dir)
 | 
			
		||||
        os.makedirs(self.cache_dir, exist_ok=True)
 | 
			
		||||
        self.cache_time_file = os.path.join(self.cache_dir, 'cache_timestamp.txt')
 | 
			
		||||
        self.prices = self.load_data(source)
 | 
			
		||||
        self.abgaben = abgaben
 | 
			
		||||
        self.charges = charges
 | 
			
		||||
        self.prediction_hours = prediction_hours
 | 
			
		||||
    
 | 
			
		||||
    def load_data(self, source):
 | 
			
		||||
        cache_filename = self.get_cache_filename(source)
 | 
			
		||||
        if source.startswith('http'):
 | 
			
		||||
            if os.path.exists(cache_filename) and not self.is_cache_expired():
 | 
			
		||||
                print("Lade Daten aus dem Cache...")
 | 
			
		||||
                print("Loading data from cache...")
 | 
			
		||||
                with open(cache_filename, 'r') as file:
 | 
			
		||||
                    data = json.load(file)
 | 
			
		||||
                    json_data = json.load(file)
 | 
			
		||||
            else:
 | 
			
		||||
                print("Lade Daten von der URL...")
 | 
			
		||||
                print("Loading data from the URL...")
 | 
			
		||||
                response = requests.get(source)
 | 
			
		||||
                if response.status_code == 200:
 | 
			
		||||
                    data = response.json()
 | 
			
		||||
                    json_data = response.json()
 | 
			
		||||
                    with open(cache_filename, 'w') as file:
 | 
			
		||||
                        json.dump(data, file)
 | 
			
		||||
                        json.dump(json_data, file)
 | 
			
		||||
                    self.update_cache_timestamp()
 | 
			
		||||
                else:
 | 
			
		||||
                    raise Exception(f"Fehler beim Abrufen der Daten: {response.status_code}")
 | 
			
		||||
                    raise Exception(f"Error fetching data: {response.status_code}")
 | 
			
		||||
        else:
 | 
			
		||||
            with open(source, 'r') as file:
 | 
			
		||||
                data = json.load(file)
 | 
			
		||||
        return data['values']
 | 
			
		||||
                json_data = json.load(file)
 | 
			
		||||
        return json_data['values']
 | 
			
		||||
    
 | 
			
		||||
    def get_cache_filename(self, url):
 | 
			
		||||
        hash_object = hashlib.sha256(url.encode())
 | 
			
		||||
@@ -77,52 +74,55 @@ class HourlyElectricityPriceForecast:
 | 
			
		||||
        with open(self.cache_time_file, 'w') as file:
 | 
			
		||||
            file.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    def get_price_for_date(self, date_str):
 | 
			
		||||
        """Gibt alle Preise für das spezifizierte Datum zurück."""
 | 
			
		||||
        #date_prices = [entry["marketpriceEurocentPerKWh"]+self.abgaben for entry in self.prices if date_str in entry['end']]
 | 
			
		||||
        """Gibt alle Preise für das spezifizierte Datum zurück, inklusive des Preises von 0:00 des vorherigen Tages."""
 | 
			
		||||
        # Datumskonversion von String zu datetime-Objekt
 | 
			
		||||
        """Returns all prices for the specified date, including the price from 00:00 of the previous day."""
 | 
			
		||||
        # Convert date string to datetime object
 | 
			
		||||
        date_obj = datetime.strptime(date_str, '%Y-%m-%d')
 | 
			
		||||
        
 | 
			
		||||
        # Berechnung des Vortages
 | 
			
		||||
        # Calculate the previous day
 | 
			
		||||
        previous_day = date_obj - timedelta(days=1)
 | 
			
		||||
        previous_day_str = previous_day.strftime('%Y-%m-%d')
 | 
			
		||||
        
 | 
			
		||||
        # Extrahieren des Preises von 0:00 des vorherigen Tages
 | 
			
		||||
        last_price_of_previous_day = [entry["marketpriceEurocentPerKWh"]+self.abgaben for entry in self.prices if previous_day_str in entry['end']][-1]
 | 
			
		||||
        # Extract the price from 00:00 of the previous day
 | 
			
		||||
        last_price_of_previous_day = [
 | 
			
		||||
            entry["marketpriceEurocentPerKWh"] + self.charges
 | 
			
		||||
            for entry in self.prices if previous_day_str in entry['end']
 | 
			
		||||
        ][-1]
 | 
			
		||||
        
 | 
			
		||||
        # Extrahieren aller Preise für das spezifizierte Datum
 | 
			
		||||
        date_prices = [entry["marketpriceEurocentPerKWh"]+self.abgaben for entry in self.prices if date_str in entry['end']]
 | 
			
		||||
        print("getPRice:",len(date_prices))
 | 
			
		||||
        # Extract all prices for the specified date
 | 
			
		||||
        date_prices = [
 | 
			
		||||
            entry["marketpriceEurocentPerKWh"] + self.charges
 | 
			
		||||
            for entry in self.prices if date_str in entry['end']
 | 
			
		||||
        ]
 | 
			
		||||
        print(f"getPrice: {len(date_prices)}")
 | 
			
		||||
        
 | 
			
		||||
        # Hinzufügen des letzten Preises des vorherigen Tages am Anfang der Liste
 | 
			
		||||
        # Add the last price of the previous day at the start of the list
 | 
			
		||||
        if len(date_prices) == 23:
 | 
			
		||||
                date_prices.insert(0, last_price_of_previous_day)
 | 
			
		||||
            date_prices.insert(0, last_price_of_previous_day)
 | 
			
		||||
 | 
			
		||||
        return np.array(date_prices)/(1000.0*100.0) + self.abgaben
 | 
			
		||||
        return np.array(date_prices) / (1000.0 * 100.0) + self.charges
 | 
			
		||||
    
 | 
			
		||||
    def get_price_for_daterange(self, start_date_str, end_date_str):
 | 
			
		||||
        """Returns all prices between the start and end dates."""
 | 
			
		||||
        print(start_date_str)
 | 
			
		||||
        print(end_date_str)
 | 
			
		||||
        """Gibt alle Preise zwischen dem Start- und Enddatum zurück."""
 | 
			
		||||
        start_date_utc = datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=pytz.utc)
 | 
			
		||||
        end_date_utc = datetime.strptime(end_date_str, "%Y-%m-%d").replace(tzinfo=pytz.utc)
 | 
			
		||||
        start_date = start_date_utc.astimezone(pytz.timezone('Europe/Berlin'))
 | 
			
		||||
        end_date = end_date_utc.astimezone(pytz.timezone('Europe/Berlin'))
 | 
			
		||||
 | 
			
		||||
        
 | 
			
		||||
        price_list = []
 | 
			
		||||
 | 
			
		||||
        while start_date < end_date:
 | 
			
		||||
            date_str = start_date.strftime("%Y-%m-%d")
 | 
			
		||||
            daily_prices = self.get_price_for_date(date_str)
 | 
			
		||||
            
 | 
			
		||||
            if daily_prices.size ==24:
 | 
			
		||||
            if daily_prices.size == 24:
 | 
			
		||||
                price_list.extend(daily_prices)
 | 
			
		||||
            start_date += timedelta(days=1)
 | 
			
		||||
        
 | 
			
		||||
        if self.prediction_hours>0:
 | 
			
		||||
            price_list = repeat_to_shape(np.array(price_list),(self.prediction_hours,))        
 | 
			
		||||
        # If prediction hours are greater than 0, reshape the price list
 | 
			
		||||
        if self.prediction_hours > 0:
 | 
			
		||||
            price_list = repeat_to_shape(np.array(price_list), (self.prediction_hours,))
 | 
			
		||||
        
 | 
			
		||||
        return price_list
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user