feat(VRM forecast): add load and pv forecast by VRM API (#611)
Some checks failed
docker-build / platform-excludes (push) Has been cancelled
docker-build / build (push) Has been cancelled
docker-build / merge (push) Has been cancelled
pre-commit / pre-commit (push) Has been cancelled
Run Pytest on Pull Request / test (push) Has been cancelled

Add support for fetching forecasts from the VRM API by Victron Energy. Retrieve forecasts for PV generation and total load of a Victron system from the internet. 

Tests for the new modules have been added, and the documentation has been updated accordingly.

Signed-off-by: redmoon2711 <redmoon2711@gmx.de>
This commit is contained in:
redmoon2711
2025-07-19 08:55:16 +02:00
committed by GitHub
parent e5500b6857
commit 8e69caba73
11 changed files with 5458 additions and 4829 deletions

View File

@@ -8,6 +8,7 @@ from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.loadabc import LoadProvider
from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktorCommonSettings
from akkudoktoreos.prediction.loadimport import LoadImportCommonSettings
from akkudoktoreos.prediction.loadvrm import LoadVrmCommonSettings
from akkudoktoreos.prediction.prediction import get_prediction
prediction_eos = get_prediction()
@@ -29,9 +30,9 @@ class LoadCommonSettings(SettingsBaseModel):
examples=["LoadAkkudoktor"],
)
provider_settings: Optional[Union[LoadAkkudoktorCommonSettings, LoadImportCommonSettings]] = (
Field(default=None, description="Provider settings", examples=[None])
)
provider_settings: Optional[
Union[LoadAkkudoktorCommonSettings, LoadVrmCommonSettings, LoadImportCommonSettings]
] = Field(default=None, description="Provider settings", examples=[None])
# Validators
@field_validator("provider", mode="after")

View File

@@ -0,0 +1,109 @@
"""Retrieves load forecast data from VRM API."""
from typing import Any, Optional, Union
import requests
from loguru import logger
from pendulum import DateTime
from pydantic import Field, ValidationError
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.loadabc import LoadProvider
from akkudoktoreos.utils.datetimeutil import to_datetime
class VrmForecastRecords(PydanticBaseModel):
vrm_consumption_fc: list[tuple[int, float]]
solar_yield_forecast: list[tuple[int, float]]
class VrmForecastResponse(PydanticBaseModel):
success: bool
records: VrmForecastRecords
totals: dict
class LoadVrmCommonSettings(SettingsBaseModel):
"""Common settings for VRM API."""
load_vrm_token: str = Field(
default="your-token", description="Token for Connecting VRM API", examples=["your-token"]
)
load_vrm_idsite: int = Field(default=12345, description="VRM-Installation-ID", examples=[12345])
class LoadVrm(LoadProvider):
"""Fetch Load forecast data from VRM API."""
@classmethod
def provider_id(cls) -> str:
return "LoadVrm"
@classmethod
def _validate_data(cls, json_str: Union[bytes, Any]) -> VrmForecastResponse:
"""Validate the VRM API load forecast response."""
try:
return VrmForecastResponse.model_validate_json(json_str)
except ValidationError as e:
error_msg = "\n".join(
f"Field: {' -> '.join(str(x) for x in err['loc'])}\n"
f"Error: {err['msg']}\nType: {err['type']}"
for err in e.errors()
)
logger.error(f"VRM-API schema validation failed:\n{error_msg}")
raise ValueError(error_msg)
def _request_forecast(self, start_ts: int, end_ts: int) -> VrmForecastResponse:
"""Fetch forecast data from Victron VRM API."""
base_url = "https://vrmapi.victronenergy.com/v2/installations"
installation_id = self.config.load.provider_settings.load_vrm_idsite
api_token = self.config.load.provider_settings.load_vrm_token
url = f"{base_url}/{installation_id}/stats?type=forecast&start={start_ts}&end={end_ts}&interval=hours"
headers = {"X-Authorization": f"Token {api_token}", "Content-Type": "application/json"}
logger.debug(f"Requesting VRM load forecast: {url}")
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Error during VRM API request: {e}")
raise RuntimeError("Failed to fetch load forecast from VRM API") from e
self.update_datetime = to_datetime(in_timezone=self.config.general.timezone)
return self._validate_data(response.content)
def _ts_to_datetime(self, timestamp: int) -> DateTime:
"""Convert UNIX ms timestamp to timezone-aware datetime."""
return to_datetime(timestamp / 1000, in_timezone=self.config.general.timezone)
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Fetch and store VRM load forecast as load_mean and related values."""
start_date = self.start_datetime.start_of("day")
end_date = self.start_datetime.add(hours=self.config.prediction.hours)
start_ts = int(start_date.timestamp())
end_ts = int(end_date.timestamp())
logger.info(f"Updating Load forecast from VRM: {start_date} to {end_date}")
vrm_forecast_data = self._request_forecast(start_ts, end_ts)
load_mean_data = []
for timestamp, value in vrm_forecast_data.records.vrm_consumption_fc:
date = self._ts_to_datetime(timestamp)
rounded_value = round(value, 2)
self.update_value(
date,
{"load_mean": rounded_value, "load_std": 0.0, "load_mean_adjusted": rounded_value},
)
load_mean_data.append((date, rounded_value))
logger.debug(f"Updated load_mean with {len(load_mean_data)} entries.")
self.update_datetime = to_datetime(in_timezone=self.config.general.timezone)
if __name__ == "__main__":
lv = LoadVrm()
lv._update_data()

View File

@@ -36,9 +36,11 @@ from akkudoktoreos.prediction.elecpriceenergycharts import ElecPriceEnergyCharts
from akkudoktoreos.prediction.elecpriceimport import ElecPriceImport
from akkudoktoreos.prediction.loadakkudoktor import LoadAkkudoktor
from akkudoktoreos.prediction.loadimport import LoadImport
from akkudoktoreos.prediction.loadvrm import LoadVrm
from akkudoktoreos.prediction.predictionabc import PredictionContainer
from akkudoktoreos.prediction.pvforecastakkudoktor import PVForecastAkkudoktor
from akkudoktoreos.prediction.pvforecastimport import PVForecastImport
from akkudoktoreos.prediction.pvforecastvrm import PVForecastVrm
from akkudoktoreos.prediction.weatherbrightsky import WeatherBrightSky
from akkudoktoreos.prediction.weatherclearoutside import WeatherClearOutside
from akkudoktoreos.prediction.weatherimport import WeatherImport
@@ -87,8 +89,10 @@ class Prediction(PredictionContainer):
ElecPriceEnergyCharts,
ElecPriceImport,
LoadAkkudoktor,
LoadVrm,
LoadImport,
PVForecastAkkudoktor,
PVForecastVrm,
PVForecastImport,
WeatherBrightSky,
WeatherClearOutside,
@@ -102,8 +106,10 @@ elecprice_akkudoktor = ElecPriceAkkudoktor()
elecprice_energy_charts = ElecPriceEnergyCharts()
elecprice_import = ElecPriceImport()
load_akkudoktor = LoadAkkudoktor()
load_vrm = LoadVrm()
load_import = LoadImport()
pvforecast_akkudoktor = PVForecastAkkudoktor()
pvforecast_vrm = PVForecastVrm()
pvforecast_import = PVForecastImport()
weather_brightsky = WeatherBrightSky()
weather_clearoutside = WeatherClearOutside()
@@ -120,8 +126,10 @@ def get_prediction() -> Prediction:
elecprice_energy_charts,
elecprice_import,
load_akkudoktor,
load_vrm,
load_import,
pvforecast_akkudoktor,
pvforecast_vrm,
pvforecast_import,
weather_brightsky,
weather_clearoutside,

View File

@@ -1,6 +1,6 @@
"""PV forecast module for PV power predictions."""
from typing import Any, List, Optional, Self
from typing import Any, List, Optional, Self, Union
from pydantic import Field, computed_field, field_validator, model_validator
@@ -8,6 +8,7 @@ from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.prediction.prediction import get_prediction
from akkudoktoreos.prediction.pvforecastabc import PVForecastProvider
from akkudoktoreos.prediction.pvforecastimport import PVForecastImportCommonSettings
from akkudoktoreos.prediction.pvforecastvrm import PVforecastVrmCommonSettings
from akkudoktoreos.utils.docs import get_model_structure_from_examples
prediction_eos = get_prediction()
@@ -134,9 +135,9 @@ class PVForecastCommonSettings(SettingsBaseModel):
examples=["PVForecastAkkudoktor"],
)
provider_settings: Optional[PVForecastImportCommonSettings] = Field(
default=None, description="Provider settings", examples=[None]
)
provider_settings: Optional[
Union[PVForecastImportCommonSettings, PVforecastVrmCommonSettings]
] = Field(default=None, description="Provider settings", examples=[None])
planes: Optional[list[PVForecastPlaneSetting]] = Field(
default=None,

View File

@@ -0,0 +1,110 @@
"""Retrieves pvforecast data from VRM API."""
from typing import Any, Optional, Union
import requests
from loguru import logger
from pendulum import DateTime
from pydantic import Field, ValidationError
from akkudoktoreos.config.configabc import SettingsBaseModel
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.prediction.pvforecastabc import PVForecastProvider
from akkudoktoreos.utils.datetimeutil import to_datetime
class VrmForecastRecords(PydanticBaseModel):
vrm_consumption_fc: list[tuple[int, float]]
solar_yield_forecast: list[tuple[int, float]]
class VrmForecastResponse(PydanticBaseModel):
success: bool
records: VrmForecastRecords
totals: dict
class PVforecastVrmCommonSettings(SettingsBaseModel):
"""Common settings for VRM API."""
pvforecast_vrm_token: str = Field(
default="your-token", description="Token for Connecting VRM API", examples=["your-token"]
)
pvforecast_vrm_idsite: int = Field(
default=12345, description="VRM-Installation-ID", examples=[12345]
)
class PVForecastVrm(PVForecastProvider):
"""Fetch and process PV forecast data from VRM API."""
@classmethod
def provider_id(cls) -> str:
"""Return the unique identifier for the PV-Forecast-Provider."""
return "PVForecastVrm"
@classmethod
def _validate_data(cls, json_str: Union[bytes, Any]) -> VrmForecastResponse:
"""Validate the VRM forecast response data against the expected schema."""
try:
return VrmForecastResponse.model_validate_json(json_str)
except ValidationError as e:
error_msg = "\n".join(
f"Field: {' -> '.join(str(x) for x in err['loc'])}\n"
f"Error: {err['msg']}\nType: {err['type']}"
for err in e.errors()
)
logger.error(f"VRM-API schema change:\n{error_msg}")
raise ValueError(error_msg)
def _request_forecast(self, start_ts: int, end_ts: int) -> VrmForecastResponse:
"""Fetch forecast data from Victron VRM API."""
source = "https://vrmapi.victronenergy.com/v2/installations"
id_site = self.config.pvforecast.provider_settings.pvforecast_vrm_idsite
api_token = self.config.pvforecast.provider_settings.pvforecast_vrm_token
headers = {"X-Authorization": f"Token {api_token}", "Content-Type": "application/json"}
url = f"{source}/{id_site}/stats?type=forecast&start={start_ts}&end={end_ts}&interval=hours"
logger.debug(f"Requesting VRM forecast: {url}")
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to fetch pvforecast: {e}")
raise RuntimeError("Failed to fetch pvforecast from VRM API") from e
self.update_datetime = to_datetime(in_timezone=self.config.general.timezone)
return self._validate_data(response.content)
def _ts_to_datetime(self, timestamp: int) -> DateTime:
"""Convert UNIX ms timestamp to timezone-aware datetime."""
return to_datetime(timestamp / 1000, in_timezone=self.config.general.timezone)
def _update_data(self, force_update: Optional[bool] = False) -> None:
"""Update forecast data in the PVForecastDataRecord format."""
start_date = self.start_datetime.start_of("day")
end_date = self.start_datetime.add(hours=self.config.prediction.hours)
start_ts = int(start_date.timestamp())
end_ts = int(end_date.timestamp())
logger.info(f"Updating PV forecast from VRM: {start_date} to {end_date}")
vrm_forecast_data = self._request_forecast(start_ts, end_ts)
pv_forecast = []
for timestamp, value in vrm_forecast_data.records.solar_yield_forecast:
date = self._ts_to_datetime(timestamp)
dc_power = round(value, 2)
ac_power = round(dc_power * 0.96, 2)
self.update_value(
date, {"pvforecast_dc_power": dc_power, "pvforecast_ac_power": ac_power}
)
pv_forecast.append((date, dc_power))
logger.debug(f"Updated pvforecast_dc_power with {len(pv_forecast)} entries.")
self.update_datetime = to_datetime(in_timezone=self.config.general.timezone)
# Example usage
if __name__ == "__main__":
pv = PVForecastVrm()
pv._update_data()