Page MenuHomec4science

threadedQueue.py
No OneTemporary

File Metadata

Created
Sun, Nov 17, 10:41

threadedQueue.py

###########################################################################
# #
# Copyright 2017 Andrea Cimatoribus #
# EPFL ENAC IIE ECOL #
# GR A1 435 (Batiment GR) #
# Station 2 #
# CH-1015 Lausanne #
# Andrea.Cimatoribus@epfl.ch #
# #
# Alexandre Serex #
# alexandre.serex@epfl.ch #
# #
# This file is part of ctracker #
# #
# ctracker is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# ctracker is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with ctracker. If not, see <http://www.gnu.org/licenses/>. #
# #
###########################################################################
import numpy as np
from queue import Queue
from threading import Thread
from core.parameter import Parameter
from abc import abstractmethod
class ThreadedQueue(Parameter):
    """Parameter initialised with a fresh ``Queue`` and a daemon thread.

    The thread's target is :meth:`process_output`, which subclasses are
    expected to override. The thread is created here but only started by
    :meth:`start`.
    """

    def __init__(self, name, *args, **kwargs):
        """Set up the queue-backed parameter and its (not yet started) thread.

        :param name: forwarded to ``Parameter.__init__`` together with a new
            ``Queue`` instance.
        :param args: extra positional arguments forwarded to ``Thread``.
        :param kwargs: extra keyword arguments forwarded to ``Thread``
            (``target`` is already supplied here).
        """
        super().__init__(name, Queue())
        worker = Thread(*args, target=self.process_output, **kwargs)
        # daemon thread: it does not keep the interpreter alive at exit
        worker.daemon = True
        self.thread = worker

    @abstractmethod
    def process_output(self):
        """Body of the worker thread; meant to be overridden by subclasses."""

    def start(self):
        """Start the worker thread."""
        self.thread.start()
class INWorker(ThreadedQueue):
    """Worker that reads consecutive GCM steps from disk and emits fluxes.

    For every pair of consecutive entries of ``iters`` the worker reads the
    u, v (and, unless ``is2D``, w) binary files of both steps, scales them
    and sends them to the ``'default'`` output.
    """

    def __init__(self, name, iters, grid, roots, is2D, precision, ff, *args, **kwargs):
        """Store the reading configuration.

        :param name: forwarded to ``ThreadedQueue.__init__``.
        :param iters: sequence of iteration numbers; must expose ``.size``
            and integer indexing (presumably a 1-D numpy array -- confirm
            against the caller).
        :param grid: object providing ``kmax``, ``jmax``, ``imax``,
            ``grid_shape``, ``dzu``, ``dzv`` and ``dxdy``.
        :param roots: indexable holding the u, v and w file name prefixes
            at positions 0, 1 and 2.
        :param is2D: when true, the w files are not read and the w flux
            stays zero.
        :param precision: dtype passed to ``np.fromfile``.
        :param ff: scalar factor multiplied into every flux.
        :param args: forwarded to ``ThreadedQueue.__init__``.
        :param kwargs: forwarded to ``ThreadedQueue.__init__``.
        """
        super(INWorker, self).__init__(name, *args, **kwargs)
        self.iters = iters
        self.grid = grid
        self.u_root = roots[0]
        self.v_root = roots[1]
        self.w_root = roots[2]
        self.is2D = is2D
        self.out_prec = precision
        self.ff = ff
        self.outputs = ['default']

    def _read_field(self, root, it):
        """Read one field of iteration ``it`` with its first axis reversed."""
        # The iteration number is always zero-padded to 10 digits, so that a
        # given step maps to the same file whether it is read as the "old"
        # or as the "new" step of an interval.
        fname = root + "%010d.data" % it
        raw = np.fromfile(fname, dtype=self.out_prec)
        return raw.reshape(self.grid.grid_shape)[::-1, ...]

    def _load_step(self, slot, it, uflux, vflux, wflux):
        """Fill index ``slot`` of the three flux arrays from iteration ``it``."""
        grid = self.grid
        # the flux arrays have one extra point along i (u), j (v) and k (w);
        # that extra row/column/level is left at zero
        uflux[slot, :, :, :-1] = \
            self._read_field(self.u_root, it) * grid.dzu * self.ff
        vflux[slot, :, :-1, :] = \
            self._read_field(self.v_root, it) * grid.dzv * self.ff
        if not self.is2D:
            wflux[slot, 1:, :, :] = \
                self._read_field(self.w_root, it) * grid.dxdy * self.ff

    def process_output(self):
        """Read every pair of consecutive steps and emit their fluxes.

        For ``ngi`` from 1 to ``iters.size - 1`` a tuple
        ``(ngi, uflux, vflux, wflux)`` is sent to the ``'default'`` output;
        index 0 of each flux array holds step ``iters[ngi - 1]`` and index 1
        holds step ``iters[ngi]``.
        """
        grid = self.grid
        for ngi in range(1, self.iters.size):
            it_old = self.iters[ngi - 1]
            it_new = self.iters[ngi]
            # define flux arrays
            uflux = np.zeros((2, grid.kmax, grid.jmax, grid.imax + 1),
                             "float64")
            vflux = np.zeros((2, grid.kmax, grid.jmax + 1, grid.imax),
                             "float64")
            wflux = np.zeros((2, grid.kmax + 1, grid.jmax, grid.imax),
                             "float64")
            # read old and new GCM step for interpolating
            self._load_step(0, it_old, uflux, vflux, wflux)
            self._load_step(1, it_new, uflux, vflux, wflux)
            # The arrays are freshly allocated on every iteration and not
            # referenced afterwards, so they can be handed over without
            # making a copy.
            self.output('default', (ngi, uflux, vflux, wflux))
class OUTWorker(ThreadedQueue):
    """Worker that drains the queue of particle records and emits them.

    Items are enqueued through :meth:`process_input` and consumed by the
    worker thread in :meth:`process_output`, which converts the particle
    objects into numpy arrays and forwards them to the ``'default'``
    output. A ``'sync'`` output is triggered every 100 processed items.
    """

    def __init__(self, name, ijk_seed, xyz_seed, outfreq, *args, **kwargs):
        """Store the seeding information and register inputs/outputs.

        :param name: forwarded to ``ThreadedQueue.__init__``.
        :param ijk_seed: passed through unchanged with the initial record.
        :param xyz_seed: passed through unchanged with the initial record.
        :param outfreq: output mode; only ``"gcmstep"`` and ``"custom"``
            are implemented.
        :param args: forwarded to ``ThreadedQueue.__init__``.
        :param kwargs: forwarded to ``ThreadedQueue.__init__``.
        """
        super(OUTWorker, self).__init__(name, *args, **kwargs)
        self.ijk_seed = ijk_seed
        self.xyz_seed = xyz_seed
        self.outfreq = outfreq
        self.inputs['default'] = [self.process_input]
        self.outputs = ['default', 'sync']

    def process_input(self, *args, **kwargs):
        """Enqueue an item; positional arguments go straight to ``Queue.put``."""
        self.value.put(*args)

    def _write_records(self, t0, particles, times, rec_outs, out_c, active_ids):
        """Convert one batch of particles to arrays and emit them per time.

        :raises ValueError: if there is something to write and ``outfreq``
            is neither ``"gcmstep"`` nor ``"custom"``.
        """
        # interfacing class based output with numpy number based output
        out_code = np.array(out_c, dtype="short")
        nparticles = len(particles)
        out_tijk = np.zeros((nparticles, 1, 4), dtype="f8")
        out_xyz = np.zeros((nparticles, 1, 3), dtype="f8")
        for i, p in enumerate(particles):
            rec = rec_outs[i]
            out_tijk[i, rec, 0] = times[i]
            out_tijk[i, rec, 1] = p.x
            out_tijk[i, rec, 2] = p.y
            out_tijk[i, rec, 3] = p.z
            out_xyz[i, rec, 0] = p.chx
            out_xyz[i, rec, 1] = p.chy
            out_xyz[i, rec, 2] = p.chz

        # identify writing times (usually, only 1 or a few different);
        # records with a negative time are not written
        towrite = out_tijk[:, :, 0] >= 0
        wtimes = np.unique(out_tijk[towrite, 0])
        implemented = self.outfreq in ("gcmstep", "custom")
        for wtime in wtimes:
            if not implemented:
                # TODO
                raise ValueError("Unimplemented")
            # in these modes there is only one record per particle
            at_time = out_tijk[:, 0, 0] == wtime
            # particles running
            running = at_time & (out_code == 0)
            if running.any():
                self.output('default',
                            (t0 + wtime,
                             active_ids[running],
                             out_tijk[running, 0, 1:],
                             out_xyz[running, 0, :]),
                            outc=out_code[running])
            # particles exiting
            exiting = at_time & (out_code > 0)
            if exiting.any():
                self.output('default',
                            (t0 + wtime,
                             active_ids[exiting],
                             out_tijk[exiting, 0, 1:],
                             out_xyz[exiting, 0, :]),
                            file="inout",
                            outc=out_code[exiting])

    def process_output(self, *args, **kwargs):
        """Consume queued items until a ``None`` sentinel is received.

        Each item is a tuple
        ``(t0, particles, times, rec_outs, out_c, init)``. When ``init`` is
        true only the particle ids and the seed arrays are emitted;
        otherwise the particle records are emitted grouped by time.
        """
        ncall = 0
        while True:
            if self.value.full():
                print("Warning: output queue is full "
                      "(slows down the execution).")
            ncall += 1
            in_value = self.value.get()
            if in_value is None:
                # acknowledge the sentinel too, otherwise a Queue.join()
                # issued after it would never return
                self.value.task_done()
                break
            t0, particles, times, rec_outs, out_c, init = in_value
            # interfacing class based output with numpy number based output
            active_ids = np.array([p.id for p in particles])
            if init:
                self.output('default',
                            (t0, active_ids, self.ijk_seed, self.xyz_seed),
                            init=True)
            else:
                self._write_records(t0, particles, times, rec_outs, out_c,
                                    active_ids)
            self.value.task_done()
            # once in a while, make sure we sync to disk
            # this is important for reading the output
            # while a simulation is still running
            if (ncall % 100) == 0:
                self.output('sync')
                ncall = 0

Event Timeline