diff --git a/modules/class_battery_soc_predictor.py b/modules/class_battery_soc_predictor.py
deleted file mode 100644
index debc7fb..0000000
--- a/modules/class_battery_soc_predictor.py
+++ /dev/null
@@ -1,240 +0,0 @@
-import numpy as np
-import pandas as pd
-import joblib
-import json
-from sklearn.preprocessing import StandardScaler, MinMaxScaler
-from sklearn.gaussian_process import GaussianProcessRegressor
-from sklearn.gaussian_process.kernels import WhiteKernel, Matern, DotProduct
-from sklearn.model_selection import train_test_split
-from tensorflow.keras.models import Sequential, load_model
-from tensorflow.keras.layers import LSTM, Dense, Dropout, RepeatVector, TimeDistributed
-from tensorflow.keras.optimizers import Adam
-from tensorflow.keras.regularizers import l1, l2, l1_l2
-from sklearn.metrics import mean_absolute_error, mean_squared_error
-import matplotlib.pyplot as plt
-
-class BatterySocPredictorGauss:
-    def __init__(self):
-        # Initialize scaler and Gaussian process model
-        self.scaler = StandardScaler()
-        kernel = (WhiteKernel(1.0, (1e-7, 1e3)) +
-                  Matern(length_scale=(0.1, 0.1, 0.1),
-                         length_scale_bounds=((1e-7, 1e3), (1e-7, 1e3), (1e-7, 1e3))) +
-                  DotProduct())
-        self.gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=1e-3, normalize_y=True)
-
-    def fit(self, X, y):
-        # Transform the target variable
-        y_transformed = np.log(y / (101 - y))
-        # Scale the features
-        X_scaled = self.scaler.fit_transform(X)
-        # Train the model
-        self.gp.fit(X_scaled, y_transformed)
-
-    def predict(self, X):
-        # Scale the features
-        X_scaled = self.scaler.transform(X)
-        # Predictions and uncertainties
-        y_pred_transformed, sigma_transformed = self.gp.predict(X_scaled, return_std=True)
-        # Reverse transform the predictions
-        y_pred = 101 / (1 + np.exp(-y_pred_transformed))
-        # Reverse transform the uncertainties
-        sigmoid_y_pred = 1 / (1 + np.exp(-y_pred_transformed))
-        sigma = sigma_transformed * 101 * sigmoid_y_pred * (1 - sigmoid_y_pred)
-        return y_pred
-
-    def save_model(self, file_path):
-        # Save the entire model object
-        joblib.dump(self, file_path)
-
-    @staticmethod
-    def load_model(file_path):
-        # Load the model object
-        return joblib.load(file_path)
-
-
-class BatterySoCPredictorLSTM:
-    def __init__(self, model_path=None, scaler_path=None, gauss=None):
-        self.scaler = MinMaxScaler(feature_range=(0, 1))
-        self.target_scaler = MinMaxScaler(feature_range=(0, 1))
-        self.seq_length = 5  # Number of time steps in input sequence
-        self.n_future_steps = 1  # Number of future steps to predict
-        self.gauss_model = BatterySocPredictorGauss.load_model(gauss)
-
-        if model_path:
-            self.model = load_model(model_path)
-        else:
-            self.model = self._build_model()
-
-        if scaler_path:
-            self.load_scalers(scaler_path)
-
-    def _build_model(self):
-        regu = 0.00  # Regularization rate
-        model = Sequential()
-        model.add(LSTM(20, activation='relu', return_sequences=True, input_shape=(self.seq_length, 4), kernel_regularizer=l2(regu)))
-        model.add(LSTM(20, activation='relu', return_sequences=False, kernel_regularizer=l2(regu)))
-        model.add(RepeatVector(self.n_future_steps))
-        model.add(LSTM(20, activation='relu', return_sequences=True, kernel_regularizer=l2(regu)))
-        model.add(TimeDistributed(Dense(1, kernel_regularizer=l2(regu))))  # TimeDistributed layer for multi-step output
-
-        optimizer = Adam(learning_rate=0.0005)
-        model.compile(optimizer=optimizer, loss='mae')
-        return model
-
-    def fit(self, data_path, epochs=100, batch_size=50, validation_split=0.1):
-        data = pd.read_csv(data_path)
-        data['Time'] = pd.to_datetime(data['Time'], unit='ms')
-        data.set_index('Time', inplace=True)
-
-        data.dropna(inplace=True)
-
-        # Use Gaussian model to predict SoC
-        data["battery_soc_gauss"] = self.gauss_model.predict(data[['battery_voltage', 'battery_current', 'data']].values)
-
-        scaled_data = self.scaler.fit_transform(data[['battery_voltage', 'battery_current', 'data', 'battery_soc_gauss']].values)
-        data['scaled_soc'] = self.target_scaler.fit_transform(data[['battery_soc']])
-
-        X, y = self._create_sequences(scaled_data, self.seq_length, self.n_future_steps)
-
-        print(y.shape)
-
-        self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=validation_split)
-
-    def _create_sequences(self, data, seq_length, n_future_steps):
-        xs, ys = [], []
-        for i in range(len(data) - seq_length - n_future_steps):
-            x = data[i:(i + seq_length)]
-            y = data[(i + seq_length):(i + seq_length + n_future_steps), -1]  # Multi-step output
-            xs.append(x)
-            ys.append(y)
-        return np.array(xs), np.array(ys)
-
-    def predict_single(self, voltage_current_temp_soc_sequence):
-        if len(voltage_current_temp_soc_sequence) != self.seq_length or len(voltage_current_temp_soc_sequence[0]) != 3:
-            raise ValueError("Input sequence must have the shape (seq_length, 3).")
-
-        soc_gauss = self.gauss_model.predict(voltage_current_temp_soc_sequence)
-        soc_gauss = soc_gauss.reshape(-1, 1)
-        voltage_current_sequence = np.hstack([voltage_current_temp_soc_sequence, soc_gauss])
-
-        scaled_sequence = self.scaler.transform(voltage_current_sequence)
-        X = np.array([scaled_sequence])
-
-        prediction = self.model.predict(X)
-        prediction = self.target_scaler.inverse_transform(prediction.reshape(-1, 1)).reshape(-1, self.n_future_steps)
-        return prediction  # Return the sequence of future SoC predictions
-
-    def save_model(self, model_path=None, scaler_path=None):
-        self.model.save(model_path)
-
-        scaler_params = {
-            'scaler_min_': self.scaler.min_.tolist(),
-            'scaler_scale_': self.scaler.scale_.tolist(),
-            'target_scaler_min_': self.target_scaler.min_.tolist(),
-            'target_scaler_scale_': self.target_scaler.scale_.tolist()
-        }
-        with open(scaler_path, 'w') as f:
-            json.dump(scaler_params, f)
-
-    def load_scalers(self, scaler_path):
-        with open(scaler_path, 'r') as f:
-            scaler_params = json.load(f)
-        self.scaler.min_ = np.array(scaler_params['scaler_min_'])
-        self.scaler.scale_ = np.array(scaler_params['scaler_scale_'])
-        self.target_scaler.min_ = np.array(scaler_params['target_scaler_min_'])
-        self.target_scaler.scale_ = np.array(scaler_params['target_scaler_scale_'])
-
-if __name__ == '__main__':
-    train_data_path = 'lstm_train/raw_data_clean.csv'
-    test_data_path = 'Test_Data.csv'
-    model_path = 'battery_soc_predictor_lstm_model.keras'
-    scaler_path = 'battery_soc_predictor_scaler_model'
-
-    ####################
-    # GAUSS + K-Means
-    ####################
-    # Load and prepare data
-    data_path = 'k_means.csv'
-    data = pd.read_csv(data_path, decimal='.')
-    data.dropna(inplace=True)  # Remove rows with NaN values
-    data["temperature_mean"] = data[["data", "data.1"]].mean(axis=1)  # Calculate mean temperature
-
-    # Define features and target variable
-    X = data[['battery_voltage', 'battery_current', "temperature_mean"]]
-    y = data['battery_soc']
-
-    # Split the data into training and testing sets
-    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
-
-    battery_model = BatterySocPredictorGauss.load_model('battery_model.pkl')
-
-    # Make predictions on the test data
-    y_pred_test = battery_model.predict(X_test)
-
-    print(y_pred_test.shape, " ", y_test.shape)
-    # Calculate MAE and RMSE
-    mae = mean_absolute_error(y_test, y_pred_test)
-    rmse = mean_squared_error(y_test, y_pred_test, squared=False)
-
-    print(f'Mean Absolute Error (MAE): {mae}')
-    print(f'Root Mean Squared Error (RMSE): {rmse}')
-
-    # Plot actual vs predicted values
-    # plt.figure(figsize=(12, 6))
-    # plt.plot(y_test.values, label='Actual SoC')
-    # plt.plot(y_pred_test, label='Predicted SoC')
-    # plt.xlabel('Samples')
-    # plt.ylabel('State of Charge (SoC)')
-    # plt.title('Actual vs Predicted SoC')
-    # plt.legend()
-    # plt.show()
-
-    ####################
-    # LSTM
-    ####################
-    predictor = BatterySoCPredictorLSTM(gauss='battery_model.pkl')
-
-    # Training with recursive prediction
-    predictor.fit(train_data_path, epochs=50, batch_size=50, validation_split=0.1)
-
-    # Save the model and scalers
-    predictor.save_model(model_path=model_path, scaler_path=scaler_path)
-
-    # Load the model and scalers
-    loaded_predictor = BatterySoCPredictorLSTM(model_path=model_path, scaler_path=scaler_path, gauss='battery_model.pkl')
-
-    test_data = pd.read_csv(test_data_path)
-    test_data['Time'] = pd.to_datetime(test_data['Time'], unit='ms')
-    test_data.set_index('Time', inplace=True)
-    test_data.replace('undefined', np.nan, inplace=True)
-    test_data.dropna(inplace=True)
-    test_data['battery_voltage'] = pd.to_numeric(test_data['battery_voltage'], errors='coerce')
-    test_data['battery_current'] = pd.to_numeric(test_data['battery_current'], errors='coerce')
-    test_data['battery_soc'] = pd.to_numeric(test_data['battery_soc'], errors='coerce')
-    test_data['data'] = pd.to_numeric(test_data['data.1'], errors='coerce')
-    test_data.dropna(inplace=True)
-
-    scaled_test_data = loaded_predictor.scaler.transform(test_data[['battery_voltage', 'battery_current', 'data', 'battery_soc']])
-    test_data['scaled_soc'] = loaded_predictor.target_scaler.transform(test_data[['battery_soc']])
-    test_data.dropna(inplace=True)
-
-    X_test, y_test = loaded_predictor._create_sequences(scaled_test_data, loaded_predictor.seq_length, loaded_predictor.n_future_steps)
-    predictions = loaded_predictor.model.predict(X_test)
-    predictions = loaded_predictor.target_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, loaded_predictor.n_future_steps)
-
-    test_data_y = test_data['battery_soc'].values[5:-1, ...]
-    mae = mean_absolute_error(test_data_y, predictions[:, 0])
-    rmse = mean_squared_error(test_data_y, predictions[:, 0], squared=False)
-
-    print(f'Mean Absolute Error (MAE): {mae}')
-    print(f'Root Mean Squared Error (RMSE): {rmse}')
-
-    plt.figure(figsize=(12, 6))
-    plt.plot(test_data_y, label='Actual SoC')
-    plt.plot(predictions[:, 0].flatten(), label='Predicted SoC')
-    plt.xlabel('Samples')
-    plt.ylabel('State of Charge (SoC)')
-    plt.title('Actual vs Predicted SoC using LSTM')
-    plt.legend()
-    plt.show()