diff --git a/app/controller.py b/app/controller.py index 82afe47..6e1ee33 100644 --- a/app/controller.py +++ b/app/controller.py @@ -1,558 +1,565 @@ ########################################################################### # # # Copyright 2017 Andrea Cimatoribus # # EPFL ENAC IIE ECOL # # GR A1 435 (Batiment GR) # # Station 2 # # CH-1015 Lausanne # # Andrea.Cimatoribus@epfl.ch # # # # Alexandre Serex # # alexandre.serex@epfl.ch # # # # This file is part of ctracker # # # # ctracker is free software: you can redistribute it and/or modify it # # under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # ctracker is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty # # of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # See the GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with ctracker. If not, see . # # # ########################################################################### import configparser import importlib import json from time import sleep from collections import namedtuple from itertools import product from os.path import join +import os import numpy as np from core.router import connect from mods.grid import Grid from mods.particleContainer import ParticleContainer from parameters.netCDFController import NetCDFController, dataset from parameters.threadedQueue import INWorker, OUTWorker from parameters.RandomGenerator import RandomGenerator from tools.grid_extractor import mitgcm, mitgcm_raw class Controller: def __init__(self, config_file_name): self.config_file_name = config_file_name self.config = {} self.config_methods = { "json": self.config_json, "ini": self.config_ini } # Load the configuration file try: file_extension = self.config_file_name.split('.')[-1] self.config_methods.get(file_extension, self.config_invalid)() except: print("\nFailed to load configuration file.") raise print("\nConfiguration loaded successfully.") # Initialize the grid try: # Concatenates the common values and the grid only values in a dict grid_data = dict(self.config["Common"], **self.config["Grid"]) grid_tuple = namedtuple('GridTuple', grid_data.keys())(**grid_data) # extract a raw grid from mitgcm module self.raw_grid = mitgcm_raw(grid_tuple) grid = mitgcm(grid_tuple) # concatenate mitgm data to common and grid configuration data grid_data = dict(grid_data, **grid) # Creates a custom namedtuple with the above dict and feeds it to the grid self.grid = Grid("grid", namedtuple( 'GridData', grid_data.keys())(**grid_data)) except: print("\nFailed at initializing grid.") raise # Initialize the simulation data as a particle container try: part_cont_data = dict(self.config["Common"], **self.config["Sim"]) self._part_container = ParticleContainer("part_container", namedtuple( 'ParticleContainerData', part_cont_data.keys())(**part_cont_data), self.grid) except: print("\nFailed at initializing simulation data.") raise mitgcm_data = dict(self.config["Common"], **self.config["MITgcm"]) self._data = namedtuple('ParticleContainerData', mitgcm_data.keys())(**mitgcm_data) # Initialize the input output necessities try: self.init_mitgcm() except: print("\nFailed at initializing controller.") raise # Check that all parameters' configuration is valid self._parameters = [self.grid, self._part_container] try: self.is_config_valid() [p.is_config_valid() for p in self._parameters] except: print("\nInvalid configuration setup.") raise print("\nInitialization successful.") def is_config_valid(self): + if self.nb_threads <= 0 or not isinstance(self.nb_threads, int): + raise ValueError("Wrong number of threads. Must be integer greater than 1.") + if self.gcm_geometry not in ("curvilinear", "cartesian"): raise ValueError("Unrecognised MITgcm geometry") if (self.out_dt % self.gcm_dt) != 0: raise ValueError("The GCM output interval must be " "a multiple of the model time step.") if self.iters.size < 2: raise ValueError("To start the computation, the simulation " "must span in time at least two GCM outputs") if (self.seed_interval % self.out_dt) != 0: raise ValueError("The seeding interval must be " "a multiple of the GCM step.") if self.inds_seed: # not sure if that np.any(...) does anything really if np.any(self.x_seed > self.grid.imax) or \ np.any(self.x_seed < 0) or \ np.any(self.y_seed > self.grid.jmax) or \ np.any(self.y_seed < 0) or \ np.any(self.z_seed > self.grid.kmax) or \ np.any(self.z_seed < 0): raise ValueError("Seeding with indexes beyond grid limits") if self.outfreq in ("always", "cross"): raise NotImplementedError("Outputing data with always and cross not yet supported.") if self.outfreq not in ("gcmstep", "always", "cross", "custom"): raise ValueError("Outfreq not recognised") if self.infreq not in ("gcmstep", "custom"): raise ValueError("Infreq not recognised") if min(self.timestamps_in) < self.iters.min() or \ max(self.timestamps_in) > self.iters.max(): raise ValueError("Timestamps of input are not within range of the experiment.") if min(self.timestamps_out) < self.iters.min() or \ max(self.timestamps_out) > self.iters.max(): raise ValueError("Timestamps of output are not within range of the experiment.") return True def init_mitgcm(self): ''' Initialize the gcm input / output ''' self.gcm_directory = self._data.gcm_directory self.out_prec = self._data.out_prec self.gcm_geometry = self._data.gcm_geometry self.gcm_start = self._data.gcm_start + self.nb_threads = self._data.nb_threads + os.environ["NUMBA_DEFAULT_NUM_THREADS"] = "{0}".format(self.nb_threads) + self.gcm_dt = self._data.gcm_dt self.out_dt = self._data.out_dt self.subiters = int(self._data.subiters) self.dstep = 1.0 / self.subiters self.out_dt = self._data.out_dt self.dtmax = self.out_dt * self.dstep self.u_root = join(self.gcm_directory, self._data.u_root) self.v_root = join(self.gcm_directory, self._data.v_root) self.w_root = join(self.gcm_directory, self._data.w_root) self.is2D = self._data.is2D print("\nSimulation in 2D : ", self.is2D) self.complevel = self._data.complevel print("\nIdentify seeding points") self.inds_seed = self._data.inds_seed self.seed_interval = self._data.seed_interval print("\nSeeding interval: {0}s".format(self.seed_interval)) # save to netcdf, this will be the output file of the simulation # we require NETCDF4 outfile_name = self._data.outfile.rstrip(".nc") self.inoutfile = outfile_name + "_inout.nc" self.runfile = outfile_name + "_run.nc" xGp1, yGp1 = self._get_geometry() self.raw_grid.attrs["MITgcm_dir"] = self.gcm_directory # we also store the full mesh with edges coordinates self.raw_grid.coords["i_p1"] = ("i_p1", np.arange(xGp1.shape[1])) self.raw_grid.coords["j_p1"] = ("j_p1", np.arange(xGp1.shape[0])) self.raw_grid.coords["XG_p1"] = (("j_p1", "i_p1"), xGp1) self.raw_grid.coords["YG_p1"] = (("j_p1", "i_p1"), yGp1) self.raw_grid.to_netcdf(self.runfile, mode="w", format="NETCDF4", engine="netcdf4") self.gcm_start = np.datetime64(self._data.gcm_start) print("\nReference date of GCM:", self.gcm_start) self.start = np.datetime64(self._data.start) print("\nBegin particle tracking simulation at:", self.start) self.end = np.datetime64(self._data.end) print("\nEnd particle tracking simulation at:", self.end) self.seed_start = np.datetime64(self._data.seed_start) print("\nStart seeding particles at:", self.seed_start) self.seed_end = np.datetime64(self._data.seed_end) print("\nStop seeding particles at:", self.seed_end) self.outfreq = self._data.outfreq self.out_gcmstep = self._data.out_gcmstep # determine size of output buffer self.nvals = 1 if self.outfreq == "always": self.nvals = 10 * self.subiters elif self.outfreq == "cross": self.nvals = 15 # this should be a safe maximum # first GCM iteration iter_i = np.floor( (self.start - self.gcm_start) .astype("timedelta64[s]").astype(float) / self.gcm_dt).astype("int32") # last GCM iteration iter_e = np.ceil( (self.end - self.gcm_start) .astype("timedelta64[s]").astype(float) / self.gcm_dt).astype("int32") self.iters = np.arange(iter_i, iter_e + 1, int(self.out_dt / self.gcm_dt)) # print("ITERS : ", self.start, self.gcm_start, self.end, iter_i, iter_e, self.iters, len(self.iters)) self.infreq = self._data.infreq # we seed along a transect off the Rhone outflow name_in = self._data.in_data.rstrip(".py") # import seed points coordinates in_data = getattr(importlib.__import__(name_in, fromlist=[name_in]), name_in) self.ijk_indseed, self.ijk_seed = self._ijkseed(in_data.x_seed, in_data.y_seed, in_data.z_seed, self.inds_seed) self.xyz_seed = np.zeros(self.ijk_seed.shape) self._part_container.grid.transform(self.ijk_seed[:, 0], self.ijk_seed[:, 1], self.ijk_seed[:, 2], self.xyz_seed) self.n_seed = self.ijk_seed.shape[0] print("\nNumber of seeding points: {0}".format(self.n_seed)) # seeding steps seed_i = np.floor( (self.seed_start - self.gcm_start) .astype("timedelta64[s]").astype(float) / self.gcm_dt).astype("int32") seed_e = np.ceil( (self.seed_end - self.gcm_start) .astype("timedelta64[s]").astype(float) / self.gcm_dt).astype("int32") iter_seed = np.arange(seed_i, seed_e + 1, int(self.seed_interval / self.gcm_dt)) if self.infreq == "gcmstep": self.timestamps_in = np.intersect1d(self.iters, iter_seed).tolist() else: self.timestamps_in = self.import_timestamps(self._data.in_custom) print("\nNumber of seeding time steps: %d" % len(self.timestamps_in)) self.ntrackmax = len(self.timestamps_in) * self.n_seed print("\nTotal number of particles to be released: %d" % self.ntrackmax) self.chunk_id = self._data.chunk_id self.chunk_time = self._data.chunk_time if self.outfreq == "gcm_step": self.timestamps_out = np.intersect1d(self.iters, iter_seed).tolist() else: self.timestamps_out = self.import_timestamps(self._data.out_custom) def import_timestamps(self, name_from_data): name = name_from_data.rstrip(".py") timestamps = getattr(importlib.__import__(name, fromlist=[name]), name) for i, t in enumerate(timestamps): if np.issubdtype(type(t), np.datetime64) or np.issubdtype(type(t), np.timedelta64): timestamps[i] = (t.astype("timedelta64[s]").astype(float) / self.gcm_dt).astype("int32").item() return timestamps def _ijkseed(self, ii, jj, kk, inds): ''' Define the ii, jj, kk indices from which particles will be released. ii, jj, kk: list, array, or scalar with indices (i, j, k) from which particles will be released (must be integers). inds: if True (default), ii, jj and kk are interpreted as indices, otherwise they are interpreted as lists of exact release positions''' from matplotlib.path import Path ii = np.atleast_1d(np.squeeze(ii)) jj = np.atleast_1d(np.squeeze(jj)) kk = np.atleast_1d(np.squeeze(kk)) if (ii.size != jj.size) or (ii.size != kk.size): raise ValueError("ii, jj and kk must have all the same dimension.") if inds: return np.atleast_2d(np.asarray(zip( np.int16(ii), np.int16(jj), np.int16(kk)))), \ np.atleast_2d(np.asarray(zip(ii, jj, kk))) else: # if MITgcm coordinates are passed, we have to load the model grid # and then translate into the "normalised" index coordinates of # tracmass def xy2grid(x, y, dX, dY, C, S): nx = (C * x + S * y) / dX ny = (-S * x + C * y) / dY return nx, ny xG = self._part_container.grid.xG yG = self._part_container.grid.yG dX = self._part_container.grid.dX dY = self._part_container.grid.dY if self.gcm_geometry == "curvilinear": cs = self._part_container.grid.CS sn = self._part_container.grid.SN zG = self._part_container.grid.Z iout = np.zeros(ii.size) * np.nan jout = np.zeros(ii.size) * np.nan kout = np.zeros(ii.size) * np.nan iindout = np.zeros(ii.size, dtype="int16") jindout = np.zeros(ii.size, dtype="int16") kindout = np.zeros(ii.size, dtype="int16") for nn, (xx, yy, zz) in enumerate(zip(ii, jj, kk)): jseed, iseed = np.unravel_index( np.argmin((xx - xG) ** 2 + (yy - yG) ** 2), xG.shape) if iseed >= self._part_container.grid.imax: iseed -= 1 if jseed >= self._part_container.grid.jmax: jseed -= 1 p = Path([[xG[jseed, iseed], yG[jseed, iseed]], [xG[jseed + 1, iseed], yG[jseed + 1, iseed]], [xG[jseed + 1, iseed + 1], yG[jseed + 1, iseed + 1]], [xG[jseed, iseed + 1], yG[jseed, iseed + 1]]]) # is the point inside this polygon? if not p.contains_point((xx, yy)): for ijtry in product(range(-1, 2), range(-1, 2)): inew = iseed + ijtry[0] jnew = jseed + ijtry[1] if (inew >= self._part_container.grid.imax) or \ (jnew >= self._part_container.grid.jmax) or \ (inew < 0) or (jnew < 0): continue p = Path([[xG[jnew, inew], yG[jnew, inew]], [xG[jnew + 1, inew], yG[jnew + 1, inew]], [xG[jnew + 1, inew + 1], yG[jnew + 1, inew + 1]], [xG[jnew, inew + 1], yG[jnew, inew + 1]]]) if not p.contains_point((xx, yy)): continue else: iseed = inew jseed = jnew break else: raise ValueError("Could not find the point " "(x=%f, y=%f)" % (xx, yy)) if self.gcm_geometry in ("curvilinear",): nx, ny = xy2grid(xx - xG[jseed, iseed], yy - yG[jseed, iseed], dX[jseed, iseed], dY[jseed, iseed], cs[jseed, iseed], sn[jseed, iseed]) else: nx = (xx - xG[jseed, iseed]) / dX[jseed, iseed] ny = (yy - yG[jseed, iseed]) / dY[jseed, iseed] # in case there is some edge issue if nx < 0 or nx >= 1 or ny < 0 or ny >= 1: print("\nInvalid point at " "x,y,z=%.2f,%.2f,%.2f" % (xx, yy, zz)) print("i,j=%d,%d" % (iseed, jseed)) print("nx,ny=%.3e,%.3e" % (nx, ny)) continue z_here = zG[:, jseed, iseed] if (zz >= z_here.max()) or (zz <= z_here.min()): print("\nPoint outside vertical bounds at " "x,y,z=%.2f,%.2f,%.2f" % (xx, yy, zz)) continue kk = np.where(z_here > zz)[0][-1] nz = (zz - z_here[kk]) / (z_here[kk + 1] - z_here[kk]) if self._part_container.grid.dxyz[kk, jseed, iseed] == 0: print("\nPoint on land " "x,y,z=%.2f,%.2f,%.2f" % (xx, yy, zz)) continue iindout[nn] = iseed jindout[nn] = jseed kindout[nn] = kk iout[nn] = iseed + nx jout[nn] = jseed + ny kout[nn] = kk + nz # end cycle on input particle positions return np.atleast_2d(np.asarray(list(zip( iindout[np.isfinite(iout)], jindout[np.isfinite(jout)], kindout[np.isfinite(kout)])))), \ np.atleast_2d(np.asarray(list(zip( iout[np.isfinite(iout)], jout[np.isfinite(jout)], kout[np.isfinite(kout)])))) # end _ijkseed def config_invalid(self, *args, **kwargs): raise EnvironmentError("Unsupported configuration file extension.") def config_json(self, *args, **kwargs): with open(self.config_file_name) as f: self.config = json.load(f) def config_ini(self, *args, **kwargs): config = configparser.ConfigParser() config.read(self.config_file_name) # Store in self.config the values of the config file. # Example: # [DEFAULT] hello = world # will translate to : # >>> self.config["DEFAULT"]["hello"] # world for section_name in config.sections(): section = config[section_name] self.config[section_name] = {} for key in section: self.config[section_name][key] = config[section_name][key] def run(self): """ Main function """ print("\nStarting ctracker") # open file for output with dataset(self.inoutfile, mode="w") as nc_inout, \ dataset(self.runfile, mode="r+") as nc_run: to_netcdf = NetCDFController("tonetcdf", nc_run, nc_inout, (self._part_container.iendw, self._part_container.iende, self._part_container.jendn, self._part_container.jends), self.ntrackmax, self.gcm_start, self.complevel, self.chunk_id, self.chunk_time ) in_worker = INWorker("inworker", self.iters, self.grid, (self.u_root, self.v_root, self.w_root), self.is2D, self.out_prec, self._part_container.ff) out_worker = OUTWorker("outworker", self.ijk_seed, self.xyz_seed, self.outfreq) print("\nStart main loop\n") seed_data = {'nvals': self.nvals, 'n_seed': self.n_seed, 'iters': self.iters, 'timestamps_in': self.timestamps_in, 'timestamps_out': self.timestamps_out, 'ijk_seed': self.ijk_seed, 'ijk_indseed': self.ijk_indseed} seed_ctx = namedtuple('SeedCtx', seed_data.keys())(**seed_data) connect(in_worker, self._part_container) connect(self._part_container, out_worker) connect(out_worker, to_netcdf) connect(out_worker, to_netcdf, name_out="sync", name_in="sync") in_worker.start() out_worker.start() self._part_container.run(seed_ctx) """ generator = RandomGenerator('randgen') out_worker = OUTWorker("outworker", self.ijk_seed, self.xyz_seed, self.outfreq) connect(generator, out_worker) connect(out_worker, to_netcdf) connect(out_worker, to_netcdf, "sync", "sync") n_part = 200 for i in range(0, 30): generator.to_out_worker(65665800.0 + i*1800.0, (n_part, 1, 4), (n_part, 1, 3), (n_part,), (n_part,)) out_worker.process_output() """ out_worker.thread.join() # end with statement print("\nSimulation ended.") # end run def _get_geometry(self): """ Convenience function returning XG and YG of mitgcm """ xG = np.zeros((self.grid.jmax + 1, self.grid.imax + 1)) yG = np.zeros((self.grid.jmax + 1, self.grid.imax + 1)) xG[:-1, :-1] = self.grid.xG yG[:-1, :-1] = self.grid.yG dxG = self.grid.dX dyG = self.grid.dY if self.gcm_geometry == "curvilinear": cs = self.grid.CS sn = self.grid.SN # Fill (approximate) end points of the grid xG[:-1, -1] = xG[:-1, -2] + dxG[:, -1] * cs[:, -1] xG[-1, :-1] = xG[-2, :-1] - dyG[-1, :] * sn[-1, :] # we lack the last metric at the NE corner, so we use the # nearby metric xG[-1, -1] = xG[-1, -2] + dxG[-1, -1] * cs[-1, -1] yG[-1, :-1] = yG[-2, :-1] + dyG[-1, :] * cs[-1, :] yG[:-1, -1] = yG[:-1, -2] + dxG[:, -1] * sn[:, -1] yG[-1, -1] = yG[-2, -1] + dyG[-1, -1] * cs[-1, -1] elif self.gcm_geometry == "cartesian": # Fill (approximate) end points of the grid xG[:-1, -1] = xG[:-1, -2] + dxG[:, -1] xG[-1, :] = xG[-2, :] yG[-1, :-1] = yG[-2, :-1] + dyG[-1, :] yG[:, -1] = yG[:, -2] else: raise ValueError("Grid geometry not recognised.") return xG, yG diff --git a/config.json b/config.json index e787cdc..ce8317c 100644 --- a/config.json +++ b/config.json @@ -1,63 +1,64 @@ { "Common":{ "gcm_dt": 20.0, "out_dt": 1800.0, "gcm_start": "2013-11-12 12:00", - "gcm_geometry": "curvilinear" + "gcm_geometry": "curvilinear", + "nb_threads": 4 }, "MITgcm":{ "gcm_directory": "gcm_output", "outfile": "results/tracks.nc", "u_root": "UVEL.", "v_root": "VVEL.", "w_root": "WVEL.", "out_prec": ">f4", "complevel": 1, "is2D": false, "start": "2015-12-12 12:30", "end": "2015-12-15 12:30", "outfreq": "gcmstep", "out_gcmstep": 1, "out_custom": "timestamps_out.py", "chunk_id": 2136, "chunk_time": 64, "in_data": "in_data.py", "infreq": "gcmstep", "in_custom": "timestamps_in.py", "inds_seed": false, "seed_start": "2015-12-12 12:30", "seed_end": "2016-10-01 00:00", "seed_interval": 1800.0, "subiters": 1000 }, "Grid":{ "gcm_directory": "gcm_output", "u_root": "UVEL.", "v_root": "VVEL.", "w_root": "WVEL.", "subiters": 1000 }, "Sim":{ "ff": 1, "iende": [300.0], "iendw": [0.0], "jendn": [100.0], "jends": [0.0], "n_threads":8, "n_min_part_per_thread":200, "outfreq": "gcmstep" } } \ No newline at end of file