model.py

import os
from transformers import TFGPT2LMHeadModel
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from abc import abstractmethod
import numpy as np
import tensorflow as tf
import torch
import copy
import random
from transformers.generation_tf_utils import TFGenerationMixin
import modif_gpt as mod_gpt
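# Base class for all question-answering models below (GPT-2, Roberta/XLM-R,
# combo). It holds the model and tokenizer handles, tracks per-question
# tp/fp/fn/tn counters in confidence buckets and collects the "sure"
# predictions whose confidence exceeds self.sureLim.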
class Model:
def __init__(self, home_path, finetune_path, model_path, probaMode, printStep, buckets):
os.chdir(home_path)
os.chdir(model_path)
model_path = os.getcwd()
self.home_path = home_path
self.finetune_path = finetune_path
self.model_path = model_path
self.model = None
self.tokenizer = None
self.probaMode = probaMode
self.buckets = buckets
self.lim = [x/self.buckets for x in range(self.buckets)]
self.scores = {"all": self.__init_counters()}
self.printStep = printStep
self.sureLim = 0.5
self.keywords = ["<input>", "<answer>", "<find>", "<|endoftext|>"]
# private functions
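# __init_counters builds one array per metric with self.buckets entries
# (one per confidence threshold in self.lim); every counter starts at a
# small eps so the divisions in __updateF1 never divide by zero.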
def __init_counters(self):
eps = 1e-6 # small number, to avoid division by zero
tmp = np.zeros((self.buckets,))
tmp += eps
score = {
"tp": copy.deepcopy(tmp),
"fp": copy.deepcopy(tmp),
"fn": copy.deepcopy(tmp),
"tn": copy.deepcopy(tmp),
"f1": copy.deepcopy(tmp),
"recall": copy.deepcopy(tmp),
"precision": copy.deepcopy(tmp),
}
score["count"] = np.zeros((self.buckets,))
score["count"] += eps
score["xsure"] = []
score["ysure"] = []
return copy.deepcopy(score)
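# __printScores renders a plain-text report for every question key: the
# fraction of cases above each threshold, the raw tp/fp/fn/tn counters and
# recall/precision/F1 per threshold, optionally with a simple "|" histogram.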
def __printScores(self, hist=True):
def nbTolines(nb1, nb2):
return std_len("|"*int(100/nb2*nb1), l=100, side="right", fill=".")
def std_len(s, l=8, side="left", fill=" "):
s = str(s)
while(len(s) < l):
if side == "left":
s = fill + s
else:
s += fill
return s
def doHist(tmp, var):
if i == self.buckets - 1:
next_bin = 0
else:
next_bin = var[i + 1]
tmp += " - {}".format(
nbTolines(var[i]-next_bin, var[0])
)
return tmp
console = ""
tmp = ""
keys = self.scores.keys()
for key in keys:
tmp += "{}\n<{}>\n{}\n".format("="*50, key, "="*50)
tmp += "Number of exact matches, for each ConfidenceScore threshold"
tmp += "trs - used cases [%] - ConfidenceScore distribution\n"
for i in range(self.buckets):
tmp += "{} - {}%".format(
std_len(self.lim[i]),
std_len(int(100/self.scores[key]["count"][0]*self.scores[key]["count"][i]))
)
if hist:
tmp = doHist(tmp, self.scores[key]["count"])
tmp += "\n"
console += tmp
print(tmp)
print("\n {} \n".format("="*100))
tmp = "\ntrs - true positive - false positive - false negative - true negative \n"
for i in range(self.buckets):
tmp += "{} - {}% - {}% - {}% - {}%\n".format(
std_len(self.lim[i]),
# std_len(int(100/self.scores[key]["count"][0]*self.scores[key]["tp"][i])),
# std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["fp"][i])),
# std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["fn"][i])),
# std_len(int(100 / self.scores[key]["count"][0] * self.scores[key]["tn"][i]))
std_len(self.scores[key]["tp"][i]),
std_len(self.scores[key]["fp"][i]),
std_len(self.scores[key]["fn"][i]),
std_len(self.scores[key]["tn"][i])
)
console += tmp
print(tmp)
print("\n {} \n".format("="*100))
tmp = "\ntrs - recall - precision - F1 - F1 histogram\n"
for i in range(self.buckets):
tmp += "{} - {} - {} - {}".format(
std_len(self.lim[i], 2),
std_len(round(self.scores[key]["recall"][i], 2)),
std_len(round(self.scores[key]["precision"][i], 2)),
std_len(round(self.scores[key]["f1"][i], 2))
)
if hist:
tmp += " - " + nbTolines(self.scores[key]["f1"][i], 1)
tmp += "\n"
console += tmp
print(tmp)
print("\n {} \n".format("=" * 100))
return console
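# __updateF1 recomputes the per-bucket metrics for the given question and
# for the "all" aggregate:
#   recall    = tp / (tp + fn)
#   precision = tp / (tp + fp)
#   f1        = 2 * precision * recall / (precision + recall)
# The eps added in __init_counters keeps all denominators non-zero.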
def __updateF1(self, question):
for key in ["all", question]:
for i in range(self.buckets):
self.scores[key]["recall"][i] = \
self.scores[key]["tp"][i]/(self.scores[key]["tp"][i] + self.scores[key]["fn"][i])
self.scores[key]["precision"][i] = \
self.scores[key]["tp"][i]/(self.scores[key]["tp"][i] + self.scores[key]["fp"][i])
self.scores[key]["f1"][i] = \
2 * self.scores[key]["precision"][i]*self.scores[key]["recall"][i] / \
(self.scores[key]["precision"][i] + self.scores[key]["recall"][i])
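# __check_answer updates the bucketed confusion counters: for every
# threshold in self.lim the prediction counts as tp/fp if its confidence
# exceeds the threshold (depending on whether it matches the reference
# answer), and as fn/tn otherwise.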
def __check_answer(self, confidenceScore, corrAnswer, givenAnswer, question):
for i, l in enumerate(self.lim):
# calculate true positive, etc.
if confidenceScore > l:
self._incr_scores(question, "count", i)
if corrAnswer == givenAnswer:
self._incr_scores(question, "tp", i)
else:
self._incr_scores(question, "fp", i)
else:
if corrAnswer == givenAnswer:
self._incr_scores(question, "fn", i)
else:
self._incr_scores(question, "tn", i)
"""
print("for <all>:")
print(self.scores["all"]["count"])
print(self.scores["all"]["tp"])
print("for <{}>:".format(question))
print(self.scores[question]["count"])
print(self.scores[question]["tp"])
"""
# update recall, precision and F1 score
self.__updateF1(question)
# protected functions
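# _incr_scores increments one counter both for the specific question and
# for the aggregated "all" key, creating a fresh set of counters the first
# time a question is seen.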
def _incr_scores(self, question, key, i=None, inc=1):
# create a full sub-dictionary if it's an unseen question
if question not in self.scores.keys():
print("found a new question: {}".format(question))
self.scores[question] = self.__init_counters()
# update the value of "all"
if question != "all" and key not in ["f1", "recall", "precision"]:
if i is None:
self.scores["all"][key] += inc
else:
self.scores["all"][key][i] += inc
# set the main value
if i is None:
self.scores[question][key] += inc
else:
self.scores[question][key][i] += inc
@abstractmethod
def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
raise NotImplementedError("__train has to be defined in each child class")
@abstractmethod
def _generate(self, x, y, x_bert, y_bert):
raise NotImplementedError("_generate has to be defined in each child class")
@abstractmethod
def _generate_demo(self, c, q):
raise NotImplementedError("_generate_demo has to be defined in each child class")
def _clean_sure(self):
keys = self.scores.keys()
invalid = []
for key in keys:
if key != "all" and len(self.scores[key]["ysure"]) > 0:
inv = []
occ = [[x, self.scores[key]["ysure"].count(x)] for x in set(self.scores[key]["ysure"])]
# sort by occurrence count (most frequent answer last)
occ_nb = [x[1] for x in occ]
occ_nb_tot = 0
for x in occ_nb:
occ_nb_tot += x
occ = [x for _, x in sorted(zip(occ_nb, occ))]
# one dominant solution (> 90%)
if occ[-1][1]/occ_nb_tot > 0.9:
for x in occ[:-1]:
inv.append(x[0])
# two dominant solutions
elif (occ[-1][1]+occ[-2][1])/occ_nb_tot > 0.9:
for x in occ[:-2]:
inv.append(x[0])
# no dominant solution
else:
# it looks suspicious if a solution is chosen more than 10% of the time
for x in occ:
if x[1]/occ_nb_tot > 0.1:
inv.append(x[0])
print(key)
print("The invalid answers for the key <{}> are {}".format(key, inv))
self.scores[key]["xsure"] = \
[x for i, x in enumerate(self.scores[key]["xsure"])
if self.scores[key]["ysure"][i] not in inv]
self.scores[key]["ysure"] = \
[x for i, x in enumerate(self.scores[key]["ysure"])
if self.scores[key]["ysure"][i] not in inv]
invalid += inv
print(invalid)
print("The invalid answers for the key <{}> are {}".format("all", invalid))
self.scores["all"]["xsure"] = \
[x for i, x in enumerate(self.scores["all"]["xsure"])
if self.scores["all"]["ysure"][i] not in invalid]
self.scores["all"]["ysure"] = \
[x for i, x in enumerate(self.scores["all"]["ysure"])
if self.scores["all"]["ysure"][i] not in invalid]
# public functions
@abstractmethod
def set_proba_mode(self, mode):
raise NotImplementedError("set_proba_mode has to be defined in each child class")
def getSureGuesses(self):
return [self.scores["all"]["xsure"], self.scores["all"]["ysure"]]
def train(self, nbEpochs, outModelName, startCheckpoint=None, dataEnd="", tokenizerLocaction=None):
print("train ...")
if startCheckpoint is None:
print("... from scratch, with the Tokenizer in {}".format(tokenizerLocaction))
raise NotImplementedError("Training from Scratch is not implemented yet")
else:
pass
# tokenizerLocaction = startCheckpoint
cmd = self._train(nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction)
print(cmd)
os.chdir(self.finetune_path)
os.system(cmd)
def generate_demo(self, c, q):
givenAnswer, confScore = self._generate_demo(c, q)
return givenAnswer, confScore
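# generate runs the model over data (and an optional Bert-formatted copy),
# compares every generated answer with the reference, updates the bucketed
# scores and the "sure" lists, and returns the score dict, summary values at
# threshold 0 and at self.sureLim, plus the printed report as a string.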
def generate(self, data, idxStart=0, idx_End=None, data_bert = None):
self.scores = {"all": self.__init_counters()}
if idx_End is None:
idx_End = len(data[0])
X = data[0][idxStart:idx_End]
Y = data[1][idxStart:idx_End]
if data_bert is not None:
X_bert = data_bert[0][idxStart:idx_End]
Y_bert = data_bert[1][idxStart:idx_End]
else:
X_bert = X
Y_bert = Y
errs = 0
for i, x in enumerate(X):
if i%100 == 0:
print(i)
if True: #try:
corrAnswer, givenAnswer, confScore, question = self._generate(x, Y[i], X_bert[i], Y_bert[i])
else: #except:
print("Model produced some error")
# e.g. due to an input that is too long (Roberta)
# add one additional example
X.append(data[0][idx_End + errs])
Y.append(data[1][idx_End + errs])
if data_bert is not None:
X_bert.append(data_bert[0][idx_End + errs])
Y_bert.append(data_bert[1][idx_End + errs])
else:
X_bert = X
Y_bert = Y
errs += 1
continue
# print out all wrong answers
if True:
if corrAnswer != givenAnswer:
print("corr:<{}> vs given:<{}> ({})".format(corrAnswer, givenAnswer, question))
self.__check_answer(confScore, corrAnswer, givenAnswer, question)
if confScore > self.sureLim:
self.scores[question]["xsure"].append(x)
self.scores[question]["ysure"].append(Y[i])
self.scores["all"]["xsure"].append(x)
self.scores["all"]["ysure"].append(Y[i])
if self.printStep > 0 and (i+1) % self.printStep == 0:
print("({}): Correct Answer / given Answer \n{} / {}".format(
i, corrAnswer, givenAnswer
))
_ = self.__printScores()
print("Final scores")
self._clean_sure()
console = self.__printScores()
tp_lim0 = int(100/self.scores["all"]["count"][0]*self.scores["all"]["tp"][0])
prec_lim0 = self.scores["all"]["precision"][0]
f1_lim0 = self.scores["all"]["f1"][0]
for i, l in enumerate(self.lim):
if l >= self.sureLim:
tp_limsure = int(100/self.scores["all"]["count"][0]*self.scores["all"]["tp"][i])
prec_limsure = self.scores["all"]["precision"][i]
f1_limsure = self.scores["all"]["f1"][i]
break
return self.scores, [tp_lim0, tp_limsure, prec_lim0, prec_limsure, f1_lim0, f1_limsure], console
@abstractmethod
def load_model(self, ModelName):
raise NotImplementedError("load_model has to be defined in each child class")
class GPTModel(Model):
def __init__(self, home_path, model_path="./models", probaMode = "longOk", printStep=10, buckets=10):
os.chdir(home_path)
os.chdir("./transformers/examples/")
os.chdir("./language-modeling")
finetune_path = os.getcwd()
super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
cmd = "python run_clm.py \
--model_type {} \
--train_file \"{}\" \
--do_train \
--validation_file \"{}\" \
--per_gpu_train_batch_size 1 \
--save_steps -1 \
--num_train_epochs {} \
--fp16 \
--output_dir=\"{}\" \
".format(
"gpt2",
"train" + dataEnd + ".txt",
"eval" + dataEnd + ".txt",
nbEpochs,
self.model_path + "/" + outModelName)
# --do_eval \
if startCheckpoint is not None:
if startCheckpoint not in ["gpt2", "gpt2-medium", "gpt2-large", "gpt2-xl"]:
startCheckpoint = self.model_path + "/" + startCheckpoint
tokenizerLocaction = startCheckpoint # "gpt2"
# startCheckpoint = "gpt2"
cmd += " --model_name_or_path {}".format(startCheckpoint)
if tokenizerLocaction is not None:
cmd += " --tokenizer_name {}".format(tokenizerLocaction)
return cmd
def __set_modif_gpt(self):
TFGenerationMixin._generate_no_beam_search = mod_gpt._generate_no_beam_search_modif
TFGenerationMixin.generate = mod_gpt.generate_modif
def _generate_demo(self, c, q):
self.keywords = ["<input>", "<answer>", "<find>", "<|endoftext|>"]
query = self.keywords[0] + c + self.keywords[2] + q + self.keywords[1]
self.__set_modif_gpt()
input_ids = self.tokenizer.encode(query, return_tensors='tf')
VERBOSE = "nothing"
generated_text_samples = self.model.generate(
input_ids,
max_length=len(input_ids[0]) + 30,
num_return_sequences=1,
no_repeat_ngram_size=0,
repetition_penalty=1.0,
top_p=1.0,
temperature=1.0,
do_sample=False,
top_k=0,
early_stopping=True,
tokenizer=self.tokenizer,
VERBOSE=VERBOSE,
probaMode=self.probaMode,
num_beams=1,
force2nd=True
)
givenAnswer = generated_text_samples[0]
return givenAnswer, generated_text_samples[1]
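# _generate expects x to already be the full prompt
# "<input>{context}<find>{question}<answer>" (see _generate_demo) and
# extracts the question part only to group the scores per question.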
def _generate(self, x, y, x_bert, y_bert):
i_len = len(self.keywords[0])
a_len = len(self.keywords[1])
f_len = len(self.keywords[2])
e_len = len(self.keywords[3])
f_start = x.find(self.keywords[2])
question = x[f_start + f_len:-a_len]
self.__set_modif_gpt()
input_ids = self.tokenizer.encode(x, return_tensors='tf')
VERBOSE = "nothing_but_score"
VERBOSE = "nothing"
generated_text_samples = self.model.generate(
input_ids,
max_length=len(input_ids[0]) + 30,
num_return_sequences=1,
no_repeat_ngram_size=0,
repetition_penalty=1.0,
top_p=1.0,
temperature=1.0,
do_sample=False,
top_k=0,
early_stopping=True,
tokenizer=self.tokenizer,
VERBOSE=VERBOSE,
probaMode=self.probaMode,
num_beams=1,
force2nd=True
)
givenAnswer = generated_text_samples[0]
corrAnswer = y[:-len("<|endoftext|>")]
return corrAnswer, givenAnswer, generated_text_samples[1], question
def set_proba_mode(self, mode):
print("Set the proba for gpt2 to {}".format(mode))
if mode in ["mult", "longOk"]:
self.probaMode = mode
else:
raise NotImplementedError("This probability mode is not yet implemented")
def load_model(self, ModelName):
self.model = TFGPT2LMHeadModel.from_pretrained(self.model_path + "/" + ModelName, from_pt=True)
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path + "/" + ModelName)
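# Extractive QA model (Roberta / XLM-R via run_qa.py): the answer span is
# taken from the start/end logits and its confidence is the product of the
# two softmax probabilities; probaMode selects how the span is chosen
# ("mult", "forceNon0" or "maxNon0").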
class BertModel(Model):
def __init__(self, home_path, model_path="./models", probaMode = "mult", printStep=10, buckets=10):
os.chdir(home_path)
os.chdir("./transformers/examples/")
os.chdir("./question-answering")
finetune_path = os.getcwd()
super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
# private functions
def _train(self, nbEpochs, outModelName, startCheckpoint, dataEnd, tokenizerLocaction):
cmd = "python run_qa.py \
--train_file \"{}\" \
--do_train \
--num_train_epochs \"{}\" \
--output_dir=\"{}\" \
--fp16 \
--save_steps -1 \
".format(
"train" + dataEnd + ".json",
nbEpochs,
self.model_path + "/" + outModelName)
# --validation_file \"{}\" \
# --do_eval \
# "eval" + end + ".json"
if startCheckpoint is not None:
if startCheckpoint not in ["xlm-roberta-base", "roberta-base"]:
startCheckpoint = self.model_path + "/" + startCheckpoint
cmd += " --model_name_or_path=\"{}\"".format(startCheckpoint)
return cmd
def _generate_demo(self, c, q):
inputs = self.tokenizer(q, c, return_tensors='pt')
# generate network output
outputs = self.model(**inputs)
start_scores = outputs.start_logits
end_scores = outputs.end_logits
# the demo uses the "forceNon0" span selection:
# force the end index b to come at or after the start index a
# find start and end of the answer
a = torch.argmax(start_scores)
a = int(a)
b = torch.argmax(end_scores[0, a:])
b = int(b)
# get the probability of the answer
probs_a = tf.nn.softmax(start_scores.detach())
probs_b = tf.nn.softmax(end_scores[0, a:].detach())
prob_a = probs_a[0, a]
prob_b = probs_b[b]
prob_ab = prob_a * prob_b
b = a + b
givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][int(a):int(b) + 1])
# due to the tokenizer the answer often starts with a blank space, which is not part of the answer
if len(givenAnswer) > 0 and givenAnswer[0] == " ":
givenAnswer = givenAnswer[1:]
return givenAnswer, float(prob_ab)
def _generate(self, x, y, x_bert, y_bert):
tmp = x.split("---")
text = tmp[1]
question = tmp[0]
# tokenize model input
inputs = self.tokenizer(question, text, return_tensors='pt')
# generate network output
outputs = self.model(**inputs)
start_scores = outputs.start_logits
end_scores = outputs.end_logits
if self.probaMode == "mult":
# find start and end of the answer
a = torch.argmax(start_scores)
b = torch.argmax(end_scores)
a = int(a)
b = int(b)
# get the probability of the answer
probs_a = tf.nn.softmax(start_scores.detach())
probs_b = tf.nn.softmax(end_scores.detach())
prob_a = probs_a[0, a]
prob_b = probs_b[0, b]
prob_ab = prob_a * prob_b
elif self.probaMode == "forceNon0":
# force b to come at or after a
# find start and end of the answer
a = torch.argmax(start_scores)
a = int(a)
b = torch.argmax(end_scores[0, a:])
b = int(b)
# get the probability of the answer
probs_a = tf.nn.softmax(start_scores.detach())
probs_b = tf.nn.softmax(end_scores[0, a:].detach())
prob_a = probs_a[0, a]
prob_b = probs_b[b]
prob_ab = prob_a * prob_b
b = a+b
elif self.probaMode == "maxNon0":
# search over all start positions for the highest-probability non-empty span
# find start and end of the answer
prob_ab = 0
for a_ in range(len(start_scores[0])):
b_ = torch.argmax(end_scores[0, a_:])
b_ = int(b_)
# get the probability of the answer
probs_a = tf.nn.softmax(start_scores.detach())
probs_b = tf.nn.softmax(end_scores[0, a_:].detach())
prob_a = probs_a[0, a_]
prob_b = probs_b[b_]
prob_ab_ = prob_a * prob_b
if prob_ab_ > prob_ab:
prob_ab = prob_ab_
a = a_
b = a_ + b_
else:
raise NotImplementedError("this probability mode is not implemented yet")
givenAnswer = self.tokenizer.decode(inputs['input_ids'][0][int(a):int(b) + 1])
# due to the tokenizer the answer often starts with a blank space, which is not part of the answer
if len(givenAnswer) > 0 and givenAnswer[0] == " ":
givenAnswer = givenAnswer[1:]
return y, givenAnswer, prob_ab, question
# public functions
def set_proba_mode(self, mode):
print("Set the proba for bert to {}".format(mode))
if mode in ["mult", "forceNon0", "maxNon0"]:
self.probaMode = mode
else:
raise NotImplementedError("this probability mode is not implemented yet")
def load_model(self, ModelName):
path = os.path.join(self.model_path, ModelName)
self.model = AutoModelForQuestionAnswering.from_pretrained(path)
self.tokenizer = AutoTokenizer.from_pretrained(path)
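# Ensemble of several already-loaded models (written with three sub-models
# in mind: GPT-2 first, then Roberta and XLM-R). Each sub-model answers with
# its own probaMode; the final answer is picked by majority vote, with a
# random GPT-biased tie-break, and the combined confidence is derived from
# the individual scores.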
class ComboModel(Model):
def __init__(self, home_path, model_path="./models", probaMode = "mult", printStep=10, buckets=10,
models=[], ModelNames=[], probaModes=[]):
os.chdir(home_path)
os.chdir("./transformers/examples/")
os.chdir("./question-answering")
finetune_path = os.getcwd()
self.ModelNames = ModelNames
self.probaModes = probaModes
self.models = models
super().__init__(home_path, finetune_path, model_path, probaMode, printStep, buckets)
# private functions
def _train(self, nbEpochs, outModelName, startCheckpoints, dataEnd, tokenizerLocaction):
for i, mm in enumerate(self.models):
startCheckpoint = self.ModelNames[i]
self.ModelNames[i] += "_adapt"
cmd = mm._train(nbEpochs, self.ModelNames[i], startCheckpoint, dataEnd, tokenizerLocaction)
print(cmd)
os.chdir(mm.finetune_path)
os.system(cmd)
return ""
def _generate_demo(self, c, q):
raise NotImplementedError("_generate_demo is not implemented yet for ComboModel")
def _generate(self, x, y, x_bert, y_bert):
givenAnswers = []
confScores = []
for i, mm in enumerate(self.models):
# print("Predict an answer for the model {}".format(i))
if self.probaModes[i] == "longOk":
# print("seems to be gpt2")
corrAnswer, givenAnswer, confScore, question = mm._generate(x, y, None, None)
elif self.probaModes[i] == "forceNon0":
# print("seems to be bert")
corrAnswer, givenAnswer, confScore, question = mm._generate(x_bert, y_bert, None, None)
givenAnswers.append(givenAnswer)
confScores.append(confScore)
# find the most frequently occurring answer
best_sol = max(set(givenAnswers), key = givenAnswers.count)
occ = givenAnswers.count(best_sol)
if occ <= 1 or (occ == 2 and givenAnswers[1] == givenAnswers[2]):
if random.randint(0,9) < 6:
ind = 0 # choose gpt with 60% chance
else:
ind = random.randint(1, len(givenAnswers)-1) # choose Roberta or XLM-R with 20% chance each
confScore = 0.6*confScores[ind]
best_sol = givenAnswers[ind]
elif occ == 2:
confScore = 0.8
for i, a in enumerate(givenAnswers):
if a == best_sol:
confScore *= confScores[i]
else:
confScore = 1
for i, a in enumerate(givenAnswers):
confScore *= confScores[i]
return corrAnswer, best_sol, confScore, question
# public functions
def set_proba_mode(self, mode):
for i, mm in enumerate(self.models):
print("Set proba mode {} for model {}, {}".format(self.probaModes[i], i, mm))
mm.set_proba_mode(self.probaModes[i])
def load_model(self, ModelName):
for i, mm in enumerate(self.models):
print("Load model {}".format(i))
mm.load_model(self.ModelNames[i])
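
# Minimal usage sketch (illustrative only, not part of the original training
# pipeline). The home directory, checkpoint name and inputs below are
# placeholders and depend on the local setup: home_path must contain the
# ./transformers/examples tree and a ./models directory.
if __name__ == "__main__":
    home = os.getcwd()  # assumed project root
    gpt = GPTModel(home, model_path="./models")
    gpt.load_model("my_gpt2_checkpoint")  # placeholder name of a fine-tuned checkpoint
    answer, confidence = gpt.generate_demo("Some context paragraph.", "What is asked?")
    print("answer: <{}> (confidence {})".format(answer, confidence))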
