diff --git a/BlackDynamite/base.py b/BlackDynamite/base.py index f949fb9..c853212 100755 --- a/BlackDynamite/base.py +++ b/BlackDynamite/base.py @@ -1,305 +1,321 @@ #!/usr/bin/env python - +################################################################ from __future__ import print_function - -__all__ = [ "Base" ] - +################################################################ import job import os import psycopg2 import re -import copy -import numpy as np -import psycopg2 import bdparser import sys import getpass import datetime import run - +import bdlogging +import logging ################################################################ -import bdlogging,logging +__all__ = ["Base"] print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ + class Base(object): """ """ - def getRunFromID(self,run_id): + def getRunFromID(self, run_id): myrun = run.Run(self) myrun["id"] = run_id myrun.id = run_id run_list = myrun.getMatchedObjectList() if len(run_list) != 1: raise Exception('Unknown run {0}'.format(run_id)) return run_list[0] - def getJobFromID(self,job_id): + def getJobFromID(self, job_id): myjob = job.Job(self) myjob["id"] = job_id myjob.id = job_id job_list = myjob.getMatchedObjectList() if len(job_list) != 1: raise Exception('Unknown run {0}'.format(job_id)) return job_list[0] - - def createBase(self,job_desc,run_desc,quantities={},**kwargs): - #logger.debug (quantities) + def createBase(self, job_desc, run_desc, quantities={}, **kwargs): + # logger.debug (quantities) self.createSchema(kwargs) self.createTable(job_desc) self.createTable(run_desc) self.createGenericTables() - for qname,type in quantities.iteritems(): - self.pushQuantity(qname,type) + for qname, type in quantities.iteritems(): + self.pushQuantity(qname, type) - - if (self.truerun): + if self.truerun: self.commit() - def getObject(self,sqlobject): + def getObject(self, sqlobject): curs = self.connection.cursor() - curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format(self.schema,sqlobject.table_name,sqlobject.id)) + curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format( + self.schema, sqlobject.table_name, sqlobject.id)) col_info = self.getColumnProperties(sqlobject) line = curs.fetchone() - for i in range(0,len(col_info)): + for i in range(0, len(col_info)): col_name = col_info[i][0] sqlobject[col_name] = line[i] - def createSchema(self, params = {"yes": False}): + def createSchema(self, params={"yes": False}): # create the schema of the simulation curs = self.connection.cursor() - curs.execute("SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{0}'".format(self.schema).lower()) - if (curs.rowcount): - validated = bdparser.validate_question("Are you sure you want to drop the schema named '" + self.schema + "'", params, False) - if (validated == True): + curs.execute(("SELECT schema_name FROM information_schema.schemata" + " WHERE schema_name = '{0}'").format( + self.schema).lower()) + if curs.rowcount: + validated = bdparser.validate_question( + "Are you sure you want to drop the schema named '" + + self.schema + "'", params, False) + if validated is True: curs.execute("DROP SCHEMA {0} cascade".format(self.schema)) else: - logger.debug ("creation canceled: exit program") + logger.debug("creation canceled: exit program") sys.exit(-1) curs.execute("CREATE SCHEMA {0}".format(self.schema)) def createTypeCodes(self): curs = self.connection.cursor() curs.execute("SELECT typname,oid from pg_type;") self.type_code = {} for i in curs: # logger.debug (i[0]) if (i[0] == 'float8'): self.type_code[i[1]] = float if (i[0] == 'text'): self.type_code[i[1]] = str if (i[0] == 'int8'): self.type_code[i[1]] = int if (i[0] == 'int4'): self.type_code[i[1]] = int if (i[0] == 'bool'): self.type_code[i[1]] = bool if (i[0] == 'timestamp'): self.type_code[i[1]] = datetime.datetime - def createTable(self, object): request = object.createTableRequest() curs = self.connection.cursor() # logger.debug (request) curs.execute(request) def createGenericTables(self,): - sql_script_name = os.path.join(os.path.dirname(__file__),"build_tables.sql") + sql_script_name = os.path.join(os.path.dirname(__file__), + "build_tables.sql") curs = self.connection.cursor() # create generic tables query_list = list() - with open(sql_script_name,"r") as fh: + with open(sql_script_name, "r") as fh: for line in fh: - query_list.append(re.sub("SCHEMAS_IDENTIFIER",self.schema,line)) + query_list.append(re.sub("SCHEMAS_IDENTIFIER", + self.schema, line)) curs.execute("\n".join(query_list)) def getColumnProperties(self, sqlobject): curs = self.connection.cursor() - curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(self.schema,sqlobject.table_name)) - column_names = [ desc[0] for desc in curs.description] - column_type = [ desc[1] for desc in curs.description] - return zip(column_names,column_type) + curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format( + self.schema, sqlobject.table_name)) + column_names = [desc[0] for desc in curs.description] + column_type = [desc[1] for desc in curs.description] + return zip(column_names, column_type) def setObjectItemTypes(self, sqlobject): col_info = self.getColumnProperties(sqlobject) - for i,j in col_info: + for i, j in col_info: sqlobject.types[i] = self.type_code[j] -# logger.debug (str(i) + " " + str(self.type_code[j])) + # logger.debug (str(i) + " " + str(self.type_code[j])) def insert(self, sqlobject): sqlobject.prepare() curs = self.performRequest(*(sqlobject.insert())) sqlobject.id = curs.fetchone()[0] def performRequest(self, request, params=[]): curs = self.connection.cursor() # logger.debug (request) # logger.debug (params) try: - curs.execute(request,params) + curs.execute(request, params) except psycopg2.ProgrammingError as err: - raise psycopg2.ProgrammingError( - ("While trying to execute the query '{0}' with parameters " + - "'{1}', I caught this: '{2}'").format(request, params, err)) + raise psycopg2.ProgrammingError( + ("While trying to execute the query '{0}' with parameters " + + "'{1}', I caught this: '{2}'").format(request, params, err)) return curs - def createParameterSpace(self,myjob,entry_nb=0,tmp_job=None,nb_inserted = 0): + def createParameterSpace(self, myjob, entry_nb=0, + tmp_job=None, nb_inserted=0): keys = myjob.entries.keys() nparam = len(keys) if (entry_nb == nparam): if (not tmp_job): - logger.debug ("internal error") + logger.debug("internal error") sys.exit(-1) if (len(tmp_job.getMatchedObjectList()) > 0): return nb_inserted nb_inserted += 1 - logger.info("insert job #{0}".format(nb_inserted) + ': ' + str(tmp_job.entries)) + logger.info("insert job #{0}".format(nb_inserted) + + ': ' + str(tmp_job.entries)) self.insert(tmp_job) return nb_inserted - - - if (not tmp_job): + if not tmp_job: tmp_job = job.Job(self) key = keys[entry_nb] e = myjob[key] if (type(e) == list): for typ in e: tmp_job[key.lower()] = typ - nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) + nb_inserted = self.createParameterSpace( + myjob, entry_nb+1, tmp_job, nb_inserted) else: tmp_job[key.lower()] = e - nb_inserted = self.createParameterSpace(myjob,entry_nb+1,tmp_job,nb_inserted) + nb_inserted = self.createParameterSpace( + myjob, entry_nb+1, tmp_job, nb_inserted) - if (self.truerun): self.commit() + if self.truerun: + self.commit() return nb_inserted - - def pushQuantity(self,name, type_code, description=None): + def pushQuantity(self, name, type_code, description=None): """ implemented type_codes: "int" "float" "int.vector" "float.vector" """ if ((type_code == "int") or (type_code == int)): is_integer = True is_vector = False elif (type_code == "int.vector"): is_integer = True is_vector = True elif ((type_code == "float") or (type_code == float)): is_integer = False is_vector = False elif (type_code == "float.vector"): is_integer = False is_vector = True else: - raise Exception("invalid type '{0}' for a quantity".format(type_code)) - + raise Exception( + "invalid type '{0}' for a quantity".format(type_code)) + curs = self.connection.cursor() - curs.execute("INSERT INTO {0}.quantities (name,is_integer,is_vector,description) VALUES (%s , %s , %s, %s) RETURNING id".format(self.schema),(name,is_integer,is_vector,description) ) + curs.execute(""" +INSERT INTO {0}.quantities (name, is_integer, is_vector, description) +VALUES (%s , %s , %s, %s) RETURNING id +""".format(self.schema), (name, is_integer, is_vector, description)) item = curs.fetchone() if (item is None): raise Exception("Counld not create quantity \"" + name + "\"") return item[0] - - def commit(self): logger.debug("commiting changes to base") self.connection.commit() def getSchemaList(self): curs = self.connection.cursor() - curs.execute("SELECT distinct(table_schema) from information_schema.tables where table_name='runs'") - schemas = [ desc[0] for desc in curs] + curs.execute(""" +SELECT distinct(table_schema) from information_schema.tables +where table_name='runs' +""") + schemas = [desc[0] for desc in curs] return schemas - def checkStudy(self,dico): - if not "study" in dico: - logger.debug ("*"*30) - logger.debug ("Parameter 'study' must be provided at command line") - logger.debug ("possibilities are:") + def checkStudy(self, dico): + if "study" not in dico: + logger.debug("*"*30) + logger.debug("Parameter 'study' must be provided at command line") + logger.debug("possibilities are:") schemas = self.getSchemaList() for s in schemas: - logger.debug ("\t" + s) - logger.debug ("") - logger.debug ("FATAL => ABORT") - logger.debug ("*"*30) + logger.debug("\t" + s) + logger.debug("") + logger.debug("FATAL => ABORT") + logger.debug("*"*30) sys.exit(-1) - def close(self): if 'connection' in self.__dict__: - logger.debug ('closing database session' ) + logger.debug('closing database session') self.connection.close() del (self.__dict__['connection']) - + def __del__(self): self.close() - - def __init__ (self, truerun=False,**kwargs): - psycopg2_params = ["host","user","port","password"] - connection_params = bdparser.filterParams(psycopg2_params,kwargs) - if "password" in connection_params and connection_params["password"] == 'ask': + + def __init__(self, truerun=False, **kwargs): + psycopg2_params = ["host", "user", "port", "password"] + connection_params = bdparser.filterParams(psycopg2_params, kwargs) + if ("password" in connection_params + and connection_params["password"] == 'ask'): connection_params["password"] = getpass.getpass() logger.debug('connection arguments: {0}'.format(connection_params)) try: self.connection = psycopg2.connect(**connection_params) logger.debug('connected to base') except Exception as e: - logger.error("Connection failed: check your connection settings:\n" + str(e)) + logger.error( + "Connection failed: check your connection settings:\n" + + str(e)) sys.exit(-1) - assert(isinstance(self.connection,psycopg2._psycopg.connection)) + assert(isinstance(self.connection, psycopg2._psycopg.connection)) - self.dbhost = kwargs["host"] if "host" in kwargs.keys() else "localhost" + self.dbhost = (kwargs["host"] + if "host" in kwargs.keys() + else "localhost") if ("should_not_check_study" not in kwargs): self.checkStudy(kwargs) self.schema = kwargs["study"] self.createTypeCodes() self.truerun = truerun - if("list_parameters" in kwargs and kwargs["list_parameters"] == True): + if("list_parameters" in kwargs and kwargs["list_parameters"] is True): myjob = job.Job(self) myjob.prepare() message = "" - message += ("****************************************************************\n") + message += ("*"*65 + "\n") message += ("Job parameters:\n") - message += ("****************************************************************\n") - params = [str(j[0]) + ": " + str(j[1]) for j in myjob.types.iteritems() ] + message += ("*"*65 + "\n") + params = [str(j[0]) + ": " + str(j[1]) + for j in myjob.types.iteritems()] message += ("\n".join(params)+"\n") myrun = run.Run(self) myrun.prepare() - message += ("****************************************************************\n") + message += ("*"*65 + "\n") message += ("Run parameters:\n") - message += ("****************************************************************\n") - params = [str(j[0]) + ": " + str(j[1]) for j in myrun.types.iteritems() ] + message += ("*"*65 + "\n") + params = [str(j[0]) + ": " + str(j[1]) + for j in myrun.types.iteritems()] message += ("\n".join(params)) logger.info("\n{0}".format(message)) sys.exit(0) ################################################################ + if __name__ == "__main__": - connection = psycopg2.connect(host = "localhost") - job_description = job.Job(dict(hono=int,lulu=float,toto=str)) - base = Base("honoluluSchema",connection,job_description) + connection = psycopg2.connect(host="localhost") + job_description = job.Job(dict(hono=int, lulu=float, toto=str)) + base = Base("honoluluSchema", connection, job_description) base.create() connection.commit() - base.pushJob(dict(hono=12,lulu=24.2,toto="toto")) + base.pushJob(dict(hono=12, lulu=24.2, toto="toto")) base.pushQuantity("ekin", "float") connection.commit() diff --git a/BlackDynamite/job.py b/BlackDynamite/job.py index dbdf601..a6271e8 100755 --- a/BlackDynamite/job.py +++ b/BlackDynamite/job.py @@ -1,13 +1,15 @@ #!/usr/bin/env python - -__all__ = [ "Job" ] - +################################################################ import sqlobject +################################################################ +__all__ = ["Job"] +################################################################ + class Job(sqlobject.SQLObject): """ """ - def __init__ (self, base): + def __init__(self, base): sqlobject.SQLObject.__init__(self, base) self.table_name = "jobs"