Improve EOSdash.

Make EOSdash use UI components from MonsterUI to ease further development.

- Add a first menu with some dummy pages and the configuration page.
- Make the configuration scrollable.
- Add markdown component that uses markdown-it-py (same as used by
  the myth-parser for documentation generation).
- Add bokeh (https://docs.bokeh.org/) component for charts
- Added several prediction charts to demo
- Add a footer that displays connection status with EOS server
- Add logo and favicon

Update EOS server:

- Move error message generation to extra module
- Use redirect instead of proxy

Signed-off-by: Bobby Noelte <b0661n0e17e@gmail.com>
This commit is contained in:
Bobby Noelte
2025-01-22 23:47:28 +01:00
parent 80bfe4d0f0
commit ab6a518b5f
34 changed files with 1802 additions and 351 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 112 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 724 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@@ -0,0 +1 @@
{"name":"","short_name":"","icons":[{"src":"/android-chrome-192x192.png","sizes":"192x192","type":"image/png"},{"src":"/android-chrome-512x512.png","sizes":"512x512","type":"image/png"}],"theme_color":"#ffffff","background_color":"#ffffff","display":"standalone"}

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@@ -0,0 +1,38 @@
# Module taken from https://github.com/koaning/fh-altair
# MIT license
from typing import Optional
from bokeh.embed import components
from bokeh.models import Plot
from monsterui.franken import H4, Card, NotStr, Script
BokehJS = [
Script(src="https://cdn.bokeh.org/bokeh/release/bokeh-3.6.3.min.js", crossorigin="anonymous"),
Script(
src="https://cdn.bokeh.org/bokeh/release/bokeh-widgets-3.6.3.min.js",
crossorigin="anonymous",
),
Script(
src="https://cdn.bokeh.org/bokeh/release/bokeh-tables-3.6.3.min.js", crossorigin="anonymous"
),
Script(
src="https://cdn.bokeh.org/bokeh/release/bokeh-gl-3.6.3.min.js", crossorigin="anonymous"
),
Script(
src="https://cdn.bokeh.org/bokeh/release/bokeh-mathjax-3.6.3.min.js",
crossorigin="anonymous",
),
]
def Bokeh(plot: Plot, header: Optional[str] = None) -> Card:
"""Converts an Bokeh plot to a FastHTML FT component."""
script, div = components(plot)
if header:
header = H4(header, cls="mt-2")
return Card(
NotStr(div),
NotStr(script),
header=header,
)

View File

@@ -0,0 +1,224 @@
from typing import Any, Optional, Union
from fasthtml.common import H1, Div, Li
# from mdit_py_plugins import plugin1, plugin2
from monsterui.foundations import stringify
from monsterui.franken import (
Button,
ButtonT,
Card,
Container,
ContainerT,
Details,
DivLAligned,
DivRAligned,
Grid,
Input,
P,
Summary,
TabContainer,
UkIcon,
)
scrollbar_viewport_styles = (
"scrollbar-width: none; -ms-overflow-style: none; -webkit-overflow-scrolling: touch;"
)
scrollbar_cls = "flex touch-none select-none transition-colors p-[1px]"
def ScrollArea(
*c: Any, cls: Optional[Union[str, tuple]] = None, orientation: str = "vertical", **kwargs: Any
) -> Div:
"""Creates a styled scroll area.
Args:
orientation (str): The orientation of the scroll area. Defaults to vertical.
"""
new_cls = "relative overflow-hidden"
if cls:
new_cls += f" {stringify(cls)}"
kwargs["cls"] = new_cls
content = Div(
Div(*c, style="min-width:100%;display:table;"),
style=f"overflow: {'hidden scroll' if orientation == 'vertical' else 'scroll'}; {scrollbar_viewport_styles}",
cls="w-full h-full rounded-[inherit]",
data_ref="viewport",
)
scrollbar = Div(
Div(cls="bg-border rounded-full hidden relative flex-1", data_ref="thumb"),
cls=f"{scrollbar_cls} flex-col h-2.5 w-full border-t border-t-transparent"
if orientation == "horizontal"
else f"{scrollbar_cls} w-2.5 h-full border-l border-l-transparent",
data_ref="scrollbar",
style=f"position: absolute;{'right:0; top:0;' if orientation == 'vertical' else 'bottom:0; left:0;'}",
)
return Div(
content,
scrollbar,
role="region",
tabindex="0",
data_orientation=orientation,
data_ref_scrollarea=True,
aria_label="Scrollable content",
**kwargs,
)
def ConfigCard(
config_name: str, config_type: str, read_only: str, value: str, default: str, description: str
) -> Card:
return Card(
Details(
Summary(
Grid(
Grid(
DivLAligned(
UkIcon(icon="play"),
P(config_name),
),
DivRAligned(
P(read_only),
),
),
Input(value=value) if read_only == "rw" else P(value),
),
# cls="flex cursor-pointer list-none items-center gap-4",
cls="list-none",
),
Grid(
P(description),
P(config_type),
),
Grid(
DivRAligned(
P("default") if read_only == "rw" else P(""),
),
P(default) if read_only == "rw" else P(""),
)
if read_only == "rw"
else None,
cls="space-y-4 gap-4",
),
cls="w-full",
)
def DashboardHeader(title: Optional[str]) -> Div:
"""Creates a styled header with a title.
Args:
title (Optional[str]): The title text for the header.
Returns:
Div: A styled `Div` element containing the header.
"""
if title is None:
return Div("", cls="header")
return Div(H1(title, cls="text-2xl font-bold mb-4"), cls="header")
def DashboardFooter(*c: Any, path: str) -> Card:
"""Creates a styled footer with the provided information.
The footer content is reloaded every 5 seconds from path.
Args:
path (str): Path to reload footer content from
Returns:
Card: A styled `Card` element containing the footer.
"""
return Card(
Container(*c, id="footer-content"),
hx_get=f"{path}",
hx_trigger="every 5s",
hx_target="#footer-content",
hx_swap="innerHTML",
)
def DashboardTrigger(*c: Any, cls: Optional[Union[str, tuple]] = None, **kwargs: Any) -> Button:
"""Creates a styled button for the dashboard trigger.
Args:
*c: Positional arguments to pass to the button.
cls (Optional[str]): Additional CSS classes for styling. Defaults to None.
**kwargs: Additional keyword arguments for the button.
Returns:
Button: A styled `Button` component.
"""
new_cls = f"{ButtonT.primary}"
if cls:
new_cls += f" {stringify(cls)}"
kwargs["cls"] = new_cls
return Button(*c, submit=False, **kwargs)
def DashboardTabs(dashboard_items: dict[str, str]) -> Card:
"""Creates a dashboard tab with dynamic dashboard items.
Args:
dashboard_items (dict[str, str]): A dictionary of dashboard items where keys are item names
and values are paths for navigation.
Returns:
Card: A styled `Card` component containing the dashboard tabs.
"""
dash_items = [
Li(
DashboardTrigger(
menu,
hx_get=f"{path}",
hx_target="#page-content",
hx_swap="innerHTML",
),
)
for menu, path in dashboard_items.items()
]
return Card(TabContainer(*dash_items, cls="gap-4"), alt=True)
def DashboardContent(content: Any) -> Card:
"""Creates a content section within a styled card.
Args:
content (Any): The content to display.
Returns:
Card: A styled `Card` element containing the content.
"""
return Card(ScrollArea(Container(content, id="page-content"), cls="h-[75vh] w-full rounded-md"))
def Page(
title: Optional[str],
dashboard_items: dict[str, str],
content: Any,
footer_content: Any,
footer_path: str,
) -> Div:
"""Generates a full-page layout with a header, dashboard items, content, and footer.
Args:
title (Optional[str]): The page title.
dashboard_items (dict[str, str]): A dictionary of dashboard items.
content (Any): The main content for the page.
footer_content (Any): Footer content.
footer_path (Any): Path to reload footer content from.
Returns:
Div: A `Div` element representing the entire page layout.
"""
return Container(
DashboardHeader(title),
DashboardTabs(dashboard_items),
DashboardContent(content),
DashboardFooter(footer_content, path=footer_path),
cls=("bg-background text-foreground w-screen p-4 space-y-4", ContainerT.xl),
)

View File

@@ -0,0 +1,275 @@
from typing import Any, Dict, List, Optional, Sequence, TypeVar, Union
import requests
from monsterui.franken import Div, DividerLine, P, Table, Tbody, Td, Th, Thead, Tr
from pydantic.fields import ComputedFieldInfo, FieldInfo
from pydantic_core import PydanticUndefined
from akkudoktoreos.config.config import get_config
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.pydantic import PydanticBaseModel
from akkudoktoreos.server.dash.components import ConfigCard
logger = get_logger(__name__)
config_eos = get_config()
T = TypeVar("T")
def get_nested_value(
dictionary: Union[Dict[str, Any], List[Any]],
keys: Sequence[Union[str, int]],
default: Optional[T] = None,
) -> Union[Any, T]:
"""Retrieve a nested value from a dictionary or list using a sequence of keys.
Args:
dictionary (Union[Dict[str, Any], List[Any]]): The nested dictionary or list to search.
keys (Sequence[Union[str, int]]): A sequence of keys or indices representing the path to the desired value.
default (Optional[T]): A value to return if the path is not found.
Returns:
Union[Any, T]: The value at the specified nested path, or the default value if not found.
Raises:
TypeError: If the input is not a dictionary or list, or if keys are not a sequence.
KeyError: If a key is not found in a dictionary.
IndexError: If an index is out of range in a list.
"""
if not isinstance(dictionary, (dict, list)):
raise TypeError("The first argument must be a dictionary or list")
if not isinstance(keys, Sequence):
raise TypeError("Keys must be provided as a sequence (e.g., list, tuple)")
if not keys:
return dictionary
try:
# Traverse the structure
current = dictionary
for key in keys:
if isinstance(current, dict) and isinstance(key, str):
current = current[key]
elif isinstance(current, list) and isinstance(key, int):
current = current[key]
else:
raise KeyError(f"Invalid key or index: {key}")
return current
except (KeyError, IndexError, TypeError):
return default
def get_default_value(field_info: Union[FieldInfo, ComputedFieldInfo], regular_field: bool) -> Any:
"""Retrieve the default value of a field.
Args:
field_info (Union[FieldInfo, ComputedFieldInfo]): The field metadata from Pydantic.
regular_field (bool): Indicates if the field is a regular field.
Returns:
Any: The default value of the field or "N/A" if not a regular field.
"""
default_value = ""
if regular_field:
if (val := field_info.default) is not PydanticUndefined:
default_value = val
else:
default_value = "N/A"
return default_value
def resolve_nested_types(field_type: Any, parent_types: list[str]) -> list[tuple[Any, list[str]]]:
"""Resolve nested types within a field and return their structure.
Args:
field_type (Any): The type of the field to resolve.
parent_types (List[str]): A list of parent type names.
Returns:
List[tuple[Any, List[str]]]: A list of tuples containing resolved types and their parent hierarchy.
"""
resolved_types: list[tuple[Any, list[str]]] = []
origin = getattr(field_type, "__origin__", field_type)
if origin is Union:
for arg in getattr(field_type, "__args__", []):
if arg is not type(None):
resolved_types.extend(resolve_nested_types(arg, parent_types))
else:
resolved_types.append((field_type, parent_types))
return resolved_types
def configuration(values: dict) -> list[dict]:
"""Generate configuration details based on provided values and model metadata.
Args:
values (dict): A dictionary containing the current configuration values.
Returns:
List[dict]: A sorted list of configuration details, each represented as a dictionary.
"""
configs = []
inner_types: set[type[PydanticBaseModel]] = set()
for field_name, field_info in list(config_eos.model_fields.items()) + list(
config_eos.model_computed_fields.items()
):
def extract_nested_models(
subfield_info: Union[ComputedFieldInfo, FieldInfo], parent_types: list[str]
) -> None:
regular_field = isinstance(subfield_info, FieldInfo)
subtype = subfield_info.annotation if regular_field else subfield_info.return_type
if subtype in inner_types:
return
nested_types = resolve_nested_types(subtype, [])
found_basic = False
for nested_type, nested_parent_types in nested_types:
if not isinstance(nested_type, type) or not issubclass(
nested_type, PydanticBaseModel
):
if found_basic:
continue
config = {}
config["name"] = ".".join(parent_types)
config["value"] = str(get_nested_value(values, parent_types, "<unknown>"))
config["default"] = str(get_default_value(subfield_info, regular_field))
config["description"] = (
subfield_info.description if subfield_info.description else ""
)
if isinstance(subfield_info, ComputedFieldInfo):
config["read-only"] = "ro"
type_description = str(subfield_info.return_type)
else:
config["read-only"] = "rw"
type_description = str(subfield_info.annotation)
config["type"] = (
type_description.replace("typing.", "")
.replace("pathlib.", "")
.replace("[", "[ ")
.replace("NoneType", "None")
)
configs.append(config)
found_basic = True
else:
new_parent_types = parent_types + nested_parent_types
inner_types.add(nested_type)
for nested_field_name, nested_field_info in list(
nested_type.model_fields.items()
) + list(nested_type.model_computed_fields.items()):
extract_nested_models(
nested_field_info,
new_parent_types + [nested_field_name],
)
extract_nested_models(field_info, [field_name])
return sorted(configs, key=lambda x: x["name"])
def get_configuration(eos_host: Optional[str], eos_port: Optional[Union[str, int]]) -> list[dict]:
"""Fetch and process configuration data from the specified EOS server.
Args:
eos_host (Optional[str]): The hostname of the server.
eos_port (Optional[Union[str, int]]): The port of the server.
Returns:
List[dict]: A list of processed configuration entries.
"""
if eos_host is None:
eos_host = config_eos.server.host
if eos_port is None:
eos_port = config_eos.server.port
server = f"http://{eos_host}:{eos_port}"
# Get current configuration from server
try:
result = requests.get(f"{server}/v1/config")
result.raise_for_status()
except requests.exceptions.HTTPError as e:
detail = result.json()["detail"]
warning_msg = f"Can not retrieve configuration from {server}: {e}, {detail}"
logger.warning(warning_msg)
return configuration({})
config = result.json()
return configuration(config)
def Configuration(eos_host: Optional[str], eos_port: Optional[Union[str, int]]) -> Div:
"""Create a visual representation of the configuration.
Args:
eos_host (Optional[str]): The hostname of the EOS server.
eos_port (Optional[Union[str, int]]): The port of the EOS server.
Returns:
Table: A `monsterui.franken.Table` component displaying configuration details.
"""
flds = "Name", "Type", "RO/RW", "Value", "Default", "Description"
rows = []
last_category = ""
for config in get_configuration(eos_host, eos_port):
category = config["name"].split(".")[0]
if category != last_category:
rows.append(P(category))
rows.append(DividerLine())
last_category = category
rows.append(
ConfigCard(
config["name"],
config["type"],
config["read-only"],
config["value"],
config["default"],
config["description"],
)
)
return Div(*rows, cls="space-y-4")
def ConfigurationOrg(eos_host: Optional[str], eos_port: Optional[Union[str, int]]) -> Table:
"""Create a visual representation of the configuration.
Args:
eos_host (Optional[str]): The hostname of the EOS server.
eos_port (Optional[Union[str, int]]): The port of the EOS server.
Returns:
Table: A `monsterui.franken.Table` component displaying configuration details.
"""
flds = "Name", "Type", "RO/RW", "Value", "Default", "Description"
rows = [
Tr(
Td(
config["name"],
cls="max-w-64 text-wrap break-all",
),
Td(
config["type"],
cls="max-w-48 text-wrap break-all",
),
Td(
config["read-only"],
cls="max-w-24 text-wrap break-all",
),
Td(
config["value"],
cls="max-w-md text-wrap break-all",
),
Td(config["default"], cls="max-w-48 text-wrap break-all"),
Td(
config["description"],
cls="max-w-prose text-wrap",
),
cls="",
)
for config in get_configuration(eos_host, eos_port)
]
head = Thead(*map(Th, flds), cls="text-left")
return Table(head, Tbody(*rows), cls="w-full uk-table uk-table-divider uk-table-striped")

View File

@@ -0,0 +1,86 @@
{
"elecprice": {
"charges_kwh": 0.21,
"provider": "ElecPriceAkkudoktor"
},
"general": {
"latitude": 52.5,
"longitude": 13.4
},
"prediction": {
"historic_hours": 48,
"hours": 48
},
"load": {
"provider": "LoadAkkudoktor",
"provider_settings": {
"loadakkudoktor_year_energy": 20000
}
},
"optimization": {
"hours": 48
},
"pvforecast": {
"planes": [
{
"peakpower": 5.0,
"surface_azimuth": -10,
"surface_tilt": 7,
"userhorizon": [
20,
27,
22,
20
],
"inverter_paco": 10000
},
{
"peakpower": 4.8,
"surface_azimuth": -90,
"surface_tilt": 7,
"userhorizon": [
30,
30,
30,
50
],
"inverter_paco": 10000
},
{
"peakpower": 1.4,
"surface_azimuth": -40,
"surface_tilt": 60,
"userhorizon": [
60,
30,
0,
30
],
"inverter_paco": 2000
},
{
"peakpower": 1.6,
"surface_azimuth": 5,
"surface_tilt": 45,
"userhorizon": [
45,
25,
30,
60
],
"inverter_paco": 1400
}
],
"provider": "PVForecastAkkudoktor"
},
"server": {
"startup_eosdash": true,
"host": "0.0.0.0",
"port": 8503,
"eosdash_host": "0.0.0.0",
"eosdash_port": 8504
},
"weather": {
"provider": "BrightSky"
}
}

View File

@@ -0,0 +1,217 @@
import json
from pathlib import Path
from typing import Union
import pandas as pd
import requests
from bokeh.models import ColumnDataSource, Range1d
from bokeh.plotting import figure
from monsterui.franken import FT, Grid, P
from akkudoktoreos.core.logging import get_logger
from akkudoktoreos.core.pydantic import PydanticDateTimeDataFrame
from akkudoktoreos.server.dash.bokeh import Bokeh
DIR_DEMODATA = Path(__file__).absolute().parent.joinpath("data")
FILE_DEMOCONFIG = DIR_DEMODATA.joinpath("democonfig.json")
if not FILE_DEMOCONFIG.exists():
raise ValueError(f"File does not exist: {FILE_DEMOCONFIG}")
logger = get_logger(__name__)
# bar width for 1 hour bars (time given in millseconds)
BAR_WIDTH_1HOUR = 1000 * 60 * 60
def DemoPVForecast(predictions: pd.DataFrame, config: dict) -> FT:
source = ColumnDataSource(predictions)
provider = config["pvforecast"]["provider"]
plot = figure(
x_axis_type="datetime",
title=f"PV Power Prediction ({provider})",
x_axis_label="Datetime",
y_axis_label="Power [W]",
sizing_mode="stretch_width",
height=400,
)
plot.vbar(
x="date_time",
top="pvforecast_ac_power",
source=source,
width=BAR_WIDTH_1HOUR * 0.8,
legend_label="AC Power",
color="lightblue",
)
return Bokeh(plot)
def DemoElectricityPriceForecast(predictions: pd.DataFrame, config: dict) -> FT:
source = ColumnDataSource(predictions)
provider = config["elecprice"]["provider"]
plot = figure(
x_axis_type="datetime",
y_range=Range1d(
predictions["elecprice_marketprice_kwh"].min() - 0.1,
predictions["elecprice_marketprice_kwh"].max() + 0.1,
),
title=f"Electricity Price Prediction ({provider})",
x_axis_label="Datetime",
y_axis_label="Price [€/kWh]",
sizing_mode="stretch_width",
height=400,
)
plot.vbar(
x="date_time",
top="elecprice_marketprice_kwh",
source=source,
width=BAR_WIDTH_1HOUR * 0.8,
legend_label="Market Price",
color="lightblue",
)
return Bokeh(plot)
def DemoWeatherTempAir(predictions: pd.DataFrame, config: dict) -> FT:
source = ColumnDataSource(predictions)
provider = config["weather"]["provider"]
plot = figure(
x_axis_type="datetime",
y_range=Range1d(
predictions["weather_temp_air"].min() - 1.0, predictions["weather_temp_air"].max() + 1.0
),
title=f"Air Temperature Prediction ({provider})",
x_axis_label="Datetime",
y_axis_label="Temperature [°C]",
sizing_mode="stretch_width",
height=400,
)
plot.line(
"date_time", "weather_temp_air", source=source, legend_label="Air Temperature", color="blue"
)
return Bokeh(plot)
def DemoWeatherIrradiance(predictions: pd.DataFrame, config: dict) -> FT:
source = ColumnDataSource(predictions)
provider = config["weather"]["provider"]
plot = figure(
x_axis_type="datetime",
title=f"Irradiance Prediction ({provider})",
x_axis_label="Datetime",
y_axis_label="Irradiance [W/m2]",
sizing_mode="stretch_width",
height=400,
)
plot.line(
"date_time",
"weather_ghi",
source=source,
legend_label="Global Horizontal Irradiance",
color="red",
)
plot.line(
"date_time",
"weather_dni",
source=source,
legend_label="Direct Normal Irradiance",
color="green",
)
plot.line(
"date_time",
"weather_dhi",
source=source,
legend_label="Diffuse Horizontal Irradiance",
color="blue",
)
return Bokeh(plot)
def Demo(eos_host: str, eos_port: Union[str, int]) -> str:
server = f"http://{eos_host}:{eos_port}"
# Get current configuration from server
try:
result = requests.get(f"{server}/v1/config")
result.raise_for_status()
except requests.exceptions.HTTPError as err:
detail = result.json()["detail"]
return P(
f"Can not retrieve configuration from {server}: {err}, {detail}",
cls="text-center",
)
config = result.json()
# Set demo configuration
with FILE_DEMOCONFIG.open("r", encoding="utf-8") as fd:
democonfig = json.load(fd)
try:
result = requests.put(f"{server}/v1/config", json=democonfig)
result.raise_for_status()
except requests.exceptions.HTTPError as err:
detail = result.json()["detail"]
# Try to reset to original config
requests.put(f"{server}/v1/config", json=config)
return P(
f"Can not set demo configuration on {server}: {err}, {detail}",
cls="text-center",
)
# Update all predictions
try:
result = requests.post(f"{server}/v1/prediction/update")
result.raise_for_status()
except requests.exceptions.HTTPError as err:
detail = result.json()["detail"]
# Try to reset to original config
requests.put(f"{server}/v1/config", json=config)
return P(
f"Can not update predictions on {server}: {err}, {detail}",
cls="text-center",
)
# Get Forecasts
try:
params = {
"keys": [
"pvforecast_ac_power",
"elecprice_marketprice_kwh",
"weather_temp_air",
"weather_ghi",
"weather_dni",
"weather_dhi",
],
}
result = requests.get(f"{server}/v1/prediction/dataframe", params=params)
result.raise_for_status()
predictions = PydanticDateTimeDataFrame(**result.json()).to_dataframe()
except requests.exceptions.HTTPError as err:
detail = result.json()["detail"]
return P(
f"Can not retrieve predictions from {server}: {err}, {detail}",
cls="text-center",
)
except Exception as err:
return P(
f"Can not retrieve predictions from {server}: {err}",
cls="text-center",
)
# Reset to original config
requests.put(f"{server}/v1/config", json=config)
return Grid(
DemoPVForecast(predictions, democonfig),
DemoElectricityPriceForecast(predictions, democonfig),
DemoWeatherTempAir(predictions, democonfig),
DemoWeatherIrradiance(predictions, democonfig),
cols_max=2,
)

View File

@@ -0,0 +1,92 @@
from typing import Optional, Union
import requests
from monsterui.daisy import Loading, LoadingT
from monsterui.franken import A, ButtonT, DivFullySpaced, P
from requests.exceptions import RequestException
from akkudoktoreos.config.config import get_config
from akkudoktoreos.core.logging import get_logger
logger = get_logger(__name__)
config_eos = get_config()
def get_alive(eos_host: str, eos_port: Union[str, int]) -> str:
"""Fetch alive information from the specified EOS server.
Args:
eos_host (str): The hostname of the server.
eos_port (Union[str, int]): The port of the server.
Returns:
str: Alive data.
"""
result = requests.Response()
try:
result = requests.get(f"http://{eos_host}:{eos_port}/v1/health")
if result.status_code == 200:
alive = result.json()["status"]
else:
alive = f"Server responded with status code: {result.status_code}"
except RequestException as e:
warning_msg = f"{e}"
logger.warning(warning_msg)
alive = warning_msg
return alive
def Footer(eos_host: Optional[str], eos_port: Optional[Union[str, int]]) -> str:
if eos_host is None:
eos_host = config_eos.server.host
if eos_port is None:
eos_port = config_eos.server.port
alive_icon = None
if eos_host is None or eos_port is None:
alive = "EOS server not given: {eos_host}:{eos_port}"
else:
alive = get_alive(eos_host, eos_port)
if alive == "alive":
alive_icon = Loading(
cls=(
LoadingT.ring,
LoadingT.sm,
),
)
alive = f"EOS {eos_host}:{eos_port}"
if alive_icon:
alive_cls = f"{ButtonT.primary} uk-link rounded-md"
else:
alive_cls = f"{ButtonT.secondary} uk-link rounded-md"
return DivFullySpaced(
P(
alive_icon,
A(alive, href=f"http://{eos_host}:{eos_port}/docs", target="_blank", cls=alive_cls),
),
P(
A(
"Documentation",
href="https://akkudoktor-eos.readthedocs.io/en/latest/",
target="_blank",
cls="uk-link",
),
),
P(
A(
"Issues",
href="https://github.com/Akkudoktor-EOS/EOS/issues",
target="_blank",
cls="uk-link",
),
),
P(
A(
"GitHub",
href="https://github.com/Akkudoktor-EOS/EOS/",
target="_blank",
cls="uk-link",
),
),
cls="uk-padding-remove-top uk-padding-remove-botton",
)

View File

@@ -0,0 +1,24 @@
from typing import Any
from fasthtml.common import Div
from akkudoktoreos.server.dash.markdown import Markdown
hello_md = """![Logo](/eosdash/assets/logo.png)
# Akkudoktor EOSdash
The dashboard for Akkudoktor EOS.
EOS provides a comprehensive solution for simulating and optimizing an energy system based
on renewable energy sources. With a focus on photovoltaic (PV) systems, battery storage (batteries),
load management (consumer requirements), heat pumps, electric vehicles, and consideration of
electricity price data, this system enables forecasting and optimization of energy flow and costs
over a specified period.
Documentation can be found at [Akkudoktor-EOS](https://akkudoktor-eos.readthedocs.io/en/latest/).
"""
def Hello(**kwargs: Any) -> Div:
return Markdown(hello_md, **kwargs)

View File

@@ -0,0 +1,136 @@
"""Markdown rendering with MonsterUI HTML classes."""
from typing import Any, List, Optional, Union
from fasthtml.common import FT, Div, NotStr
from markdown_it import MarkdownIt
from markdown_it.renderer import RendererHTML
from markdown_it.token import Token
from monsterui.foundations import stringify
def render_heading(
self: RendererHTML, tokens: List[Token], idx: int, options: dict, env: dict
) -> str:
"""Custom renderer for Markdown headings.
Adds specific CSS classes based on the heading level.
Parameters:
self: The renderer instance.
tokens: List of tokens to be rendered.
idx: Index of the current token.
options: Rendering options.
env: Environment sandbox for plugins.
Returns:
The rendered token as a string.
"""
if tokens[idx].markup == "#":
tokens[idx].attrSet("class", "uk-heading-divider uk-h1 uk-margin")
elif tokens[idx].markup == "##":
tokens[idx].attrSet("class", "uk-heading-divider uk-h2 uk-margin")
elif tokens[idx].markup == "###":
tokens[idx].attrSet("class", "uk-heading-divider uk-h3 uk-margin")
elif tokens[idx].markup == "####":
tokens[idx].attrSet("class", "uk-heading-divider uk-h4 uk-margin")
# pass token to default renderer.
return self.renderToken(tokens, idx, options, env)
def render_paragraph(
self: RendererHTML, tokens: List[Token], idx: int, options: dict, env: dict
) -> str:
"""Custom renderer for Markdown paragraphs.
Adds specific CSS classes.
Parameters:
self: The renderer instance.
tokens: List of tokens to be rendered.
idx: Index of the current token.
options: Rendering options.
env: Environment sandbox for plugins.
Returns:
The rendered token as a string.
"""
tokens[idx].attrSet("class", "uk-paragraph")
# pass token to default renderer.
return self.renderToken(tokens, idx, options, env)
def render_blockquote(
self: RendererHTML, tokens: List[Token], idx: int, options: dict, env: dict
) -> str:
"""Custom renderer for Markdown blockquotes.
Adds specific CSS classes.
Parameters:
self: The renderer instance.
tokens: List of tokens to be rendered.
idx: Index of the current token.
options: Rendering options.
env: Environment sandbox for plugins.
Returns:
The rendered token as a string.
"""
tokens[idx].attrSet("class", "uk-blockquote")
# pass token to default renderer.
return self.renderToken(tokens, idx, options, env)
def render_link(self: RendererHTML, tokens: List[Token], idx: int, options: dict, env: dict) -> str:
"""Custom renderer for Markdown links.
Adds the target attribute to open links in a new tab.
Parameters:
self: The renderer instance.
tokens: List of tokens to be rendered.
idx: Index of the current token.
options: Rendering options.
env: Environment sandbox for plugins.
Returns:
The rendered token as a string.
"""
tokens[idx].attrSet("class", "uk-link")
tokens[idx].attrSet("target", "_blank")
# pass token to default renderer.
return self.renderToken(tokens, idx, options, env)
markdown = MarkdownIt("gfm-like")
markdown.add_render_rule("heading_open", render_heading)
markdown.add_render_rule("paragraph_open", render_paragraph)
markdown.add_render_rule("blockquote_open", render_blockquote)
markdown.add_render_rule("link_open", render_link)
markdown_cls = "bg-background text-lg ring-offset-background placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50"
def Markdown(*c: Any, cls: Optional[Union[str, tuple]] = None, **kwargs: Any) -> FT:
"""Component to render Markdown content with custom styling.
Parameters:
c: Markdown content to be rendered.
cls: Optional additional CSS classes to be added.
kwargs: Additional keyword arguments for the Div component.
Returns:
An FT object representing the rendered HTML content wrapped in a Div component.
"""
new_cls = markdown_cls
if cls:
new_cls += f" {stringify(cls)}"
kwargs["cls"] = new_cls
md_html = markdown.render(*c)
return Div(NotStr(md_html), **kwargs)