Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91898010
base.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, Nov 15, 13:53
Size
10 KB
Mime Type
text/x-python
Expires
Sun, Nov 17, 13:53 (2 d)
Engine
blob
Format
Raw Data
Handle
22335664
Attached To
R3127 blackdynamite
base.py
View Options
#!/usr/bin/env python
from
__future__
import
print_function
__all__
=
[
"Base"
]
import
job
import
os
import
psycopg2
import
re
import
copy
import
numpy
as
np
import
psycopg2
import
bdparser
import
sys
import
getpass
import
datetime
import
run
################################################################
import
bdlogging
,
logging
print
=
bdlogging
.
invalidPrint
logger
=
logging
.
getLogger
(
__name__
)
################################################################
class
Base
(
object
):
"""
"""
def
createBase
(
self
,
job_desc
,
run_desc
,
quantities
=
{},
**
kwargs
):
#logger.debug (quantities)
self
.
createSchema
(
kwargs
)
self
.
createTable
(
job_desc
)
self
.
createTable
(
run_desc
)
self
.
createGenericTables
()
for
qname
,
type
in
quantities
.
iteritems
():
self
.
pushQuantity
(
qname
,
type
)
if
(
self
.
truerun
):
self
.
commit
()
def
getObject
(
self
,
sqlobject
):
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"SELECT * FROM {0}.{1} WHERE id = {2}"
.
format
(
self
.
schema
,
sqlobject
.
table_name
,
sqlobject
.
id
))
col_info
=
self
.
getColumnProperties
(
sqlobject
)
line
=
curs
.
fetchone
()
for
i
in
range
(
0
,
len
(
col_info
)):
col_name
=
col_info
[
i
][
0
]
sqlobject
[
col_name
]
=
line
[
i
]
def
createSchema
(
self
,
params
=
{
"yes"
:
False
}):
# create the schema of the simulation
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{0}'"
.
format
(
self
.
schema
)
.
lower
())
if
(
curs
.
rowcount
):
validated
=
bdparser
.
validate_question
(
"Are you sure you want to drop the schema named '"
+
self
.
schema
+
"'"
,
params
,
False
)
if
(
validated
==
True
):
curs
.
execute
(
"DROP SCHEMA {0} cascade"
.
format
(
self
.
schema
))
else
:
logger
.
debug
(
"creation canceled: exit program"
)
sys
.
exit
(
-
1
)
curs
.
execute
(
"CREATE SCHEMA {0}"
.
format
(
self
.
schema
))
def
createTypeCodes
(
self
):
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"SELECT typname,oid from pg_type;"
)
self
.
type_code
=
{}
for
i
in
curs
:
# logger.debug (i[0])
if
(
i
[
0
]
==
'float8'
):
self
.
type_code
[
i
[
1
]]
=
float
if
(
i
[
0
]
==
'text'
):
self
.
type_code
[
i
[
1
]]
=
str
if
(
i
[
0
]
==
'int8'
):
self
.
type_code
[
i
[
1
]]
=
int
if
(
i
[
0
]
==
'int4'
):
self
.
type_code
[
i
[
1
]]
=
int
if
(
i
[
0
]
==
'bool'
):
self
.
type_code
[
i
[
1
]]
=
bool
if
(
i
[
0
]
==
'timestamp'
):
self
.
type_code
[
i
[
1
]]
=
datetime
.
datetime
def
createTable
(
self
,
object
):
request
=
object
.
createTableRequest
()
curs
=
self
.
connection
.
cursor
()
# logger.debug (request)
curs
.
execute
(
request
)
def
createGenericTables
(
self
,):
sql_script_name
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"build_tables.sql"
)
curs
=
self
.
connection
.
cursor
()
# create generic tables
query_list
=
list
()
with
open
(
sql_script_name
,
"r"
)
as
fh
:
for
line
in
fh
:
query_list
.
append
(
re
.
sub
(
"SCHEMAS_IDENTIFIER"
,
self
.
schema
,
line
))
curs
.
execute
(
"
\n
"
.
join
(
query_list
))
def
getColumnProperties
(
self
,
sqlobject
):
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"SELECT * FROM {0}.{1} LIMIT 0"
.
format
(
self
.
schema
,
sqlobject
.
table_name
))
column_names
=
[
desc
[
0
]
for
desc
in
curs
.
description
]
column_type
=
[
desc
[
1
]
for
desc
in
curs
.
description
]
return
zip
(
column_names
,
column_type
)
def
setObjectItemTypes
(
self
,
sqlobject
):
col_info
=
self
.
getColumnProperties
(
sqlobject
)
for
i
,
j
in
col_info
:
sqlobject
.
types
[
i
]
=
self
.
type_code
[
j
]
# logger.debug (str(i) + " " + str(self.type_code[j]))
def
insert
(
self
,
sqlobject
):
sqlobject
.
prepare
()
curs
=
self
.
performRequest
(
*
(
sqlobject
.
insert
()))
sqlobject
.
id
=
curs
.
fetchone
()[
0
]
def
performRequest
(
self
,
request
,
params
=
[]):
curs
=
self
.
connection
.
cursor
()
# logger.debug (request)
# logger.debug (params)
try
:
curs
.
execute
(
request
,
params
)
except
psycopg2
.
ProgrammingError
as
err
:
raise
psycopg2
.
ProgrammingError
(
(
"While trying to execute the query '{0}' with parameters "
+
"'{1}', I caught this: '{2}'"
)
.
format
(
request
,
params
,
err
))
return
curs
def
createParameterSpace
(
self
,
myjob
,
entry_nb
=
0
,
tmp_job
=
None
,
nb_inserted
=
0
):
keys
=
myjob
.
entries
.
keys
()
nparam
=
len
(
keys
)
if
(
entry_nb
==
nparam
):
if
(
not
tmp_job
):
logger
.
debug
(
"internal error"
)
sys
.
exit
(
-
1
)
if
(
len
(
tmp_job
.
getMatchedObjectList
())
>
0
):
return
nb_inserted
nb_inserted
+=
1
logger
.
info
(
"insert job #{0}"
.
format
(
nb_inserted
)
+
': '
+
str
(
tmp_job
.
entries
))
self
.
insert
(
tmp_job
)
return
nb_inserted
if
(
not
tmp_job
):
tmp_job
=
job
.
Job
(
self
)
key
=
keys
[
entry_nb
]
e
=
myjob
[
key
]
if
(
type
(
e
)
==
list
):
for
typ
in
e
:
tmp_job
[
key
.
lower
()]
=
typ
nb_inserted
=
self
.
createParameterSpace
(
myjob
,
entry_nb
+
1
,
tmp_job
,
nb_inserted
)
else
:
tmp_job
[
key
.
lower
()]
=
e
nb_inserted
=
self
.
createParameterSpace
(
myjob
,
entry_nb
+
1
,
tmp_job
,
nb_inserted
)
if
(
self
.
truerun
):
self
.
commit
()
return
nb_inserted
def
pushQuantity
(
self
,
name
,
type_code
,
description
=
None
):
""" implemented type_codes: "int" "float" "int.vector" "float.vector"
"""
if
((
type_code
==
"int"
)
or
(
type_code
==
int
)):
is_integer
=
True
is_vector
=
False
elif
(
type_code
==
"int.vector"
):
is_integer
=
True
is_vector
=
True
elif
((
type_code
==
"float"
)
or
(
type_code
==
float
)):
is_integer
=
False
is_vector
=
False
elif
(
type_code
==
"float.vector"
):
is_integer
=
False
is_vector
=
True
else
:
raise
Exception
(
"invalid type '{0}' for a quantity"
.
format
(
type_code
))
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"INSERT INTO {0}.quantities (name,is_integer,is_vector,description) VALUES (
%s
,
%s
,
%s
,
%s
) RETURNING id"
.
format
(
self
.
schema
),(
name
,
is_integer
,
is_vector
,
description
)
)
item
=
curs
.
fetchone
()
if
(
item
is
None
):
raise
Exception
(
"Counld not create quantity
\"
"
+
name
+
"
\"
"
)
return
item
[
0
]
def
commit
(
self
):
logger
.
debug
(
"commiting changes to base"
)
self
.
connection
.
commit
()
def
getSchemaList
(
self
):
curs
=
self
.
connection
.
cursor
()
curs
.
execute
(
"SELECT distinct(table_schema) from information_schema.tables where table_name='runs'"
)
schemas
=
[
desc
[
0
]
for
desc
in
curs
]
return
schemas
def
checkStudy
(
self
,
dico
):
if
not
"study"
in
dico
:
logger
.
debug
(
"*"
*
30
)
logger
.
debug
(
"Parameter 'study' must be provided at command line"
)
logger
.
debug
(
"possibilities are:"
)
schemas
=
self
.
getSchemaList
()
for
s
in
schemas
:
logger
.
debug
(
"
\t
"
+
s
)
logger
.
debug
(
""
)
logger
.
debug
(
"FATAL => ABORT"
)
logger
.
debug
(
"*"
*
30
)
sys
.
exit
(
-
1
)
def
close
(
self
):
if
'connection'
in
self
.
__dict__
:
logger
.
debug
(
'closing database session'
)
self
.
connection
.
close
()
del
(
self
.
__dict__
[
'connection'
])
def
__del__
(
self
):
self
.
close
()
def
__init__
(
self
,
truerun
=
False
,
**
kwargs
):
psycopg2_params
=
[
"host"
,
"user"
,
"port"
,
"password"
]
connection_params
=
bdparser
.
filterParams
(
psycopg2_params
,
kwargs
)
if
"password"
in
connection_params
and
connection_params
[
"password"
]
==
'ask'
:
connection_params
[
"password"
]
=
getpass
.
getpass
()
logger
.
debug
(
'connection arguments: {0}'
.
format
(
connection_params
))
try
:
self
.
connection
=
psycopg2
.
connect
(
**
connection_params
)
logger
.
debug
(
'connected to base'
)
except
Exception
as
e
:
logger
.
error
(
"Connection failed: check your connection settings:
\n
"
+
str
(
e
))
sys
.
exit
(
-
1
)
assert
(
isinstance
(
self
.
connection
,
psycopg2
.
_psycopg
.
connection
))
self
.
dbhost
=
kwargs
[
"host"
]
if
"host"
in
kwargs
.
keys
()
else
"localhost"
if
(
"should_not_check_study"
not
in
kwargs
):
self
.
checkStudy
(
kwargs
)
self
.
schema
=
kwargs
[
"study"
]
self
.
createTypeCodes
()
self
.
truerun
=
truerun
if
(
"list_parameters"
in
kwargs
and
kwargs
[
"list_parameters"
]
==
True
):
myjob
=
job
.
Job
(
self
)
myjob
.
prepare
()
logger
.
debug
(
"****************************************************************"
)
logger
.
debug
(
"Job parameters:"
)
logger
.
debug
(
"****************************************************************"
)
params
=
[
str
(
j
[
0
])
+
": "
+
str
(
j
[
1
])
for
j
in
myjob
.
types
.
iteritems
()
]
logger
.
debug
(
"
\n
"
.
join
(
params
))
myrun
=
run
.
Run
(
self
)
myrun
.
prepare
()
logger
.
debug
(
"****************************************************************"
)
logger
.
debug
(
"Run parameters:"
)
logger
.
debug
(
"****************************************************************"
)
params
=
[
str
(
j
[
0
])
+
": "
+
str
(
j
[
1
])
for
j
in
myrun
.
types
.
iteritems
()
]
logger
.
debug
(
"
\n
"
.
join
(
params
))
sys
.
exit
(
0
)
################################################################
if
__name__
==
"__main__"
:
connection
=
psycopg2
.
connect
(
host
=
"localhost"
)
job_description
=
job
.
Job
(
dict
(
hono
=
int
,
lulu
=
float
,
toto
=
str
))
base
=
Base
(
"honoluluSchema"
,
connection
,
job_description
)
base
.
create
()
connection
.
commit
()
base
.
pushJob
(
dict
(
hono
=
12
,
lulu
=
24.2
,
toto
=
"toto"
))
base
.
pushQuantity
(
"ekin"
,
"float"
)
connection
.
commit
()
Event Timeline
Log In to Comment