import os
import copy
from abc import abstractmethod

import numpy as np
import tensorflow as tf
import torch
from transformers import TFGPT2LMHeadModel
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from transformers.generation_tf_utils import TFGenerationMixin

import modif_gpt as mod_gpt


class Model:
    def __init__(self, home_path, finetune_path, model_path, probaMode, printStep, buckets):
        # resolve model_path to an absolute path, relative to home_path
        os.chdir(home_path)
        os.chdir(model_path)
        model_path = os.getcwd()
        self.home_path = home_path
        self.finetune_path = finetune_path
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.probaMode = probaMode
        self.buckets = buckets
        self.__init_counters()
        self.printStep = printStep
        self.sureLim = 0.8  # confidence above which a guess is kept as "sure"
    # private functions
    def __init_counters(self):
        # NOTE: self.lim is read throughout this class but never initialised in
        # the original file; we assume evenly spaced confidence thresholds in
        # [0, 1), one per bucket (e.g. 0.0, 0.1, ..., 0.9 for 10 buckets).
        self.lim = np.linspace(0.0, 1.0, self.buckets, endpoint=False)
        eps = 1e-6  # small constant, to avoid division by zero
        self.count = np.zeros((self.buckets,)) + eps
        tmp = np.zeros((self.buckets,)) + eps
        self.score = {
            "tp": copy.deepcopy(tmp),
            "fp": copy.deepcopy(tmp),
            "fn": copy.deepcopy(tmp),
            "tn": copy.deepcopy(tmp),
            "f1": copy.deepcopy(tmp),
            "recall": copy.deepcopy(tmp),
            "precision": copy.deepcopy(tmp),
        }
        self.xsure = []
        self.ysure = []
    def __printScores(self, hist=True):
        def std_len(s, l=8, side="left", fill=" "):
            # pad s with `fill` on the given side until it is at least l characters
            s = str(s)
            while len(s) < l:
                if side == "left":
                    s = fill + s
                else:
                    s += fill
            return s

        def nbTolines(nb1, nb2):
            # draw nb1/nb2 as a bar of '|' characters on a 100-character line
            return std_len("|" * int(100 / nb2 * nb1), l=100, side="right", fill=".")

        def doHist(tmp, var, i):
            # append the histogram bar of bucket i (bin height = var[i] - var[i+1])
            if i == self.buckets - 1:
                next_bin = 0
            else:
                next_bin = var[i + 1]
            tmp += " - {}".format(nbTolines(var[i] - next_bin, var[0]))
            return tmp

        console = ""
        tmp = "Number of exact matches, for each ConfidenceScore threshold\n"
        tmp += "trs - used cases [%] - ConfidenceScore distribution\n"
        for i in range(self.buckets):
            tmp += "{} - {}%".format(
                std_len(self.lim[i]),
                std_len(int(100 / self.count[0] * self.count[i]))
            )
            if hist:
                tmp = doHist(tmp, self.count, i)
            tmp += "\n"
        console += tmp
        print(tmp)
        print("\n {} \n".format("=" * 100))

        tmp = "trs - true positive [%] - false positive [%] - false negative [%] - true negative [%]\n"
        for i in range(self.buckets):
            tmp += "{} - {}% - {}% - {}% - {}%\n".format(
                std_len(self.lim[i]),
                std_len(int(100 / self.count[0] * self.score["tp"][i])),
                std_len(int(100 / self.count[0] * self.score["fp"][i])),
                std_len(int(100 / self.count[0] * self.score["fn"][i])),
                std_len(int(100 / self.count[0] * self.score["tn"][i]))
            )
        console += tmp
        print(tmp)
        print("\n {} \n".format("=" * 100))

        tmp = "\ntrs - recall - precision - F1 - F1 histogram\n"
        for i in range(self.buckets):
            tmp += "{} - {} - {} - {}".format(
                std_len(self.lim[i], 2),
                std_len(round(self.score["recall"][i], 2)),
                std_len(round(self.score["precision"][i], 2)),
                std_len(round(self.score["f1"][i], 2))
            )
            if hist:
                tmp += " - " + nbTolines(self.score["f1"][i], 1)
            tmp += "\n"
        console += tmp
        print(tmp)
        print("\n {} \n".format("=" * 100))
        return console
    def __updateF1(self):
        for i in range(self.buckets):
            self.score["recall"][i] = \
                self.score["tp"][i] / (self.score["tp"][i] + self.score["fn"][i])
            self.score["precision"][i] = \
                self.score["tp"][i] / (self.score["tp"][i] + self.score["fp"][i])
            self.score["f1"][i] = \
                2 * self.score["precision"][i] * self.score["recall"][i] / \
                (self.score["precision"][i] + self.score["recall"][i])
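    # Worked example (illustrative numbers, not from any run): with tp = 8,
    # fp = 2 and fn = 2 in one bucket, recall = 8 / (8 + 2) = 0.8,
    # precision = 8 / (8 + 2) = 0.8 and f1 = 2 * 0.8 * 0.8 / (0.8 + 0.8) = 0.8.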
    def __check_answer(self, confidenceScore, corrAnswer, givenAnswer):
        for i, l in enumerate(self.lim):
            # count true positives, false positives, etc. per threshold; note
            # that "fn" here means a correct answer rejected by the threshold,
            # and "tn" a wrong answer rejected by it
            if confidenceScore > l:
                self.count[i] += 1
                if corrAnswer == givenAnswer:
                    self.score["tp"][i] += 1
                else:
                    self.score["fp"][i] += 1
            else:
                if corrAnswer == givenAnswer:
                    self.score["fn"][i] += 1
                else:
                    self.score["tn"][i] += 1
        # update recall, precision and F1 score
        self.__updateF1()
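    # Example (assuming the reconstructed thresholds): with lim = [0.0, 0.5],
    # a correct answer at confidence 0.6 counts as tp for both thresholds,
    # while at confidence 0.3 it is tp for 0.0 but fn for 0.5.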
    # protected functions
    @abstractmethod
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
        raise NotImplementedError("_train has to be defined in each child class")

    @abstractmethod
    def _generate(self, x, y):
        raise NotImplementedError("_generate has to be defined in each child class")

    # public functions
    @abstractmethod
    def set_proba_mode(self, mode):
        raise NotImplementedError("set_proba_mode has to be defined in each child class")

    def getSureGuesses(self):
        return [self.xsure, self.ysure]
    def train(self, nbEpochs, outModelName, startCheckpoint=None, dataEnd="", tokenizerLocaction=None):
        print("train ...")
        if startCheckpoint is None:
            print("... from scratch, with the Tokenizer in {}".format(tokenizerLocaction))
            raise NotImplementedError("Training from scratch is not implemented yet")
        cmd = self._train(nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction)
        print(cmd)
        os.chdir(self.finetune_path)
        os.system(cmd)
    def generate(self, data, idxStart=0, idx_End=None):
        self.__init_counters()
        if idx_End is None:
            idx_End = len(data[0])
        X = data[0][idxStart:idx_End]
        Y = data[1][idxStart:idx_End]
        for i, x in enumerate(X):
            corrAnswer, givenAnswer, confScore = self._generate(x, Y[i])
            # debug switch: flip to True to print all wrong answers
            if False:
                if corrAnswer != givenAnswer:
                    print("corr:<{}> vs given:<{}>".format(corrAnswer, givenAnswer))
            self.__check_answer(confScore, corrAnswer, givenAnswer)
            # keep the inputs the model is confident about
            if confScore > self.sureLim:
                self.xsure.append(x)
                self.ysure.append(Y[i])
            if self.printStep > 0 and (i + 1) % self.printStep == 0:
                print("({}): Correct Answer / given Answer \n{} / {}".format(
                    i, corrAnswer, givenAnswer
                ))
                _ = self.__printScores()
        print("Final scores")
        console = self.__printScores()
        # summary metrics at the lowest threshold and at the first threshold >= sureLim
        tp_lim0 = int(100 / self.count[0] * self.score["tp"][0])
        prec_lim0 = self.score["precision"][0]
        f1_lim0 = self.score["f1"][0]
        for i, l in enumerate(self.lim):
            if l >= self.sureLim:
                tp_limsure = int(100 / self.count[0] * self.score["tp"][i])
                prec_limsure = self.score["precision"][i]
                f1_limsure = self.score["f1"][i]
                break
        return self.score, [tp_lim0, tp_limsure, prec_lim0, prec_limsure, f1_lim0, f1_limsure], console
    @abstractmethod
    def load_model(self, ModelName):
        raise NotImplementedError("load_model has to be defined in each child class")

class GPTModel(Model):
    def __init__(self, home_path, model_path="./models", probaMode="longOk", printStep=10, buckets=10):
        # the GPT-2 fine-tuning script lives in the transformers examples
        os.chdir(home_path)
        os.chdir("./transformers/examples/language-modeling")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
        cmd = ("python run_clm.py"
               " --model_type {}"
               " --train_file \"{}\""
               " --do_train"
               " --validation_file \"{}\""
               " --per_gpu_train_batch_size 1"
               " --save_steps -1"
               " --num_train_epochs {}"
               " --fp16"
               " --output_dir=\"{}\"").format(
            "gpt2",
            "train" + dataEnd + ".txt",
            "eval" + dataEnd + ".txt",
            nbEpochs,
            self.model_path + "/" + outModelName)
        # optionally: --do_eval
        if startCheckpoint is not None:
            if startCheckpoint not in ["gpt2"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            tokenizerLocaction = startCheckpoint  # may also be "gpt2" itself
            cmd += " --model_name_or_path {}".format(startCheckpoint)
        if tokenizerLocaction is not None:
            cmd += " --tokenizer_name {}".format(tokenizerLocaction)
        return cmd
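    # For illustration, a hypothetical call _train(3, "gpt2_qa", "gpt2", "_qa", None)
    # returns roughly:
    #   python run_clm.py --model_type gpt2 --train_file "train_qa.txt" --do_train
    #   --validation_file "eval_qa.txt" --per_gpu_train_batch_size 1 --save_steps -1
    #   --num_train_epochs 3 --fp16 --output_dir=".../models/gpt2_qa"
    #   --model_name_or_path gpt2 --tokenizer_name gpt2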
    def __set_modif_gpt(self):
        # monkey-patch the transformers generation mixin so that generate()
        # also returns a confidence score (see modif_gpt)
        TFGenerationMixin._generate_no_beam_search = mod_gpt._generate_no_beam_search_modif
        TFGenerationMixin.generate = mod_gpt.generate_modif
    def _generate(self, x, y):
        self.__set_modif_gpt()
        input_ids = self.tokenizer.encode(x, return_tensors='tf')
        VERBOSE = "nothing"  # alternative: "nothing_but_score"
        generated_text_samples = self.model.generate(
            input_ids,
            max_length=len(input_ids[0]) + 50,
            num_return_sequences=1,
            no_repeat_ngram_size=0,
            repetition_penalty=1.0,
            top_p=1.0,
            temperature=1.0,
            do_sample=False,  # greedy decoding
            top_k=0,
            early_stopping=True,
            # the arguments below are consumed by the patched generate()
            tokenizer=self.tokenizer,
            VERBOSE=VERBOSE,
            probaMode=self.probaMode,
            num_beams=1,
            force2nd=True
        )
        givenAnswer = generated_text_samples[0]
        # strip the trailing "<|endoftext|>" token (and the character before it)
        corrAnswer = y[:-len("<|endoftext|>") - 1]
        return corrAnswer, givenAnswer, generated_text_samples[1]
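    # For illustration (hypothetical sample, not from the original data): with
    # x = "Q: Who wrote Faust? A:" and y = " Goethe\n<|endoftext|>", corrAnswer
    # becomes " Goethe", and generated_text_samples[1] is the confidence score
    # produced by the patched generate().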
    def set_proba_mode(self, mode):
        if mode in ["mult", "longOk"]:
            self.probaMode = mode
        else:
            raise NotImplementedError("This probability mode is not yet implemented")

    def load_model(self, ModelName):
        # the checkpoint is saved by the PyTorch training script, hence from_pt=True
        self.model = TFGPT2LMHeadModel.from_pretrained(self.model_path + "/" + ModelName, from_pt=True)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path + "/" + ModelName)

class BertModel(Model):
    def __init__(self, home_path, model_path="./models", probaMode="mult", printStep=10, buckets=10):
        # the question-answering fine-tuning script lives in the transformers examples
        os.chdir(home_path)
        os.chdir("./transformers/examples/question-answering")
        finetune_path = os.getcwd()
        super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
    # private functions
    def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
        cmd = ("python run_qa.py"
               " --train_file \"{}\""
               " --do_train"
               " --num_train_epochs \"{}\""
               " --output_dir=\"{}\""
               " --fp16"
               " --save_steps -1").format(
            "train" + dataEnd + ".json",
            nbEpochs,
            self.model_path + "/" + outModelName)
        # optionally: --validation_file "eval<dataEnd>.json" --do_eval
        if startCheckpoint is not None:
            if startCheckpoint not in ["xlm-roberta-base", "roberta-base"]:
                startCheckpoint = self.model_path + "/" + startCheckpoint
            cmd += " --model_name_or_path=\"{}\"".format(startCheckpoint)
        return cmd
    def _generate(self, x, y):
        # the input packs question and context into one string, separated by "---"
        tmp = x.split("---")
        question = tmp[0]
        text = tmp[1]
        # tokenize the model input
        inputs = self.tokenizer(question, text, return_tensors='pt')
        # the network scores every token as a possible answer start / answer end
        outputs = self.model(**inputs)
        start_scores = outputs.start_logits
        end_scores = outputs.end_logits
        # note: tf.nn.softmax is applied to detached PyTorch tensors below;
        # this works on CPU tensors via the NumPy protocol
        if self.probaMode == "mult":
            # pick start and end of the answer independently
            a = int(torch.argmax(start_scores))
            b = int(torch.argmax(end_scores))
            # probability of the answer = p(start) * p(end)
            probs_a = tf.nn.softmax(start_scores.detach())
            probs_b = tf.nn.softmax(end_scores.detach())
            prob_ab = probs_a[0, a] * probs_b[0, b]
        elif self.probaMode == "forceNon0":
            # force the end to come after the start
            a = int(torch.argmax(start_scores))
            b = int(torch.argmax(end_scores[0, a:]))
            probs_a = tf.nn.softmax(start_scores.detach())
            probs_b = tf.nn.softmax(end_scores[0, a:].detach())
            prob_ab = probs_a[0, a] * probs_b[b]
            b = a + b
        elif self.probaMode == "maxNon0":
            # search all start positions for the best non-empty span
            prob_ab = 0
            a = b = 0
            probs_a = tf.nn.softmax(start_scores.detach())
            for a_ in range(len(start_scores[0])):
                b_ = int(torch.argmax(end_scores[0, a_:]))
                probs_b = tf.nn.softmax(end_scores[0, a_:].detach())
                prob_ab_ = probs_a[0, a_] * probs_b[b_]
                if prob_ab_ > prob_ab:
                    prob_ab = prob_ab_
                    a = a_
                    b = a_ + b_
        else:
            raise NotImplementedError("this probability mode is not implemented yet")
        givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][a:b + 1])
        # the tokenizer often prepends a blank space that is not part of the answer
        if givenAnswer.startswith(" "):
            givenAnswer = givenAnswer[1:]
        return y, givenAnswer, prob_ab
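    # For illustration (hypothetical sample, not from the original data):
    #   x = "Who wrote Faust?---Faust is a tragic play written by Goethe."
    #   y = "Goethe"
    # _generate() then returns ("Goethe", <decoded answer span>, <span probability>).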
    # public functions
    def set_proba_mode(self, mode):
        if mode in ["mult", "forceNon0", "maxNon0"]:
            self.probaMode = mode
        else:
            raise NotImplementedError("this probability mode is not implemented yet")

    def load_model(self, ModelName):
        self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_path + "/" + ModelName)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path + "/" + ModelName)
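

# Minimal usage sketch, not part of the original module; the project root,
# the checkpoint name "bert_qa" and the toy sample below are all assumptions.
if __name__ == "__main__":
    data = [
        ["Who wrote Faust?---Faust is a tragic play written by Goethe."],
        ["Goethe"],
    ]
    qa = BertModel(home_path="/path/to/project")  # hypothetical project root
    qa.load_model("bert_qa")  # hypothetical fine-tuned checkpoint under ./models
    qa.set_proba_mode("maxNon0")
    score, summary, console = qa.generate(data)
    print(summary)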
