Page MenuHomec4science

base.py
No OneTemporary

File Metadata

Created
Sat, May 11, 09:08
#!/usr/bin/env python
from __future__ import print_function
__all__ = [ "Base" ]
import job
import os
import psycopg2
import re
import copy
import numpy as np
import psycopg2
import bdparser
import sys
import getpass
import datetime
import run
################################################################
# Logging setup for this module.
import bdlogging,logging
# Rebind this module's ``print`` to bdlogging.invalidPrint.
# NOTE(review): presumably this makes stray print() calls fail so that all
# output goes through ``logger`` -- confirm against bdlogging.
print = bdlogging.invalidPrint
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
################################################################
class Base(object):
    """Wrapper around a psycopg2 connection bound to one study schema.

    Provides helpers to create the schema and tables of a study, to insert
    jobs and quantities, and to fetch job/run objects by id.

    Attributes set by ``__init__``:
        connection: the open psycopg2 connection.
        dbhost: the ``host`` keyword argument, or ``"localhost"``.
        schema: the ``study`` keyword argument (when provided).
        type_code: mapping from PostgreSQL type oid to Python type.
        truerun: when true, helpers commit their changes.
    """

    def getRunFromID(self, run_id):
        """Return the single run whose id is *run_id*.

        Raises:
            Exception: if the lookup does not match exactly one run.
        """
        myrun = run.Run(self)
        myrun["id"] = run_id
        myrun.id = run_id
        run_list = myrun.getMatchedObjectList()
        if len(run_list) != 1:
            raise Exception('Unknown run {0}'.format(run_id))
        return run_list[0]

    def getJobFromID(self, job_id):
        """Return the single job whose id is *job_id*.

        Raises:
            Exception: if the lookup does not match exactly one job.
        """
        myjob = job.Job(self)
        myjob["id"] = job_id
        myjob.id = job_id
        job_list = myjob.getMatchedObjectList()
        if len(job_list) != 1:
            raise Exception('Unknown job {0}'.format(job_id))
        return job_list[0]

    def createBase(self, job_desc, run_desc, quantities=None, **kwargs):
        """Create the schema, its tables and the requested quantities.

        Args:
            job_desc: object passed to ``createTable`` (must provide
                ``createTableRequest()``).
            run_desc: object passed to ``createTable`` (must provide
                ``createTableRequest()``).
            quantities: optional mapping of quantity name to type code, each
                entry being forwarded to ``pushQuantity``.
            **kwargs: forwarded as the ``params`` argument of
                ``createSchema``.

        The changes are committed only when ``self.truerun`` is true.
        """
        self.createSchema(kwargs)
        self.createTable(job_desc)
        self.createTable(run_desc)
        self.createGenericTables()
        # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
        for qname, type_code in (quantities or {}).items():
            self.pushQuantity(qname, type_code)
        if self.truerun:
            self.commit()

    def getObject(self, sqlobject):
        """Fill *sqlobject* with the row whose id is ``sqlobject.id``.

        Every column of the row is stored with ``sqlobject[column] = value``.

        Raises:
            Exception: if no row with that id exists in the table.
        """
        curs = self.connection.cursor()
        # The id is passed as a bound parameter; schema and table names are
        # identifiers and cannot be bound, so they are still formatted in.
        curs.execute(
            "SELECT * FROM {0}.{1} WHERE id = %s".format(
                self.schema, sqlobject.table_name),
            (sqlobject.id,))
        col_info = self.getColumnProperties(sqlobject)
        line = curs.fetchone()
        if line is None:
            raise Exception("No entry with id {0} in {1}.{2}".format(
                sqlobject.id, self.schema, sqlobject.table_name))
        for (col_name, _), value in zip(col_info, line):
            sqlobject[col_name] = value

    def createSchema(self, params=None):
        """Create the schema ``self.schema``, dropping an existing one first.

        When a schema of that name already exists, confirmation is requested
        through ``bdparser.validate_question``; without confirmation the
        program exits with status -1.

        Args:
            params: options forwarded to ``bdparser.validate_question``;
                defaults to ``{"yes": False}``.
        """
        if params is None:
            params = {"yes": False}
        curs = self.connection.cursor()
        # The lookup uses the lowercased schema name, as before.
        curs.execute(
            "SELECT schema_name FROM information_schema.schemata "
            "WHERE schema_name = %s",
            (self.schema.lower(),))
        if curs.rowcount:
            validated = bdparser.validate_question(
                "Are you sure you want to drop the schema named '" +
                self.schema + "'", params, False)
            # Explicit comparison kept: only an affirmative True may drop.
            if validated == True:
                curs.execute("DROP SCHEMA {0} cascade".format(self.schema))
            else:
                logger.debug("creation canceled: exit program")
                sys.exit(-1)
        curs.execute("CREATE SCHEMA {0}".format(self.schema))

    def createTypeCodes(self):
        """Build ``self.type_code``: PostgreSQL type oid -> Python type.

        Only the types float8, text, int8, int4, bool and timestamp are
        mapped; any other entry of ``pg_type`` is ignored.
        """
        python_types = {
            'float8': float,
            'text': str,
            'int8': int,
            'int4': int,
            'bool': bool,
            'timestamp': datetime.datetime,
        }
        curs = self.connection.cursor()
        curs.execute("SELECT typname,oid from pg_type;")
        self.type_code = {}
        for row in curs:
            typname, oid = row[0], row[1]
            if typname in python_types:
                self.type_code[oid] = python_types[typname]

    def createTable(self, object):
        """Execute the SQL returned by ``object.createTableRequest()``."""
        request = object.createTableRequest()
        curs = self.connection.cursor()
        curs.execute(request)

    def createGenericTables(self):
        """Run ``build_tables.sql`` (located next to this module).

        Every occurrence of ``SCHEMAS_IDENTIFIER`` in the script is replaced
        by ``self.schema`` before execution.
        """
        sql_script_name = os.path.join(
            os.path.dirname(__file__), "build_tables.sql")
        with open(sql_script_name, "r") as fh:
            script = fh.read()
        # Plain text replacement: the schema name must not be interpreted as
        # a regular-expression replacement template.
        curs = self.connection.cursor()
        curs.execute(script.replace("SCHEMAS_IDENTIFIER", self.schema))

    def getColumnProperties(self, sqlobject):
        """Return ``[(column_name, type_oid), ...]`` for the object's table.

        The information is read from the cursor description of an empty
        ``SELECT`` on ``sqlobject.table_name``.
        """
        curs = self.connection.cursor()
        curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(
            self.schema, sqlobject.table_name))
        # A real list (not a lazy zip) so callers can use len() and indexing.
        return [(desc[0], desc[1]) for desc in curs.description]

    def setObjectItemTypes(self, sqlobject):
        """Record in ``sqlobject.types`` the Python type of each column.

        Requires ``createTypeCodes`` to have been called; a column whose
        type oid is not in ``self.type_code`` raises ``KeyError``.
        """
        for col_name, type_oid in self.getColumnProperties(sqlobject):
            sqlobject.types[col_name] = self.type_code[type_oid]

    def insert(self, sqlobject):
        """Insert *sqlobject* and store the returned id in ``sqlobject.id``.

        ``sqlobject.insert()`` must return the arguments of
        ``performRequest`` and the request must return the new id as the
        first column of its first row.
        """
        sqlobject.prepare()
        curs = self.performRequest(*(sqlobject.insert()))
        sqlobject.id = curs.fetchone()[0]

    def performRequest(self, request, params=None):
        """Execute *request* with *params* and return the cursor.

        Args:
            request: SQL statement, with psycopg2 placeholders.
            params: values bound to the placeholders (default: empty list).

        Raises:
            psycopg2.ProgrammingError: re-raised with the failing query and
                its parameters included in the message.
        """
        if params is None:
            # Same value as the former (mutable) default argument.
            params = []
        curs = self.connection.cursor()
        try:
            curs.execute(request, params)
        except psycopg2.ProgrammingError as err:
            raise psycopg2.ProgrammingError(
                ("While trying to execute the query '{0}' with parameters " +
                 "'{1}', I caught this: '{2}'").format(request, params, err))
        return curs

    def createParameterSpace(self, myjob, entry_nb=0, tmp_job=None,
                             nb_inserted=0):
        """Insert one job per combination of the values in *myjob*.

        Entries of *myjob* whose value is a list are expanded: the method
        recurses over the entries and, once every entry has a value, inserts
        the resulting job unless ``getMatchedObjectList()`` already finds a
        match.

        Args:
            myjob: job whose ``entries`` describe the parameter space.
            entry_nb: index of the entry handled at this recursion level.
            tmp_job: job being filled in; created on the first call.
            nb_inserted: number of jobs inserted so far.

        Returns:
            The updated number of inserted jobs.
        """
        # list() so that the keys can be indexed on Python 3 as well.
        keys = list(myjob.entries.keys())
        if entry_nb == len(keys):
            # Every entry has a value: tmp_job is one point of the space.
            if not tmp_job:
                logger.error("internal error")
                sys.exit(-1)
            if len(tmp_job.getMatchedObjectList()) > 0:
                return nb_inserted
            nb_inserted += 1
            logger.info("insert job #{0}".format(nb_inserted) + ': ' +
                        str(tmp_job.entries))
            self.insert(tmp_job)
            return nb_inserted

        if not tmp_job:
            tmp_job = job.Job(self)

        key = keys[entry_nb]
        value = myjob[key]
        candidates = value if isinstance(value, list) else [value]
        for candidate in candidates:
            tmp_job[key.lower()] = candidate
            nb_inserted = self.createParameterSpace(
                myjob, entry_nb + 1, tmp_job, nb_inserted)

        if self.truerun:
            self.commit()
        return nb_inserted

    def pushQuantity(self, name, type_code, description=None):
        """Register a quantity in the ``quantities`` table and return its id.

        Args:
            name: name of the quantity.
            type_code: one of "int", "float", "int.vector", "float.vector",
                or the Python types ``int`` / ``float``.
            description: optional free text stored with the quantity.

        Raises:
            Exception: if *type_code* is not supported or no id is returned.
        """
        # type code -> (is_integer, is_vector)
        flags_by_type = {
            "int": (True, False),
            int: (True, False),
            "int.vector": (True, True),
            "float": (False, False),
            float: (False, False),
            "float.vector": (False, True),
        }
        try:
            is_integer, is_vector = flags_by_type[type_code]
        except (KeyError, TypeError):
            # TypeError covers unhashable values given as type_code.
            raise Exception(
                "invalid type '{0}' for a quantity".format(type_code))

        curs = self.connection.cursor()
        curs.execute("INSERT INTO {0}.quantities (name,is_integer,is_vector,description) VALUES (%s , %s , %s, %s) RETURNING id".format(self.schema),(name,is_integer,is_vector,description) )
        item = curs.fetchone()
        if item is None:
            raise Exception("Counld not create quantity \"" + name + "\"")
        return item[0]

    def commit(self):
        """Commit the current transaction of the connection."""
        logger.debug("commiting changes to base")
        self.connection.commit()

    def getSchemaList(self):
        """Return the names of the schemas that contain a ``runs`` table."""
        curs = self.connection.cursor()
        curs.execute("SELECT distinct(table_schema) from information_schema.tables where table_name='runs'")
        return [row[0] for row in curs]

    def checkStudy(self, dico):
        """Exit with status -1 unless *dico* contains the key ``"study"``.

        Before exiting, the available schemas (see ``getSchemaList``) are
        logged so the user can pick one.
        """
        if "study" in dico:
            return
        # Logged as errors: these lines explain why the program aborts.
        logger.error("*"*30)
        logger.error("Parameter 'study' must be provided at command line")
        logger.error("possibilities are:")
        for schema_name in self.getSchemaList():
            logger.error("\t" + schema_name)
        logger.error("")
        logger.error("FATAL => ABORT")
        logger.error("*"*30)
        sys.exit(-1)

    def close(self):
        """Close the database connection, if one is still attached."""
        if 'connection' in self.__dict__:
            logger.debug('closing database session')
            self.connection.close()
            # Removed so that a second close() (e.g. from __del__) is a no-op.
            del self.__dict__['connection']

    def __del__(self):
        """Close the connection when the object is garbage collected."""
        self.close()

    def _describeParameters(self):
        """Return the text listing the job and run parameters with types."""
        rule = "****************************************************************\n"
        sections = []
        for title, factory in (("Job parameters:\n", job.Job),
                               ("Run parameters:\n", run.Run)):
            sqlobject = factory(self)
            sqlobject.prepare()
            lines = [str(name) + ": " + str(typ)
                     for name, typ in sqlobject.types.items()]
            sections.append(rule + title + rule + "\n".join(lines))
        return "\n".join(sections)

    def __init__(self, truerun=False, **kwargs):
        """Connect to the database and bind to the requested study.

        Args:
            truerun: when true, helpers such as ``createBase`` commit.
            **kwargs: options. ``host``, ``user``, ``port`` and ``password``
                are forwarded to ``psycopg2.connect`` (a password equal to
                ``'ask'`` is prompted for). ``study`` names the schema and is
                mandatory unless ``should_not_check_study`` is present.
                ``list_parameters=True`` logs the job and run parameters and
                exits with status 0.

        Exits with status -1 when the connection fails.
        """
        psycopg2_params = ["host", "user", "port", "password"]
        connection_params = bdparser.filterParams(psycopg2_params, kwargs)
        if ("password" in connection_params and
                connection_params["password"] == 'ask'):
            connection_params["password"] = getpass.getpass()

        # Never write the password to the log: log a masked copy.
        logged_params = dict(connection_params)
        if "password" in logged_params:
            logged_params["password"] = "********"
        logger.debug('connection arguments: {0}'.format(logged_params))

        try:
            self.connection = psycopg2.connect(**connection_params)
            logger.debug('connected to base')
        except Exception as e:
            logger.error(
                "Connection failed: check your connection settings:\n" +
                str(e))
            sys.exit(-1)
        assert(isinstance(self.connection, psycopg2._psycopg.connection))

        self.dbhost = kwargs.get("host", "localhost")

        if "should_not_check_study" not in kwargs:
            self.checkStudy(kwargs)
        # checkStudy exits when 'study' is missing, so the key can only be
        # absent here when the check was explicitly disabled.
        if "study" in kwargs:
            self.schema = kwargs["study"]

        self.createTypeCodes()
        self.truerun = truerun

        if "list_parameters" in kwargs and kwargs["list_parameters"] == True:
            logger.info("\n{0}".format(self._describeParameters()))
            sys.exit(0)
################################################################
if __name__ == "__main__":
    # NOTE(review): legacy smoke test. The calls below do not match the Base
    # class defined in this file: its constructor is
    # ``Base(truerun=False, **kwargs)`` and it defines neither ``create`` nor
    # ``pushJob``. Running this is therefore expected to fail; update it to
    # the current API (``createBase``, ``createParameterSpace``) or remove it.
    connection = psycopg2.connect(host = "localhost")
    job_description = job.Job(dict(hono=int,lulu=float,toto=str))
    base = Base("honoluluSchema",connection,job_description)
    base.create()
    connection.commit()
    base.pushJob(dict(hono=12,lulu=24.2,toto="toto"))
    base.pushQuantity("ekin", "float")
    connection.commit()

Event Timeline