diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py
index 5aa438b..9a5d216 100644
--- a/BlackDynamite/base_zeo.py
+++ b/BlackDynamite/base_zeo.py
@@ -1,339 +1,355 @@
 #!/usr/bin/env python3
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 ################################################################
 from . import bdparser
 from . import bdlogging
 from . import base
 from . import conffile_zeo
 from . import zeoobject
 from . import lowercase_btree
 from .constraints_zeo import ZEOconstraints
 ################################################################
 import re
 import os
 import subprocess
 import ZEO
 import ZODB
 import sys
 from BTrees.OOBTree import OOSet
 from . import job
 from . import run_zeo
 ################################################################
 __all__ = ["BaseZEO"]
 print = bdlogging.invalidPrint
 logger = bdlogging.getLogger(__name__)
 PBTree = lowercase_btree.PersistentLowerCaseBTree
 ################################################################
 
 
 class BaseZEO(base.AbstractBase):
     """
     """
 
     singleton_base = None
 
     # def __del__(self):
     #     if hasattr(self, 'process'):
     #         self.process.kill()
 
     def __init__(self, truerun=False, creation=False, read_only=False,
                  **kwargs):
 
         BaseZEO.singleton_base = self
 
         self.Job = job.JobZEO
         self.Run = run_zeo.RunZEO
         self.ConfFile = conffile_zeo.ConfFile
         self.BDconstraints = ZEOconstraints
 
         zeo_params = ["host"]
         connection_params = bdparser.filterParams(zeo_params, kwargs)
         logger.info('connection arguments: {0}'.format(connection_params))
         self.filename = connection_params['host']
         filename_split = self.filename.split('://')
         if filename_split[0] != 'zeo':
             raise RuntimeError(
                 f"wrong protocol with this database: {type(self)}")
         self.filename = filename_split[1]
         # logger.error(self.filename)
         dirname = os.path.dirname(self.filename)
         # logger.error(dirname)
         if dirname == '':
             dirname = './'
         socket_name = os.path.join(dirname, 'zeo.socket')
         zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf')
         # logger.error(socket_name)
         # logger.error(zeo_server_conf_filename)
         if not os.path.exists(socket_name):
             zeo_server_conf = f'''
 <zeo>
   address ./zeo.socket
 </zeo>
 <filestorage>
   path bd.zeo
   blob-dir bd.blob
 </filestorage>
 <eventlog>
   <logfile>
     path zeo.log
     format %(asctime)s %(message)s
   </logfile>
 </eventlog>
 '''
             with open(zeo_server_conf_filename, 'w') as f:
                 f.write(zeo_server_conf)
             cmd = "runzeo -C zeo.conf"
             logger.error(cmd)
             self.process = subprocess.Popen(
                 cmd, shell=True, cwd=dirname)
             # stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
         try:
             self.connection = ZEO.connection(
                 socket_name, read_only=read_only,
                 server_sync=True,
                 blob_dir=os.path.join(dirname, 'bd-client.blob'))
             self.root = self.connection.root
             logger.debug('connected to base')
         except Exception as e:
             logger.error(
                 "Connection failed: check your connection settings:\n" +
                 str(e))
             sys.exit(-1)
 
         assert(isinstance(self.connection, ZODB.Connection.Connection))
 
         self.dbhost = (kwargs["host"]
                        if "host" in kwargs.keys()
                        else "localhost")
 
         super().__init__(connection=self.connection, truerun=truerun,
                          creation=creation, **kwargs)
 
     def getSchemaList(self, filter_names=True):
         try:
             schemas = self.root.schemas
         except AttributeError:
             self.root.schemas = PBTree(key_string='study_')
             schemas = self.root.schemas
         filtered_schemas = []
         if filter_names is True:
             for s in schemas:
                 m = re.match('{0}_(.+)'.format(self.user), s)
                 if m:
                     s = m.group(1)
                 filtered_schemas.append(s)
         else:
             filtered_schemas = schemas
         return filtered_schemas
 
     def getStudySize(self, study):
         curs = self.connection.cursor()
         try:
             logger.info(study)
             curs.execute("""
             select sz from (SELECT SUM(pg_total_relation_size(
                 quote_ident(schemaname) || '.' ||
                 quote_ident(tablename)))::BIGINT
                 FROM pg_tables WHERE schemaname = '{0}') as sz
             """.format(study))
             size = curs.fetchone()[0]
             curs.execute("""
             select pg_size_pretty(cast({0} as bigint))
             """.format(size))
             size = curs.fetchone()[0]
             curs.execute("""
             select count({0}.runs.id) from {0}.runs
             """.format(study))
             nruns = curs.fetchone()[0]
             curs.execute("""
             select count({0}.jobs.id) from {0}.jobs
             """.format(study))
             njobs = curs.fetchone()[0]
         except psycopg2.ProgrammingError:
             self.connection.rollback()
             size = '????'
 
         return {'size': size, 'nruns': nruns, 'njobs': njobs}
 
     def createSchema(self, params={"yes": False}):
         # create the schema of the simulation
         if not hasattr(self.root, 'schemas'):
             self.root.schemas = PBTree(key_string='study_')
         if self.schema in self.root.schemas:
             validated = bdparser.validate_question(
                 "Are you sure you want to drop the schema named '" +
                 self.schema + "'", params, False)
             if validated is True:
                 # logger.error(self.root.schemas[self.schema])
                 # logger.error(type(self.root.schemas[self.schema]))
                 del self.root.schemas[self.schema]
             else:
                 logger.debug("creation canceled: exit program")
                 sys.exit(-1)
 
         self.root.schemas[self.schema] = PBTree()
         self.root.schemas[self.schema]['Quantities'] = OOSet()
         self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_')
         self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_')
-        self.root.schemas[self.schema]['Jobs_counter'] = 0
-        self.root.schemas[self.schema]['Runs_counter'] = 0
+        self.root.schemas[self.schema]['Jobs_counter'] = 1
+        self.root.schemas[self.schema]['Runs_counter'] = 1
 
     def prepare(self, obj, descriptor):
         if not hasattr(self.root, 'schemas'):
             return
         if descriptor in self.root.schemas[self.schema]:
             desc = self.root.schemas[self.schema][descriptor]
             for t in desc.types.keys():
                 obj.types[t] = desc.types[t]
                 if t not in obj:
                     obj.t = None
 
     def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
         self.createSchema(kwargs)
         self.root.schemas[self.schema]['job_desc'] = job_desc
         self.root.schemas[self.schema]['run_desc'] = run_desc
 
         for qname, type in quantities.items():
             self.pushQuantity(qname, type)
 
         if self.truerun:
             self.commit()
 
     def _get_jobs(self):
         return self.root.schemas[self.schema]['Jobs']
 
+    @property
+    def jobs(self):
+        return self._get_jobs()
+
+    @jobs.setter
+    def jobs(self, val):
+        self.root.schemas[self.schema]['Jobs'] = val
+
     @property
     def quantities(self):
         return self.root.schemas[self.schema]['Quantities']
 
     @quantities.setter
     def quantities(self, value):
         self.root.schemas[self.schema]['Quantities'] = value
 
     @property
     def jobs_counter(self):
         return self.root.schemas[self.schema]['Jobs_counter']
 
     @jobs_counter.setter
     def jobs_counter(self, val):
         self.root.schemas[self.schema]['Jobs_counter'] = val
 
     def _get_runs(self):
         return self.root.schemas[self.schema]['Runs']
 
+    @property
+    def runs(self):
+        return self._get_runs()
+
+    @runs.setter
+    def runs(self, val):
+        self.root.schemas[self.schema]['Runs'] = val
+
     @property
     def runs_counter(self):
         return self.root.schemas[self.schema]['Runs_counter']
 
     @runs_counter.setter
     def runs_counter(self, val):
         self.root.schemas[self.schema]['Runs_counter'] = val
 
     def select(self, _types, constraints=None, sort_by=None):
         if not isinstance(_types, list):
             _types = [_types]
 
         _type = _types[0]
         if isinstance(_type, zeoobject.ZEOObject):
             _type = type(_type)
 
         if _type == self.Job:
             obj_container = self._get_jobs()
         elif _type == self.Run:
             obj_container = self._get_runs()
         else:
             raise RuntimeError(f'{type(_types)}')
 
         if (sort_by is not None) and (not isinstance(sort_by, str)):
             raise RuntimeError(
                 'sort_by argument is not correct: {0}'.format(sort_by))
 
         # logger.error(_type)
         # logger.error(type(constraints))
         # logger.error(constraints)
         if isinstance(constraints, zeoobject.ZEOObject):
             if hasattr(constraints, 'id') and constraints.id is not None:
                 obj = obj_container[constraints.id]
                 if isinstance(obj, self.Run):
                     obj = (obj, self._get_jobs()[obj.job_id])
                 return [obj]
 
         const = ZEOconstraints(self, constraints)
         condition = const.getMatchingCondition()
 
         obj_list = []
         for key, obj in obj_container.items():
             obj.base = self
             objs = [obj]
             # logger.error(type(obj))
             # logger.error(obj.id)
             if _type == self.Run:
                 j = self._get_jobs()[obj.job_id]
                 j.base = self
                 objs.append(j)
             if condition(objs):
                 # logger.error(key)
                 # logger.error(obj)
                 # logger.error(_type)
                 if len(objs) == 1:
                     obj_list.append(objs[0])
                 else:
                     obj_list.append(objs)
 
         return obj_list
 
     def insert(self, zeoobject, keep_state=False):
         if isinstance(zeoobject, self.Job):
             objs = self._get_jobs()
             zeoobject = zeoobject.copy()
             if not keep_state:
                 zeoobject['id'] = self.jobs_counter
                 self.jobs_counter += 1
         elif isinstance(zeoobject, self.Run):
             objs = self._get_runs()
             zeoobject = zeoobject.copy()
             if not keep_state:
                 zeoobject["id"] = self.runs_counter
                 zeoobject["state"] = 'CREATED'
                 self.runs_counter += 1
         else:
             raise RuntimeError(
                 f'cannot insert object of type {type(zeoobject)}')
         objs[zeoobject.id] = zeoobject.copy()
         # logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}')
 
     def setObjectItemTypes(self, zeoobject):
         if isinstance(zeoobject, self.Job):
             zeoobject.types = self.root.schemas[self.schema]['job_desc'].types
         elif isinstance(zeoobject, self.Run):
             zeoobject.types = self.root.schemas[self.schema]['run_desc'].types
         else:
             raise RuntimeError(f'{type(zeoobject)}')
 
     def commit(self):
         import transaction
         transaction.commit()
 
     def close(self):
         import transaction
         transaction.abort()
 
 ################################################################
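
For reference, here is a minimal usage sketch (not part of the patch) showing how the 'jobs' and 'runs' properties added above might be exercised from client code. The zeo:// path, the study name, and the direct assignment to 'schema' are illustrative assumptions; only the 'host' keyword is visible in this file, and any further arguments expected by base.AbstractBase depend on your installation.

    from BlackDynamite.base_zeo import BaseZEO

    # Placeholder connection values: adapt the zeo:// path and study name.
    mybase = BaseZEO(host='zeo://./bd/bd.zeo', truerun=True)
    mybase.schema = 'my_study'   # assumed way of selecting the study

    # The new 'jobs' property exposes the per-schema Jobs BTree for reading ...
    for job_id, job in mybase.jobs.items():
        print(job_id, job)

    # ... and the matching setter writes a container back into the schema,
    # mirroring the existing 'quantities' property pair.
    mybase.jobs = mybase._get_jobs()

    mybase.commit()   # delegates to transaction.commit()
    mybase.close()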