Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F83200583
run_zeo.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Sep 15, 17:55
Size
13 KB
Mime Type
text/x-python
Expires
Tue, Sep 17, 17:55 (1 d, 21 h)
Engine
blob
Format
Raw Data
Handle
20804129
Attached To
R3127 blackdynamite
run_zeo.py
View Options
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
import
BTrees
from
.
import
conffile_zeo
from
.
import
zeoobject
from
.
import
bdparser
from
.
import
base
from
.base_zeo
import
BaseZEO
from
.
import
runselector
from
.
import
bdlogging
from
.
import
lowercase_btree
################################################################
import
numpy
as
np
import
datetime
import
subprocess
import
socket
import
os
################################################################
# Public API of this module.
__all__ = ['RunZEO', 'getRunFromScript']

# Deliberately shadow the builtin ``print`` so that stray print calls in this
# module fail loudly — presumably ``invalidPrint`` raises to force use of the
# logger (TODO confirm against bdlogging).
print = bdlogging.invalidPrint
# Module-level logger, following the standard getLogger(__name__) convention.
logger = bdlogging.getLogger(__name__)
# PBTree = lowercase_btree.PersistentLowerCaseBTree
# Persistent object-to-object BTree used to store per-run quantities.
BTree = BTrees.OOBTree.BTree
################################################################
class RunZEO(zeoobject.ZEOObject):
    """A single run stored in the ZEO/ZODB backend.

    A run is attached to a job (``job_id``), carries execution metadata
    (machine, path, state, timing) and a set of configuration files, and
    accumulates measured *quantities* in a persistent BTree mapping
    ``name -> list[(step, value)]``.
    """

    # Name of the logical table this object belongs to.
    table_name = 'runs'

    def getJob(self):
        """Return the Job object this run is attached to."""
        return BaseZEO.singleton_base.getJobFromID(self.entries["job_id"])

    def start(self):
        """Mark the run as started and commit the change to the base."""
        # logger.error(self.entries['state'])
        self.entries['state'] = 'START'
        # logger.error(self['state'])
        logger.debug('starting run')
        BaseZEO.singleton_base.commit()
        logger.debug('committed')

    def finish(self):
        """Mark the run as finished and commit the change to the base."""
        self.entries['state'] = 'FINISHED'
        logger.debug('finish run')
        BaseZEO.singleton_base.commit()
        logger.debug('committed')

    def attachToJob(self, job):
        """Attach this run to *job* and insert it into the base."""
        # logger.error(f"attach job {job.id}")
        self["job_id"] = job.id
        BaseZEO.singleton_base.insert(self)

    def getExecFile(self):
        """Return the exec config file with BlackDynamite variables expanded."""
        return self.getUpdatedConfigFile(self.entries["exec"])

    def setExecFile(self, file_name, **kwargs):
        """Register *file_name* as the run's executable script.

        Reuses an already-registered config file when the filename matches;
        otherwise adds the file to the base. Returns the file id.
        """
        # check if the file is already in the config files
        for f in self.configfiles:
            if f.entries["filename"] == file_name:
                self.execfile = f
                self.entries["exec"] = f.id
                return f.id

        # the file is not in the current config files
        # so it has to be added
        conf = conffile_zeo.addFile(
            file_name, BaseZEO.singleton_base, **kwargs)
        self.configfiles.append(conf)
        self.execfile = conf
        self.entries["exec"] = conf.id
        return conf.id

    def listFiles(self, subdir=""):
        """List files in run directory / specified sub-directory"""
        command = 'ls {0}'.format(
            os.path.join(self['run_path'], subdir))
        # Run through ssh when the run lives on another machine.
        # NOTE(review): command built by string formatting and executed with
        # shell=True — run_path/machine_name are trusted base content here,
        # but this is not injection-safe for arbitrary values.
        if not self['machine_name'] == socket.gethostname():
            command = 'ssh {0} "{1}"'.format(self['machine_name'], command)
        logger.info(command)
        p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        out = p.stdout.readlines()
        out = [o.strip().decode() for o in out]
        return out

    def getFile(self, filename, outpath='/tmp'):
        """Return a local path to *filename* belonging to this run.

        If the file exists locally its full path is returned directly;
        otherwise it is fetched via scp into a per-study/per-run cache
        directory under *outpath* (reused if already downloaded).
        """
        dest_path = os.path.join(
            outpath,
            "BD-" + self.base.schema + "-cache",
            "run-{0}".format(self.id))
        dest_file = os.path.join(dest_path, filename)
        full_filename = self.getFullFileName(filename)
        # Check if file is local
        if os.path.isfile(full_filename):
            return full_filename
        # If file is distant, prepare cache directory hierarchy
        dest_path = os.path.dirname(dest_file)
        logger.debug('Directories: ' + dest_path)
        logger.debug('File: ' + dest_file)
        # Making directories (best effort: a failure here will surface as an
        # scp error below, so we only log it)
        try:
            os.makedirs(dest_path, exist_ok=True)
        except Exception as e:
            logger.error(e)
        if os.path.isfile(dest_file):
            logger.info('File {} already cached'.format(dest_file))
            return dest_file
        cmd = 'scp {0}:{1} {2}'.format(
            self['machine_name'],
            self.getFullFileName(filename),
            dest_file)
        logger.info(cmd)
        p = subprocess.Popen(cmd, shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        errors = bytes(p.stderr.read()).decode().strip()
        if errors:
            logger.warning(errors)
        return dest_file

    def getFullFileName(self, filename):
        """Return *filename* joined to the run's ``run_path``."""
        return os.path.join(self['run_path'], filename)

    def addConfigFiles(self, file_list, regex_params=None):
        """Add configuration files to this run.

        *file_list* may be a single filename or a list. Parameter names of
        the run and of the Job schema are passed to the file parser so that
        BlackDynamite variables can be detected. Returns the updated list
        of config files.
        """
        # Accept a bare filename as well as a list of filenames.
        if not isinstance(file_list, list):
            file_list = [file_list]
        params_list = list(self.types.keys())
        myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base)
        params_list += list(myjob.types.keys())
        # logger.debug (regex_params)
        file_ids = [f.id for f in self.configfiles]
        files_to_add = [
            conffile_zeo.addFile(fname, BaseZEO.singleton_base,
                                 regex_params=regex_params,
                                 params=params_list)
            for fname in file_list]
        for f in files_to_add:
            if f.id not in file_ids:
                self.configfiles.append(f)
        BaseZEO.singleton_base.commit()
        return self.configfiles

    def getConfigFiles(self):
        """Return the list of configuration files attached to this run."""
        return self.configfiles

    def getConfigFile(self, file_id):
        """Return the config file with id *file_id*, or None if absent."""
        for f in self.configfiles:
            if f.id == file_id:
                return f

    def replaceBlackDynamiteVariables(self, text):
        """Expand ``__BLACKDYNAMITE__<name>__`` placeholders in *text*.

        Job parameters, run parameters and the special ``dbhost``/``study``/
        ``run_id`` values are substituted. Raises if a placeholder refers to
        a parameter whose value is unset (None).
        """
        myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base)
        myjob["id"] = self.entries["job_id"]
        myjob = myjob.getMatchedObjectList()[0]
        for key, val in myjob.entries.items():
            tmp = text.replace(
                "__BLACKDYNAMITE__" + key + "__", str(val))
            # A change in the text means the placeholder was present: refuse
            # to substitute an unset (None) parameter.
            if (not tmp == text) and val is None:
                raise Exception("unset job parameter " + key)
            text = tmp
        for key, val in self.entries.items():
            tmp = text.replace(
                "__BLACKDYNAMITE__" + key + "__", str(val))
            if (not tmp == text) and val is None:
                logger.debug(self.entries)
                raise Exception("unset run parameter " + key)
            text = tmp
        text = text.replace("__BLACKDYNAMITE__dbhost__",
                            BaseZEO.singleton_base.dbhost)
        text = text.replace("__BLACKDYNAMITE__study__",
                            BaseZEO.singleton_base.schema)
        text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id))
        return text

    def getUpdatedConfigFile(self, file_id):
        """Return config file *file_id* with its content variable-expanded."""
        conf = self.getConfigFile(file_id)
        conf["file"] = self.replaceBlackDynamiteVariables(conf["file"])
        return conf

    def listQuantities(self):
        """Return the set of quantity names known to the base."""
        return BaseZEO.singleton_base.quantities

    def getScalarQuantities(self, names, additional_request=None):
        """Return ``[(name, np.array of (step, value))]`` for known *names*.

        Names not recorded for this run are silently skipped.
        ``additional_request`` is accepted for interface compatibility but
        not used by the ZEO backend.
        """
        self.base.commit()
        # logger.error([q for q in self.quantities])
        # logger.error(names)
        return [(q, np.array(self.quantities[q]))
                for q in names if q in self.quantities]

    def getLastStep(self):
        """Return ``(last_step, last_step_time)`` or ``(None, None)``."""
        if 'last_step' in self.entries:
            return self.last_step, self.last_step_time
        else:
            return None, None

    def getQuantityID(self, name, is_integer=None, is_vector=None):
        """Look up a quantity by name in the SQL backend.

        Returns ``(id, is_integer, is_vector)``. When *is_integer* or
        *is_vector* is given, the stored flag must match or an exception is
        raised. NOTE(review): this issues an SQL request — presumably a
        legacy (non-ZEO) code path; confirm it is still reachable.
        """
        request = """
SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s)
""".format(self.base.schema)
        curs = self.base.performRequest(request, [name])
        item = curs.fetchone()
        if item is None:
            raise Exception("unknown quantity \"" + name + "\"")
        if (is_integer is not None) and (not is_integer == item[1]):
            raise Exception("quantity \"" + name
                            + "\" has is_integer = " + str(item[1]))
        if (is_vector is not None) and (not is_vector == item[2]):
            raise Exception("quantity \"" + name
                            + "\" has is_vector = " + str(item[2]))
        return item[0], item[1], item[2]

    def getScalarQuantity(self, name, additional_request=None):
        """Return the values of quantity *name* as a numpy array.

        ``additional_request`` is not supported by the ZEO backend and
        raises. (Bug fix: the check used to sit *after* the return
        statement and was therefore unreachable.)
        """
        if additional_request:
            raise RuntimeError(
                f'need code review {additional_request}')
        res = np.array(self.quantities[name])
        # logger.error(res)
        return res

    def getVectorQuantity(self, name, step):
        """Return the vector measurement of *name* at *step*, or None."""
        quantity_id, is_integer, is_vector = self.getQuantityID(name)
        if is_vector is False:
            raise Exception("Quantity " + name + " is not vectorial")
        request = """
SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4})
""".format(self.base.schema,
           "vector_real" if (is_integer is False) else "vector_integer",
           self.id, quantity_id, step)
        curs = self.base.performRequest(request, [name])
        fetch = curs.fetchone()
        if fetch:
            return np.array(fetch[0])
        return None

    def pushVectorQuantity(self, vec, step, name,
                           is_integer, description=None):
        """Record a vector measurement (delegates to pushScalarQuantity).

        Bug fix: *description* is now forwarded instead of being hardcoded
        to None.
        """
        self.pushScalarQuantity(vec, step, name, is_integer,
                                description=description)

    @zeoobject._transaction
    def pushScalarQuantity(self, val, step, name,
                           is_integer, description=None):
        """Record measurement *val* of quantity *name* at *step*.

        Registers the quantity name in the base, appends ``(step, val)`` to
        this run's series, and commits. *is_integer* and *description* are
        accepted for interface compatibility but unused by the ZEO backend.
        """
        # logger.error(f'pushing {name} {step} {val}')
        quantities = BaseZEO.singleton_base.quantities
        quantities.add(name)
        if name not in self.quantities:
            logger.info(f'create quantity {name}')
            self.quantities[name] = []
        self.quantities[name].append((step, val))
        # The series is a plain list inside a persistent BTree: mutating it
        # in place is invisible to ZODB, so mark both objects dirty
        # explicitly before committing.
        self._p_changed = True
        self.quantities._p_changed = True
        BaseZEO.singleton_base.commit()

    def getAllVectorQuantity(self, name):
        """Return ``(steps, measurements)`` arrays for vector quantity *name*.

        Returns ``[None, None]`` when nothing is recorded.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(
            name, is_vector=True)
        request = """
SELECT step,measurement from {0}.{1}
WHERE (run_id,quantity_id) = ({2},{3}) order by step
""".format(self.base.schema,
           "vector_real" if is_integer is False else "vector_integer",
           self.id, quantity_id)
        curs = self.base.performRequest(request, [name])
        fetch = curs.fetchall()
        if not fetch:
            return [None, None]
        matres = np.array([val[1] for val in fetch])
        stepres = np.array([val[0] for val in fetch])
        return (stepres, matres)

    def deleteData(self):
        """Delete every measurement attached to this run.

        Clears the four measurement tables (scalar/vector x real/integer),
        in the same order as before the refactoring into a loop.
        """
        for table in ('scalar_real', 'scalar_integer',
                      'vector_real', 'vector_integer'):
            request, params = (
                "DELETE FROM {0}.{1} WHERE run_id={2}".format(
                    self.base.schema, table, self.id), [])
            self.base.performRequest(request, params)

    def __init__(self, base):
        """Declare the run schema (types, nullability) and default entries."""
        super().__init__(base)
        self.configfiles = []
        # Persistent mapping name -> list[(step, value)].
        self.quantities = BTree()
        # logger.error(self.quantities)
        base.prepare(self, 'run_desc')
        self['id'] = None
        self.types['id'] = int
        self.types["machine_name"] = str
        self.types["run_path"] = str
        self.allowNull["run_path"] = True
        self.types["job_id"] = int
        self.types["nproc"] = int
        self.types["run_name"] = str
        self.types["wait_id"] = int
        self.allowNull["wait_id"] = True
        self.types["start_time"] = datetime.datetime
        self.allowNull["start_time"] = True
        self.types["state"] = str
        self.allowNull["state"] = True
        self.execfile = None
        self.types["exec"] = str
        self.types["last_step"] = int
        self.types["last_step_time"] = datetime.datetime
        self["last_step"] = None
        self["last_step_time"] = None
        self["start_time"] = None
        self["wait_id"] = None
################################################################
def getRunFromScript():
    """Fetch the run designated by the ``run_id`` BlackDynamite parameter.

    Builds a BDParser accepting a ``run_id`` parameter, parses the
    BlackDynamite parameters (``argv=[]``: values come from the environment
    rather than the command line — TODO confirm against BDParser), connects
    to the base and selects the matching run.

    Returns:
        tuple: ``(myrun, myjob)`` for the unique matching run.

    Raises:
        Exception: if the selection matches no run, or more than one
            (the two cases previously raised the same opaque
            'internal error' message).
    """
    parser = bdparser.BDParser()
    parser.register_params(params={"run_id": int})
    params = parser.parseBDParameters(argv=[])
    mybase = base.Base(**params)
    runSelector = runselector.RunSelector(mybase)
    run_list = runSelector.selectRuns(params)
    if len(run_list) > 1:
        raise Exception(
            'internal error: run_id selection matched more than one run')
    if len(run_list) == 0:
        raise Exception(
            'internal error: run_id selection matched no run')
    myrun, myjob = run_list[0]
    # myrun.setEntries(params)
    return myrun, myjob
Event Timeline
Log In to Comment