Add test to PVForecast (#174)

* Add documentation to class_pv_forecast.py.

Added documentation. Beware: it was mostly generated by ChatGPT.

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

* Add CacheFileStore, datetime and logger utilities.

The `CacheFileStore` class is a singleton-based, thread-safe key-value store for managing
temporary file objects, allowing the creation, retrieval, and management of cache files.
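The pattern described above can be sketched with the standard library alone. This is an illustrative stand-in, not the real `CacheFileStore` (which lives in `akkudoktoreos.cachefilestore` and adds date-based validity):

```python
import tempfile
import threading

class MiniCacheStore:
    """Illustrative key-value store of named tempfiles (not the real CacheFileStore)."""

    def __init__(self):
        self._store = {}
        self._lock = threading.Lock()

    def create(self, key, suffix=None):
        """Create (or return an existing) cache file for the given key."""
        with self._lock:
            if key not in self._store:
                # delete=False keeps the file on disk after close
                self._store[key] = tempfile.NamedTemporaryFile(
                    mode="w+", delete=False, suffix=suffix
                )
            return self._store[key]

    def get(self, key):
        """Return the cache file for the key, or None if absent."""
        with self._lock:
            return self._store.get(key)

store = MiniCacheStore()
f = store.create("example_key", suffix=".txt")
f.write("Some data")
f.seek(0)
print(f.read())  # -> Some data
```

Calling `create` twice with the same key returns the same file object, which is the behaviour the real store relies on for cache hits.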

The utility modules offer a flexible logging setup (`get_logger`) and utilities to handle
different date-time formats (`to_datetime`, `to_timestamp`) and timezone detection
(`to_timezone`).
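The rotating-log setup can be sketched with the standard library. File name, size limit, and backup count here are illustrative, not the module's actual defaults:

```python
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

def get_logger_sketch(name, log_path, logging_level="DEBUG"):
    """Illustrative stand-in for logutil.get_logger with a rotating file handler."""
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, logging_level))
    # Rotate after ~1 MB, keeping up to 3 old log files
    handler = RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=3)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger.addHandler(handler)
    return logger

log_path = os.path.join(tempfile.mkdtemp(), "app.log")
logger = get_logger_sketch("example", log_path)
logger.debug("hello")
logger.handlers[0].flush()
print(os.path.exists(log_path))  # -> True
```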

- Cache files are automatically valid for the current date unless specified otherwise.
  This is to mimic the current behaviour used in several classes.
- The logger supports rotating log files to prevent excessive log file size.
- The `to_datetime` and `to_timestamp` functions support a wide variety of input types and formats.
  They provide the time conversion that is e.g. used in PVForecast.
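A stripped-down sketch of the input flexibility described above (the real `to_datetime`/`to_timestamp` live in `akkudoktoreos.datetimeutil` and handle many more formats and timezone options):

```python
from datetime import date, datetime, timezone

def to_datetime_sketch(value):
    """Illustrative converter: accepts datetime, date, ISO string, or epoch number."""
    # Check datetime before date: datetime is a subclass of date
    if isinstance(value, datetime):
        return value
    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time())
    if isinstance(value, str):
        return datetime.fromisoformat(value)
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    raise TypeError(f"Unsupported type: {type(value)!r}")

def to_timestamp_sketch(value):
    """Illustrative converter: any supported input to a POSIX timestamp."""
    return to_datetime_sketch(value).timestamp()

print(to_datetime_sketch("2024-11-10T23:49:10"))  # -> 2024-11-10 23:49:10
```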

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

* Improve testability of PVForecast

Improvements for testing of PVForecast:
- Use common utility functions so general behaviour can be tested in one spot:
  - to_datetime
  - CacheFileStore
- Use logging instead of print to make output easy to capture in tests.
- Add validation of the JSON schema for Akkudoktor PV forecast data.
- Allow creating an empty PVForecast instance as a base instance for testing.
- Make process_data() fully populate a PVForecast instance for testing.
- Normalize forecast datetimes to the timezone of the system given in the loaded data.
- Do not print the report; provide it for test checks instead.
- Get rid of the cache file path by using the CacheFileStore to automate cache file usage.
- Improve module documentation.
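The "provide a report instead of printing" item above can be sketched like this. Class and method names are hypothetical, not the actual PVForecast API:

```python
from datetime import datetime, timedelta

class ForecastReportSketch:
    """Illustrative forecast holder whose report is a return value, not printed."""

    def __init__(self):
        self.entries = []  # (datetime, ac_power) pairs

    def append(self, date_time, ac_power):
        self.entries.append((date_time, ac_power))

    def report_ac_power(self):
        # Build the report as a string so tests can assert on it
        lines = [f"{dt.isoformat()}: AC {power:.1f} W" for dt, power in self.entries]
        return "\n".join(lines)

forecast = ForecastReportSketch()
start = datetime(2024, 11, 10, 12, 0)
forecast.append(start, 850.0)
forecast.append(start + timedelta(hours=1), 910.5)
print(forecast.report_ac_power())
```

Returning the report keeps the production path print-free; a test simply asserts on substrings of the returned text.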

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

* Add test for PVForecast and newly extracted utility modules.

- Add test for PVForecast
- Add test for CacheFileStore in the new cachefilestore module
- Add test for to_datetime, to_timestamp, to_timezone in the new
  datetimeutil module
- Add test for get_logger in the new logutil module
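A minimal sketch of the singleton property such a test checks, mirroring the `CacheFileStoreMeta` pattern (the actual tests live in the new test modules):

```python
import threading

class SingletonMeta(type):
    """Thread-safe singleton metaclass, mirroring CacheFileStoreMeta's pattern."""

    _instances = {}
    _lock = threading.Lock()

    def __call__(cls):
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__()
        return cls._instances[cls]

class StoreUnderTest(metaclass=SingletonMeta):
    def __init__(self):
        self.store = {}

def test_singleton_identity():
    a = StoreUnderTest()
    b = StoreUnderTest()
    assert a is b               # same instance everywhere
    a.store["k"] = "v"
    assert b.store["k"] == "v"  # state is shared

test_singleton_identity()
print("ok")
```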

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>

---------

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
Co-authored-by: Normann <github@koldrack.com>
Commit b630625a4d (parent c4c9e59a57) by Bobby Noelte, committed via GitHub on 2024-11-10 23:49:10 +01:00.
12 changed files with 2740 additions and 105 deletions.


@@ -0,0 +1,635 @@
"""cachefilestore.py.
This module provides a class for in-memory managing of cache files.
The `CacheFileStore` class is a singleton-based, thread-safe key-value store for managing
temporary file objects, allowing the creation, retrieval, and management of cache files.
Classes:
--------
- CacheFileStore: A thread-safe, singleton class for in-memory managing of file-like cache objects.
- CacheFileStoreMeta: Metaclass for enforcing the singleton behavior in `CacheFileStore`.
Example usage:
--------------
# CacheFileStore usage
>>> cache_store = CacheFileStore()
>>> cache_store.create('example_key')
>>> cache_file = cache_store.get('example_key')
>>> cache_file.write('Some data')
>>> cache_file.seek(0)
>>> print(cache_file.read()) # Output: 'Some data'
Notes:
------
- Cache files are automatically associated with the current date unless specified.
"""
import hashlib
import inspect
import os
import pickle
import tempfile
import threading
from datetime import date, datetime, time, timedelta
from typing import List, Optional, Union
from akkudoktoreos.datetimeutil import to_datetime, to_timedelta
from akkudoktoreos.logutil import get_logger
logger = get_logger(__file__)
class CacheFileStoreMeta(type):
"""A thread-safe implementation of CacheFileStore."""
_instances = {}
_lock: threading.Lock = threading.Lock()
"""Lock object to synchronize threads on first access to CacheFileStore."""
def __call__(cls):
"""Return CacheFileStore instance."""
with cls._lock:
if cls not in cls._instances:
instance = super().__call__()
cls._instances[cls] = instance
return cls._instances[cls]
class CacheFileStore(metaclass=CacheFileStoreMeta):
"""A key-value store that manages file-like tempfile objects to be used as cache files.
Cache files are associated with a date. If no date is specified, the cache files are
associated with the current date by default. The class provides methods to create
new cache files, retrieve existing ones, delete specific files, and clear all cache
entries.
CacheFileStore is a thread-safe singleton. Only one store instance will ever be created.
Attributes:
store (dict): A dictionary that holds the in-memory cache file objects
with their associated keys and dates.
Example usage:
>>> cache_store = CacheFileStore()
>>> cache_store.create('example_file')
>>> cache_file = cache_store.get('example_file')
>>> cache_file.write('Some data')
>>> cache_file.seek(0)
>>> print(cache_file.read()) # Output: 'Some data'
"""
def __init__(self):
"""Initializes the CacheFileStore instance.
This constructor sets up an empty key-value store (a dictionary) where each key
corresponds to a cache file that is associated with a given key and an optional date.
"""
self._store = {}
self._store_lock = threading.Lock()
def _generate_cache_file_key(
self, key: str, until_datetime: Union[datetime, None]
) -> tuple[str, datetime]:
"""Generates a unique cache file key based on the key and date.
The cache file key is a combination of the input key and the date (if provided),
hashed using SHA-256 to ensure uniqueness.
Args:
key (str): The key that identifies the cache file.
until_datetime (Union[datetime, date, str, int, float, None]): The datetime
until the cache file is valid. The default is the current date at maximum time
(23:59:59).
Returns:
A tuple of:
str: A hashed string that serves as the unique identifier for the cache file.
datetime: The datetime until the cache file is valid.
"""
if until_datetime is None:
until_datetime = datetime.combine(date.today(), time.max)
key_datetime = to_datetime(until_datetime, as_string="UTC")
cache_key = hashlib.sha256(f"{key}{key_datetime}".encode("utf-8")).hexdigest()
return (f"{cache_key}", until_datetime)
def _get_file_path(self, file_obj):
"""Retrieve the file path from a file-like object.
Args:
file_obj: A file-like object (e.g., an instance of
NamedTemporaryFile, BytesIO, StringIO) from which to retrieve the
file path.
Returns:
str or None: The file path if available, or None if the file-like
object does not provide a file path.
"""
file_path = None
if hasattr(file_obj, "name"):
file_path = file_obj.name # Get the file path from the cache file object
return file_path
def _until_datetime_by_options(
self,
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
with_ttl: Union[timedelta, str, int, float, None] = None,
):
"""Get until_datetime from the given options."""
if until_datetime:
until_datetime = to_datetime(until_datetime)
elif with_ttl:
with_ttl = to_timedelta(with_ttl)
until_datetime = to_datetime(datetime.now() + with_ttl)
elif until_date:
until_datetime = to_datetime(to_datetime(until_date).date())
else:
# end of today
until_datetime = to_datetime(datetime.combine(date.today(), time.max))
return until_datetime
def _is_valid_cache_item(
self,
cache_item: tuple,
until_datetime: datetime = None,
at_datetime: datetime = None,
before_datetime: datetime = None,
):
cache_file_datetime = cache_item[1] # Extract the datetime associated with the cache item
if (
(until_datetime and until_datetime == cache_file_datetime)
or (at_datetime and at_datetime <= cache_file_datetime)
or (before_datetime and cache_file_datetime < before_datetime)
):
return True
return False
def _search(
self,
key: str,
until_datetime: Union[datetime, date, str, int, float] = None,
at_datetime: Union[datetime, date, str, int, float] = None,
before_datetime: Union[datetime, date, str, int, float] = None,
):
"""Searches for a cached item that matches the key and falls within the datetime range.
This method looks for a cache item with a key that matches the given `key`, and whose associated
datetime (`cache_file_datetime`) falls on or after the `at_datetime`. If both conditions are met,
it returns the cache item. Otherwise, it returns `None`.
Args:
key (str): The key to identify the cache item.
until_datetime (Union[datetime, date, str, int, float], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59).
at_datetime (Union[datetime, date, str, int, float], optional): The datetime to compare with
the cache item's datetime.
before_datetime (Union[datetime, date, str, int, float], optional): The datetime to compare
the cache item's datetime to be before.
Returns:
Optional[tuple]: Returns the cache_file_key, cache_file, cache_file_datetime if found,
otherwise returns `None`.
"""
# Convert input to datetime if they are not None
if until_datetime:
until_datetime = to_datetime(until_datetime)
if at_datetime:
at_datetime = to_datetime(at_datetime)
if before_datetime:
before_datetime = to_datetime(before_datetime)
for cache_file_key, cache_item in self._store.items():
# Check if the cache file datetime matches the given criteria
if self._is_valid_cache_item(
cache_item,
until_datetime=until_datetime,
at_datetime=at_datetime,
before_datetime=before_datetime,
):
# This cache file is within the given datetime range
# Extract the datetime associated with the cache item
cache_file_datetime = cache_item[1]
# Generate a cache file key based on the given key and the cache file datetime
generated_key, _until_dt = self._generate_cache_file_key(key, cache_file_datetime)
if generated_key == cache_file_key:
# The key matches, return the key and the cache item
return (cache_file_key, cache_item[0], cache_file_datetime)
# Return None if no matching cache item is found
return None
def create(
self,
key: str,
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
with_ttl: Union[timedelta, str, int, float, None] = None,
mode: str = "wb+",
delete: bool = False,
suffix: Optional[str] = None,
):
"""Creates a new file-like tempfile object associated with the given key.
If a cache file with the given key and a valid datetime already exists, the existing file is
returned. Otherwise, a new tempfile object is created and stored in the key-value store.
Args:
key (str): The key to store the cache file under.
until_date (Union[datetime, date, str, int, float, None], optional): The date
until the cache file is valid. Time of day is set to maximum time (23:59:59).
until_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59) if not
provided.
with_ttl (Union[timedelta, str, int, float, None], optional): The time to live that
the cache file is valid. Time starts now.
mode (str, optional): The mode in which the tempfile is opened
(e.g., 'w+', 'r+', 'wb+'). Defaults to 'wb+'.
delete (bool, optional): Whether to delete the file after it is closed.
Defaults to False (keeps the file).
suffix (str, optional): The suffix for the cache file (e.g., '.txt', '.log').
Defaults to None.
Returns:
file_obj: A file-like object representing the cache file.
Example:
>>> cache_file = cache_store.create('example_file', suffix='.txt')
>>> cache_file.write('Some cached data')
>>> cache_file.seek(0)
>>> print(cache_file.read()) # Output: 'Some cached data'
"""
until_datetime = self._until_datetime_by_options(
until_datetime=until_datetime, until_date=until_date, with_ttl=with_ttl
)
cache_file_key, until_date = self._generate_cache_file_key(key, until_datetime)
with self._store_lock: # Synchronize access to _store
if cache_file_key in self._store:
# File already available
cache_file_obj, until_datetime = self._store.get(cache_file_key)
else:
cache_file_obj = tempfile.NamedTemporaryFile(
mode=mode, delete=delete, suffix=suffix
)
self._store[cache_file_key] = (cache_file_obj, until_datetime)
cache_file_obj.seek(0)
return cache_file_obj
def set(
self,
key: str,
file_obj,
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
with_ttl: Union[timedelta, str, int, float, None] = None,
):
"""Stores a file-like object in the cache under the specified key and date.
This method allows you to manually set a file-like object into the cache with a specific key
and optional date.
Args:
key (str): The key to store the file object under.
file_obj: The file-like object.
until_date (Union[datetime, date, str, int, float, None], optional): The date
until the cache file is valid. Time of day is set to maximum time (23:59:59).
until_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59) if not
provided.
with_ttl (Union[timedelta, str, int, float, None], optional): The time to live that
the cache file is valid. Time starts now.
Raises:
ValueError: If the key is already in store.
Example:
>>> cache_store.set('example_file', io.BytesIO(b'Some binary data'))
"""
until_datetime = self._until_datetime_by_options(
until_datetime=until_datetime, until_date=until_date, with_ttl=with_ttl
)
cache_file_key, until_date = self._generate_cache_file_key(key, until_datetime)
with self._store_lock: # Synchronize access to _store
if cache_file_key in self._store:
raise ValueError(f"Key already in store: `{key}`.")
self._store[cache_file_key] = (file_obj, until_date)
def get(
self,
key: str,
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
at_datetime: Union[datetime, date, str, int, float, None] = None,
before_datetime: Union[datetime, date, str, int, float, None] = None,
):
"""Retrieves the cache file associated with the given key and validity datetime.
If no cache file is found for the provided key and datetime, the method returns None.
The retrieved file is a file-like object that can be read from or written to.
Args:
key (str): The key to retrieve the cache file for.
until_date (Union[datetime, date, str, int, float, None], optional): The date
until the cache file is valid. Time of day is set to maximum time (23:59:59).
until_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59) if not
provided.
at_datetime (Union[datetime, date, str, int, float, None], optional): The datetime the
cache file shall be valid at. Time of day is set to maximum time (23:59:59) if not
provided. Defaults to the current datetime if None is provided.
before_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
to compare the cache files datetime to be before.
Returns:
file_obj: The file-like cache object, or None if no file is found.
Example:
>>> cache_file = cache_store.get('example_file')
>>> if cache_file:
>>> cache_file.seek(0)
>>> print(cache_file.read()) # Output: Cached data (if exists)
"""
if until_datetime or until_date:
until_datetime = self._until_datetime_by_options(
until_datetime=until_datetime, until_date=until_date
)
elif at_datetime:
at_datetime = to_datetime(at_datetime)
elif before_datetime:
before_datetime = to_datetime(before_datetime)
else:
at_datetime = to_datetime(datetime.now())
with self._store_lock: # Synchronize access to _store
search_item = self._search(key, until_datetime, at_datetime, before_datetime)
if search_item is None:
return None
return search_item[1]
def delete(
self,
key,
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
before_datetime: Union[datetime, date, str, int, float, None] = None,
):
"""Deletes the cache file associated with the given key and datetime.
This method removes the cache file from the store.
Args:
key (str): The key of the cache file to delete.
until_date (Union[datetime, date, str, int, float, None], optional): The date
until the cache file is valid. Time of day is set to maximum time (23:59:59).
until_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59) if not
provided.
before_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
the cache file shall become or be invalid at. Time of day is set to maximum time
(23:59:59) if not provided. Defaults to tomorrow at start of day.
"""
if until_datetime or until_date:
until_datetime = self._until_datetime_by_options(
until_datetime=until_datetime, until_date=until_date
)
elif before_datetime:
before_datetime = to_datetime(before_datetime)
else:
today = datetime.now().date() # Get today's date
tomorrow = today + timedelta(days=1) # Add one day to get tomorrow's date
before_datetime = to_datetime(datetime.combine(tomorrow, time.min))
with self._store_lock: # Synchronize access to _store
search_item = self._search(key, until_datetime, None, before_datetime)
if search_item:
cache_file_key = search_item[0]
cache_file = search_item[1]
cache_file_datetime = search_item[2]
file_path = self._get_file_path(cache_file)
if file_path is None:
logger.warning(
f"The cache file with key '{cache_file_key}' is an in memory "
f"file object. Will only delete store entry but not file."
)
self._store.pop(cache_file_key)
return
file_path = cache_file.name # Get the file path from the cache file object
del self._store[cache_file_key]
if os.path.exists(file_path):
try:
os.remove(file_path)
logger.debug(f"Deleted cache file: {file_path}")
except OSError as e:
logger.error(f"Error deleting cache file {file_path}: {e}")
def clear(
self, clear_all=False, before_datetime: Union[datetime, date, str, int, float, None] = None
):
"""Deletes all cache files or those expiring before `before_datetime`.
Args:
clear_all (bool, optional): Delete all cache files. Default is False.
before_datetime (Union[datetime, date, str, int, float, None], optional): The
threshold date. Cache files that are only valid before this date will be deleted.
The default datetime is beginning of today.
Raises:
OSError: If there's an error during file deletion.
"""
delete_keys = [] # List of keys to delete, prevent deleting when traversing the store
clear_timestamp = None
with self._store_lock: # Synchronize access to _store
for cache_file_key, cache_item in self._store.items():
cache_file = cache_item[0]
# Some weird logic to prevent calling to_datetime on clear_all.
# Clear_all may be set on __del__. At this time some info for to_datetime will
# not be available anymore.
clear_file = clear_all
if not clear_all:
if clear_timestamp is None:
before_datetime = to_datetime(before_datetime, to_maxtime=False)
# Convert the threshold date to a timestamp (seconds since epoch)
clear_timestamp = to_datetime(before_datetime).timestamp()
cache_file_timestamp = to_datetime(cache_item[1]).timestamp()
if cache_file_timestamp < clear_timestamp:
clear_file = True
if clear_file:
# We have to clear this cache file
delete_keys.append(cache_file_key)
file_path = self._get_file_path(cache_file)
if file_path is None:
# In memory file like object
logger.warning(
f"The cache file with key '{cache_file_key}' is an in memory "
f"file object. Will only delete store entry but not file."
)
continue
if not os.path.exists(file_path):
# Already deleted
logger.warning(f"The cache file '{file_path}' was already deleted.")
continue
# Finally remove the file
try:
os.remove(file_path)
logger.debug(f"Deleted cache file: {file_path}")
except OSError as e:
logger.error(f"Error deleting cache file {file_path}: {e}")
for delete_key in delete_keys:
del self._store[delete_key]
def cache_in_file(
ignore_params: List[str] = [],
until_date: Union[datetime, date, str, int, float, None] = None,
until_datetime: Union[datetime, date, str, int, float, None] = None,
with_ttl: Union[timedelta, str, int, float, None] = None,
mode: str = "wb+",
delete: bool = False,
suffix: Optional[str] = None,
):
"""Decorator to cache the output of a function into a temporary file.
The decorator caches function output to a cache file based on its inputs as key to identify the
cache file. Ignore parameters are used to avoid key generation on non-deterministic inputs, such
as time values. We can also ignore parameters that are slow to serialize/constant across runs,
such as large objects.
The cache file is created using `CacheFileStore` and stored with the generated key.
If the file exists in the cache and has not expired, it is returned instead of recomputing the
result.
The decorator scans the arguments of the decorated function for an `until_date`,
`until_datetime`, `with_ttl` or `force_update` parameter. The value of this parameter will be
used instead of the one given in the decorator if available.
Content of cache files without a suffix are transparently pickled to save file space.
Args:
ignore_params (List[str], optional): Parameter names to exclude from cache key generation.
until_date (Union[datetime, date, str, int, float, None], optional): The date
until the cache file is valid. Time of day is set to maximum time (23:59:59).
until_datetime (Union[datetime, date, str, int, float, None], optional): The datetime
until the cache file is valid. Time of day is set to maximum time (23:59:59) if not
provided.
with_ttl (Union[timedelta, str, int, float, None], optional): The time to live that
the cache file is valid. Time starts now.
mode (str, optional): The mode in which the file will be opened. Defaults to 'wb+'.
delete (bool, optional): Whether the cache file will be deleted after being closed.
Defaults to False.
suffix (str, optional): A suffix for the cache file, such as an extension (e.g., '.txt').
Defaults to None.
Returns:
callable: A decorated function that caches its result in a file.
Example:
>>> @cache_in_file(suffix = '.txt')
>>> def expensive_computation(until_date = None):
>>> # Perform some expensive computation
>>> return 'Some large result'
>>>
>>> result = expensive_computation(until_date = date.today())
"""
def decorator(func):
nonlocal ignore_params, until_date, until_datetime, with_ttl, mode, delete, suffix
func_source_code = inspect.getsource(func)
def wrapper(*args, **kwargs):
nonlocal ignore_params, until_date, until_datetime, with_ttl, mode, delete, suffix
# Convert args to a dictionary based on the function's signature
args_names = func.__code__.co_varnames[: func.__code__.co_argcount]
args_dict = dict(zip(args_names, args))
# Search for caching parameters of function and remove
force_update = None
for param in ["force_update", "until_datetime", "with_ttl", "until_date"]:
if param in kwargs:
if param == "force_update":
force_update = kwargs[param]
kwargs.pop("force_update")
if param == "until_datetime":
until_datetime = kwargs[param]
until_date = None
with_ttl = None
elif param == "with_ttl":
until_datetime = None
until_date = None
with_ttl = kwargs[param]
elif param == "until_date":
until_datetime = None
until_date = kwargs[param]
with_ttl = None
kwargs.pop("until_datetime", None)
kwargs.pop("until_date", None)
kwargs.pop("with_ttl", None)
break
# Remove ignored params
kwargs_clone = kwargs.copy()
for param in ignore_params:
args_dict.pop(param, None)
kwargs_clone.pop(param, None)
# Create key based on argument names, argument values, and function source code
key = str(args_dict) + str(kwargs_clone) + str(func_source_code)
result = None
# Get cache file that is currently valid
cache_file = CacheFileStore().get(key)
if not force_update and cache_file is not None:
# cache file is available
try:
logger.debug("Used cache file for function: " + func.__name__)
cache_file.seek(0)
if "b" in mode:
result = pickle.load(cache_file)
else:
result = cache_file.read()
except Exception as e:
logger.info(f"Read failed: {e}")
# Fail gracefully - force creation
force_update = True
if force_update or cache_file is None:
# Otherwise, call the function and save its result to the cache
logger.debug("Created cache file for function: " + func.__name__)
cache_file = CacheFileStore().create(
key,
mode=mode,
delete=delete,
suffix=suffix,
until_datetime=until_datetime,
until_date=until_date,
with_ttl=with_ttl,
)
result = func(*args, **kwargs)
try:
# Assure we have an empty file
cache_file.truncate(0)
if "b" in mode:
pickle.dump(result, cache_file)
else:
cache_file.write(result)
except Exception as e:
logger.info(f"Write failed: {e}")
CacheFileStore().delete(key)
return result
return wrapper
return decorator


@@ -1,25 +1,144 @@
import hashlib
"""PV Power Forecasting Module.
This module contains classes and methods to retrieve, process, and display photovoltaic (PV)
power forecast data, including temperature, windspeed, DC power, and AC power forecasts.
The module supports caching of forecast data to reduce redundant network requests and includes
functions to update AC power measurements and retrieve forecasts within a specified date range.
Classes
ForecastData: Represents a single forecast entry, including DC power, AC power,
temperature, and windspeed.
PVForecast: Retrieves, processes, and stores PV power forecast data, either from
a file or URL, with optional caching. It also provides methods to query
and update the forecast data, convert it to a DataFrame, and output key
metrics like AC power.
Example:
# Initialize PVForecast class with a URL
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405..."
)
# Update the AC power measurement for a specific date and time
forecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=1000)
# Print the forecast data with DC and AC power details
forecast.print_ac_power_and_measurement()
# Get the forecast data as a Pandas DataFrame
df = forecast.get_forecast_dataframe()
print(df)
Attributes:
prediction_hours (int): Number of forecast hours. Defaults to 48.
"""
import json
import os
from datetime import datetime
from pprint import pprint
from datetime import date, datetime
from typing import List, Optional, Union
import numpy as np
import pandas as pd
import requests
from dateutil import parser
from pydantic import BaseModel, ValidationError
from akkudoktoreos.cachefilestore import cache_in_file
from akkudoktoreos.datetimeutil import to_datetime
from akkudoktoreos.logutil import get_logger
logger = get_logger(__name__, logging_level="DEBUG")
class AkkudoktorForecastHorizon(BaseModel):
altitude: int
azimuthFrom: int
azimuthTo: int
class AkkudoktorForecastMeta(BaseModel):
lat: float
lon: float
power: List[int]
azimuth: List[int]
tilt: List[int]
timezone: str
albedo: float
past_days: int
inverterEfficiency: float
powerInverter: List[int]
cellCoEff: float
range: bool
horizont: List[List[AkkudoktorForecastHorizon]]
horizontString: List[str]
class AkkudoktorForecastValue(BaseModel):
datetime: str
dcPower: float
power: float
sunTilt: float
sunAzimuth: float
temperature: float
relativehumidity_2m: float
windspeed_10m: float
class AkkudoktorForecast(BaseModel):
meta: AkkudoktorForecastMeta
values: List[List[AkkudoktorForecastValue]]
def validate_pv_forecast_data(data) -> str:
"""Validate PV forecast data."""
data_type = None
error_msg = ""
try:
AkkudoktorForecast.model_validate(data)
data_type = "Akkudoktor"
except ValidationError as e:
for error in e.errors():
field = " -> ".join(str(x) for x in error["loc"])
message = error["msg"]
error_type = error["type"]
error_msg += f"Field: {field}\nError: {message}\nType: {error_type}\n"
logger.debug(f"Validation did not succeed: {error_msg}")
return data_type
class ForecastData:
"""Stores forecast data for PV power and weather parameters.
Attributes:
date_time (datetime): The date and time of the forecast.
dc_power (float): The direct current (DC) power in watts.
ac_power (float): The alternating current (AC) power in watts.
windspeed_10m (float, optional): Wind speed at 10 meters altitude.
temperature (float, optional): Temperature in degrees Celsius.
ac_power_measurement (float, optional): Measured AC power.
"""
def __init__(
self,
date_time,
dc_power,
ac_power,
windspeed_10m=None,
temperature=None,
ac_power_measurement=None,
date_time: datetime,
dc_power: float,
ac_power: float,
windspeed_10m: Optional[float] = None,
temperature: Optional[float] = None,
ac_power_measurement: Optional[float] = None,
):
"""Initializes the ForecastData instance.
Args:
date_time (datetime): The date and time of the forecast.
dc_power (float): The DC power in watts.
ac_power (float): The AC power in watts.
windspeed_10m (float, optional): Wind speed at 10 meters altitude. Defaults to None.
temperature (float, optional): Temperature in degrees Celsius. Defaults to None.
ac_power_measurement (float, optional): Measured AC power. Defaults to None.
"""
self.date_time = date_time
self.dc_power = dc_power
self.ac_power = ac_power
@@ -27,139 +146,387 @@ class ForecastData:
self.temperature = temperature
self.ac_power_measurement = ac_power_measurement
def get_date_time(self):
def get_date_time(self) -> datetime:
"""Returns the forecast date and time.
Returns:
datetime: The date and time of the forecast.
"""
return self.date_time
def get_dc_power(self):
def get_dc_power(self) -> float:
"""Returns the DC power.
Returns:
float: DC power in watts.
"""
return self.dc_power
def ac_power_measurement(self):
def ac_power_measurement(self) -> float:
"""Returns the measured AC power.
It returns the measured AC power if available; otherwise None.
Returns:
float: Measured AC power in watts or None
"""
return self.ac_power_measurement
def get_ac_power(self):
def get_ac_power(self) -> float:
"""Returns the AC power.
If a measured value is available, it returns the measured AC power;
otherwise, it returns the forecasted AC power.
Returns:
float: AC power in watts.
"""
if self.ac_power_measurement is not None:
return self.ac_power_measurement
else:
return self.ac_power
def get_windspeed_10m(self):
def get_windspeed_10m(self) -> float:
"""Returns the wind speed at 10 meters altitude.
Returns:
float: Wind speed in meters per second.
"""
return self.windspeed_10m
def get_temperature(self):
def get_temperature(self) -> float:
"""Returns the temperature.
Returns:
float: Temperature in degrees Celsius.
"""
return self.temperature
class PVForecast:
def __init__(self, filepath=None, url=None, cache_dir="cache", prediction_hours=48):
"""Manages PV (photovoltaic) power forecasts and weather data.
Forecast data can be loaded from different sources (in-memory data, file, or URL).
Attributes:
meta (dict): Metadata related to the forecast (e.g., source, location).
forecast_data (list): A list of forecast data points of `ForecastData` objects.
prediction_hours (int): The number of hours into the future the forecast covers.
current_measurement (Optional[float]): The current AC power measurement in watts (or None if unavailable).
data (Optional[dict]): JSON data containing the forecast information (if provided).
filepath (Optional[str]): Filepath to the forecast data file (if provided).
url (Optional[str]): URL to retrieve forecast data from an API (if provided).
_forecast_start (Optional[date]): Start datetime for the forecast period.
tz_name (Optional[str]): The time zone name of the forecast data, if applicable.
"""
def __init__(
self,
data: Optional[dict] = None,
filepath: Optional[str] = None,
url: Optional[str] = None,
forecast_start: Union[datetime, date, str, int, float] = None,
prediction_hours: Optional[int] = None,
):
"""Initializes a `PVForecast` instance.
Forecast data can be loaded from in-memory `data`, a file specified by `filepath`, or
fetched from a remote `url`. If none are provided, an empty forecast will be initialized.
The `forecast_start` and `prediction_hours` parameters can be specified to control the
forecasting time period.
Use `process_data()` to fill an empty forecast later on.
Args:
data (Optional[dict]): In-memory JSON data containing forecast information. Defaults to None.
filepath (Optional[str]): Path to a local file containing forecast data in JSON format. Defaults to None.
url (Optional[str]): URL to an API providing forecast data. Defaults to None.
forecast_start (Union[datetime, date, str, int, float]): The start datetime for the forecast period.
Can be a `datetime`, `date`, `str` (formatted date), `int` (timestamp), `float`, or None. Defaults to None.
prediction_hours (Optional[int]): The number of hours to forecast into the future. Defaults to 48 hours.
Example:
forecast = PVForecast(data=my_forecast_data, forecast_start="2024-10-13", prediction_hours=72)
"""
self.meta = {}
self.forecast_data = []
self.prediction_hours = prediction_hours
self.current_measurement = None
self.data = data
self.filepath = filepath
self.url = url
if forecast_start:
self._forecast_start = to_datetime(forecast_start, to_naiv=True, to_maxtime=False)
else:
self._forecast_start = None
self._tz_name = None
if self.data or self.filepath or self.url:
self.process_data(
data=self.data,
filepath=self.filepath,
url=self.url,
forecast_start=self._forecast_start,
prediction_hours=self.prediction_hours,
)
def update_ac_power_measurement(
self,
date_time: Union[datetime, date, str, int, float, None] = None,
ac_power_measurement: Optional[float] = None,
) -> bool:
"""Updates the AC power measurement for a specific time.
Args:
date_time (Union[datetime, date, str, int, float, None]): The date and time of the measurement.
ac_power_measurement (Optional[float]): Measured AC power.
Returns:
bool: True if a matching timestamp was found, False otherwise.
"""
found = False
input_date_hour = to_datetime(
date_time, to_timezone=self._tz_name, to_naiv=True, to_maxtime=False
).replace(minute=0, second=0, microsecond=0)
for forecast in self.forecast_data:
forecast_date_hour = to_datetime(forecast.date_time, to_naiv=True).replace(
minute=0, second=0, microsecond=0
)
if forecast_date_hour == input_date_hour:
forecast.ac_power_measurement = ac_power_measurement
found = True
logger.debug(
f"AC Power measurement updated at date {input_date_hour}: {ac_power_measurement}"
)
break
return found
def process_data(
self,
data: Optional[dict] = None,
filepath: Optional[str] = None,
url: Optional[str] = None,
forecast_start: Union[datetime, date, str, int, float, None] = None,
prediction_hours: Optional[int] = None,
) -> None:
"""Processes the forecast data from the provided source (in-memory `data`, `filepath`, or `url`).
If `forecast_start` and `prediction_hours` are provided, they define the forecast period.
Args:
data (Optional[dict]): JSON data containing forecast values. Defaults to None.
filepath (Optional[str]): Path to a file with forecast data. Defaults to None.
url (Optional[str]): API URL to retrieve forecast data from. Defaults to None.
forecast_start (Union[datetime, date, str, int, float, None]): Start datetime of the forecast
period. Defaults to None. If provided, the value is cached for subsequent calls.
prediction_hours (Optional[int]): The number of hours to forecast into the future.
Defaults to None. If provided, the value is cached for subsequent calls.
Returns:
None
Raises:
FileNotFoundError: If the specified `filepath` does not exist.
ValueError: If no valid data source or data is provided.
Example:
forecast = PVForecast(
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&"
"power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&"
"power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&"
"power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&"
"power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&"
"past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&"
"timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m",
prediction_hours = 24,
)
"""
# Get input forecast data
if data:
pass
elif filepath:
data = self.load_data_from_file(filepath)
elif url:
data = self.load_data_from_url_with_caching(url)
elif self.data or self.filepath or self.url:
# Re-process according to previous arguments
if self.data:
data = self.data
elif self.filepath:
data = self.load_data_from_file(self.filepath)
elif self.url:
data = self.load_data_from_url_with_caching(self.url)
else:
raise NotImplementedError(
"Re-processing for None input is not implemented!"
) # Invalid path
else:
raise ValueError("No prediction input data available.")
# Validate input data to be of a known format
data_format = validate_pv_forecast_data(data)
if data_format != "Akkudoktor":
raise ValueError(f"Prediction input data are of unknown format: '{data_format}'.")
# Assure we have a forecast start datetime
if forecast_start is None:
forecast_start = self._forecast_start
if forecast_start is None:
forecast_start = datetime(1970, 1, 1)
# Assure we have prediction hours set
if prediction_hours is None:
prediction_hours = self.prediction_hours
if prediction_hours is None:
prediction_hours = 48
self.prediction_hours = prediction_hours
if data_format == "Akkudoktor":
# --------------------------------------------
# From here Akkudoktor PV forecast data format
# ---------------------------------------------
self.meta = data.get("meta")
all_values = data.get("values")
# timezone of the PV system
self._tz_name = self.meta.get("timezone", None)
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info is not implemented!"
)
# Assumption that all lists are the same length and are ordered chronologically
# in ascending order and have the same timestamps.
values_len = len(all_values[0])
if values_len < self.prediction_hours:
# Expect one value set per prediction hour
raise ValueError(
f"The forecast must cover at least {self.prediction_hours} hours, "
f"but only {values_len} data sets are given in forecast data."
)
# Convert forecast_start to timezone of PV system and make it a naive datetime
self._forecast_start = to_datetime(
forecast_start, to_timezone=self._tz_name, to_naiv=True
)
logger.debug(f"Forecast start set to {self._forecast_start}")
for i in range(values_len):
# Show the original and computed timestamps
original_datetime = all_values[0][i].get("datetime")
# print(original_datetime," ",sum_dc_power," ",all_values[0][i]['dcPower'])
dt = to_datetime(original_datetime, to_timezone=self._tz_name, to_naiv=True)
# iso_datetime = parser.parse(original_datetime).isoformat()  # Convert to ISO format
# print()
# Optional: subtract 2 hours to test the time adjustment
# adjusted_datetime = parser.parse(original_datetime) - timedelta(hours=2)
# print(f"Adjusted timestamp: {adjusted_datetime.isoformat()}")
if dt < self._forecast_start:
# forecast data are too old
continue
sum_dc_power = sum(values[i]["dcPower"] for values in all_values)
sum_ac_power = sum(values[i]["power"] for values in all_values)
forecast = ForecastData(
date_time=dt,  # Use the adjusted timestamp
dc_power=sum_dc_power,
ac_power=sum_ac_power,
windspeed_10m=all_values[0][i].get("windspeed_10m"),
temperature=all_values[0][i].get("temperature"),
)
self.forecast_data.append(forecast)
if len(self.forecast_data) < self.prediction_hours:
raise ValueError(
f"The forecast must cover at least {self.prediction_hours} hours, "
f"but only {len(self.forecast_data)} hours starting from {forecast_start} "
f"were predicted."
)
# Adapt forecast start to actual value
self._forecast_start = self.forecast_data[0].get_date_time()
logger.debug(f"Forecast start adapted to {self._forecast_start}")
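The per-hour aggregation across PV planes in `process_data` can be sketched with hypothetical data (the sample values below are illustrative; the field names `dcPower` and `power` follow the Akkudoktor format):

```python
# Hypothetical two-plane sample in the Akkudoktor "values" layout:
# one list of hourly dicts per PV plane, summed element-wise per hour.
plane_a = [{"dcPower": 100.0, "power": 90.0}, {"dcPower": 200.0, "power": 180.0}]
plane_b = [{"dcPower": 50.0, "power": 45.0}, {"dcPower": 80.0, "power": 72.0}]
all_values = [plane_a, plane_b]

hours = len(all_values[0])
sum_dc_power = [sum(plane[i]["dcPower"] for plane in all_values) for i in range(hours)]
sum_ac_power = [sum(plane[i]["power"] for plane in all_values) for i in range(hours)]
print(sum_dc_power)  # [150.0, 280.0]
print(sum_ac_power)  # [135.0, 252.0]
```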
def load_data_from_file(self, filepath: str) -> dict:
"""Loads forecast data from a file.
Args:
filepath (str): Path to the file containing the forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
with open(filepath, "r") as file:
data = json.load(file)
self.process_data(data)
return data
def load_data_from_url(self, url: str) -> dict:
"""Loads forecast data from a URL.
Example:
https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&hourly=relativehumidity_2m%2Cwindspeed_10m
Args:
url (str): URL of the API providing forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
response = requests.get(url)
if response.status_code == 200:
data = response.json()
else:
data = f"Failed to load data from `{url}`. Status Code: {response.status_code}"
logger.error(data)
return data
@cache_in_file() # use binary mode by default as we have python objects not text
def load_data_from_url_with_caching(self, url: str, until_date=None) -> dict:
"""Loads data from a URL or from the cache if available.
Args:
url (str): URL of the API providing forecast data.
Returns:
data (dict): JSON data containing forecast values.
"""
response = requests.get(url)
if response.status_code == 200:
data = response.json()
logger.debug(f"Data fetched from URL `{url}` and cached.")
else:
data = f"Failed to load data from `{url}`. Status Code: {response.status_code}"
logger.error(data)
return data
def get_forecast_data(self):
"""Returns the forecast data.
Returns:
list: List of ForecastData objects.
"""
return self.forecast_data
def get_temperature_forecast_for_date(
self, input_date: Union[datetime, date, str, int, float, None]
):
"""Returns the temperature forecast for a specific date.
Args:
input_date (Union[datetime, date, str, int, float, None]): Date of the forecast.
Returns:
np.array: Array of temperature forecasts.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info is not implemented!"
)
input_date = to_datetime(input_date, to_timezone=self._tz_name, to_naiv=True).date()
daily_forecast_obj = [
data
for data in self.forecast_data
data for data in self.forecast_data if data.get_date_time().date() == input_date
]
daily_forecast = []
for d in daily_forecast_obj:
# ... (unchanged lines collapsed in the diff view) ...
return np.array(daily_forecast)
def get_pv_forecast_for_date_range(
self,
start_date: Union[datetime, date, str, int, float, None],
end_date: Union[datetime, date, str, int, float, None],
):
"""Returns the PV forecast for a date range.
Args:
start_date (datetime | date | str | int | float | None): Start date.
end_date (datetime | date | str | int | float | None): End date.
Returns:
pd.DataFrame: DataFrame containing the forecast data.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info is not implemented!"
)
start_date = to_datetime(start_date, to_timezone=self._tz_name, to_naiv=True).date()
end_date = to_datetime(end_date, to_timezone=self._tz_name, to_naiv=True).date()
date_range_forecast = []
for data in self.forecast_data:
data_date = data.get_date_time().date()
if start_date <= data_date <= end_date:
date_range_forecast.append(data)
# print(data.get_date_time(), " ", data.get_ac_power())
ac_power_forecast = np.array([data.get_ac_power() for data in date_range_forecast])
return np.array(ac_power_forecast)[: self.prediction_hours]
def get_temperature_for_date_range(
self,
start_date: Union[datetime, date, str, int, float, None],
end_date: Union[datetime, date, str, int, float, None],
):
"""Returns the temperature forecast for a given date range.
Args:
start_date (datetime | date | str | int | float | None): Start date.
end_date (datetime | date | str | int | float | None): End date.
Returns:
np.array: Array containing temperature forecasts for each hour within the date range.
"""
if not self._tz_name:
raise NotImplementedError(
"Processing without PV system timezone info is not implemented!"
)
start_date = to_datetime(start_date, to_timezone=self._tz_name, to_naiv=True).date()
end_date = to_datetime(end_date, to_timezone=self._tz_name, to_naiv=True).date()
date_range_forecast = []
for data in self.forecast_data:
# ... (unchanged lines collapsed in the diff view) ...
return np.array(temperature_forecast)[: self.prediction_hours]
def get_forecast_dataframe(self):
"""Converts the forecast data into a Pandas DataFrame.
Returns:
pd.DataFrame: A DataFrame containing the forecast data with columns for date/time,
DC power, AC power, windspeed, and temperature.
"""
data = [
{
"date_time": f.get_date_time(),
# ... (unchanged lines collapsed in the diff view) ...
df = pd.DataFrame(data)
return df
def get_forecast_start(self) -> Optional[datetime]:
"""Return the start of the forecast data in local timezone.
Returns:
forecast_start (datetime | None): The start datetime or None if no data available.
"""
if not self._forecast_start:
return None
return to_datetime(
self._forecast_start, to_timezone=self._tz_name, to_naiv=True, to_maxtime=False
)
def report_ac_power_and_measurement(self) -> str:
"""Reports DC power, AC power, and the AC power measurement for each forecast hour.
For each forecast entry, the time, DC power, forecasted AC power, measured AC power
(if available), and the value returned by `get_ac_power` are provided.
Returns:
str: The report.
"""
rep = ""
for forecast in self.forecast_data:
date_time = forecast.date_time
rep += (
f"Zeit: {date_time}, DC: {forecast.dc_power}, AC: {forecast.ac_power}, "
f"Messwert: {forecast.ac_power_measurement}, AC GET: {forecast.get_ac_power()}"
"\n"
)
return rep
# Example of how to use the PVForecast class
if __name__ == "__main__":
"""Main execution block to demonstrate the use of the PVForecast class.
Fetches PV power forecast data from a given URL, updates the AC power measurement
for the current date/time, and prints the DC and AC power information.
"""
forecast = PVForecast(
prediction_hours=24,
url="https://api.akkudoktor.net/forecast?lat=52.52&lon=13.405&"
"power=5000&azimuth=-10&tilt=7&powerInvertor=10000&horizont=20,27,22,20&"
"power=4800&azimuth=-90&tilt=7&powerInvertor=10000&horizont=30,30,30,50&"
"power=1400&azimuth=-40&tilt=60&powerInvertor=2000&horizont=60,30,0,30&"
"power=1600&azimuth=5&tilt=45&powerInvertor=1400&horizont=45,25,30,60&"
"past_days=5&cellCoEff=-0.36&inverterEfficiency=0.8&albedo=0.25&timezone=Europe%2FBerlin&"
"hourly=relativehumidity_2m%2Cwindspeed_10m",
)
forecast.update_ac_power_measurement(date_time=datetime.now(), ac_power_measurement=1000)
print(forecast.report_ac_power_and_measurement())
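The hour-level timestamp matching used by `update_ac_power_measurement` can be sketched in isolation (`same_forecast_hour` is an illustrative helper, not part of this module):

```python
from datetime import datetime

# Sketch of the matching rule used by update_ac_power_measurement:
# both timestamps are truncated to the full hour before comparison.
def same_forecast_hour(a: datetime, b: datetime) -> bool:
    def truncate(dt: datetime) -> datetime:
        return dt.replace(minute=0, second=0, microsecond=0)
    return truncate(a) == truncate(b)

print(same_forecast_hour(datetime(2024, 10, 13, 15, 30), datetime(2024, 10, 13, 15, 5)))  # True
print(same_forecast_hour(datetime(2024, 10, 13, 15, 30), datetime(2024, 10, 13, 16, 0)))  # False
```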

"""Utility functions for date-time conversion tasks.
Functions:
----------
- to_datetime: Converts various date or time inputs to a timezone-aware or naive `datetime`
object or formatted string.
- to_timedelta: Converts various time delta inputs to a `timedelta` object.
- to_timezone: Determines the timezone (as a `ZoneInfo` object or name) for a given latitude and longitude.
Example usage:
--------------
# Date-time conversion
>>> date_str = "2024-10-15"
>>> date_obj = to_datetime(date_str)
>>> print(date_obj) # Output: datetime object for '2024-10-15'
# Time delta conversion
>>> to_timedelta("2 days 5 hours")
# Timezone detection
>>> to_timezone(40.7128, -74.0060)
"""
import re
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional, Union
from zoneinfo import ZoneInfo
from timezonefinder import TimezoneFinder
def to_datetime(
date_input: Union[datetime, date, str, int, float, None],
as_string: Optional[Union[str, bool]] = None,
to_timezone: Optional[Union[timezone, str]] = None,
to_naiv: Optional[bool] = None,
to_maxtime: Optional[bool] = None,
):
"""Converts a date input to a datetime object or a formatted string with timezone support.
Args:
date_input (Union[datetime, date, str, int, float, None]): The date input to convert.
Accepts a date string, a datetime object, a date object or a Unix timestamp.
as_string (Optional[Union[str, bool]]): If as_string is given (a format string or True),
return the datetime as a string. Otherwise, return a datetime object, which is the default.
If True is given, the string will be returned in ISO format.
If a format string is given, it may use the special values "UTC" or "utc"
to return a string in ISO format normalized to UTC. Otherwise the format string must
comply with Python's `datetime.strftime`.
to_timezone (Optional[Union[timezone, str]]):
Optional timezone object or name (e.g., 'UTC', 'Europe/Berlin').
If provided, the datetime will be converted to this timezone.
If not provided, the datetime will be converted to the local timezone.
to_naiv (Optional[bool]):
If True, remove timezone info from datetime after conversion.
If False, keep timezone info after conversion. The default.
to_maxtime (Optional[bool]):
If True, convert to maximum time if no time is given. The default.
If False, convert to minimum time if no time is given.
Example:
to_datetime("2027-12-12 24:13:12", as_string = "%Y-%m-%dT%H:%M:%S.%f%z")
Returns:
datetime or str: Converted date as a datetime object or a formatted string with timezone.
Raises:
ValueError: If the date input is not a valid type or format.
"""
if isinstance(date_input, datetime):
dt_object = date_input
elif isinstance(date_input, date):
# Convert date object to datetime object
if to_maxtime is None or to_maxtime:
dt_object = datetime.combine(date_input, time.max)
else:
dt_object = datetime.combine(date_input, time.min)
elif isinstance(date_input, (int, float)):
# Convert timestamp to datetime object
dt_object = datetime.fromtimestamp(date_input, tz=timezone.utc)
elif isinstance(date_input, str):
# Convert string to datetime object
try:
# Try ISO format
dt_object = datetime.fromisoformat(date_input)
except ValueError as e:
formats = [
"%Y-%m-%d", # Format: 2024-10-13
"%d/%m/%y", # Format: 13/10/24
"%d/%m/%Y", # Format: 13/10/2024
"%m-%d-%Y", # Format: 10-13-2024
"%Y.%m.%d", # Format: 2024.10.13
"%d %b %Y", # Format: 13 Oct 2024
"%d %B %Y", # Format: 13 October 2024
"%Y-%m-%d %H:%M:%S", # Format: 2024-10-13 15:30:00
"%Y-%m-%d %H:%M:%S%z", # Format with timezone: 2024-10-13 15:30:00+0000
"%Y-%m-%d %H:%M:%S%z:00", # Format with timezone: 2024-10-13 15:30:00+0000
"%Y-%m-%dT%H:%M:%S.%f%z", # Format with timezone: 2024-10-13T15:30:00.000+0000
]
for fmt in formats:
try:
dt_object = datetime.strptime(date_input, fmt)
break
except ValueError as e:
dt_object = None
continue
if dt_object is None:
raise ValueError(f"Date string {date_input} does not match any known formats.")
elif date_input is None:
if to_maxtime is None or to_maxtime:
dt_object = datetime.combine(date.today(), time.max)
else:
dt_object = datetime.combine(date.today(), time.min)
else:
raise ValueError(f"Unsupported date input type: {type(date_input)}")
# Get local timezone
local_date = datetime.now().astimezone()
local_tz_name = local_date.tzname()
local_utc_offset = local_date.utcoffset()
local_timezone = timezone(local_utc_offset, local_tz_name)
# Get target timezone
if to_timezone:
if isinstance(to_timezone, timezone):
target_timezone = to_timezone
elif isinstance(to_timezone, str):
try:
target_timezone = ZoneInfo(to_timezone)
except Exception as e:
raise ValueError(f"Invalid timezone: {to_timezone}") from e
else:
raise ValueError(f"Invalid timezone: {to_timezone}")
# Adjust/Add timezone information
if dt_object.tzinfo is None or dt_object.tzinfo.utcoffset(dt_object) is None:
# datetime object is naive (not timezone aware)
# Add timezone
if to_timezone is None:
# Add local timezone
dt_object = dt_object.replace(tzinfo=local_timezone)
else:
# Set to target timezone
dt_object = dt_object.replace(tzinfo=target_timezone)
elif to_timezone:
# Localize the datetime object to given target timezone
dt_object = dt_object.astimezone(target_timezone)
else:
# Localize the datetime object to local timezone
dt_object = dt_object.astimezone(local_timezone)
if to_naiv:
# Remove timezone info to make the datetime naiv
dt_object = dt_object.replace(tzinfo=None)
if as_string:
# Return formatted string as defined by as_string
if isinstance(as_string, bool):
return dt_object.isoformat()
elif as_string == "UTC" or as_string == "utc":
dt_object = dt_object.astimezone(timezone.utc)
return dt_object.isoformat()
else:
return dt_object.strftime(as_string)
else:
return dt_object
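The naive-vs-aware handling in `to_datetime` can be sketched as a standalone helper (`normalize` is hypothetical, not part of this module): a naive datetime gets the target timezone attached, while an aware one is converted into it.

```python
from datetime import datetime, timezone, timedelta

def normalize(dt: datetime, tz: timezone) -> datetime:
    # Naive datetime: attach the target timezone without shifting the clock time.
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        return dt.replace(tzinfo=tz)
    # Aware datetime: convert into the target timezone, shifting the clock time.
    return dt.astimezone(tz)

cet = timezone(timedelta(hours=1), "CET")
naive = datetime(2024, 10, 13, 12, 0)
aware = datetime(2024, 10, 13, 12, 0, tzinfo=timezone.utc)
print(normalize(naive, cet).isoformat())  # 2024-10-13T12:00:00+01:00
print(normalize(aware, cet).isoformat())  # 2024-10-13T13:00:00+01:00
```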
def to_timedelta(input_value):
"""Converts various input types into a timedelta object.
Args:
input_value (Union[timedelta, str, int, float, tuple, list]): Input to be converted
to a `timedelta`.
- str: A string like "2 days", "5 hours", "30 minutes", or a combination.
- int/float: Number representing seconds.
- tuple/list: A tuple or list in the format (days, hours, minutes, seconds).
Returns:
timedelta: A timedelta object corresponding to the input value.
Raises:
ValueError: If the input format is not supported.
Examples:
>>> to_timedelta("2 days 5 hours")
datetime.timedelta(days=2, seconds=18000)
>>> to_timedelta(3600)
datetime.timedelta(seconds=3600)
>>> to_timedelta((1, 2, 30, 15))
datetime.timedelta(days=1, seconds=90315)
"""
if isinstance(input_value, timedelta):
return input_value
if isinstance(input_value, (int, float)):
# Handle integers or floats as seconds
return timedelta(seconds=input_value)
elif isinstance(input_value, (tuple, list)):
# Handle tuple or list: (days, hours, minutes, seconds)
if len(input_value) == 4:
days, hours, minutes, seconds = input_value
return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
else:
raise ValueError(f"Expected a tuple or list of length 4, got {len(input_value)}")
elif isinstance(input_value, str):
# Handle strings like "2 days 5 hours 30 minutes"
total_seconds = 0
time_units = {
"day": 86400, # 24 * 60 * 60
"hour": 3600,
"minute": 60,
"second": 1,
}
# Regular expression to match time components like '2 days', '5 hours', etc.
matches = re.findall(r"(\d+)\s*(days?|hours?|minutes?|seconds?)", input_value)
if not matches:
raise ValueError(f"Invalid time string format: {input_value}")
for value, unit in matches:
unit = unit.lower().rstrip("s") # Normalize unit
if unit in time_units:
total_seconds += int(value) * time_units[unit]
else:
raise ValueError(f"Unsupported time unit: {unit}")
return timedelta(seconds=total_seconds)
else:
raise ValueError(f"Unsupported input type: {type(input_value)}")
def to_timezone(lat: float, lon: float, as_string: Optional[bool] = None):
"""Determines the timezone for a given geographic location specified by latitude and longitude.
By default, it returns a `ZoneInfo` object representing the timezone.
If `as_string` is set to `True`, the function returns the timezone name as a string instead.
Args:
lat (float): Latitude of the location in decimal degrees. Must be between -90 and 90.
lon (float): Longitude of the location in decimal degrees. Must be between -180 and 180.
as_string (Optional[bool]):
- If `True`, returns the timezone as a string (e.g., "America/New_York").
- If `False` or not provided, returns a `ZoneInfo` object for the timezone.
Returns:
str or ZoneInfo:
- A timezone name as a string (e.g., "America/New_York") if `as_string` is `True`.
- A `ZoneInfo` timezone object if `as_string` is `False` or not provided.
Raises:
ValueError: If the latitude or longitude is out of range, or if no timezone is found for
the specified coordinates.
Example:
>>> to_timezone(40.7128, -74.0060, as_string=True)
'America/New_York'
>>> to_timezone(40.7128, -74.0060)
ZoneInfo(key='America/New_York')
"""
# Initialize the static variable only once
if not hasattr(to_timezone, "timezone_finder"):
to_timezone.timezone_finder = TimezoneFinder() # static variable
# Check and convert coordinates to timezone
try:
tz_name = to_timezone.timezone_finder.timezone_at(lat=lat, lng=lon)
if not tz_name:
raise ValueError(f"No timezone found for coordinates: latitude {lat}, longitude {lon}")
except Exception as e:
raise ValueError(f"Invalid location: latitude {lat}, longitude {lon}") from e
if as_string:
return tz_name
return ZoneInfo(tz_name)
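The string branch of `to_timedelta` can be sketched as a standalone parser (a simplified re-implementation for illustration, not the module's own function):

```python
import re
from datetime import timedelta

def parse_duration(text: str) -> timedelta:
    # Minimal sketch of the unit-table approach used by to_timedelta:
    # match "<number> <unit>" pairs and sum their contributions in seconds.
    units = {"day": 86400, "hour": 3600, "minute": 60, "second": 1}
    matches = re.findall(r"(\d+)\s*(days?|hours?|minutes?|seconds?)", text)
    if not matches:
        raise ValueError(f"Invalid time string format: {text}")
    total = sum(int(value) * units[unit.rstrip("s")] for value, unit in matches)
    return timedelta(seconds=total)

print(parse_duration("2 days 5 hours"))  # 2 days, 5:00:00
```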

"""Utility functions for handling logging tasks.
Functions:
----------
- get_logger: Creates and configures a logger with console and optional rotating file logging.
Example usage:
--------------
# Logger setup
>>> logger = get_logger(__name__, log_file="app.log", logging_level="DEBUG")
>>> logger.info("Logging initialized.")
Notes:
------
- The logger supports rotating log files to prevent excessive log file size.
"""
import logging
import os
from logging.handlers import RotatingFileHandler
from typing import Optional
def get_logger(
name: str,
log_file: Optional[str] = None,
logging_level: Optional[str] = "INFO",
max_bytes: int = 5000000,
backup_count: int = 5,
) -> logging.Logger:
"""Creates and configures a logger with a given name.
The logger supports logging to both the console and an optional log file. File logging is
handled by a rotating file handler to prevent excessive log file size.
Args:
name (str): The name of the logger, typically `__name__` from the calling module.
log_file (Optional[str]): Path to the log file for file logging. If None, no file logging is done.
logging_level (Optional[str]): Logging level (e.g., "INFO", "DEBUG"). Defaults to "INFO".
max_bytes (int): Maximum size in bytes for log file before rotation. Defaults to 5 MB.
backup_count (int): Number of backup log files to keep. Defaults to 5.
Returns:
logging.Logger: Configured logger instance.
Example:
logger = get_logger(__name__, log_file="app.log", logging_level="DEBUG")
logger.info("Application started")
"""
# Create a logger with the specified name
logger = logging.getLogger(name)
logger.propagate = True
if logging_level == "DEBUG":
level = logging.DEBUG
elif logging_level == "INFO":
level = logging.INFO
elif logging_level == "WARNING":
level = logging.WARNING
elif logging_level == "ERROR":
level = logging.ERROR
else:
level = logging.DEBUG
logger.setLevel(level)
# The log message format
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Prevent loggers from being added multiple times
# There may already be a logger from pytest
if not logger.handlers:
# Create a console handler with a standard output stream
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
if log_file and len(logger.handlers) < 2: # We assume a console logger to be the first logger
# If a log file path is specified, create a rotating file handler
# Ensure the log directory exists
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
# Create a rotating file handler
file_handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
file_handler.setLevel(level)
file_handler.setFormatter(formatter)
# Add the file handler to the logger
logger.addHandler(file_handler)
return logger
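The handler-deduplication behaviour of `get_logger` can be sketched as follows (a minimal stand-in assuming console-only logging; `get_demo_logger` is illustrative, not part of this module):

```python
import logging

def get_demo_logger(name: str) -> logging.Logger:
    # Sketch of the idempotent handler setup used by get_logger:
    # repeated calls must not stack duplicate console handlers.
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    return logger

a = get_demo_logger("demo")
b = get_demo_logger("demo")  # same logger object, no second handler added
print(a is b, len(a.handlers))
```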