# util.py
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import csv
from tables import open_file, Atom, Filters
import h5py
import os
import fasteners # inter-process file lock
import time
import norms
def to_timestamp(dates, hour):
    return pd.to_datetime(dates) + pd.to_timedelta(hour, unit='h')

def from_timestamp(timestamp):
    hour = pd.to_datetime(timestamp).hour
    date = pd.to_datetime(timestamp).normalize()
    return date, hour
def month_to_dates(year, month):
    if month == 12:
        next_month = 1
        next_year = year + 1
    else:
        next_month = month + 1
        next_year = year
    month_start = '%d%02d01' % (year, month)
    next_month_start = '%d%02d01' % (next_year, next_month)
    month_end = pd.date_range(month_start, next_month_start, freq='M')[0]
    return month_start, month_end.strftime('%Y%m%d')
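
# Illustrative sketch (not part of the original module): round-trips the date
# helpers above; the values in the comments assume pandas' default parsing.
def _example_dates():
    start, end = month_to_dates(2019, 2)   # ('20190201', '20190228')
    ts = to_timestamp(start, 6)            # Timestamp('2019-02-01 06:00:00')
    date, hour = from_timestamp(ts)        # (Timestamp('2019-02-01'), 6)
    return start, end, date, hour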
def normalize(X, Xmax, Xmin, lower_bound=0, upper_bound=1):
    return lower_bound + (X - Xmin) * (upper_bound - lower_bound) / (Xmax - Xmin)

def rescale_norm(norm, Xmax, Xmin, lower_bound=0, upper_bound=1):
    # inverse of normalize()
    return Xmin + (norm - lower_bound) * (Xmax - Xmin) / (upper_bound - lower_bound)

def standardize(X, Xmean, Xstd):
    Xstd = np.where(Xstd == 0, 1, Xstd)  # avoid division by zero without mutating the caller's array
    return (X - Xmean) / Xstd

def rescale_std(X, Xmean, Xstd):
    return Xmean + X * Xstd
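
# Illustrative sketch (not part of the original module): a normalize /
# rescale_norm round trip on a toy array.
def _example_scaling():
    X = np.array([1.0, 2.0, 3.0])
    Xmin, Xmax = X.min(), X.max()
    Xn = normalize(X, Xmax, Xmin)  # [0.0, 0.5, 1.0]
    assert np.allclose(rescale_norm(Xn, Xmax, Xmin), X)
    return Xn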
def get_matrix(h5file, return_shape=False):
    h5 = open_file(h5file)
    for node in h5.walk_nodes():
        pass  # iterate to the last node, whatever its name
    dt = node.dtype
    N, d = node.shape  # HDF5 files are transposed, for Matlab compatibility
    try:
        X = node[:, :]
    except MemoryError:
        print("Cannot load full matrix - matrix size probably exceeds memory")
        X = []
    h5.close()
    if return_shape:
        return X, N, d, dt
    else:
        return X
def make_hdf5(x, h5file):
    with h5py.File(h5file, 'w') as f:
        f.create_dataset('data', data=x)
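
# Illustrative sketch (not part of the original module): write a small array
# with make_hdf5() and read it back with get_matrix(). Assumes PyTables can
# read the h5py-written dataset, which matches how the module pairs its
# writers and readers; the file path is a temporary one created here.
def _example_hdf5_roundtrip():
    import tempfile
    path = os.path.join(tempfile.mkdtemp(), 'demo.hdf5')
    x = np.arange(6.0).reshape(3, 2)
    make_hdf5(x, path)
    y, N, d, dt = get_matrix(path, return_shape=True)
    assert (N, d) == (3, 2) and np.allclose(x, y)
    return y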
def merge_files(Y, M, norm=None, outname=None, batches=False):
    path = os.path.split(Y)[0]
    body = os.path.splitext(Y)[0]
    name = os.path.split(body)[1]
    if batches:
        norms.merge_files(path, name, M, outname, norm)
        return 0, 0
    else:
        f = open_file(os.path.join(path, '%s_00.hdf5' % (name)), "r")
        for node in f.walk_nodes():
            pass  # iterate to the last node, whatever its name
        sigma = np.zeros(node.shape)
        sigma_sq = np.zeros(node.shape)
        f.close()
        for i in range(M):
            h5file = os.path.join(path, '%s_%02d.hdf5' % (name, i))
            y = get_matrix(h5file)
            os.remove(h5file)
            if norm is not None:
                y = norm.rescale(y)
            sigma += y
            sigma_sq += y ** 2
        mean = sigma / M
        var = 1.0 / M * (sigma_sq - sigma ** 2 / M)
        return mean, var
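
# Illustrative sketch (not part of the original module): merge_files()
# accumulates a sum and a sum of squares over M equally-shaped files and then
# applies the identity var = E[y^2] - E[y]^2. A minimal in-memory check of
# that identity on random batches:
def _example_mean_var_identity():
    ys = [np.random.rand(4, 3) for _ in range(5)]
    M = len(ys)
    sigma = sum(ys)
    sigma_sq = sum(y ** 2 for y in ys)
    mean = sigma / M
    var = 1.0 / M * (sigma_sq - sigma ** 2 / M)
    assert np.allclose(mean, np.mean(np.stack(ys), axis=0))
    assert np.allclose(var, np.var(np.stack(ys), axis=0))
    return mean, var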
def ranking(x):
    m = x.shape[1]
    n = float(x.shape[0])
    idx = np.arange(m)
    y = np.zeros(m)
    for i in range(m):
        s = np.sum(x == i, axis=0)
        y[i] = np.sum(s * idx) / (n * m)
    return y
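
# Illustrative sketch (not part of the original module): ranking() assumes
# each row of x holds the values 0..m-1 and returns, for each value i, its
# mean column index scaled by 1/m.
def _example_ranking():
    x = np.array([[0, 1, 2],
                  [0, 1, 2]])  # value i always sits in column i
    return ranking(x)          # [0.0, 1/3, 2/3]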
class Timer():
    def __init__(self, start=True):
        self.walltime = -1
        self.cputime = -1
        if start:
            self.start()

    def start(self):
        self.walltime = time.time()
        self.cputime = time.process_time()  # time.clock() was removed in Python 3.8

    def restart(self, print_wallclock=True, print_speedup=False):
        self.stop(print_wallclock, print_speedup)
        self.start()

    def stop(self, print_wallclock=True, print_speedup=False, print_any=True):
        self.walltime = time.time() - self.walltime
        self.cputime = time.process_time() - self.cputime
        self.speedup = self.cputime / self.walltime
        if print_any:
            print("CPU time: %.2f seconds" % self.cputime)
            if print_wallclock:
                print("Wall clock time: %.2f seconds" % self.walltime)
            if print_speedup:
                print("Parallelisation speedup: %.2f times" % self.speedup)
class Normalizer():
    def __init__(self, filename):
        self.load_info(filename)

    def load_info(self, filename):
        norm_pd = pd.read_csv(filename, index_col=0)
        # .as_matrix() was removed from pandas; .to_numpy() is the replacement
        self.mean = norm_pd.loc['mean', :].to_numpy().astype(float)
        self.std = norm_pd.loc['std', :].to_numpy().astype(float)
        self.min = norm_pd.loc['min', :].to_numpy().astype(float)
        self.max = norm_pd.loc['max', :].to_numpy().astype(float)
        self.status = norm_pd.loc['status', :].to_numpy()[0]
        self.m = len(self.mean)
        self.names = norm_pd.columns
        self.std[self.std == 0] = 1  # prevent division by zero for std=0
        self.file = filename

    def normalize(self, X, lower_bound=0, upper_bound=1):
        if self.status == 'mean':
            return self.normalize_mean(X)
        elif self.status == 'range':
            return self.normalize_range(X, lower_bound, upper_bound)
        else:
            print('cannot normalize - mode not known')
            return X

    def rescale(self, X, lower_bound=0, upper_bound=1):
        if self.status == 'mean':
            return self.rescale_mean(X)
        elif self.status == 'range':
            return self.rescale_range(X, lower_bound, upper_bound)
        else:
            print('cannot rescale - mode not known')
            return X

    def normalize_range(self, X, lower_bound=0, upper_bound=1):
        return lower_bound + (X - self.min) * (upper_bound - lower_bound) / (self.max - self.min)

    def rescale_range(self, X, lower_bound=0, upper_bound=1):
        return self.min + (X - lower_bound) * (self.max - self.min) / (upper_bound - lower_bound)

    def normalize_mean(self, X):
        return (X - self.mean) / self.std

    def rescale_mean(self, X):
        return self.mean + X * self.std

    def set_status(self, new_status):
        norm_pd = pd.read_csv(self.file, index_col=0)
        norm_pd.loc['status'] = np.repeat(new_status, self.m).reshape((1, -1))
        norm_pd.to_csv(self.file)
        self.status = new_status
        print("set normalization status to " + new_status)

    def load_pickle(self, refname):
        ref = pd.read_pickle(refname)
        x0 = ref.loc[:, self.names].to_numpy()  # replaces removed .as_matrix(columns=...)
        x = self.normalize(x0)
        return x0, x
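
# Illustrative sketch (not part of the original module): builds a norm file
# with Norm.make_table() (defined below) and feeds it to Normalizer; the
# directory and file name are hypothetical.
def _example_normalizer(tmpdir='/tmp'):
    nrm = Norm(m=2, column_names=['a', 'b'])
    nrm.update(np.random.rand(50, 2))
    nrm.evaluate()
    fname = os.path.join(tmpdir, 'norm_demo.csv')
    nrm.make_table().to_csv(fname)
    normalizer = Normalizer(fname)
    normalizer.set_status('mean')  # or 'range'
    X = np.random.rand(5, 2)
    Xn = normalizer.normalize(X)
    assert np.allclose(normalizer.rescale(Xn), X)
    return Xn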
class Norm():
    def __init__(self, m=0, k=100, column_names=None):
        if m > 0:
            # initialise the running tables
            self.norm_tbl = np.zeros((0, 2 * m + 1))
            self.max = np.ones((1, m)) * float('-inf')
            self.min = np.ones((1, m)) * float('inf')
            self.mean = np.zeros((1, m))
            self.std = np.zeros((1, m))
            self.names = column_names
            self.k = k
            self.m = m

    def update(self, X):
        # minimum and maximum
        Xmax = np.amax(X, axis=0)
        Xmin = np.amin(X, axis=0)
        self.max = np.maximum(self.max, Xmax)
        self.min = np.minimum(self.min, Xmin)
        # running sums for mean and std, rescaled by k to keep them small
        divided_sum_of_elements = np.sum(X, axis=0) / self.k
        divided_squared_sum_of_elements = np.sum(np.square(X / self.k), axis=0)
        n_items = X.shape[0]
        new_row = np.hstack([divided_sum_of_elements, divided_squared_sum_of_elements, n_items])
        self.norm_tbl = np.vstack([self.norm_tbl, new_row])

    def evaluate(self):
        sigma = np.sum(self.norm_tbl[:, :self.m], axis=0)
        sigma_sq = np.sum(self.norm_tbl[:, self.m:-1], axis=0)
        N = np.sum(self.norm_tbl[:, -1])
        self.mean = sigma * self.k / N
        self.std = np.sqrt(self.k ** 2 / N * (sigma_sq - np.square(sigma) / N))

    def make_table(self):
        tbl = np.vstack([self.mean, self.std, self.max, self.min])
        lst = ['mean', 'std', 'max', 'min']
        tbl = pd.DataFrame(data=tbl, columns=self.names, index=lst)
        sts = pd.DataFrame(data=np.repeat('none', self.m).reshape((1, -1)), index=['status'], columns=self.names)
        return pd.concat([tbl, sts])
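
# Illustrative sketch (not part of the original module): checks that Norm's
# k-scaled streaming statistics match numpy's mean and population std.
def _example_norm_stream():
    nrm = Norm(m=3, k=10, column_names=['a', 'b', 'c'])
    X = np.random.rand(120, 3)
    for batch in np.array_split(X, 4):  # feed the data in four batches
        nrm.update(batch)
    nrm.evaluate()
    assert np.allclose(nrm.mean, X.mean(axis=0))
    assert np.allclose(nrm.std, X.std(axis=0))
    return nrm.make_table()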
class Table_Writer():
    def __init__(self, path, variables, name):
        self.path = path
        self.cols = variables
        self.name = name
        self.m = len(self.cols)
        self.n = 0
        self.make_norm = False

    def open_hdf5(self, add_norm=True, k=100, dtype='float64'):
        filename = os.path.join(self.path, self.name + ".hdf5")
        if os.path.exists(filename):
            print('Overwriting %s...' % self.name)
        self.file = h5py.File(filename, "w")
        self.ds = self.file.create_dataset(self.name, (0, self.m), maxshape=(None, self.m), dtype=dtype)
        if add_norm:
            self.make_norm = True
            self.norm = Norm(self.m, k, self.cols)

    def write_hdf5(self, arr, table=True):
        if table:
            arr = arr.loc[:, self.cols].to_numpy()  # replaces removed .as_matrix(columns=...)
        idx = self.ds.shape[0]
        self.ds.resize(self.ds.shape[0] + arr.shape[0], axis=0)
        self.ds[idx:, :] = arr
        if self.make_norm:
            self.norm.update(arr)
        self.n = self.ds.shape[0]

    def close_hdf5(self):
        self.file.close()
        if self.make_norm:
            self.norm.evaluate()
            return self.norm.make_table()
        return None
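
# Illustrative sketch (not part of the original module): append two DataFrame
# batches to a growing HDF5 dataset and collect the norm table on close; the
# output directory is hypothetical.
def _example_table_writer(path='/tmp'):
    writer = Table_Writer(path, ['a', 'b'], 'demo')
    writer.open_hdf5(add_norm=True)
    for _ in range(2):
        batch = pd.DataFrame(np.random.rand(10, 2), columns=['a', 'b'])
        writer.write_hdf5(batch)
    norm_table = writer.close_hdf5()  # DataFrame with mean/std/max/min/status
    return norm_table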
class Split_Training():
    def __init__(self, path, data_type, set_norm, norm_type, split_ratio):
        data_path = os.path.join(path, 'train')
        train_name = 'train_' + data_type
        val_name = 'val_' + data_type
        train_found = os.path.exists(os.path.join(data_path, train_name + '.hdf5'))
        val_found = os.path.exists(os.path.join(data_path, val_name + '.hdf5'))
        self.file_found = train_found or val_found
        self.file = os.path.join(data_path, data_type + '.hdf5')
        # ASSUMES THAT norm_file IS LOCATED ONE LEVEL ABOVE THE HDF5 FILES
        norm_file = os.path.join(path, "norm_" + data_type + ".csv")
        self.norm = Normalizer(norm_file)
        if set_norm:
            self.norm.set_status(norm_type)
        self.train = Table_Writer(data_path, self.norm.names, train_name)
        self.val = Table_Writer(data_path, self.norm.names, val_name)
        self.split_ratio = split_ratio

    def open_all(self):
        self.train.open_hdf5(add_norm=False)
        self.val.open_hdf5(add_norm=False)

    def split_and_norm(self, X, nums=None):
        n = X.shape[0]
        normed_data = self.norm.normalize(X)
        if nums is None:
            # split_ratio is the probability of a row landing in the training set
            nums = np.random.choice([0, 1], size=n, p=[1 - self.split_ratio, self.split_ratio])
        tr = normed_data[nums == 1, :]
        vl = normed_data[nums == 0, :]
        self.train.write_hdf5(tr, table=False)
        self.val.write_hdf5(vl, table=False)
        return nums

    def close_all(self):
        self.train.close_hdf5()
        self.val.close_hdf5()
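
# Illustrative sketch (not part of the original module): split normalized data
# into train/val HDF5 files. The directory layout and names are hypothetical
# and must already exist as __init__ expects (<path>/train/ and
# <path>/norm_<data_type>.csv).
def _example_split(path='/tmp/dataset'):
    splitter = Split_Training(path, 'demo', set_norm=True,
                              norm_type='mean', split_ratio=0.8)
    splitter.open_all()
    X = np.random.rand(100, splitter.norm.m)
    nums = splitter.split_and_norm(X)  # ~80% of rows go to train
    # reuse nums to keep a second, row-aligned table split the same way
    splitter.split_and_norm(np.random.rand(100, splitter.norm.m), nums=nums)
    splitter.close_all()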