diff --git a/python/BlackDynamite/base.py b/python/BlackDynamite/base.py index 9d38251..dffd578 100755 --- a/python/BlackDynamite/base.py +++ b/python/BlackDynamite/base.py @@ -1,269 +1,268 @@ #!/usr/bin/env python from __future__ import print_function __all__ = [ "BaseError", "Base" ] import job import os import psycopg2 import re import copy import numpy as np import psycopg2 import bdparser import sys import getpass import datetime import run class BaseError (Exception): pass class Base(object): """ """ def createBase(self,job_desc,run_desc,quantities={},**kwargs): - print (quantities) + #print (quantities) self.createSchema(kwargs) self.createTable(job_desc) self.createTable(run_desc) self.createGenericTables() for qname,type in quantities.iteritems(): self.pushQuantity(qname,type) if (self.truerun): self.commit() def getObject(self,sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format(self.schema,sqlobject.table_name,sqlobject.id)) col_info = self.getColumnProperties(sqlobject) line = curs.fetchone() for i in range(0,len(col_info)): col_name = col_info[i][0] sqlobject[col_name] = line[i] def createSchema(self, params = {"yes": False}): # create the schema of the simulation curs = self.connection.cursor() curs.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{0}'".format(self.schema).lower()) if (curs.rowcount): validated = bdparser.validate_question("Are you sure you want to drop the schema named '" + self.schema + "'", params, False) if (validated == True): curs.execute("DROP SCHEMA {0} cascade".format(self.schema)) else: print ("creation canceled: exit program") sys.exit(-1) curs.execute("CREATE SCHEMA {0}".format(self.schema)) def createTypeCodes(self): curs = self.connection.cursor() curs.execute("SELECT typname,oid from pg_type;") self.type_code = {} for i in curs: # print (i[0]) if (i[0] == 'float8'): self.type_code[i[1]] = float if (i[0] == 'text'): self.type_code[i[1]] = str if (i[0] == 'int8'): self.type_code[i[1]] = int if (i[0] == 'int4'): self.type_code[i[1]] = int if (i[0] == 'bool'): self.type_code[i[1]] = bool if (i[0] == 'timestamp'): self.type_code[i[1]] = datetime.datetime def createTable(self, object): request = object.createTableRequest() curs = self.connection.cursor() # print (request) curs.execute(request) def createGenericTables(self,): sql_script_name = os.path.join(os.path.dirname(__file__),"build_tables.sql") curs = self.connection.cursor() # create generic tables query_list = list() with open(sql_script_name,"r") as fh: for line in fh: query_list.append(re.sub("SCHEMAS_IDENTIFIER",self.schema,line)) curs.execute("\n".join(query_list)) def getColumnProperties(self, sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(self.schema,sqlobject.table_name)) column_names = [ desc[0] for desc in curs.description] column_type = [ desc[1] for desc in curs.description] return zip(column_names,column_type) def setObjectItemTypes(self, sqlobject): col_info = self.getColumnProperties(sqlobject) for i,j in col_info: sqlobject.types[i] = self.type_code[j] # print (str(i) + " " + str(self.type_code[j])) def insert(self, sqlobject): sqlobject.prepare() curs = self.performRequest(*(sqlobject.insert())) sqlobject.id = curs.fetchone()[0] def performRequest(self, request, params=[]): curs = self.connection.cursor() # print (request) # print (params) try: curs.execute(request,params) except psycopg2.ProgrammingError as err: raise psycopg2.ProgrammingError( ("While trying to execute the query '{0}' with parameters " + "'{1}', I caught this: '{2}'").format(request, params, err)) return curs - def createParameterSpace(self,myjob,entry_nb=0,tmp_job=None): + def createParameterSpace(self,myjob,entry_nb=0,tmp_job=None,nb_inserted = 0): keys = myjob.entries.keys() nparam = len(keys) -# print (nparam) -# print (entry_nb) if (entry_nb == nparam): if (not tmp_job): print ("internal error") sys.exit(-1) - print (tmp_job.entries) if (len(tmp_job.getMatchedObjectList()) > 0): return - print ("insert job " + str(tmp_job.entries)) + nb_inserted += 1 + print ("insert job #{0}".format(nb_inserted) + ': ' + str(tmp_job.entries)) self.insert(tmp_job) - return + return nb_inserted + if (not tmp_job): tmp_job = job.Job(self) key = keys[entry_nb] e = myjob[key] if (type(e) == list): for typ in e: tmp_job[key.lower()] = typ - self.createParameterSpace(myjob,entry_nb+1,tmp_job) + nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) else: tmp_job[key.lower()] = e - self.createParameterSpace(myjob,entry_nb+1,tmp_job) - - if (self.truerun): - self.commit() + nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) + if (self.truerun): self.commit() + return nb_inserted + def pushQuantity(self,name, type_code, description=None): """ implemented type_codes: "int" "float" "int.vector" "float.vector" """ if ((type_code == "int") or (type_code == int)): is_integer = True is_vector = False elif (type_code == "int.vector"): is_integer = True is_vector = True elif ((type_code == "float") or (type_code == float)): is_integer = False is_vector = False elif (type_code == "float.vector"): is_integer = False is_vector = True else: raise Exception("invalid type '{0}' for a quantity".format(type_code)) curs = self.connection.cursor() curs.execute("INSERT INTO {0}.quantities (name,is_integer,is_vector,description) VALUES (%s , %s , %s, %s) RETURNING id".format(self.schema),(name,is_integer,is_vector,description) ) item = curs.fetchone() if (item is None): raise Exception("Counld not create quantity \"" + name + "\"") return item[0] def commit(self): self.connection.commit() def getSchemaList(self): curs = self.connection.cursor() curs.execute("SELECT distinct(table_schema) from information_schema.tables where table_name='runs'") schemas = [ desc[0] for desc in curs] return schemas def checkStudy(self,dico): if not "study" in dico: print ("*"*30) print ("Parameter 'study' must be provided at command line") print ("possibilities are:") schemas = self.getSchemaList() for s in schemas: print ("\t" + s) print ("") print ("FATAL => ABORT") print ("*"*30) sys.exit(-1) def __init__ (self, truerun=False,**kwargs): psycopg2_params = ["host","user","port","password"] connection_params = bdparser.filterParams(psycopg2_params,kwargs) if "password" in connection_params and connection_params["password"] == 'ask': connection_params["password"] = getpass.getpass() try: self.connection = psycopg2.connect(**connection_params) except Exception as e: print ("Connection failed: check you connection settings:\n" + str(e)) sys.exit(-1) assert(isinstance(self.connection,psycopg2._psycopg.connection)) self.dbhost = kwargs["host"] if "host" in kwargs.keys() else "localhost" if ("should_not_check_study" not in kwargs): self.checkStudy(kwargs) self.schema = kwargs["study"] self.createTypeCodes() self.truerun = truerun if("list_parameters" in kwargs and kwargs["list_parameters"] == True): myjob = job.Job(self) myjob.prepare() print ("****************************************************************") print ("Job parameters:") print ("****************************************************************") params = [str(j[0]) + ": " + str(j[1]) for j in myjob.types.iteritems() ] print("\n".join(params)) myrun = run.Run(self) myrun.prepare() print ("****************************************************************") print ("Run parameters:") print ("****************************************************************") params = [str(j[0]) + ": " + str(j[1]) for j in myrun.types.iteritems() ] print("\n".join(params)) sys.exit(0) ################################################################ if __name__ == "__main__": connection = psycopg2.connect(host = "localhost") job_description = job.Job(dict(hono=int,lulu=float,toto=str)) base = Base("honoluluSchema",connection,job_description) base.create() connection.commit() base.pushJob(dict(hono=12,lulu=24.2,toto="toto")) base.pushQuantity("ekin", "float") connection.commit()