Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F66206108
base_zeo.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Jun 9, 00:21
Size
10 KB
Mime Type
text/x-python
Expires
Tue, Jun 11, 00:21 (2 d)
Engine
blob
Format
Raw Data
Handle
18186143
Attached To
R3127 blackdynamite
base_zeo.py
View Options
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from
.
import
bdparser
from
.
import
bdlogging
from
.
import
base
from
.
import
conffile_zeo
from
.
import
zeoobject
from
.
import
lowercase_btree
from
.constraints_zeo
import
ZEOconstraints
################################################################
import
re
import
os
import
subprocess
import
ZEO
import
ZODB
import
sys
from
BTrees.OOBTree
import
OOSet
from
.
import
job
from
.
import
run_zeo
import
transaction
################################################################
__all__
=
[
"BaseZEO"
]
print
=
bdlogging
.
invalidPrint
logger
=
bdlogging
.
getLogger
(
__name__
)
BTree
=
lowercase_btree
.
LowerCaseBTree
################################################################
def
_transaction
(
foo
):
def
_protected_transaction
(
*
args
,
**
kwargs
):
for
attempt
in
transaction
.
manager
.
attempts
():
with
attempt
:
foo
(
*
args
,
**
kwargs
)
return
_protected_transaction
class
BaseZEO
(
base
.
AbstractBase
):
"""
"""
singleton_base
=
None
def
__init__
(
self
,
truerun
=
False
,
creation
=
False
,
read_only
=
False
,
**
kwargs
):
BaseZEO
.
singleton_base
=
self
self
.
Job
=
job
.
JobZEO
self
.
Run
=
run_zeo
.
RunZEO
self
.
ConfFile
=
conffile_zeo
.
ConfFile
self
.
BDconstraints
=
ZEOconstraints
zeo_params
=
[
"host"
]
connection_params
=
bdparser
.
filterParams
(
zeo_params
,
kwargs
)
logger
.
info
(
'connection arguments: {0}'
.
format
(
connection_params
))
self
.
filename
=
connection_params
[
'host'
]
filename_split
=
self
.
filename
.
split
(
'://'
)
if
filename_split
[
0
]
!=
'zeo'
:
raise
RuntimeError
(
f
"wrong protocol with this database: {type(self)}"
)
self
.
filename
=
filename_split
[
1
]
# logger.error(self.filename)
dirname
=
os
.
path
.
dirname
(
self
.
filename
)
# logger.error(dirname)
socket_name
=
os
.
path
.
join
(
dirname
,
'zeo.socket'
)
if
not
os
.
path
.
exists
(
socket_name
):
self
.
process
=
subprocess
.
Popen
(
f
"runzeo -f {self.filename} -a {socket_name}"
,
shell
=
True
,
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
)
try
:
self
.
connection
=
ZEO
.
connection
(
socket_name
,
read_only
=
read_only
,
server_sync
=
True
)
self
.
root
=
self
.
connection
.
root
logger
.
debug
(
'connected to base'
)
except
Exception
as
e
:
logger
.
error
(
"Connection failed: check your connection settings:
\n
"
+
str
(
e
))
sys
.
exit
(
-
1
)
assert
(
isinstance
(
self
.
connection
,
ZODB
.
Connection
.
Connection
))
self
.
dbhost
=
(
kwargs
[
"host"
]
if
"host"
in
kwargs
.
keys
()
else
"localhost"
)
super
()
.
__init__
(
connection
=
self
.
connection
,
truerun
=
truerun
,
creation
=
creation
,
**
kwargs
)
def
getSchemaList
(
self
,
filter_names
=
True
):
try
:
schemas
=
self
.
root
.
schemas
except
AttributeError
:
self
.
root
.
schemas
=
BTree
(
key_string
=
'study_'
)
schemas
=
self
.
root
.
schemas
filtered_schemas
=
[]
if
filter_names
is
True
:
for
s
in
schemas
:
m
=
re
.
match
(
'{0}_(.+)'
.
format
(
self
.
user
),
s
)
if
m
:
s
=
m
.
group
(
1
)
filtered_schemas
.
append
(
s
)
else
:
filtered_schemas
=
schemas
return
filtered_schemas
def
getStudySize
(
self
,
study
):
curs
=
self
.
connection
.
cursor
()
try
:
logger
.
info
(
study
)
curs
.
execute
(
"""
select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname)
|| '.' || quote_ident(tablename)))::BIGINT
FROM pg_tables WHERE schemaname = '{0}') as sz
"""
.
format
(
study
))
size
=
curs
.
fetchone
()[
0
]
curs
.
execute
(
"""
select pg_size_pretty(cast({0} as bigint))
"""
.
format
(
size
))
size
=
curs
.
fetchone
()[
0
]
curs
.
execute
(
"""
select count({0}.runs.id) from {0}.runs
"""
.
format
(
study
))
nruns
=
curs
.
fetchone
()[
0
]
curs
.
execute
(
"""
select count({0}.jobs.id) from {0}.jobs
"""
.
format
(
study
))
njobs
=
curs
.
fetchone
()[
0
]
except
psycopg2
.
ProgrammingError
:
self
.
connection
.
rollback
()
size
=
'????'
return
{
'size'
:
size
,
'nruns'
:
nruns
,
'njobs'
:
njobs
}
def
createSchema
(
self
,
params
=
{
"yes"
:
False
}):
# create the schema of the simulation
if
not
hasattr
(
self
.
root
,
'schemas'
):
self
.
root
.
schemas
=
BTree
(
key_string
=
'study_'
)
if
self
.
schema
in
self
.
root
.
schemas
:
validated
=
bdparser
.
validate_question
(
"Are you sure you want to drop the schema named '"
+
self
.
schema
+
"'"
,
params
,
False
)
if
validated
is
True
:
# logger.error(self.root.schemas[self.schema])
# logger.error(type(self.root.schemas[self.schema]))
del
self
.
root
.
schemas
[
self
.
schema
]
else
:
logger
.
debug
(
"creation canceled: exit program"
)
sys
.
exit
(
-
1
)
self
.
root
.
schemas
[
self
.
schema
]
=
BTree
()
self
.
root
.
schemas
[
self
.
schema
][
'Quantities'
]
=
OOSet
()
self
.
root
.
schemas
[
self
.
schema
][
'Jobs'
]
=
BTree
(
key_string
=
'job_'
)
self
.
root
.
schemas
[
self
.
schema
][
'Runs'
]
=
BTree
(
key_string
=
'run_'
)
self
.
root
.
schemas
[
self
.
schema
][
'ConfFiles'
]
=
BTree
(
key_string
=
'file_'
)
self
.
root
.
schemas
[
self
.
schema
][
'Jobs_counter'
]
=
0
self
.
root
.
schemas
[
self
.
schema
][
'Runs_counter'
]
=
0
def
prepare
(
self
,
obj
,
descriptor
):
if
not
hasattr
(
self
.
root
,
'schemas'
):
return
if
descriptor
in
self
.
root
.
schemas
[
self
.
schema
]:
desc
=
self
.
root
.
schemas
[
self
.
schema
][
descriptor
]
for
t
in
desc
.
types
.
keys
():
obj
.
types
[
t
]
=
desc
.
types
[
t
]
if
t
not
in
obj
:
obj
.
t
=
None
def
createBase
(
self
,
job_desc
,
run_desc
,
quantities
=
{},
**
kwargs
):
self
.
createSchema
(
kwargs
)
self
.
root
.
schemas
[
self
.
schema
][
'job_desc'
]
=
job_desc
self
.
root
.
schemas
[
self
.
schema
][
'run_desc'
]
=
run_desc
for
qname
,
type
in
quantities
.
items
():
self
.
pushQuantity
(
qname
,
type
)
if
self
.
truerun
:
self
.
commit
()
def
_get_jobs
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'Jobs'
]
@property
def
quantities
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'Quantities'
]
@quantities.setter
def
quantities
(
self
,
value
):
self
.
root
.
schemas
[
self
.
schema
][
'Quantities'
]
=
value
@property
def
jobs_counter
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'Jobs_counter'
]
@jobs_counter.setter
def
jobs_counter
(
self
,
val
):
self
.
root
.
schemas
[
self
.
schema
][
'Jobs_counter'
]
=
val
def
_get_runs
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'Runs'
]
@property
def
runs_counter
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'Runs_counter'
]
@runs_counter.setter
def
runs_counter
(
self
,
val
):
self
.
root
.
schemas
[
self
.
schema
][
'Runs_counter'
]
=
val
def
_get_conffiles
(
self
):
return
self
.
root
.
schemas
[
self
.
schema
][
'ConfFiles'
]
def
select
(
self
,
_types
,
constraints
=
None
,
sort_by
=
None
):
if
not
isinstance
(
_types
,
list
):
_types
=
[
_types
]
_type
=
_types
[
0
]
if
isinstance
(
_type
,
zeoobject
.
ZEOObject
):
_type
=
type
(
_type
)
if
_type
==
self
.
Job
:
obj_container
=
self
.
_get_jobs
()
elif
_type
==
self
.
Run
:
obj_container
=
self
.
_get_runs
()
elif
_type
==
self
.
ConfFile
:
obj_container
=
self
.
_get_conffiles
()
else
:
raise
RuntimeError
(
f
'{type(_types)}'
)
if
(
sort_by
is
not
None
)
and
(
not
isinstance
(
sort_by
,
str
)):
raise
RuntimeError
(
'sort_by argument is not correct: {0}'
.
format
(
sort_by
))
# logger.error(_type)
# logger.error(type(constraints))
# logger.error(constraints)
const
=
ZEOconstraints
(
self
,
constraints
)
condition
=
const
.
getMatchingCondition
()
obj_list
=
[]
for
key
,
obj
in
obj_container
.
items
():
obj
.
base
=
self
objs
=
[
obj
]
# logger.error(type(obj))
if
_type
==
self
.
Run
:
j
=
self
.
_get_jobs
()[
obj
.
job_id
]
j
.
base
=
self
objs
.
append
(
j
)
if
condition
(
objs
):
# logger.error(key)
# logger.error(obj)
# logger.error(_type)
if
len
(
objs
)
==
1
:
obj_list
.
append
(
objs
[
0
])
else
:
obj_list
.
append
(
objs
)
return
obj_list
@_transaction
def
insert
(
self
,
zeoobject
):
if
isinstance
(
zeoobject
,
self
.
Job
):
objs
=
self
.
_get_jobs
()
zeoobject
.
id
=
self
.
jobs_counter
self
.
jobs_counter
+=
1
elif
isinstance
(
zeoobject
,
self
.
Run
):
objs
=
self
.
_get_runs
()
zeoobject
=
zeoobject
.
copy
()
zeoobject
[
"id"
]
=
self
.
runs_counter
zeoobject
[
"state"
]
=
'CREATED'
self
.
runs_counter
+=
1
elif
isinstance
(
zeoobject
,
self
.
ConfFile
):
objs
=
self
.
_get_conffiles
()
# logger.error(f'inserting {zeoobject.id} {zeoobject["id"]}')
# logger.error(zeoobject.id)
# logger.error(zeoobject.entries['id'])
# logger.error(zeoobject)
objs
[
zeoobject
.
id
]
=
zeoobject
.
copy
()
# logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}')
def
setObjectItemTypes
(
self
,
zeoobject
):
if
isinstance
(
zeoobject
,
self
.
Job
):
zeoobject
.
types
=
self
.
root
.
schemas
[
self
.
schema
][
'job_desc'
]
.
types
elif
isinstance
(
zeoobject
,
self
.
Run
):
zeoobject
.
types
=
self
.
root
.
schemas
[
self
.
schema
][
'run_desc'
]
.
types
else
:
raise
RuntimeError
(
f
'{type(zeoobject)}'
)
def
commit
(
self
):
import
transaction
transaction
.
commit
()
################################################################
Event Timeline
Log In to Comment