
functions.py

# Martin Fontanet, Dennis Gankin, Vikalp Kamdar
import numpy as np
# Replaces every -999 by the mean of its column. (NOT USED ANYMORE)
def replace_outl_by_mean(x_tr, x_te):
    keep_tr = (x_tr != -999)
    keep_te = (x_te != -999)
    nb_each = np.sum(keep_tr, axis=0) + np.sum(keep_te, axis=0)
    x_tr_without_extr = keep_tr * x_tr
    x_te_without_extr = keep_te * x_te
    sum_tr = np.sum(x_tr_without_extr, axis=0)
    sum_te = np.sum(x_te_without_extr, axis=0)
    mean = (sum_tr + sum_te) / nb_each
    new_x_tr = x_tr_without_extr + (x_tr == -999) * mean
    new_x_te = x_te_without_extr + (x_te == -999) * mean
    return new_x_tr, new_x_te
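# Illustrative usage sketch for replace_outl_by_mean (the values below are made up for
# demonstration and are not part of the project pipeline):
def _example_replace_outl_by_mean():
    x_tr = np.array([[1.0, -999.0], [3.0, 4.0]])
    x_te = np.array([[5.0, 6.0]])
    # The -999 entry in column 1 is replaced by the mean of the valid entries of that
    # column across both sets: (4 + 6) / 2 = 5.
    new_x_tr, new_x_te = replace_outl_by_mean(x_tr, x_te)
    print(new_x_tr)  # [[1., 5.], [3., 4.]]
    print(new_x_te)  # [[5., 6.]]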
# Normalizes the data to the [0, 1] range (min-max scaling over both sets together)
def normalize_data(x_tr, x_te):
    minimum = min(np.min(x_tr), np.min(x_te))
    maximum = max(np.max(x_tr), np.max(x_te))
    range_x = maximum - minimum
    x_tr -= minimum
    x_te -= minimum
    x_tr = x_tr / range_x
    x_te = x_te / range_x
    return x_tr, x_te
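# Illustrative usage sketch for normalize_data (made-up values): both sets are rescaled together
# so that the global minimum maps to 0 and the global maximum maps to 1. Note that the in-place
# `-=` also shifts the arrays passed in.
def _example_normalize_data():
    x_tr = np.array([[0.0, 2.0], [4.0, 6.0]])
    x_te = np.array([[8.0, 10.0]])
    x_tr_n, x_te_n = normalize_data(x_tr, x_te)
    print(x_tr_n)  # values in [0, 1]; the global minimum 0 maps to 0, the maximum 10 maps to 1
    print(x_te_n)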
# Standardize taking into account both the training and the testing sets
def standardize_data(x_tr, x_te):
    """Standardize the original data set."""
    x = np.concatenate([x_tr, x_te])
    mean_x = np.mean(x, axis=0)
    x_tr = x_tr - mean_x
    x_te = x_te - mean_x
    std_x = np.std(x, axis=0)
    x_tr = x_tr / std_x
    x_te = x_te / std_x
    return x_tr, x_te
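# Illustrative usage sketch for standardize_data (made-up values): mean and standard deviation
# are computed on the concatenation of both sets, so the same shift and scale are applied to each.
def _example_standardize_data():
    rng = np.random.RandomState(0)
    x_tr = rng.rand(100, 3)
    x_te = rng.rand(50, 3)
    x_tr_s, x_te_s = standardize_data(x_tr, x_te)
    # Each column of the concatenated result has zero mean and unit variance (up to float error).
    print(np.mean(np.concatenate([x_tr_s, x_te_s]), axis=0))  # ~[0, 0, 0]
    print(np.std(np.concatenate([x_tr_s, x_te_s]), axis=0))   # ~[1, 1, 1]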
# Same as build_poly, but for a matrix: expands each column to powers 1..degree and prepends a column of ones
def build_poly_matrix(x, degree):
    tx = np.tile(x, (1, degree))
    degrees = np.arange(degree) + 1
    powers = np.tile(degrees, (x.shape[1], 1)).T.ravel()
    result = np.power(tx, powers)
    return np.c_[np.ones(x.shape[0]), result]
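# Illustrative shape check for build_poly_matrix (made-up values): for an input of shape (n, d)
# and degree k, the output has shape (n, 1 + d*k): a column of ones followed by the degree-1
# block, the degree-2 block, ..., the degree-k block.
def _example_build_poly_matrix():
    x = np.array([[1.0, 2.0], [3.0, 4.0]])  # shape (2, 2)
    tx = build_poly_matrix(x, 3)             # shape (2, 1 + 2*3) = (2, 7)
    print(tx.shape)
    print(tx[0])  # [1., 1., 2., 1., 4., 1., 8.] -> ones, then [x, x], [x^2, x^2], [x^3, x^3]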
def calculate_loss(y, tx, w):
    """compute the cost by negative log likelihood."""
    return np.sum(np.log(1 + np.exp(tx@w)) - y*tx@w)
def calculate_gradient(y, tx, w):
    """compute the gradient of loss."""
    return tx.T@(sigmoid(tx@w)-y)
def least_squares(y, tx):
    """calculate the least squares solution."""
    X = np.transpose(tx)@tx
    Y = np.transpose(tx)@y
    w = np.linalg.pinv(X)@Y
    mse = np.mean(np.power(y - tx@w, 2))/2
    return mse, w
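# Illustrative usage sketch for least_squares (made-up data): fit a noiseless linear model and
# recover its weights together with the (half) mean squared error.
def _example_least_squares():
    tx = np.c_[np.ones(4), np.array([0.0, 1.0, 2.0, 3.0])]  # intercept + one feature
    y = 2.0 + 3.0 * tx[:, 1]                                  # exact linear relation
    mse, w = least_squares(y, tx)
    print(w)    # ~[2., 3.]
    print(mse)  # ~0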
# Computes the score (fraction of correctly predicted labels) obtained when trying to predict y with w and tx.
# predict_labels is not defined in this file; it is expected to come from the project's helper functions.
def compute_score(y, tx, w):
    return len(np.where(predict_labels(w, tx).ravel() == y.ravel()*2 - 1)[0]) / len(y)
# Standardizes the data, predicts labels for the test set with weights w and polynomial degree
# `degree`, and writes a CSV submission for the sample ids ids_te.
# predict_labels and create_csv_submission are expected to come from the project's helper functions.
def create_submission(new_x_tr, new_x_te, w, degree, ids_te):
    x_tr, x_te = standardize_data(new_x_tr, new_x_te)
    tx_te = build_poly_matrix(x_te, degree)
    y_pred = predict_labels(w, tx_te)
    create_csv_submission(ids_te, y_pred, "new_submission.csv")
# Imputes the -999 values: each column containing -999 is predicted from the columns that contain
# no -999 at all, using least-squares regression on a polynomial basis.
def compute_new_x(x_tr_noisy, x_te_noisy):
    # We create large matrices that contain both the training and testing samples
    original_set = np.concatenate([x_tr_noisy, x_te_noisy])
    data_set = np.concatenate([x_tr_noisy, x_te_noisy])
    # We check which columns contain -999 values
    columns_to_change = np.unique(np.where(data_set == -999)[1])
    # We check which columns don't contain any -999 values
    columns_ok = np.unique(np.where(np.all(data_set != -999, axis=0)))
    positive_columns = []
    to_change = []
    for col in columns_to_change:
        # We set rows that contain -999 in the specified column as test sample set, and the others as train sample set
        test_rows = np.unique(np.where(data_set.T[col] == -999))
        train_rows = np.unique(np.where(data_set.T[col] != -999))
        # We only keep the columns that don't contain any -999 in the equation
        x_train = data_set[train_rows].T[columns_ok].T
        x_test = data_set[test_rows].T[columns_ok].T
        y_train = data_set[train_rows, col]
        y_train = np.expand_dims(y_train, axis=1)
        y_test = data_set[test_rows, col]
        y_test = np.expand_dims(y_test, axis=1)
        # If the minimum value of the column (ignoring -999 values) is not negative, we mark the
        # column as a "positive" column (= it should not contain any negative number)
        if(np.min(original_set.T[col] * (original_set.T[col] != -999)) >= 0):
            positive_columns.append(col)
        losses = []
        # We try to predict the values with polynomial bases from degree 1 to 4, and we keep the
        # best (= the one with the lowest loss)
        for deg in range(1, 5):
            tx_train = build_poly_matrix(x_train, deg)
            loss, w = least_squares(y_train, tx_train)
            losses.append(loss)
        # If the loss is too high, we mark this column so we don't transform its -999 values
        if(min(losses) > 10):
            to_change.append(col)
        # We check which degree is the best
        for i in range(len(losses)):
            if(losses[i] == min(losses)):
                deg = i + 1
        # We predict the values
        tx_train = build_poly_matrix(x_train, deg)
        tx_test = build_poly_matrix(x_test, deg)
        loss, w = least_squares(y_train, tx_train)
        data_set[test_rows, col] = (tx_test@w).ravel()
    # Columns whose best loss was too high keep their original values
    for col_change in to_change:
        data_set.T[col_change] = original_set.T[col_change]
    # We separate the training and testing samples
    new_x_tr = data_set[:len(x_tr_noisy)]
    new_x_te = data_set[len(x_tr_noisy):]
    return new_x_tr, new_x_te
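# Illustrative usage sketch for compute_new_x (made-up random data): the -999 markers should be
# replaced by regression predictions, unless a column's loss is above the threshold, in which
# case that column is left untouched.
def _example_compute_new_x():
    rng = np.random.RandomState(0)
    x_tr_noisy = rng.rand(20, 3)
    x_te_noisy = rng.rand(10, 3)
    x_tr_noisy[:3, 2] = -999  # mark a few entries of the last column as missing
    new_x_tr, new_x_te = compute_new_x(x_tr_noisy, x_te_noisy)
    print(np.any(new_x_tr == -999), np.any(new_x_te == -999))  # expected: False False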
# One step of Newton's method on the penalized logistic regression loss
# (gamma is not used: the full Newton step is taken)
def learning_by_penalized_gradient(y, tx, w, gamma, lambda_):
    loss, gradient, hessian = penalized_logistic_regression(y, tx, w, lambda_)
    w_new = w - np.linalg.pinv(hessian)@gradient
    return loss, w_new
# Provides everything needed for the penalized logistic regression
def penalized_logistic_regression(y, tx, w, lambda_):
    """return the loss, gradient, and hessian."""
    loss = calculate_log_loss(y, tx, w)
    gradient = calculate_log_gradient(y, tx, w) + lambda_*w
    hessian = calculate_hessian(y, tx, w) + lambda_*np.identity(len(w))
    return loss, gradient, hessian
def sigmoid(t):
    return 1/(1 + np.exp(-t))
def calculate_log_loss(y, tx, w):
    """compute the cost by negative log likelihood."""
    return np.sum(np.log(1 + np.exp(tx@w)) - y*tx@w)
def calculate_log_gradient(y, tx, w):
    """compute the gradient of loss."""
    return tx.T@(sigmoid(tx@w)-y)
def calculate_hessian(y, tx, w):
    """return the hessian of the loss function."""
    S = sigmoid(tx@w)*(1 - sigmoid(tx@w))
    S = np.identity(len(S))*S
    return (tx.T@S@tx)
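# Illustrative sanity check for the logistic-regression building blocks (made-up data, labels in
# {0, 1}): for tx of shape (n, d) and w of shape (d, 1), penalized_logistic_regression returns a
# scalar loss, a (d, 1) gradient and a (d, d) hessian.
def _example_penalized_logistic_regression():
    rng = np.random.RandomState(0)
    tx = rng.rand(10, 3)
    y = rng.randint(0, 2, size=(10, 1)).astype(float)
    w = np.zeros((3, 1))
    loss, gradient, hessian = penalized_logistic_regression(y, tx, w, 1e-6)
    print(loss, gradient.shape, hessian.shape)  # scalar, (3, 1), (3, 3)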
# Applies Newton's method on the samples from indexes start to stop.
def batch_penalized_newton(y, tx, w, start, stop):
    max_iter = 100
    threshold = 1e-8
    gamma = 0.2
    lambda_ = 0.000000002
    losses = []
    # start the logistic regression
    best_w = w
    for n_iter in range(max_iter):
        # get loss and update w.
        loss, w = learning_by_penalized_gradient(y[start:stop], tx[start:stop], w, gamma, lambda_)
        losses.append(loss)
        # We keep track of the best w
        if(loss == np.min(losses)):
            best_w = w
        # If the loss is suddenly infinite, we break (and keep the best w)
        if(loss == np.inf):
            break
        # If the loss is decreasing too slowly, we break
        if len(losses) > 1 and np.abs(losses[-1] - losses[-2]) < threshold:
            break
    return best_w.T
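# Illustrative usage sketch for batch_penalized_newton (made-up, noisily separable data): a few
# Newton steps on a single batch should yield weights that classify most points correctly. Note
# that the returned weight vector is transposed.
def _example_batch_penalized_newton():
    rng = np.random.RandomState(0)
    n, d = 200, 2
    x = rng.randn(n, d)
    y = (x[:, 0] + x[:, 1] + 0.5*rng.randn(n) > 0).astype(float).reshape(-1, 1)  # labels in {0, 1}
    tx = np.c_[np.ones(n), x]
    w0 = np.zeros((d + 1, 1))
    w = batch_penalized_newton(y, tx, w0, 0, n)            # use the whole set as one batch
    preds = (sigmoid(tx @ w.T) > 0.5).astype(float)        # undo the transpose for prediction
    print(np.mean(preds == y))                             # fraction of correct predictions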
# Computes the best weight vector by averaging the first i weights of w_list for every i and
# keeping the average with the best score. w_list is a list of (score, w) pairs; y_tr and tx_tr
# are not defined in this file and must exist as globals in the calling scope.
def best_mean(w_list, best_w, best_score):
    # We check the best combination of w (the best mean)
    for i in range(1, len(w_list)+1):
        best_w_list = []
        # We create a list with the i first weights in w_list
        for sco, ww in w_list[:i]:
            best_w_list.append(ww)
        w_temp = np.mean(best_w_list, axis=0)
        new_score = compute_score(y_tr, tx_tr, w_temp.T)
        if(best_score < new_score):
            best_w = w_temp
            best_score = new_score
    return best_score, best_w
