Page MenuHomec4science

controller.py
No OneTemporary

File Metadata

Created
Tue, Aug 6, 00:58

controller.py

###########################################################################
# #
# Copyright 2017 Andrea Cimatoribus #
# EPFL ENAC IIE ECOL #
# GR A1 435 (Batiment GR) #
# Station 2 #
# CH-1015 Lausanne #
# Andrea.Cimatoribus@epfl.ch #
# #
# Alexandre Serex #
# alexandre.serex@epfl.ch #
# #
# This file is part of ctracker #
# #
# ctracker is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# ctracker is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with ctracker. If not, see <http://www.gnu.org/licenses/>. #
# #
###########################################################################
import configparser
import importlib
import json
from time import sleep
from collections import namedtuple
from itertools import product
from os.path import join
import numpy as np
from core.router import connect
from mods.grid import Grid
from mods.particleContainer import ParticleContainer
from parameters.netCDFController import NetCDFController, dataset
from parameters.threadedQueue import INWorker, OUTWorker
from parameters.RandomGenerator import RandomGenerator
from tools.grid_extractor import mitgcm, mitgcm_raw
class Controller:
def __init__(self, config_file_name):
self.config_file_name = config_file_name
self.config = {}
self.config_methods = {
"json": self.config_json,
"ini": self.config_ini
}
# Load the configuration file
try:
file_extension = self.config_file_name.split('.')[-1]
self.config_methods.get(file_extension, self.config_invalid)()
except:
print("\nFailed to load configuration file.")
raise
print("\nConfiguration loaded successfully.")
# Initialize the grid
try:
# Concatenates the common values and the grid only values in a dict
grid_data = dict(self.config["Common"], **self.config["Grid"])
grid_tuple = namedtuple('GridTuple', grid_data.keys())(**grid_data)
# extract a raw grid from mitgcm module
self.raw_grid = mitgcm_raw(grid_tuple)
grid = mitgcm(grid_tuple)
# concatenate mitgm data to common and grid configuration data
grid_data = dict(grid_data, **grid)
# Creates a custom namedtuple with the above dict and feeds it to the grid
self.grid = Grid("grid", namedtuple(
'GridData',
grid_data.keys())(**grid_data))
except:
print("\nFailed at initializing grid.")
raise
# Initialize the simulation data as a particle container
try:
part_cont_data = dict(self.config["Common"], **self.config["Sim"])
self._part_container = ParticleContainer("part_container", namedtuple(
'ParticleContainerData',
part_cont_data.keys())(**part_cont_data), self.grid)
except:
print("\nFailed at initializing simulation data.")
raise
mitgcm_data = dict(self.config["Common"], **self.config["MITgcm"])
self._data = namedtuple('ParticleContainerData', mitgcm_data.keys())(**mitgcm_data)
# Initialize the input output necessities
try:
self.init_mitgcm()
except:
print("\nFailed at initializing controller.")
raise
# Check that all parameters' configuration is valid
self._parameters = [self.grid, self._part_container]
try:
self.is_config_valid()
[p.is_config_valid() for p in self._parameters]
except:
print("\nInvalid configuration setup.")
raise
print("\nInitialization successful.")
def is_config_valid(self):
if self.gcm_geometry not in ("curvilinear", "cartesian"):
raise ValueError("Unrecognised MITgcm geometry")
if (self.out_dt % self.gcm_dt) != 0:
raise ValueError("The GCM output interval must be "
"a multiple of the model time step.")
if self.iters.size < 2:
raise ValueError("To start the computation, the simulation "
"must span in time at least two GCM outputs")
if (self.seed_interval % self.out_dt) != 0:
raise ValueError("The seeding interval must be "
"a multiple of the GCM step.")
if self.inds_seed:
# not sure if that np.any(...) does anything really
if np.any(self.x_seed > self.grid.imax) or \
np.any(self.x_seed < 0) or \
np.any(self.y_seed > self.grid.jmax) or \
np.any(self.y_seed < 0) or \
np.any(self.z_seed > self.grid.kmax) or \
np.any(self.z_seed < 0):
raise ValueError("Seeding with indexes beyond grid limits")
if self.outfreq in ("always", "cross"):
raise NotImplementedError("Outputing data with always and cross not yet supported.")
if self.outfreq not in ("gcmstep", "always", "cross", "custom"):
raise ValueError("Outfreq not recognised")
if self.infreq not in ("gcmstep", "custom"):
raise ValueError("Infreq not recognised")
return True
def init_mitgcm(self):
'''
Initialize the gcm input / output
'''
self.gcm_directory = self._data.gcm_directory
self.out_prec = self._data.out_prec
self.gcm_geometry = self._data.gcm_geometry
self.gcm_start = self._data.gcm_start
self.gcm_dt = self._data.gcm_dt
self.out_dt = self._data.out_dt
self.subiters = int(self._data.subiters)
self.dstep = 1.0/self.subiters
self.out_dt = self._data.out_dt
self.dtmax = self.out_dt * self.dstep
self.u_root = join(self.gcm_directory, self._data.u_root)
self.v_root = join(self.gcm_directory, self._data.v_root)
self.w_root = join(self.gcm_directory, self._data.w_root)
self.is2D = self._data.is2D
print("\nSimulation in 2D : ", self.is2D)
self.complevel = self._data.complevel
print("\nIdentify seeding points")
self.inds_seed = self._data.inds_seed
self.seed_interval = self._data.seed_interval
print("\nSeeding interval: {0}s".format(self.seed_interval))
# save to netcdf, this will be the output file of the simulation
# we require NETCDF4
outfile_name = self._data.outfile.rstrip(".nc")
self.inoutfile = outfile_name + "_inout.nc"
self.runfile = outfile_name + "_run.nc"
xGp1, yGp1 = self._get_geometry()
self.raw_grid.attrs["MITgcm_dir"] = self.gcm_directory
# we also store the full mesh with edges coordinates
self.raw_grid.coords["i_p1"] = ("i_p1", np.arange(xGp1.shape[1]))
self.raw_grid.coords["j_p1"] = ("j_p1", np.arange(xGp1.shape[0]))
self.raw_grid.coords["XG_p1"] = (("j_p1", "i_p1"), xGp1)
self.raw_grid.coords["YG_p1"] = (("j_p1", "i_p1"), yGp1)
self.raw_grid.to_netcdf(self.runfile,
mode="w", format="NETCDF4", engine="netcdf4")
self.gcm_start = np.datetime64(self._data.gcm_start)
print("\nReference date of GCM:", self.gcm_start)
self.start = np.datetime64(self._data.start)
print("\nBegin particle tracking simulation at:", self.start)
self.end = np.datetime64(self._data.end)
print("\nEnd particle tracking simulation at:", self.end)
self.seed_start = np.datetime64(self._data.seed_start)
print("\nStart seeding particles at:", self.seed_start)
self.seed_end = np.datetime64(self._data.seed_end)
print("\nStop seeding particles at:", self.seed_end)
self.outfreq = self._data.outfreq
self.out_gcmstep = self._data.out_gcmstep
# determine size of output buffer
self.nvals = 1
if self.outfreq == "always":
self.nvals = 10 * self.subiters
elif self.outfreq == "cross":
self.nvals = 15 # this should be a safe maximum
# first GCM iteration
iter_i = np.floor(
(self.start - self.gcm_start)
.astype("timedelta64[s]").astype(float) /
self.gcm_dt).astype("int32")
# last GCM iteration
iter_e = np.ceil(
(self.end - self.gcm_start)
.astype("timedelta64[s]").astype(float) /
self.gcm_dt).astype("int32")
self.iters = np.arange(iter_i, iter_e + 1,
int(self.out_dt / self.gcm_dt))
self.infreq = self._data.infreq
# we seed along a transect off the Rhone outflow
name_in = self._data.in_data.rstrip(".py")
# import seed points coordinates
in_data = getattr(importlib.__import__(name_in, fromlist=[name_in]), name_in)
self.ijk_indseed, self.ijk_seed = self._ijkseed(in_data.x_seed, in_data.y_seed, in_data.z_seed, self.inds_seed)
self.xyz_seed = np.zeros(self.ijk_seed.shape)
self._part_container.grid.transform(self.ijk_seed[:, 0],
self.ijk_seed[:, 1],
self.ijk_seed[:, 2],
self.xyz_seed)
self.n_seed = self.ijk_seed.shape[0]
print("\nNumber of seeding points: {0}".format(self.n_seed))
# seeding steps
seed_i = np.floor(
(self.seed_start - self.gcm_start)
.astype("timedelta64[s]").astype(float) /
self.gcm_dt).astype("int32")
seed_e = np.ceil(
(self.seed_end - self.gcm_start)
.astype("timedelta64[s]").astype(float) /
self.gcm_dt).astype("int32")
iter_seed = np.arange(seed_i, seed_e + 1,
int(self.seed_interval / self.gcm_dt))
self.timestamps_in = np.intersect1d(self.iters, iter_seed)
print("\nNumber of seeding time steps: %d" % self.timestamps_in.size)
self.ntrackmax = len(self.timestamps_in) * self.n_seed
print("\nTotal number of particles to be released: %d" %
self.ntrackmax)
self.chunk_id = self._data.chunk_id
self.chunk_time = self._data.chunk_time
self.timestamps_out = []
if self.outfreq == "gcmstep":
pass
elif self.outfreq == "custom":
name_out = self._data.out_custom.rstrip(".py")
self.timestamps_out = getattr(importlib.__import__(name_out, fromlist=[name_out]), name_out)
def _ijkseed(self, ii, jj, kk, inds):
'''
Define the ii, jj, kk indices from which particles will be
released.
ii, jj, kk: list, array, or scalar with indices (i, j, k) from which
particles will be released (must be integers).
inds: if True (default), ii, jj and kk are interpreted
as indices, otherwise they are interpreted as lists of
exact release positions'''
ii = np.atleast_1d(np.squeeze(ii))
jj = np.atleast_1d(np.squeeze(jj))
kk = np.atleast_1d(np.squeeze(kk))
if (ii.size != jj.size) or (ii.size != kk.size):
raise ValueError("ii, jj and kk must have all the same dimension.")
if inds:
return np.atleast_2d(np.asarray(zip(
np.int16(ii),
np.int16(jj),
np.int16(kk)))), \
np.atleast_2d(np.asarray(zip(ii, jj, kk)))
else:
# if MITgcm coordinates are passed, we have to load the model grid
# and then translate into the "normalised" index coordinates of
# tracmass
def xy2grid(x, y, dX, dY, C, S):
nx = (C * x + S * y) / dX
ny = (-S * x + C * y) / dY
return nx, ny
xG = self._part_container.grid.xG
yG = self._part_container.grid.yG
dX = self._part_container.grid.dX
dY = self._part_container.grid.dY
if self.gcm_geometry == "curvilinear":
cs = self._part_container.grid.CS
sn = self._part_container.grid.SN
zG = self._part_container.grid.Z
iout = np.zeros(ii.size) * np.nan
jout = np.zeros(ii.size) * np.nan
kout = np.zeros(ii.size) * np.nan
iindout = np.zeros(ii.size, dtype="int16")
jindout = np.zeros(ii.size, dtype="int16")
kindout = np.zeros(ii.size, dtype="int16")
for nn, (xx, yy, zz) in enumerate(zip(ii, jj, kk)):
jseed, iseed = np.unravel_index(
np.argmin((xx-xG)**2 + (yy-yG)**2), xG.shape)
if self.gcm_geometry == "curvilinear":
nx, ny = xy2grid(xx - xG[jseed, iseed],
yy - yG[jseed, iseed],
dX[jseed, iseed], dY[jseed, iseed],
cs[jseed, iseed], sn[jseed, iseed])
else:
nx = (xx - xG[jseed, iseed]) / dX[jseed, iseed]
ny = (yy - yG[jseed, iseed]) / dY[jseed, iseed]
# check the computed nx and ny, and in case seek in
# nearby cell
if (nx < 0) or (nx > 1) or (ny < 0) or (ny > 1):
for ijtry in product(range(-5, 6), range(-5, 6)):
inew = iseed + ijtry[0]
jnew = jseed + ijtry[1]
if (inew < 0) or (jnew < 0) \
or (inew >= self._part_container.grid.imax) or (jnew >= self._part_container.grid.jmax):
continue
if self.gcm_geometry == "curvilinear":
nx, ny = xy2grid(xx - xG[jnew, inew],
yy - yG[jnew, inew],
dX[jnew, inew], dY[jnew, inew],
cs[jnew, inew], sn[jnew, inew])
else:
nx = (xx - xG[jnew, inew]) / dX[jnew, inew]
ny = (yy - yG[jnew, inew]) / dY[jnew, inew]
if (nx >= 0) and (nx < 1) and (ny >= 0) and (ny < 1):
iseed = inew
jseed = jnew
break
else:
raise ValueError("Could not find the point "
"(x=%f, y=%f)" % (xx, yy))
z_here = zG[:, jseed, iseed]
if (zz > z_here.max()) or (zz <= z_here.min()):
print("\nPoint outside vertical bounds at "
"x,y,z=%.2f,%.2f,%.2f" % (xx, yy, zz))
continue
kk = np.where(z_here > zz)[0][-1]
nz = (zz - z_here[kk]) / (z_here[kk+1] - z_here[kk])
iindout[nn] = iseed
jindout[nn] = jseed
kindout[nn] = kk
iout[nn] = iseed + nx
jout[nn] = jseed + ny
kout[nn] = kk + nz
good = np.isfinite(iout)
iindout = iindout[good]
jindout = jindout[good]
kindout = kindout[good]
iout = iout[good]
jout = jout[good]
kout = kout[good]
return np.atleast_2d(np.asarray(list(zip(
iindout[np.isfinite(iout)],
jindout[np.isfinite(jout)],
kindout[np.isfinite(kout)])))), \
np.atleast_2d(np.asarray(list(zip(
iout[np.isfinite(iout)],
jout[np.isfinite(jout)],
kout[np.isfinite(kout)]))))
# end _ijkseed
def config_invalid(self, *args, **kwargs):
raise EnvironmentError("Unsupported configuration file extension.")
def config_json(self, *args, **kwargs):
with open(self.config_file_name) as f:
self.config = json.load(f)
def config_ini(self, *args, **kwargs):
config = configparser.ConfigParser()
config.read(self.config_file_name)
# Store in self.config the values of the config file.
# Example:
# [DEFAULT] hello = world
# will translate to :
# >>> self.config["DEFAULT"]["hello"]
# world
for section_name in config.sections():
section = config[section_name]
self.config[section_name] = {}
for key in section:
self.config[section_name][key] = config[section_name][key]
def run(self):
"""
Main function
"""
print("\nStarting ctracker")
# open file for output
with dataset(self.inoutfile, mode="w") as nc_inout, \
dataset(self.runfile, mode="r+") as nc_run:
to_netcdf = NetCDFController("tonetcdf", nc_run, nc_inout,
(self._part_container.iendw,
self._part_container.iende,
self._part_container.jendn,
self._part_container.jends),
self.ntrackmax,
self.gcm_start,
self.complevel,
self.chunk_id,
self.chunk_time
)
in_worker = INWorker("inworker", self.iters, self.grid, (self.u_root, self.v_root, self.w_root),
self.is2D, self.out_prec, self._part_container.ff)
out_worker = OUTWorker("outworker", self.ijk_seed, self.xyz_seed, self.outfreq)
print("\nStart main loop\n")
seed_data = {'nvals': self.nvals,
'n_seed': self.n_seed,
'ijk_seed': self.ijk_seed,
'ijk_indseed': self.ijk_indseed}
seed_ctx = namedtuple('SeedCtx', seed_data.keys())(**seed_data)
connect(in_worker, self._part_container)
connect(self._part_container, out_worker)
connect(out_worker, to_netcdf)
connect(out_worker, to_netcdf, name_out="sync", name_in="sync")
in_worker.start()
out_worker.start()
self._part_container.run(self.iters, self.timestamps_in, seed_ctx)
"""
generator = RandomGenerator('randgen')
out_worker = OUTWorker("outworker", self.ijk_seed, self.xyz_seed, self.outfreq)
connect(generator, out_worker)
connect(out_worker, to_netcdf)
connect(out_worker, to_netcdf, "sync", "sync")
n_part = 200
for i in range(0, 30):
generator.to_out_worker(65665800.0 + i*1800.0, (n_part, 1, 4), (n_part, 1, 3), (n_part,), (n_part,))
out_worker.process_output()
"""
# end with statement
print("\nSimulation ended.")
# end run
def _get_geometry(self):
"""
Convenience function returning XG and YG of mitgcm
"""
xG = np.zeros((self.grid.jmax + 1, self.grid.imax + 1))
yG = np.zeros((self.grid.jmax + 1, self.grid.imax + 1))
xG[:-1, :-1] = self.grid.xG
yG[:-1, :-1] = self.grid.yG
dxG = self.grid.dX
dyG = self.grid.dY
if self.gcm_geometry == "curvilinear":
cs = self.grid.CS
sn = self.grid.SN
# Fill (approximate) end points of the grid
xG[:-1, -1] = xG[:-1, -2] + dxG[:, -1] * cs[:, -1]
xG[-1, :-1] = xG[-2, :-1] - dyG[-1, :] * sn[-1, :]
# we lack the last metric at the NE corner, so we use the
# nearby metric
xG[-1, -1] = xG[-1, -2] + dxG[-1, -1] * cs[-1, -1]
yG[-1, :-1] = yG[-2, :-1] + dyG[-1, :] * cs[-1, :]
yG[:-1, -1] = yG[:-1, -2] + dxG[:, -1] * sn[:, -1]
yG[-1, -1] = yG[-2, -1] + dyG[-1, -1] * cs[-1, -1]
elif self.gcm_geometry == "cartesian":
# Fill (approximate) end points of the grid
xG[:-1, -1] = xG[:-1, -2] + dxG[:, -1]
xG[-1, :] = xG[-2, :]
yG[-1, :-1] = yG[-2, :-1] + dyG[-1, :]
yG[:, -1] = yG[:, -2]
else:
raise ValueError("Grid geometry not recognised.")
return xG, yG

Event Timeline