diff --git a/BlackDynamite/lowercase_btree.py b/BlackDynamite/lowercase_btree.py index 44ace7a..d41d7ee 100644 --- a/BlackDynamite/lowercase_btree.py +++ b/BlackDynamite/lowercase_btree.py @@ -1,124 +1,127 @@ #!/usr/bin/env python3 from . import bdlogging ################################################################ import BTrees import persistent ################################################################ print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) BTree = BTrees.OOBTree.BTree ################################################################ class _LowerCaseBTree(object): def __init__(self, key_string='key_'): self.entries = BTree() self.key_string = key_string def __delitem__(self, attr): - del self.entries[attr.lower()] + key = attr + if isinstance(key, str): + key = key.lower() + del self.entries[key] def __getattr__(self, attr): if 'entries' not in self.__dict__: raise AttributeError(attr) key = attr.lower() _key = key # logger.error(self.__dict__) key_string = self.__dict__['key_string'] if isinstance(key, str) and key.startswith(key_string): _key = key[len(key_string):] # logger.error(_key) try: _key = int(_key) except ValueError: pass # logger.error(_key) try: if _key in self.entries: return self.entries[_key] elif key in self.entries: return self.entries[key] else: raise AttributeError(attr) except TypeError: # logger.error(attr) raise AttributeError(attr) def __setattr__(self, attr, value): if persistent.Persistent._p_setattr(self, attr, value): return self._p_changed = True key = attr.lower() if key == 'entries' or key == 'key_string': try: object.__setattr__(self, key, value) except TypeError: persistent.Persistent.__setattr__(self, key, value) # logger.error(f"set attr {attr} {value}") entries = self.entries # logger.error(f"set attr {[ e for e in entries.keys()]}") if key in entries: # logger.error(f"set attr in entries {attr} {value}") self.__setitem__(attr, value) self.entries._p_changed = True else: try: object.__setattr__(self, attr, value) except TypeError: persistent.Persistent.__setattr__(self, attr, value) def __getitem__(self, index): if isinstance(index, str): index = index.lower() return self.entries[index] def keys(self): return self.entries.keys() def __iter__(self): return self.entries.__iter__() def __setitem__(self, index, value): if isinstance(index, str): index = index.lower() self.entries[index] = value def items(self): # logger.error(f'{[e for e in self.entries.items()]}') return self.entries.items() def __deepcopy__(self, memo): cp = self.__class__() for k, v in self.entries.items(): cp.entries[k] = v return cp def setEntries(self, params): for p, val in params.items(): if p in self.types: self.entries[p] = val def __dir__(self): def valid_start_character(_str): return not _str[0].isdigit() return ([e for e in self.entries.keys() if type(e) == str and valid_start_character(e)] + [f'{self.key_string}{e}' for e in self.entries.keys() if type(e) == str and not valid_start_character(e)] + [f'{self.key_string}{e}' for e in self.entries.keys()if type(e) == int]) ################################################################ class PersistentLowerCaseBTree(persistent.Persistent, _LowerCaseBTree): def __init__(self, *args, **kwargs): persistent.Persistent.__init__(self) _LowerCaseBTree.__init__(self, *args, **kwargs) diff --git a/BlackDynamite/run_zeo.py b/BlackDynamite/run_zeo.py index f66e6eb..f8d4685 100755 --- a/BlackDynamite/run_zeo.py +++ b/BlackDynamite/run_zeo.py @@ -1,387 +1,391 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################ import BTrees from . import conffile_zeo from . import zeoobject from . import bdparser from . import base from .base_zeo import BaseZEO from . import runselector from . import bdlogging import ZODB.blob as blob ################################################################ import numpy as np import datetime import subprocess import socket import os ################################################################ __all__ = ['RunZEO', 'getRunFromScript'] print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) # PBTree = lowercase_btree.PersistentLowerCaseBTree BTree = BTrees.OOBTree.BTree ################################################################ class UnknownQuantity(RuntimeError): pass class RunZEO(zeoobject.ZEOObject): """ """ table_name = 'runs' def getJob(self): return BaseZEO.singleton_base.getJobFromID(self.entries["job_id"]) def start(self): # logger.error(self.entries['state']) self.entries['state'] = 'START' # logger.error(self['state']) logger.debug('starting run') BaseZEO.singleton_base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') BaseZEO.singleton_base.commit() logger.debug('commited') def attachToJob(self, job): # logger.error(f"attach job {job.id}") self["job_id"] = job.id BaseZEO.singleton_base.insert(self) def getExecFile(self): conf_exec = self.configfiles[self.exec] return self.getUpdatedConfigFile(conf_exec) def setExecFile(self, file_name, **kwargs): # check if the file is already in the config files for _id, f in self.configfiles.items(): if f.filename == file_name: self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it has to be added conf = conffile_zeo.addFile( file_name, BaseZEO.singleton_base, **kwargs) self.configfiles[conf.id] = conf self.entries["exec"] = conf.id return conf.id def listFiles(self, subdir=""): """List files in run directory / specified sub-directory""" command = 'ls {0}'.format(os.path.join(self['run_path'], subdir)) if not self['machine_name'] == socket.gethostname(): command = 'ssh {0} "{1}"'.format(self['machine_name'], command) logger.info(command) p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip().decode() for o in out] return out def getFile(self, filename, outpath='/tmp'): dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file = os.path.join(dest_path, filename) full_filename = self.getFullFileName(filename) # Check if file is local if os.path.isfile(full_filename): return full_filename # If file is distant, prepare cache directory hierarchy dest_path = os.path.dirname(dest_file) logger.debug('Directories: ' + dest_path) logger.debug('File: ' + dest_file) # Making directories try: os.makedirs(dest_path, exist_ok=True) except Exception as e: logger.error(e) pass if os.path.isfile(dest_file): logger.info('File {} already cached'.format(dest_file)) return dest_file cmd = 'scp {0}:{1} {2}'.format(self['machine_name'], self.getFullFileName(filename), dest_file) logger.info(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) errors = bytes(p.stderr.read()).decode().strip() if errors: logger.warning(errors) return dest_file def getFullFileName(self, filename): return os.path.join(self['run_path'], filename) def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] params_list = list(self.types.keys()) myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) params_list += list(myjob.types.keys()) # logger.debug (regex_params) # file_ids = [f for f in self.configfiles] files_to_add = [ conffile_zeo.addFile( fname, BaseZEO.singleton_base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in self.configfiles): self.configfiles[f.id] = f BaseZEO.singleton_base.commit() return self.configfiles def getConfigFiles(self): conffiles = [self.getUpdatedConfigFile(f) for _id, f in self.configfiles.items()] return conffiles def getConfigFile(self, file_id): return self.configfiles[file_id] def replaceBlackDynamiteVariables(self, text): myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", BaseZEO.singleton_base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", BaseZEO.singleton_base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, conf): conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): return BaseZEO.singleton_base.quantities def getLastStep(self): if 'last_step' in self.entries: return self.last_step, self.last_step_time else: return None, None def getScalarQuantity(self, name, additional_request=None): if name not in self.quantities: raise UnknownQuantity( f"for run {self}\n" f"unknown quantity '{name}'\n" f"possible quantities are {[e for e in self.quantities.keys()]}") step, array = self.getQuantityArrayFromBlob(name) return step, array def getScalarQuantities(self, names, additional_request=None): res = [] for q in names: try: step, array = self.getScalarQuantity(q) res.append((q, step, array)) except UnknownQuantity: logger.warning(f'run {self.id} has no quantity: {q}') return None return res def getVectorQuantity(self, name, step): step_array, array = self.getQuantityArrayFromBlob(name) i = np.where(step_array == step)[0] if i.shape[0] == 0: raise RuntimeError('the step {step} could not be found') if i.shape[0] > 1: raise RuntimeError('the step {step} was registered more than once') i = i[0] return array[i] @zeoobject._transaction def saveStepTimeStamp(self, step): self.last_step = step self.last_step_time = datetime.now() @zeoobject._transaction def pushVectorQuantity(self, vec, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: list_vec = np.array([vec], dtype=object) array_step = np.array([step]) else: array_step, list_vec = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) if (len(list_vec.shape) == 2 and list_vec.shape[1] != vec.shape[0]) or len(list_vec.shape) == 1: list_vec = np.array( [e for e in list_vec] + [vec], dtype=object) else: list_vec = np.append(list_vec, [vec], axis=0) self.saveQuantityArrayToBlob(name, array_step, list_vec) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() @zeoobject._transaction def pushScalarQuantity(self, val, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: array_val = np.array([val]) array_step = np.array([step]) else: array_step, array_val = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) array_val = np.append(array_val, [val], axis=0) self.saveQuantityArrayToBlob(name, array_step, array_val) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() def getQuantityBlob(self, name): if name not in self.quantities: logger.info(f'create quantity {name}') self.quantities[name] = blob.Blob() return self.quantities[name] def getQuantityArrayFromBlob(self, name): buf = self.getQuantityBlob(name).open() # logger.error(buf.name) try: _f = np.load(buf, allow_pickle=True) except IOError as e: logger.error(e) raise RuntimeError( f"Cannot read file {buf.name} for quantity {name}") # logger.error(f'{name} {_f["step"]}') return _f['step'], _f['val'] def saveQuantityArrayToBlob(self, name, array_step, array_val): buf = self.getQuantityBlob(name).open('w') # logger.error(f'{name} {buf.name}') # logger.error(f'{name} {array_step}') np.savez_compressed(buf, val=array_val, step=array_step) def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) + def delete(self): + del self.base.runs[self.id] + self.base.commit() + def deleteData(self): del self.quantities self.quantities = BTree() - BaseZEO.singleton_base.commit() + self.base.commit() def __init__(self, base): super().__init__(base) self.configfiles = BTree() self.quantities = BTree() # logger.error(self.quantities) base.prepare(self, 'run_desc') self['id'] = None self.types['id'] = int self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.types["exec"] = str self.types["last_step"] = int self.types["last_step_time"] = datetime.datetime self["last_step"] = None self["last_step_time"] = None self["start_time"] = None self["wait_id"] = None ################################################################ def getRunFromScript(): parser = bdparser.BDParser() parser.register_params(params={"run_id": int}) params = parser.parseBDParameters(argv=[]) mybase = base.Base(**params) runSelector = runselector.RunSelector(mybase) run_list = runSelector.selectRuns(params) if len(run_list) > 1: raise Exception(f'internal error {params}') if len(run_list) == 0: raise Exception(f'internal error {params}') myrun, myjob = run_list[0] # myrun.setEntries(params) return myrun, myjob