Page MenuHomec4science

run_zeo.py
No OneTemporary

File Metadata

Created
Wed, Jun 5, 20:07

run_zeo.py

#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
import BTrees
from . import conffile_zeo
from . import zeoobject
from . import bdparser
from . import base
from .base_zeo import BaseZEO
from . import runselector
from . import bdlogging
from . import lowercase_btree
################################################################
import numpy as np
import datetime
import subprocess
import socket
import os
################################################################
# Public names exported by this module.
__all__ = ['RunZEO', 'getRunFromScript']
# Rebind ``print`` for this module to bdlogging.invalidPrint.
# NOTE(review): presumably this forbids direct printing in favour of the
# logger below -- confirm against bdlogging.
print = bdlogging.invalidPrint
# Module-level logger, named after this module.
logger = bdlogging.getLogger(__name__)
# PBTree = lowercase_btree.PersistentLowerCaseBTree
# Container type instantiated for ``RunZEO.quantities``.
BTree = BTrees.OOBTree.BTree
################################################################
class RunZEO(zeoobject.ZEOObject):
    """Run entry of the ``runs`` table.

    A run carries its typed entries (machine, path, state, ...), the list
    of configuration files attached to it (``configfiles``) and the
    quantities recorded during execution (``quantities``, a BTree mapping
    a quantity name to a list of ``(step, value)`` tuples).
    """

    table_name = 'runs'

    def __init__(self, base):
        """Create an empty run bound to ``base`` and declare its entries."""
        super().__init__(base)
        self.configfiles = []
        self.quantities = BTree()
        base.prepare(self, 'run_desc')

        self['id'] = None
        self.types['id'] = int
        self.types["machine_name"] = str
        self.types["run_path"] = str
        self.allowNull["run_path"] = True
        self.types["job_id"] = int
        self.types["nproc"] = int
        self.types["run_name"] = str
        self.types["wait_id"] = int
        self.allowNull["wait_id"] = True
        self.types["start_time"] = datetime.datetime
        self.allowNull["start_time"] = True
        self.types["state"] = str
        self.allowNull["state"] = True

        self.execfile = None
        self.types["exec"] = str
        self.types["last_step"] = int
        self.types["last_step_time"] = datetime.datetime

        # Entries that start unset.
        self["last_step"] = None
        self["last_step_time"] = None
        self["start_time"] = None
        self["wait_id"] = None

    # ------------------------------------------------------------------
    # job / state handling
    # ------------------------------------------------------------------
    def getJob(self):
        """Return the job whose id is stored in the ``job_id`` entry."""
        return BaseZEO.singleton_base.getJobFromID(self.entries["job_id"])

    def _setStateAndCommit(self, state, message):
        """Store ``state`` in the entries, log ``message`` and commit."""
        self.entries['state'] = state
        logger.debug(message)
        BaseZEO.singleton_base.commit()
        logger.debug('commited')

    def start(self):
        """Mark the run as ``START`` and commit."""
        self._setStateAndCommit('START', 'starting run')

    def finish(self):
        """Mark the run as ``FINISHED`` and commit."""
        self._setStateAndCommit('FINISHED', 'finish run')

    def attachToJob(self, job):
        """Bind this run to ``job`` and insert it in the base."""
        self["job_id"] = job.id
        BaseZEO.singleton_base.insert(self)

    # ------------------------------------------------------------------
    # configuration / exec files
    # ------------------------------------------------------------------
    def getExecFile(self):
        """Return the exec config file with its placeholders substituted."""
        return self.getUpdatedConfigFile(self.entries["exec"])

    def setExecFile(self, file_name, **kwargs):
        """Declare ``file_name`` as the exec file and return its id.

        A config file already attached to the run with the same
        ``filename`` entry is reused; otherwise the file is added through
        ``conffile_zeo.addFile`` (receiving ``kwargs``) and attached.
        """
        conf = next((f for f in self.configfiles
                     if f.entries["filename"] == file_name), None)
        if conf is None:
            # Not attached yet: register it and keep track of it.
            conf = conffile_zeo.addFile(
                file_name, BaseZEO.singleton_base, **kwargs)
            self.configfiles.append(conf)
        self.execfile = conf
        self.entries["exec"] = conf.id
        return conf.id

    def addConfigFiles(self, file_list, regex_params=None):
        """Attach one or several config files to the run.

        ``file_list`` may be a single file name or a list of them. Files
        whose id is already attached are not attached twice. The base is
        committed and the full list of config files is returned.
        """
        if not isinstance(file_list, list):
            file_list = [file_list]

        base = BaseZEO.singleton_base
        # Parameter names of both the run and a (blank) job are handed to
        # addFile together with ``regex_params``.
        params_list = list(self.types.keys())
        params_list += list(base.Job(base).types.keys())

        known_ids = [f.id for f in self.configfiles]
        new_files = [
            conffile_zeo.addFile(
                fname, base,
                regex_params=regex_params,
                params=params_list)
            for fname in file_list]

        for conf in new_files:
            if conf.id not in known_ids:
                self.configfiles.append(conf)
                # Remember it so a duplicate inside ``file_list`` is skipped.
                known_ids.append(conf.id)
        base.commit()
        return self.configfiles

    def getConfigFiles(self):
        """Return the list of config files attached to the run."""
        return self.configfiles

    def getConfigFile(self, file_id):
        """Return the attached config file with id ``file_id`` (or None)."""
        for conf in self.configfiles:
            if conf.id == file_id:
                return conf
        return None

    def replaceBlackDynamiteVariables(self, text):
        """Substitute the ``__BLACKDYNAMITE__<key>__`` placeholders in text.

        Placeholders are replaced, in this order, by the entries of the
        job matching ``job_id``, the entries of the run, and finally the
        ``dbhost``, ``study`` (schema) and ``run_id`` values.

        Raises:
            Exception: if a placeholder present in ``text`` refers to a
                job or run entry whose value is None.
        """
        base = BaseZEO.singleton_base
        myjob = base.Job(base)
        myjob["id"] = self.entries["job_id"]
        myjob = myjob.getMatchedObjectList()[0]

        for key, val in myjob.entries.items():
            placeholder = "__BLACKDYNAMITE__" + key + "__"
            if val is None and placeholder in text:
                raise Exception("unset job parameter " + key)
            text = text.replace(placeholder, str(val))

        for key, val in self.entries.items():
            placeholder = "__BLACKDYNAMITE__" + key + "__"
            if val is None and placeholder in text:
                logger.debug(self.entries)
                raise Exception("unset run parameter " + key)
            text = text.replace(placeholder, str(val))

        text = text.replace("__BLACKDYNAMITE__dbhost__", base.dbhost)
        text = text.replace("__BLACKDYNAMITE__study__", base.schema)
        text = text.replace("__BLACKDYNAMITE__run_id__", str(self.id))
        return text

    def getUpdatedConfigFile(self, file_id):
        """Return config file ``file_id`` with placeholders substituted.

        The ``file`` entry of the config file object is overwritten with
        the substituted text.

        Raises:
            Exception: if no attached config file has id ``file_id``.
        """
        conf = self.getConfigFile(file_id)
        if conf is None:
            raise Exception("unknown config file " + str(file_id))
        conf["file"] = self.replaceBlackDynamiteVariables(conf["file"])
        return conf

    # ------------------------------------------------------------------
    # files of the run directory
    # ------------------------------------------------------------------
    def listFiles(self, subdir=""):
        """List files in run directory / specified sub-directory.

        ``ls`` is run locally when ``machine_name`` equals the local host
        name, through ``ssh`` otherwise. Returns the output lines,
        stripped and decoded.
        """
        command = 'ls {0}'.format(os.path.join(self['run_path'], subdir))
        if not self['machine_name'] == socket.gethostname():
            command = 'ssh {0} "{1}"'.format(self['machine_name'], command)
        logger.info(command)
        # The context manager closes the pipe and reaps the child process.
        with subprocess.Popen(command, shell=True,
                              stdout=subprocess.PIPE) as proc:
            raw_lines = proc.stdout.readlines()
        return [line.strip().decode() for line in raw_lines]

    def getFile(self, filename, outpath='/tmp'):
        """Return a local path to ``filename`` of the run directory.

        If the file exists locally under ``run_path`` that path is
        returned. Otherwise the file is looked up in (or copied with
        ``scp`` into) the cache directory
        ``<outpath>/BD-<schema>-cache/run-<id>/`` and the cached path is
        returned. A failing copy is only reported through the logger.
        """
        dest_file = os.path.join(
            outpath, "BD-" + self.base.schema + "-cache",
            "run-{0}".format(self.id), filename)
        full_filename = self.getFullFileName(filename)

        # Check if file is local
        if os.path.isfile(full_filename):
            return full_filename

        # File is distant: prepare the cache directory hierarchy
        # (``filename`` may itself contain sub-directories).
        dest_path = os.path.dirname(dest_file)
        logger.debug('Directories: ' + dest_path)
        logger.debug('File: ' + dest_file)
        try:
            os.makedirs(dest_path, exist_ok=True)
        except OSError as e:
            # Best effort: the copy below will report its own failure.
            logger.error(e)

        if os.path.isfile(dest_file):
            logger.info('File {} already cached'.format(dest_file))
            return dest_file

        cmd = 'scp {0}:{1} {2}'.format(self['machine_name'],
                                       full_filename,
                                       dest_file)
        logger.info(cmd)
        # communicate() drains both pipes and waits for scp to finish.
        with subprocess.Popen(cmd, shell=True,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as proc:
            _, raw_errors = proc.communicate()
        errors = raw_errors.decode().strip()
        if errors:
            logger.warning(errors)
        return dest_file

    def getFullFileName(self, filename):
        """Return ``filename`` joined to the ``run_path`` entry."""
        return os.path.join(self['run_path'], filename)

    # ------------------------------------------------------------------
    # quantities stored in the object database
    # ------------------------------------------------------------------
    def listQuantities(self):
        """Return the quantities registered in the base."""
        return BaseZEO.singleton_base.quantities

    def getScalarQuantities(self, names, additional_request=None):
        """Return ``(name, array)`` pairs for the known quantities.

        Names absent from ``self.quantities`` are skipped.
        ``additional_request`` is not used.
        """
        self.base.commit()
        return [(q, np.array(self.quantities[q]))
                for q in names if q in self.quantities]

    def getLastStep(self):
        """Return ``(last_step, last_step_time)`` or ``(None, None)``."""
        if 'last_step' in self.entries:
            return self.last_step, self.last_step_time
        return None, None

    def getScalarQuantity(self, name, additional_request=None):
        """Return the values recorded for quantity ``name`` as an array.

        ``additional_request`` is not supported: it is ignored, and a
        warning is logged when one is given.
        """
        if additional_request:
            logger.warning(
                f'additional_request ignored: {additional_request}')
        return np.array(self.quantities[name])

    def pushVectorQuantity(self, vec, step, name,
                           is_integer, description=None):
        """Record ``vec`` for quantity ``name`` at ``step``.

        Stored exactly like a scalar quantity.
        """
        self.pushScalarQuantity(vec, step, name, is_integer,
                                description=description)

    @zeoobject._transaction
    def pushScalarQuantity(self, val, step, name,
                           is_integer, description=None):
        """Append ``(step, val)`` to quantity ``name`` and commit.

        The name is registered in the quantities of the base and the
        per-run list is created on first use. ``is_integer`` and
        ``description`` are not used.
        """
        BaseZEO.singleton_base.quantities.add(name)
        if name not in self.quantities:
            logger.info(f'create quantity {name}')
            self.quantities[name] = []
        self.quantities[name].append((step, val))
        # Flag both objects as modified explicitly.
        # NOTE(review): presumably needed because the in-place append on a
        # plain list is not detected by the persistence layer -- confirm.
        self._p_changed = True
        self.quantities._p_changed = True
        BaseZEO.singleton_base.commit()

    # ------------------------------------------------------------------
    # SQL-based accessors
    # NOTE(review): these go through self.base.performRequest and SQL
    # tables; presumably inherited from the SQL back-end -- confirm that
    # the ZEO base supports them.
    # ------------------------------------------------------------------
    def getQuantityID(self, name, is_integer=None, is_vector=None):
        """Return ``(id, is_integer, is_vector)`` of quantity ``name``.

        Raises:
            Exception: if the quantity is unknown, or if ``is_integer`` /
                ``is_vector`` is given and differs from the stored flag.
        """
        request = """
SELECT id,is_integer,is_vector FROM {0}.quantities WHERE (name) = (%s)
""".format(self.base.schema)
        item = self.base.performRequest(request, [name]).fetchone()
        if item is None:
            raise Exception("unknown quantity \"" + name + "\"")
        if is_integer is not None and is_integer != item[1]:
            raise Exception("quantity \"" + name + "\" has is_integer = " +
                            str(item[1]))
        if is_vector is not None and is_vector != item[2]:
            raise Exception("quantity \"" + name + "\" has is_vector = " +
                            str(item[2]))
        return item[0], item[1], item[2]

    def getVectorQuantity(self, name, step):
        """Return the vector stored for ``name`` at ``step`` (or None).

        Raises:
            Exception: if the quantity is flagged as not vectorial.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(name)
        if is_vector is False:
            raise Exception("Quantity " + name + " is not vectorial")

        table = "vector_real" if is_integer is False else "vector_integer"
        request = """
SELECT measurement from {0}.{1} WHERE (run_id,quantity_id,step) = ({2},{3},{4})
""".format(self.base.schema, table, self.id, quantity_id, step)
        # Every value is already formatted into the request, so no
        # parameter is passed (same convention as deleteData).
        fetch = self.base.performRequest(request, []).fetchone()
        if fetch:
            return np.array(fetch[0])
        return None

    def getAllVectorQuantity(self, name):
        """Return ``(steps, values)`` arrays for vector quantity ``name``.

        Returns ``[None, None]`` when nothing is stored.
        """
        quantity_id, is_integer, is_vector = self.getQuantityID(
            name, is_vector=True)

        table = "vector_real" if is_integer is False else "vector_integer"
        request = """
SELECT step,measurement from {0}.{1}
WHERE (run_id,quantity_id) = ({2},{3}) order by step
""".format(self.base.schema, table, self.id, quantity_id)
        # Every value is already formatted into the request, so no
        # parameter is passed (same convention as deleteData).
        fetch = self.base.performRequest(request, []).fetchall()
        if not fetch:
            return [None, None]
        matres = np.array([row[1] for row in fetch])
        stepres = np.array([row[0] for row in fetch])
        return (stepres, matres)

    def deleteData(self):
        """Delete the rows of this run from the four measurement tables."""
        for table in ("scalar_real", "scalar_integer",
                      "vector_real", "vector_integer"):
            request = "DELETE FROM {0}.{1} WHERE run_id={2}".format(
                self.base.schema, table, self.id)
            self.base.performRequest(request, [])
################################################################
def getRunFromScript():
    """Return the ``(run, job)`` pair selected by the parsed parameters.

    A ``run_id`` integer parameter is registered on a ``BDParser``, the
    parameters are parsed with an empty ``argv``, a base is opened from
    them and the matching runs are selected.

    Raises:
        Exception: ('internal error') unless exactly one run matches.
    """
    parser = bdparser.BDParser()
    parser.register_params(params={"run_id": int})
    params = parser.parseBDParameters(argv=[])

    selector = runselector.RunSelector(base.Base(**params))
    matches = selector.selectRuns(params)
    # Zero matches and several matches are both treated as an error.
    if len(matches) != 1:
        raise Exception('internal error')

    myrun, myjob = matches[0]
    return myrun, myjob

Event Timeline