diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py index 9a5d216..5ae13bc 100644 --- a/BlackDynamite/base_zeo.py +++ b/BlackDynamite/base_zeo.py @@ -1,355 +1,358 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################ from . import bdparser from . import bdlogging from . import base from . import conffile_zeo from . import zeoobject from . import lowercase_btree from .constraints_zeo import ZEOconstraints ################################################################ import re import os import subprocess import ZEO import ZODB import sys from BTrees.OOBTree import OOSet from . import job from . import run_zeo ################################################################ __all__ = ["BaseZEO"] print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) PBTree = lowercase_btree.PersistentLowerCaseBTree ################################################################ class BaseZEO(base.AbstractBase): """ """ singleton_base = None # def __del__(self): # if hasattr(self, 'process'): # self.process.kill() def __init__(self, truerun=False, creation=False, read_only=False, **kwargs): BaseZEO.singleton_base = self self.Job = job.JobZEO self.Run = run_zeo.RunZEO self.ConfFile = conffile_zeo.ConfFile self.BDconstraints = ZEOconstraints zeo_params = ["host"] connection_params = bdparser.filterParams(zeo_params, kwargs) logger.info('connection arguments: {0}'.format(connection_params)) self.filename = connection_params['host'] filename_split = self.filename.split('://') if filename_split[0] != 'zeo': raise RuntimeError( f"wrong protocol with this database: {type(self)}") self.filename = filename_split[1] # logger.error(self.filename) dirname = os.path.dirname(self.filename) # logger.error(dirname) if dirname == '': dirname = './' socket_name = os.path.join(dirname, 'zeo.socket') zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf') # logger.error(socket_name) # logger.error(zeo_server_conf_filename) if not os.path.exists(socket_name): zeo_server_conf = f''' address ./zeo.socket path bd.zeo blob-dir bd.blob path zeo.log format %(asctime)s %(message)s ''' with open(zeo_server_conf_filename, 'w') as f: f.write(zeo_server_conf) cmd = "runzeo -C zeo.conf" logger.error(cmd) self.process = subprocess.Popen( cmd, shell=True, cwd=dirname) # stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: self.connection = ZEO.connection( socket_name, read_only=read_only, server_sync=True, blob_dir=os.path.join(dirname, 'bd-client.blob')) self.root = self.connection.root logger.debug('connected to base') except Exception as e: logger.error( "Connection failed: check your connection settings:\n" + str(e)) sys.exit(-1) assert(isinstance(self.connection, ZODB.Connection.Connection)) self.dbhost = (kwargs["host"] if "host" in kwargs.keys() else "localhost") super().__init__(connection=self.connection, truerun=truerun, creation=creation, **kwargs) def getSchemaList(self, filter_names=True): try: schemas = self.root.schemas except AttributeError: self.root.schemas = PBTree(key_string='study_') schemas = self.root.schemas filtered_schemas = [] if filter_names is True: for s in schemas: m = re.match('{0}_(.+)'.format(self.user), s) if m: s = m.group(1) filtered_schemas.append(s) else: filtered_schemas = schemas return filtered_schemas def getStudySize(self, study): curs = self.connection.cursor() try: logger.info(study) curs.execute(""" select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname) || '.' || quote_ident(tablename)))::BIGINT FROM pg_tables WHERE schemaname = '{0}') as sz """.format(study)) size = curs.fetchone()[0] curs.execute(""" select pg_size_pretty(cast({0} as bigint)) """.format(size)) size = curs.fetchone()[0] curs.execute(""" select count({0}.runs.id) from {0}.runs """.format(study)) nruns = curs.fetchone()[0] curs.execute(""" select count({0}.jobs.id) from {0}.jobs """.format(study)) njobs = curs.fetchone()[0] except psycopg2.ProgrammingError: self.connection.rollback() size = '????' return {'size': size, 'nruns': nruns, 'njobs': njobs} def createSchema(self, params={"yes": False}): # create the schema of the simulation if not hasattr(self.root, 'schemas'): self.root.schemas = PBTree(key_string='study_') if self.schema in self.root.schemas: validated = bdparser.validate_question( "Are you sure you want to drop the schema named '" + self.schema + "'", params, False) if validated is True: # logger.error(self.root.schemas[self.schema]) # logger.error(type(self.root.schemas[self.schema])) del self.root.schemas[self.schema] else: logger.debug("creation canceled: exit program") sys.exit(-1) self.root.schemas[self.schema] = PBTree() self.root.schemas[self.schema]['Quantities'] = OOSet() self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_') self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_') self.root.schemas[self.schema]['Jobs_counter'] = 1 self.root.schemas[self.schema]['Runs_counter'] = 1 def prepare(self, obj, descriptor): if not hasattr(self.root, 'schemas'): return if descriptor in self.root.schemas[self.schema]: desc = self.root.schemas[self.schema][descriptor] for t in desc.types.keys(): obj.types[t] = desc.types[t] if t not in obj: obj.t = None def createBase(self, job_desc, run_desc, quantities={}, **kwargs): self.createSchema(kwargs) self.root.schemas[self.schema]['job_desc'] = job_desc self.root.schemas[self.schema]['run_desc'] = run_desc for qname, type in quantities.items(): self.pushQuantity(qname, type) if self.truerun: self.commit() def _get_jobs(self): return self.root.schemas[self.schema]['Jobs'] @property def jobs(self): return self._get_jobs() @jobs.setter def jobs(self, val): self.root.schemas[self.schema]['Jobs'] = val @property def quantities(self): return self.root.schemas[self.schema]['Quantities'] @quantities.setter def quantities(self, value): self.root.schemas[self.schema]['Quantities'] = value @property def jobs_counter(self): return self.root.schemas[self.schema]['Jobs_counter'] @jobs_counter.setter def jobs_counter(self, val): self.root.schemas[self.schema]['Jobs_counter'] = val def _get_runs(self): return self.root.schemas[self.schema]['Runs'] @property def runs(self): return self._get_runs() @runs.setter def runs(self, val): self.root.schemas[self.schema]['Runs'] = val @property def runs_counter(self): return self.root.schemas[self.schema]['Runs_counter'] @runs_counter.setter def runs_counter(self, val): self.root.schemas[self.schema]['Runs_counter'] = val def select(self, _types, constraints=None, sort_by=None): if not isinstance(_types, list): _types = [_types] _type = _types[0] if isinstance(_type, zeoobject.ZEOObject): _type = type(_type) if _type == self.Job: obj_container = self._get_jobs() elif _type == self.Run: obj_container = self._get_runs() else: raise RuntimeError(f'{type(_types)}') if (sort_by is not None) and (not isinstance(sort_by, str)): raise RuntimeError( 'sort_by argument is not correct: {0}'.format(sort_by)) # logger.error(_type) # logger.error(type(constraints)) # logger.error(constraints) if isinstance(constraints, zeoobject.ZEOObject): if hasattr(constraints, 'id') and constraints.id is not None: obj = obj_container[constraints.id] if isinstance(obj, self.Run): obj = (obj, self._get_jobs()[obj.job_id]) return [obj] const = ZEOconstraints(self, constraints) condition = const.getMatchingCondition() obj_list = [] for key, obj in obj_container.items(): obj.base = self objs = [obj] # logger.error(type(obj)) # logger.error(obj.id) if _type == self.Run: j = self._get_jobs()[obj.job_id] j.base = self objs.append(j) if condition(objs): # logger.error(key) # logger.error(obj) # logger.error(_type) if len(objs) == 1: obj_list.append(objs[0]) else: obj_list.append(objs) return obj_list def insert(self, zeoobject, keep_state=False): if isinstance(zeoobject, self.Job): objs = self._get_jobs() zeoobject = zeoobject.copy() if not keep_state: zeoobject['id'] = self.jobs_counter self.jobs_counter += 1 elif isinstance(zeoobject, self.Run): objs = self._get_runs() zeoobject = zeoobject.copy() if not keep_state: zeoobject["id"] = self.runs_counter zeoobject["state"] = 'CREATED' self.runs_counter += 1 else: raise RuntimeError( f'cannot insert object of type {type(zeoobject)}') objs[zeoobject.id] = zeoobject.copy() # logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}') def setObjectItemTypes(self, zeoobject): if isinstance(zeoobject, self.Job): zeoobject.types = self.root.schemas[self.schema]['job_desc'].types elif isinstance(zeoobject, self.Run): zeoobject.types = self.root.schemas[self.schema]['run_desc'].types else: raise RuntimeError(f'{type(zeoobject)}') def commit(self): import transaction transaction.commit() + def pack(self): + self.connection.db().pack() + def close(self): import transaction transaction.abort() ################################################################ diff --git a/BlackDynamite/graphhelper.py b/BlackDynamite/graphhelper.py index d037ce6..224666f 100755 --- a/BlackDynamite/graphhelper.py +++ b/BlackDynamite/graphhelper.py @@ -1,389 +1,391 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -*- py-which-shell: "python"; -*- # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################ from . import runselector from . import bdparser import re import sys import numpy as np ################################################################ class GraphHelper(object): """ """ def getMeasures(self, run_list): myresults = [] add_req = [] if (self.frequency): add_req += ["step %% {0} = 0".format(self.frequency)] if (self.start): add_req += ["step > {0}".format(self.start)] if (self.end): add_req += ["step < {0}".format(self.end)] for r, j in run_list: # print ("retrieve data from run " + r["run_name"]) res = r.getScalarQuantities(self.quantities, add_req) + if res is None: + continue for key, step, value in res: if value is None: del res[key] myresults.append([r, j, res]) return myresults def selectGraphs(self): run_list = self.runSelector.selectRuns(self.constraints, self.sort_by) results = self.getMeasures(run_list) return results def show(self): import matplotlib.pyplot as plt plt.show() def makeGraphs(self, fig=None, **kwargs): import matplotlib.pyplot as plt results = self.selectGraphs() if fig is None: fig = plt.figure(figsize=self.figsize) for r, j, data in results: if data: self.makeCurve(data, fig=fig, myrun=r, myjob=j, **kwargs) return fig def replaceRunAndJobsParameters(self, name, myrun, myjob): res = name # print (res) codes = [["%r." + key, myrun[key]] for key in myrun.entries.keys()] codes += [["%j." + key, myjob[key]] for key in myjob.entries.keys()] for code, val in codes: res = res.replace(code, str(val)) return res def generateLabels(self, results, myrun, myjob): labels = [] names = [r[0] for r in results] for i in range(0, len(results)): name = results[i][0] if (not self.legend or i >= len(self.legend) or not self.legend[i]): labels.append( self.replaceRunAndJobsParameters(name, myrun, myjob) ) continue # print (self.legend[i]) head_legend = self.legend[i].replace("{", "{{") head_legend = head_legend.replace("}", "}}") head_legend = re.sub(r"(%)([0-9]+)", r'{\2}', head_legend).format( *names) # print (head_legend) head_legend = self.replaceRunAndJobsParameters( head_legend, myrun, myjob) # print (head_legend) # if (not head_legend.find("%") == -1): # print("unknown variable name. Possible variables are:") # print "\n".join([c[0] for c in codes]) # sys.exit(-1) # print (head_legend) labels.append(head_legend) return labels def makeComposedQuantity(self, results, myrun, myjob): vecs = [r[1] for r in results] names = [r[0] for r in results] # print (vecs[0].shape) new_results = [] for comp in self.using: exprs = comp.split(":") tmp_res = [] for i in [0, 1]: e = re.sub(r"(%)([0-9]+)\.(x)", r"vecs[\2][:,0]", exprs[i]) e = re.sub(r"(%)([0-9]+)\.(y)", r"vecs[\2][:,1]", e) e = self.replaceRunAndJobsParameters(e, myrun, myjob) try: tmp_res.append(eval(e)) except Exception as ex: print(names) print("invalid expression: '" + exprs[i] + "'") print("invalid expression: '" + e + "'") print(ex) i = 1 for v in vecs: print('quantity {0}/{1} shape: {2}'.format( i, len(vecs), v.shape)) i += 1 sys.exit(-1) name = re.sub(r"(%)([0-9]+)\.([x|y])", r'(" + str(names[\2]) + ")', exprs[1]) res = np.zeros((tmp_res[0].shape[0], 2)) res[:, 0] = tmp_res[0] res[:, 1] = tmp_res[1] # print (res.shape) # expr = re.sub(r"(%)([0-9]+)", r"vecs[\2]", comp) # res[0] = eval(expr) # print (name) name = "\"" + name + "\"" # print (name) name = eval(name) # print (name) new_results.append([name, res]) return new_results def decorateGraph(self, fig, myrun, myjob, results): if not results: return if fig is None: import matplotlib.pyplot as plt fig = plt.figure() axe = fig.add_subplot(1, 1, 1) if self.xrange: axe.set_xlim(self.xrange) if self.yrange: axe.set_ylim(self.yrange) if (self.xlabel): axe.set_xlabel(self.xlabel) if (self.ylabel): axe.set_ylabel(self.ylabel) if (self.title): t = self.replaceRunAndJobsParameters(self.title, myrun, myjob) axe.set_title(t) axe.grid(True, linewidth=0.1) if self.using: results = self.makeComposedQuantity(results, myrun, myjob) labels = self.generateLabels(results, myrun, myjob) # print (labels) return fig, axe, results, labels def makeCurve(self, results, myrun=None, myjob=None, fig=None, **kwargs): fig, axe, results, labels = self.decorateGraph( fig, myrun, myjob, results) for count, result in enumerate(results): # name = result[0] step = result[1] vec = result[2] label = labels[count] # print (self.quantities) # print (name) style = dict() if (self.marker is not None): style["marker"] = self.marker if self.blackwhite: width_index = self.cycle_index/len(self.linestyle_cycle) style_index = self.cycle_index % len(self.linestyle_cycle) self.cycle_index += 1 style["linewidth"] = self.linewidth_cycle[width_index] style["linestyle"] = self.linestyle_cycle[style_index] style["color"] = 'k' axe.plot(step/self.xscale, vec/self.yscale, label=label, **style) axe.legend(loc='best') if (self.fileout): fig.savefig(self.fileout) return fig def setConstraints(self, **params): self.constraints = [] if "constraints" in params: self.constraints = params["constraints"] def setBinaryOperator(self, **params): self.binary_operator = 'and' if ("binary_operator" in params): self.binary_operator = params["binary_operator"] def setQuantity(self, **params): if ("quantity" in params): self.quantities = params["quantity"] else: print("quantity should be provided using option --quantity") self.quantities = "__BLACKDYNAMITE_ERROR__" def __init__(self, base, **params): self.setConstraints(**params) self.setQuantity(**params) self.base = base self.runSelector = runselector.RunSelector(self.base) self.fig = None self.xrange = None self.yrange = None self.xlabel = None self.ylabel = None self.xscale = None self.yscale = None self.fileout = None self.title = None self.using = None self.frequency = None self.start = None self.end = None self.figsize = None self.blackwhite = None self.legend = None self.sort_by = None self.marker = None # set the members if keys are present in params members = set(self.__dict__.keys()) p = set(params.keys()) for key in members & p: setattr(self, key, params[key]) if params["list_quantities"] is True: myrun = base.Run(base) print("list of possible quantities:\n") print("\n".join(myrun.listQuantities())) sys.exit(0) if params["list_parameters"] is True: self.base.getPossibleParameters() sys.exit(0) self.linewidth_cycle = [1, 2, 4] self.linestyle_cycle = ['-', '--', '-.'] self.cycle_index = 0 ################################################################ class GraphParser(bdparser.BDParser): """ """ def __init__(self): bdparser.BDParser.__init__(self) self.admissible_params["quantity"] = [str] self.help["quantity"] = "Specify the quantity to be outputed" self.admissible_params["xrange"] = [float] self.help["xrange"] = "Specify range of values in the X direction" self.admissible_params["yrange"] = [float] self.help["yrange"] = "Specify range of values in the Y direction" self.admissible_params["sort_by"] = [str] self.help["sort_by"] = ( "Specify a study parameter to be used in sorting the curves") self.admissible_params["xlabel"] = str self.help["xlabel"] = "Specify the label for the X axis" self.admissible_params["ylabel"] = str self.help["ylabel"] = "Specify the label for the Y axis" self.admissible_params["xscale"] = float self.default_params["xscale"] = 1. self.help["xscale"] = "Specify a scale factor for the X axis" self.admissible_params["yscale"] = float self.default_params["yscale"] = 1. self.help["yscale"] = "Specify a scale factor for the Y axis" self.admissible_params["title"] = str self.help["title"] = "Specify title for the graph" self.admissible_params["legend"] = [str] self.help["legend"] = ( "Specify a legend for the curves." " The syntax can use %%j.param or %%r.param to use" " get job and run values") self.default_params["legend"] = None self.admissible_params["using"] = [str] self.help["using"] = ( "Allow to combine several quantities. " "The syntax uses python syntax where " "%%quantity1.column1:%%quantity2.column2 is the python " "numpy vector provided by quantity number (provided using the " "--quantities option) and column number (x or y). " "The sytax is comparable to the GNUPlot one in using the ':' " "to separate X from Y axis") self.admissible_params["list_quantities"] = bool self.help["list_quantities"] = ( "Request to list the possible quantities to be plotted") self.admissible_params["list_parameters"] = bool self.help["list_parameters"] = ( "Request to list the possible job/run parameters") self.admissible_params["frequency"] = int self.default_params["frequency"] = 1 self.help["frequency"] = ( "Set a frequency at which the quantity values " "should be retreived " "(helpful when the amount of data is very large)") self.admissible_params["start"] = float self.help["start"] = "Set the start X value for the graph" self.admissible_params["end"] = int self.help["end"] = "Set the end X value for the graph" self.admissible_params["figsize"] = [float] self.admissible_params["blackwhite"] = bool self.default_params["blackwhite"] = False self.help["blackwhite"] = "Request a black and white graph generation" self.default_params["blackwhite"] = False self.help["blackwhite"] = "Request to plot a black and white graph" self.admissible_params["marker"] = str self.help["marker"] = "Request a specific marker (matplotlib option)" self.admissible_params["fileout"] = str self.help["fileout"] = ( 'Request to write a PDF file' ' (given its name) containing the graph') self.group_params["GraphHelper"] = [ "quantity", "xrange", "yrange", "sort_by", "xlabel", "ylabel", "xscale", "yscale", "title", "legend", "using", "list_quantities", "list_parameters", "frequency", "start", "end", "figsize", "blackwhite", "marker", "fileout"] ################################################################ __all__ = ["GraphHelper", "GraphParser"] ################################################################ diff --git a/BlackDynamite/run_zeo.py b/BlackDynamite/run_zeo.py index f8d4685..b61987a 100755 --- a/BlackDynamite/run_zeo.py +++ b/BlackDynamite/run_zeo.py @@ -1,391 +1,394 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################ import BTrees from . import conffile_zeo from . import zeoobject from . import bdparser from . import base from .base_zeo import BaseZEO from . import runselector from . import bdlogging import ZODB.blob as blob ################################################################ import numpy as np import datetime import subprocess import socket import os ################################################################ __all__ = ['RunZEO', 'getRunFromScript'] print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) # PBTree = lowercase_btree.PersistentLowerCaseBTree BTree = BTrees.OOBTree.BTree ################################################################ class UnknownQuantity(RuntimeError): pass class RunZEO(zeoobject.ZEOObject): """ """ table_name = 'runs' def getJob(self): return BaseZEO.singleton_base.getJobFromID(self.entries["job_id"]) def start(self): # logger.error(self.entries['state']) self.entries['state'] = 'START' # logger.error(self['state']) logger.debug('starting run') BaseZEO.singleton_base.commit() logger.debug('commited') def finish(self): self.entries['state'] = 'FINISHED' logger.debug('finish run') BaseZEO.singleton_base.commit() logger.debug('commited') def attachToJob(self, job): # logger.error(f"attach job {job.id}") self["job_id"] = job.id BaseZEO.singleton_base.insert(self) def getExecFile(self): conf_exec = self.configfiles[self.exec] return self.getUpdatedConfigFile(conf_exec) def setExecFile(self, file_name, **kwargs): # check if the file is already in the config files for _id, f in self.configfiles.items(): if f.filename == file_name: self.entries["exec"] = f.id return f.id # the file is not in the current config files # so it has to be added conf = conffile_zeo.addFile( file_name, BaseZEO.singleton_base, **kwargs) self.configfiles[conf.id] = conf self.entries["exec"] = conf.id return conf.id def listFiles(self, subdir=""): """List files in run directory / specified sub-directory""" command = 'ls {0}'.format(os.path.join(self['run_path'], subdir)) if not self['machine_name'] == socket.gethostname(): command = 'ssh {0} "{1}"'.format(self['machine_name'], command) logger.info(command) p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) out = p.stdout.readlines() out = [o.strip().decode() for o in out] return out def getFile(self, filename, outpath='/tmp'): dest_path = os.path.join( outpath, "BD-" + self.base.schema + "-cache", "run-{0}".format(self.id)) dest_file = os.path.join(dest_path, filename) full_filename = self.getFullFileName(filename) # Check if file is local if os.path.isfile(full_filename): return full_filename # If file is distant, prepare cache directory hierarchy dest_path = os.path.dirname(dest_file) logger.debug('Directories: ' + dest_path) logger.debug('File: ' + dest_file) # Making directories try: os.makedirs(dest_path, exist_ok=True) except Exception as e: logger.error(e) pass if os.path.isfile(dest_file): logger.info('File {} already cached'.format(dest_file)) return dest_file cmd = 'scp {0}:{1} {2}'.format(self['machine_name'], self.getFullFileName(filename), dest_file) logger.info(cmd) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) errors = bytes(p.stderr.read()).decode().strip() if errors: logger.warning(errors) return dest_file def getFullFileName(self, filename): return os.path.join(self['run_path'], filename) def addConfigFiles(self, file_list, regex_params=None): if not type(file_list) == list: file_list = [file_list] params_list = list(self.types.keys()) myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) params_list += list(myjob.types.keys()) # logger.debug (regex_params) # file_ids = [f for f in self.configfiles] files_to_add = [ conffile_zeo.addFile( fname, BaseZEO.singleton_base, regex_params=regex_params, params=params_list) for fname in file_list] for f in files_to_add: if (f.id not in self.configfiles): self.configfiles[f.id] = f BaseZEO.singleton_base.commit() return self.configfiles def getConfigFiles(self): conffiles = [self.getUpdatedConfigFile(f) for _id, f in self.configfiles.items()] return conffiles def getConfigFile(self, file_id): return self.configfiles[file_id] def replaceBlackDynamiteVariables(self, text): myjob = BaseZEO.singleton_base.Job(BaseZEO.singleton_base) myjob["id"] = self.entries["job_id"] myjob = myjob.getMatchedObjectList()[0] for key, val in myjob.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): raise Exception("unset job parameter " + key) text = tmp for key, val in self.entries.items(): tmp = text.replace("__BLACKDYNAMITE__" + key + "__", str(val)) if ((not tmp == text) and val is None): logger.debug(self.entries) raise Exception("unset run parameter " + key) text = tmp text = text.replace("__BLACKDYNAMITE__dbhost__", BaseZEO.singleton_base.dbhost) text = text.replace("__BLACKDYNAMITE__study__", BaseZEO.singleton_base.schema) text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id)) return text def getUpdatedConfigFile(self, conf): conf["file"] = self.replaceBlackDynamiteVariables(conf["file"]) return conf def listQuantities(self): return BaseZEO.singleton_base.quantities def getLastStep(self): if 'last_step' in self.entries: return self.last_step, self.last_step_time else: return None, None def getScalarQuantity(self, name, additional_request=None): if name not in self.quantities: raise UnknownQuantity( f"for run {self}\n" f"unknown quantity '{name}'\n" f"possible quantities are {[e for e in self.quantities.keys()]}") step, array = self.getQuantityArrayFromBlob(name) return step, array def getScalarQuantities(self, names, additional_request=None): res = [] for q in names: try: step, array = self.getScalarQuantity(q) res.append((q, step, array)) except UnknownQuantity: logger.warning(f'run {self.id} has no quantity: {q}') return None return res def getVectorQuantity(self, name, step): step_array, array = self.getQuantityArrayFromBlob(name) i = np.where(step_array == step)[0] if i.shape[0] == 0: raise RuntimeError('the step {step} could not be found') if i.shape[0] > 1: raise RuntimeError('the step {step} was registered more than once') i = i[0] return array[i] @zeoobject._transaction def saveStepTimeStamp(self, step): self.last_step = step - self.last_step_time = datetime.now() + self.last_step_time = datetime.datetime.now() @zeoobject._transaction def pushVectorQuantity(self, vec, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: list_vec = np.array([vec], dtype=object) array_step = np.array([step]) else: array_step, list_vec = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) if (len(list_vec.shape) == 2 and list_vec.shape[1] != vec.shape[0]) or len(list_vec.shape) == 1: list_vec = np.array( [e for e in list_vec] + [vec], dtype=object) else: list_vec = np.append(list_vec, [vec], axis=0) self.saveQuantityArrayToBlob(name, array_step, list_vec) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() @zeoobject._transaction def pushScalarQuantity(self, val, step, name, is_integer=None, description=None): quantities = BaseZEO.singleton_base.quantities quantities.add(name) if name not in self.quantities: array_val = np.array([val]) array_step = np.array([step]) else: array_step, array_val = self.getQuantityArrayFromBlob(name) array_step = np.append(array_step, [step], axis=0) array_val = np.append(array_val, [val], axis=0) self.saveQuantityArrayToBlob(name, array_step, array_val) self.saveStepTimeStamp(step) BaseZEO.singleton_base.commit() def getQuantityBlob(self, name): if name not in self.quantities: logger.info(f'create quantity {name}') self.quantities[name] = blob.Blob() return self.quantities[name] def getQuantityArrayFromBlob(self, name): buf = self.getQuantityBlob(name).open() # logger.error(buf.name) try: _f = np.load(buf, allow_pickle=True) except IOError as e: logger.error(e) raise RuntimeError( f"Cannot read file {buf.name} for quantity {name}") # logger.error(f'{name} {_f["step"]}') return _f['step'], _f['val'] def saveQuantityArrayToBlob(self, name, array_step, array_val): buf = self.getQuantityBlob(name).open('w') # logger.error(f'{name} {buf.name}') # logger.error(f'{name} {array_step}') np.savez_compressed(buf, val=array_val, step=array_step) def getAllVectorQuantity(self, name): quantity_id, is_integer, is_vector = self.getQuantityID( name, is_vector=True) request = """ SELECT step,measurement from {0}.{1} WHERE (run_id,quantity_id) = ({2},{3}) order by step """.format(self.base.schema, "vector_real" if is_integer is False else "vector_integer", self.id, quantity_id) curs = self.base.performRequest(request, [name]) fetch = curs.fetchall() if (not fetch): return [None, None] matres = np.array([val[1] for val in fetch]) stepres = np.array([val[0] for val in fetch]) return (stepres, matres) def delete(self): del self.base.runs[self.id] self.base.commit() def deleteData(self): + for name in self.quantities: + blob = self.getQuantityBlob(name) + logger.error(dir(blob)) del self.quantities self.quantities = BTree() self.base.commit() def __init__(self, base): super().__init__(base) self.configfiles = BTree() self.quantities = BTree() # logger.error(self.quantities) base.prepare(self, 'run_desc') self['id'] = None self.types['id'] = int self.types["machine_name"] = str self.types["run_path"] = str self.allowNull["run_path"] = True self.types["job_id"] = int self.types["nproc"] = int self.types["run_name"] = str self.types["wait_id"] = int self.allowNull["wait_id"] = True self.types["start_time"] = datetime.datetime self.allowNull["start_time"] = True self.types["state"] = str self.allowNull["state"] = True self.types["exec"] = str self.types["last_step"] = int self.types["last_step_time"] = datetime.datetime self["last_step"] = None self["last_step_time"] = None self["start_time"] = None self["wait_id"] = None ################################################################ def getRunFromScript(): parser = bdparser.BDParser() parser.register_params(params={"run_id": int}) params = parser.parseBDParameters(argv=[]) mybase = base.Base(**params) runSelector = runselector.RunSelector(mybase) run_list = runSelector.selectRuns(params) if len(run_list) > 1: raise Exception(f'internal error {params}') if len(run_list) == 0: raise Exception(f'internal error {params}') myrun, myjob = run_list[0] # myrun.setEntries(params) return myrun, myjob diff --git a/scripts/cleanRuns.py b/scripts/cleanRuns.py index f3d65a7..398a113 100755 --- a/scripts/cleanRuns.py +++ b/scripts/cleanRuns.py @@ -1,143 +1,144 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import BlackDynamite as BD import os import sys import socket import re import shutil ################################################################ def validate(question): if params["truerun"] is True: validated = BD.bdparser.validate_question(question, params) else: print("{0}? Forced N".format(question)) validated = False return validated parser = BD.BDParser() parser.register_params( "clearRun", params={ "runid": int, "clean_orphans": str, "machine_name": str, "constraints": [str], "delete": bool }, defaults={ "machine_name": socket.gethostname(), "delete": False, }, help={ "machine_name": "Machine name for desired runs", "delete": "Entirely remove runs from database", "runid": "ID of a specific run" } ) params = parser.parseBDParameters() if "machine_name" in params: if "constraints" in params: params["constraints"].append( "machine_name = " + params["machine_name"]) else: params["constraints"] = ["machine_name = " + params["machine_name"]] base = BD.Base(**params) runSelector = BD.RunSelector(base) if "clean_orphans" in params: run_list = runSelector.selectRuns([]) run_ids = [r.id for r, j in run_list] resdir = params["clean_orphans"] + "/BD-" + params["study"] + "-runs" print("clean orphans from " + resdir) if not os.path.exists(resdir): print("Directory '" + resdir + "' do not exists") sys.exit(-1) to_delete = {} for filename in os.listdir(resdir): fullname = os.path.join(resdir, filename) # print(fullname) if (os.path.isdir(fullname)): match = re.match("run-([0-9]+)", filename) if (match): # print(filename) id = int(match.group(1)) if (id not in run_ids): to_delete[id] = fullname if (len(to_delete.keys()) == 0): print("No orphans found") sys.exit(0) validated = validate("Delete output from runs " + str(to_delete.keys())) if (validated): for id, fullname in to_delete.items(): print("Delete output from run " + str(id)) shutil.rmtree(fullname) sys.exit(0) runSelector = BD.RunSelector(base) run_list = runSelector.selectRuns(params) if (len(run_list) == 0): print("No runs to be cleared") validated = validate("Delete runs " + str([r[0].id for r in run_list])) for r, j in run_list: delete_flag = params["delete"] if "run_path" in r: run_path = r["run_path"] else: run_path = None if run_path: if os.path.exists(run_path): if (validated): print("Deleting directory: " + run_path) shutil.rmtree(run_path) else: print("Simulate deletion of directory: " + run_path) else: print("output directory: '" + run_path + "' not found: are we on the right machine ?") if (delete_flag): if validated: print("Deleting run " + str(r.id) + " from base") r.delete() base.commit() + base.pack() else: print("Simulate deletion of run " + str(r.id) + " from base") else: if validated: print("Deleting data associated with run " + str(r.id)) r.deleteData() r["STATE"] = "CREATED" r["start_time"] = None r.update() base.commit() else: print("Simulate deletion of data associated with run " + str(r.id))