Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F92125860
threadedQueue.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Nov 17, 14:18
Size
9 KB
Mime Type
text/x-python
Expires
Tue, Nov 19, 14:18 (2 d)
Engine
blob
Format
Raw Data
Handle
22377388
Attached To
rCTRACKER ctracker3
threadedQueue.py
View Options
###########################################################################
# #
# Copyright 2017 Andrea Cimatoribus #
# EPFL ENAC IIE ECOL #
# GR A1 435 (Batiment GR) #
# Station 2 #
# CH-1015 Lausanne #
# Andrea.Cimatoribus@epfl.ch #
# #
# Alexandre Serex #
# alexandre.serex@epfl.ch #
# #
# This file is part of ctracker #
# #
# ctracker is free software: you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# ctracker is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with ctracker. If not, see <http://www.gnu.org/licenses/>. #
# #
###########################################################################
import
numpy
as
np
from
queue
import
Queue
from
threading
import
Thread
from
core.parameter
import
Parameter
from
abc
import
abstractmethod
class ThreadedQueue(Parameter):
    # A Parameter whose value is a queue.Queue, paired with a dedicated
    # daemon thread whose body is the subclass-provided process_output().

    def __init__(self, name, *args, **kwargs):
        # The queue itself becomes this Parameter's value.
        super().__init__(name, Queue())
        # Any extra positional/keyword arguments are forwarded verbatim
        # to the Thread constructor; the thread runs process_output().
        self.thread = Thread(*args, target=self.process_output, **kwargs)
        # Daemon thread: must not keep the interpreter alive at shutdown.
        self.thread.daemon = True

    @abstractmethod
    def process_output(self):
        # Worker-thread body; concrete subclasses implement the real work.
        pass

    def start(self):
        # Launch the worker thread.
        self.thread.start()
class INWorker(ThreadedQueue):
    """Producer thread that reads raw GCM velocity files and emits fluxes.

    For each consecutive pair of GCM iterations it loads the u and v
    velocity files (and w for 3D runs), converts them to volume fluxes
    using the grid metrics, and emits the pair of time levels on the
    'default' output channel.
    """

    def __init__(self, name, iters, grid, roots, is2D, precision, ff,
                 *args, **kwargs):
        """
        Parameters
        ----------
        name :
            Parameter name, forwarded to ThreadedQueue.
        iters :
            Array of GCM iteration numbers to process, in order
            (must expose ``.size`` and integer indexing).
        grid :
            Grid description; must expose ``imax``, ``jmax``, ``kmax``,
            ``grid_shape`` and the metric arrays ``dzu``, ``dzv``, ``dxdy``.
        roots :
            Sequence of path prefixes for the (u, v, w) velocity files,
            in that order.
        is2D :
            If true, vertical-velocity files are never read and wflux
            stays zero.
        precision :
            dtype of the raw velocity files on disk.
        ff :
            Scalar factor applied to every flux (presumably the time
            direction, +1/-1 — TODO confirm against caller).
        """
        super(INWorker, self).__init__(name, *args, **kwargs)
        self.iters = iters
        self.grid = grid
        # Path prefixes for the three velocity components.
        self.u_root = roots[0]
        self.v_root = roots[1]
        self.w_root = roots[2]
        self.is2D = is2D
        self.out_prec = precision
        self.ff = ff
        self.outputs = ['default']

    def _read_field(self, path, metric):
        """Read one raw velocity file and scale it into a flux array.

        The flat file is reshaped to ``grid.grid_shape``, flipped along
        the first axis, then multiplied by the grid metric and ``ff``.
        """
        raw = np.fromfile(path, dtype=self.out_prec)
        return raw.reshape(self.grid.grid_shape)[::-1, ...] * metric * self.ff

    def process_output(self):
        """Worker body: emit (index, uflux, vflux, wflux) per step pair."""
        for ngi in range(1, self.iters.size):
            it_old = self.iters[ngi - 1]
            it_new = self.iters[ngi]

            # Flux arrays: index 0 holds the old step, 1 the new step.
            # The staggered dimension of each component has one extra cell.
            uflux = np.zeros((2, self.grid.kmax, self.grid.jmax,
                              self.grid.imax + 1), "float64")
            vflux = np.zeros((2, self.grid.kmax, self.grid.jmax + 1,
                              self.grid.imax), "float64")
            wflux = np.zeros((2, self.grid.kmax + 1, self.grid.jmax,
                              self.grid.imax), "float64")

            # read old GCM step for interpolating
            # QUICK FIX IS MEH...
            # NOTE(review): this stamp format differs from the new-step
            # "%010d" format below — looks deliberate but confirm.
            fstamp = "000{0}.data".format(it_old)
            uflux[0, :, :, :-1] = self._read_field(self.u_root + fstamp,
                                                   self.grid.dzu)
            vflux[0, :, :-1, :] = self._read_field(self.v_root + fstamp,
                                                   self.grid.dzv)
            if not self.is2D:
                wflux[0, 1:, :, :] = self._read_field(self.w_root + fstamp,
                                                      self.grid.dxdy)

            # read new GCM step for interpolating
            fstamp = "%010d.data" % it_new
            uflux[1, :, :, :-1] = self._read_field(self.u_root + fstamp,
                                                   self.grid.dzu)
            vflux[1, :, :-1, :] = self._read_field(self.v_root + fstamp,
                                                   self.grid.dzv)
            if not self.is2D:
                wflux[1, 1:, :, :] = self._read_field(self.w_root + fstamp,
                                                      self.grid.dxdy)

            # Emit copies so the consumer owns its data while we reuse
            # the local buffers on the next iteration.
            self.output('default',
                        (ngi, uflux.copy(), vflux.copy(), wflux.copy()))
class OUTWorker(ThreadedQueue):
    # Consumer thread: receives particle states on its input queue and
    # forwards numpy-formatted records on the 'default' and 'sync'
    # output channels (presumably consumed by a netCDF writer — see the
    # original "write to netCDF4 file" note below; confirm downstream).

    def __init__(self, name, ijk_seed, xyz_seed, outfreq, *args, **kwargs):
        # name: Parameter name forwarded to ThreadedQueue.
        # ijk_seed / xyz_seed: seed particle positions, emitted once with
        #     the initial record.
        # outfreq: output-frequency mode; only "gcmstep" and "custom"
        #     are implemented in process_output.
        super(OUTWorker, self).__init__(name, *args, **kwargs)
        self.ijk_seed = ijk_seed
        self.xyz_seed = xyz_seed
        self.outfreq = outfreq
        # Register the input handler and declare the two output channels.
        self.inputs['default'] = [self.process_input]
        self.outputs = ['default', 'sync']

    def process_input(self, *args, **kwargs):
        # Input-side handler: enqueue the incoming payload for the
        # worker thread running process_output. (kwargs are ignored.)
        self.value.put(*args)

    def process_output(self, *args, **kwargs):
        # Worker-thread loop: drain the queue until a None sentinel.
        ncall = 0
        while True:
            if self.value.full():
                print("Warning: output queue is full "
                      "(slows down the execution).")
            ncall += 1
            in_value = self.value.get()
            # None is the shutdown sentinel.
            if in_value is None:
                break
            # Each payload: base time, particle objects, per-particle
            # times, per-particle record slots, per-particle exit codes,
            # and an init flag for the first record.
            t0, particles, times, rec_outs, out_c, init = in_value
            # interfacing class based output with numpy number based output
            ids = [p.id for p in particles]
            active_ids = np.array(ids)
            if init:
                # Initial record: emit the seed positions once.
                self.output('default', (t0, active_ids,
                                        self.ijk_seed, self.xyz_seed),
                            init=True)
                self.value.task_done()
            else:
                # interfacing class based output with numpy number based output
                out_code = np.array(out_c, dtype="short")
                # (n_particles, 1, 4): time plus grid-index position.
                # NOTE(review): axis 1 has size 1, so rec_outs[i] is
                # presumably always 0 here — confirm against producer.
                out_tijk = np.zeros((len(particles), 1, 4), dtype="f8")
                for i, p in enumerate(particles):
                    out_tijk[i, rec_outs[i], 0] = times[i]
                    out_tijk[i, rec_outs[i], 1] = p.x
                    out_tijk[i, rec_outs[i], 2] = p.y
                    out_tijk[i, rec_outs[i], 3] = p.z
                # (n_particles, 1, 3): physical-space position.
                out_xyz = np.zeros((len(particles), 1, 3), dtype="f8")
                for i, p in enumerate(particles):
                    out_xyz[i, rec_outs[i], 0] = p.chx
                    out_xyz[i, rec_outs[i], 1] = p.chy
                    out_xyz[i, rec_outs[i], 2] = p.chz
                # write to netCDF4 file
                # identify writing times
                # (usually, only 1 or a few different)
                # Negative times mark slots with nothing to write.
                towrite = out_tijk[:, :, 0] >= 0
                wtimes = np.unique(out_tijk[towrite, 0])
                for wtime in wtimes:
                    # if outfreq is 1, we know there is only one record
                    # to be written
                    if self.outfreq == "gcmstep" or self.outfreq == "custom":
                        # particles running (exit code 0)
                        positions = (out_tijk[:, 0, 0] == wtime) & \
                            (out_code == 0)
                        if positions.sum() > 0:
                            self.output('default',
                                        (t0 + wtime,
                                         active_ids[positions],
                                         out_tijk[positions, 0, 1:],
                                         out_xyz[positions, 0, :]),
                                        outc=out_code[positions])
                        # particles exiting (positive exit code) go to a
                        # separate "inout" file.
                        positions = (out_tijk[:, 0, 0] == wtime) & \
                            (out_code > 0)
                        if positions.sum() > 0:
                            self.output('default',
                                        (t0 + wtime,
                                         active_ids[positions],
                                         out_tijk[positions, 0, 1:],
                                         out_xyz[positions, 0, :]),
                                        file="inout",
                                        outc=out_code[positions])
                    else:
                        # TODO
                        raise ValueError("Unimplemented")
                self.value.task_done()
            # once in a while, make sure we sync to disk
            # this is important for reading the output
            # while a simulation is still running
            if (ncall % 100) == 0:
                self.output('sync')
                ncall = 0
Event Timeline
Log In to Comment