diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py
index 9a5d216..5ae13bc 100644
--- a/BlackDynamite/base_zeo.py
+++ b/BlackDynamite/base_zeo.py
@@ -1,355 +1,358 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import conffile_zeo
from . import zeoobject
from . import lowercase_btree
from .constraints_zeo import ZEOconstraints
################################################################
import re
import os
import subprocess
import ZEO
import ZODB
import sys
from BTrees.OOBTree import OOSet
from . import job
from . import run_zeo
################################################################
__all__ = ["BaseZEO"]
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
PBTree = lowercase_btree.PersistentLowerCaseBTree
################################################################
class BaseZEO(base.AbstractBase):
"""
"""
singleton_base = None
# def __del__(self):
# if hasattr(self, 'process'):
# self.process.kill()
def __init__(self, truerun=False, creation=False, read_only=False, **kwargs):
# Open the ZEO connection backing this database object.
#
# The 'host' connection parameter must have the form 'zeo://<path>'; the
# directory part of <path> is where 'zeo.socket' and 'zeo.conf' are looked
# up. read_only is handed to ZEO.connection; truerun, creation and kwargs
# are forwarded to the parent constructor.
# Raises RuntimeError when the part before '://' is not 'zeo'.
# Calls sys.exit(-1) when the connection attempt raises.

# publish this instance through the class attribute
BaseZEO.singleton_base = self
# classes other code obtains through this object
self.Job = job.JobZEO
self.Run = run_zeo.RunZEO
self.ConfFile = conffile_zeo.ConfFile
self.BDconstraints = ZEOconstraints
zeo_params = ["host"]
# presumably keeps only the listed keys of kwargs -- verify in bdparser
connection_params = bdparser.filterParams(zeo_params, kwargs)
logger.info('connection arguments: {0}'.format(connection_params))
self.filename = connection_params['host']
# split '<protocol>://<path>'
filename_split = self.filename.split('://')
if filename_split[0] != 'zeo':
raise RuntimeError(
f"wrong protocol with this database: {type(self)}")
self.filename = filename_split[1]
# logger.error(self.filename)
dirname = os.path.dirname(self.filename)
# logger.error(dirname)
# a bare file name means the current directory
if dirname == '':
dirname = './'
socket_name = os.path.join(dirname, 'zeo.socket')
zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf')
# logger.error(socket_name)
# logger.error(zeo_server_conf_filename)
# No socket file: write a server configuration and launch runzeo.
# NOTE(review): indentation is not visible here; presumably everything
# down to the Popen call belongs to this branch -- confirm.
if not os.path.exists(socket_name):
# NOTE(review): the text below has no section tags; confirm the literal
# was not damaged before relying on it.
zeo_server_conf = f'''
address ./zeo.socket
path bd.zeo
blob-dir bd.blob
path zeo.log
format %(asctime)s %(message)s
'''
with open(zeo_server_conf_filename, 'w') as f:
f.write(zeo_server_conf)
cmd = "runzeo -C zeo.conf"
# the command is reported at error level
logger.error(cmd)
# the server is started from the database directory and kept on self
self.process = subprocess.Popen(
cmd, shell=True, cwd=dirname)
# stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
self.connection = ZEO.connection(
socket_name, read_only=read_only, server_sync=True,
blob_dir=os.path.join(dirname, 'bd-client.blob'))
self.root = self.connection.root
logger.debug('connected to base')
except Exception as e:
# any failure while connecting ends the program
logger.error(
"Connection failed: check your connection settings:\n" +
str(e))
sys.exit(-1)
assert(isinstance(self.connection, ZODB.Connection.Connection))
# the raw 'host' value (protocol included) is kept as dbhost
self.dbhost = (kwargs["host"]
if "host" in kwargs.keys()
else "localhost")
super().__init__(connection=self.connection, truerun=truerun,
creation=creation, **kwargs)
def getSchemaList(self, filter_names=True):
# Return the study names stored under root.schemas.
# The container is created empty when root has no 'schemas' attribute yet.
# With filter_names left to True a list is built in which names matching
# '<user>_<name>' are reported as '<name>'; otherwise the container itself
# is returned.
try:
schemas = self.root.schemas
except AttributeError:
# first access: create the study container
self.root.schemas = PBTree(key_string='study_')
schemas = self.root.schemas
filtered_schemas = []
if filter_names is True:
for s in schemas:
# strip the '<user>_' prefix when present
m = re.match('{0}_(.+)'.format(self.user), s)
if m:
s = m.group(1)
# NOTE(review): whether names without the prefix are appended as well
# depends on the nesting of the next line -- confirm.
filtered_schemas.append(s)
else:
filtered_schemas = schemas
return filtered_schemas
def getStudySize(self, study):
# Report the storage size and the number of runs and jobs of a study
# through SQL requests, returned as a dict with keys 'size', 'nruns'
# and 'njobs'.
# NOTE(review): this relies on a cursor() method of the connection and on
# the name psycopg2, which is not imported in the lines shown -- looks like
# a leftover of an SQL backend; confirm it is still reachable.
curs = self.connection.cursor()
try:
logger.info(study)
# total relation size of every table of the schema
curs.execute("""
select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname)
|| '.' || quote_ident(tablename)))::BIGINT
FROM pg_tables WHERE schemaname = '{0}') as sz
""".format(study))
size = curs.fetchone()[0]
# human readable form of that size
curs.execute("""
select pg_size_pretty(cast({0} as bigint))
""".format(size))
size = curs.fetchone()[0]
curs.execute("""
select count({0}.runs.id) from {0}.runs
""".format(study))
nruns = curs.fetchone()[0]
curs.execute("""
select count({0}.jobs.id) from {0}.jobs
""".format(study))
njobs = curs.fetchone()[0]
except psycopg2.ProgrammingError:
self.connection.rollback()
size = '????'
# NOTE(review): nruns and njobs are only assigned inside the try body, so
# they may be unbound here after a failed request.
return {'size': size, 'nruns': nruns, 'njobs': njobs}
def createSchema(self, params=None):
    """Create the empty containers of the current study.

    When a study with the same name already exists, the user is asked
    (through ``bdparser.validate_question``) whether it may be dropped;
    a negative answer ends the program with ``sys.exit(-1)``.

    :param params: parameters handed to ``validate_question``;
        defaults to ``{"yes": False}``.
    """
    # a fresh dict per call instead of a shared mutable default
    if params is None:
        params = {"yes": False}

    if not hasattr(self.root, 'schemas'):
        self.root.schemas = PBTree(key_string='study_')

    if self.schema in self.root.schemas:
        validated = bdparser.validate_question(
            "Are you sure you want to drop the schema named '" +
            self.schema + "'", params, False)
        if validated is not True:
            logger.debug("creation canceled: exit program")
            sys.exit(-1)
        del self.root.schemas[self.schema]

    self.root.schemas[self.schema] = PBTree()
    study = self.root.schemas[self.schema]
    study['Quantities'] = OOSet()
    study['Jobs'] = PBTree(key_string='job_')
    study['Runs'] = PBTree(key_string='run_')
    # identifiers handed out by insert() start at 1
    study['Jobs_counter'] = 1
    study['Runs_counter'] = 1
def prepare(self, obj, descriptor):
    """Copy the entry types of a stored descriptor onto ``obj``.

    Does nothing when no study container exists yet or when the
    current study holds no entry named ``descriptor``.

    :param obj: object exposing a ``types`` mapping, ``in`` and item
        assignment.
    :param descriptor: key of the descriptor inside the study
        (for instance ``'run_desc'``).
    """
    if not hasattr(self.root, 'schemas'):
        return
    study = self.root.schemas[self.schema]
    if descriptor not in study:
        return
    desc = study[descriptor]
    for t in desc.types.keys():
        obj.types[t] = desc.types[t]
        if t not in obj:
            # create the missing entry itself; the previous ``obj.t``
            # only set an attribute literally named "t"
            obj[t] = None
def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
    """Create the study and store its job and run descriptors.

    Every ``name -> type`` pair of ``quantities`` is registered with
    ``pushQuantity``. ``kwargs`` is handed to ``createSchema``. The
    transaction is committed only when ``self.truerun`` is set.
    """
    self.createSchema(kwargs)

    study = self.root.schemas[self.schema]
    study['job_desc'] = job_desc
    study['run_desc'] = run_desc

    for quantity_name in quantities:
        self.pushQuantity(quantity_name, quantities[quantity_name])

    if self.truerun:
        self.commit()
def _get_jobs(self):
return self.root.schemas[self.schema]['Jobs']
@property
def jobs(self):
return self._get_jobs()
@jobs.setter
def jobs(self, val):
self.root.schemas[self.schema]['Jobs'] = val
@property
def quantities(self):
return self.root.schemas[self.schema]['Quantities']
@quantities.setter
def quantities(self, value):
self.root.schemas[self.schema]['Quantities'] = value
@property
def jobs_counter(self):
return self.root.schemas[self.schema]['Jobs_counter']
@jobs_counter.setter
def jobs_counter(self, val):
self.root.schemas[self.schema]['Jobs_counter'] = val
def _get_runs(self):
return self.root.schemas[self.schema]['Runs']
@property
def runs(self):
return self._get_runs()
@runs.setter
def runs(self, val):
self.root.schemas[self.schema]['Runs'] = val
@property
def runs_counter(self):
return self.root.schemas[self.schema]['Runs_counter']
@runs_counter.setter
def runs_counter(self, val):
self.root.schemas[self.schema]['Runs_counter'] = val
def select(self, _types, constraints=None, sort_by=None):
    """Return the stored jobs or runs matching ``constraints``.

    :param _types: a Job/Run class or instance, or a list whose first
        element is one; only that first element decides which
        container is scanned.
    :param constraints: either a ZEOObject (when it carries a non-None
        ``id`` the object stored under that id is returned directly)
        or anything accepted by ``ZEOconstraints``.
    :param sort_by: must be ``None`` or a string; it is validated but
        not otherwise used by this method.
    :returns: a list of jobs, or a list of ``[run, job]`` pairs when
        runs are selected (a ``(run, job)`` tuple on the direct-id
        path).
    :raises RuntimeError: for an unsupported type or a non-string
        ``sort_by``.
    """
    if not isinstance(_types, list):
        _types = [_types]
    _type = _types[0]
    if isinstance(_type, zeoobject.ZEOObject):
        _type = type(_type)

    if _type == self.Job:
        obj_container = self._get_jobs()
    elif _type == self.Run:
        obj_container = self._get_runs()
    else:
        # report the offending type (type(_types) was always ``list``)
        raise RuntimeError(f'cannot select objects of type {_type}')

    if (sort_by is not None) and (not isinstance(sort_by, str)):
        raise RuntimeError(
            'sort_by argument is not correct: {0}'.format(sort_by))

    # shortcut: an object carrying an id is fetched by key
    if isinstance(constraints, zeoobject.ZEOObject):
        if hasattr(constraints, 'id') and constraints.id is not None:
            obj = obj_container[constraints.id]
            if isinstance(obj, self.Run):
                obj = (obj, self._get_jobs()[obj.job_id])
            return [obj]

    const = ZEOconstraints(self, constraints)
    condition = const.getMatchingCondition()

    obj_list = []
    for key, obj in obj_container.items():
        obj.base = self
        objs = [obj]
        if _type == self.Run:
            # a run is always tested together with its job
            j = self._get_jobs()[obj.job_id]
            j.base = self
            objs.append(j)
        if condition(objs):
            if len(objs) == 1:
                obj_list.append(objs[0])
            else:
                obj_list.append(objs)
    return obj_list
def insert(self, zeoobject, keep_state=False):
# Store a copy of a job or a run in the matching container of the study.
# Without keep_state the copy receives its id from the study counter (and,
# for runs, the state 'CREATED').
# Raises RuntimeError for any other kind of object.
# NOTE(review): indentation is not visible here; presumably the counter
# increments belong to the 'if not keep_state' branches -- confirm.
if isinstance(zeoobject, self.Job):
objs = self._get_jobs()
# work on a copy so that the caller's object is left untouched
zeoobject = zeoobject.copy()
if not keep_state:
zeoobject['id'] = self.jobs_counter
self.jobs_counter += 1
elif isinstance(zeoobject, self.Run):
objs = self._get_runs()
zeoobject = zeoobject.copy()
if not keep_state:
zeoobject["id"] = self.runs_counter
zeoobject["state"] = 'CREATED'
self.runs_counter += 1
else:
raise RuntimeError(
f'cannot insert object of type {type(zeoobject)}')
# the container receives yet another copy, keyed by the object id
objs[zeoobject.id] = zeoobject.copy()
# logger.error(f'inserted {zeoobject.id} {objs[zeoobject.id]}')
def setObjectItemTypes(self, zeoobject):
    """Give ``zeoobject`` the ``types`` of the matching study descriptor.

    Jobs use the 'job_desc' entry of the study, runs the 'run_desc'
    entry; any other object raises RuntimeError.
    """
    if isinstance(zeoobject, self.Job):
        descriptor = 'job_desc'
    elif isinstance(zeoobject, self.Run):
        descriptor = 'run_desc'
    else:
        raise RuntimeError(f'{type(zeoobject)}')
    zeoobject.types = self.root.schemas[self.schema][descriptor].types
def commit(self):
    """Commit the current transaction."""
    import transaction as txn
    txn.commit()
def pack(self):
    """Pack the database behind the current connection."""
    database = self.connection.db()
    database.pack()

def close(self):
    """Abort the current transaction.

    Only the transaction is aborted; the connection object is not
    touched by this method.
    """
    import transaction as txn
    txn.abort()
################################################################
diff --git a/BlackDynamite/graphhelper.py b/BlackDynamite/graphhelper.py
index d037ce6..224666f 100755
--- a/BlackDynamite/graphhelper.py
+++ b/BlackDynamite/graphhelper.py
@@ -1,389 +1,391 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# -*- py-which-shell: "python"; -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
################################################################
from . import runselector
from . import bdparser
import re
import sys
import numpy as np
################################################################
class GraphHelper(object):
"""
"""
def getMeasures(self, run_list):
    """Collect the requested quantities of every run in ``run_list``.

    :param run_list: iterable of ``(run, job)`` pairs.
    :returns: a list of ``[run, job, results]`` where ``results`` holds
        the ``(name, step, value)`` triples returned by
        ``run.getScalarQuantities`` whose value is not ``None``. Runs
        for which that call returns ``None`` are skipped.
    """
    add_req = []
    if self.frequency:
        add_req += ["step %% {0} = 0".format(self.frequency)]
    if self.start:
        add_req += ["step > {0}".format(self.start)]
    if self.end:
        add_req += ["step < {0}".format(self.end)]

    myresults = []
    for r, j in run_list:
        res = r.getScalarQuantities(self.quantities, add_req)
        if res is None:
            continue
        # build a filtered list: the previous ``del res[key]`` indexed
        # the list with a quantity name while iterating over it
        res = [(key, step, value) for key, step, value in res
               if value is not None]
        myresults.append([r, j, res])
    return myresults
def selectGraphs(self):
    """Select the runs matching the constraints and fetch their measures."""
    selected_runs = self.runSelector.selectRuns(self.constraints,
                                                self.sort_by)
    return self.getMeasures(selected_runs)
def show(self):
    """Display the figures through matplotlib."""
    from matplotlib import pyplot
    pyplot.show()
def makeGraphs(self, fig=None, **kwargs):
    """Draw one curve set per selected run that has data.

    A new figure of size ``self.figsize`` is created when ``fig`` is
    not provided. ``kwargs`` is forwarded to ``makeCurve``. The figure
    is returned.
    """
    import matplotlib.pyplot as plt

    results = self.selectGraphs()
    if fig is None:
        fig = plt.figure(figsize=self.figsize)

    for myrun, myjob, data in results:
        if not data:
            continue
        self.makeCurve(data, fig=fig, myrun=myrun, myjob=myjob, **kwargs)
    return fig
def replaceRunAndJobsParameters(self, name, myrun, myjob):
    """Substitute ``%r.<key>`` and ``%j.<key>`` placeholders in ``name``.

    ``%r.<key>`` is replaced by ``str(myrun[key])`` for every key of
    ``myrun.entries`` and ``%j.<key>`` by ``str(myjob[key])`` for every
    key of ``myjob.entries``. The resulting string is returned.
    """
    codes = [("%r." + key, myrun[key]) for key in myrun.entries.keys()]
    codes += [("%j." + key, myjob[key]) for key in myjob.entries.keys()]
    # longest placeholders first, so that a key which is a prefix of
    # another one (``id`` / ``id_parent``) cannot clobber the longer one
    codes.sort(key=lambda code: len(code[0]), reverse=True)

    res = name
    for code, val in codes:
        res = res.replace(code, str(val))
    return res
def generateLabels(self, results, myrun, myjob):
    """Build one legend label per entry of ``results``.

    When ``self.legend`` provides a non-empty template for the entry,
    ``%<n>`` is replaced by the name of the n-th result and the run/job
    placeholders are substituted; otherwise the label is the result
    name with run/job placeholders substituted.
    """
    names = [entry[0] for entry in results]
    labels = []
    for index, name in enumerate(names):
        has_template = (self.legend and index < len(self.legend)
                        and self.legend[index])
        if not has_template:
            labels.append(
                self.replaceRunAndJobsParameters(name, myrun, myjob))
            continue

        # protect literal braces before turning %<n> into {<n>}
        template = self.legend[index].replace("{", "{{")
        template = template.replace("}", "}}")
        template = re.sub(r"(%)([0-9]+)", r'{\2}', template)
        label = template.format(*names)
        labels.append(
            self.replaceRunAndJobsParameters(label, myrun, myjob))
    return labels
def makeComposedQuantity(self, results, myrun, myjob):
# Evaluate every 'X:Y' expression of self.using and return a list of
# [name, array] pairs, the array having two columns (X then Y).
# In an expression '%<n>.x' and '%<n>.y' stand for columns 0 and 1 of the
# n-th entry of results; run/job placeholders are substituted as well.
# The program exits (sys.exit(-1)) when an expression cannot be evaluated.
# NOTE(review): expressions are run through eval(); confirm they never come
# from untrusted input.
# The evaluated strings refer to the locals 'vecs' and 'names' by name, so
# these two must keep their names.
vecs = [r[1] for r in results]
names = [r[0] for r in results]
# print (vecs[0].shape)
new_results = []
for comp in self.using:
exprs = comp.split(":")
tmp_res = []
for i in [0, 1]:
# rewrite the placeholders into indexing expressions on 'vecs'
e = re.sub(r"(%)([0-9]+)\.(x)", r"vecs[\2][:,0]", exprs[i])
e = re.sub(r"(%)([0-9]+)\.(y)", r"vecs[\2][:,1]", e)
e = self.replaceRunAndJobsParameters(e, myrun, myjob)
try:
tmp_res.append(eval(e))
except Exception as ex:
# report the failing expression and the shape of each quantity, then stop
print(names)
print("invalid expression: '" + exprs[i] + "'")
print("invalid expression: '" + e + "'")
print(ex)
i = 1
for v in vecs:
print('quantity {0}/{1} shape: {2}'.format(
i, len(vecs), v.shape))
i += 1
sys.exit(-1)
# the curve name is derived from the Y expression
# NOTE(review): the class [x|y] also matches a literal '|' -- confirm intent
name = re.sub(r"(%)([0-9]+)\.([x|y])", r'(" + str(names[\2]) + ")',
exprs[1])
res = np.zeros((tmp_res[0].shape[0], 2))
res[:, 0] = tmp_res[0]
res[:, 1] = tmp_res[1]
# print (res.shape)
# expr = re.sub(r"(%)([0-9]+)", r"vecs[\2]", comp)
# res[0] = eval(expr)
# print (name)
# wrap in quotes so that eval() yields the final string
name = "\"" + name + "\""
# print (name)
name = eval(name)
# print (name)
new_results.append([name, res])
return new_results
def decorateGraph(self, fig, myrun, myjob, results):
    """Prepare the axes of a figure and the labels of its curves.

    Returns ``None`` when ``results`` is empty, otherwise the tuple
    ``(fig, axe, results, labels)``; ``results`` is replaced by the
    composed quantities when ``self.using`` is set. A figure is created
    when ``fig`` is ``None``.
    """
    if not results:
        return

    if fig is None:
        import matplotlib.pyplot as plt
        fig = plt.figure()

    axe = fig.add_subplot(1, 1, 1)

    # apply each optional setting that has been provided
    optional_settings = (
        ("xrange", "set_xlim"),
        ("yrange", "set_ylim"),
        ("xlabel", "set_xlabel"),
        ("ylabel", "set_ylabel"),
    )
    for attribute, setter in optional_settings:
        value = getattr(self, attribute)
        if value:
            getattr(axe, setter)(value)

    if self.title:
        axe.set_title(
            self.replaceRunAndJobsParameters(self.title, myrun, myjob))

    axe.grid(True, linewidth=0.1)

    if self.using:
        results = self.makeComposedQuantity(results, myrun, myjob)

    labels = self.generateLabels(results, myrun, myjob)
    return fig, axe, results, labels
def makeCurve(self, results, myrun=None, myjob=None, fig=None, **kwargs):
    """Plot every entry of ``results`` on the figure and return it.

    Each entry is read as ``(name, step, values)``; steps and values
    are divided by ``self.xscale`` and ``self.yscale``. In black and
    white mode the line style and the line width are cycled from
    ``self.cycle_index``. The figure is saved when ``self.fileout`` is
    set. ``kwargs`` is accepted but not used here.
    """
    fig, axe, results, labels = self.decorateGraph(
        fig, myrun, myjob, results)

    for count, result in enumerate(results):
        step = result[1]
        vec = result[2]
        label = labels[count]

        style = dict()
        if self.marker is not None:
            style["marker"] = self.marker
        if self.blackwhite:
            # integer division: a float cannot index the width list;
            # the width wraps around once every combination was used
            width_index, style_index = divmod(
                self.cycle_index, len(self.linestyle_cycle))
            width_index %= len(self.linewidth_cycle)
            self.cycle_index += 1
            style["linewidth"] = self.linewidth_cycle[width_index]
            style["linestyle"] = self.linestyle_cycle[style_index]
            style["color"] = 'k'

        axe.plot(step/self.xscale, vec/self.yscale,
                 label=label, **style)

    axe.legend(loc='best')
    if self.fileout:
        fig.savefig(self.fileout)
    return fig
def setConstraints(self, **params):
    """Store the 'constraints' parameter, or an empty list without it."""
    self.constraints = params["constraints"] if "constraints" in params else []
def setBinaryOperator(self, **params):
    """Store the 'binary_operator' parameter, 'and' when it is absent."""
    if "binary_operator" in params:
        self.binary_operator = params["binary_operator"]
    else:
        self.binary_operator = 'and'
def setQuantity(self, **params):
    """Store the 'quantity' parameter as ``self.quantities``.

    Without that parameter a message is printed and an error marker
    string is stored instead.
    """
    if "quantity" not in params:
        print("quantity should be provided using option --quantity")
        self.quantities = "__BLACKDYNAMITE_ERROR__"
        return
    self.quantities = params["quantity"]
def __init__(self, base, **params):
    """Configure the helper from the parsed parameters.

    Every attribute created here (``xrange``, ``title``, ``legend``,
    ...) is overridden by the entry of ``params`` with the same name
    when there is one. When ``list_quantities`` or ``list_parameters``
    is ``True`` the corresponding listing is produced and the program
    exits with status 0.
    """
    self.setConstraints(**params)
    self.setQuantity(**params)
    self.base = base
    self.runSelector = runselector.RunSelector(self.base)

    # optional settings, unset unless provided through params
    for option in ("fig", "xrange", "yrange", "xlabel", "ylabel",
                   "xscale", "yscale", "fileout", "title", "using",
                   "frequency", "start", "end", "figsize", "blackwhite",
                   "legend", "sort_by", "marker"):
        setattr(self, option, None)

    # set the members if keys are present in params
    for key in set(self.__dict__.keys()) & set(params.keys()):
        setattr(self, key, params[key])

    # .get(): both flags are optional for callers building params by hand
    if params.get("list_quantities") is True:
        myrun = base.Run(base)
        print("list of possible quantities:\n")
        print("\n".join(myrun.listQuantities()))
        sys.exit(0)

    if params.get("list_parameters") is True:
        self.base.getPossibleParameters()
        sys.exit(0)

    # cycles used by makeCurve in black and white mode
    self.linewidth_cycle = [1, 2, 4]
    self.linestyle_cycle = ['-', '--', '-.']
    self.cycle_index = 0
################################################################
class GraphParser(bdparser.BDParser):
    """Parser declaring the command line options read by GraphHelper."""

    def __init__(self):
        bdparser.BDParser.__init__(self)

        unset = object()  # marks "no default" / "no help text"

        # (option, admissible type, default value, help text)
        options = [
            ("quantity", [str], unset,
             "Specify the quantity to be outputed"),
            ("xrange", [float], unset,
             "Specify range of values in the X direction"),
            ("yrange", [float], unset,
             "Specify range of values in the Y direction"),
            ("sort_by", [str], unset,
             "Specify a study parameter to be used in sorting the curves"),
            ("xlabel", str, unset, "Specify the label for the X axis"),
            ("ylabel", str, unset, "Specify the label for the Y axis"),
            ("xscale", float, 1., "Specify a scale factor for the X axis"),
            ("yscale", float, 1., "Specify a scale factor for the Y axis"),
            ("title", str, unset, "Specify title for the graph"),
            ("legend", [str], None,
             "Specify a legend for the curves."
             " The syntax can use %%j.param or %%r.param to use"
             " get job and run values"),
            ("using", [str], unset,
             "Allow to combine several quantities. "
             "The syntax uses python syntax where "
             "%%quantity1.column1:%%quantity2.column2 is the python "
             "numpy vector provided by quantity number (provided using the "
             "--quantities option) and column number (x or y). "
             "The sytax is comparable to the GNUPlot one in using the ':' "
             "to separate X from Y axis"),
            ("list_quantities", bool, unset,
             "Request to list the possible quantities to be plotted"),
            ("list_parameters", bool, unset,
             "Request to list the possible job/run parameters"),
            ("frequency", int, 1,
             "Set a frequency at which the quantity values "
             "should be retreived "
             "(helpful when the amount of data is very large)"),
            ("start", float, unset, "Set the start X value for the graph"),
            ("end", int, unset, "Set the end X value for the graph"),
            ("figsize", [float], unset, unset),
            ("blackwhite", bool, False,
             "Request to plot a black and white graph"),
            ("marker", str, unset,
             "Request a specific marker (matplotlib option)"),
            ("fileout", str, unset,
             'Request to write a PDF file'
             ' (given its name) containing the graph'),
        ]

        for option, admissible, default, help_text in options:
            self.admissible_params[option] = admissible
            if default is not unset:
                self.default_params[option] = default
            if help_text is not unset:
                self.help[option] = help_text

        # every option above belongs to the "GraphHelper" group
        self.group_params["GraphHelper"] = [
            option for option, _, _, _ in options]
################################################################
__all__ = ["GraphHelper", "GraphParser"]
################################################################
diff --git a/BlackDynamite/run_zeo.py b/BlackDynamite/run_zeo.py
index f8d4685..b61987a 100755
--- a/BlackDynamite/run_zeo.py
+++ b/BlackDynamite/run_zeo.py
@@ -1,391 +1,394 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
################################################################
import BTrees
from . import conffile_zeo
from . import zeoobject
from . import bdparser
from . import base
from .base_zeo import BaseZEO
from . import runselector
from . import bdlogging
import ZODB.blob as blob
################################################################
import numpy as np
import datetime
import subprocess
import socket
import os
################################################################
__all__ = ['RunZEO', 'getRunFromScript']
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
# PBTree = lowercase_btree.PersistentLowerCaseBTree
BTree = BTrees.OOBTree.BTree
################################################################
class UnknownQuantity(RuntimeError):
    """Raised when a run is asked for a quantity it does not hold."""
class RunZEO(zeoobject.ZEOObject):
"""
"""
table_name = 'runs'
def getJob(self):
    """Return the job this run refers to through its 'job_id' entry."""
    base = BaseZEO.singleton_base
    return base.getJobFromID(self.entries["job_id"])
def start(self):
    """Set the run state to 'START' and commit."""
    self.entries['state'] = 'START'
    logger.debug('starting run')
    base = BaseZEO.singleton_base
    base.commit()
    logger.debug('commited')
def finish(self):
    """Set the run state to 'FINISHED' and commit."""
    self.entries['state'] = 'FINISHED'
    logger.debug('finish run')
    base = BaseZEO.singleton_base
    base.commit()
    logger.debug('commited')
def attachToJob(self, job):
    """Record the id of ``job`` on this run and insert the run in the base."""
    self["job_id"] = job.id
    base = BaseZEO.singleton_base
    base.insert(self)
def getExecFile(self):
    """Return the configuration file registered as executable, updated
    through ``getUpdatedConfigFile``."""
    return self.getUpdatedConfigFile(self.configfiles[self.exec])
def setExecFile(self, file_name, **kwargs):
    """Register ``file_name`` as the executable file of the run.

    A configuration file already attached under that file name is
    reused; otherwise the file is added through
    ``conffile_zeo.addFile`` (which receives ``kwargs``). The id of the
    configuration file is stored in the 'exec' entry and returned.
    """
    # reuse a configuration file that is already attached
    for _id, known in self.configfiles.items():
        if known.filename != file_name:
            continue
        self.entries["exec"] = known.id
        return known.id

    # not attached yet: add it
    conf = conffile_zeo.addFile(
        file_name, BaseZEO.singleton_base, **kwargs)
    self.configfiles[conf.id] = conf
    self.entries["exec"] = conf.id
    return conf.id
def listFiles(self, subdir=""):
    """List files in run directory / specified sub-directory.

    ``ls`` is run locally, or through ``ssh`` when the 'machine_name'
    entry differs from the local host name. The stripped, decoded
    output lines are returned.
    """
    command = 'ls {0}'.format(os.path.join(self['run_path'], subdir))
    if not self['machine_name'] == socket.gethostname():
        command = 'ssh {0} "{1}"'.format(self['machine_name'], command)
    logger.info(command)
    # the context manager closes the pipe and waits for the child
    with subprocess.Popen(
            command, shell=True, stdout=subprocess.PIPE) as p:
        out = p.stdout.readlines()
    return [o.strip().decode() for o in out]
def getFile(self, filename, outpath='/tmp'):
    """Return a local path to ``filename`` of the run directory.

    When the file exists locally under the run path that path is
    returned. Otherwise the file is fetched with ``scp`` into
    ``<outpath>/BD-<study>-cache/run-<id>/`` unless it is already
    there, and the cache path is returned. Whatever ``scp`` writes on
    stderr is logged as a warning; no exception is raised for it.
    """
    dest_path = os.path.join(
        outpath, "BD-" + self.base.schema + "-cache",
        "run-{0}".format(self.id))
    dest_file = os.path.join(dest_path, filename)
    full_filename = self.getFullFileName(filename)

    # Check if file is local
    if os.path.isfile(full_filename):
        return full_filename

    # If file is distant, prepare cache directory hierarchy
    # (filename may itself contain sub-directories)
    dest_path = os.path.dirname(dest_file)
    logger.debug('Directories: ' + dest_path)
    logger.debug('File: ' + dest_file)

    # best effort: a failure is logged and the copy is still attempted
    try:
        os.makedirs(dest_path, exist_ok=True)
    except Exception as e:
        logger.error(e)

    if os.path.isfile(dest_file):
        logger.info('File {} already cached'.format(dest_file))
        return dest_file

    cmd = 'scp {0}:{1} {2}'.format(self['machine_name'],
                                   full_filename,
                                   dest_file)
    logger.info(cmd)
    # communicate() drains both pipes and reaps the child process
    with subprocess.Popen(cmd, shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as p:
        _, raw_errors = p.communicate()
    errors = bytes(raw_errors).decode().strip()
    if errors:
        logger.warning(errors)
    return dest_file
def getFullFileName(self, filename):
    """Return ``filename`` joined to the 'run_path' entry of the run."""
    run_directory = self['run_path']
    return os.path.join(run_directory, filename)
def addConfigFiles(self, file_list, regex_params=None):
    """Attach configuration files to the run and commit.

    ``file_list`` may be a single file name or a list of them. Each
    file is added through ``conffile_zeo.addFile`` with the run and job
    parameter names; files whose id is already attached are left as
    they are. The container of configuration files is returned.
    """
    if type(file_list) != list:
        file_list = [file_list]

    base = BaseZEO.singleton_base
    params_list = list(self.types.keys())
    params_list += list(base.Job(base).types.keys())

    # every file is created first, attached afterwards
    files_to_add = []
    for fname in file_list:
        files_to_add.append(conffile_zeo.addFile(
            fname, BaseZEO.singleton_base,
            regex_params=regex_params,
            params=params_list))

    for conf in files_to_add:
        if conf.id in self.configfiles:
            continue
        self.configfiles[conf.id] = conf

    BaseZEO.singleton_base.commit()
    return self.configfiles
def getConfigFiles(self):
    """Return the attached configuration files, each one updated through
    ``getUpdatedConfigFile``."""
    conffiles = []
    for _id, conf in self.configfiles.items():
        conffiles.append(self.getUpdatedConfigFile(conf))
    return conffiles
def getConfigFile(self, file_id):
    """Return the attached configuration file stored under ``file_id``."""
    attached = self.configfiles
    return attached[file_id]
def replaceBlackDynamiteVariables(self, text):
    """Substitute the ``__BLACKDYNAMITE__<name>__`` markers of ``text``.

    Markers are replaced, in this order, by the entries of the job of
    this run, by the entries of the run, and finally by the database
    host, the study name and the run id. An Exception is raised when a
    marker present in the text maps to an entry that is ``None``.
    """
    base = BaseZEO.singleton_base

    # fetch the job of this run through its id
    myjob = base.Job(base)
    myjob["id"] = self.entries["job_id"]
    myjob = myjob.getMatchedObjectList()[0]

    for key, val in myjob.entries.items():
        replaced = text.replace("__BLACKDYNAMITE__" + key + "__",
                                str(val))
        if val is None and replaced != text:
            raise Exception("unset job parameter " + key)
        text = replaced

    for key, val in self.entries.items():
        replaced = text.replace("__BLACKDYNAMITE__" + key + "__",
                                str(val))
        if val is None and replaced != text:
            logger.debug(self.entries)
            raise Exception("unset run parameter " + key)
        text = replaced

    special_markers = (
        ("__BLACKDYNAMITE__dbhost__", BaseZEO.singleton_base.dbhost),
        ("__BLACKDYNAMITE__study__", BaseZEO.singleton_base.schema),
        ("__BLACKDYNAMITE__run_id__", str(self.id)),
    )
    for marker, value in special_markers:
        text = text.replace(marker, value)
    return text
def getUpdatedConfigFile(self, conf):
    """Substitute the BlackDynamite markers in ``conf["file"]``.

    ``conf`` is modified in place and returned.
    """
    substituted = self.replaceBlackDynamiteVariables(conf["file"])
    conf["file"] = substituted
    return conf
def listQuantities(self):
    """Return the quantities container of the singleton base."""
    base = BaseZEO.singleton_base
    return base.quantities
def getLastStep(self):
    """Return ``(last_step, last_step_time)``, or ``(None, None)`` when
    the run has no 'last_step' entry."""
    if 'last_step' not in self.entries:
        return None, None
    return self.last_step, self.last_step_time
def getScalarQuantity(self, name, additional_request=None):
    """Return the ``(steps, values)`` arrays stored for quantity ``name``.

    ``additional_request`` is accepted but not used here.

    :raises UnknownQuantity: when the run does not hold ``name``.
    """
    if name in self.quantities:
        step, array = self.getQuantityArrayFromBlob(name)
        return step, array

    raise UnknownQuantity(
        f"for run {self}\n"
        f"unknown quantity '{name}'\n"
        f"possible quantities are {[e for e in self.quantities.keys()]}")
def getScalarQuantities(self, names, additional_request=None):
    """Return one ``(name, steps, values)`` triple per requested quantity.

    ``None`` is returned (after a warning) as soon as one of the
    quantities is unknown to the run. ``additional_request`` is
    accepted but not used here.
    """
    collected = []
    for quantity_name in names:
        try:
            step, array = self.getScalarQuantity(quantity_name)
            collected.append((quantity_name, step, array))
        except UnknownQuantity:
            q = quantity_name
            logger.warning(f'run {self.id} has no quantity: {q}')
            return None
    return collected
def getVectorQuantity(self, name, step):
    """Return the value of quantity ``name`` recorded at ``step``.

    :raises RuntimeError: when ``step`` is not recorded, or is recorded
        more than once.
    """
    step_array, array = self.getQuantityArrayFromBlob(name)
    matches = np.where(step_array == step)[0]
    # f-strings: the messages previously showed a literal "{step}"
    if matches.shape[0] == 0:
        raise RuntimeError(f'the step {step} could not be found')
    if matches.shape[0] > 1:
        raise RuntimeError(
            f'the step {step} was registered more than once')
    return array[matches[0]]
@zeoobject._transaction
def saveStepTimeStamp(self, step):
    """Record ``step`` as the last step, together with the current time."""
    self.last_step = step
    # the module imports ``datetime`` itself, hence datetime.datetime
    self.last_step_time = datetime.datetime.now()
@zeoobject._transaction
def pushVectorQuantity(self, vec, step, name,
                       is_integer=None, description=None):
    """Append ``vec`` at ``step`` to the stored vector quantity ``name``.

    The quantity name is registered in the base, the stored arrays are
    extended (or created), the last step is time-stamped and the
    transaction is committed. ``is_integer`` and ``description`` are
    accepted but not used here.
    """
    BaseZEO.singleton_base.quantities.add(name)

    if name in self.quantities:
        array_step, list_vec = self.getQuantityArrayFromBlob(name)
        array_step = np.append(array_step, [step], axis=0)
        stored_rank = len(list_vec.shape)
        # fall back to an object array when the stored data is 1-d or
        # the new vector does not have the stored row length
        as_objects = ((stored_rank == 2
                       and list_vec.shape[1] != vec.shape[0])
                      or stored_rank == 1)
        if as_objects:
            list_vec = np.array(
                [e for e in list_vec] + [vec], dtype=object)
        else:
            list_vec = np.append(list_vec, [vec], axis=0)
    else:
        list_vec = np.array([vec], dtype=object)
        array_step = np.array([step])

    self.saveQuantityArrayToBlob(name, array_step, list_vec)
    self.saveStepTimeStamp(step)
    BaseZEO.singleton_base.commit()
@zeoobject._transaction
def pushScalarQuantity(self, val, step, name,
                       is_integer=None, description=None):
    """Append ``val`` at ``step`` to the stored scalar quantity ``name``.

    The quantity name is registered in the base, the stored arrays are
    extended (or created), the last step is time-stamped and the
    transaction is committed. ``is_integer`` and ``description`` are
    accepted but not used here.
    """
    BaseZEO.singleton_base.quantities.add(name)

    if name in self.quantities:
        array_step, array_val = self.getQuantityArrayFromBlob(name)
        array_step = np.append(array_step, [step], axis=0)
        array_val = np.append(array_val, [val], axis=0)
    else:
        array_val = np.array([val])
        array_step = np.array([step])

    self.saveQuantityArrayToBlob(name, array_step, array_val)
    self.saveStepTimeStamp(step)
    BaseZEO.singleton_base.commit()
def getQuantityBlob(self, name):
    """Return the blob of quantity ``name``, creating an empty one when
    the run does not hold that quantity yet."""
    if name in self.quantities:
        return self.quantities[name]
    logger.info(f'create quantity {name}')
    self.quantities[name] = blob.Blob()
    return self.quantities[name]
def getQuantityArrayFromBlob(self, name):
    """Load the ``(steps, values)`` arrays stored in the blob of ``name``.

    :raises RuntimeError: when the blob content cannot be read.
    """
    # the handle is closed on exit; the arrays are read while it is open
    with self.getQuantityBlob(name).open() as buf:
        try:
            _f = np.load(buf, allow_pickle=True)
        except IOError as e:
            logger.error(e)
            raise RuntimeError(
                f"Cannot read file {buf.name} for quantity {name}") from e
        return _f['step'], _f['val']
def saveQuantityArrayToBlob(self, name, array_step, array_val):
    """Write the step and value arrays of quantity ``name`` to its blob,
    as a compressed archive with the keys 'val' and 'step'."""
    # close the write handle explicitly instead of leaving it to
    # garbage collection
    with self.getQuantityBlob(name).open('w') as buf:
        np.savez_compressed(buf, val=array_val, step=array_step)
def getAllVectorQuantity(self, name):
    """Fetch every step of vector quantity ``name`` through an SQL request.

    Returns ``(steps, values)`` as numpy arrays, or ``[None, None]``
    when the request yields no row.
    NOTE(review): relies on ``getQuantityID`` and
    ``base.performRequest``, which are not defined in the lines shown;
    confirm this path is still used.
    """
    quantity_id, is_integer, is_vector = self.getQuantityID(
        name, is_vector=True)

    if is_integer is False:
        table = "vector_real"
    else:
        table = "vector_integer"

    request = """
SELECT step,measurement from {0}.{1}
WHERE (run_id,quantity_id) = ({2},{3}) order by step
""".format(self.base.schema, table, self.id, quantity_id)

    rows = self.base.performRequest(request, [name]).fetchall()
    if not rows:
        return [None, None]

    matres = np.array([row[1] for row in rows])
    stepres = np.array([row[0] for row in rows])
    return (stepres, matres)
def delete(self):
    """Remove this run from the runs container of its base and commit."""
    owner = self.base
    del owner.runs[self.id]
    owner.commit()
def deleteData(self):
    """Drop every quantity stored on the run and commit.

    The quantities container is replaced by an empty one.
    """
    for name in self.quantities:
        # diagnostic dump of each blob, kept at debug level; the local
        # name no longer shadows the imported ``blob`` module
        quantity_blob = self.getQuantityBlob(name)
        logger.debug(dir(quantity_blob))
    del self.quantities
    self.quantities = BTree()
    self.base.commit()
def __init__(self, base):
    """Create an empty run bound to ``base``.

    The run starts with empty containers for configuration files and
    quantities, receives the types of the stored run descriptor through
    ``base.prepare`` and then the fixed entries declared below.
    """
    super().__init__(base)
    self.configfiles = BTree()
    self.quantities = BTree()
    base.prepare(self, 'run_desc')

    self['id'] = None

    entry_types = (
        ('id', int),
        ("machine_name", str),
        ("run_path", str),
        ("job_id", int),
        ("nproc", int),
        ("run_name", str),
        ("wait_id", int),
        ("start_time", datetime.datetime),
        ("state", str),
        ("exec", str),
        ("last_step", int),
        ("last_step_time", datetime.datetime),
    )
    for entry, entry_type in entry_types:
        self.types[entry] = entry_type

    for entry in ("run_path", "wait_id", "start_time", "state"):
        self.allowNull[entry] = True

    for entry in ("last_step", "last_step_time", "start_time", "wait_id"):
        self[entry] = None
################################################################
def getRunFromScript():
    """Return the ``(run, job)`` pair selected by the parsed parameters.

    The parser is given the extra 'run_id' parameter and an empty
    argument list. An Exception is raised unless exactly one run
    matches.
    """
    parser = bdparser.BDParser()
    parser.register_params(params={"run_id": int})
    params = parser.parseBDParameters(argv=[])

    mybase = base.Base(**params)
    selector = runselector.RunSelector(mybase)
    run_list = selector.selectRuns(params)

    if len(run_list) != 1:
        raise Exception(f'internal error {params}')

    myrun, myjob = run_list[0]
    return myrun, myjob
diff --git a/scripts/cleanRuns.py b/scripts/cleanRuns.py
index f3d65a7..398a113 100755
--- a/scripts/cleanRuns.py
+++ b/scripts/cleanRuns.py
@@ -1,143 +1,144 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import BlackDynamite as BD
import os
import sys
import socket
import re
import shutil
################################################################
def validate(question):
    """Ask ``question`` when running for real; answer no otherwise.

    Without the 'truerun' parameter the question is printed with a
    forced negative answer and ``False`` is returned.
    """
    if params["truerun"] is not True:
        print("{0}? Forced N".format(question))
        return False
    return BD.bdparser.validate_question(question, params)
# ---- command line parameters of the script
parser = BD.BDParser()
parser.register_params(
"clearRun",
params={
"runid": int,
"clean_orphans": str,
"machine_name": str,
"constraints": [str],
"delete": bool
},
defaults={
"machine_name": socket.gethostname(),
"delete": False,
},
help={
"machine_name": "Machine name for desired runs",
"delete": "Entirely remove runs from database",
"runid": "ID of a specific run"
}
)
params = parser.parseBDParameters()
# the machine name is turned into an additional selection constraint
if "machine_name" in params:
if "constraints" in params:
params["constraints"].append(
"machine_name = " + params["machine_name"])
else:
params["constraints"] = ["machine_name = " + params["machine_name"]]
base = BD.Base(**params)
runSelector = BD.RunSelector(base)
# ---- orphan mode: remove 'run-<id>' directories whose id is not a known run
if "clean_orphans" in params:
run_list = runSelector.selectRuns([])
run_ids = [r.id for r, j in run_list]
resdir = params["clean_orphans"] + "/BD-" + params["study"] + "-runs"
print("clean orphans from " + resdir)
if not os.path.exists(resdir):
print("Directory '" + resdir + "' do not exists")
sys.exit(-1)
# run id -> directory to remove
to_delete = {}
for filename in os.listdir(resdir):
fullname = os.path.join(resdir, filename)
# print(fullname)
if (os.path.isdir(fullname)):
match = re.match("run-([0-9]+)", filename)
if (match):
# print(filename)
id = int(match.group(1))
if (id not in run_ids):
to_delete[id] = fullname
if (len(to_delete.keys()) == 0):
print("No orphans found")
sys.exit(0)
validated = validate("Delete output from runs " + str(to_delete.keys()))
if (validated):
for id, fullname in to_delete.items():
print("Delete output from run " + str(id))
shutil.rmtree(fullname)
# orphan mode stops here
sys.exit(0)
# ---- regular mode: clean (or delete) the selected runs
runSelector = BD.RunSelector(base)
run_list = runSelector.selectRuns(params)
if (len(run_list) == 0):
print("No runs to be cleared")
# NOTE(review): nothing visibly stops the script after the message above;
# the question below then lists no run -- confirm intent.
validated = validate("Delete runs " + str([r[0].id for r in run_list]))
for r, j in run_list:
delete_flag = params["delete"]
if "run_path" in r:
run_path = r["run_path"]
else:
run_path = None
# remove the output directory of the run when it exists on this machine
if run_path:
if os.path.exists(run_path):
if (validated):
print("Deleting directory: " + run_path)
shutil.rmtree(run_path)
else:
print("Simulate deletion of directory: " + run_path)
else:
print("output directory: '" + run_path +
"' not found: are we on the right machine ?")
# with the 'delete' option the run itself leaves the database
if (delete_flag):
if validated:
print("Deleting run " + str(r.id) + " from base")
r.delete()
base.commit()
# NOTE(review): this line carries a diff marker; presumably it packs the
# database once per deleted run -- confirm.
+ base.pack()
else:
print("Simulate deletion of run " + str(r.id) + " from base")
else:
# otherwise only the stored data is dropped and the run is reset
if validated:
print("Deleting data associated with run " + str(r.id))
r.deleteData()
r["STATE"] = "CREATED"
r["start_time"] = None
r.update()
base.commit()
else:
print("Simulate deletion of data associated with run " + str(r.id))