diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py
index 0e04341..6634c68 100644
--- a/BlackDynamite/base_zeo.py
+++ b/BlackDynamite/base_zeo.py
@@ -1,392 +1,396 @@
 #!/usr/bin/env python3
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 ################################################################
 from . import bdparser
 from . import bdlogging
 from . import base
 from . import conffile_zeo
 from . import zeoobject
 from . import lowercase_btree
 from .constraints_zeo import ZEOconstraints
 ################################################################
 import re
 import os
 import subprocess
 import ZEO
 import ZODB
 import sys
 from BTrees.OOBTree import OOSet, BTree
 from . import job
 from . import run_zeo
 import psutil
 ################################################################
 __all__ = ["BaseZEO"]
 print = bdlogging.invalidPrint
 logger = bdlogging.getLogger(__name__)
 PBTree = lowercase_btree.PersistentLowerCaseBTree
 ################################################################


 def check_socket(socket_name):
     if not os.path.exists(socket_name):
         return False
     conns = psutil.net_connections(kind='all')
     addrs = [s.laddr for s in conns if s.laddr != '']
     for a in addrs:
         if a == socket_name:
             logger.info("Found already running zeo server")
             return True
     return False


 class BaseZEO(base.AbstractBase):
     """
     """

     singleton_base = None

     def __init__(self, truerun=False, creation=False, read_only=False,
                  **kwargs):
         BaseZEO.singleton_base = self
         self.Job = job.JobZEO
         self.Run = run_zeo.RunZEO
         self.ConfFile = conffile_zeo.ConfFile
         self.BDconstraints = ZEOconstraints

         zeo_params = ["host"]
         connection_params = bdparser.filterParams(zeo_params, kwargs)
         logger.info('connection arguments: {0}'.format(connection_params))

         self.filename = connection_params['host']
         filename_split = self.filename.split('://')
         if filename_split[0] != 'zeo':
             raise RuntimeError(
                 f"wrong protocol with this database: {type(self)}")
         self.filename = filename_split[1]

         dirname = os.path.dirname(self.filename)
         if dirname == '':
             dirname = os.path.abspath('./')
         socket_name = os.path.join(dirname, 'zeo.socket')
         zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf')

         if not check_socket(socket_name):
             zeo_server_conf = f'''
 <zeo>
   address {socket_name}
 </zeo>
 <filestorage>
   path bd.zeo
   blob-dir bd.blob
 </filestorage>
 <eventlog>
   <logfile>
     path zeo.log
     format %(asctime)s %(message)s
   </logfile>
 </eventlog>
 '''
             with open(zeo_server_conf_filename, 'w') as f:
                 f.write(zeo_server_conf)
             cmd = "runzeo -C zeo.conf"
             logger.error("Spawning new zeo server: " + cmd)
             self.process = subprocess.Popen(
                 cmd, shell=True, cwd=dirname)
             # stdout=subprocess.PIPE, stderr=subprocess.PIPE)

         try:
             self.connection = ZEO.connection(
                 socket_name, read_only=read_only,
                 server_sync=True,
                 blob_dir=os.path.join(dirname, 'bd.blob'),
                 shared_blob_dir=True,
             )
             self.root = self.connection.root
             logger.debug('connected to base')
         except Exception as e:
             logger.error(
                 "Connection failed: check your connection settings:\n" +
                 str(e))
             sys.exit(-1)

         assert(isinstance(self.connection, ZODB.Connection.Connection))

         self.dbhost = (kwargs["host"]
                        if "host" in kwargs.keys()
                        else "localhost")

         super().__init__(connection=self.connection, truerun=truerun,
                          creation=creation, **kwargs)

     def getSchemaList(self, filter_names=True):
         try:
             schemas = self.root.schemas
         except AttributeError:
             self.root.schemas = PBTree(key_string='study_')
             schemas = self.root.schemas
         filtered_schemas = []
         if filter_names is True:
             for s in schemas:
                 m = re.match('{0}_(.+)'.format(self.user), s)
                 if m:
                     s = m.group(1)
                 filtered_schemas.append(s)
         else:
             filtered_schemas = schemas
         return filtered_schemas

     def getStudySize(self, study):
         curs = self.connection.cursor()
         try:
             logger.info(study)
             curs.execute("""
             select sz
             from (SELECT SUM(pg_total_relation_size(
                       quote_ident(schemaname) || '.' ||
                       quote_ident(tablename)))::BIGINT
                   FROM pg_tables WHERE schemaname = '{0}') as sz
             """.format(study))
             size = curs.fetchone()[0]

             curs.execute("""
             select pg_size_pretty(cast({0} as bigint))
             """.format(size))
             size = curs.fetchone()[0]

             curs.execute("""
             select count({0}.runs.id) from {0}.runs
             """.format(study))
             nruns = curs.fetchone()[0]

             curs.execute("""
             select count({0}.jobs.id) from {0}.jobs
             """.format(study))
             njobs = curs.fetchone()[0]
         except psycopg2.ProgrammingError:
             self.connection.rollback()
             size = '????'

         return {'size': size, 'nruns': nruns, 'njobs': njobs}

     def createSchema(self, params={"yes": False}):
         # create the schema of the simulation
         if not hasattr(self.root, 'schemas'):
             self.root.schemas = PBTree(key_string='study_')
         if self.schema in self.root.schemas:
             validated = bdparser.validate_question(
                 "Are you sure you want to drop the schema named '" +
                 self.schema + "'", params, False)
             if validated is True:
                 del self.root.schemas[self.schema]
             else:
                 logger.debug("creation canceled: exit program")
                 sys.exit(-1)

         self.root.schemas[self.schema] = PBTree()
         self.root.schemas[self.schema]['Quantities'] = OOSet()
         self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_')
         self.root.schemas[self.schema]['JobsIndex'] = BTree()
         self.root.schemas[self.schema]['RunsIndex'] = BTree()
         self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_')
         self.root.schemas[self.schema]['ConfigFiles'] = BTree()
         self.root.schemas[self.schema]['Jobs_counter'] = 1
         self.root.schemas[self.schema]['Runs_counter'] = 1

     def prepare(self, obj, descriptor):
         if not hasattr(self.root, 'schemas'):
             return
         if descriptor in self.root.schemas[self.schema]:
             desc = self.root.schemas[self.schema][descriptor]
             for t in desc.types.keys():
                 obj.types[t] = desc.types[t]
                 if t not in obj:
                     obj.t = None

     def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
         self.createSchema(kwargs)
         self.root.schemas[self.schema]['job_desc'] = job_desc
         self.root.schemas[self.schema]['run_desc'] = run_desc
         for qname, type in quantities.items():
             self.pushQuantity(qname, type)

         if self.truerun:
             self.commit()

     @property
     def configfiles(self):
         return self.root.schemas[self.schema]['ConfigFiles']

     def _get_jobs(self):
         return self.root.schemas[self.schema]['Jobs']

     @property
     def jobs(self):
         return self._get_jobs()

     @jobs.setter
     def jobs(self, val):
         self.root.schemas[self.schema]['Jobs'] = val

     @property
     def jobs_index(self):
         return self._get_jobs_index()

     @jobs_index.setter
     def jobs_index(self, val):
         self.root.schemas[self.schema]['JobsIndex'] = val

     @property
     def quantities(self):
         return self.root.schemas[self.schema]['Quantities']

     @quantities.setter
     def quantities(self, value):
         self.root.schemas[self.schema]['Quantities'] = value

     @property
     def jobs_counter(self):
         return self.root.schemas[self.schema]['Jobs_counter']

     @jobs_counter.setter
     def jobs_counter(self, val):
         self.root.schemas[self.schema]['Jobs_counter'] = val

     def _get_runs(self):
         return self.root.schemas[self.schema]['Runs']

     def _get_runs_index(self):
         return self.root.schemas[self.schema]['RunsIndex']

     def _get_jobs_index(self):
         return self.root.schemas[self.schema]['JobsIndex']

     @property
     def runs(self):
         return self._get_runs()

     @runs.setter
     def runs(self, val):
         self.root.schemas[self.schema]['Runs'] = val

     @property
     def runs_counter(self):
         return self.root.schemas[self.schema]['Runs_counter']

     @runs_counter.setter
     def runs_counter(self, val):
         self.root.schemas[self.schema]['Runs_counter'] = val

     def select(self, _types, constraints=None, sort_by=None):
         if not isinstance(_types, list):
             _types = [_types]

         _type = _types[0]
         if isinstance(_type, zeoobject.ZEOObject):
             _type = type(_type)

         if _type == self.Job:
             obj_container = self._get_jobs()
         elif _type == self.Run:
             obj_container = self._get_runs()
         else:
             raise RuntimeError(f'{type(_types)}')

         if (sort_by is not None) and (not isinstance(sort_by, str)):
             raise RuntimeError(
                 'sort_by argument is not correct: {0}'.format(sort_by))

         if isinstance(constraints, zeoobject.ZEOObject):
             if hasattr(constraints, 'id') and constraints.id is not None:
                 obj = obj_container[constraints.id]
                 if isinstance(obj, self.Run):
                     obj = (obj, self._get_jobs()[obj.job_id])
                 return [obj]
             else:
+                constraints = constraints.copy()
+                constraints.evalFunctorEntries()
                 params = constraints.get_params()
                 keys = constraints.get_keys()
                 n_params = len(keys)
                 if len(params) == n_params:
                     if params in self.jobs_index:
                         return [self.jobs[self.jobs_index[params]]]
                     else:
                         return []

         const = ZEOconstraints(self, constraints)
         condition = const.getMatchingCondition()

         obj_list = []
         for key, obj in obj_container.items():
             objs = [obj]
             if _type == self.Run:
                 j = self._get_jobs()[obj.job_id]
                 objs.append(j)
             if condition(objs):
                 if len(objs) == 1:
                     obj_list.append(objs[0])
                 else:
                     obj_list.append(objs)

         return obj_list

     def insert(self, zeoobject, keep_state=False):
         if isinstance(zeoobject, self.Job):
             objs = self.jobs
             zeoobject = zeoobject.copy()
+            zeoobject.evalFunctorEntries()
+            logger.debug(zeoobject)
             if not keep_state:
                 zeoobject['id'] = self.jobs_counter
                 self.jobs_counter += 1
             params = zeoobject.get_params()
             self.jobs_index[params] = zeoobject['id']
         elif isinstance(zeoobject, self.Run):
             objs = self.runs
             zeoobject = zeoobject.copy()
             if not keep_state:
                 zeoobject["id"] = self.runs_counter
                 zeoobject["state"] = 'CREATED'
                 job_id = zeoobject['job_id']
                 run_id = zeoobject['id']
                 job = self._get_jobs()[job_id]
                 if not hasattr(job, 'runs'):
                     job.runs = PBTree(key_string='runs_')
                 job.runs[run_id] = zeoobject
                 self.runs_counter += 1
         else:
             raise RuntimeError(
                 f'cannot insert object of type {type(zeoobject)}')

         objs[zeoobject.id] = zeoobject.copy()
         # logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}')

     def setObjectItemTypes(self, zeoobject):
         if isinstance(zeoobject, self.Job):
             zeoobject.types = self.root.schemas[self.schema]['job_desc'].types
         elif isinstance(zeoobject, self.Run):
             zeoobject.types = self.root.schemas[self.schema]['run_desc'].types
         else:
             raise RuntimeError(f'{type(zeoobject)}')

     def commit(self):
         import transaction
         transaction.commit()

     def pack(self):
         self.connection.db().pack()

     def close(self):
         import transaction
         transaction.abort()

 ################################################################
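The two hunks above are the consumer side of the new functor entries: insert() resolves callable entries on its private copy before the params tuple goes into jobs_index, and select() does the same on a copy of the constraint object so the caller's callables stay available for reuse. Below is a minimal standalone sketch of that copy-then-resolve idea, assuming entries behave like a plain mapping; resolve_functors, owner and the parameter names are illustrative, not BlackDynamite API.

def resolve_functors(entries, owner):
    """Return a resolved copy: callable values are replaced by their result."""
    return {k: (v(owner) if callable(v) else v) for k, v in entries.items()}


entries = {'n_elements': 64, 'cutoff': lambda obj: obj['n_elements'] // 2}
resolved = resolve_functors(entries, owner={'n_elements': 64})

# the resolved values form a hashable key, as needed for the jobs index
params = tuple(v for _, v in sorted(resolved.items()))   # (32, 64)
assert callable(entries['cutoff'])   # the original mapping is left untouched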
diff --git a/BlackDynamite/zeoobject.py b/BlackDynamite/zeoobject.py
index bd70730..b932a27 100644
--- a/BlackDynamite/zeoobject.py
+++ b/BlackDynamite/zeoobject.py
@@ -1,219 +1,225 @@
 #!/usr/bin/env python3
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 ################################################################
 from . import bdlogging
 from . import lowercase_btree
 ################################################################
 import copy
 import re
 import sys
 import persistent
 import transaction
 from ZODB.POSException import ConflictError
 ################################################################
 print = bdlogging.invalidPrint
 logger = bdlogging.getLogger(__name__)
 BTree = lowercase_btree._LowerCaseBTree
 PBTree = lowercase_btree.PersistentLowerCaseBTree
 ################################################################


 def _transaction(foo):
     def _protected_transation(self, *args, **kwargs):
         saved_state = self.__getstate__()
         max_attempts = 10
         attempts = 0
         while True:
             try:
                 foo(self, *args, **kwargs)
             except ConflictError as e:
                 # logger.error('***************CONFLICT************')
                 transaction.abort()
                 new_state = self.__getstate__()
                 found_diff = False
                 for k, v in saved_state.items():
                     if k in ['types', 'quantities']:
                         continue
                     if k in new_state and v != new_state[k]:
                         logger.error(
                             f'attempt {attempts} diff state for key {k}: {v} != {new_state[k]}')
                         if k == 'configfiles':
                             for f in v:
                                 logger.error(f.file)
                             for f in new_state[k]:
                                 logger.error(f.file)
                         found_diff = True
                 attempts += 1
                 if found_diff:
                     raise e
                 if attempts == max_attempts:
                     raise e
             else:
                 break
     return _protected_transation


 class ZEOObject(persistent.Persistent, BTree):
     " The generic object related to entries in the database "

     def commit(self):
         from .base_zeo import BaseZEO
         BaseZEO.singleton_base.commit()

     def __setattr__(self, attr, value):
         BTree.__setattr__(self, attr, value)

     def setFields(self, constraints):
         for cons in constraints:
             _regex = "(\w*)\s*=\s*(.*)"
             match = re.match(_regex, cons)
             if (not match or (not len(match.groups()) == 2)):
                 print("malformed assignment: " + cons)
                 sys.exit(-1)
             key = match.group(1).lower().strip()
             val = match.group(2)
             if key not in self.types:
                 print("unknown key '{0}'".format(key))
                 print("possible keys are:")
                 for k in self.types.keys():
                     print("\t" + k)
                 sys.exit(-1)
             val = self.types[key](val)
             self.entries[key] = val

     def __init__(self):
         persistent.Persistent.__init__(self)
         BTree.__init__(self)
         super().__init__()
         self.allowNull = {}
         self.types = PBTree()
         self.operators = {}

     def __getstate__(self):
         "Get the state of the object for a pickling operations"
         state = {}
         for k in self.__dict__.keys():
             if k == 'base':
                 continue
             state[k] = self.__dict__[k]
         return state

     _flag_debug = False

     def __setstate__(self, state):
         for k in state.keys():
             self.__dict__[k] = state[k]
         if self._flag_debug:
             raise

     def copy(self):
         return copy.deepcopy(self)

     def __deepcopy__(self, memo):
         _cp = type(self)()
         for k in self.__dict__.keys():
             if k == 'base':
                 continue
             _cp.__dict__[k] = copy.deepcopy(self.__dict__[k])
         return _cp

     @_transaction
     def update(self):
         from .base_zeo import BaseZEO
         if isinstance(self, BaseZEO.singleton_base.Job):
             obj_list = BaseZEO.singleton_base._get_jobs()
         elif isinstance(self, BaseZEO.singleton_base.Run):
             obj_list = BaseZEO.singleton_base._get_runs()
         else:
             raise RuntimeError("undefined yet")
         obj = obj_list[self.id]
         # logger.error(self.base)
         # logger.error(obj)
         for key, value in self.entries.items():
             # logger.error(f'update {key}: {obj[key]} -> value')
             obj[key] = value
         BaseZEO.singleton_base.commit()

     def createTableRequest(self):
         self.base.root.schemas[self.base.schema]['default_job'] = self

     def matchConstraint(self, constraint):
         # case it is an object of same type
         if isinstance(constraint, type(self)):
             for key, value in self.items():
                 if constraint[key] != value:
                     return False
             return True
         # case it is a list/dict of constraints to evaluate
         else:
             logger.error(type(self))
             logger.error(constraint)
             for key, value in self.items():
                 if key not in constraint:
                     continue
                 if constraint[key] != value:
                     return False
             return True
         raise RuntimeError("toimplement")

     def getMatchedObjectList(self):
         from .base_zeo import BaseZEO
         return BaseZEO.singleton_base.select(self, self)

     def __repr__(self):
         type_prefix = 'object:\n'
         entries = self.entries
         keys = set(self.entries.keys())
         keys.remove('id')
         if not len(keys):
             type_prefix = 'descriptor:\n'
             entries = self.types
         if not len(entries.keys()):
             return "Empty ZEO object"

         outputs = []
         for k, v in sorted(entries.items()):
             if k == 'id' and v is None:
                 continue
             outputs += [' ' + k + ": " + str(v)]

         return type(self).__name__ + ' ' + type_prefix + "\n".join(outputs)

     def get_params(self):
         params = tuple(
             [v for e, v in self.entries.items() if e != 'id'])
         return params

     def get_keys(self):
         keys = tuple(
             [e for e, v in self.entries.items() if e != 'id'])
         return keys

+    def evalFunctorEntries(self):
+        keys = self.get_keys()
+        for k in keys:
+            if callable(self.entries[k]):
+                self.entries[k] = self.entries[k](self)
+
     @property
     def base(self):
         from .base_zeo import BaseZEO
         return BaseZEO.singleton_base
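evalFunctorEntries() rewrites the object in place: every entry that is callable is replaced by the result of calling it with the object itself, so a parameter can be derived from the other parameters at insertion time. A hedged usage sketch follows; it assumes the usual parser/base workflow shown in the project scripts, and the parameter names are hypothetical.

import BlackDynamite as BD

# assumed workflow: connect to an existing study (host, study, ... come
# from the command line through the standard BD parser)
parser = BD.BDParser()
params = parser.parseBDParameters()
base = BD.Base(**params)

job = base.Job()
job['n_elements'] = 64
# functor entry: resolved to job['n_elements'] // 2 by evalFunctorEntries(),
# which insert() now calls on its private copy of the job
job['cutoff'] = lambda j: j['n_elements'] // 2
base.insert(job)
base.commit()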
diff --git a/scripts/createRuns.py b/scripts/createRuns.py
index d8640a2..181cb9e 100755
--- a/scripts/createRuns.py
+++ b/scripts/createRuns.py
@@ -1,60 +1,62 @@
 #!/usr/bin/env python3
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

 # First we need to set the python headers
 # and to import the blackdynamite modules
 import BlackDynamite as BD
 import yaml
 import tqdm

 fname = 'bd.yaml'
 with open(fname) as f:
     config = yaml.load(f, Loader=yaml.SafeLoader)

 # import a runparser (instead of a generic BD parser)
 parser = BD.RunParser()
 params = parser.parseBDParameters()
 params['study'] = config['study']

 # Then we can connect to the black dynamite database
 base = BD.Base(**params)

 # create a run object
 myrun = base.Run()

 # set the run parameters from the parsed entries
 myrun.setEntries(params)

 # add a configuration file
 for f in config['config_files']:
     myrun.addConfigFiles(f)

 # set the entry point (executable) file
 myrun.setExecFile(config['exec_file'])

 # create a job selector
 jobSelector = BD.JobSelector(base)

 # select the jobs that should be associated with the runs about to be created
 job_list = jobSelector.selectJobs(params)

 # create the runs
 for j in tqdm.tqdm(job_list):
     for param, v in config['run_space'].items():
+        if isinstance(v, str):
+            v = eval(v)
         myrun[param] = myrun.types[param](v)
     myrun.attachToJob(j)

 # if truerun, commit the changes to the base
 if (params["truerun"] is True):
     base.commit()
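With the last hunk, a string value in run_space is passed through eval before the usual type coercion, so bd.yaml can carry Python expressions instead of literals. One side effect worth noting: genuinely string-valued run parameters now reach eval as well and must be expressions that evaluate to the intended string. The snippet below mirrors the loop body on a hypothetical quoted entry; the file layout is an assumption, not taken from the BlackDynamite documentation.

# hypothetical bd.yaml excerpt:
#
#   study: my_study
#   exec_file: launch.sh
#   config_files: [input.dat]
#   run_space:
#     nsteps: 1000
#     timestep: "1. / 2**10"    # quoted string, now eval'ed
#
# what the loop above does with the quoted entry:
v = "1. / 2**10"
if isinstance(v, str):
    v = eval(v)            # -> 0.0009765625; note eval executes arbitrary code
timestep = float(v)        # then the declared run type is applied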