diff --git a/src_pyeddl/ModelHandler.py b/src_pyeddl/ModelHandler.py
index cc2972f..88baf9d 100644
--- a/src_pyeddl/ModelHandler.py
+++ b/src_pyeddl/ModelHandler.py
@@ -1,228 +1,228 @@
 import numpy as np
 from matplotlib import pyplot as plt
 
 # pyeddl
 import pyeddl._core.eddl as eddl
 from pyeddl.tensor import Tensor
-from pyeddl._core.eddl import BatchNormalization, L2, sgd, Input, Dense, Dropout, Activation, Reshape, MaxPool, Flatten, Conv, GlorotUniform
+from pyeddl._core.eddl import BatchNormalization, L2, sgd, Input, Dense, Dropout, Activation, Reshape, MaxPool, Flatten, Conv, GlorotUniform, Pad
 
 # sklearn
 from sklearn.metrics import confusion_matrix, classification_report, roc_curve, roc_auc_score
 
 
 class ModelHandler:
     NB_CHNS = 4
     L2_K = 0.1
     L2_B = 0.1
     L2_A = 0.0
     DROPOUT_RATE = 0.5
 
     net = None
 
     def __init__(self):
         print("Object constructor")
         self.model = self.get_model()
 
     def get_model(self):
         # Input: one "image" of NB_CHNS EEG channels x 1280 samples (5 s at 256 Hz)
         in_ = Input([1, self.NB_CHNS, 1280])
         l = in_
 
         l = L2(GlorotUniform(Conv(l, 16, [3, 5], [1, 1], "same")), self.L2_K)
         l = BatchNormalization(l, 0.99, 0.001)
         l = Activation(l, "relu")
         l = MaxPool(l, [1, 2], [1, 2], "same")
 
         l = L2(GlorotUniform(Conv(l, 32, [3, 3], [1, 1], "same")), self.L2_K)
         l = BatchNormalization(l, 0.99, 0.001)
         l = Activation(l, "relu")
         l = MaxPool(l, [1, 2], [1, 2], "same")
 
-        l = L2(GlorotUniform(Conv(l, 32, [3, 3], [2, 2], "same")), self.L2_K)
+        l = L2(GlorotUniform(Conv(Pad(l, [0, 1, 1, 0]), 32, [3, 3], [2, 2], "same")), self.L2_K)
         l = BatchNormalization(l, 0.99, 0.001)
         l = Activation(l, "relu")
         l = Dropout(l, self.DROPOUT_RATE)
 
         l = Flatten(l)
         l = L2(GlorotUniform(Dense(l, 64)), self.L2_K)
         l = Activation(l, "relu")
         l = Dropout(l, self.DROPOUT_RATE)
 
         l = GlorotUniform(Dense(l, 2))
         l = Activation(l, "softmax")
         out_ = l
 
         model = eddl.Model([in_], [out_])
         eddl.build(model,
                    sgd(0.01, 0.9, 0.0, nesterov=True),
                    ["cross_entropy"],
                    ["categorical_accuracy"],
                    eddl.CS_CPU(1, 'full_mem'))
         eddl.summary(model)
         return model
 
     # train_set and test_set are pandas DataFrames with the signal in the column "signal" and the label in the column "label"
     def cut_signal_data(self, train_set, test_set, size_train=1280, stride_train=640, size_test=1280, stride_test=1280):
         x_train = []
         y_train = []
         x_test = []
         y_test = []
 
         # For each training seizure/pre-ictal signal
         for row in train_set.itertuples(index=False):
             signal = row.signal
             label = row.label
             # Cut the signal into chunks of 5 seconds
             signals = self.sliding_window(signal, size_train, stride_train)
             labels = np.ones(len(signals)) * label
             x_train += list(signals)
             y_train += list(labels)
 
         # For each testing seizure/pre-ictal signal
         for row in test_set.itertuples(index=False):
             signal = row.signal
             label = row.label
             # Cut the signal into chunks of 5 seconds
             signals = self.sliding_window(signal, size_test, stride_test)
             labels = np.ones(len(signals)) * label
             x_test += list(signals)
             y_test += list(labels)
 
         return x_train, y_train, x_test, y_test
 
     # input: EEG signal (channels x samples); size: window length in samples; stride: step between windows
     # Default: 1280-sample windows (5 s at 256 Hz) with 50% overlap
     # output: list of signal cuts of the given window size
     def sliding_window(self, signal, size=1280, stride=640):
         out = []
         num_of_chunks = int(((signal.shape[1] - size) / stride) + 1)
         for i in range(0, num_of_chunks * stride, stride):
             out.append(signal[:, i:i + size])
         return out
 
     def prepare_standardplot(self, title, xlabel):
         fig, (ax1, ax2) = plt.subplots(1, 2)
         fig.set_size_inches(12, 6)
         fig.suptitle(title)
         ax1.set_ylabel('binary cross entropy')
         ax1.set_xlabel(xlabel)
         ax1.set_yscale('log')
         ax2.set_ylabel('accuracy [% correct]')
         ax2.set_xlabel(xlabel)
         return fig, ax1, ax2
 
     def finalize_standardplot(self, fig, ax1, ax2):
         ax1handles, ax1labels = ax1.get_legend_handles_labels()
         if len(ax1labels) > 0:
             ax1.legend(ax1handles, ax1labels)
         ax2handles, ax2labels = ax2.get_legend_handles_labels()
         if len(ax2labels) > 0:
             ax2.legend(ax2handles, ax2labels)
         fig.tight_layout()
         plt.subplots_adjust(top=0.9)
 
     def plot_history(self, history, title):
         fig, ax1, ax2 = self.prepare_standardplot(title, 'epoch')
         ax1.plot(history.history['loss'], label="training")
         ax1.plot(history.history['val_loss'], label="validation")
         ax2.plot(history.history['acc'], label="training")
         ax2.plot(history.history['val_acc'], label="validation")
         self.finalize_standardplot(fig, ax1, ax2)
         return fig
 
     # Display model performance on the test set
     def show_confusion_matrix(self, y_true_, y_pred_, title):
         print("Generating Confusion Matrix")
         y_pred_cls = np.argmax(y_pred_, axis=1)  # predicted class per window
         cm = confusion_matrix(y_pred=y_pred_cls, y_true=np.array(y_true_))
         print(cm)
         plt.imshow(cm, cmap="inferno_r")
         plt.title(title)
         plt.show()
         print(classification_report(y_pred=y_pred_cls, y_true=np.array(y_true_)))
 
     # Accuracy on the test set: fraction of windows whose predicted class matches the binary label
     def compute_accuracy(self, y_pred_, y_true_):
         return 1 - np.sum(np.abs(np.argmax(y_pred_, axis=1) - np.array(y_true_))) / len(y_true_)
 
     def false_positive_rate(self, y_test, y_pred, detect_rule):
         # Total duration covered by the predictions, in hours (1280-sample windows at 256 Hz)
         time_hr = len(y_pred) * 1280 / (256 * 60 * 60)
         preds = np.argmax(y_pred, axis=1)  # predicted class per window
         fp = 0
         x_ = np.zeros(detect_rule[1])  # buffer of the last detect_rule[1] predictions
         alarm_triggered = False
         counter_alarm = 23  # refractory period: wait 23 windows before the alarm can fire again
         false_alarms = []
         for idx, x in enumerate(preds):
             if counter_alarm == 0:
                 alarm_triggered = False
             else:
                 counter_alarm -= 1
             if not alarm_triggered:
                 # Shift the buffer one step and push the new prediction in front
                 for j, y in enumerate(x_[::-1]):
                     if j == len(x_) - 1:
                         x_[1] = x_[0]
                         x_[0] = 0
                     else:
                         x_[len(x_) - 1 - j] = x_[len(x_) - 2 - j]
                 if x == 1:
                     x_[0] = 1
                 # Alarm when detect_rule[0] of the last detect_rule[1] windows are positive
                 if np.sum(x_) >= detect_rule[0]:
                     fp += 1
                     alarm_triggered = True
                     counter_alarm = 23
                     false_alarms.append(idx)
         fpr = fp / time_hr
         return fpr, fp, false_alarms
 
     # Detection time: first window index at which detect_rule[0] of the last detect_rule[1] segments (e.g. 2 out of 3) are classified as ictal
     def compute_detect_time(self, preds, test, detect_rule):
         x_ = np.zeros(detect_rule[1])
         for idx, x in enumerate(preds):
             # Shift the buffer one step and push the new prediction in front
             for j, y in enumerate(x_[::-1]):
                 if j == len(x_) - 1:
                     x_[1] = x_[0]
                     x_[0] = 0
                 else:
                     x_[len(x_) - 1 - j] = x_[len(x_) - 2 - j]
             if x == 1:
                 x_[0] = 1
             if np.sum(x_) >= detect_rule[0]:
                 return idx
         return -1
 
     def train(self, model_file, epochs, batch_size, learning_rate, x_train, y_train, x_val, y_val):
         x_train = Tensor.fromarray(x_train)
         y_train = Tensor.fromarray(y_train)
         x_val = Tensor.fromarray(x_val)
         y_val = Tensor.fromarray(y_val)
 
         eddl.setlr(self.model, [learning_rate])
         for e in range(epochs):
             print("Real Epoch number: {} of {}".format(e + 1, epochs))
             eddl.fit(self.model, [x_train], [y_train], batch_size, 1)
             eddl.evaluate(self.model, [x_val], [y_val])
         eddl.save(self.model, model_file)
 
     def evaluate(self, model_file, x_test, y_test):
         eddl.load(self.model, model_file)
         x_test = Tensor.fromarray(x_test)
         y_test = Tensor.fromarray(y_test)
         eddl.evaluate(self.model, [x_test], [y_test])
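
Notes for reviewers (illustrative sketches, not part of the patch):

Why the explicit Pad before the stride-2 convolution: Keras/TensorFlow "same" padding with stride 2 pads asymmetrically (the extra row/column goes on the bottom/right) to reach an output of ceil(size / stride). My reading, unverified against EDDL internals, is that pyeddl's "same" was not reproducing that here, so the patch supplies the single bottom row and right column by hand via Pad(l, [0, 1, 1, 0]). The arithmetic, assuming the standard convolution output formula:

    # out = floor((size + pad_total - kernel) / stride) + 1
    def conv_out(size, kernel, stride, pad_total):
        return (size + pad_total - kernel) // stride + 1

    h, w = 4, 320  # feature map after the two [1, 2] max-pools
    # Keras "same" with stride 2 targets ceil(size / 2) = (2, 160);
    # one extra row/column of zeros is exactly enough for a 3x3 kernel:
    print(conv_out(h, 3, 2, 1), conv_out(w, 3, 2, 1))  # -> 2 160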
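
A minimal end-to-end usage sketch, assuming hypothetical DataFrames train_df / test_df whose "signal" column holds (4, n_samples) arrays sampled at 256 Hz and whose "label" column is 0/1; the file name and hyperparameters are placeholders, and the test split doubles as validation only to keep the sketch short:

    import numpy as np
    from ModelHandler import ModelHandler

    handler = ModelHandler()

    # 5 s windows: 50% overlap for training, no overlap for testing
    x_train, y_train, x_test, y_test = handler.cut_signal_data(train_df, test_df)

    # Reshape to the network input [N, 1, NB_CHNS, 1280] and one-hot
    # encode the labels for the 2-unit softmax
    x_train = np.array(x_train, dtype=np.float32).reshape(-1, 1, 4, 1280)
    x_test = np.array(x_test, dtype=np.float32).reshape(-1, 1, 4, 1280)
    y_train = np.eye(2, dtype=np.float32)[np.array(y_train, dtype=int)]
    y_test = np.eye(2, dtype=np.float32)[np.array(y_test, dtype=int)]

    handler.train("model.bin", epochs=10, batch_size=32, learning_rate=0.01,
                  x_train=x_train, y_train=y_train, x_val=x_test, y_val=y_test)
    handler.evaluate("model.bin", x_test, y_test)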
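
The detect_rule argument to false_positive_rate and compute_detect_time encodes a k-of-n rule: an event is declared as soon as k of the last n window predictions are ictal. A vectorised formulation that should match the buffer loop in compute_detect_time, on toy predictions with the 2-of-3 rule:

    import numpy as np

    preds = np.array([0, 1, 0, 0, 1, 1, 0])  # toy per-window predictions, 1 = ictal
    k, n = 2, 3                              # detect_rule = [2, 3]

    # Number of positives among the current and previous n-1 windows
    hits = np.convolve(preds, np.ones(n), mode="full")[:len(preds)]
    detect_idx = int(np.argmax(hits >= k)) if np.any(hits >= k) else -1
    print(detect_idx)  # -> 5 (windows 4 and 5 are both ictal)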