diff --git a/BlackDynamite/zeoobject.py b/BlackDynamite/zeoobject.py index 8462d51..72bd204 100644 --- a/BlackDynamite/zeoobject.py +++ b/BlackDynamite/zeoobject.py @@ -1,230 +1,230 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . ################################################################ from . import bdlogging from . import lowercase_btree ################################################################ import copy import re import sys import persistent import transaction from ZODB.POSException import ConflictError ################################################################ print = bdlogging.invalidPrint logger = bdlogging.getLogger(__name__) BTree = lowercase_btree._LowerCaseBTree PBTree = lowercase_btree.PersistentLowerCaseBTree ################################################################ def _transaction(foo): def _protected_transation(self, *args, **kwargs): saved_state = self.__getstate__() max_attempts = 10 attempts = 0 while True: try: foo(self, *args, **kwargs) except ConflictError as e: # logger.error('***************CONFLICT************') transaction.abort() new_state = self.__getstate__() found_diff = False for k, v in saved_state.items(): if k in ['types', 'quantities']: continue if k in new_state and v != new_state[k]: logger.error( f'attempt {attempts} diff state for key' f' {k}: {v} != {new_state[k]}') if k == 'configfiles': for f in v: logger.error(f.file) for f in new_state[k]: logger.error(f.file) found_diff = True attempts += 1 if found_diff: raise e if attempts == max_attempts: raise e else: break return _protected_transation class ZEOObject(persistent.Persistent, BTree): " The generic object related to entries in the database " def commit(self): from .base_zeo import BaseZEO BaseZEO.singleton_base.commit() def __setattr__(self, attr, value): BTree.__setattr__(self, attr, value) def setFields(self, constraints): for cons in constraints: _regex = "(\w*)\s*=\s*(.*)" match = re.match(_regex, cons) if (not match or (not len(match.groups()) == 2)): print("malformed assignment: " + cons) sys.exit(-1) key = match.group(1).lower().strip() val = match.group(2) if key not in self.types: print("unknown key '{0}'".format(key)) print("possible keys are:") for k in self.types.keys(): print("\t" + k) sys.exit(-1) val = self.types[key](val) self.entries[key] = val def __init__(self): persistent.Persistent.__init__(self) BTree.__init__(self) super().__init__() self.allowNull = {} self.types = PBTree() self.operators = {} def __getstate__(self): "Get the state of the object for a pickling operations" state = {} for k in self.__dict__.keys(): if k == 'base': continue state[k] = self.__dict__[k] return state _flag_debug = False def __setstate__(self, state): for k in state.keys(): self.__dict__[k] = state[k] if self._flag_debug: raise def copy(self): return copy.deepcopy(self) def __deepcopy__(self, memo): _cp = type(self)() for k in self.__dict__.keys(): if k == 'base': continue _cp.__dict__[k] = copy.deepcopy(self.__dict__[k]) return _cp @_transaction def update(self): from .base_zeo import BaseZEO if isinstance(self, BaseZEO.singleton_base.Job): obj_list = BaseZEO.singleton_base._get_jobs() elif isinstance(self, BaseZEO.singleton_base.Run): obj_list = BaseZEO.singleton_base._get_runs() else: raise RuntimeError("undefined yet") # logger.warning(self.id) obj = obj_list[self.id] if obj != self: logger.error(id(obj)) logger.error(id(self)) raise # logger.error(self.base) # logger.error(obj) for key, value in self.entries.items(): # logger.error(f'update {key}: {obj[key]} -> value') obj[key] = value - BaseZEO.singleton_base.commit() + # BaseZEO.singleton_base.commit() def createTableRequest(self): self.base.root.schemas[self.base.schema]['default_job'] = self def matchConstraint(self, constraint): # case it is an object of same type if isinstance(constraint, type(self)): for key, value in self.items(): if constraint[key] != value: return False return True # case it is a list/dict of constraints to evaluate else: logger.error(type(self)) logger.error(constraint) for key, value in self.items(): if key not in constraint: continue if constraint[key] != value: return False return True raise RuntimeError("toimplement") def getMatchedObjectList(self): from .base_zeo import BaseZEO return BaseZEO.singleton_base.select(self, self) def __repr__(self): type_prefix = 'object:\n' entries = self.entries keys = set(self.entries.keys()) keys.remove('id') if not len(keys): type_prefix = 'descriptor:\n' entries = self.types if not len(entries.keys()): return "Empty ZEO object" outputs = [] for k, v in sorted(entries.items()): if k == 'id' and v is None: continue outputs += [' ' + k + ": " + str(v)] return type(self).__name__ + ' ' + type_prefix + "\n".join(outputs) def get_params(self): params = tuple( [v for e, v in self.entries.items() if e != 'id']) return params def get_keys(self): keys = tuple( [e for e, v in self.entries.items() if e != 'id']) return keys def evalFunctorEntries(self): keys = self.get_keys() for k in keys: if callable(self.entries[k]): self.entries[k] = self.entries[k](self) @property def base(self): from .base_zeo import BaseZEO return BaseZEO.singleton_base diff --git a/scripts/cleanRuns.py b/scripts/cleanRuns.py index f62c596..bf138d4 100755 --- a/scripts/cleanRuns.py +++ b/scripts/cleanRuns.py @@ -1,152 +1,159 @@ #!/usr/bin/env python3 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import BlackDynamite as BD import os import sys import socket import re import shutil import yaml import tqdm ################################################################ def validate(question): if params["truerun"] is True: validated = BD.bdparser.validate_question(question, params) else: print("{0}? Forced N".format(question)) validated = False return validated parser = BD.BDParser() parser.register_params( "clearRun", params={ "runid": int, "clean_orphans": str, "machine_name": str, "constraints": [str], "delete": bool }, defaults={ "machine_name": socket.gethostname(), "delete": False, }, help={ "machine_name": "Machine name for desired runs", "delete": "Entirely remove runs from database", "runid": "ID of a specific run" } ) params = parser.parseBDParameters() fname = 'bd.yaml' with open(fname) as f: config = yaml.load(f, Loader=yaml.SafeLoader) if 'study' in config: params['study'] = config['study'] if "machine_name" in params: if "constraints" in params: params["constraints"].append( "machine_name = " + params["machine_name"]) else: params["constraints"] = ["machine_name = " + params["machine_name"]] base = BD.Base(**params) runSelector = BD.RunSelector(base) if "clean_orphans" in params: run_list = runSelector.selectRuns([]) run_ids = [r.id for r, j in run_list] resdir = params["clean_orphans"] + "/BD-" + params["study"] + "-runs" print("clean orphans from " + resdir) if not os.path.exists(resdir): print("Directory '" + resdir + "' do not exists") sys.exit(-1) to_delete = {} for filename in os.listdir(resdir): fullname = os.path.join(resdir, filename) # print(fullname) if (os.path.isdir(fullname)): match = re.match("run-([0-9]+)", filename) if (match): # print(filename) id = int(match.group(1)) if (id not in run_ids): to_delete[id] = fullname if (len(to_delete.keys()) == 0): print("No orphans found") sys.exit(0) validated = validate("Delete output from runs " + str(to_delete.keys())) if (validated): for id, fullname in to_delete.items(): print("Delete output from run " + str(id)) shutil.rmtree(fullname) sys.exit(0) runSelector = BD.RunSelector(base) run_list = runSelector.selectRuns(params, quiet=True) if (len(run_list) == 0): - print("No runs to be cleared") + print("No runs selected") + sys.exit() -validated = validate(f"Delete {len(run_list)} runs") +delete_flag = params["delete"] +if delete_flag: + validated = validate(f"Delete {len(run_list)} runs") +else: + validated = validate(f"Reset {len(run_list)} runs") for i, (r, j) in enumerate(tqdm.tqdm(run_list)): - delete_flag = params["delete"] if "run_path" in r: run_path = r["run_path"] else: run_path = None if run_path: if os.path.exists(run_path): if (validated): print("Deleting directory: " + run_path) shutil.rmtree(run_path) else: print("Simulate deletion of directory: " + run_path) else: print("output directory: '" + run_path + "' not found: are we on the right machine ?") if delete_flag: if validated: # print("Deleting run " + str(r.id) + " from base") r.delete() - base.commit() else: print("Simulate deletion of run " + str(r.id) + " from base") else: if validated: # print("Deleting data associated with run " + str(r.id)) r.deleteData() r["STATE"] = "CREATED" r["start_time"] = None r.update() - base.commit() else: print("Simulate deletion of data associated with run " + str(r.id)) + if i % 1000 == 0: + # if truerun, commit the changes to the base + if params["truerun"] is True: + base.commit() + if validated: base.pack()