import os
import time

import pandas as pd
import xarray as xr

import norms
import util
from meteo_data import Meteo_Reader
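
# Overview:
#   Training - builds HDF5 feature/target tables from raw meteo data or from
#              precomputed tables, and normalizes them for model training.
#   Testing  - builds query feature tables for an existing model, normalizes
#              them, and rescales model outputs back to physical units.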
class Training:
    def __init__(self, path, model_name, features, targets, data_type='meteo'):
        self.data_path = os.path.join(path, "raw_data")
        self.ds_path = os.path.join(path, "datasets", model_name)
        self.location_path = os.path.join(path, "locations")
        self.train_path = os.path.join(self.ds_path, 'train')
        self.test_path = os.path.join(self.ds_path, 'test')
        self.make_test = False
        self.make_train = True
        self.data_type = data_type
        # Create the dataset folders if they do not exist yet:
        if not os.path.exists(self.ds_path):
            os.mkdir(self.ds_path)
            os.mkdir(self.train_path)
            os.mkdir(self.test_path)
            os.mkdir(os.path.join(self.ds_path, "query"))
        self.features = util.Table_Writer(self.train_path, features, 'features')
        self.targets = util.Table_Writer(self.train_path, targets, 'targets')
        print('\n\nSET UP DATASET %s' % model_name)
    ###################################### TRAINING DATA ###################################
    def make_dataset(self, table=None, test_table=None, start=None, end=None,
                     year=None, sample_name=None, test_name=None, k_norm=100):
        # possible adaptation: use a "select" dictionary to make this class
        # usable for other data types
        if (test_name is not None) or (test_table is not None):
            self.make_test = True
            self.ftrs_test = util.Table_Writer(self.test_path, self.features.cols, 'features')
            self.tgts_test = util.Table_Writer(self.test_path, self.targets.cols, 'targets')
            self.ftrs_test.open_hdf5(add_norm=False)
            self.tgts_test.open_hdf5(add_norm=False)
        self.features.open_hdf5(k=k_norm)
        self.targets.open_hdf5(k=k_norm)
        if self.data_type == 'meteo':
            self.load_meteo(start, end, year, sample_name, test_name)
        elif self.data_type == 'table':
            self.from_table(table, test_table)
        if self.make_test:
            self.ftrs_test.close_hdf5()
            self.tgts_test.close_hdf5()
        ftr_norm = self.features.close_hdf5()
        tgt_norm = self.targets.close_hdf5()
        ftr_norm.to_csv(os.path.join(self.ds_path, "norm_" + self.features.name + ".csv"))
        tgt_norm.to_csv(os.path.join(self.ds_path, "norm_" + self.targets.name + ".csv"))
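
    # Example (a sketch; the path, model name, column names, and .nc sample
    # file names are hypothetical):
    #   tr = Training('/data/solar', 'demo_model',
    #                 features=['hour', 'month', 'x', 'y'], targets=['SIS'])
    #   tr.make_dataset(start='2015-01-01', end='2015-12-31',
    #                   sample_name='sample_train.nc', test_name='sample_test.nc')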
    def make_testset(self, start, end, sample_name):
        self.make_train = False
        self.make_test = True
        self.ftrs_test = util.Table_Writer(self.test_path, self.features.cols, 'features')
        self.tgts_test = util.Table_Writer(self.test_path, self.targets.cols, 'targets')
        self.ftrs_test.open_hdf5(add_norm=False)
        self.tgts_test.open_hdf5(add_norm=False)
        if self.data_type == 'meteo':
            self.load_meteo(start, end, test=sample_name)
        self.ftrs_test.close_hdf5()
        self.tgts_test.close_hdf5()
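
    # Example (sketch, hypothetical file name): build a standalone test set
    # without touching the training tables:
    #   tr.make_testset('2016-01-01', '2016-06-30', sample_name='sample_test.nc')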
    ############################### NORMALIZATION ################################
    def normalize_all(self, set_norm=True, feature_norm='mean', target_norm='mean',
                      val_ratio=0.8, force_normalization=False, batch=None):
        # If the train set exists: normalize & split it
        if (os.path.exists(os.path.join(self.train_path, 'features.hdf5'))
                and os.path.exists(os.path.join(self.train_path, 'targets.hdf5'))):
            self.normalize_train(set_norm, feature_norm, target_norm, val_ratio,
                                 force_normalization, batch)
        else:
            print("Train features and/or targets to normalize do not exist")
        # If the test set exists: normalize it
        if (os.path.exists(os.path.join(self.test_path, 'features.hdf5'))
                and os.path.exists(os.path.join(self.test_path, 'targets.hdf5'))):
            self.normalize_test(force_normalization=force_normalization, batch=batch)
        else:
            print("Test features and/or targets to normalize do not exist")
    def normalize_train(self, set_norm=True, feature_norm='mean', target_norm='mean',
                        val_ratio=0.8, force_normalization=False, batch=None):
        norms.normalize_and_split(self.ds_path, set_norm, feature_norm, target_norm,
                                  force_normalization, batch, val_ratio)

    def normalize_test(self, set_norm=False, feature_norm='mean', target_norm='mean',
                       force_normalization=False, batch=None):
        ftr_normname = os.path.join(self.ds_path, 'norm_features.csv')
        tgt_normname = os.path.join(self.ds_path, 'norm_targets.csv')
        ftr_norm = util.Normalizer(ftr_normname)
        tgt_norm = util.Normalizer(tgt_normname)
        if set_norm:
            ftr_norm.set_status(feature_norm)
            tgt_norm.set_status(target_norm)
        ftr_file = os.path.join(self.test_path, 'features.hdf5')
        tgt_file = os.path.join(self.test_path, 'targets.hdf5')
        label = 'test'
        norms.normalize_hdf5_copy(ftr_file, label, ftr_norm, force_normalization=force_normalization)
        norms.normalize_hdf5_copy(tgt_file, label, tgt_norm, force_normalization=force_normalization)
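
    # Example (sketch): normalize train and test with the norms recorded while
    # writing the dataset, keeping 80% of the rows for the train split:
    #   tr.normalize_all(feature_norm='mean', target_norm='mean', val_ratio=0.8)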
    #################################################################################
    def load_meteo(self, start_date=None, end_date=None, year=None, sample=None, test=None):
        reader = Meteo_Reader(self.data_path)
        locations = 'CH'
        # read the variables that appear in the features or in the targets
        vars_to_read = list(set(reader._all_vars)
                            & (set(self.features.cols) | set(self.targets.cols)))
        if sample is not None:
            reader.read_sample(filename=sample, sample_raw=False)
            locations = 'sample'
        if self.make_test:
            reader.read_sample(filename=test, sample_name='sample_test', sample_raw=False)
        timer = time.perf_counter()
        if year is None:
            # iterate month by month between start_date and end_date
            month_start = pd.date_range(start_date, end_date, freq='MS')
            month_end = pd.date_range(start_date, end_date, freq='M')
            for (curr_month, curr_month_end) in zip(month_start, month_end):
                timer1 = time.perf_counter()
                reader.read_data(curr_month, curr_month_end, vars_to_read,
                                 reset=True, print_log=False)
                self._write_tables(reader)
                print("Iteration: %.2f seconds" % (time.perf_counter() - timer1))
        else:
            reader.read_yearly(year, vars_to_read)
            self._write_tables(reader)
        timer = time.perf_counter() - timer
        print("Finished reading data in %.2f seconds" % timer)
        # write all auxiliary information to csv
        metadata = None
        if self.make_test:
            locs = reader.loc_mask.where(reader.loc_mask['sample_test'] == 1).to_dataframe().dropna().reset_index()
            locs[['lon', 'lat', 'x', 'y']].to_csv(os.path.join(self.test_path, "test_locations.csv"))
            pd.Series({'start_date': start_date, 'end_date': end_date}).to_csv(os.path.join(self.test_path, "test_dates.csv"))
            metadata = {'start_date': start_date, 'end_date': end_date,
                        'locations': locs[['lon', 'lat', 'x', 'y']]}
        if self.make_train:
            locs = reader.loc_mask.where(reader.loc_mask[locations] == 1).to_dataframe().dropna().reset_index()
            locs[['lon', 'lat', 'x', 'y']].to_csv(os.path.join(self.train_path, "train_locations.csv"))
            pd.Series({'start_date': start_date, 'end_date': end_date}).to_csv(os.path.join(self.train_path, "train_dates.csv"))
            metadata = {'start_date': start_date, 'end_date': end_date,
                        'locations': locs[['lon', 'lat', 'x', 'y']]}
        return metadata

    def _write_tables(self, reader):
        # Write the reader's current contents to the train and/or test HDF5 tables.
        if self.make_train:
            train_data = reader.get_subset()
            tbl = reader.make_table(indata=train_data, ftrs=self.features.cols, print_log=False)
            self.features.write_hdf5(tbl)
            self.targets.write_hdf5(tbl)
        if self.make_test:
            test_data = reader.get_subset(sample_name='sample_test')
            tbl_tst = reader.make_table(indata=test_data, ftrs=self.features.cols, print_log=False)
            self.ftrs_test.write_hdf5(tbl_tst)
            self.tgts_test.write_hdf5(tbl_tst)
    def from_table(self, indata, testdata=None, metadata=None):
        # CREATE FEATURE AND TARGET ARRAYS
        if self.make_train:
            self.features.write_hdf5(indata)
            self.targets.write_hdf5(indata)
        if self.make_test and (testdata is not None):
            self.ftrs_test.write_hdf5(testdata)
            self.tgts_test.write_hdf5(testdata)
###################################################################################
class Testing:
    def __init__(self, path, model_name, query_name, data_type='meteo'):
        self.data_path = os.path.join(path, "raw_data")
        self.ds_path = os.path.join(path, "datasets", model_name)
        self.location_path = os.path.join(path, "locations")
        self.query_path = os.path.join(self.ds_path, "query", query_name)
        self.features_query = os.path.join(self.query_path, "features.hdf5")
        self.data_type = data_type
        # The model folder and its norm files must both exist before queries
        # can be built:
        if not (os.path.exists(self.ds_path)
                and os.path.exists(os.path.join(self.ds_path, "norm_features.csv"))):
            print("Model does not exist - create training data first")
            return
        # Create a new query folder if it does not exist yet:
        if not os.path.exists(self.query_path):
            os.mkdir(self.query_path)
        self.feature_norm = util.Normalizer(os.path.join(self.ds_path, "norm_features.csv"))
        self.target_norm = util.Normalizer(os.path.join(self.ds_path, "norm_targets.csv"))
    def make_query(self, loc, hour=list(range(3, 20)), month=list(range(1, 13)),
                   year=None, day=None, hourmask=True, maskname=None, input_vars=None):
        """
        Write an hdf5 feature table for query data.
        Inputs:
            loc                 filename of the location set
            hour, month,
            year, day           LISTS of values to add to the query set
            hourmask, maskname  apply the hour mask read from maskname
            input_vars          include other input variables - NOT YET IMPLEMENTED
        """
        # Check that all prerequisite files are in the correct directory
        if not os.path.exists(os.path.join(self.location_path, loc)):
            print("ERROR: Location file not found - Check %s for %s" % (self.location_path, loc))
            return
        if not os.path.exists(os.path.join(self.ds_path, 'train', 'features.hdf5')):
            print("ERROR: Training files not found - Check %s" % self.ds_path)
            return
        timer = time.perf_counter()
        # Create the dataset according to the features
        pts = pd.read_csv(os.path.join(self.location_path, loc))   # read locations
        pts = pts.reset_index().set_index(['x', 'y']).to_xarray()  # xarray dataset from locations
        # add coordinates as required
        if hour is not None:
            pts.coords['hour'] = hour
        if month is not None:
            pts.coords['month'] = month
        if year is not None:
            pts.coords['year'] = year
        if day is not None:
            pts.coords['day'] = day
        if hourmask:
            if maskname is None:
                maskname = 'hour_mask_mmh.nc'
            mask = xr.open_dataset(os.path.join(self.data_path, maskname))
            pts = xr.merge([pts, mask])
        pts = pts.to_dataframe().dropna().reset_index()  # back to a table of feature points
        # Make the feature table
        ftrs = util.Table_Writer(self.query_path, self.feature_norm.names, 'features')
        ftrs.open_hdf5(add_norm=False)
        missing = [c for c in self.feature_norm.names if c not in pts.columns]
        if missing:
            print("ERROR: Not all features given as input - missing %s" % missing)
            ftrs.close_hdf5()
            return
        arr = pts[self.feature_norm.names].to_numpy()
        ftrs.write_hdf5(arr, table=False)
        ftrs.close_hdf5()
        timer = time.perf_counter() - timer
        print("Dataset successfully created in %.2f seconds" % timer)
    def normalize_input(self, set_norm=False, norm_type='none', force_normalization=False):
        if os.path.exists(self.features_query):
            if set_norm:
                self.feature_norm.set_status(norm_type)
            norms.normalize_hdf5_copy(self.features_query, 'query', self.feature_norm,
                                      force_normalization=force_normalization)
        else:
            print("Query features to normalize do not exist")
    def rescale_output(self, h5file, target_name, set_norm=False, norm_type='none', force_rescaling=False):
        infile = os.path.join(self.query_path, h5file)
        outfile = os.path.join(self.query_path, target_name)
        if os.path.exists(infile):
            if set_norm:
                self.target_norm.set_status(norm_type)
            norms.rescale_hdf5_copy(infile, outfile, self.target_norm, force_rescaling=force_rescaling)
        else:
            print("Query outputs to rescale do not exist")
