# util_hdf5.py
import numpy as np
import pandas as pd
import xarray as xr
import csv
from tables import open_file, Atom, Filters
import h5py
import os
import fasteners # inter-process file lock
import time
import norms
import pickle
import util


def get_matrix_pointer(h5file):
    # open hdf5 file
    h5 = open_file(h5file)
    # detect the only dataset in the file, which will be pointed to by 'node'
    for node in h5.walk_nodes():
        pass  # find a node with whatever name
    return node, h5


def get_matrix(h5file, return_shape=False):
    node, h5 = get_matrix_pointer(h5file)
    dt = node.dtype
    N, d = node.shape  # HDF5 files are transposed, for Matlab compatibility
    try:
        X = node[:, :]
    except MemoryError:
        print("Cannot load full matrix - matrix size probably exceeds memory")
        X = []
    h5.close()  # closing file
    if return_shape:
        return X, N, d, dt
    else:
        return X
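

# Usage sketch (hypothetical file name): load a dataset together with its
# stored shape and dtype.
#
#     X, N, d, dt = get_matrix('data.hdf5', return_shape=True)
#     print(N, d, dt)  # rows, columns and dtype of the single dataset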


def write_hdf5(data, file):
    # delete the file first if it already exists
    if os.path.exists(file):
        os.remove(file)
    make_hdf5(data, file)


def make_hdf5(x, h5file):
    with h5py.File(h5file, 'w') as f:
        f.create_dataset('data', data=x)
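

# Round-trip sketch: write an array and read it back through get_matrix
# (hypothetical file name).
#
#     A = np.random.rand(10, 3)
#     write_hdf5(A, 'example.hdf5')
#     assert np.allclose(A, get_matrix('example.hdf5'))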
######################### NORMALIZATION ########################################
class Normalizer():
    def __init__(self, filename):
        self.load_info(filename)

    def load_info(self, filename):
        norm_pd = pd.read_csv(filename, index_col=0)
        # np.float was removed from NumPy; the builtin float is equivalent here
        self.mean = norm_pd.loc['mean', :].values.astype(float)
        self.std = norm_pd.loc['std', :].values.astype(float)
        self.min = norm_pd.loc['min', :].values.astype(float)
        self.max = norm_pd.loc['max', :].values.astype(float)
        self.status = norm_pd.loc['status', :].values[0]
        self.m = len(self.mean)
        self.names = norm_pd.columns
        self.std[self.std == 0] = 1  # prevent division by zero for std=0
        self.file = filename

    def normalize(self, X, lower_bound=0, upper_bound=1):
        if self.status == 'mean':
            return self.normalize_mean(X)
        elif self.status == 'range':
            return self.normalize_range(X, lower_bound, upper_bound)
        else:
            print('cannot normalize - mode not known')
            return X

    def rescale(self, X, lower_bound=0, upper_bound=1):
        if self.status == 'mean':
            return self.rescale_mean(X)
        elif self.status == 'range':
            return self.rescale_range(X, lower_bound, upper_bound)
        else:
            print('cannot rescale - mode not known')
            return X

    def normalize_range(self, X, lower_bound=0, upper_bound=1):
        return lower_bound + (X - self.min) * (upper_bound - lower_bound) / (self.max - self.min)

    def rescale_range(self, X, lower_bound=0, upper_bound=1):
        return self.min + (X - lower_bound) * (self.max - self.min) / (upper_bound - lower_bound)

    def normalize_mean(self, X):
        return (X - self.mean) / self.std

    def rescale_mean(self, X):
        return self.mean + X * self.std

    def set_status(self, new_status):
        norm_pd = pd.read_csv(self.file, index_col=0)
        # one value per column; a flat array assigns cleanly to the row
        norm_pd.loc['status'] = np.repeat(new_status, self.m)
        norm_pd.to_csv(self.file)
        self.status = new_status
        print("set normalization status to " + new_status)

    def load_pickle(self, refname):
        ref = pd.read_pickle(refname)
        # DataFrame.as_matrix was removed from pandas; select columns explicitly
        x0 = ref[self.names].to_numpy()
        x = self.normalize(x0)
        return x0, x
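

# Usage sketch: 'norm_y.csv' is a hypothetical file with the layout written by
# Norm.make_table() below (rows mean/std/max/min/status, one column per variable).
#
#     nrm = Normalizer('norm_y.csv')
#     nrm.set_status('mean')      # persists the new status back to the CSV
#     Xn = nrm.normalize(X)       # z-score: (X - mean) / std
#     Xb = nrm.rescale(Xn)        # inverse, recovering X where std != 0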


class Norm():
    def __init__(self, m=0, k=100, column_names=None):
        if m > 0:
            # initialise tables: one row per batch holding the k-scaled sums,
            # the k-scaled sums of squares, and the item count
            self.norm_tbl = np.zeros((0, 2 * m + 1))
            self.max = np.ones((1, m)) * (float('-inf'))
            self.min = np.ones((1, m)) * (float('inf'))
            self.mean = np.zeros((1, m))
            self.std = np.zeros((1, m))
        self.names = column_names
        self.k = k
        self.m = m

    def update(self, X):
        # minimum and maximum
        Xmax = np.amax(X, axis=0)
        Xmin = np.amin(X, axis=0)
        self.max = np.maximum(self.max, Xmax)
        self.min = np.minimum(self.min, Xmin)
        # mean and std: accumulate k-scaled partial sums to limit overflow
        divided_sum_of_elements = np.sum(X, axis=0) / self.k
        divided_squared_sum_of_elements = np.sum(np.square(X / self.k), axis=0)
        n_items = X.shape[0]
        new_row = np.hstack([divided_sum_of_elements, divided_squared_sum_of_elements, n_items])
        self.norm_tbl = np.vstack([self.norm_tbl, new_row])

    def evaluate(self):
        sigma = np.sum(self.norm_tbl[:, :self.m], axis=0)
        sigma_sq = np.sum(self.norm_tbl[:, self.m:-1], axis=0)
        N = np.sum(self.norm_tbl[:, -1])
        self.mean = sigma * self.k / N
        self.std = np.sqrt(self.k**2 / N * (sigma_sq - np.square(sigma) / N))

    def make_table(self):
        tbl = np.vstack([self.mean, self.std, self.max, self.min])
        lst = ['mean', 'std', 'max', 'min']
        tbl = pd.DataFrame(data=tbl, columns=self.names, index=lst)
        sts = pd.DataFrame(data=np.repeat('none', self.m).reshape((1, -1)),
                           index=['status'], columns=self.names)
        return pd.concat([tbl, sts])
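

# Usage sketch: accumulate statistics over batches that never need to fit in
# memory at once, then write the CSV that Normalizer reads. Names are
# hypothetical.
#
#     nrm = Norm(m=3, k=100, column_names=['a', 'b', 'c'])
#     for batch in batches:       # each batch: an (n_i, 3) array
#         nrm.update(batch)
#     nrm.evaluate()              # combine the k-scaled partial sums
#     nrm.make_table().to_csv('norm_y.csv')
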
####################### OPERATIONS ON HDF5 TABLES ###############################
def merge_files(Y, M, norm=None, outname=None, batches=False):
    path = os.path.split(Y)[0]
    body = os.path.splitext(Y)[0]
    name = os.path.split(body)[1]
    if batches:
        norms.merge_files(path, name, M, outname, norm)
        return 0, 0
    else:
        f = open_file(os.path.join(path, '%s_00.hdf5' % name), "r")
        for node in f.walk_nodes():
            pass  # find a node with whatever name
        sigma = np.zeros(node.shape)
        sigma_sq = np.zeros(node.shape)
        f.close()  # close before the batch files are read and removed below
        for i in range(M):
            h5file = os.path.join(path, '%s_%02d.hdf5' % (name, i))
            y = get_matrix(h5file)
            os.remove(h5file)
            if norm is not None:
                y = norm.rescale(y)
            sigma += y
            sigma_sq += y**2
        mean = sigma / M
        var = 1.0 / M * (sigma_sq - sigma**2 / M)
        return mean, var
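

# Usage sketch (hypothetical paths): with batches=False, reduce four batch
# files 'out/y_00.hdf5' ... 'out/y_03.hdf5' to their elementwise mean and
# population variance, var = E[y^2] - E[y]^2, undoing the stored normalization
# first. Note that the batch files are deleted as they are consumed.
#
#     mean, var = merge_files('out/y.hdf5', 4, norm=Normalizer('norm_y.csv'))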


class Table_Writer():
    def __init__(self, path, variables, name):
        self.path = path
        self.cols = variables
        self.name = name
        self.m = len(self.cols)
        self.n = 0
        self.make_norm = False

    def open_hdf5(self, add_norm=True, k=100, dtype='float64'):
        # 'float64' (lower-case) is the valid NumPy/h5py dtype string
        filename = os.path.join(self.path, self.name + ".hdf5")
        if os.path.exists(filename):
            print('Overwriting %s...' % self.name)
        self.file = h5py.File(filename, "w")
        self.ds = self.file.create_dataset(self.name, (0, self.m),
                                           maxshape=(None, self.m), dtype=dtype)
        if add_norm:
            self.make_norm = True
            self.norm = Norm(self.m, k, self.cols)

    def write_hdf5(self, arr, table=True):
        if table:
            # DataFrame.as_matrix was removed from pandas; select the columns
            arr = arr[self.cols].to_numpy()
        idx = self.ds.shape[0]
        self.ds.resize(self.ds.shape[0] + arr.shape[0], axis=0)
        self.ds[idx:, :] = arr
        if self.make_norm:
            self.norm.update(arr)
        self.n = self.ds.shape[0]

    def close_hdf5(self):
        self.file.close()
        if self.make_norm:
            self.norm.evaluate()
            return self.norm.make_table()
        return None
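

# Usage sketch: stream DataFrame chunks into one growing HDF5 dataset while
# collecting normalization statistics; file and column names are hypothetical.
#
#     tw = Table_Writer('out', ['a', 'b', 'c'], 'y')
#     tw.open_hdf5(add_norm=True)
#     for chunk in pd.read_csv('y.csv', chunksize=10000):
#         tw.write_hdf5(chunk)                 # table=True extracts columns a, b, c
#     tw.close_hdf5().to_csv('norm_y.csv')     # mean/std/max/min/status table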


class Split_Training():
    def __init__(self, path, data_type, set_norm, norm_type, split_ratio):
        data_path = os.path.join(path, 'train')
        train_name = 'train_' + data_type
        val_name = 'val_' + data_type
        train_found = os.path.exists(os.path.join(data_path, train_name + '.hdf5'))
        val_found = os.path.exists(os.path.join(data_path, val_name + '.hdf5'))
        self.file_found = train_found or val_found
        self.file = os.path.join(data_path, data_type + '.hdf5')
        # ASSUME THAT norm_file IS LOCATED ONE LEVEL ABOVE THE HDF5 FILES
        norm_file = os.path.join(path, "norm_" + data_type + ".csv")
        self.norm = Normalizer(norm_file)
        if set_norm:
            self.norm.set_status(norm_type)
        self.train = Table_Writer(data_path, self.norm.names, train_name)
        self.val = Table_Writer(data_path, self.norm.names, val_name)
        self.split_ratio = split_ratio

    def open_all(self):
        self.train.open_hdf5(add_norm=False)
        self.val.open_hdf5(add_norm=False)

    def split_and_norm(self, X, nums=None):
        n = X.shape[0]
        normed_data = self.norm.normalize(X)
        if nums is None:
            # split_ratio is the probability of a row landing in the training set
            nums = np.random.choice([0, 1], size=n, p=[1 - self.split_ratio, self.split_ratio])
        tr = normed_data[nums == 1, :]
        vl = normed_data[nums == 0, :]
        self.train.write_hdf5(tr, table=False)
        self.val.write_hdf5(vl, table=False)
        return nums

    def close_all(self):
        self.train.close_hdf5()
        self.val.close_hdf5()
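

# Usage sketch (hypothetical layout): 'data/norm_y.csv' holds the statistics,
# and 'data/train/' receives 'train_y.hdf5' and 'val_y.hdf5'.
#
#     sp = Split_Training('data', 'y', set_norm=True, norm_type='mean', split_ratio=0.8)
#     sp.open_all()
#     for X in chunks:            # chunks: iterable of (n_i, m) arrays
#         sp.split_and_norm(X)    # each row goes to train with probability 0.8
#     sp.close_all()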
