Page MenuHomec4science

particleContainer.py
No OneTemporary

File Metadata

Created
Sun, Oct 13, 18:23

particleContainer.py

###########################################################################
# #
# Copyright 2017 Andrea Cimatoribus #
# EPFL ENAC IIE ECOL #
# GR A1 435 (Batiment GR) #
# Station 2 #
# CH-1015 Lausanne #
# Andrea.Cimatoribus@epfl.ch #
# #
# Alexandre Serex #
# alexandre.serex@epfl.ch #
# #
# This file is part of ctracker #
# #
# ctracker is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# ctracker is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with ctracker. If not, see <http://www.gnu.org/licenses/>. #
# #
###########################################################################
import numpy as np
from queue import Queue
from collections import OrderedDict
from threading import Thread
from core.parameterContainer import ParameterContainer
from parameters.particle import Particle
from tools.CFunctions import ext_loop
class ParticleContainer(ParameterContainer):
def __init__(self, name, data, grid):
super(ParticleContainer, self).__init__(name, data)
'''
Initializes the seeding frequency and position and seeding steps for the simulation
'''
self.outfreq = data.outfreq
self.ff = float(data.ff)
self.gcm_geometry = data.gcm_geometry
self.grid = grid
self.gcm_dt = data.gcm_dt
self.out_dt = data.out_dt
self.gcm_start = data.gcm_start
self.iende = np.asarray(data.iende)
self.iendw = np.asarray(data.iendw)
self.jendn = np.asarray(data.jendn)
self.jends = np.asarray(data.jends)
self.nend = self.iende.size
# Total number of active particles
self.ntact = 0
# Number of particles which exited the domain
self.ntout = 0
self.queue_in = Queue(maxsize=3)
# self.queue_out = Queue(maxsize=50)
self.ntact = 0
self._children = OrderedDict() # current tracked particles
self.exited = {}
self.nvals = 1
self.nb_threads = data.nb_threads;
self.is_running = False
self.inputs["default"] = [self.process_input]
self.outputs = ['default']
def process_input(self, *args, **kwargs):
self.queue_in.put(*args, **kwargs)
def process_output(self, *args, **kwargs):
pass
def feed_children(self, children):
self._children = OrderedDict(children)
def is_config_valid(self):
if self.ff not in [1.0, -1.0]:
raise ValueError("ff controls the time direction, "
"it can only be +1 or -1")
if self.ff == -1.0:
raise NotImplementedError("Backward integration not yet supported.")
if (self.nend != self.iendw.size) or \
(self.nend != self.jendn.size) or \
(self.nend != self.jends.size):
raise ValueError("Wrong kill area definition")
return True
def run(self, seed_ctx):
self.is_running = True
#
# Start time loop over GCM output steps
#
# array with particle ids
out_code = []
out_id = []
self.nvals = seed_ctx.nvals
iters = seed_ctx.iters
timestamps_in = seed_ctx.timestamps_in
timestamps_out = seed_ctx.timestamps_out
all_stops = np.sort(np.unique(np.concatenate((iters, timestamps_in, timestamps_out))))
# initialize stuff
ts = 0.0 # normalised time between two GCM time steps
tt = 0.0 # time in seconds between two GCM time steps
part_count = 0
for ngi in range(1, all_stops.size):
# we are starting from the start!
# even if we will read the following
# gcm output for interpolating
t = all_stops[ngi - 1]
ngiQ = uflux = vflux = wflux = 0
t0 = t * self.gcm_dt
t1 = all_stops[ngi] * self.gcm_dt
dt = t1 - t0
# check if we reached a timestamp at which we should extract flux data
if t in iters:
# get data from the queue
ngiQ, uflux, vflux, wflux = self.queue_in.get()
ts = tt = 0.0
if ngiQ != ngi:
raise ValueError("Queue provides wrong timestep")
# remove particles that went out of the domain or other
for id in out_id:
if id >= 0:
self.exited[id] = self._children[id]
del self._children[id]
self.ntout = len(self.exited.values())
if t in timestamps_in:
new_parts = []
for i in range(seed_ctx.n_seed):
part_id = part_count
part_count += 1
p = (Particle("part{0}".format(part_id),
id=part_id,
x=seed_ctx.ijk_seed[i, 0],
y=seed_ctx.ijk_seed[i, 1],
z=seed_ctx.ijk_seed[i, 2],
i=seed_ctx.ijk_indseed[i, 0],
j=seed_ctx.ijk_indseed[i, 1],
k=seed_ctx.ijk_indseed[i, 2]
))
new_parts.append(p)
for p in new_parts:
self._children[p.id] = p
# self._children.update(new_parts)
# we store the seeding position
self.output('default', (t0, list(new_parts), None, None, None, True))
self.ntact = len(self._children.values())
out_code = np.array([0] * self.ntact)
out_id = np.array([-1] * self.ntact)
print("\nStarting numerical loop at ",
np.datetime64(self.gcm_start) +
np.timedelta64(int(t0 * 1000), "[ms]"),
"\n|.... active: {:d} ....|.... out: {:d} ....|"
.format(self.ntact, self.ntout))
if self.outfreq == "gcmstep":
outfreq = 1
elif self.outfreq == "always":
outfreq = 2
elif self.outfreq == "cross":
outfreq = 3
elif self.outfreq == "custom":
outfreq = 4
else:
raise ValueError("outfreq {0} not recognised"
.format(self.outfreq))
all_part = [np.array(a) for a in zip(*list(self._children.values()))]
times = np.array([0.0] * self.ntact) # dimension for times
rec = np.array([0] * self.ntact) # dimension for rec_out
ids, xx, yy, zz, ii, jj, kk, chxs, chys, chzs = all_part
threads = []
chunksize = int(self.ntact / self.nb_threads)
rest = self.ntact % self.nb_threads
for i in range(self.nb_threads):
begin = i*chunksize
end = (i + 1) * chunksize
if i is self.nb_threads - 1:
end += rest
num_part = end - begin
th = Thread(args=[self.nvals, num_part,
ids[begin:end], xx[begin:end], yy[begin:end], zz[begin:end],
ii[begin:end], jj[begin:end], kk[begin:end],
chxs[begin:end], chys[begin:end], chzs[begin:end],
times[begin:end], rec[begin:end],
timestamps_out, t, [ts, tt],
dt, outfreq,
self.nend, self.iendw, self.iende, self.jendn, self.jends,
self.grid.imax, self.grid.jmax, self.grid.kmax,
self.grid.dsmax, self.grid.dxyz, self.grid.dtmax,
self.grid.dstep, self.grid.CS, self.grid.SN, self.grid.dX,
self.grid.dY, self.grid.xG, self.grid.yG, self.grid.Z,
uflux, vflux, wflux, out_code[begin:end], out_id[begin:end]], target=ext_loop)
th.daemon = True
threads.append(th)
th.start()
for th in threads:
th.join()
"""
ext_loop(self.nvals, self.ntact,
ids, xx, yy, zz, ii, jj, kk, times, rec,
out_xyz, out_ijk,
timestamps_out, t, [ts, tt],
dt, outfreq,
self.nend, self.iendw, self.iende, self.jendn, self.jends,
self.grid.imax, self.grid.jmax, self.grid.kmax,
self.grid.dsmax, self.grid.dxyz, self.grid.dtmax,
self.grid.dstep, self.grid.CS, self.grid.SN, self.grid.dX,
self.grid.dY, self.grid.xG, self.grid.yG, self.grid.Z,
uflux, vflux, wflux, out_code, out_id)
"""
for i, p in enumerate(list(self._children.values())):
p.x = np.float64(xx[i])
p.y = np.float64(yy[i])
p.z = np.float64(zz[i])
p.i = np.int16(ii[i])
p.j = np.int16(jj[i])
p.k = np.int16(kk[i])
p.chx = np.float64(chxs[i])
p.chy = np.float64(chys[i])
p.chz = np.float64(chzs[i])
# pXYZ = out_ijk = position réelle dans la grille
# pIJK = position entière dans la grille
# outXYZ = position réelle en suisse
# instead of doing the netCDF writing here, we rather
# put the output in a queue, a separate thread will
# deal with it
# Note that we must store copies, otherwise we would be
# changing the output before it's written
self.output('default',
(t0,
list(self._children.values()),
times.copy(),
rec.copy(),
out_code.copy(),
False)
)
if t in timestamps_out:
timestamps_out.pop(0)
if t in timestamps_in:
timestamps_in.pop(0)
# end for cycle over iterations
# Tell the output the processing is finished
self.output('default', None)
return self._children

Event Timeline