diff --git a/BlackDynamite/run_zeo.py b/BlackDynamite/run_zeo.py index b61987a..b1bd1ce 100755 --- a/BlackDynamite/run_zeo.py +++ b/BlackDynamite/run_zeo.py @@ -1,394 +1,411 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. ################################################################ import BTrees from . import conffile_zeo from . import zeoobject from . import bdparser from . import base from .base_zeo import BaseZEO from . import runselector from . 
import bdlogging import ZODB.blob as blob ################################################################ import numpy as np import datetime import subprocess import socket import os ################################################################ __all__ = ['RunZEO', 'getRunFromScript'] print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) # PBTree = lowercase_btree.PersistentLowerCaseBTree BTree = BTrees.OOBTree.BTree ################################################################ class UnknownQuantity(RuntimeError): pass class RunZEO(zeoobject.ZEOObject): """ """ table_name = 'runs' def getJob(self): return BaseZEO.singleton_base.getJobFromID(self.entries["job_id"]) def start(self): # logger.error(self.entries['state']) self.entries['state'] = 'START' # logger.error(self['state']) logger.debug('starting run') BaseZEO.singleton_base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') BaseZEO.singleton_base.commit() logger.debug('commited') def attachToJob(self, job): # logger.error(f"attach job {job.id}") self["job_id"] = job.id BaseZEO.singleton_base.insert(self) def getExecFile(self): conf_exec = self.configfiles[self.exec] return self.getUpdatedConfigFile(conf_exec) def setExecFile(self, file_name, **kwargs): # check if the file is already in the config files for _id, f in self.configfiles.items(): if f.filename == file_name: self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it has to be added conf = conffile_zeo.addFile( file_name, BaseZEO.singleton_base, **kwargs) self.configfiles[conf.id] = conf self.entries["exec"] = conf.id return conf.id def listFiles(self, subdir=""): """List files in run directory / specified sub-directory""" command = 'ls {0}'.format(os.path.join(self['run_path'], subdir)) if not self['machine_name'] == socket.gethostname(): command = 'ssh {0} "{1}"'.format(self['machine_name'], command) logger.info(command) p = 
subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip().decode() for o in out] return out def getFile(self, filename, outpath='/tmp'): dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file = os.path.join(dest_path, filename) full_filename = self.getFullFileName(filename) # Check if file is local if os.path.isfile(full_filename): return full_filename # If file is distant, prepare cache directory hierarchy dest_path = os.path.dirname(dest_file) logger.debug('Directories: ' + dest_path) logger.debug('File: ' + dest_file) # Making directories try: os.makedirs(dest_path, exist_ok=True) except Exception as e: logger.error(e) pass if os.path.isfile(dest_file): logger.info('File {} already cached'.format(dest_file)) return dest_file cmd = 'scp {0}:{1} {2}'.format(self['machine_name'], self.getFullFileName(filename), dest_file) logger.info(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) errors = bytes(p.stderr.read()).decode().strip() if errors: logger.warning(errors) return dest_file def getFullFileName(self, filename): return os.path.join(self['run_path'], filename) def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] params_list = list(self.types.keys()) myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) params_list += list(myjob.types.keys()) # logger.debug (regex_params) # file_ids = [f for f in self.configfiles] files_to_add = [ conffile_zeo.addFile( fname, BaseZEO.singleton_base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in self.configfiles): self.configfiles[f.id] = f BaseZEO.singleton_base.commit() return self.configfiles def getConfigFiles(self): conffiles = [self.getUpdatedConfigFile(f) for _id, f in self.configfiles.items()] return conffiles def getConfigFile(self, file_id): return 
self.configfiles[file_id] def replaceBlackDynamiteVariables(self, text): myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", BaseZEO.singleton_base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", BaseZEO.singleton_base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, conf): conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): return BaseZEO.singleton_base.quantities def getLastStep(self): if 'last_step' in self.entries: return self.last_step, self.last_step_time else: return None, None def getScalarQuantity(self, name, additional_request=None): if name not in self.quantities: raise UnknownQuantity( f"for run {self}\n" f"unknown quantity '{name}'\n" f"possible quantities are {[e for e in self.quantities.keys()]}") step, array = self.getQuantityArrayFromBlob(name) return step, array def getScalarQuantities(self, names, additional_request=None): res = [] for q in names: try: step, array = self.getScalarQuantity(q) res.append((q, step, array)) except UnknownQuantity: logger.warning(f'run {self.id} has no quantity: {q}') return None return res def getVectorQuantity(self, name, step): step_array, array = self.getQuantityArrayFromBlob(name) i = np.where(step_array == step)[0] if i.shape[0] == 0: raise RuntimeError('the step {step} could not be found') if i.shape[0] > 1: raise RuntimeError('the step {step} 
was registered more than once') i = i[0] return array[i] @zeoobject._transaction def saveStepTimeStamp(self, step): self.last_step = step self.last_step_time = datetime.datetime.now() @zeoobject._transaction def pushVectorQuantity(self, vec, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: list_vec = np.array([vec], dtype=object) array_step = np.array([step]) else: array_step, list_vec = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) if (len(list_vec.shape) == 2 and list_vec.shape[1] != vec.shape[0]) or len(list_vec.shape) == 1: list_vec = np.array( [e for e in list_vec] + [vec], dtype=object) else: list_vec = np.append(list_vec, [vec], axis=0) self.saveQuantityArrayToBlob(name, array_step, list_vec) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() @zeoobject._transaction def pushScalarQuantity(self, val, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: array_val = np.array([val]) array_step = np.array([step]) else: array_step, array_val = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) array_val = np.append(array_val, [val], axis=0) self.saveQuantityArrayToBlob(name, array_step, array_val) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() + @zeoobject._transaction + def pushScalarQuantities(self, vals, steps, name, + is_integer=None, description=None): + quantities = BaseZEO.singleton_base.quantities + quantities.add(name) + if name not in self.quantities: + array_val = np.array(vals) + array_step = np.array(steps) + else: + array_step, array_val = self.getQuantityArrayFromBlob(name) + array_step = np.append(array_step, steps, axis=0) + array_val = np.append(array_val, vals, axis=0) + + self.saveQuantityArrayToBlob(name, array_step, array_val) + 
self.saveStepTimeStamp(steps[-1]) + BaseZEO.singleton_base.commit() + def getQuantityBlob(self, name): if name not in self.quantities: logger.info(f'create quantity {name}') self.quantities[name] = blob.Blob() return self.quantities[name] def getQuantityArrayFromBlob(self, name): buf = self.getQuantityBlob(name).open() # logger.error(buf.name) try: _f = np.load(buf, allow_pickle=True) except IOError as e: logger.error(e) raise RuntimeError( f"Cannot read file {buf.name} for quantity {name}") # logger.error(f'{name} {_f["step"]}') return _f['step'], _f['val'] def saveQuantityArrayToBlob(self, name, array_step, array_val): buf = self.getQuantityBlob(name).open('w') # logger.error(f'{name} {buf.name}') # logger.error(f'{name} {array_step}') np.savez_compressed(buf, val=array_val, step=array_step) def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) def delete(self): del self.base.runs[self.id] self.base.commit() def deleteData(self): for name in self.quantities: blob = self.getQuantityBlob(name) logger.error(dir(blob)) del self.quantities self.quantities = BTree() self.base.commit() def __init__(self, base): super().__init__(base) self.configfiles = BTree() self.quantities = BTree() # logger.error(self.quantities) base.prepare(self, 'run_desc') self['id'] = None self.types['id'] = int self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str 
self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.types["exec"] = str self.types["last_step"] = int self.types["last_step_time"] = datetime.datetime self["last_step"] = None self["last_step_time"] = None self["start_time"] = None self["wait_id"] = None ################################################################ def getRunFromScript(): parser = bdparser.BDParser() parser.register_params(params={"run_id": int}) params = parser.parseBDParameters(argv=[]) mybase = base.Base(**params) runSelector = runselector.RunSelector(mybase) run_list = runSelector.selectRuns(params) if len(run_list) > 1: raise Exception(f'internal error {params}') if len(run_list) == 0: raise Exception(f'internal error {params}') myrun, myjob = run_list[0] # myrun.setEntries(params) return myrun, myjob diff --git a/scripts/createJobs.py b/scripts/createJobs.py index 8976c4c..50cb946 100644 --- a/scripts/createJobs.py +++ b/scripts/createJobs.py @@ -1,55 +1,33 @@ # First we need to set the python headers # and to import the \blackdynamite modules import BlackDynamite as BD import yaml +import numpy as np fname = 'bd.yaml' with open(fname) as f: config = yaml.load(f, Loader=yaml.SafeLoader) # Then you have to create a generic black dynamite parser # and parse the system (including the connection parameters and credentials) parser = BD.bdparser.BDParser() params = parser.parseBDParameters() params['study'] = config['study'] # Then we can connect to the black dynamite database base = BD.base.Base(**params) # create of job object job = base.Job(base) # specify a range of jobs -for param, _space in config['job_space'].items(): - if isinstance(_space, str): - _space = [_space] - elif not isinstance(_space, list): - _space = [_space] +for param, space in config['job_space'].items(): + if isinstance(space, str): + space = eval(space) - space = [] - 
for e in _space: - try: - space.append(job.types[param](e)) - continue - except ValueError: - pass - - if isinstance(e, str): - try: - e = job.types[param](eval(e)) - if not isinstance(e, job.types[param]): - raise RuntimeError( - f"cannot parse space range for param {param}: {e}") - space.append(e) - except ValueError: - raise RuntimeError( - f"cannot parse space range for param {param}: {e}") - - else: - # print(type(e)) - raise RuntimeError( - f"cannot parse space range for param {param}: {e}") + if not isinstance(space, list): + space = list(space) if isinstance(space, (np.ndarray, tuple)) else [space] job[param] = space # creation of the jobs on the database base.createParameterSpace(job)