# util.py
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import csv
from tables import open_file, Atom, Filters
import h5py
import os
import fasteners # inter-process file lock
import time
import norms
import pickle
############ MISCELLANEOUS ####################################
def find_name(filename):
path, name = os.path.split(filename)
root, ext = os.path.splitext(name)
i = 0
while os.path.exists(filename):
i += 1
filename = os.path.join(path, str(i) + '_' + root + ext)
return filename
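# Illustrative usage of find_name (the path below is hypothetical): the input is returned
# unchanged if no file exists at that path, otherwise an increasing counter is prepended.
#
#   out = find_name('results/run.csv')
#   # -> 'results/run.csv' if free, else 'results/1_run.csv', 'results/2_run.csv', ...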
def ranking(x):
    # x: (n, m) integer array, e.g. one ranking of m items per row (values 0..m-1)
    # returns, for each value i, its mean column position over all rows, normalised by m
    n, m = x.shape
    idx = np.arange(m)
    y = np.zeros(m)
    for i in range(m):
        s = np.sum(x == i, axis = 0)   # per-column count of value i
        y[i] = np.sum(s*idx)/(n*m)
    return y
def merge_models(dataset, savefile = None):
# inputs:
# - dataset: python lambda of structure dataset = lambda month : ("PATH_%02d_file" %month)
# - savefile: name of target file
merged_data = []
for month in range(1,13):
new_data = xr.open_dataset(dataset(month)).drop('z')
new_data.coords['date'] = pd.to_datetime('2001-%02d-15' %month)
merged_data.append(new_data)
merged_data = xr.concat(merged_data, dim = 'date')
if savefile is not None:
merged_data.to_netcdf(savefile)
return merged_data
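# Minimal sketch for merge_models; the file pattern and output path are assumptions, not
# paths used elsewhere in this module.
#
#   dataset = lambda month: "/data/model_%02d.nc" % month
#   merged = merge_models(dataset, savefile='/data/model_all_months.nc')
#   # "merged" gains a 'date' coordinate with one mid-month timestamp per month of 2001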
def round_to_interval(arr, interval):
return np.round( arr / interval ) * interval
############# OPERATIONS ON PANDAS TABLES #####################
def to_pandas(xarray, coords = None):
df = xarray.to_dataframe().dropna().reset_index()
if coords is not None:
df = df.loc[:,coords]
return df
def to_xarray(df, index):
return df.set_index(index).to_xarray()
def attach_col_to_table(table, new_table, merge_column, value_column, value_col_target = None, how = 'left'):
    # merge value_column from new_table into table, joining on merge_column
merged_table = table.merge(new_table.loc[ :,[merge_column, value_column] ], on = merge_column, how = how)
# change name of column if required
if value_col_target is not None:
merged_table = merged_table.rename( {value_column : value_col_target} , axis = 1)
return merged_table
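# Small sketch for attach_col_to_table (column names are placeholders): the 'price' column
# of new_table is merged into table on the shared 'region' key and optionally renamed.
#
#   out = attach_col_to_table(table, new_table, 'region', 'price', value_col_target='price_2001')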
####################### TIMESTAMP OPERATIONS ###################
def to_timestamp(dates, hour):
return pd.to_datetime(dates) + pd.to_timedelta(hour, unit = 'h')
def from_timestamp(timestamp):
hour = pd.to_datetime(timestamp).hour
date = pd.to_datetime(timestamp).normalize()
return date, hour
def list_of_times_to_timestamp(year, month, day, hour):
df = pd.DataFrame({'year' : year,
'month' : month,
'day' : day,
'hour' : hour})
return pd.to_datetime(df)
def month_to_dates(year, month):
if month == 12:
next_month = 1
next_year = year + 1
else:
next_month = month + 1
next_year = year
month_start = ('%d%02d01' %(year, month))
next_month_start = ('%d%02d01' %(next_year, next_month))
month_end = pd.date_range(month_start, next_month_start, freq = 'M')[0]
return month_start, month_end.strftime('%Y%m%d')
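# Quick examples of month_to_dates, checking the month-boundary handling:
#
#   month_to_dates(2001, 2)    # -> ('20010201', '20010228')
#   month_to_dates(2001, 12)   # -> ('20011201', '20011231')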
def get_yearly_sum( ds, var ):
    # create an xarray dataset that holds, for each timestamp in "ds", the number of times
    # that hour occurs in its month (i.e. the number of days in the month, assuming a
    # non-leap year), then sum the weighted values over the year
    days_of_each_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
month = range(1,13)
days_of_month = pd.DataFrame( data = [ month, days_of_each_month ], index = ['month', 'n_days'] ).transpose()
timestamps = ds.timestamp.to_dataframe()
timestamps['month'] = timestamps.timestamp.dt.month
days_of_month = timestamps.merge( days_of_month, on = 'month', how = 'left' ).drop( columns = 'month' ).set_index('timestamp').to_xarray()
return (ds[var] * days_of_month.n_days).sum( dim = 'timestamp' )
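# Hedged sketch of get_yearly_sum, assuming "ds" is an xarray.Dataset with a 'timestamp'
# coordinate (one representative timestamp per month/hour) and a variable 'demand'
# (both names are placeholders):
#
#   yearly_demand = get_yearly_sum(ds, 'demand')
#   # each value is weighted by the number of days in its month before summing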
#################### READ AND WRITE FILES ######################
def write_row_to_csv( filename, content_vector, initialize = False, delimiter = ','):
# writes a row to a csv file
    open_mode = 'w' if initialize else 'a'
    with open(filename, open_mode, newline='') as csvfile:
        w = csv.writer(csvfile, delimiter=delimiter)
w.writerow( content_vector )
def read_csv_with_index( datafile, indexfile, data_sep = ' ', index_sep = ',' ):
    # reads a csv/text data file, taking the column names from a separate index file
column_names = pd.read_csv(indexfile, sep = index_sep).columns
return pd.read_csv(datafile, sep = data_sep, header = None, names = column_names)
def read_csv_rows( fp, start, batch_size, usecols = None ):
field_names = pd.read_csv( fp, nrows = 0).columns
return pd.read_csv(fp, skiprows = start + 1, nrows = batch_size, header = None, names = field_names, usecols = usecols)
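# Sketch of batched reading with read_csv_rows (file name and batch size are assumptions):
# the header row is read once for the column names, then rows [start, start + batch_size)
# are loaded without re-reading the whole file.
#
#   for start in range(0, 10000, 1000):
#       batch = read_csv_rows('large_table.csv', start, 1000)
#       process(batch)   # placeholder for downstream work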
def load_pickle(filename):
with open(filename, 'rb') as handle:
file = pickle.load(handle)
return file
def load_netcdf_parts(file_root, concat_dim, batch_dict = None):
    # load part files matching "<file_root>_0*" ... "<file_root>_9*" and concatenate them
    data = []
    for i in range(10):
        data.append(xr.open_mfdataset(file_root + '_%d*' %i, chunks = batch_dict))
return xr.concat(data, dim = concat_dim).sortby(concat_dim)
def read_ascii(filename, n_header_rows = 6):
raster = np.loadtxt(filename, skiprows = n_header_rows)
metadata = pd.read_csv(filename, delim_whitespace = True, nrows = n_header_rows, header = None)
# add suitable column names to metadata
metadata.rename({ 0 : 'type', 1 : 'value'}, axis = 1, inplace = True)
metadata.set_index('type', inplace = True)
return { 'raster' : raster,
'metadata' : metadata }
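# Sketch for read_ascii, assuming an ESRI-style ASCII grid whose 6 header rows hold
# "ncols", "nrows", "xllcorner", "yllcorner", "cellsize" and "NODATA_value" pairs
# (the file name is a placeholder):
#
#   dem = read_ascii('elevation.asc')
#   grid = dem['raster']                                       # 2-D numpy array
#   cellsize = float(dem['metadata'].loc['cellsize', 'value'])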
def get_matrix_pointer(h5file):
# open hdf5 file
h5 = open_file(h5file)
# detect the only dataset in the file, which will be pointed to by 'node'
for node in h5.walk_nodes():
pass # find a node with whatever name
return node, h5
def get_matrix(h5file, return_shape = False):
node, h5 = get_matrix_pointer(h5file)
dt = node.dtype
N, d = node.shape # HDF5 files are transposed, for Matlab compatibility
try:
X = node[:,:]
except:
print("Cannot load full matrix - matrix size probably exceeds memory")
X = []
h5.close() # closing file
if return_shape:
return X, N, d, dt
else:
return X
def write_hdf5(data, file):
    # delete the target file first if it already exists
if os.path.exists(file):
os.remove(file)
make_hdf5(data, file)
def make_hdf5(x, h5file):
with h5py.File(h5file, 'w') as f:
dset = f.create_dataset('data', data = x)
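# Round-trip sketch combining write_hdf5 and get_matrix (the file name is illustrative):
# write_hdf5 stores a single dataset named 'data', and get_matrix walks the file and loads
# the only dataset it finds back into memory.
#
#   X = np.random.rand(100, 5)
#   write_hdf5(X, 'tmp_matrix.hdf5')
#   X_back = get_matrix('tmp_matrix.hdf5')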
########################### TIMER ###########################################
class Timer():
def __init__(self, start = True):
self.walltime = -1
self.cputime = -1
if start:
self.start()
def start(self):
self.walltime = time.time()
        self.cputime = time.process_time()  # time.clock() was removed in Python 3.8
def restart(self, print_wallclock = True, print_speedup = False):
self.stop(print_wallclock, print_speedup)
self.start()
def stop(self, print_wallclock = True, print_speedup = False, print_any = True):
self.walltime = time.time() - self.walltime
        self.cputime = time.process_time() - self.cputime
self.speedup = self.cputime/self.walltime
if print_any:
print("CPU time: %.2f seconds" %self.cputime)
if print_wallclock:
print("Wall clock time: %.2f seconds" %self.walltime)
if print_speedup:
print("Paralellisation speedup: %.2f times" %self.speedup)
######################### NORMALIZATION ########################################
def normalize(X, Xmax, Xmin, lower_bound = 0, upper_bound = 1):
return lower_bound + (X-Xmin)*(upper_bound-lower_bound)/(Xmax - Xmin)
def rescale_norm(norm, Xmax, Xmin, lower_bound = 0, upper_bound = 1):
    return Xmin + (norm-lower_bound)*(Xmax-Xmin)/(upper_bound-lower_bound)
def standardize(X, Xmean,Xstd):
Xstd[Xstd == 0] = 1
return (X-Xmean)/(Xstd)
def rescale_std(X,Xmean,Xstd):
return Xmean + X * Xstd
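# Quick round-trip sketch for the module-level scaling helpers (values are made up):
# rescale_norm inverts normalize, and rescale_std inverts standardize.
#
#   X = np.array([1.0, 2.0, 3.0])
#   Xn = normalize(X, Xmax = 3.0, Xmin = 1.0)            # -> [0.0, 0.5, 1.0]
#   X_back = rescale_norm(Xn, Xmax = 3.0, Xmin = 1.0)    # -> [1.0, 2.0, 3.0]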
class Normalizer():
def __init__(self, filename):
self.load_info(filename)
def load_info(self, filename):
norm_pd = pd.read_csv(filename, index_col = 0)
        self.mean = norm_pd.loc['mean',:].values.astype(float)   # np.float alias was removed in NumPy 1.24
        self.std = norm_pd.loc['std',:].values.astype(float)
        self.min = norm_pd.loc['min',:].values.astype(float)
        self.max = norm_pd.loc['max',:].values.astype(float)
self.status = norm_pd.loc['status',:].values[0]
self.m = len(self.mean)
self.names = norm_pd.columns
self.std[self.std == 0] = 1 # prevent division by zero for std=0
self.file= filename
def normalize(self, X, lower_bound = 0, upper_bound = 1):
if self.status == 'mean':
return self.normalize_mean(X)
elif self.status == 'range':
return self.normalize_range(X, lower_bound, upper_bound)
else:
print('cannot normalize - mode not known')
return X
def rescale(self, X, lower_bound = 0, upper_bound = 1):
if self.status == 'mean':
return self.rescale_mean(X)
elif self.status == 'range':
return self.rescale_range(X, lower_bound, upper_bound)
else:
print('cannot rescale - mode not known')
return X
def normalize_range(self, X, lower_bound = 0, upper_bound = 1):
return lower_bound + (X-self.min)*(upper_bound-lower_bound)/(self.max - self.min)
def rescale_range(self, X, lower_bound = 0, upper_bound = 1):
return self.min + (X-lower_bound)*(self.max-self.min)/(upper_bound-lower_bound)
def normalize_mean(self, X):
return (X-self.mean)/(self.std)
def rescale_mean(self, X):
return self.mean + X * self.std
def set_status(self, new_status):
norm_pd = pd.read_csv(self.file, index_col = 0)
norm_pd.loc['status'] = np.repeat(new_status,self.m).reshape((1,-1))
norm_pd.to_csv(self.file)
self.status = new_status
print("set normalization status to " + new_status)
def load_pickle(self, refname):
ref = pd.read_pickle(refname)
        x0 = ref.loc[:, self.names].to_numpy()   # DataFrame.as_matrix() was removed in pandas 1.0
x = self.normalize(x0)
return x0, x
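# Usage sketch for Normalizer, assuming a norm file of the format written by Norm.make_table()
# below (the file name is a placeholder):
#
#   norm = Normalizer('norm_features.csv')
#   norm.set_status('mean')        # standardise with mean/std ('range' is the other mode)
#   Xn = norm.normalize(X)         # X has its columns in the order of norm.names
#   X_back = norm.rescale(Xn)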
class Norm():
def __init__(self, m = 0, k=100, column_names = None):
if m > 0:
# initialise tables
self.norm_tbl = np.zeros((0,2*m+1))
self.max = np.ones((1,m))*(float('-inf'))
self.min = np.ones((1,m))*(float('inf'))
self.mean = np.zeros((1,m))
self.std = np.zeros((1,m))
self.names = column_names
self.k = k
self.m = m
def update(self, X):
# minimum and maximum
Xmax = np.amax(X,axis=0)
Xmin = np.amin(X,axis=0)
self.max = np.maximum(self.max,Xmax)
self.min = np.minimum(self.min,Xmin)
# mean and std
divided_sum_of_elements = np.sum(X,axis = 0)/self.k
divided_squared_sum_of_elements = np.sum(np.square(X/self.k),axis = 0)
n_items = X.shape[0]
new_row = np.hstack([divided_sum_of_elements,divided_squared_sum_of_elements,n_items])
self.norm_tbl = np.vstack([self.norm_tbl, new_row])
def evaluate(self):
sigma = np.sum(self.norm_tbl[:,:self.m], axis=0)
sigma_sq = np.sum(self.norm_tbl[:,self.m:-1], axis=0)
N = np.sum(self.norm_tbl[:,-1])
self.mean = sigma * self.k/N
self.std = np.sqrt(self.k**2/(N)*(sigma_sq-np.square(sigma)/N))
def make_table(self):
tbl = np.vstack([self.mean,self.std,self.max,self.min])
lst = ['mean', 'std', 'max', 'min']
tbl = pd.DataFrame(data = tbl, columns = self.names, index = lst)
sts = pd.DataFrame(data = np.repeat('none',self.m).reshape((1,-1)), index = ['status'], columns = self.names)
return pd.concat([tbl, sts])
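# Sketch of the intended streaming workflow for Norm: statistics are accumulated batch by
# batch with update(), finalised with evaluate(), and exported with make_table() in the
# format expected by Normalizer (column names and file name below are placeholders).
#
#   norm = Norm(m = 3, column_names = ['a', 'b', 'c'])
#   for batch in batches:          # each batch is an (n_i, 3) array
#       norm.update(batch)
#   norm.evaluate()
#   norm.make_table().to_csv('norm_features.csv')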
####################### OPERATIONS ON HDF5 TABLES ###############################
def merge_files(Y, M, norm = None, outname = None, batches = False):
path = os.path.split(Y)[0]
body = os.path.splitext(Y)[0]
name = os.path.split(body)[1]
if batches:
norms.merge_files(path, name, M, outname, norm)
return 0,0
else:
f = open_file(os.path.join(path,'%s_00.hdf5' %(name)), "r")
for node in f.walk_nodes():
pass # find a node with whatever name
sigma = np.zeros(node.shape)
sigma_sq = np.zeros(node.shape)
for i in range(M):
h5file = os.path.join(path,'%s_%02d.hdf5' %(name,i))
y = get_matrix(h5file)
os.remove(h5file)
if norm is not None:
y = norm.rescale(y)
sigma += y
sigma_sq += y**2
mean = sigma / M
var = 1.0/M*(sigma_sq-sigma**2/M)
return mean, var
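# Hedged sketch of merge_files (file layout assumed): the parts "<name>_00.hdf5" ...
# "<name>_11.hdf5" are read, optionally rescaled with a Normalizer, accumulated, and their
# element-wise mean and variance returned; note that each part file is deleted after reading.
#
#   mean, var = merge_files('/data/pred/Y.hdf5', M = 12, norm = None)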
class Table_Writer():
def __init__(self, path, variables, name):
self.path = path
self.cols = variables
self.name = name
self.m = len(self.cols)
self.n = 0
self.make_norm = False
    def open_hdf5(self, add_norm = True, k = 100, dtype = 'float64'):
filename = os.path.join(self.path, self.name + ".hdf5")
if os.path.exists(filename):
print('Overwriting %s...' %self.name)
self.file = h5py.File(filename, "w")
self.ds = self.file.create_dataset(self.name, (0,self.m), maxshape=(None, self.m), dtype = dtype)
if add_norm:
self.make_norm = True
self.norm = Norm(self.m, k, self.cols)
def write_hdf5(self, arr, table = True):
if table:
            arr = arr.loc[:, self.cols].to_numpy()   # DataFrame.as_matrix() was removed in pandas 1.0
idx = self.ds.shape[0]
self.ds.resize(self.ds.shape[0]+arr.shape[0], axis=0)
self.ds[idx:,:] = arr
if self.make_norm:
self.norm.update(arr)
self.n = self.ds.shape[0]
def close_hdf5(self):
self.file.close()
if self.make_norm:
self.norm.evaluate()
return self.norm.make_table()
return None
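# Usage sketch for Table_Writer (path, variable names and batch source are assumptions):
# rows are appended batch-wise to a resizable HDF5 dataset, and the accumulated Norm
# statistics are returned as a table when the file is closed.
#
#   writer = Table_Writer('/data/train', ['a', 'b', 'c'], 'features')
#   writer.open_hdf5(add_norm = True)
#   for df in dataframes:              # each df contains the columns 'a', 'b', 'c'
#       writer.write_hdf5(df)
#   norm_table = writer.close_hdf5()   # DataFrame with mean/std/max/min/status rows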
class Split_Training():
def __init__(self,path, data_type, set_norm, norm_type, split_ratio):
data_path = os.path.join(path, 'train')
train_name = 'train_' + data_type
val_name = 'val_' + data_type
train_found = os.path.exists(os.path.join(data_path, train_name+'.hdf5'))
val_found = os.path.exists(os.path.join(data_path, val_name+'.hdf5'))
self.file_found = train_found or val_found
self.file = os.path.join(data_path, data_type + '.hdf5')
# ASSUME THAT norm_file IS LOCATED ONE LEVEL ABOVE HDF5 files
norm_file = os.path.join(path, "norm_"+data_type+".csv")
self.norm = Normalizer(norm_file)
if set_norm:
self.norm.set_status(norm_type)
self.train = Table_Writer(data_path, self.norm.names, train_name)
self.val = Table_Writer(data_path, self.norm.names, val_name)
self.split_ratio = split_ratio
def open_all(self):
self.train.open_hdf5(add_norm = False)
self.val.open_hdf5(add_norm = False)
def split_and_norm(self, X, nums = None):
n = X.shape[0]
normed_data = self.norm.normalize(X)
if nums is None:
nums = np.random.choice([0, 1], size=n, p=[1-self.split_ratio, self.split_ratio])
tr = normed_data[nums==1,:]
vl = normed_data[nums==0,:]
self.train.write_hdf5(tr, table = False)
self.val.write_hdf5(vl, table = False)
return nums
def close_all(self):
self.train.close_hdf5()
self.val.close_hdf5()
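# End-to-end sketch for Split_Training (directory layout and names are assumptions): the
# norm file "norm_<data_type>.csv" is expected one level above the "train" subfolder, and
# each batch is normalised and split into train/validation HDF5 files.
#
#   splitter = Split_Training('/data', 'features', set_norm = True,
#                             norm_type = 'mean', split_ratio = 0.8)
#   splitter.open_all()
#   for X in batches:                  # arrays with columns ordered like the norm file
#       splitter.split_and_norm(X)
#   splitter.close_all()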
