diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py
index 0e04341..6634c68 100644
--- a/BlackDynamite/base_zeo.py
+++ b/BlackDynamite/base_zeo.py
@@ -1,392 +1,396 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import conffile_zeo
from . import zeoobject
from . import lowercase_btree
from .constraints_zeo import ZEOconstraints
################################################################
import re
import os
import subprocess
import ZEO
import ZODB
import sys
from BTrees.OOBTree import OOSet, BTree
from . import job
from . import run_zeo
import psutil
################################################################
__all__ = ["BaseZEO"]
# Forbid accidental use of print(); all output must go through the logger.
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
# Case-insensitive persistent BTree used for all schema containers below.
PBTree = lowercase_btree.PersistentLowerCaseBTree
################################################################
def check_socket(socket_name):
    """Return True when a ZEO server already listens on *socket_name*.

    The socket file must exist on disk and its path must appear as a
    local address among the system's active connections (unix sockets
    report their path as ``laddr``); otherwise return False.
    """
    if not os.path.exists(socket_name):
        return False
    for conn in psutil.net_connections(kind='all'):
        if conn.laddr != '' and conn.laddr == socket_name:
            logger.info("Found already running zeo server")
            return True
    return False
class BaseZEO(base.AbstractBase):
    """ZEO/ZODB backed implementation of the BlackDynamite base.

    Stores studies as persistent BTrees under ``root.schemas`` and talks
    to a (possibly self-spawned) ZEO server over a unix socket.
    """
    # Process-wide handle to the most recently constructed base; other
    # modules (e.g. zeoobject) reach the database through this singleton.
    singleton_base = None
    def __init__(self, truerun=False, creation=False, read_only=False, **kwargs):
        """Connect to (and if needed spawn) a ZEO server for the study.

        kwargs must contain 'host' of the form 'zeo://<path-to-db>'.
        Raises RuntimeError on a wrong protocol; exits the process when
        the connection cannot be established.
        """
        # Register this instance as the process-wide singleton.
        BaseZEO.singleton_base = self
        # Concrete ZEO-backed classes used by the generic machinery.
        self.Job = job.JobZEO
        self.Run = run_zeo.RunZEO
        self.ConfFile = conffile_zeo.ConfFile
        self.BDconstraints = ZEOconstraints
        zeo_params = ["host"]
        connection_params = bdparser.filterParams(zeo_params, kwargs)
        logger.info('connection arguments: {0}'.format(connection_params))
        self.filename = connection_params['host']
        filename_split = self.filename.split('://')
        if filename_split[0] != 'zeo':
            raise RuntimeError(
                f"wrong protocol with this database: {type(self)}")
        self.filename = filename_split[1]
        dirname = os.path.dirname(self.filename)
        if dirname == '':
            dirname = os.path.abspath('./')
        socket_name = os.path.join(dirname, 'zeo.socket')
        zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf')
        if not check_socket(socket_name):
            # NOTE(review): a runzeo configuration normally wraps these
            # directives in <zeo>/<filestorage>/<eventlog> sections; this
            # template appears to have lost its markup in this copy —
            # verify against a working zeo.conf.
            zeo_server_conf = f'''
address {socket_name}
path bd.zeo
blob-dir bd.blob
path zeo.log
format %(asctime)s %(message)s
'''
            with open(zeo_server_conf_filename, 'w') as f:
                f.write(zeo_server_conf)
            cmd = "runzeo -C zeo.conf"
            # Spawn the server in the database directory; it keeps
            # running independently of this process.
            logger.error("Spawning new zeo server: " + cmd)
            self.process = subprocess.Popen(
                cmd, shell=True, cwd=dirname)
            # stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            # server_sync=True forces a server round-trip at transaction
            # boundaries; blobs are shared through the filesystem.
            self.connection = ZEO.connection(
                socket_name, read_only=read_only, server_sync=True,
                blob_dir=os.path.join(dirname, 'bd.blob'),
                shared_blob_dir=True,
            )
            self.root = self.connection.root
            logger.debug('connected to base')
        except Exception as e:
            logger.error(
                "Connection failed: check your connection settings:\n" +
                str(e))
            sys.exit(-1)
        assert(isinstance(self.connection, ZODB.Connection.Connection))
        self.dbhost = (kwargs["host"]
                       if "host" in kwargs.keys()
                       else "localhost")
        super().__init__(connection=self.connection, truerun=truerun,
                         creation=creation, **kwargs)
def getSchemaList(self, filter_names=True):
try:
schemas = self.root.schemas
except AttributeError:
self.root.schemas = PBTree(key_string='study_')
schemas = self.root.schemas
filtered_schemas = []
if filter_names is True:
for s in schemas:
m = re.match('{0}_(.+)'.format(self.user), s)
if m:
s = m.group(1)
filtered_schemas.append(s)
else:
filtered_schemas = schemas
return filtered_schemas
def getStudySize(self, study):
curs = self.connection.cursor()
try:
logger.info(study)
curs.execute("""
select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname)
|| '.' || quote_ident(tablename)))::BIGINT
FROM pg_tables WHERE schemaname = '{0}') as sz
""".format(study))
size = curs.fetchone()[0]
curs.execute("""
select pg_size_pretty(cast({0} as bigint))
""".format(size))
size = curs.fetchone()[0]
curs.execute("""
select count({0}.runs.id) from {0}.runs
""".format(study))
nruns = curs.fetchone()[0]
curs.execute("""
select count({0}.jobs.id) from {0}.jobs
""".format(study))
njobs = curs.fetchone()[0]
except psycopg2.ProgrammingError:
self.connection.rollback()
size = '????'
return {'size': size, 'nruns': nruns, 'njobs': njobs}
def createSchema(self, params={"yes": False}):
# create the schema of the simulation
if not hasattr(self.root, 'schemas'):
self.root.schemas = PBTree(key_string='study_')
if self.schema in self.root.schemas:
validated = bdparser.validate_question(
"Are you sure you want to drop the schema named '" +
self.schema + "'", params, False)
if validated is True:
del self.root.schemas[self.schema]
else:
logger.debug("creation canceled: exit program")
sys.exit(-1)
self.root.schemas[self.schema] = PBTree()
self.root.schemas[self.schema]['Quantities'] = OOSet()
self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_')
self.root.schemas[self.schema]['JobsIndex'] = BTree()
self.root.schemas[self.schema]['RunsIndex'] = BTree()
self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_')
self.root.schemas[self.schema]['ConfigFiles'] = BTree()
self.root.schemas[self.schema]['Jobs_counter'] = 1
self.root.schemas[self.schema]['Runs_counter'] = 1
def prepare(self, obj, descriptor):
if not hasattr(self.root, 'schemas'):
return
if descriptor in self.root.schemas[self.schema]:
desc = self.root.schemas[self.schema][descriptor]
for t in desc.types.keys():
obj.types[t] = desc.types[t]
if t not in obj:
obj.t = None
def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
self.createSchema(kwargs)
self.root.schemas[self.schema]['job_desc'] = job_desc
self.root.schemas[self.schema]['run_desc'] = run_desc
for qname, type in quantities.items():
self.pushQuantity(qname, type)
if self.truerun:
self.commit()
    @property
    def configfiles(self):
        # Persistent mapping of configuration files for the study.
        return self.root.schemas[self.schema]['ConfigFiles']

    def _get_jobs(self):
        # Raw persistent container of jobs (id -> Job).
        return self.root.schemas[self.schema]['Jobs']

    @property
    def jobs(self):
        return self._get_jobs()

    @jobs.setter
    def jobs(self, val):
        self.root.schemas[self.schema]['Jobs'] = val

    @property
    def jobs_index(self):
        # Index mapping a job's full parameter tuple to its id.
        return self._get_jobs_index()

    @jobs_index.setter
    def jobs_index(self, val):
        self.root.schemas[self.schema]['JobsIndex'] = val

    @property
    def quantities(self):
        # Set of declared quantity names.
        return self.root.schemas[self.schema]['Quantities']

    @quantities.setter
    def quantities(self, value):
        self.root.schemas[self.schema]['Quantities'] = value

    @property
    def jobs_counter(self):
        # Next job id to allocate (starts at 1).
        return self.root.schemas[self.schema]['Jobs_counter']

    @jobs_counter.setter
    def jobs_counter(self, val):
        self.root.schemas[self.schema]['Jobs_counter'] = val

    def _get_runs(self):
        # Raw persistent container of runs (id -> Run).
        return self.root.schemas[self.schema]['Runs']

    def _get_runs_index(self):
        return self.root.schemas[self.schema]['RunsIndex']

    def _get_jobs_index(self):
        return self.root.schemas[self.schema]['JobsIndex']

    @property
    def runs(self):
        return self._get_runs()

    @runs.setter
    def runs(self, val):
        self.root.schemas[self.schema]['Runs'] = val

    @property
    def runs_counter(self):
        # Next run id to allocate (starts at 1).
        return self.root.schemas[self.schema]['Runs_counter']

    @runs_counter.setter
    def runs_counter(self, val):
        self.root.schemas[self.schema]['Runs_counter'] = val
def select(self, _types, constraints=None, sort_by=None):
if not isinstance(_types, list):
_types = [_types]
_type = _types[0]
if isinstance(_type, zeoobject.ZEOObject):
_type = type(_type)
if _type == self.Job:
obj_container = self._get_jobs()
elif _type == self.Run:
obj_container = self._get_runs()
else:
raise RuntimeError(f'{type(_types)}')
if (sort_by is not None) and (not isinstance(sort_by, str)):
raise RuntimeError(
'sort_by argument is not correct: {0}'.format(sort_by))
if isinstance(constraints, zeoobject.ZEOObject):
if hasattr(constraints, 'id') and constraints.id is not None:
obj = obj_container[constraints.id]
if isinstance(obj, self.Run):
obj = (obj, self._get_jobs()[obj.job_id])
return [obj]
else:
+ constraints = constraints.copy()
+ constraints.evalFunctorEntries()
params = constraints.get_params()
keys = constraints.get_keys()
n_params = len(keys)
if len(params) == n_params:
if params in self.jobs_index:
return [self.jobs[self.jobs_index[params]]]
else:
return []
const = ZEOconstraints(self, constraints)
condition = const.getMatchingCondition()
obj_list = []
for key, obj in obj_container.items():
objs = [obj]
if _type == self.Run:
j = self._get_jobs()[obj.job_id]
objs.append(j)
if condition(objs):
if len(objs) == 1:
obj_list.append(objs[0])
else:
obj_list.append(objs)
return obj_list
    def insert(self, zeoobject, keep_state=False):
        """Insert a Job or a Run into the database.

        A copy of *zeoobject* is stored.  Unless keep_state is True a
        fresh id is allocated from the per-type counter; new runs are
        additionally marked CREATED and attached to their parent job.
        Raises RuntimeError for any other type.

        NOTE(review): the parameter name shadows the imported
        'zeoobject' module inside this method.  Indentation was
        reconstructed from a whitespace-mangled source — verify that the
        jobs_index update belongs inside the keep_state guard.
        """
        if isinstance(zeoobject, self.Job):
            objs = self.jobs
            zeoobject = zeoobject.copy()
            # Resolve callable (functor) entries before indexing/storing.
            zeoobject.evalFunctorEntries()
            logger.debug(zeoobject)
            if not keep_state:
                zeoobject['id'] = self.jobs_counter
                self.jobs_counter += 1
                params = zeoobject.get_params()
                # Index the full parameter tuple for fast exact lookups.
                self.jobs_index[params] = zeoobject['id']
        elif isinstance(zeoobject, self.Run):
            objs = self.runs
            zeoobject = zeoobject.copy()
            if not keep_state:
                zeoobject["id"] = self.runs_counter
                zeoobject["state"] = 'CREATED'
                job_id = zeoobject['job_id']
                run_id = zeoobject['id']
                job = self._get_jobs()[job_id]
                if not hasattr(job, 'runs'):
                    job.runs = PBTree(key_string='runs_')
                # Attach the run to its parent job as well.
                job.runs[run_id] = zeoobject
                self.runs_counter += 1
        else:
            raise RuntimeError(
                f'cannot insert object of type {type(zeoobject)}')
        objs[zeoobject.id] = zeoobject.copy()
        # logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}')
def setObjectItemTypes(self, zeoobject):
if isinstance(zeoobject, self.Job):
zeoobject.types = self.root.schemas[self.schema]['job_desc'].types
elif isinstance(zeoobject, self.Run):
zeoobject.types = self.root.schemas[self.schema]['run_desc'].types
else:
raise RuntimeError(f'{type(zeoobject)}')
    def commit(self):
        """Commit the current ZODB transaction."""
        import transaction
        transaction.commit()

    def pack(self):
        """Pack the underlying ZODB database to reclaim storage."""
        self.connection.db().pack()

    def close(self):
        """Abort any pending transaction before shutdown.

        NOTE(review): despite its name this does not close the ZEO
        connection; it only aborts the active transaction — confirm
        this is intended.
        """
        import transaction
        transaction.abort()
################################################################
diff --git a/BlackDynamite/zeoobject.py b/BlackDynamite/zeoobject.py
index bd70730..b932a27 100644
--- a/BlackDynamite/zeoobject.py
+++ b/BlackDynamite/zeoobject.py
@@ -1,219 +1,225 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdlogging
from . import lowercase_btree
################################################################
import copy
import re
import sys
import persistent
import transaction
from ZODB.POSException import ConflictError
################################################################
# Forbid accidental use of print(); all output must go through the logger.
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
# Case-insensitive mapping types used for entries and type descriptors.
BTree = lowercase_btree._LowerCaseBTree
PBTree = lowercase_btree.PersistentLowerCaseBTree
################################################################
def _transaction(foo):
    """Decorator retrying *foo* when a ZODB ConflictError occurs.

    The wrapped call is retried up to 10 times.  After each conflict the
    transaction is aborted and the object's state is compared with the
    state captured before the first attempt (ignoring 'types' and
    'quantities'); if the state changed, the conflict is re-raised
    immediately instead of being retried.

    Fixed: inner function typo (_protected_transation) and missing
    functools.wraps, which hid the wrapped function's identity.
    """
    import functools

    @functools.wraps(foo)
    def _protected_transaction(self, *args, **kwargs):
        saved_state = self.__getstate__()
        max_attempts = 10
        attempts = 0
        while True:
            try:
                foo(self, *args, **kwargs)
            except ConflictError as e:
                transaction.abort()
                new_state = self.__getstate__()
                found_diff = False
                for k, v in saved_state.items():
                    if k in ['types', 'quantities']:
                        continue
                    if k in new_state and v != new_state[k]:
                        logger.error(
                            f'attempt {attempts} diff state for key {k}: '
                            f'{v} != {new_state[k]}')
                        if k == 'configfiles':
                            for f in v:
                                logger.error(f.file)
                            for f in new_state[k]:
                                logger.error(f.file)
                        found_diff = True
                attempts += 1
                if found_diff:
                    raise e
                if attempts == max_attempts:
                    raise e
            else:
                break
    return _protected_transaction
class ZEOObject(persistent.Persistent, BTree):
    """The generic persistent object backing entries in the database."""

    def commit(self):
        # Commit through the process-wide base singleton (lazy import
        # avoids a circular dependency with base_zeo).
        from .base_zeo import BaseZEO
        BaseZEO.singleton_base.commit()

    def __setattr__(self, attr, value):
        # Delegate attribute writes to the lower-case BTree machinery.
        BTree.__setattr__(self, attr, value)
def setFields(self, constraints):
for cons in constraints:
_regex = "(\w*)\s*=\s*(.*)"
match = re.match(_regex, cons)
if (not match or (not len(match.groups()) == 2)):
print("malformed assignment: " + cons)
sys.exit(-1)
key = match.group(1).lower().strip()
val = match.group(2)
if key not in self.types:
print("unknown key '{0}'".format(key))
print("possible keys are:")
for k in self.types.keys():
print("\t" + k)
sys.exit(-1)
val = self.types[key](val)
self.entries[key] = val
    def __init__(self):
        # NOTE(review): persistent.Persistent.__init__ and BTree.__init__
        # are called explicitly AND via super(); depending on the MRO the
        # base initializers may run twice — confirm this is intended.
        persistent.Persistent.__init__(self)
        BTree.__init__(self)
        super().__init__()
        # Per-key flags allowing a null value.
        self.allowNull = {}
        # Per-key type constructors used to coerce entry values.
        self.types = PBTree()
        # Per-key comparison operators for constraint evaluation.
        self.operators = {}
def __getstate__(self):
"Get the state of the object for a pickling operations"
state = {}
for k in self.__dict__.keys():
if k == 'base':
continue
state[k] = self.__dict__[k]
return state
    # Debug hook: when set, __setstate__ raises right after loading.
    _flag_debug = False

    def __setstate__(self, state):
        # Restore the pickled state into the instance dictionary.
        for k in state.keys():
            self.__dict__[k] = state[k]
        if self._flag_debug:
            # Bare raise with no active exception produces a RuntimeError,
            # deliberately halting state loading for inspection.
            raise
def copy(self):
return copy.deepcopy(self)
def __deepcopy__(self, memo):
_cp = type(self)()
for k in self.__dict__.keys():
if k == 'base':
continue
_cp.__dict__[k] = copy.deepcopy(self.__dict__[k])
return _cp
    @_transaction
    def update(self):
        """Write this object's entries back to its stored counterpart.

        Looks up the persistent object with the same id in the jobs or
        runs container of the singleton base, copies every entry onto
        it and commits.  Retried on ConflictError by the _transaction
        decorator.  Raises RuntimeError for unsupported types.
        """
        from .base_zeo import BaseZEO
        if isinstance(self, BaseZEO.singleton_base.Job):
            obj_list = BaseZEO.singleton_base._get_jobs()
        elif isinstance(self, BaseZEO.singleton_base.Run):
            obj_list = BaseZEO.singleton_base._get_runs()
        else:
            raise RuntimeError("undefined yet")
        obj = obj_list[self.id]
        for key, value in self.entries.items():
            obj[key] = value
        BaseZEO.singleton_base.commit()
    def createTableRequest(self):
        # Stores this object as the default job descriptor of the current
        # schema.  NOTE(review): the name is SQL heritage — no table is
        # created here.
        self.base.root.schemas[self.base.schema]['default_job'] = self
def matchConstraint(self, constraint):
# case it is an object of same type
if isinstance(constraint, type(self)):
for key, value in self.items():
if constraint[key] != value:
return False
return True
# case it is a list/dict of constraints to evaluate
else:
logger.error(type(self))
logger.error(constraint)
for key, value in self.items():
if key not in constraint:
continue
if constraint[key] != value:
return False
return True
raise RuntimeError("toimplement")
    def getMatchedObjectList(self):
        """Select all database objects matching this one.

        The object is passed both as the type selector and as the
        constraint to BaseZEO.select.
        """
        from .base_zeo import BaseZEO
        return BaseZEO.singleton_base.select(self, self)
def __repr__(self):
type_prefix = 'object:\n'
entries = self.entries
keys = set(self.entries.keys())
keys.remove('id')
if not len(keys):
type_prefix = 'descriptor:\n'
entries = self.types
if not len(entries.keys()):
return "Empty ZEO object"
outputs = []
for k, v in sorted(entries.items()):
if k == 'id' and v is None:
continue
outputs += [' ' + k + ": " + str(v)]
return type(self).__name__ + ' ' + type_prefix + "\n".join(outputs)
def get_params(self):
params = tuple(
[v for e, v in self.entries.items() if e != 'id'])
return params
def get_keys(self):
keys = tuple(
[e for e, v in self.entries.items() if e != 'id'])
return keys
+ def evalFunctorEntries(self):
+ keys = self.get_keys()
+ for k in keys:
+ if callable(self.entries[k]):
+ self.entries[k] = self.entries[k](self)
+
    @property
    def base(self):
        """The process-wide BaseZEO singleton.

        Imported lazily to avoid a circular import with base_zeo.
        """
        from .base_zeo import BaseZEO
        return BaseZEO.singleton_base
diff --git a/scripts/createRuns.py b/scripts/createRuns.py
index d8640a2..181cb9e 100755
--- a/scripts/createRuns.py
+++ b/scripts/createRuns.py
@@ -1,60 +1,62 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# First we need to set the python headers
# and to import the blackdynamite modules
import BlackDynamite as BD
import yaml
import tqdm

# Read the study configuration.
fname = 'bd.yaml'
with open(fname) as f:
    config = yaml.load(f, Loader=yaml.SafeLoader)

# import a runparser (instead of a generic BD parser)
parser = BD.RunParser()
params = parser.parseBDParameters()
params['study'] = config['study']

# Then we can connect to the black dynamite database
base = BD.Base(**params)

# create a run object
myrun = base.Run()

# set the run parameters from the parsed entries
myrun.setEntries(params)

# add a configuration file
for f in config['config_files']:
    myrun.addConfigFiles(f)

# set the entry point (executable) file
myrun.setExecFile(config['exec_file'])

# create a job selector
jobSelector = BD.JobSelector(base)

# select the jobs that should be associated with the runs about to be created
job_list = jobSelector.selectJobs(params)

# create the runs
for j in tqdm.tqdm(job_list):
    for param, v in config['run_space'].items():
        if isinstance(v, str):
            # SECURITY NOTE(review): eval() executes arbitrary code read
            # from bd.yaml; acceptable only because the file is authored
            # by the local user — never feed untrusted configs here.
            v = eval(v)
        myrun[param] = myrun.types[param](v)
    myrun.attachToJob(j)

# if truerun, commit the changes to the base
if (params["truerun"] is True):
    base.commit()