Page MenuHomec4science

ModelHandler.py
No OneTemporary

File Metadata

Created
Wed, Aug 14, 19:23

ModelHandler.py

import numpy as np
from matplotlib import pyplot as plt
# pyeddl
import pyeddl._core.eddl as eddl
from pyeddl.tensor import Tensor
from pyeddl._core.eddl import BatchNormalization, L2, sgd, Input, Dense, Dropout, Activation, Reshape, MaxPool, Flatten, Conv, GlorotUniform, Pad
# sklearn
from sklearn.metrics import confusion_matrix, classification_report, roc_curve,roc_auc_score
class ModelHandler:
    """Build, train and evaluate an EDDL convolutional classifier on windowed signals.

    Besides the network itself, the class bundles helpers to cut signals into
    fixed-size windows, plot training curves, and compute detection metrics
    (accuracy, false-alarm rate, detection index) from per-window predictions.
    """

    # Size of the second input dimension of the network (see get_model).
    NB_CHNS = 4
    # L2 regularization factor applied to the convolutions and the hidden dense layer.
    L2_K = 0.1
    # L2_B and L2_A are not referenced by any method of this class.
    L2_B = 0.1
    L2_A = 0.0
    # Rate passed to both Dropout layers.
    DROPOUT_RATE = 0.5
    # Not referenced by any method of this class; the network is kept in self.model.
    net = None

    def __init__(self):
        """Build the network and keep it in ``self.model``."""
        print("Object constructor")
        self.model = self.get_model()

    def _conv_bn_relu(self, layer, filters, kernel, strides):
        """Append Conv ("same" padding, Glorot-uniform init, L2) -> BatchNorm -> ReLU."""
        layer = L2(GlorotUniform(Conv(layer, filters, kernel, strides, "same")), self.L2_K)
        layer = BatchNormalization(layer, 0.99, 0.001)
        return Activation(layer, "relu")

    def get_model(self):
        """Create, build and return the EDDL network.

        The input has shape ``[1, NB_CHNS, 1280]``. Three convolutional stages
        are followed by a 64-unit dense layer and a 2-unit softmax output. The
        model is built with SGD, cross-entropy loss and categorical accuracy on
        the ``CS_GPU([1])`` computing service, and its summary is printed.
        """
        in_ = Input([1, self.NB_CHNS, 1280])

        l = self._conv_bn_relu(in_, 16, [3, 5], [1, 1])
        l = MaxPool(l, [1, 2], [1, 2], "same")

        l = self._conv_bn_relu(l, 32, [3, 3], [1, 1])
        l = MaxPool(l, [1, 2], [1, 2], "same")

        # Third stage: explicit padding, stride-2 convolution, no pooling.
        l = self._conv_bn_relu(Pad(l, [0, 1, 1, 0]), 32, [3, 3], [2, 2])
        l = Dropout(l, self.DROPOUT_RATE)

        l = Flatten(l)
        l = L2(GlorotUniform(Dense(l, 64)), self.L2_K)
        l = Activation(l, "relu")
        l = Dropout(l, self.DROPOUT_RATE)

        l = GlorotUniform(Dense(l, 2))
        out_ = Activation(l, "softmax")

        model = eddl.Model([in_], [out_])
        eddl.build(model,
                   sgd(0.01, 0.9, 0.0, nesterov=True),
                   ["cross_entropy"],
                   ["categorical_accuracy"],
                   eddl.CS_GPU([1]))
        eddl.summary(model)
        return model

    def _cut_set(self, data_set, size, stride):
        """Window every row of *data_set*; return (windows, per-window labels)."""
        windows = []
        labels = []
        for row in data_set.itertuples(index=False):
            chunks = self.sliding_window(row.signal, size, stride)
            windows += list(chunks)
            # Every window inherits the label of the recording it was cut from.
            labels += list(np.ones(len(chunks)) * row.label)
        return windows, labels

    def cut_signal_data(self, train_set, test_set, size_train=1280, stride_train=640, size_test=1280, stride_test=1280):
        """Cut the training and testing signals into fixed-size windows.

        ``train_set`` and ``test_set`` are pandas dataframes where the signal is
        in the column labeled "signal" and the label in the column "label".
        With the defaults, training windows are 1280 samples long with 50%
        overlap and testing windows are 1280 samples long without overlap
        (1280 samples is 5 seconds according to the original comments).

        Returns:
            ``(x_train, y_train, x_test, y_test)`` as plain lists; each label
            list holds one float per window.
        """
        x_train, y_train = self._cut_set(train_set, size_train, stride_train)
        x_test, y_test = self._cut_set(test_set, size_test, stride_test)
        return x_train, y_train, x_test, y_test

    def sliding_window(self, signal, size=1280, stride=640):
        """Cut *signal* along its second axis into windows of *size* samples.

        Args:
            signal: 2-D array indexed as ``signal[:, start:stop]``.
            size: window length in samples.
            stride: step between window starts (default: 50% overlap).

        Returns:
            List of ``signal[:, i:i + size]`` slices. Trailing samples that do
            not fill a whole window are dropped; a signal shorter than *size*
            yields an empty list.
        """
        num_of_chunks = max(0, (signal.shape[1] - size) // stride + 1)
        return [signal[:, start:start + size]
                for start in range(0, num_of_chunks * stride, stride)]

    def prepare_standardplot(self, title, xlabel):
        """Create a 12x6 inch figure with a log-scale loss axis and an accuracy axis."""
        fig, (ax1, ax2) = plt.subplots(1, 2)
        fig.set_size_inches(12, 6)
        fig.suptitle(title)
        ax1.set_ylabel('binary cross entropy')
        ax1.set_xlabel(xlabel)
        ax1.set_yscale('log')
        ax2.set_ylabel('accuracy [% correct]')
        ax2.set_xlabel(xlabel)
        return fig, ax1, ax2

    def finalize_standardplot(self, fig, ax1, ax2):
        """Add legends to the axes that have labelled curves and tidy the layout."""
        for ax in (ax1, ax2):
            handles, labels = ax.get_legend_handles_labels()
            if labels:
                ax.legend(handles, labels)
        fig.tight_layout()
        # Leave room at the top for the figure title.
        plt.subplots_adjust(top=0.9)

    def plot_history(self, history, title):
        """Plot training/validation loss and accuracy per epoch and return the figure.

        ``history.history`` must provide the keys 'loss', 'val_loss', 'acc'
        and 'val_acc'.
        """
        fig, ax1, ax2 = self.prepare_standardplot(title, 'epoch')
        ax1.plot(history.history['loss'], label="training")
        ax1.plot(history.history['val_loss'], label="validation")
        ax2.plot(history.history['acc'], label="training")
        ax2.plot(history.history['val_acc'], label="validation")
        self.finalize_standardplot(fig, ax1, ax2)
        return fig

    def show_confusion_matrix(self, y_true_, y_pred_, title):
        """Print and display the confusion matrix, then print a classification report.

        Predictions are rounded to the nearest integer before comparison.
        """
        print("Generating Confusion Matrix")
        rounded = np.rint(y_pred_)
        truth = np.array(y_true_)
        cm = confusion_matrix(y_pred=rounded, y_true=truth)
        print(cm)
        plt.imshow(cm, cmap="inferno_r")
        plt.title(title)
        plt.show()
        print(classification_report(y_pred=rounded, y_true=truth))

    def compute_accuracy(self, y_pred_, y_true_):
        """Return the fraction of rows whose rounded prediction equals the label.

        ``y_pred_`` must be 2-D; each row is rounded and reduced with ``max``.
        """
        # NOTE(review): for a two-column softmax output (as produced by
        # get_model) the rounded row maximum is 1 for nearly every row, so this
        # only behaves as intended for a single-column score -- confirm callers.
        predicted = np.max(np.round(y_pred_), axis=1)
        errors = np.sum(np.abs(predicted - np.array(y_true_)))
        return 1 - errors / len(y_true_)

    @staticmethod
    def _push_vote(votes, is_positive):
        """Shift *votes* one slot towards the end and store the newest vote at index 0."""
        votes[1:] = votes[:-1].copy()
        votes[0] = 1 if is_positive else 0

    def false_positive_rate(self, y_test, y_pred, detect_rule):
        """Count alarms raised on *y_pred* and express them per hour.

        Args:
            y_test: unused.
            y_pred: 2-D per-window scores; each row is rounded and reduced
                with ``max`` to a single vote.
            detect_rule: ``(k, n)``; an alarm is raised when at least ``k`` of
                the last ``n`` votes equal 1.

        Returns:
            ``(fpr, fp, false_alarms)``: alarms per hour, number of alarms and
            the window indices at which they were raised.
        """
        # Windows to wait after an alarm before detection is re-armed.
        # NOTE(review): the original comment asks for about one minute between
        # seizure onsets; 23 windows of 1280 samples at 256 Hz is nearer two
        # minutes when windows do not overlap -- confirm the intended value.
        refractory = 23

        # Each prediction is taken to cover 1280 samples at 256 samples/second.
        time_hr = len(y_pred) * 1280 / (256 * 60 * 60)
        # NOTE(review): see compute_accuracy -- the row maximum is only
        # meaningful for single-column scores.
        preds = np.max(np.rint(y_pred), axis=1)

        votes = np.zeros(detect_rule[1])
        fp = 0
        false_alarms = []
        alarm_triggered = False
        counter_alarm = refractory
        for idx, x in enumerate(preds):
            if counter_alarm == 0:
                alarm_triggered = False
            else:
                counter_alarm -= 1
            if alarm_triggered:
                # Predictions inside the refractory period are ignored.
                continue
            self._push_vote(votes, x == 1)
            if np.sum(votes) >= detect_rule[0]:
                fp += 1
                alarm_triggered = True
                counter_alarm = refractory
                false_alarms.append(idx)
                # Start from a clean window once re-armed; otherwise the votes
                # that caused this alarm would be counted again later.
                votes[:] = 0
        fpr = fp / time_hr
        return fpr, fp, false_alarms

    def compute_detect_time(self, preds, test, detect_rule):
        """Return the index of the first window at which the detection rule fires.

        Args:
            preds: iterable of per-window predictions; a value equal to 1 is
                a positive vote.
            test: unused.
            detect_rule: ``(k, n)``; detection happens when at least ``k`` of
                the last ``n`` votes are positive (e.g. 2 out of 3 segments
                classified as ictal).

        Returns:
            The window index of the first detection, or -1 if none occurs.
        """
        votes = np.zeros(detect_rule[1])
        for idx, x in enumerate(preds):
            self._push_vote(votes, x == 1)
            if np.sum(votes) >= detect_rule[0]:
                return idx
        return -1

    def train(self, model_file, epochs, batch_size, learning_rate, x_train, y_train, x_val, y_val):
        """Train ``self.model``, evaluating and saving it after every epoch.

        The arrays are converted to EDDL tensors, the learning rate is set to
        *learning_rate*, and each loop iteration fits for one epoch, evaluates
        on the validation data and overwrites *model_file* with the weights.
        """
        x_train = Tensor.fromarray(x_train)
        y_train = Tensor.fromarray(y_train)
        x_val = Tensor.fromarray(x_val)
        y_val = Tensor.fromarray(y_val)
        eddl.setlr(self.model, [learning_rate])
        for e in range(epochs):
            print("Real Epoch number: {} of {}".format(e + 1, epochs))
            eddl.fit(self.model, [x_train], [y_train], batch_size, 1)
            eddl.evaluate(self.model, [x_val], [y_val])
            eddl.save(self.model, model_file)

    def evaluate(self, model_file, x_test, y_test):
        """Load the weights stored in *model_file* and evaluate on the test data."""
        eddl.load(self.model, model_file)
        x_test = Tensor.fromarray(x_test)
        y_test = Tensor.fromarray(y_test)
        eddl.evaluate(self.model, [x_test], [y_test])

    def save_to_onnx(self, model_file):
        """Export ``self.model`` to an ONNX file at *model_file*."""
        eddl.save_net_to_onnx_file(self.model, model_file)

Event Timeline