diff --git a/src/akkudoktoreos/optimization/genetic.py b/src/akkudoktoreos/optimization/genetic.py index f3135ee..85be8f3 100644 --- a/src/akkudoktoreos/optimization/genetic.py +++ b/src/akkudoktoreos/optimization/genetic.py @@ -26,7 +26,6 @@ from akkudoktoreos.prediction.self_consumption_probability import ( self_consumption_probability_interpolator, ) from akkudoktoreos.utils.utils import NumpyEncoder -from akkudoktoreos.visualize import visualisiere_ergebnisse class OptimizationParameters(BaseModel): @@ -596,20 +595,23 @@ class optimization_problem: ac_charge, dc_charge, discharge = self.decode_charge_discharge(discharge_hours_bin) # Visualize the results - visualisiere_ergebnisse( - parameters.ems.gesamtlast, - parameters.ems.pv_prognose_wh, - parameters.ems.strompreis_euro_pro_wh, - o, - ac_charge, - dc_charge, - discharge, - parameters.temperature_forecast, - start_hour, - einspeiseverguetung_euro_pro_wh, - config=self._config, - extra_data=extra_data, + from akkudoktoreos.utils.visualize import ( # import here to prevent circular import + prepare_visualize, ) + + visualize = { + "ac_charge": ac_charge.tolist(), + "dc_charge": dc_charge.tolist(), + "discharge_allowed": discharge.tolist(), + "eautocharge_hours_float": eautocharge_hours_float, + "result": o, + "eauto_obj": ems.ev, + "start_solution": start_solution, + "spuelstart": washingstart_int, + "extra_data": extra_data, + } + + prepare_visualize(parameters, visualize, config=self._config, start_hour=start_hour) return OptimizeResponse( **{ "ac_charge": ac_charge, diff --git a/src/akkudoktoreos/utils/visualize.py b/src/akkudoktoreos/utils/visualize.py new file mode 100644 index 0000000..c036a0d --- /dev/null +++ b/src/akkudoktoreos/utils/visualize.py @@ -0,0 +1,510 @@ +import os +from collections.abc import Sequence +from typing import Callable, Optional, Union + +import matplotlib.pyplot as plt +import numpy as np +from matplotlib.backends.backend_pdf import PdfPages + +from akkudoktoreos.config import AppConfig +from akkudoktoreos.optimization.genetic import OptimizationParameters + + +class VisualizationReport: + def __init__(self, config: AppConfig, filename: str = "visualization_results.pdf") -> None: + # Initialize the report with a given filename and empty groups + self.filename = filename + self.groups: list[list[Callable[[], None]]] = [] # Store groups of charts + self.current_group: list[ + Callable[[], None] + ] = [] # Store current group of charts being created + self.config = config + self.pdf_pages = PdfPages(filename, metadata={}) # Initialize PdfPages without metadata + + def add_chart_to_group(self, chart_func: Callable[[], None]) -> None: + """Add a chart function to the current group.""" + self.current_group.append(chart_func) + + def finalize_group(self) -> None: + """Finalize the current group and prepare for a new group.""" + if self.current_group: # Check if current group has charts + self.groups.append(self.current_group) # Add current group to groups + else: + print("Finalizing an empty group!") # Warn if group is empty + self.current_group = [] # Reset current group for new charts + + def _initialize_pdf(self) -> None: + """Create the output directory if it doesn't exist and initialize the PDF.""" + output_dir = self.config.working_dir / self.config.directories.output + + # If self.filename is already a valid path, use it; otherwise, combine it with output_dir + if os.path.isabs(self.filename): + output_file = self.filename + else: + output_dir.mkdir(parents=True, exist_ok=True) + output_file = os.path.join(output_dir, self.filename) + + self.pdf_pages = PdfPages( + output_file, metadata={} + ) # Re-initialize PdfPages without metadata + + def _save_group_to_pdf(self, group: list[Callable[[], None]]) -> None: + """Save a group of charts to the PDF.""" + fig_count = len(group) # Number of charts in the group + if fig_count == 0: + print("Attempted to save an empty group to PDF!") # Warn if group is empty + return # Prevent saving an empty group + + # Create a figure layout based on the number of charts + if fig_count == 3: + # Layout for three charts: 1 full-width on top, 2 below + fig = plt.figure(figsize=(14, 10)) # Set a larger figure size + ax1 = fig.add_subplot(2, 1, 1) # Full-width subplot + ax2 = fig.add_subplot(2, 2, 3) # Bottom left subplot + ax3 = fig.add_subplot(2, 2, 4) # Bottom right subplot + + # Store axes in a list for easy access + axs = [ax1, ax2, ax3] + else: + # Dynamic layout for any other number of charts + cols = 2 if fig_count > 1 else 1 # Determine number of columns + rows = (fig_count // 2) + (fig_count % 2) # Calculate required rows + fig, axs = plt.subplots(rows, cols, figsize=(14, 7 * rows)) # Create subplots + # If axs is a 2D array of axes, flatten it into a 1D list + # if isinstance(axs, np.ndarray): + axs = list(np.array(axs).reshape(-1)) + + # Draw each chart in the corresponding axes + for idx, chart_func in enumerate(group): + plt.sca(axs[idx]) # Set current axes + chart_func() # Call the chart function to draw + + # Hide any unused axes + for idx in range(fig_count, len(axs)): + axs[idx].set_visible(False) # Hide unused axes + self.pdf_pages.savefig(fig) # Save the figure to the PDF + + plt.close(fig) # Close the figure to free up memory + + def create_line_chart( + self, + start_hour: Optional[int], + y_list: list[Union[np.ndarray, list[float]]], + title: str, + xlabel: str, + ylabel: str, + labels: Optional[list[str]] = None, + markers: Optional[list[str]] = None, + line_styles: Optional[list[str]] = None, + ) -> None: + """Create a line chart and add it to the current group.""" + + def chart() -> None: + nonlocal start_hour # Allow modifying `x` within the nested function + if start_hour is None: + start_hour = 0 + first_element = y_list[0] + x: np.ndarray + # Case 1: y_list contains np.ndarray elements + if isinstance(first_element, np.ndarray): + x = np.arange( + start_hour, start_hour + len(first_element) + ) # Start at x and extend by ndarray length + # Case 2: y_list contains float elements (1D list) + elif isinstance(first_element, float): + x = np.arange( + start_hour, start_hour + len(y_list) + ) # Start at x and extend by list length + # Case 3: y_list is a nested list of floats + elif isinstance(first_element, list) and all( + isinstance(i, float) for i in first_element + ): + max_len = max(len(sublist) for sublist in y_list) + x = np.arange( + start_hour, start_hour + max_len + ) # Start at x and extend by max sublist length + else: + print(f"Unsupported y_list structure: {type(y_list)}, {y_list}") + raise TypeError( + "y_list elements must be np.ndarray, float, or a nested list of floats" + ) + + for idx, y_data in enumerate(y_list): + label = labels[idx] if labels else None # Chart label + marker = markers[idx] if markers and idx < len(markers) else "o" # Marker style + line_style = ( + line_styles[idx] if line_styles and idx < len(line_styles) else "-" + ) # Line style + plt.plot(x, y_data, label=label, marker=marker, linestyle=line_style) # Plot line + plt.title(title) # Set title + plt.xlabel(xlabel) # Set x-axis label + plt.ylabel(ylabel) # Set y-axis label + if labels: + plt.legend() # Show legend if labels are provided + plt.grid(True) # Show grid + plt.xlim(x[0] - 0.5, x[-1] + 0.5) # Adjust x-limits + + self.add_chart_to_group(chart) # Add chart function to current group + + def create_scatter_plot( + self, + x: np.ndarray, + y: np.ndarray, + title: str, + xlabel: str, + ylabel: str, + c: Optional[np.ndarray] = None, + ) -> None: + """Create a scatter plot and add it to the current group.""" + + def chart() -> None: + scatter = plt.scatter(x, y, c=c, cmap="viridis") # Create scatter plot + plt.title(title) # Set title + plt.xlabel(xlabel) # Set x-axis label + plt.ylabel(ylabel) # Set y-axis label + if c is not None: + plt.colorbar(scatter, label="Constraint") # Add colorbar if color data is provided + plt.grid(True) # Show grid + + self.add_chart_to_group(chart) # Add chart function to current group + + def create_bar_chart( + self, + labels: list[str], + values_list: Sequence[Union[int, float, list[Union[int, float]]]], + title: str, + ylabel: str, + xlabels: Optional[list[str]] = None, + label_names: Optional[list[str]] = None, + colors: Optional[list[str]] = None, + bar_width: float = 0.35, + bottom: Optional[int] = None, + ) -> None: + """Create a bar chart and add it to the current group.""" + + def chart() -> None: + num_groups = len(values_list) # Number of data groups + num_bars = len(labels) # Number of bars (categories) + # Calculate the positions for each bar group on the x-axis + x = np.arange(num_bars) # x positions for bars + offset = np.linspace( + -bar_width * (num_groups - 1) / 2, bar_width * (num_groups - 1) / 2, num_groups + ) # Bar offsets + for i, values in enumerate(values_list): + bottom_use = None + if bottom == i + 1: # Set bottom if specified + bottom_use = 1 + color = colors[i] if colors and i < len(colors) else None # Bar color + label_name = label_names[i] if label_names else None # Bar label + plt.bar( + x + offset[i], + values, + bar_width, + label=label_name, + color=color, + zorder=2, + alpha=0.6, + bottom=bottom_use, + ) # Create bar + if xlabels: + plt.xticks(x, labels) # Add custom labels to the x-axis + plt.title(title) # Set title + plt.ylabel(ylabel) # Set y-axis label + + if colors and label_names: + plt.legend() # Show legend if colors are provided + plt.grid(True, zorder=0) # Show grid in the background + plt.xlim(-0.5, len(labels) - 0.5) # Set x-axis limits + + self.add_chart_to_group(chart) # Add chart function to current group + + def create_violin_plot( + self, data_list: list[np.ndarray], labels: list[str], title: str, xlabel: str, ylabel: str + ) -> None: + """Create a violin plot and add it to the current group.""" + + def chart() -> None: + plt.violinplot(data_list, showmeans=True, showmedians=True) # Create violin plot + plt.xticks(np.arange(1, len(labels) + 1), labels) # Set x-ticks and labels + plt.title(title) # Set title + plt.xlabel(xlabel) # Set x-axis label + plt.ylabel(ylabel) # Set y-axis label + plt.grid(True) # Show grid + + self.add_chart_to_group(chart) # Add chart function to current group + + def generate_pdf(self) -> None: + """Generate the PDF report with all the added chart groups.""" + self._initialize_pdf() # Initialize the PDF + + for group in self.groups: + self._save_group_to_pdf(group) # Save each group to the PDF + + self.pdf_pages.close() # Close the PDF to finalize the report + + +def prepare_visualize( + parameters: OptimizationParameters, + results: dict, + config: AppConfig, + filename: str = "visualization_results_new.pdf", + start_hour: Optional[int] = 0, +) -> None: + report = VisualizationReport(config, filename) + # Group 1: + report.create_line_chart( + None, + [parameters.ems.gesamtlast], + title="Load Profile", + xlabel="Hours", + ylabel="Load (Wh)", + labels=["Total Load (Wh)"], + markers=["s"], + line_styles=["-"], + ) + report.create_line_chart( + None, + [parameters.ems.pv_prognose_wh], + title="PV Forecast", + xlabel="Hours", + ylabel="PV Generation (Wh)", + ) + + report.create_line_chart( + None, + [np.full(len(parameters.ems.gesamtlast), parameters.ems.einspeiseverguetung_euro_pro_wh)], + title="Remuneration", + xlabel="Hours", + ylabel="€/Wh", + ) + if parameters.temperature_forecast: + report.create_line_chart( + None, + [parameters.temperature_forecast], + title="Temperature Forecast", + xlabel="Hours", + ylabel="°C", + ) + report.finalize_group() + + # Group 2: + report.create_line_chart( + start_hour, + [ + results["result"]["Last_Wh_pro_Stunde"], + results["result"]["Home_appliance_wh_per_hour"], + results["result"]["Netzeinspeisung_Wh_pro_Stunde"], + results["result"]["Netzbezug_Wh_pro_Stunde"], + results["result"]["Verluste_Pro_Stunde"], + ], + title="Energy Flow per Hour", + xlabel="Hours", + ylabel="Energy (Wh)", + labels=[ + "Load (Wh)", + "Household Device (Wh)", + "Grid Feed-in (Wh)", + "Grid Consumption (Wh)", + "Losses (Wh)", + ], + markers=["o", "o", "x", "^", "^"], + line_styles=["-", "--", ":", "-.", "-"], + ) + report.finalize_group() + + # Group 3: + report.create_line_chart( + start_hour, + [results["result"]["akku_soc_pro_stunde"], results["result"]["EAuto_SoC_pro_Stunde"]], + title="Battery SOC", + xlabel="Hours", + ylabel="%", + labels=[ + "Battery SOC (%)", + "Electric Vehicle SOC (%)", + ], + markers=["o", "x"], + ) + report.create_line_chart( + None, + [parameters.ems.strompreis_euro_pro_wh], + title="Electricity Price", + xlabel="Hours", + ylabel="Price (€/Wh)", + ) + + report.create_bar_chart( + list(str(i) for i in range(len(results["ac_charge"]))), + [results["ac_charge"], results["dc_charge"], results["discharge_allowed"]], + title="AC/DC Charging and Discharge Overview", + ylabel="Relative Power (0-1) / Discharge (0 or 1)", + label_names=["AC Charging (relative)", "DC Charging (relative)", "Discharge Allowed"], + colors=["blue", "green", "red"], + bottom=3, + ) + report.finalize_group() + + # Group 4: + + report.create_line_chart( + start_hour, + [ + results["result"]["Kosten_Euro_pro_Stunde"], + results["result"]["Einnahmen_Euro_pro_Stunde"], + ], + title="Financial Balance per Hour", + xlabel="Hours", + ylabel="Euro", + labels=["Costs", "Revenue"], + ) + + extra_data = results["extra_data"] + report.create_scatter_plot( + extra_data["verluste"], + extra_data["bilanz"], + title="", + xlabel="losses", + ylabel="balance", + c=extra_data["nebenbedingung"], + ) + + # Example usage + values_list = [ + [ + results["result"]["Gesamtkosten_Euro"], + results["result"]["Gesamteinnahmen_Euro"], + results["result"]["Gesamtbilanz_Euro"], + ] + ] + labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"] + + report.create_bar_chart( + labels=labels, + values_list=values_list, + title="Financial Overview", + ylabel="Euro", + xlabels=["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"], + ) + + report.finalize_group() + + # Group 1: Scatter plot of losses vs balance with color-coded constraints + f1 = np.array(extra_data["verluste"]) # Losses + f2 = np.array(extra_data["bilanz"]) # Balance + n1 = np.array(extra_data["nebenbedingung"]) # Constraints + + # Filter data where 'nebenbedingung' < 0.01 + filtered_indices = n1 < 0.01 + filtered_losses = f1[filtered_indices] + filtered_balance = f2[filtered_indices] + + # Group 2: Violin plot for filtered losses + if filtered_losses.size > 0: + report.create_violin_plot( + data_list=[filtered_losses], # Data for filtered losses + labels=["Filtered Losses"], # Label for the violin plot + title="Violin Plot for Filtered Losses (Constraint < 0.01)", + xlabel="Losses", + ylabel="Values", + ) + else: + print("No data available for filtered losses violin plot (Constraint < 0.01)") + + # Group 3: Violin plot for filtered balance + if filtered_balance.size > 0: + report.create_violin_plot( + data_list=[filtered_balance], # Data for filtered balance + labels=["Filtered Balance"], # Label for the violin plot + title="Violin Plot for Filtered Balance (Constraint < 0.01)", + xlabel="Balance", + ylabel="Values", + ) + else: + print("No data available for filtered balance violin plot (Constraint < 0.01)") + + if filtered_balance.size > 0 or filtered_losses.size > 0: + report.finalize_group() + + # Generate the PDF report + report.generate_pdf() + + +if __name__ == "__main__": + # Example usage + from akkudoktoreos.config import get_working_dir, load_config + + working_dir = get_working_dir() + config = load_config(working_dir) + report = VisualizationReport(config=config, filename="example_report.pdf") + x_hours = 0 # Define x-axis start values (e.g., hours) + + # Group 1: Adding charts to be displayed on the same page + report.create_line_chart( + x_hours, + [np.array([10, 20, 30, 40])], + title="Load Profile", + xlabel="Hours", + ylabel="Load (Wh)", + ) + report.create_line_chart( + x_hours, + [np.array([5, 15, 25, 35])], + title="PV Forecast", + xlabel="Hours", + ylabel="PV Generation (Wh)", + ) + report.create_line_chart( + x_hours, + [np.array([5, 15, 25, 35])], + title="PV Forecast", + xlabel="Hours", + ylabel="PV Generation (Wh)", + ) + # Note: If there are only 3 charts per page, the first is as wide as the page + + report.finalize_group() # Finalize the first group of charts + + # Group 2: Adding more charts to be displayed on another page + report.create_line_chart( + x_hours, + [np.array([0.2, 0.25, 0.3, 0.35])], + title="Electricity Price", + xlabel="Hours", + ylabel="Price (€/Wh)", + ) + report.create_bar_chart( + ["Costs", "Revenue", "Balance"], + [[500.0], [600.0], [100.0]], + title="Financial Overview", + ylabel="Euro", + label_names=["AC Charging (relative)", "DC Charging (relative)", "Discharge Allowed"], + colors=["red", "green", "blue"], + ) + report.create_scatter_plot( + np.array([5, 6, 7, 8]), + np.array([100, 200, 150, 250]), + title="Scatter Plot", + xlabel="Losses", + ylabel="Balance", + c=np.array([0.1, 0.2, 0.3, 0.4]), + ) + report.finalize_group() # Finalize the second group of charts + + # Group 3: Adding a violin plot + data = [np.random.normal(0, std, 100) for std in range(1, 5)] # Example data for violin plot + report.create_violin_plot( + data, + labels=["Group 1", "Group 2", "Group 3", "Group 4"], + title="Violin Plot", + xlabel="Groups", + ylabel="Values", + ) + data = [np.random.normal(0, 1, 100)] # Example data for violin plot + report.create_violin_plot( + data, labels=["Group 1"], title="Violin Plot", xlabel="Group", ylabel="Values" + ) + + report.finalize_group() # Finalize the third group of charts + + # Generate the PDF report + report.generate_pdf() diff --git a/src/akkudoktoreos/visualize.py b/src/akkudoktoreos/visualize.py deleted file mode 100644 index 4046915..0000000 --- a/src/akkudoktoreos/visualize.py +++ /dev/null @@ -1,350 +0,0 @@ -# Set the backend for matplotlib to Agg -from typing import Any, Optional - -import matplotlib -import matplotlib.pyplot as plt -import numpy as np -from matplotlib.backends.backend_pdf import PdfPages - -from akkudoktoreos.config import AppConfig, SetupIncomplete - -matplotlib.use("Agg") - - -def visualisiere_ergebnisse( - gesamtlast: list[float], - pv_forecast: list[float], - strompreise: list[float], - ergebnisse: dict[str, Any], - ac: np.ndarray, # AC charging allowed - dc: np.ndarray, # DC charging allowed - discharge: np.ndarray, # Discharge allowed - temperature: Optional[list[float]], - start_hour: int, - einspeiseverguetung_euro_pro_wh: np.ndarray, - config: AppConfig, - filename: str = "visualization_results.pdf", - extra_data: Optional[dict[str, Any]] = None, -) -> None: - ##################### - # 24-hour visualization - ##################### - output_dir = config.working_dir / config.directories.output - if not output_dir.is_dir(): - raise SetupIncomplete(f"Output path does not exist: {output_dir}.") - - output_file = output_dir.joinpath(filename) - with PdfPages(output_file) as pdf: - # Load and PV generation - plt.figure(figsize=(14, 14)) - plt.subplot(3, 3, 1) - hours = np.arange(0, config.eos.prediction_hours) - - gesamtlast_array = np.array(gesamtlast) - # Plot individual loads - plt.plot(hours, gesamtlast_array, label="Load (Wh)", marker="o") - - # Calculate and plot total load - plt.plot( - hours, - gesamtlast_array, - label="Total Load (Wh)", - marker="o", - linewidth=2, - linestyle="--", - ) - plt.xlabel("Hour") - plt.ylabel("Load (Wh)") - plt.title("Load Profiles") - plt.grid(True) - plt.legend() - - # PV forecast - plt.subplot(3, 2, 3) - plt.plot(hours, pv_forecast, label="PV Generation (Wh)", marker="x") - plt.title("PV Forecast") - plt.xlabel("Hour of the Day") - plt.ylabel("Wh") - plt.legend() - plt.grid(True) - - # Feed-in remuneration - plt.subplot(3, 2, 4) - plt.plot( - hours, - einspeiseverguetung_euro_pro_wh, - label="Remuneration (€/Wh)", - marker="x", - ) - plt.title("Remuneration") - plt.xlabel("Hour of the Day") - plt.ylabel("€/Wh") - plt.legend() - plt.grid(True) - - # Temperature forecast - if temperature is not None: - plt.subplot(3, 2, 5) - plt.title("Temperature Forecast (°C)") - plt.plot(hours, temperature, label="Temperature (°C)", marker="x") - plt.xlabel("Hour of the Day") - plt.ylabel("°C") - plt.legend() - plt.grid(True) - - pdf.savefig() # Save the current figure state to the PDF - plt.close() # Close the current figure to free up memory - - ##################### - # Start hour visualization - ##################### - - plt.figure(figsize=(14, 10)) - hours = np.arange(start_hour, config.eos.prediction_hours) - - # Energy flow, grid feed-in, and grid consumption - plt.subplot(3, 2, 1) - # Plot with transparency (alpha) and different linestyles - plt.plot( - hours, - ergebnisse["Last_Wh_pro_Stunde"], - label="Load (Wh)", - marker="o", - linestyle="-", - alpha=0.8, - ) - plt.plot( - hours, - ergebnisse["Home_appliance_wh_per_hour"], - label="Household Device (Wh)", - marker="o", - linestyle="--", - alpha=0.8, - ) - plt.plot( - hours, - ergebnisse["Netzeinspeisung_Wh_pro_Stunde"], - label="Grid Feed-in (Wh)", - marker="x", - linestyle=":", - alpha=0.8, - ) - plt.plot( - hours, - ergebnisse["Netzbezug_Wh_pro_Stunde"], - label="Grid Consumption (Wh)", - marker="^", - linestyle="-.", - alpha=0.8, - ) - plt.plot( - hours, - ergebnisse["Verluste_Pro_Stunde"], - label="Losses (Wh)", - marker="^", - linestyle="-", - alpha=0.8, - ) - - # Title and labels - plt.title("Energy Flow per Hour") - plt.xlabel("Hour") - plt.ylabel("Energy (Wh)") - - # Show legend with a higher number of columns to avoid overlap - plt.legend(ncol=2) - - # Electricity prices - hours_p = np.arange(0, len(strompreise)) - plt.subplot(3, 2, 3) - plt.plot( - hours_p, - strompreise, - label="Electricity Price (€/Wh)", - color="purple", - marker="s", - ) - plt.title("Electricity Prices") - plt.xlabel("Hour of the Day") - plt.ylabel("Price (€/Wh)") - plt.legend() - plt.grid(True) - - # State of charge for batteries - plt.subplot(3, 2, 2) - plt.plot(hours, ergebnisse["akku_soc_pro_stunde"], label="PV Battery (%)", marker="x") - plt.plot( - hours, - ergebnisse["EAuto_SoC_pro_Stunde"], - label="E-Car Battery (%)", - marker="x", - ) - plt.legend(loc="upper left", bbox_to_anchor=(1, 1)) # Place legend outside the plot - plt.grid(True, which="both", axis="x") # Grid for every hour - - # Plot for AC, DC charging, and Discharge status using bar charts - ax1 = plt.subplot(3, 2, 5) - hours = np.arange(0, config.eos.prediction_hours) - # Plot AC charging as bars (relative values between 0 and 1) - plt.bar(hours, ac, width=0.4, label="AC Charging (relative)", color="blue", alpha=0.6) - - # Plot DC charging as bars (relative values between 0 and 1) - plt.bar( - hours + 0.4, dc, width=0.4, label="DC Charging (relative)", color="green", alpha=0.6 - ) - - # Plot Discharge as bars (0 or 1, binary values) - plt.bar( - hours, - discharge, - width=0.4, - label="Discharge Allowed", - color="red", - alpha=0.6, - bottom=np.maximum(ac, dc), - ) - - # Configure the plot - ax1.legend(loc="upper left") - ax1.set_xlim(0, config.eos.prediction_hours) - ax1.set_xlabel("Hour") - ax1.set_ylabel("Relative Power (0-1) / Discharge (0 or 1)") - ax1.set_title("AC/DC Charging and Discharge Overview") - ax1.grid(True) - - hours = np.arange(start_hour, config.eos.prediction_hours) - - pdf.savefig() # Save the current figure state to the PDF - plt.close() # Close the current figure to free up memory - - # Financial overview - fig, axs = plt.subplots(1, 2, figsize=(14, 10)) # Create a 1x2 grid of subplots - total_costs = ergebnisse["Gesamtkosten_Euro"] - total_revenue = ergebnisse["Gesamteinnahmen_Euro"] - total_balance = ergebnisse["Gesamtbilanz_Euro"] - losses = ergebnisse["Gesamt_Verluste"] - - # Costs and revenues per hour on the first axis (axs[0]) - costs = ergebnisse["Kosten_Euro_pro_Stunde"] - revenues = ergebnisse["Einnahmen_Euro_pro_Stunde"] - - # Plot costs - axs[0].plot( - hours, - costs, - label="Costs (Euro)", - marker="o", - color="red", - ) - # Annotate costs - for hour, value in enumerate(costs): - if value is None or np.isnan(value): - value = 0 - axs[0].annotate( - f"{value:.2f}", - (hour, value), - textcoords="offset points", - xytext=(0, 5), - ha="center", - fontsize=8, - color="red", - ) - - # Plot revenues - axs[0].plot( - hours, - revenues, - label="Revenue (Euro)", - marker="x", - color="green", - ) - # Annotate revenues - for hour, value in enumerate(revenues): - if value is None or np.isnan(value): - value = 0 - axs[0].annotate( - f"{value:.2f}", - (hour, value), - textcoords="offset points", - xytext=(0, 5), - ha="center", - fontsize=8, - color="green", - ) - - # Title and labels - axs[0].set_title("Financial Balance per Hour") - axs[0].set_xlabel("Hour") - axs[0].set_ylabel("Euro") - axs[0].legend() - axs[0].grid(True) - - # Summary of finances on the second axis (axs[1]) - labels = ["Total Costs [€]", "Total Revenue [€]", "Total Balance [€]"] - values = [total_costs, total_revenue, total_balance] - colors = ["red" if value > 0 else "green" for value in values] - axs[1].bar(labels, values, color=colors) - axs[1].set_title("Financial Overview") - axs[1].set_ylabel("Euro") - - # Second axis (ax2) for losses, shared with axs[1] - ax2 = axs[1].twinx() - ax2.bar("Total Losses", losses, color="blue") - ax2.set_ylabel("Losses [Wh]", color="blue") - ax2.tick_params(axis="y", labelcolor="blue") - - pdf.savefig() # Save the complete figure to the PDF - plt.close() # Close the figure - - # Additional data visualization if provided - if extra_data is not None: - plt.figure(figsize=(14, 10)) - plt.subplot(1, 2, 1) - f1 = np.array(extra_data["verluste"]) - f2 = np.array(extra_data["bilanz"]) - n1 = np.array(extra_data["nebenbedingung"]) - scatter = plt.scatter(f1, f2, c=n1, cmap="viridis") - - # Add color legend - plt.colorbar(scatter, label="Constraint") - - pdf.savefig() # Save the complete figure to the PDF - plt.close() # Close the figure - - plt.figure(figsize=(14, 10)) - filtered_losses = np.array( - [ - v - for v, n in zip(extra_data["verluste"], extra_data["nebenbedingung"]) - if n < 0.01 - ] - ) - filtered_balance = np.array( - [b for b, n in zip(extra_data["bilanz"], extra_data["nebenbedingung"]) if n < 0.01] - ) - if filtered_losses.size != 0: - best_loss = min(filtered_losses) - worst_loss = max(filtered_losses) - best_balance = min(filtered_balance) - worst_balance = max(filtered_balance) - - data = [filtered_losses, filtered_balance] - labels = ["Losses", "Balance"] - # Create plots - fig, axs = plt.subplots( - 1, 2, figsize=(10, 6), sharey=False - ) # Two subplots, separate y-axes - - # First violin plot for losses - axs[0].violinplot(data[0], positions=[1], showmeans=True, showmedians=True) - axs[0].set(xticks=[1], xticklabels=["Losses"]) - - # Second violin plot for balance - axs[1].violinplot(data[1], positions=[1], showmeans=True, showmedians=True) - axs[1].set(xticks=[1], xticklabels=["Balance"]) - - # Fine-tuning - plt.tight_layout() - - pdf.savefig() # Save the current figure state to the PDF - plt.close() # Close the figure diff --git a/tests/test_class_optimize.py b/tests/test_class_optimize.py index 4a9d808..041a3ac 100644 --- a/tests/test_class_optimize.py +++ b/tests/test_class_optimize.py @@ -11,6 +11,7 @@ from akkudoktoreos.optimization.genetic import ( OptimizeResponse, optimization_problem, ) +from akkudoktoreos.utils.visualize import prepare_visualize DIR_TESTDATA = Path(__file__).parent / "testdata" @@ -67,16 +68,12 @@ def test_optimize( visualize_filename = str((DIR_TESTDATA / f"new_{fn_out}").with_suffix(".pdf")) - def visualize_to_file(*args, **kwargs): - from akkudoktoreos.visualize import visualisiere_ergebnisse - - # Write test output pdf to file, so we can look at it manually - kwargs["filename"] = visualize_filename - return visualisiere_ergebnisse(*args, **kwargs) - with patch( - "akkudoktoreos.optimization.genetic.visualisiere_ergebnisse", side_effect=visualize_to_file - ) as visualisiere_ergebnisse_patch: + "akkudoktoreos.utils.visualize.prepare_visualize", + side_effect=lambda parameters, results, *args, **kwargs: prepare_visualize( + parameters, results, filename=visualize_filename, **kwargs + ), + ) as prepare_visualize_patch: # Call the optimization function ergebnis = opt_class.optimierung_ems( parameters=input_data, start_hour=start_hour, ngen=ngen @@ -95,4 +92,4 @@ def test_optimize( compare_dict(ergebnis.model_dump(), expected_result.model_dump()) # The function creates a visualization result PDF as a side-effect. - visualisiere_ergebnisse_patch.assert_called_once() + prepare_visualize_patch.assert_called_once() diff --git a/tests/test_visualize.py b/tests/test_visualize.py index 3cbd6d3..81f3845 100644 --- a/tests/test_visualize.py +++ b/tests/test_visualize.py @@ -1,32 +1,42 @@ -import json +import os +import subprocess from pathlib import Path -import pytest from matplotlib.testing.compare import compare_images -from akkudoktoreos.config import AppConfig -from akkudoktoreos.visualize import visualisiere_ergebnisse +from akkudoktoreos.config import get_working_dir, load_config + +filename = "example_report.pdf" + +working_dir = get_working_dir() +config = load_config(working_dir) +output_dir = config.working_dir / config.directories.output + +# If self.filename is already a valid path, use it; otherwise, combine it with output_dir +if os.path.isabs(filename): + output_file = filename +else: + output_dir.mkdir(parents=True, exist_ok=True) + output_file = os.path.join(output_dir, filename) DIR_TESTDATA = Path(__file__).parent / "testdata" -DIR_IMAGEDATA = DIR_TESTDATA / "images" +reference_file = DIR_TESTDATA / "test_example_report.pdf" -@pytest.mark.parametrize( - "fn_in, fn_out, fn_out_base", - [("visualize_input_1.json", "visualize_output_1.pdf", "visualize_base_output_1.pdf")], -) -def test_visualisiere_ergebnisse(fn_in, fn_out, fn_out_base, tmp_config: AppConfig): - with open(DIR_TESTDATA / fn_in, "r") as f: - input_data = json.load(f) - visualisiere_ergebnisse(config=tmp_config, **input_data) - output_file: Path = tmp_config.working_dir / tmp_config.directories.output / fn_out +def test_generate_pdf_main(): + # Delete the old generated file if it exists + if os.path.isfile(output_file): + os.remove(output_file) - assert output_file.is_file() - assert ( - compare_images( - str(output_file), - str(DIR_IMAGEDATA / fn_out_base), - 0, - ) - is None - ) + # Execute the __main__ block of visualize.py by running it as a script + script_path = Path(__file__).parent.parent / "src" / "akkudoktoreos" / "utils" / "visualize.py" + subprocess.run(["python", str(script_path)], check=True) + + # Check if the file exists + assert os.path.isfile(output_file) + + # Compare the generated file with the reference file + comparison = compare_images(str(reference_file), str(output_file), tol=0) + + # Assert that there are no differences + assert comparison is None, f"Images differ: {comparison}" diff --git a/tests/testdata/images/visualize_base_output_1.pdf b/tests/testdata/images/visualize_base_output_1.pdf deleted file mode 100644 index c607cc4..0000000 Binary files a/tests/testdata/images/visualize_base_output_1.pdf and /dev/null differ diff --git a/tests/testdata/test_example_report.pdf b/tests/testdata/test_example_report.pdf new file mode 100644 index 0000000..ef05327 Binary files /dev/null and b/tests/testdata/test_example_report.pdf differ