Page MenuHomec4science

base_zeo.py
No OneTemporary

File Metadata

Created
Sun, May 19, 01:51

base_zeo.py

#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import conffile_zeo
from . import zeoobject
from . import lowercase_btree
from . import runselector
from .constraints_zeo import ZEOconstraints
################################################################
import yaml
import socket
import re
import os
import pwd
import subprocess
import ZEO
import ZODB
import sys
from BTrees.OOBTree import OOSet, BTree
from . import job
from . import run_zeo
import psutil
################################################################
# Public API of this module: only BaseZEO is exported by a star-import.
__all__ = ["BaseZEO"]
# Rebind the module-level name ``print`` to bdlogging.invalidPrint.
# NOTE(review): presumably this discourages bare print() calls in favour of
# the logger -- confirm against bdlogging.
print = bdlogging.invalidPrint
# Module-level logger obtained from the project's logging wrapper.
logger = bdlogging.getLogger(__name__)
# Short alias for the persistent BTree class used for the schema containers.
PBTree = lowercase_btree.PersistentLowerCaseBTree
################################################################
def check_file_socket(socket_name):
    """Report whether a connection is bound to the socket file *socket_name*.

    Returns False straight away when the path does not exist.  Otherwise the
    connections listed by ``psutil.net_connections(kind='all')`` are scanned
    and True is returned (after logging) as soon as one of them has a local
    address equal to *socket_name*.
    """
    if not os.path.exists(socket_name):
        return False

    for connection in psutil.net_connections(kind='all'):
        local_address = connection.laddr
        # connections without a local address report an empty string
        if local_address == '':
            continue
        if local_address == socket_name:
            logger.error("Found already running zeo server")
            return True
    return False
def check_tcp_socket(socket_name):
    """Always return False; *socket_name* is accepted but not inspected."""
    return False
class BaseZEO(base.AbstractBase):
    """Study database stored in a ZODB and reached through a ZEO server.

    The server is addressed either through a unix socket located in a
    ``.bd`` directory (``zeo://<directory>``) or through TCP
    (``zeo://<hostname>:<port>``).  Every study lives in
    ``root.schemas[<user>_<study>]`` together with its jobs, runs,
    quantities and configuration files.
    """

    # last instance that was constructed
    singleton_base = None

    def __init__(self, truerun=False, read_only=False, spawn_server=False,
                 **kwargs):
        """Configure the ZEO address and connect (or spawn a daemon).

        Args:
            truerun: forwarded to the parent class initializer.
            read_only: forwarded to ``ZEO.connection``.
            spawn_server: when True, a zdaemon-managed server is started and
                the constructor returns without connecting; the parent
                initializer is not called in that case.
            **kwargs: must contain ``host``; ``host``, ``creation`` and
                ``port`` are extracted with ``bdparser.filterParams`` and the
                whole mapping is forwarded to the parent initializer.
        """
        BaseZEO.singleton_base = self

        self.Job = job.JobZEO
        self.Run = run_zeo.RunZEO
        self.ConfFile = conffile_zeo.ConfFile
        self.BDconstraints = ZEOconstraints

        logger.debug('connection arguments: {0}'.format(kwargs))
        zeo_params = ["host", "creation", "port"]
        connection_params = bdparser.filterParams(zeo_params, kwargs)
        logger.debug('connection arguments: {0}'.format(connection_params))

        self.read_only = read_only
        self.setConfHost(connection_params['host'])
        # self.checkActualConfig()

        if spawn_server is True:
            self.create_tcp_socket(connection_params)
            return

        self.connectToSocket(connection_params)
        super().__init__(connection=self.connection, truerun=truerun,
                         **kwargs)

    def checkActualConfig(self):
        """Parse the existing ``zeo.conf`` and log the content of its <zeo> tag.

        A missing configuration file is silently ignored.
        """
        from html.parser import HTMLParser

        class MyHTMLParser(HTMLParser):
            """Collect the text found under each dotted path of nested tags."""

            def __init__(self):
                self._data = {}
                self._current_tags = []
                super().__init__()

            def handle_starttag(self, tag, attrs):
                self._current_tags.append(tag)

            def handle_endtag(self, tag):
                self._current_tags.pop()

            def handle_data(self, data):
                if not self._current_tags:
                    return
                _key = ".".join(self._current_tags)
                self._data[_key] = data.strip()

        parser = MyHTMLParser()
        try:
            # context manager: the file handle used to be left open
            with open(self.zeo_conf) as conf_file:
                conf = conf_file.read()
            # logger.error(conf)
            parser.feed(conf)
            logger.error(parser._data['zeo'])
        except FileNotFoundError:
            pass

    def setConfPaths(self, root_dir, creation=False, **kwargs):
        """Derive every server-related file path from *root_dir*.

        Args:
            root_dir: directory holding the database files (resolved with
                ``os.path.realpath``).
            creation: when True a missing directory is created.
            **kwargs: accepted and ignored.

        Raises:
            RuntimeError: the directory is missing and *creation* is False.
        """
        self.root_dir = os.path.realpath(root_dir)
        if not os.path.exists(self.root_dir):
            if not creation:
                raise RuntimeError(
                    f"{os.getcwd()} is not a blackdynamite directory")
            os.mkdir(self.root_dir)

        self.zeo_conf = os.path.join(self.root_dir, 'zeo.conf')
        self.zeo_db = os.path.join(self.root_dir, 'bd.zeo')
        self.zeo_log = os.path.join(self.root_dir, 'zeo.log')
        self.zeo_socket = os.path.join(self.root_dir, 'zeo.socket')
        self.zeo_blob = os.path.join(self.root_dir, 'bd.blob')
        self.zdaemon_socket = os.path.join(self.root_dir, 'zdaemon.socket')
        self.zdaemon_conf = os.path.join(self.root_dir, 'zdaemon.conf')

    def setConfHost(self, host):
        """Interpret the ``host`` connection string.

        Two forms are understood:
          1) ``zeo://existing_directory_path``: the paths are set relative
             to ``<path>/.bd``;
          2) ``zeo://hostname:port``: ``self.dbhost`` and ``self.port`` are
             set (the port is kept as a string at this stage).

        Raises:
            RuntimeError: the protocol is not ``zeo`` or the address is
                neither a directory nor a ``hostname:port`` pair.
        """
        protocol, addr = host.split('://')
        if protocol != 'zeo':
            raise RuntimeError(
                f"wrong protocol with this database: {type(self)}")

        if os.path.isdir(addr):
            self.setConfPaths(os.path.join(addr, '.bd'))
            return

        host_port = addr.split(':')
        if len(host_port) == 2:
            self.dbhost, self.port = host_port
            return

        raise RuntimeError(f'could not understand host: {host}')

    def createZEOconfig(self):
        """Write ``zeo.conf`` for the current socket, storage and log paths."""
        socket_name = self.getSocketName()
        if isinstance(socket_name, tuple):
            # TCP address: rendered as "host:port"
            socket_name = socket_name[0] + ':' + str(socket_name[1])
        logger.error(socket_name)

        zeo_server_conf = f'''
<zeo>
address {socket_name}
</zeo>
<filestorage>
path {self.zeo_db}
blob-dir {self.zeo_blob}
</filestorage>
<eventlog>
<logfile>
path {self.zeo_log}
format %(asctime)s %(message)s
</logfile>
</eventlog>
'''
        with open(self.zeo_conf, 'w') as f:
            f.write(zeo_server_conf)

    def getSocketName(self):
        """Return ``(dbhost, port)`` when a TCP host is set, else the socket path."""
        if hasattr(self, 'dbhost'):
            return self.dbhost, self.port
        return self.zeo_socket

    def create_socket(self):
        """Spawn a ``runzeo`` server listening on the unix socket.

        The process handle is stored in ``self.process``.

        Raises:
            RuntimeError: the configured address is a TCP one.
        """
        self.createZEOconfig()
        socket_name = self.getSocketName()
        if isinstance(socket_name, tuple):
            raise RuntimeError("cannot create a server from client commands")

        cmd = f"runzeo -C {self.zeo_conf}"
        logger.warning("Spawning new zeo server: " + cmd)
        # argument list instead of a shell string: robust to spaces in paths
        self.process = subprocess.Popen(
            ["runzeo", "-C", self.zeo_conf], cwd=self.root_dir)

    def create_tcp_socket(self, connection_params):
        """Start a zdaemon-managed ``runzeo`` server for a TCP address.

        The paths are taken relative to ``./.bd``; ``zeo.conf`` and
        ``zdaemon.conf`` are (re)written before ``zdaemon ... start`` is run.
        *connection_params* is not used.

        Raises:
            RuntimeError: the configured address is not a TCP one.
        """
        self.setConfPaths('./.bd')
        self.createZEOconfig()
        socket_name = self.getSocketName()
        if not isinstance(socket_name, tuple):
            logger.error(socket_name)
            raise RuntimeError(
                f"this method should be called only to create a daemon: {socket_name}")

        conf = f'''
<runner>
program runzeo -C {self.zeo_conf}
socket-name {self.zdaemon_socket}
</runner>
'''
        logger.error(self.zdaemon_conf)
        with open(self.zdaemon_conf, 'w') as f:
            f.write(conf)

        cmd = f"zdaemon -C {self.zdaemon_conf} start"
        logger.error("Spawning new zeo server: " + cmd)
        # argument list instead of a shell string: robust to spaces in paths
        subprocess.call(["zdaemon", "-C", self.zdaemon_conf, "start"])

    def buildConnection(self):
        """Open the ZEO connection and keep its root in ``self.root``."""
        socket_name = self.getSocketName()
        logger.debug(socket_name)
        # NOTE(review): this resets every path to ./.bd, also when
        # setConfHost configured another directory -- confirm this is wanted.
        self.setConfPaths('./.bd')
        self.connection = ZEO.connection(
            socket_name, read_only=self.read_only,
            server_sync=False,
            blob_dir=self.zeo_blob,
            shared_blob_dir=True,
        )
        self.root = self.connection.root
        logger.debug('connected to base')
        assert(isinstance(self.connection, ZODB.Connection.Connection))

    def connectToSocket(self, connection_params):
        """Connect to the server, spawning a local one when required.

        For a TCP address the host string is parsed again and the port is
        converted to ``int``.  For a unix socket a server is spawned unless
        ``check_file_socket`` reports one already bound to it.
        """
        if hasattr(self, 'dbhost'):
            self.dbhost = connection_params['host'].split('://')[1]
            host_port = self.dbhost.split(':')
            if len(host_port) == 2:
                self.dbhost, self.port = host_port
                self.port = int(self.port)
        elif not check_file_socket(self.getSocketName()):
            self.create_socket()
        self.buildConnection()

    def _get_or_create_schemas(self):
        """Return ``root.schemas``, creating the container when it is absent."""
        try:
            return self.root.schemas
        except AttributeError:
            self.root.schemas = PBTree(key_string='study_')
            return self.root.schemas

    def getSchemaList(self, filter_names=True):
        """List the known schemas.

        Args:
            filter_names: when True, return a list in which every name
                matching ``<prefix>_<name>`` is reduced to the part after its
                last underscore; otherwise return the schema container
                itself.
        """
        schemas = self._get_or_create_schemas()
        if filter_names is not True:
            return schemas

        filtered_schemas = []
        for s in schemas:
            m = re.match('(.+)_(.+)', s)
            filtered_schemas.append(m.group(2) if m else s)
        return filtered_schemas

    def getSchemasUser(self, study_name):
        """Return the prefix of the schema named ``<prefix>_<study_name>``.

        The study name is matched literally and against the whole schema
        name, so that study ``foo`` does not match schema ``user_foobar``.

        Raises:
            RuntimeError: no schema matches *study_name*.
        """
        schemas = self._get_or_create_schemas()
        pattern = re.compile(f'(.+)_{re.escape(study_name)}')
        for s in schemas:
            m = pattern.fullmatch(s)
            if m:
                return m.group(1)
        raise RuntimeError(
            f"not found study: '{study_name}' within {list(schemas)}")

    def getStudySize(self, study):
        """Not implemented: always raises RuntimeError."""
        raise RuntimeError("to be implemented")

    def createSchema(self, params=None):
        """Create an empty schema named ``self.schema``.

        When the schema already exists, confirmation to drop it is requested
        through ``bdparser.validate_question``; without validation the
        process exits with ``sys.exit(-1)``.

        Args:
            params: options forwarded to ``bdparser.validate_question``;
                defaults to a fresh ``{"yes": False}``.
        """
        if params is None:
            # fresh dict on every call: no state shared through the default
            params = {"yes": False}

        self._get_or_create_schemas()

        if self.schema in self.root.schemas:
            validated = bdparser.validate_question(
                "Are you sure you want to drop the schema named '" +
                self.schema + "'", params, False)
            if validated is True:
                del self.root.schemas[self.schema]
            else:
                logger.debug("creation canceled: exit program")
                sys.exit(-1)

        self.root.schemas[self.schema] = PBTree()
        self.root.schemas[self.schema]['Quantities'] = OOSet()
        self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_')
        self.root.schemas[self.schema]['JobsIndex'] = BTree()
        self.root.schemas[self.schema]['RunsIndex'] = BTree()
        self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_')
        self.root.schemas[self.schema]['ConfigFiles'] = BTree()
        self.root.schemas[self.schema]['Jobs_counter'] = 1
        self.root.schemas[self.schema]['Runs_counter'] = 1

    def prepare(self, obj, descriptor):
        """Copy the types stored under *descriptor* into ``obj.types``.

        Nothing happens when no schema container exists, when the current
        schema is unknown or when it has no entry named *descriptor*.
        """
        if not hasattr(self.root, 'schemas'):
            return
        if (self.schema in self.root.schemas and
                descriptor in self.root.schemas[self.schema]):
            desc = self.root.schemas[self.schema][descriptor]
            for t in desc.types.keys():
                obj.types[t] = desc.types[t]
                if t not in obj:
                    # NOTE(review): this sets the attribute literally named
                    # "t"; ``obj[t] = None`` looks intended -- confirm
                    # before changing, stored entries would be affected.
                    obj.t = None

    def createBase(self, job_desc, run_desc, quantities=None, **kwargs):
        """Create the schema and store the job/run descriptions.

        Args:
            job_desc: stored under ``'job_desc'`` in the schema.
            run_desc: stored under ``'run_desc'`` in the schema.
            quantities: optional mapping name -> type, each pair being
                passed to ``self.pushQuantity``.
            **kwargs: forwarded as a dict to ``createSchema``.
        """
        self.createSchema(kwargs)
        self.root.schemas[self.schema]['job_desc'] = job_desc
        self.root.schemas[self.schema]['run_desc'] = run_desc

        if quantities is None:
            quantities = {}
        for qname, qtype in quantities.items():
            self.pushQuantity(qname, qtype)

        if self.truerun:
            self.commit()

    @property
    def configfiles(self):
        """Container stored under ``'ConfigFiles'`` in the current schema."""
        return self.root.schemas[self.schema]['ConfigFiles']

    def _get_jobs(self):
        return self.root.schemas[self.schema]['Jobs']

    @property
    def jobs(self):
        """Container stored under ``'Jobs'`` in the current schema."""
        return self._get_jobs()

    @jobs.setter
    def jobs(self, val):
        self.root.schemas[self.schema]['Jobs'] = val

    @property
    def jobs_index(self):
        """Container stored under ``'JobsIndex'`` in the current schema."""
        return self._get_jobs_index()

    @jobs_index.setter
    def jobs_index(self, val):
        self.root.schemas[self.schema]['JobsIndex'] = val

    @property
    def quantities(self):
        """Container stored under ``'Quantities'`` in the current schema."""
        return self.root.schemas[self.schema]['Quantities']

    @quantities.setter
    def quantities(self, value):
        self.root.schemas[self.schema]['Quantities'] = value

    @property
    def jobs_counter(self):
        """Value stored under ``'Jobs_counter'`` in the current schema."""
        return self.root.schemas[self.schema]['Jobs_counter']

    @jobs_counter.setter
    def jobs_counter(self, val):
        self.root.schemas[self.schema]['Jobs_counter'] = val

    def _get_runs(self):
        return self.root.schemas[self.schema]['Runs']

    def _get_runs_index(self):
        return self.root.schemas[self.schema]['RunsIndex']

    def _get_jobs_index(self):
        return self.root.schemas[self.schema]['JobsIndex']

    @property
    def runs(self):
        """Container stored under ``'Runs'`` in the current schema."""
        return self._get_runs()

    @runs.setter
    def runs(self, val):
        self.root.schemas[self.schema]['Runs'] = val

    @property
    def runs_counter(self):
        """Value stored under ``'Runs_counter'`` in the current schema."""
        return self.root.schemas[self.schema]['Runs_counter']

    @runs_counter.setter
    def runs_counter(self, val):
        self.root.schemas[self.schema]['Runs_counter'] = val

    def select(self, _types, constraints=None, sort_by=None):
        """Return the stored jobs or runs matching *constraints*.

        Args:
            _types: a type (or instance), or a list of them; only the first
                entry decides whether jobs or runs are searched.
            constraints: a ZEOObject used as a template, or anything
                accepted by ``ZEOconstraints``.
            sort_by: must be None or a string; it is validated but not
                otherwise used here.

        Returns:
            A list of jobs when jobs are requested.  When runs are requested
            each run comes paired with its job: as a ``(run, job)`` tuple on
            the id shortcut, as a ``[run, job]`` list from the generic scan.

        Raises:
            RuntimeError: unsupported requested type or invalid *sort_by*.
        """
        if not isinstance(_types, list):
            _types = [_types]

        _type = _types[0]
        if isinstance(_type, zeoobject.ZEOObject):
            _type = type(_type)

        if _type == self.Job:
            obj_container = self._get_jobs()
        elif _type == self.Run:
            obj_container = self._get_runs()
        else:
            # report the offending type (used to print the type of the list)
            raise RuntimeError(f'cannot select objects of type {_type}')

        if (sort_by is not None) and (not isinstance(sort_by, str)):
            raise RuntimeError(
                'sort_by argument is not correct: {0}'.format(sort_by))

        if isinstance(constraints, zeoobject.ZEOObject):
            if hasattr(constraints, 'id') and constraints.id is not None:
                # direct access through the identifier
                obj = obj_container[constraints.id]
                if isinstance(obj, self.Run):
                    obj = (obj, self._get_jobs()[obj.job_id])
                return [obj]

            constraints = constraints.copy()
            constraints.evalFunctorEntries()
            params = constraints.get_params()
            keys = constraints.get_keys()
            if len(params) == len(keys):
                # every parameter is specified: use the parameter index.
                # NOTE(review): the job index is consulted whatever the
                # requested type -- confirm for Run templates.
                logger.debug(params)
                if params in self.jobs_index:
                    return [self.jobs[self.jobs_index[params]]]
                return []

        const = ZEOconstraints(self, constraints)
        condition = const.getMatchingCondition()

        with_job = (_type == self.Run)
        obj_list = []
        for _key, obj in obj_container.items():
            objs = [obj]
            if with_job:
                objs.append(self._get_jobs()[obj.job_id])
            if condition(objs):
                obj_list.append(objs[0] if len(objs) == 1 else objs)
        return obj_list

    def insert(self, zeoobject, keep_state=False):
        """Store a copy of a job or a run in the current schema.

        Args:
            zeoobject: a Job or a Run; the caller's object is not modified,
                copies are stored.
            keep_state: when False a fresh id is assigned from the matching
                counter (and a run gets the state ``'CREATED'``); when True
                the id carried by the object is kept.

        Raises:
            RuntimeError: *zeoobject* is neither a Job nor a Run.
        """
        # NOTE(review): the nesting of the counter increments below was
        # reconstructed from a source that had lost its indentation.
        if isinstance(zeoobject, self.Job):
            objs = self.jobs
            zeoobject = zeoobject.copy()
            zeoobject.evalFunctorEntries()
            logger.debug(zeoobject)
            if not keep_state:
                zeoobject['id'] = self.jobs_counter
                self.jobs_counter += 1
            params = zeoobject.get_params()
            self.jobs_index[params] = zeoobject['id']
        elif isinstance(zeoobject, self.Run):
            objs = self.runs
            zeoobject = zeoobject.copy()
            if not keep_state:
                zeoobject["id"] = self.runs_counter
                zeoobject["state"] = 'CREATED'
            job_id = zeoobject['job_id']
            run_id = zeoobject['id']
            parent_job = self._get_jobs()[job_id]
            if not hasattr(parent_job, 'runs'):
                parent_job.runs = PBTree(key_string='runs_')
            parent_job.runs[run_id] = zeoobject
            self.runs_counter += 1
        else:
            raise RuntimeError(
                f'cannot insert object of type {type(zeoobject)}')

        objs[zeoobject.id] = zeoobject.copy()

    def setObjectItemTypes(self, zeoobject):
        """Point ``zeoobject.types`` to the types of the stored description.

        Raises:
            RuntimeError: *zeoobject* is neither a Job nor a Run.
        """
        if isinstance(zeoobject, self.Job):
            descriptor = 'job_desc'
        elif isinstance(zeoobject, self.Run):
            descriptor = 'run_desc'
        else:
            raise RuntimeError(f'{type(zeoobject)}')
        zeoobject.types = self.root.schemas[self.schema][descriptor].types

    def commit(self):
        """Commit the current transaction."""
        import transaction
        transaction.commit()

    def pack(self):
        """Pack the database behind the connection."""
        self.connection.db().pack()

    def close(self):
        """Abort the current transaction."""
        import transaction
        transaction.abort()

    def get_state_summary(self, params=None):
        """Count the runs per ``(run_name, state)``.

        Args:
            params: selection parameters passed to
                ``RunSelector.selectRuns``; defaults to an empty list.

        Returns:
            dict mapping each run name to a list of ``(state, count)``.
        """
        if params is None:
            params = []

        runSelector = runselector.RunSelector(self)
        run_list = runSelector.selectRuns(params, quiet=True)

        counts = {}
        for r, _j in run_list:
            key = (r.run_name, r.state)
            counts[key] = counts.get(key, 0) + 1

        stats = {}
        for (run_name, state), count in counts.items():
            stats.setdefault(run_name, []).append((state, count))
        return stats

    def retreiveSchemaName(self, creation=False, **kwargs):
        """Set ``self.schema`` from ``kwargs["study"]``.

        A study of the form ``<prefix>_<name>`` is used verbatim.  Otherwise
        the prefix is looked up with ``getSchemasUser``; if that fails and
        *creation* is truthy, the name of the current system user is used as
        prefix.

        Raises:
            RuntimeError: the study cannot be found and *creation* is not
                True.
        """
        study = kwargs["study"]
        # Need this because getSchemaList strips prefix
        match = re.match('(.+)_(.+)', study)
        if match:
            self.schema = study
            study_name = match.group(2)
        else:
            study_name = study
            try:
                self.schema = self.getSchemasUser(study) + '_' + study
            except RuntimeError:
                if creation is False:
                    raise
                detected_user = pwd.getpwuid(os.getuid())[0]
                self.schema = detected_user + '_' + study

        # logger.error(self.schema)
        if ((creation is not True) and
                (study_name not in self.getSchemaList())):
            logger.error(study_name)
            raise RuntimeError(
                f"Study name '{study_name}' invalid: "
                f"possibilities are {self.getSchemaList()}")

    def manualLaunch(self, job, run, run_name='manual', nproc=1, **params):
        """Insert *job*, attach *run* to it and launch that run locally.

        The configuration files and the executable are read from ``bd.yaml``
        in the current directory.  A run of the same job with identical
        configuration files is reused; if it is already ``FINISHED`` it is
        returned without being launched again.

        Args:
            job: job whose parameter space is created and selected.
            run: run to configure and launch.
            run_name: stored in ``run['run_name']``.
            nproc: stored in ``run['nproc']``.
            **params: launch parameters; ``outpath``, ``study``, ``truerun``
                and ``generator`` are filled in when missing.

        Returns:
            The ``(run, job)`` pair that was launched or found finished.
        """
        # imported here, as in the original code, not at module level
        from . import jobselector

        n_insertion = self.createParameterSpace(job)
        logger.info(f"Inserted {n_insertion} new jobs")

        jobSelector = jobselector.JobSelector(self)
        job_list = jobSelector.selectJobs(job, quiet=True)
        if len(job_list) != 1:
            # NOTE(review): execution continues after this error -- confirm
            # whether the launch should be aborted instead.
            logger.error(
                'For a manual launch all parameters of jobs '
                'have to be specified')
        else:
            job = job_list[0]
        logger.debug(job)

        run['run_name'] = run_name
        run['nproc'] = nproc
        run['machine_name'] = socket.gethostname()

        fname = 'bd.yaml'
        with open(fname) as f:
            config = yaml.load(f, Loader=yaml.SafeLoader)

        # add a configuration file
        for config_file in config['config_files']:
            run.addConfigFiles(config_file)
        # set the entry point (executable) file
        run.setExecFile(config['exec_file'])

        runSelector = runselector.RunSelector(self)
        run_list = runSelector.selectRuns(run, quiet=True)
        logger.debug(job)
        logger.debug(run)

        already_created = False
        for r, j in run_list:
            if j.id != job.id:
                continue
            stored_files = list(r.configfiles)
            wanted_files = list(run.configfiles)
            logger.info(stored_files)
            logger.info(wanted_files)
            if stored_files != wanted_files:
                continue
            if r['state'] == 'FINISHED':
                logger.warning(
                    'Exact same run was already executed: not re-running')
                return r, j
            run = r
            already_created = True
            break

        if already_created is False:
            run.attachToJob(job_list[0])
            self.commit()
            run_list = runSelector.selectRuns(run, quiet=True)
            run = run_list[0][0]

        if 'outpath' not in params:
            params['outpath'] = './'
        if 'study' not in params:
            params['study'] = config['study']
        params['truerun'] = True
        if 'generator' not in params:
            parser = bdparser.BDParser()
            params['generator'] = parser.loadModule({}, 'bashCoat', {})

        logger.warning(params)
        self.launchRuns([(run, job)], params)
        return run, job

    def launchRuns(self, run_list, params):
        """Write the configuration files of each run and launch it.

        Every run is executed from ``<outpath>/BD-<study>-runs/run-<id>``.
        The working directory is restored on exit, also when a launch
        raises.

        Args:
            run_list: iterable of ``(run, job)`` pairs.
            params: must contain ``outpath``, ``study``, ``generator`` (an
                object with a ``launch(run, params)`` method) and
                ``truerun``.
        """
        if (len(run_list) == 0):
            logger.error("No runs to be launched")

        mydir = os.path.join(
            params["outpath"], "BD-" + params["study"] + "-runs")
        os.makedirs(mydir, exist_ok=True)

        cwd = os.getcwd()
        os.chdir(mydir)
        try:
            for r, j in run_list:
                logger.warning(f"Dealing with job {j.id}, run {r.id}")
                run_dir = "run-" + str(r.id)
                r["run_path"] = os.path.join(mydir, run_dir)
                j.update()
                r.update()

                os.makedirs(run_dir, exist_ok=True)
                os.chdir(run_dir)

                for conf in r.getConfigFiles():
                    logger.warning("create file " + conf["filename"])
                    with open(conf["filename"], 'w') as f:
                        f.write(conf["file"])

                logger.warning("launch in '" + mydir + "/" +
                               run_dir + "/'")
                mymod = params["generator"]
                logger.warning(mymod)
                mymod.launch(r, params)
                os.chdir("../")
        finally:
            # always come back to the directory the caller was in
            os.chdir(cwd)

        if (params["truerun"] is True):
            self.commit()
################################################################

Event Timeline