diff --git a/BlackDynamite/base.py b/BlackDynamite/base.py index 02d0fe0..727449e 100755 --- a/BlackDynamite/base.py +++ b/BlackDynamite/base.py @@ -1,336 +1,355 @@ #!/usr/bin/env python ################################################################ from __future__ import print_function ################################################################ import job import os import psycopg2 import re import bdparser import sys import getpass import datetime import run import bdlogging import logging ################################################################ __all__ = ["Base"] print = bdlogging.invalidPrint logger = logging.getLogger(__name__) ################################################################ class Base(object): """ """ def getRunFromID(self, run_id): myrun = run.Run(self) myrun["id"] = run_id myrun.id = run_id run_list = myrun.getMatchedObjectList() if len(run_list) != 1: raise Exception('Unknown run {0}'.format(run_id)) return run_list[0] def getJobFromID(self, job_id): myjob = job.Job(self) myjob["id"] = job_id myjob.id = job_id job_list = myjob.getMatchedObjectList() if len(job_list) != 1: raise Exception('Unknown run {0}'.format(job_id)) return job_list[0] def createBase(self, job_desc, run_desc, quantities={}, **kwargs): # logger.debug (quantities) self.createSchema(kwargs) self.createTable(job_desc) self.createTable(run_desc) self.createGenericTables() for qname, type in quantities.iteritems(): self.pushQuantity(qname, type) if self.truerun: self.commit() def getObject(self, sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} WHERE id = {2}".format( self.schema, sqlobject.table_name, sqlobject.id)) col_info = self.getColumnProperties(sqlobject) line = curs.fetchone() for i in range(0, len(col_info)): col_name = col_info[i][0] sqlobject[col_name] = line[i] def createSchema(self, params={"yes": False}): # create the schema of the simulation curs = self.connection.cursor() curs.execute(("SELECT schema_name FROM information_schema.schemata" " WHERE schema_name = '{0}'").format( self.schema).lower()) if curs.rowcount: validated = bdparser.validate_question( "Are you sure you want to drop the schema named '" + self.schema + "'", params, False) if validated is True: curs.execute("DROP SCHEMA {0} cascade".format(self.schema)) else: logger.debug("creation canceled: exit program") sys.exit(-1) curs.execute("CREATE SCHEMA {0}".format(self.schema)) def createTypeCodes(self): curs = self.connection.cursor() curs.execute("SELECT typname,oid from pg_type;") self.type_code = {} for i in curs: # logger.debug (i[0]) if (i[0] == 'float8'): self.type_code[i[1]] = float if (i[0] == 'text'): self.type_code[i[1]] = str if (i[0] == 'int8'): self.type_code[i[1]] = int if (i[0] == 'int4'): self.type_code[i[1]] = int if (i[0] == 'bool'): self.type_code[i[1]] = bool if (i[0] == 'timestamp'): self.type_code[i[1]] = datetime.datetime def createTable(self, object): request = object.createTableRequest() curs = self.connection.cursor() # logger.debug (request) curs.execute(request) def createGenericTables(self,): sql_script_name = os.path.join(os.path.dirname(__file__), "build_tables.sql") curs = self.connection.cursor() # create generic tables query_list = list() with open(sql_script_name, "r") as fh: for line in fh: query_list.append(re.sub("SCHEMAS_IDENTIFIER", self.schema, line)) curs.execute("\n".join(query_list)) def getColumnProperties(self, sqlobject): curs = self.connection.cursor() curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format( self.schema, sqlobject.table_name)) column_names = [desc[0] for desc in curs.description] column_type = [desc[1] for desc in curs.description] return zip(column_names, column_type) def setObjectItemTypes(self, sqlobject): col_info = self.getColumnProperties(sqlobject) for i, j in col_info: sqlobject.types[i] = self.type_code[j] # logger.debug (str(i) + " " + str(self.type_code[j])) def insert(self, sqlobject): sqlobject.prepare() curs = self.performRequest(*(sqlobject.insert())) sqlobject.id = curs.fetchone()[0] def performRequest(self, request, params=[]): curs = self.connection.cursor() # logger.debug (request) # logger.debug (params) try: curs.execute(request, params) except psycopg2.ProgrammingError as err: raise psycopg2.ProgrammingError( ("While trying to execute the query '{0}' with parameters " + "'{1}', I caught this: '{2}'").format(request, params, err)) return curs def createParameterSpace(self, myjob, entry_nb=0, tmp_job=None, nb_inserted=0): keys = myjob.entries.keys() nparam = len(keys) if (entry_nb == nparam): if (not tmp_job): logger.debug("internal error") sys.exit(-1) if (len(tmp_job.getMatchedObjectList()) > 0): return nb_inserted nb_inserted += 1 logger.info("insert job #{0}".format(nb_inserted) + ': ' + str(tmp_job.entries)) self.insert(tmp_job) return nb_inserted if not tmp_job: tmp_job = job.Job(self) key = keys[entry_nb] e = myjob[key] if (type(e) == list): for typ in e: tmp_job[key.lower()] = typ nb_inserted = self.createParameterSpace( myjob, entry_nb+1, tmp_job, nb_inserted) else: tmp_job[key.lower()] = e nb_inserted = self.createParameterSpace( myjob, entry_nb+1, tmp_job, nb_inserted) if self.truerun: self.commit() return nb_inserted def pushQuantity(self, name, type_code, description=None): """ implemented type_codes: "int" "float" "int.vector" "float.vector" """ if ((type_code == "int") or (type_code == int)): is_integer = True is_vector = False elif (type_code == "int.vector"): is_integer = True is_vector = True elif ((type_code == "float") or (type_code == float)): is_integer = False is_vector = False elif (type_code == "float.vector"): is_integer = False is_vector = True else: raise Exception( "invalid type '{0}' for a quantity".format(type_code)) curs = self.connection.cursor() curs.execute(""" INSERT INTO {0}.quantities (name, is_integer, is_vector, description) VALUES (%s , %s , %s, %s) RETURNING id """.format(self.schema), (name, is_integer, is_vector, description)) item = curs.fetchone() if (item is None): raise Exception("Counld not create quantity \"" + name + "\"") return item[0] def commit(self): logger.debug("commiting changes to base") self.connection.commit() - def getSchemaList(self): + def getUserList(self): + curs = self.connection.cursor() + curs.execute(""" +select tableowner from pg_tables where tablename = 'runs'; +""") + users = [desc[0] for desc in curs] + return users + + def getStudyOwner(self, schema): + curs = self.connection.cursor() + curs.execute(""" + select grantor from information_schema.table_privileges where (table_name,table_schema,privilege_type) = ('runs','{0}','SELECT'); +""".format(schema)) + owners = [desc[0] for desc in curs] + return owners[0] + + def getSchemaList(self, filter_names=True): curs = self.connection.cursor() curs.execute(""" SELECT distinct(table_schema) from information_schema.tables where table_name='runs' """) schemas = [desc[0] for desc in curs] filtered_schemas = [] - for s in schemas: - m = re.match('{0}_(.+)'.format(self.user), s) - if m: - s = m.group(1) - filtered_schemas.append(s) - logger.info(filtered_schemas) + if filter_names is True: + for s in schemas: + m = re.match('{0}_(.+)'.format(self.user), s) + if m: + s = m.group(1) + filtered_schemas.append(s) + logger.info(filtered_schemas) + else: + filtered_schemas = schemas return filtered_schemas def checkStudy(self, dico): if "study" not in dico: logger.debug("*"*30) logger.debug("Parameter 'study' must be provided at command line") logger.debug("possibilities are:") schemas = self.getSchemaList() for s in schemas: logger.debug("\t" + s) logger.debug("") logger.debug("FATAL => ABORT") logger.debug("*"*30) sys.exit(-1) def close(self): if 'connection' in self.__dict__: logger.debug('closing database session') self.connection.close() del (self.__dict__['connection']) def __del__(self): self.close() def __init__(self, truerun=False, **kwargs): psycopg2_params = ["host", "user", "port", "password"] connection_params = bdparser.filterParams(psycopg2_params, kwargs) connection_params['dbname'] = 'blackdynamite' if ("password" in connection_params and connection_params["password"] == 'ask'): connection_params["password"] = getpass.getpass() logger.debug('connection arguments: {0}'.format(connection_params)) try: self.connection = psycopg2.connect(**connection_params) logger.debug('connected to base') except Exception as e: logger.error( "Connection failed: check your connection settings:\n" + str(e)) sys.exit(-1) assert(isinstance(self.connection, psycopg2._psycopg.connection)) self.dbhost = (kwargs["host"] if "host" in kwargs.keys() else "localhost") if 'user' in kwargs: self.user = kwargs["user"] else: self.user = os.getlogin() if ("should_not_check_study" not in kwargs): self.checkStudy(kwargs) self.schema = kwargs["user"] + '_' + kwargs["study"] self.createTypeCodes() self.truerun = truerun if("list_parameters" in kwargs and kwargs["list_parameters"] is True): myjob = job.Job(self) myjob.prepare() message = "" message += ("*"*65 + "\n") message += ("Job parameters:\n") message += ("*"*65 + "\n") params = [str(j[0]) + ": " + str(j[1]) for j in myjob.types.iteritems()] message += ("\n".join(params)+"\n") myrun = run.Run(self) myrun.prepare() message += ("*"*65 + "\n") message += ("Run parameters:\n") message += ("*"*65 + "\n") params = [str(j[0]) + ": " + str(j[1]) for j in myrun.types.iteritems()] message += ("\n".join(params)) logger.info("\n{0}".format(message)) sys.exit(0) ################################################################ if __name__ == "__main__": connection = psycopg2.connect(host="localhost") job_description = job.Job(dict(hono=int, lulu=float, toto=str)) base = Base("honoluluSchema", connection, job_description) base.create() connection.commit() base.pushJob(dict(hono=12, lulu=24.2, toto="toto")) base.pushQuantity("ekin", "float") connection.commit() diff --git a/scripts/studyInfo.py b/scripts/studyInfo.py index 6f7a5ed..4afbdb9 100755 --- a/scripts/studyInfo.py +++ b/scripts/studyInfo.py @@ -1,100 +1,114 @@ #!/usr/bin/env python from __future__ import print_function ################################################################ import BlackDynamite as BD import psycopg2 ################################################################ def printDataBaseInfo(base, params): curs = base.connection.cursor() curs.execute("select current_database()") datname = curs.fetchone()[0] curs.execute(""" select d.datname, pg_catalog.pg_size_pretty(pg_catalog.pg_database_size(d.datname)) from pg_catalog.pg_database d where (d.datname) = ('{0}') """.format(datname)) datsize = curs.fetchone()[1] print ("Database:", datname, datsize) +def printUserInfo(base, params): + users = base.getUserList() + print('registered Users: {0}'.format(', '.join(users))) + + def printStudyInfo(base, study, params): curs = base.connection.cursor() curs.execute(""" select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname) || '.' || quote_ident(tablename)))::BIGINT FROM pg_tables WHERE schemaname = '{0}') as sz """.format(study)) try: schemasize = curs.fetchone()[0] curs.execute(""" select pg_size_pretty(cast({0} as bigint)) """.format(schemasize)) except psycopg2.ProgrammingError: raise Exception("could not fetch study '{0}'".format(study)) schemasize = curs.fetchone()[0] curs.execute(""" select count({0}.runs.id) from {0}.runs """.format(study)) nruns = curs.fetchone()[0] curs.execute(""" select count({0}.jobs.id) from {0}.jobs """.format(study)) njobs = curs.fetchone()[0] - print('{:40}'.format(study), + owner = base.getStudyOwner(study) + print('{:10}:'.format(owner), + '{:40}'.format(study), + '#jobs:{:7} '.format(njobs), '#runs:{:7} '.format(nruns), 'size:', schemasize) if params['full'] is False: return # # curs.execute(""" #SELECT distinct(schemaname,tablename) FROM pg_catalog.pg_tables # where (tableowner, schemaname) = ('{0}', '{1}') #""".format(datname,study)) # tables = [res[0][1:-1].split(',') for res in curs] # # print (tables) # for t in tables: # curs.execute(""" #SELECT count(id) FROM {0}.{1} #""".format(t[0], t[1])) # cardinal = curs.fetchone()[0] # print (t[0], t[1],': #',cardinal) def fetchInfo(base, params): + + printUserInfo(base, params) + if "study" not in params: - study_list = base.getSchemaList() + study_list = base.getSchemaList(filter_names=False) else: study_list = [params['study']] printDataBaseInfo(base, params) for s in study_list: - printStudyInfo(base, s, params) + try: + printStudyInfo(base, s, params) + except: + pass def main(argv=None): if (type(argv) == str): argv = argv.split() parser = BD.BDParser() parser.register_params( group="studyInfo.py", params={"full": bool, "study": str}, help={"full": "Say that you want details (can be costful)", "study": "specify a study to analyse"}) params = parser.parseBDParameters(argv=argv) params["should_not_check_study"] = True mybase = BD.Base(**params) fetchInfo(mybase, params) if __name__ == '__main__': main()