Page MenuHomec4science

norms.py
No OneTemporary

File Metadata

Created
Mon, Apr 28, 06:38

norms.py

import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
import csv
from tables import open_file, Atom, Filters
import os
import fasteners # inter-process file lock
import util
import h5py
def normalize_and_split(model_path,
                        set_norm, norm_type_ftrs='mean', norm_type_tgts='range',
                        force_normalization=False, batch=None,
                        split_ratio=0.8):
    """Normalize features and targets and split them into train/validation sets.

    Streams both HDF5 files batch by batch; the row-level split index is
    computed once per batch from the features and reused for the targets so
    matching rows always land in the same split.

    :param model_path: model directory passed to util.Split_Training
    :param set_norm: normalization set descriptor passed to util.Split_Training
    :param norm_type_ftrs: normalization type for the features (default 'mean')
    :param norm_type_tgts: normalization type for the targets (default 'range')
    :param force_normalization: recompute even if normalized files already exist
    :param batch: rows read per step; defaults to the node's native chunk size
    :param split_ratio: fraction of rows assigned to the training split
    """
    features = util.Split_Training(model_path, 'features', set_norm, norm_type_ftrs, split_ratio)
    targets = util.Split_Training(model_path, 'targets', set_norm, norm_type_tgts, split_ratio)
    if features.file_found and not force_normalization:
        print("Normalised feature files already exist - set force_normalization to true to re-compute the file")
        return
    if targets.file_found and not force_normalization:
        print("Normalised target files already exist - set force_normalization to true to re-compute the file")
        return
    features.open_all()
    targets.open_all()
    h5 = open_file(features.file, "r")
    for node in h5.walk_nodes():
        pass  # keep the last node, whatever its name
    N, d = node.shape  # HDF5 files are transposed, for Matlab compatibility
    if batch is None:
        batch = node.chunkshape[0]
    nb = int(N / batch)
    if N > nb * batch:
        nb += 1  # add last incomplete step
    h5_tgts = open_file(targets.file, "r")
    for node_tgts in h5_tgts.walk_nodes():
        pass  # keep the last node, whatever its name
    # BUG FIX: the original read node.shape again here (the *features* node)
    # instead of the targets node, and clobbered `d`. Read the target shape
    # and fail loudly if the two files disagree on the number of rows.
    N_tgt = node_tgts.shape[0]
    if N_tgt != N:
        raise ValueError("features and targets row counts differ: %d vs %d" % (N, N_tgt))
    print("applying normalization")
    for b in tqdm(range(nb)):
        start = b * batch
        step = min(batch, N - start)
        X_ftr = node[start: start + step].astype(np.float64)
        X_tgt = node_tgts[start: start + step].astype(np.float64)
        idx = features.split_and_norm(X_ftr)  # split index decided by features
        targets.split_and_norm(X_tgt, idx)    # same split reused for targets
    print(features.train.n)
    print(features.val.n)
    print(targets.train.n)
    print(targets.val.n)
    h5.close()  # closing files
    h5_tgts.close()
    features.close_all()
    targets.close_all()
def normalize_hdf5_copy(h5file, label, norm, force_normalization=False, batch=None):
    """Write a normalized copy of ``h5file`` as ``<label>_<basename>.hdf5``.

    :param h5file: source HDF5 file
    :param label: prefix for the output file name
    :param norm: normalizer exposing ``.names`` and ``.normalize()``
    :param force_normalization: recompute even if the output already exists
    :param batch: rows processed per step; defaults to the node's chunk size
    """
    directory = os.path.split(h5file)[0]
    out_name = label + '_' + os.path.splitext(os.path.basename(h5file))[0]
    normalised_file = os.path.join(directory, out_name + '.hdf5')
    if os.path.exists(normalised_file) and not force_normalization:
        print("Normalised file already exists - set force_normalization to true to re-compute the file")
        return
    # Open the destination table before streaming the data across.
    writer = util.Table_Writer(directory, norm.names, out_name)
    writer.open_hdf5(add_norm=False)
    src = open_file(h5file, "r")
    for node in src.walk_nodes():
        pass  # keep the last node, whatever its name
    n_rows = node.shape[0]  # HDF5 files are transposed, for Matlab compatibility
    if batch is None:
        batch = node.chunkshape[0]
    n_batches = int(n_rows / batch)
    if n_rows > n_batches * batch:
        n_batches += 1  # last, possibly incomplete, step
    print("applying normalization")
    for b in tqdm(range(n_batches)):
        lo = b * batch
        count = min(batch, n_rows - lo)
        block = node[lo: lo + count].astype(np.float64)
        writer.write_hdf5(norm.normalize(block), table=False)
    print(writer.n)
    writer.close_hdf5()
    src.close()  # closing file
def rescale_hdf5_copy(h5file, target_name, norm, force_rescaling=False, batch=None):
    """Write a rescaled copy of ``h5file`` as ``<target_name>.hdf5``.

    :param h5file: source HDF5 file
    :param target_name: base name of the output file
    :param norm: normalizer exposing ``.names`` and ``.rescale()``
    :param force_rescaling: recompute even if the output already exists
    :param batch: rows processed per step; defaults to the node's chunk size
    """
    directory = os.path.split(h5file)[0]
    rescaled_file = os.path.join(directory, target_name + '.hdf5')
    if os.path.exists(rescaled_file) and not force_rescaling:
        print("Rescaled file already exists - set force_rescaling to true to re-compute the file")
        return
    # Open the destination table before streaming the data across.
    writer = util.Table_Writer(directory, norm.names, target_name)
    writer.open_hdf5(add_norm=False)
    src = open_file(h5file, "r")
    for node in src.walk_nodes():
        pass  # keep the last node, whatever its name
    n_rows = node.shape[0]  # HDF5 files are transposed, for Matlab compatibility
    if batch is None:
        batch = node.chunkshape[0]
    n_batches = int(n_rows / batch)
    if n_rows > n_batches * batch:
        n_batches += 1  # last, possibly incomplete, step
    print("rescaling")
    for b in tqdm(range(n_batches)):
        lo = b * batch
        count = min(batch, n_rows - lo)
        block = node[lo: lo + count].astype(np.float64)
        writer.write_hdf5(norm.rescale(block), table=False)
    print(writer.n)
    writer.close_hdf5()
    src.close()  # closing file
def normalize_hdf5_inplace(h5file, mean=None, std=None, batch=None):
    """Calculates and applies normalization to data in HDF5 file IN-PLACE.

    If mean/std are not given they are computed in one streaming pass and
    stored as node attributes ('mean', 'std', 'norm') so that a second call
    detects the file is already normalized and returns the stored values.

    :param mean: - known vector of mean values
    :param std: - known vector of standard deviations
    :param batch: - number of rows to read at once, default is a native batch size
    :return: the (mean, std) pair actually used (or previously stored)
    """
    h5 = open_file(h5file, "a")
    for node in h5.walk_nodes():
        pass  # keep the last node, whatever its name
    dt = node.dtype  # needed to cast the normalized rows back on write
    N, d = node.shape  # HDF5 files are transposed, for Matlab compatibility
    if batch is None:
        batch = node.chunkshape[0]
    nb = int(N / batch)
    if N > nb * batch:
        nb += 1  # add last incomplete step
    if mean is None or std is None:
        if 'norm' not in node.attrs._f_list():  # data was not normalized before
            print("calculating mean and standard deviation of data")
            E_x = np.zeros((d,), dtype=np.float64)
            E_x2 = np.zeros((d,), dtype=np.float64)
            for b in range(nb):
                start = b * batch
                step = min(batch, N - start)
                X1 = node[start: start + step, :].astype(np.float64)
                # weight each batch mean by its share of rows so the
                # incomplete last batch is handled correctly
                E_x += np.mean(X1, 0) * (1.0 * step / N)
                E_x2 += np.mean(X1 ** 2, 0) * (1.0 * step / N)
            mean = E_x
            # BUG FIX: clamp tiny negative round-off in E[x^2] - E[x]^2 so
            # the square root cannot produce NaN for near-constant columns
            std = np.maximum(E_x2 - E_x ** 2, 0.0) ** 0.5
            node.attrs.mean = mean
            node.attrs.std = std
            node.attrs.norm = 1
        else:  # data is already normalized
            print("data was already normalized, returning 'mean', 'std' parameters")
            mean = node.attrs.mean
            std = node.attrs.std
            h5.close()  # closing file
            return mean, std
    else:
        if 'mean' not in node.attrs._f_list():
            assert len(mean) == d, "Incorrect length of a vector of means: %d expected, %d found" % (d, len(mean))
            assert len(std) == d, "Incorrect length of a vector of standard deviations: %d expected, %d found" % (d, len(std))
            node.attrs.mean = mean
            node.attrs.std = std
        else:
            print("data was already normalized, returning 'mean', 'std' parameters")
            mean = node.attrs.mean
            std = node.attrs.std
            h5.close()  # closing file
            return mean, std
    # BUG FIX: work on a copy so a caller-supplied std array is never mutated
    # in place; zero deviations map to 1 to prevent division by zero
    std = np.where(std == 0, 1, std)
    print("applying normalization")
    for b in tqdm(range(nb)):
        start = b * batch
        step = min(batch, N - start)
        X = node[start: start + step].astype(np.float64)
        X = (X - mean) / std
        node[start: start + step] = X.astype(dt)
    h5.close()  # closing file
    return mean, std
def merge_files(path, name, M, outname=None, norm=None, batch=None):
    """Merge M per-model HDF5 files into a mean file and a variance file.

    Inputs are '<name>_00.hdf5' ... '<name>_<M-1:02d>.hdf5' and are DELETED
    once merged. Outputs are '<outname>.hdf5' (per-row mean across the M
    files) and '<outname>_var.hdf5' (per-row population variance).

    :param path: directory containing the input files
    :param name: base name; input i is '%s_%02d.hdf5' % (name, i)
    :param M: number of input files to merge
    :param outname: base name of the outputs; defaults to *name*
    :param norm: optional normalizer whose .rescale() is applied before merging
    :param batch: rows per step; defaults to the first node's chunk size
    """
    # name has form NAME_i.hdf , with NAME = name and i equal model number
    if outname is None:
        outname = name
    mean_file = os.path.join(path, outname + '.hdf5')
    var_file = os.path.join(path, outname + '_var.hdf5')
    handles = []
    nodes = []
    for i in range(M):
        member = os.path.join(path, ('%s_%02d.hdf5' % (name, i)))
        handles.append(open_file(member, "r"))
        for node in handles[i].walk_nodes():
            pass  # keep the last node, whatever its name
        nodes.append(node)
        if i == 0:
            # geometry and batching come from the first file; the others
            # are assumed to match — TODO confirm with the producer
            n_rows, n_cols = node.shape  # HDF5 files are transposed, for Matlab compatibility
            if batch is None:
                batch = node.chunkshape[0]
            n_batches = int(n_rows / batch)
            if n_rows > n_batches * batch:
                n_batches += 1  # last, possibly incomplete, step
    with h5py.File(mean_file, 'w') as f:
        f.create_dataset('data', data=np.zeros((n_rows, n_cols)))
    with h5py.File(var_file, 'w') as f:
        f.create_dataset('data', data=np.zeros((n_rows, n_cols)))
    for b in tqdm(range(n_batches)):
        lo = b * batch
        count = min(batch, n_rows - lo)
        total = np.zeros((count, n_cols))
        total_sq = np.zeros((count, n_cols))
        for i in range(M):
            X = nodes[i][lo: lo + count, :].astype(np.float64)
            if norm is not None:
                X = norm.rescale(X)
            total += X
            total_sq += X ** 2
        # population statistics across the M models:
        # mean = Σx/M, var = (Σx² − (Σx)²/M)/M
        with h5py.File(mean_file, 'a') as f:
            f['data'][lo: lo + count, :] = total / M
        with h5py.File(var_file, 'a') as f:
            f['data'][lo: lo + count, :] = 1.0 / M * (total_sq - total ** 2 / M)
    for i in range(M):
        handles[i].close()  # closing files
        os.remove(os.path.join(path, ('%s_%02d.hdf5' % (name, i))))
### FRAME FOR BATCHES:
"""
def func_in_batch(inputfile, outputs, batch = None):
# IF OUTFILE IS WRITTEN:
# h5tbl = util.Table_Writer(path, norm.names, target_name)
# h5tbl.open_hdf5(add_norm = False)
h5 = open_file(h5file, "r")
for node in h5.walk_nodes():
pass # find a node with whatever name
dt = node.dtype
N, d = node.shape # HDF5 files are transposed, for Matlab compatibility
if batch is None:
batch = node.chunkshape[0]
nb = int(N/batch)
if N > nb*batch:
nb += 1 # add last incomplete step
for b in tqdm(range(nb)):
start = b*batch
step = min(batch, N-start)
X = node[start: start+step].astype(np.float64)
# ACTION/FUNCTION
# IF OUTFILE IS WRITTEN: h5tbl.write_hdf5(DATA_TO_WRITE, table = False)
# IF OUTFILE IS WRITTEN: h5tbl.close_hdf5()
h5.close() # closing file
"""

Event Timeline