EOS/modules/class_battery_soc_predictor.py

241 lines
10 KiB
Python
Raw Normal View History

import numpy as np
import pandas as pd
import joblib
import json
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import WhiteKernel, Matern, DotProduct
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, RepeatVector, TimeDistributed
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l1, l2, l1_l2
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
class BatterySocPredictorGauss:
def __init__(self):
# Initialize scaler and Gaussian process model
self.scaler = StandardScaler()
kernel = (WhiteKernel(1.0, (1e-7, 1e3)) +
Matern(length_scale=(0.1, 0.1, 0.1),
length_scale_bounds=((1e-7, 1e3), (1e-7, 1e3), (1e-7, 1e3))) +
DotProduct())
self.gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=1e-3, normalize_y=True)
def fit(self, X, y):
# Transform the target variable
y_transformed = np.log(y / (101 - y))
# Scale the features
X_scaled = self.scaler.fit_transform(X)
# Train the model
self.gp.fit(X_scaled, y_transformed)
def predict(self, X):
# Scale the features
X_scaled = self.scaler.transform(X)
# Predictions and uncertainties
y_pred_transformed, sigma_transformed = self.gp.predict(X_scaled, return_std=True)
# Reverse transform the predictions
y_pred = 101 / (1 + np.exp(-y_pred_transformed))
# Reverse transform the uncertainties
sigmoid_y_pred = 1 / (1 + np.exp(-y_pred_transformed))
sigma = sigma_transformed * 101 * sigmoid_y_pred * (1 - sigmoid_y_pred)
return y_pred
def save_model(self, file_path):
# Save the entire model object
joblib.dump(self, file_path)
@staticmethod
def load_model(file_path):
# Load the model object
return joblib.load(file_path)
class BatterySoCPredictorLSTM:
def __init__(self, model_path=None, scaler_path=None, gauss=None):
self.scaler = MinMaxScaler(feature_range=(0, 1))
self.target_scaler = MinMaxScaler(feature_range=(0, 1))
self.seq_length = 5 # Number of time steps in input sequence
self.n_future_steps = 1 # Number of future steps to predict
self.gauss_model = BatterySocPredictorGauss.load_model(gauss)
if model_path:
self.model = load_model(model_path)
else:
self.model = self._build_model()
if scaler_path:
self.load_scalers(scaler_path)
def _build_model(self):
regu = 0.00 # Regularization rate
model = Sequential()
model.add(LSTM(20, activation='relu', return_sequences=True, input_shape=(self.seq_length, 4), kernel_regularizer=l2(regu)))
model.add(LSTM(20, activation='relu', return_sequences=False, kernel_regularizer=l2(regu)))
model.add(RepeatVector(self.n_future_steps))
model.add(LSTM(20, activation='relu', return_sequences=True, kernel_regularizer=l2(regu)))
model.add(TimeDistributed(Dense(1, kernel_regularizer=l2(regu)))) # TimeDistributed layer for multi-step output
optimizer = Adam(learning_rate=0.0005)
model.compile(optimizer=optimizer, loss='mae')
return model
def fit(self, data_path, epochs=100, batch_size=50, validation_split=0.1):
data = pd.read_csv(data_path)
data['Time'] = pd.to_datetime(data['Time'], unit='ms')
data.set_index('Time', inplace=True)
data.dropna(inplace=True)
# Use Gaussian model to predict SoC
data["battery_soc_gauss"] = self.gauss_model.predict(data[['battery_voltage', 'battery_current', 'data']].values)
scaled_data = self.scaler.fit_transform(data[['battery_voltage', 'battery_current', 'data', 'battery_soc_gauss']].values)
data['scaled_soc'] = self.target_scaler.fit_transform(data[['battery_soc']])
X, y = self._create_sequences(scaled_data, self.seq_length, self.n_future_steps)
print(y.shape)
self.model.fit(X, y, epochs=epochs, batch_size=batch_size, validation_split=validation_split)
def _create_sequences(self, data, seq_length, n_future_steps):
xs, ys = [], []
for i in range(len(data) - seq_length - n_future_steps):
x = data[i:(i + seq_length)]
y = data[(i + seq_length):(i + seq_length + n_future_steps), -1] # Multi-step output
xs.append(x)
ys.append(y)
return np.array(xs), np.array(ys)
def predict_single(self, voltage_current_temp_soc_sequence):
if len(voltage_current_temp_soc_sequence) != self.seq_length or len(voltage_current_temp_soc_sequence[0]) != 3:
raise ValueError("Input sequence must have the shape (seq_length, 3).")
soc_gauss = self.gauss_model.predict(voltage_current_temp_soc_sequence)
soc_gauss = soc_gauss.reshape(-1, 1)
voltage_current_sequence = np.hstack([voltage_current_temp_soc_sequence, soc_gauss])
scaled_sequence = self.scaler.transform(voltage_current_sequence)
X = np.array([scaled_sequence])
prediction = self.model.predict(X)
prediction = self.target_scaler.inverse_transform(prediction.reshape(-1, 1)).reshape(-1, self.n_future_steps)
return prediction # Return the sequence of future SoC predictions
def save_model(self, model_path=None, scaler_path=None):
self.model.save(model_path)
scaler_params = {
'scaler_min_': self.scaler.min_.tolist(),
'scaler_scale_': self.scaler.scale_.tolist(),
'target_scaler_min_': self.target_scaler.min_.tolist(),
'target_scaler_scale_': self.target_scaler.scale_.tolist()
}
with open(scaler_path, 'w') as f:
json.dump(scaler_params, f)
def load_scalers(self, scaler_path):
with open(scaler_path, 'r') as f:
scaler_params = json.load(f)
self.scaler.min_ = np.array(scaler_params['scaler_min_'])
self.scaler.scale_ = np.array(scaler_params['scaler_scale_'])
self.target_scaler.min_ = np.array(scaler_params['target_scaler_min_'])
self.target_scaler.scale_ = np.array(scaler_params['target_scaler_scale_'])
if __name__ == '__main__':
train_data_path = 'lstm_train/raw_data_clean.csv'
test_data_path = 'Test_Data.csv'
model_path = 'battery_soc_predictor_lstm_model.keras'
scaler_path = 'battery_soc_predictor_scaler_model'
####################
# GAUSS + K-Means
####################
# Load and prepare data
data_path = 'k_means.csv'
data = pd.read_csv(data_path, decimal='.')
data.dropna(inplace=True) # Remove rows with NaN values
data["temperature_mean"] = data[["data", "data.1"]].mean(axis=1) # Calculate mean temperature
# Define features and target variable
X = data[['battery_voltage', 'battery_current', "temperature_mean"]]
y = data['battery_soc']
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
battery_model = BatterySocPredictorGauss.load_model('battery_model.pkl')
# Make predictions on the test data
y_pred_test = battery_model.predict(X_test)
print(y_pred_test.shape, " ", y_test.shape)
# Calculate MAE and RMSE
mae = mean_absolute_error(y_test, y_pred_test)
rmse = mean_squared_error(y_test, y_pred_test, squared=False)
print(f'Mean Absolute Error (MAE): {mae}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
# Plot actual vs predicted values
# plt.figure(figsize=(12, 6))
# plt.plot(y_test.values, label='Actual SoC')
# plt.plot(y_pred_test, label='Predicted SoC')
# plt.xlabel('Samples')
# plt.ylabel('State of Charge (SoC)')
# plt.title('Actual vs Predicted SoC')
# plt.legend()
# plt.show()
####################
# LSTM
####################
predictor = BatterySoCPredictorLSTM(gauss='battery_model.pkl')
# Training with recursive prediction
predictor.fit(train_data_path, epochs=50, batch_size=50, validation_split=0.1)
# Save the model and scalers
predictor.save_model(model_path=model_path, scaler_path=scaler_path)
# Load the model and scalers
loaded_predictor = BatterySoCPredictorLSTM(model_path=model_path, scaler_path=scaler_path, gauss='battery_model.pkl')
test_data = pd.read_csv(test_data_path)
test_data['Time'] = pd.to_datetime(test_data['Time'], unit='ms')
test_data.set_index('Time', inplace=True)
test_data.replace('undefined', np.nan, inplace=True)
test_data.dropna(inplace=True)
test_data['battery_voltage'] = pd.to_numeric(test_data['battery_voltage'], errors='coerce')
test_data['battery_current'] = pd.to_numeric(test_data['battery_current'], errors='coerce')
test_data['battery_soc'] = pd.to_numeric(test_data['battery_soc'], errors='coerce')
test_data['data'] = pd.to_numeric(test_data['data.1'], errors='coerce')
test_data.dropna(inplace=True)
scaled_test_data = loaded_predictor.scaler.transform(test_data[['battery_voltage', 'battery_current', 'data', 'battery_soc']])
test_data['scaled_soc'] = loaded_predictor.target_scaler.transform(test_data[['battery_soc']])
test_data.dropna(inplace=True)
X_test, y_test = loaded_predictor._create_sequences(scaled_test_data, loaded_predictor.seq_length, loaded_predictor.n_future_steps)
predictions = loaded_predictor.model.predict(X_test)
predictions = loaded_predictor.target_scaler.inverse_transform(predictions.reshape(-1, 1)).reshape(-1, loaded_predictor.n_future_steps)
test_data_y = test_data['battery_soc'].values[5:-1, ...]
mae = mean_absolute_error(test_data_y, predictions[:, 0])
rmse = mean_squared_error(test_data_y, predictions[:, 0], squared=False)
print(f'Mean Absolute Error (MAE): {mae}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
plt.figure(figsize=(12, 6))
plt.plot(test_data_y, label='Actual SoC')
plt.plot(predictions[:, 0].flatten(), label='Predicted SoC')
plt.xlabel('Samples')
plt.ylabel('State of Charge (SoC)')
plt.title('Actual vs Predicted SoC using LSTM')
plt.legend()
plt.show()