Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F87622261
particleContainer.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Oct 13, 18:23
Size
11 KB
Mime Type
text/x-python
Expires
Tue, Oct 15, 18:23 (2 d)
Engine
blob
Format
Raw Data
Handle
21574257
Attached To
rCTRACKER ctracker3
particleContainer.py
View Options
###########################################################################
# #
# Copyright 2017 Andrea Cimatoribus #
# EPFL ENAC IIE ECOL #
# GR A1 435 (Batiment GR) #
# Station 2 #
# CH-1015 Lausanne #
# Andrea.Cimatoribus@epfl.ch #
# #
# Alexandre Serex #
# alexandre.serex@epfl.ch #
# #
# This file is part of ctracker #
# #
# ctracker is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# ctracker is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with ctracker. If not, see <http://www.gnu.org/licenses/>. #
# #
###########################################################################
import
numpy
as
np
from
queue
import
Queue
from
collections
import
OrderedDict
from
threading
import
Thread
from
core.parameterContainer
import
ParameterContainer
from
parameters.particle
import
Particle
from
tools.CFunctions
import
ext_loop
class
ParticleContainer
(
ParameterContainer
):
def
__init__
(
self
,
name
,
data
,
grid
):
super
(
ParticleContainer
,
self
)
.
__init__
(
name
,
data
)
'''
Initializes the seeding frequency and position and seeding steps for the simulation
'''
self
.
outfreq
=
data
.
outfreq
self
.
ff
=
float
(
data
.
ff
)
self
.
gcm_geometry
=
data
.
gcm_geometry
self
.
grid
=
grid
self
.
gcm_dt
=
data
.
gcm_dt
self
.
out_dt
=
data
.
out_dt
self
.
gcm_start
=
data
.
gcm_start
self
.
iende
=
np
.
asarray
(
data
.
iende
)
self
.
iendw
=
np
.
asarray
(
data
.
iendw
)
self
.
jendn
=
np
.
asarray
(
data
.
jendn
)
self
.
jends
=
np
.
asarray
(
data
.
jends
)
self
.
nend
=
self
.
iende
.
size
# Total number of active particles
self
.
ntact
=
0
# Number of particles which exited the domain
self
.
ntout
=
0
self
.
queue_in
=
Queue
(
maxsize
=
3
)
# self.queue_out = Queue(maxsize=50)
self
.
ntact
=
0
self
.
_children
=
OrderedDict
()
# current tracked particles
self
.
exited
=
{}
self
.
nvals
=
1
self
.
nb_threads
=
data
.
nb_threads
;
self
.
is_running
=
False
self
.
inputs
[
"default"
]
=
[
self
.
process_input
]
self
.
outputs
=
[
'default'
]
def
process_input
(
self
,
*
args
,
**
kwargs
):
self
.
queue_in
.
put
(
*
args
,
**
kwargs
)
def
process_output
(
self
,
*
args
,
**
kwargs
):
pass
def
feed_children
(
self
,
children
):
self
.
_children
=
OrderedDict
(
children
)
def
is_config_valid
(
self
):
if
self
.
ff
not
in
[
1.0
,
-
1.0
]:
raise
ValueError
(
"ff controls the time direction, "
"it can only be +1 or -1"
)
if
self
.
ff
==
-
1.0
:
raise
NotImplementedError
(
"Backward integration not yet supported."
)
if
(
self
.
nend
!=
self
.
iendw
.
size
)
or
\
(
self
.
nend
!=
self
.
jendn
.
size
)
or
\
(
self
.
nend
!=
self
.
jends
.
size
):
raise
ValueError
(
"Wrong kill area definition"
)
return
True
def
run
(
self
,
seed_ctx
):
self
.
is_running
=
True
#
# Start time loop over GCM output steps
#
# array with particle ids
out_code
=
[]
out_id
=
[]
self
.
nvals
=
seed_ctx
.
nvals
iters
=
seed_ctx
.
iters
timestamps_in
=
seed_ctx
.
timestamps_in
timestamps_out
=
seed_ctx
.
timestamps_out
all_stops
=
np
.
sort
(
np
.
unique
(
np
.
concatenate
((
iters
,
timestamps_in
,
timestamps_out
))))
# initialize stuff
ts
=
0.0
# normalised time between two GCM time steps
tt
=
0.0
# time in seconds between two GCM time steps
part_count
=
0
for
ngi
in
range
(
1
,
all_stops
.
size
):
# we are starting from the start!
# even if we will read the following
# gcm output for interpolating
t
=
all_stops
[
ngi
-
1
]
ngiQ
=
uflux
=
vflux
=
wflux
=
0
t0
=
t
*
self
.
gcm_dt
t1
=
all_stops
[
ngi
]
*
self
.
gcm_dt
dt
=
t1
-
t0
# check if we reached a timestamp at which we should extract flux data
if
t
in
iters
:
# get data from the queue
ngiQ
,
uflux
,
vflux
,
wflux
=
self
.
queue_in
.
get
()
ts
=
tt
=
0.0
if
ngiQ
!=
ngi
:
raise
ValueError
(
"Queue provides wrong timestep"
)
# remove particles that went out of the domain or other
for
id
in
out_id
:
if
id
>=
0
:
self
.
exited
[
id
]
=
self
.
_children
[
id
]
del
self
.
_children
[
id
]
self
.
ntout
=
len
(
self
.
exited
.
values
())
if
t
in
timestamps_in
:
new_parts
=
[]
for
i
in
range
(
seed_ctx
.
n_seed
):
part_id
=
part_count
part_count
+=
1
p
=
(
Particle
(
"part{0}"
.
format
(
part_id
),
id
=
part_id
,
x
=
seed_ctx
.
ijk_seed
[
i
,
0
],
y
=
seed_ctx
.
ijk_seed
[
i
,
1
],
z
=
seed_ctx
.
ijk_seed
[
i
,
2
],
i
=
seed_ctx
.
ijk_indseed
[
i
,
0
],
j
=
seed_ctx
.
ijk_indseed
[
i
,
1
],
k
=
seed_ctx
.
ijk_indseed
[
i
,
2
]
))
new_parts
.
append
(
p
)
for
p
in
new_parts
:
self
.
_children
[
p
.
id
]
=
p
# self._children.update(new_parts)
# we store the seeding position
self
.
output
(
'default'
,
(
t0
,
list
(
new_parts
),
None
,
None
,
None
,
True
))
self
.
ntact
=
len
(
self
.
_children
.
values
())
out_code
=
np
.
array
([
0
]
*
self
.
ntact
)
out_id
=
np
.
array
([
-
1
]
*
self
.
ntact
)
print
(
"
\n
Starting numerical loop at "
,
np
.
datetime64
(
self
.
gcm_start
)
+
np
.
timedelta64
(
int
(
t0
*
1000
),
"[ms]"
),
"
\n
|.... active: {:d} ....|.... out: {:d} ....|"
.
format
(
self
.
ntact
,
self
.
ntout
))
if
self
.
outfreq
==
"gcmstep"
:
outfreq
=
1
elif
self
.
outfreq
==
"always"
:
outfreq
=
2
elif
self
.
outfreq
==
"cross"
:
outfreq
=
3
elif
self
.
outfreq
==
"custom"
:
outfreq
=
4
else
:
raise
ValueError
(
"outfreq {0} not recognised"
.
format
(
self
.
outfreq
))
all_part
=
[
np
.
array
(
a
)
for
a
in
zip
(
*
list
(
self
.
_children
.
values
()))]
times
=
np
.
array
([
0.0
]
*
self
.
ntact
)
# dimension for times
rec
=
np
.
array
([
0
]
*
self
.
ntact
)
# dimension for rec_out
ids
,
xx
,
yy
,
zz
,
ii
,
jj
,
kk
,
chxs
,
chys
,
chzs
=
all_part
threads
=
[]
chunksize
=
int
(
self
.
ntact
/
self
.
nb_threads
)
rest
=
self
.
ntact
%
self
.
nb_threads
for
i
in
range
(
self
.
nb_threads
):
begin
=
i
*
chunksize
end
=
(
i
+
1
)
*
chunksize
if
i
is
self
.
nb_threads
-
1
:
end
+=
rest
num_part
=
end
-
begin
th
=
Thread
(
args
=
[
self
.
nvals
,
num_part
,
ids
[
begin
:
end
],
xx
[
begin
:
end
],
yy
[
begin
:
end
],
zz
[
begin
:
end
],
ii
[
begin
:
end
],
jj
[
begin
:
end
],
kk
[
begin
:
end
],
chxs
[
begin
:
end
],
chys
[
begin
:
end
],
chzs
[
begin
:
end
],
times
[
begin
:
end
],
rec
[
begin
:
end
],
timestamps_out
,
t
,
[
ts
,
tt
],
dt
,
outfreq
,
self
.
nend
,
self
.
iendw
,
self
.
iende
,
self
.
jendn
,
self
.
jends
,
self
.
grid
.
imax
,
self
.
grid
.
jmax
,
self
.
grid
.
kmax
,
self
.
grid
.
dsmax
,
self
.
grid
.
dxyz
,
self
.
grid
.
dtmax
,
self
.
grid
.
dstep
,
self
.
grid
.
CS
,
self
.
grid
.
SN
,
self
.
grid
.
dX
,
self
.
grid
.
dY
,
self
.
grid
.
xG
,
self
.
grid
.
yG
,
self
.
grid
.
Z
,
uflux
,
vflux
,
wflux
,
out_code
[
begin
:
end
],
out_id
[
begin
:
end
]],
target
=
ext_loop
)
th
.
daemon
=
True
threads
.
append
(
th
)
th
.
start
()
for
th
in
threads
:
th
.
join
()
"""
ext_loop(self.nvals, self.ntact,
ids, xx, yy, zz, ii, jj, kk, times, rec,
out_xyz, out_ijk,
timestamps_out, t, [ts, tt],
dt, outfreq,
self.nend, self.iendw, self.iende, self.jendn, self.jends,
self.grid.imax, self.grid.jmax, self.grid.kmax,
self.grid.dsmax, self.grid.dxyz, self.grid.dtmax,
self.grid.dstep, self.grid.CS, self.grid.SN, self.grid.dX,
self.grid.dY, self.grid.xG, self.grid.yG, self.grid.Z,
uflux, vflux, wflux, out_code, out_id)
"""
for
i
,
p
in
enumerate
(
list
(
self
.
_children
.
values
())):
p
.
x
=
np
.
float64
(
xx
[
i
])
p
.
y
=
np
.
float64
(
yy
[
i
])
p
.
z
=
np
.
float64
(
zz
[
i
])
p
.
i
=
np
.
int16
(
ii
[
i
])
p
.
j
=
np
.
int16
(
jj
[
i
])
p
.
k
=
np
.
int16
(
kk
[
i
])
p
.
chx
=
np
.
float64
(
chxs
[
i
])
p
.
chy
=
np
.
float64
(
chys
[
i
])
p
.
chz
=
np
.
float64
(
chzs
[
i
])
# pXYZ = out_ijk = position réelle dans la grille
# pIJK = position entière dans la grille
# outXYZ = position réelle en suisse
# instead of doing the netCDF writing here, we rather
# put the output in a queue, a separate thread will
# deal with it
# Note that we must store copies, otherwise we would be
# changing the output before it's written
self
.
output
(
'default'
,
(
t0
,
list
(
self
.
_children
.
values
()),
times
.
copy
(),
rec
.
copy
(),
out_code
.
copy
(),
False
)
)
if
t
in
timestamps_out
:
timestamps_out
.
pop
(
0
)
if
t
in
timestamps_in
:
timestamps_in
.
pop
(
0
)
# end for cycle over iterations
# Tell the output the processing is finished
self
.
output
(
'default'
,
None
)
return
self
.
_children
Event Timeline
Log In to Comment