Update class_battery_soc_predictor.py

initial clean up. import sorted and unused removed, comments translated, commented debug functions removed
This commit is contained in:
NormannK 2024-09-20 12:11:39 +02:00 committed by Andreas
parent dfff675ca7
commit 0125dc219b

View File

@ -1,65 +1,55 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import joblib, json import joblib
from sklearn.preprocessing import StandardScaler import json
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.gaussian_process import GaussianProcessRegressor from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel, WhiteKernel, Matern,DotProduct from sklearn.gaussian_process.kernels import WhiteKernel, Matern, DotProduct
from sklearn.model_selection import train_test_split from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, load_model from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense,Dropout from tensorflow.keras.layers import LSTM, Dense, Dropout, RepeatVector, TimeDistributed
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l1, l2, l1_l2 from tensorflow.keras.regularizers import l1, l2, l1_l2
from scipy.signal import savgol_filter
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, RepeatVector, TimeDistributed
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
class BatterySocPredictorGauss: class BatterySocPredictorGauss:
def __init__(self): def __init__(self):
# Initialisierung von Scaler und Gaußschem Prozessmodell # Initialize scaler and Gaussian process model
self.scaler = StandardScaler() self.scaler = StandardScaler()
kernel = WhiteKernel(1.0, (1e-7, 1e3)) + Matern(length_scale=(0.1,0.1,0.1), length_scale_bounds=((1e-7, 1e3),(1e-7, 1e3),(1e-7, 1e3))) + DotProduct() kernel = (WhiteKernel(1.0, (1e-7, 1e3)) +
Matern(length_scale=(0.1, 0.1, 0.1),
length_scale_bounds=((1e-7, 1e3), (1e-7, 1e3), (1e-7, 1e3))) +
DotProduct())
self.gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=1e-3, normalize_y=True) self.gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=1e-3, normalize_y=True)
def fit(self, X, y): def fit(self, X, y):
# Transformiere die Zielvariable # Transform the target variable
y_transformed = np.log(y / (101 - y)) y_transformed = np.log(y / (101 - y))
# Skaliere die Features # Scale the features
X_scaled = self.scaler.fit_transform(X) X_scaled = self.scaler.fit_transform(X)
# Trainiere das Modell # Train the model
self.gp.fit(X_scaled, y_transformed) self.gp.fit(X_scaled, y_transformed)
def predict(self, X): def predict(self, X):
# Skaliere die Features # Scale the features
X_scaled = self.scaler.transform(X) X_scaled = self.scaler.transform(X)
# Vorhersagen und Unsicherheiten # Predictions and uncertainties
y_pred_transformed, sigma_transformed = self.gp.predict(X_scaled, return_std=True) y_pred_transformed, sigma_transformed = self.gp.predict(X_scaled, return_std=True)
# Rücktransformieren der Vorhersagen # Reverse transform the predictions
y_pred = 101 / (1 + np.exp(-y_pred_transformed)) y_pred = 101 / (1 + np.exp(-y_pred_transformed))
# Rücktransformieren der Unsicherheiten # Reverse transform the uncertainties
sigmoid_y_pred = 1 / (1 + np.exp(-y_pred_transformed)) sigmoid_y_pred = 1 / (1 + np.exp(-y_pred_transformed))
sigma = sigma_transformed * 101 * sigmoid_y_pred * (1 - sigmoid_y_pred) sigma = sigma_transformed * 101 * sigmoid_y_pred * (1 - sigmoid_y_pred)
return y_pred return y_pred
def save_model(self, file_path): def save_model(self, file_path):
# Speichere das gesamte Modell-Objekt # Save the entire model object
joblib.dump(self, file_path) joblib.dump(self, file_path)
@staticmethod @staticmethod
def load_model(file_path): def load_model(file_path):
# Lade das Modell-Objekt # Load the model object
return joblib.load(file_path) return joblib.load(file_path)
@ -67,8 +57,8 @@ class BatterySoCPredictorLSTM:
def __init__(self, model_path=None, scaler_path=None, gauss=None): def __init__(self, model_path=None, scaler_path=None, gauss=None):
self.scaler = MinMaxScaler(feature_range=(0, 1)) self.scaler = MinMaxScaler(feature_range=(0, 1))
self.target_scaler = MinMaxScaler(feature_range=(0, 1)) self.target_scaler = MinMaxScaler(feature_range=(0, 1))
self.seq_length = 5 # Anzahl der Zeitschritte in der Eingabesequenz self.seq_length = 5 # Number of time steps in input sequence
self.n_future_steps = 1 # Anzahl der zukünftigen Schritte, die vorhergesagt werden sollen self.n_future_steps = 1 # Number of future steps to predict
self.gauss_model = BatterySocPredictorGauss.load_model(gauss) self.gauss_model = BatterySocPredictorGauss.load_model(gauss)
if model_path: if model_path:
@ -80,19 +70,18 @@ class BatterySoCPredictorLSTM:
self.load_scalers(scaler_path) self.load_scalers(scaler_path)
def _build_model(self): def _build_model(self):
regu = 0.00 # Regularisierungsrate regu = 0.00 # Regularization rate
model = Sequential() model = Sequential()
model.add(LSTM(20, activation='relu', return_sequences=True, input_shape=(self.seq_length, 4), kernel_regularizer=l2(regu))) model.add(LSTM(20, activation='relu', return_sequences=True, input_shape=(self.seq_length, 4), kernel_regularizer=l2(regu)))
model.add(LSTM(20, activation='relu', return_sequences=False, kernel_regularizer=l2(regu))) model.add(LSTM(20, activation='relu', return_sequences=False, kernel_regularizer=l2(regu)))
model.add(RepeatVector(self.n_future_steps)) model.add(RepeatVector(self.n_future_steps))
model.add(LSTM(20, activation='relu', return_sequences=True, kernel_regularizer=l2(regu))) model.add(LSTM(20, activation='relu', return_sequences=True, kernel_regularizer=l2(regu)))
model.add(TimeDistributed(Dense(1, kernel_regularizer=l2(regu)))) # TimeDistributed Layer für Multi-Step Output model.add(TimeDistributed(Dense(1, kernel_regularizer=l2(regu)))) # TimeDistributed layer for multi-step output
optimizer = Adam(learning_rate=0.0005) optimizer = Adam(learning_rate=0.0005)
model.compile(optimizer=optimizer, loss='mae') model.compile(optimizer=optimizer, loss='mae')
return model return model
def fit(self, data_path, epochs=100, batch_size=50, validation_split=0.1): def fit(self, data_path, epochs=100, batch_size=50, validation_split=0.1):
data = pd.read_csv(data_path) data = pd.read_csv(data_path)
data['Time'] = pd.to_datetime(data['Time'], unit='ms') data['Time'] = pd.to_datetime(data['Time'], unit='ms')
@ -100,12 +89,9 @@ class BatterySoCPredictorLSTM:
data.dropna(inplace=True) data.dropna(inplace=True)
# Gauss # Use Gaussian model to predict SoC
#data["temperature_mean"] = data[["data","data.1"]].mean(axis=1)
#data[['battery_voltage', 'battery_current', 'data']]
data["battery_soc_gauss"] = self.gauss_model.predict(data[['battery_voltage', 'battery_current', 'data']].values) data["battery_soc_gauss"] = self.gauss_model.predict(data[['battery_voltage', 'battery_current', 'data']].values)
# print(data)
# sys.exit()
scaled_data = self.scaler.fit_transform(data[['battery_voltage', 'battery_current', 'data', 'battery_soc_gauss']].values) scaled_data = self.scaler.fit_transform(data[['battery_voltage', 'battery_current', 'data', 'battery_soc_gauss']].values)
data['scaled_soc'] = self.target_scaler.fit_transform(data[['battery_soc']]) data['scaled_soc'] = self.target_scaler.fit_transform(data[['battery_soc']])
@ -119,47 +105,19 @@ class BatterySoCPredictorLSTM:
xs, ys = [], [] xs, ys = [], []
for i in range(len(data) - seq_length - n_future_steps): for i in range(len(data) - seq_length - n_future_steps):
x = data[i:(i + seq_length)] x = data[i:(i + seq_length)]
y = data[(i + seq_length):(i + seq_length + n_future_steps), -1] # Multi-Step Output y = data[(i + seq_length):(i + seq_length + n_future_steps), -1] # Multi-step output
xs.append(x) xs.append(x)
ys.append(y) ys.append(y)
return np.array(xs), np.array(ys) return np.array(xs), np.array(ys)
# def predict(self, test_data_path):
# test_data = pd.read_csv(test_data_path)
# test_data['Time'] = pd.to_datetime(test_data['Time'], unit='ms')
# test_data.set_index('Time', inplace=True)
# test_data.replace('undefined', np.nan, inplace=True)
# test_data.dropna(inplace=True)
# test_data['battery_voltage'] = pd.to_numeric(test_data['battery_voltage'], errors='coerce')
# test_data['battery_current'] = pd.to_numeric(test_data['battery_current'], errors='coerce')
# test_data['battery_soc'] = pd.to_numeric(test_data['battery_soc'], errors='coerce')
# test_data['data.1'] = pd.to_numeric(test_data['data.1'], errors='coerce')
# test_data.dropna(inplace=True)
# scaled_test_data = self.scaler.transform(test_data[['battery_voltage', 'battery_current', 'data.1', 'battery_soc']])
# test_data['scaled_soc'] = self.target_scaler.transform(test_data[['battery_soc']])
# test_data.dropna(inplace=True)
# X_test, _ = self._create_sequences(scaled_test_data, self.seq_length, self.n_future_steps)
# predictions = self.model.predict(X_test)
# predictions = self.target_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, self.n_future_steps)
# return predictions
def predict_single(self, voltage_current_temp_soc_sequence): def predict_single(self, voltage_current_temp_soc_sequence):
if len(voltage_current_temp_soc_sequence) != self.seq_length or len(voltage_current_temp_soc_sequence[0]) != 3: if len(voltage_current_temp_soc_sequence) != self.seq_length or len(voltage_current_temp_soc_sequence[0]) != 3:
raise ValueError("Die Eingabesequenz muss die Form (seq_length, 3) haben.") raise ValueError("Input sequence must have the shape (seq_length, 3).")
soc_gauss = self.gauss_model.predict(voltage_current_temp_soc_sequence) soc_gauss = self.gauss_model.predict(voltage_current_temp_soc_sequence)
soc_gauss = soc_gauss.reshape(-1, 1) soc_gauss = soc_gauss.reshape(-1, 1)
#print(voltage_current_temp_soc_sequence.shape)
#print(soc_gauss.shape)
voltage_current_sequence = np.hstack([voltage_current_temp_soc_sequence, soc_gauss]) voltage_current_sequence = np.hstack([voltage_current_temp_soc_sequence, soc_gauss])
#print(voltage_current_sequence.shape)
print(voltage_current_sequence)
scaled_sequence = self.scaler.transform(voltage_current_sequence) scaled_sequence = self.scaler.transform(voltage_current_sequence)
X = np.array([scaled_sequence]) X = np.array([scaled_sequence])
@ -188,8 +146,6 @@ class BatterySoCPredictorLSTM:
self.target_scaler.scale_ = np.array(scaler_params['target_scaler_scale_']) self.target_scaler.scale_ = np.array(scaler_params['target_scaler_scale_'])
if __name__ == '__main__': if __name__ == '__main__':
train_data_path = 'lstm_train/raw_data_clean.csv' train_data_path = 'lstm_train/raw_data_clean.csv'
test_data_path = 'Test_Data.csv' test_data_path = 'Test_Data.csv'
model_path = 'battery_soc_predictor_lstm_model.keras' model_path = 'battery_soc_predictor_lstm_model.keras'
@ -198,38 +154,33 @@ if __name__ == '__main__':
#################### ####################
# GAUSS + K-Means # GAUSS + K-Means
#################### ####################
# Daten laden und vorbereiten # Load and prepare data
data_path = 'k_means.csv' data_path = 'k_means.csv'
data = pd.read_csv(data_path, decimal='.') data = pd.read_csv(data_path, decimal='.')
data.dropna(inplace=True) # Entfernen von Zeilen mit NaN-Werten, die durch das Rolling entstehen data.dropna(inplace=True) # Remove rows with NaN values
#print(data[["data","data.1"]].mean(axis=1)) data["temperature_mean"] = data[["data", "data.1"]].mean(axis=1) # Calculate mean temperature
data["temperature_mean"] = data[["data","data.1"]].mean(axis=1)
# Features und Zielvariable definieren # Define features and target variable
X = data[['battery_voltage', 'battery_current',"temperature_mean"]] # X = data[['battery_voltage', 'battery_current', "temperature_mean"]]
y = data['battery_soc'] y = data['battery_soc']
# Aufteilen der Daten in Trainings- und Testdatensätze # Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
# # # Modell instanziieren und trainieren
#battery_model = BatterySocPredictorGauss()
#battery_model.fit(X_train, y_train)
#battery_model.save_model('battery_model.pkl')
battery_model = BatterySocPredictorGauss.load_model('battery_model.pkl') battery_model = BatterySocPredictorGauss.load_model('battery_model.pkl')
# Vorhersagen auf den Testdaten # Make predictions on the test data
y_pred_test = battery_model.predict(X_test) y_pred_test = battery_model.predict(X_test)
print(y_pred_test.shape, " ", y_test.shape) print(y_pred_test.shape, " ", y_test.shape)
# Berechnung des MAE und RMSE # Calculate MAE and RMSE
mae = mean_absolute_error(y_test, y_pred_test) mae = mean_absolute_error(y_test, y_pred_test)
rmse = mean_squared_error(y_test, y_pred_test, squared=False) rmse = mean_squared_error(y_test, y_pred_test, squared=False)
print(f'Mean Absolute Error (MAE): {mae}') print(f'Mean Absolute Error (MAE): {mae}')
print(f'Root Mean Squared Error (RMSE): {rmse}') print(f'Root Mean Squared Error (RMSE): {rmse}')
# Plotten der tatsächlichen Werte vs. Vorhersagen # Plot actual vs predicted values
# plt.figure(figsize=(12, 6)) # plt.figure(figsize=(12, 6))
# plt.plot(y_test.values, label='Actual SoC') # plt.plot(y_test.values, label='Actual SoC')
# plt.plot(y_pred_test, label='Predicted SoC') # plt.plot(y_pred_test, label='Predicted SoC')
@ -239,27 +190,18 @@ if __name__ == '__main__':
# plt.legend() # plt.legend()
# plt.show() # plt.show()
# # # Modell speichern
#battery_model.save_model('battery_model.pkl')
# Modell für Vorhersagen laden
#loaded_model = BatterySocPredictorGauss.load_model('battery_model.pkl')
#################### ####################
# LSTM # LSTM
#################### ####################
predictor = BatterySoCPredictorLSTM(gauss='battery_model.pkl') predictor = BatterySoCPredictorLSTM(gauss='battery_model.pkl')
# # Training mit rekursiver Vorhersage # Training with recursive prediction
predictor.fit(train_data_path, epochs=50, batch_size=50, validation_split=0.1) predictor.fit(train_data_path, epochs=50, batch_size=50, validation_split=0.1)
# # # Speichern des Modells und der Scaler # Save the model and scalers
predictor.save_model(model_path=model_path, scaler_path=scaler_path) predictor.save_model(model_path=model_path, scaler_path=scaler_path)
# # # Laden des Modells und der Scaler # Load the model and scalers
loaded_predictor = BatterySoCPredictorLSTM(model_path=model_path, scaler_path=scaler_path, gauss='battery_model.pkl') loaded_predictor = BatterySoCPredictorLSTM(model_path=model_path, scaler_path=scaler_path, gauss='battery_model.pkl')
test_data = pd.read_csv(test_data_path) test_data = pd.read_csv(test_data_path)
@ -281,11 +223,6 @@ if __name__ == '__main__':
predictions = loaded_predictor.model.predict(X_test) predictions = loaded_predictor.model.predict(X_test)
predictions = loaded_predictor.target_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, loaded_predictor.n_future_steps) predictions = loaded_predictor.target_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, loaded_predictor.n_future_steps)
# print(test_data['battery_soc'].values[5:-5,...].shape)
# print(predictions[:,0].shape)
test_data_y = test_data['battery_soc'].values[5:-1, ...] test_data_y = test_data['battery_soc'].values[5:-1, ...]
mae = mean_absolute_error(test_data_y, predictions[:, 0]) mae = mean_absolute_error(test_data_y, predictions[:, 0])
rmse = mean_squared_error(test_data_y, predictions[:, 0], squared=False) rmse = mean_squared_error(test_data_y, predictions[:, 0], squared=False)