diff --git a/BlackDynamite/base_psql.py b/BlackDynamite/base_psql.py
index a05103e..1c3becb 100644
--- a/BlackDynamite/base_psql.py
+++ b/BlackDynamite/base_psql.py
@@ -1,280 +1,299 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import job
from . import run_sql
+from . import runselector
from .constraints_psql import PSQLconstraints
################################################################
import psycopg2
import sys
import getpass
import atexit
import datetime
import copy
################################################################
__all__ = ["BasePSQL"]
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
################################################################
class BasePSQL(base.AbstractBase):
"""
"""
def __init__(self, truerun=False, creation=False, **kwargs):
    """Open a psycopg2 connection to the 'blackdynamite' database.

    :param truerun: forwarded to AbstractBase; gates commits elsewhere.
    :param creation: forwarded to AbstractBase (study-creation mode).
    :param kwargs: 'host', 'user', 'port', 'password' are passed to
        psycopg2.connect; password == 'ask' triggers an interactive
        prompt. Exits the process (-1) when the connection fails.
    """
    self.Job = job.JobSQL
    self.Run = run_sql.RunSQL
    self.BDconstraints = PSQLconstraints
    # keep only the kwargs psycopg2.connect understands
    psycopg2_params = ["host", "user", "port", "password"]
    connection_params = bdparser.filterParams(psycopg2_params, kwargs)
    connection_params['dbname'] = 'blackdynamite'
    if ("password" in connection_params and
            connection_params["password"] == 'ask'):
        # avoid passwords on the command line: prompt instead
        connection_params["password"] = getpass.getpass()
    logger.debug('connection arguments: {0}'.format(connection_params))
    try:
        connection = psycopg2.connect(**connection_params)
        logger.debug('connected to base')
    except Exception as e:
        logger.error(
            "Connection failed: check your connection settings:\n" +
            str(e))
        sys.exit(-1)
    assert(isinstance(connection, psycopg2._psycopg.connection))
    self.dbhost = (kwargs["host"]
                   if "host" in kwargs.keys()
                   else "localhost")
    super().__init__(connection=connection, truerun=truerun,
                     creation=creation, **kwargs)
    self.createTypeCodes()
    # We should avoid using __del__ to close DB
    def close_db():
        self.close()
    atexit.register(close_db)
def performRequest(self, request, params=None):
    """Execute *request* with *params* and return the cursor.

    :param request: SQL text with placeholders.
    :param params: sequence of values bound to the placeholders
        (default: none). The previous default was a shared mutable
        list; a None sentinel is used instead.
    :raises psycopg2.ProgrammingError: re-raised with the offending
        query embedded; chained with ``from err`` so the original
        traceback is preserved.
    """
    if params is None:
        params = []
    curs = self.connection.cursor()
    logger.debug(request)
    logger.debug(params)
    try:
        curs.execute(request, params)
    except psycopg2.ProgrammingError as err:
        raise psycopg2.ProgrammingError(
            ("While trying to execute the query '{0}' with parameters " +
             "'{1}', I caught this: '{2}'").format(
                 request, params, err)) from err
    return curs
def createTypeCodes(self):
    """Build the pg_type oid -> python type lookup (self.type_code)."""
    cursor = self.connection.cursor()
    cursor.execute("SELECT typname,oid from pg_type;")
    # only these PostgreSQL types are mapped; others are ignored
    known_types = {
        'float8': float,
        'text': str,
        'int8': int,
        'int4': int,
        'bool': bool,
        'timestamp': datetime.datetime,
    }
    self.type_code = {}
    for typname, oid in cursor:
        mapped = known_types.get(typname)
        if mapped is not None:
            self.type_code[oid] = mapped
def getStudySize(self, study):
    """Return {'size', 'nruns', 'njobs'} for the schema *study*.

    On a ProgrammingError the transaction is rolled back and the
    placeholder '????' is returned for the missing values. Previously
    nruns/njobs were only assigned inside the try block, so the
    except path raised UnboundLocalError at the return statement.
    """
    curs = self.connection.cursor()
    # defaults used when the schema cannot be inspected
    size = '????'
    nruns = '????'
    njobs = '????'
    try:
        logger.info(study)
        curs.execute("""
select sz from (SELECT SUM(pg_total_relation_size(quote_ident(schemaname)
|| '.' || quote_ident(tablename)))::BIGINT
FROM pg_tables WHERE schemaname = '{0}') as sz
""".format(study))
        size = curs.fetchone()[0]
        curs.execute("""
select pg_size_pretty(cast({0} as bigint))
""".format(size))
        size = curs.fetchone()[0]
        curs.execute("""
select count({0}.runs.id) from {0}.runs
""".format(study))
        nruns = curs.fetchone()[0]
        curs.execute("""
select count({0}.jobs.id) from {0}.jobs
""".format(study))
        njobs = curs.fetchone()[0]
    except psycopg2.ProgrammingError:
        # leave the connection usable after the failed statement
        self.connection.rollback()
        size = '????'
    return {'size': size, 'nruns': nruns, 'njobs': njobs}
def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
    """Create schema, jobs/runs tables and register quantities."""
    self.createSchema(kwargs)
    self.createTable(job_desc)
    self.createTable(run_desc)
    self.createGenericTables()
    # renamed loop variable: 'type' shadowed the builtin
    for quantity_name, quantity_type in quantities.items():
        self.pushQuantity(quantity_name, quantity_type)
    if self.truerun:
        self.commit()
def createSchema(self, params=None):
    """Create the study schema, asking before dropping an existing one.

    :param params: options dict checked by validate_question (e.g.
        {"yes": True} to skip the prompt). The previous signature used
        a shared mutable default dict; a None sentinel replaces it.
    Exits the process (-1) when the user refuses the drop.
    """
    if params is None:
        params = {"yes": False}
    curs = self.connection.cursor()
    curs.execute(("SELECT schema_name FROM information_schema.schemata"
                  f" WHERE schema_name = '{self.schema.lower()}'"))
    if curs.rowcount:
        validated = bdparser.validate_question(
            "Are you sure you want to drop the schema named '" +
            self.schema + "'", params, False)
        if validated is True:
            curs.execute("DROP SCHEMA {0} cascade".format(self.schema))
        else:
            logger.debug("creation canceled: exit program")
            sys.exit(-1)
    curs.execute("CREATE SCHEMA {0}".format(self.schema))
def createTable(self, obj):
    """Create the SQL table described by *obj* (CREATE TABLE request)."""
    sql_request = obj.createTableRequest()
    logger.debug(sql_request)
    cursor = self.connection.cursor()
    cursor.execute(sql_request)
def getColumnProperties(self, sqlobject):
    """Return [(column_name, type_oid), ...] for the object's table.

    Returns [] when the table does not exist; the aborted transaction
    is rolled back so the connection stays usable. The original had an
    unreachable duplicate ``return []`` after the try/except (both
    branches already return) — removed.
    """
    curs = self.connection.cursor()
    try:
        # LIMIT 0 fetches no rows but still fills curs.description
        curs.execute("SELECT * FROM {0}.{1} LIMIT 0".format(
            self.schema, sqlobject.table_name))
        column_names = [desc[0] for desc in curs.description]
        column_type = [desc[1] for desc in curs.description]
        return list(zip(column_names, column_type))
    except psycopg2.errors.UndefinedTable:
        self.connection.rollback()
        return []
def setObjectItemTypes(self, sqlobject):
    """Fill sqlobject.types with python types from the table columns."""
    for column_name, type_oid in self.getColumnProperties(sqlobject):
        sqlobject.types[column_name] = self.type_code[type_oid]
def select(self, _types, constraints=None, sort_by=None):
    """Select objects of the given type(s) matching *constraints*.

    :param _types: a SQL object type or a list of them (joined query).
    :param sort_by: optional ORDER BY column expression (string).
    :returns: list of objects (or tuples when several types are given).
    """
    if (sort_by is not None) and (not isinstance(sort_by, str)):
        raise RuntimeError(
            'sort_by argument is not correct: {0}'.format(sort_by))
    if not isinstance(_types, list):
        _types = [_types]
    condition, params = PSQLconstraints(
        self, constraints).getMatchingCondition()
    table_list = ','.join('{0}.{1}'.format(self.schema, t.table_name)
                          for t in _types)
    request = "SELECT * FROM {0}".format(table_list)
    if condition:
        request += " WHERE " + condition
    if sort_by:
        request += " ORDER BY " + sort_by
    logger.debug(request)
    logger.debug(params)
    cursor = self.performRequest(request, params)
    return self.buildList(cursor, _types)
def buildList(self, curs, sqlobjs):
    """Materialize cursor rows into SQL objects.

    *sqlobjs* may be classes or instances; classes are instantiated
    once as templates. Each row is split column-by-column across the
    given objects in order (supports joined runs+jobs queries).
    Returns a list of objects, or a list of tuples when several types
    were requested.
    """
    logger.debug(sqlobjs)
    if not isinstance(sqlobjs, list):
        sqlobjs = [sqlobjs]
    col_infos = []
    sqlobjs2 = []
    for sqlobj in sqlobjs:
        if isinstance(sqlobj, type):
            # a class was passed: build a template instance
            sqlobj = sqlobj(self)
        sqlobjs2.append(sqlobj)
        col_infos.append(self.getColumnProperties(sqlobj))
    sqlobjs = sqlobjs2
    list_objects = []
    for entries in curs:
        objs = []
        offset = 0  # column cursor walking across the joined row
        logger.debug(sqlobjs)
        for index, sqlobj in enumerate(sqlobjs):
            # deepcopy the template so every row gets its own object
            obj = copy.deepcopy(sqlobj)
            for col_name, size in col_infos[index]:
                logger.debug((col_name, entries[offset]))
                obj[col_name] = entries[offset]
                offset += 1
            objs.append(obj)
        if len(objs) == 1:
            list_objects.append(objs[0])
        else:
            list_objects.append(tuple(objs))
    return list_objects
def insert(self, sqlobject):
    """Insert *sqlobject* and store the database-assigned id on it."""
    insert_args = sqlobject.insert()
    cursor = self.performRequest(*insert_args)
    sqlobject.id = cursor.fetchone()[0]
def commit(self):
    """Commit pending changes on the psycopg2 connection."""
    logger.debug("commiting changes to base")
    self.connection.commit()
def close(self):
    """Close the database session if it is still open (idempotent)."""
    connection = self.__dict__.pop('connection', None)
    if connection is not None:
        logger.debug('closing database session')
        connection.close()
def get_state_summary(self, params=None):
    """Count runs per (run_name, state).

    :param params: selection parameters forwarded to RunSelector
        (None-sentinel instead of a shared mutable default list).
    :returns: {run_name: [[state, count], ...]}. Empty dict when no
        run matches — previously an empty selection produced a query
        with no WHERE clause, wrongly counting *all* runs.
    """
    if params is None:
        params = []
    runSelector = runselector.RunSelector(self)
    run_list = runSelector.selectRuns(params, quiet=True)
    if len(run_list) == 0:
        return {}
    request = "SELECT run_name,state,count(state) from {0}.runs ".format(
        self.schema)
    # run ids come from our own selector, so they are trusted integers
    request += "where id in (" + ",".join(
        str(r.id) for r, j in run_list) + ")"
    request += " group by state,run_name order by run_name,state"
    curs = self.performRequest(request, [])
    stats = {}
    for run_name, state, count in curs:
        stats.setdefault(run_name, []).append([state, int(count)])
    return stats
+
################################################################
diff --git a/BlackDynamite/base_zeo.py b/BlackDynamite/base_zeo.py
index 6634c68..5235c9f 100644
--- a/BlackDynamite/base_zeo.py
+++ b/BlackDynamite/base_zeo.py
@@ -1,396 +1,418 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
################################################################
from . import bdparser
from . import bdlogging
from . import base
from . import conffile_zeo
from . import zeoobject
from . import lowercase_btree
+from . import runselector
from .constraints_zeo import ZEOconstraints
################################################################
import re
import os
import subprocess
import ZEO
import ZODB
import sys
from BTrees.OOBTree import OOSet, BTree
from . import job
from . import run_zeo
import psutil
################################################################
__all__ = ["BaseZEO"]
print = bdlogging.invalidPrint
logger = bdlogging.getLogger(__name__)
PBTree = lowercase_btree.PersistentLowerCaseBTree
################################################################
def check_socket(socket_name):
    """Return True when a zeo server already listens on *socket_name*."""
    if not os.path.exists(socket_name):
        return False
    # compare the unix-socket path against every live connection address
    for conn in psutil.net_connections(kind='all'):
        if conn.laddr == '':
            continue
        if conn.laddr == socket_name:
            logger.info("Found already running zeo server")
            return True
    return False
class BaseZEO(base.AbstractBase):
"""
"""
singleton_base = None
def __init__(self, truerun=False, creation=False, read_only=False, **kwargs):
    """Connect to a ZEO server over a unix socket, spawning one if needed.

    :param kwargs: must contain 'host' of the form 'zeo://<path>'.
    Side effects: registers itself as BaseZEO.singleton_base, may write
    a zeo.conf next to the database file and spawn a 'runzeo' child
    process. Exits the process (-1) when the connection fails.
    """
    BaseZEO.singleton_base = self
    self.Job = job.JobZEO
    self.Run = run_zeo.RunZEO
    self.ConfFile = conffile_zeo.ConfFile
    self.BDconstraints = ZEOconstraints
    zeo_params = ["host"]
    connection_params = bdparser.filterParams(zeo_params, kwargs)
    logger.info('connection arguments: {0}'.format(connection_params))
    self.filename = connection_params['host']
    # expected form: zeo://<path-to-database-file>
    filename_split = self.filename.split('://')
    if filename_split[0] != 'zeo':
        raise RuntimeError(
            f"wrong protocol with this database: {type(self)}")
    self.filename = filename_split[1]
    dirname = os.path.dirname(self.filename)
    if dirname == '':
        dirname = os.path.abspath('./')
    socket_name = os.path.join(dirname, 'zeo.socket')
    zeo_server_conf_filename = os.path.join(dirname, 'zeo.conf')
    if not check_socket(socket_name):
        # no server listening yet: write a config and spawn one
        zeo_server_conf = f'''
address {socket_name}
path bd.zeo
blob-dir bd.blob
path zeo.log
format %(asctime)s %(message)s
'''
        with open(zeo_server_conf_filename, 'w') as f:
            f.write(zeo_server_conf)
        cmd = "runzeo -C zeo.conf"
        logger.error("Spawning new zeo server: " + cmd)
        self.process = subprocess.Popen(
            cmd, shell=True, cwd=dirname)
        # stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        self.connection = ZEO.connection(
            socket_name, read_only=read_only, server_sync=True,
            blob_dir=os.path.join(dirname, 'bd.blob'),
            shared_blob_dir=True,
        )
        self.root = self.connection.root
        logger.debug('connected to base')
    except Exception as e:
        logger.error(
            "Connection failed: check your connection settings:\n" +
            str(e))
        sys.exit(-1)
    assert(isinstance(self.connection, ZODB.Connection.Connection))
    self.dbhost = (kwargs["host"]
                   if "host" in kwargs.keys()
                   else "localhost")
    super().__init__(connection=self.connection, truerun=truerun,
                     creation=creation, **kwargs)
def getSchemaList(self, filter_names=True):
    """List the study names stored in the root, lazily creating the
    'schemas' container on first access.

    When filter_names is True, names matching '<user>_<study>' are
    reported without the user prefix.
    # NOTE(review): indentation was lost in this patch; the append is
    # assumed to be at loop level (all schemas listed, prefix stripped
    # when it matches) — confirm against the repository.
    """
    try:
        schemas = self.root.schemas
    except AttributeError:
        self.root.schemas = PBTree(key_string='study_')
        schemas = self.root.schemas
    filtered_schemas = []
    if filter_names is True:
        for s in schemas:
            m = re.match('{0}_(.+)'.format(self.user), s)
            if m:
                s = m.group(1)
            filtered_schemas.append(s)
    else:
        filtered_schemas = schemas
    return filtered_schemas
def getStudySize(self, study):
    """Return {'size', 'nruns', 'njobs'} for the study *study*.

    The previous implementation was copied verbatim from the PSQL
    backend: it called self.connection.cursor() — which does not exist
    on a ZODB connection — and referenced psycopg2, which this module
    does not import, so it could never run. Counts are now taken from
    the study's BTrees; the storage size is not tracked per study by
    the ZEO backend, so the '????' placeholder is kept for it.
    """
    logger.info(study)
    try:
        schema = self.root.schemas[study]
        njobs = len(schema['Jobs'])
        nruns = len(schema['Runs'])
    except (AttributeError, KeyError):
        # unknown study or uninitialized root
        return {'size': '????', 'nruns': '????', 'njobs': '????'}
    return {'size': '????', 'nruns': nruns, 'njobs': njobs}
def createSchema(self, params=None):
    """Create (or recreate after confirmation) the study containers.

    :param params: options dict checked by validate_question (e.g.
        {"yes": True} to skip the prompt). The previous signature used
        a shared mutable default dict; a None sentinel replaces it.
    Exits the process (-1) when the user refuses the drop.
    """
    if params is None:
        params = {"yes": False}
    if not hasattr(self.root, 'schemas'):
        self.root.schemas = PBTree(key_string='study_')
    if self.schema in self.root.schemas:
        validated = bdparser.validate_question(
            "Are you sure you want to drop the schema named '" +
            self.schema + "'", params, False)
        if validated is True:
            del self.root.schemas[self.schema]
        else:
            logger.debug("creation canceled: exit program")
            sys.exit(-1)
    # fresh containers and 1-based id counters for the study
    self.root.schemas[self.schema] = PBTree()
    self.root.schemas[self.schema]['Quantities'] = OOSet()
    self.root.schemas[self.schema]['Jobs'] = PBTree(key_string='job_')
    self.root.schemas[self.schema]['JobsIndex'] = BTree()
    self.root.schemas[self.schema]['RunsIndex'] = BTree()
    self.root.schemas[self.schema]['Runs'] = PBTree(key_string='run_')
    self.root.schemas[self.schema]['ConfigFiles'] = BTree()
    self.root.schemas[self.schema]['Jobs_counter'] = 1
    self.root.schemas[self.schema]['Runs_counter'] = 1
def prepare(self, obj, descriptor):
    """Copy the declared types of *descriptor* onto *obj* and default
    missing entries to None.

    Bug fix: the original did ``obj.t = None``, which set a literal
    attribute named 't' on every iteration instead of defaulting the
    missing entry; ``obj[t] = None`` matches how entries are accessed
    elsewhere (e.g. obj[col_name] in the PSQL backend).
    """
    if not hasattr(self.root, 'schemas'):
        return
    if descriptor in self.root.schemas[self.schema]:
        desc = self.root.schemas[self.schema][descriptor]
        for t in desc.types.keys():
            obj.types[t] = desc.types[t]
            if t not in obj:
                obj[t] = None
def createBase(self, job_desc, run_desc, quantities={}, **kwargs):
    """Create the study containers and register descriptions/quantities."""
    self.createSchema(kwargs)
    study = self.root.schemas[self.schema]
    study['job_desc'] = job_desc
    study['run_desc'] = run_desc
    # renamed loop variable: 'type' shadowed the builtin
    for quantity_name, quantity_type in quantities.items():
        self.pushQuantity(quantity_name, quantity_type)
    if self.truerun:
        self.commit()
@property
def configfiles(self):
    # ConfigFiles BTree of the current study.
    return self.root.schemas[self.schema]['ConfigFiles']
def _get_jobs(self):
    # Jobs container (PBTree) of the current study.
    return self.root.schemas[self.schema]['Jobs']
@property
def jobs(self):
    # Jobs container of the current study.
    return self._get_jobs()
@jobs.setter
def jobs(self, val):
    # Replace the Jobs container wholesale.
    self.root.schemas[self.schema]['Jobs'] = val
@property
def jobs_index(self):
    # Index mapping job parameter tuples to job ids.
    return self._get_jobs_index()
@jobs_index.setter
def jobs_index(self, val):
    # Replace the JobsIndex BTree wholesale.
    self.root.schemas[self.schema]['JobsIndex'] = val
@property
def quantities(self):
    # Registered quantity names (OOSet) of the current study.
    return self.root.schemas[self.schema]['Quantities']
@quantities.setter
def quantities(self, value):
    # Replace the Quantities set wholesale.
    self.root.schemas[self.schema]['Quantities'] = value
@property
def jobs_counter(self):
    # Next job id to assign (1-based).
    return self.root.schemas[self.schema]['Jobs_counter']
@jobs_counter.setter
def jobs_counter(self, val):
    # Persist the next job id.
    self.root.schemas[self.schema]['Jobs_counter'] = val
def _get_runs(self):
    # Runs container (PBTree) of the current study.
    return self.root.schemas[self.schema]['Runs']
def _get_runs_index(self):
    # RunsIndex BTree of the current study.
    return self.root.schemas[self.schema]['RunsIndex']
def _get_jobs_index(self):
    # JobsIndex BTree of the current study.
    return self.root.schemas[self.schema]['JobsIndex']
@property
def runs(self):
    # Runs container of the current study.
    return self._get_runs()
@runs.setter
def runs(self, val):
    # Replace the Runs container wholesale.
    self.root.schemas[self.schema]['Runs'] = val
@property
def runs_counter(self):
    # Next run id to assign (1-based).
    return self.root.schemas[self.schema]['Runs_counter']
@runs_counter.setter
def runs_counter(self, val):
    # Persist the next run id.
    self.root.schemas[self.schema]['Runs_counter'] = val
def select(self, _types, constraints=None, sort_by=None):
    """Select Jobs or Runs matching *constraints*.

    Fast paths: a constraint object carrying an id is fetched directly;
    a fully-specified job constraint is resolved through jobs_index.
    Otherwise every object in the container is tested against the
    constraint predicate. Runs are returned paired with their job.

    NOTE(review): sort_by is validated but never applied in this
    backend — confirm whether ordering is expected by callers.
    """
    if not isinstance(_types, list):
        _types = [_types]
    _type = _types[0]
    if isinstance(_type, zeoobject.ZEOObject):
        _type = type(_type)
    if _type == self.Job:
        obj_container = self._get_jobs()
    elif _type == self.Run:
        obj_container = self._get_runs()
    else:
        raise RuntimeError(f'{type(_types)}')
    if (sort_by is not None) and (not isinstance(sort_by, str)):
        raise RuntimeError(
            'sort_by argument is not correct: {0}'.format(sort_by))
    if isinstance(constraints, zeoobject.ZEOObject):
        if hasattr(constraints, 'id') and constraints.id is not None:
            # direct fetch by id
            obj = obj_container[constraints.id]
            if isinstance(obj, self.Run):
                obj = (obj, self._get_jobs()[obj.job_id])
            return [obj]
        else:
            constraints = constraints.copy()
            constraints.evalFunctorEntries()
            params = constraints.get_params()
            keys = constraints.get_keys()
            n_params = len(keys)
            # all keys valued -> unique job lookup through the index
            if len(params) == n_params:
                if params in self.jobs_index:
                    return [self.jobs[self.jobs_index[params]]]
                else:
                    return []
    const = ZEOconstraints(self, constraints)
    condition = const.getMatchingCondition()
    obj_list = []
    # full scan: test each object (runs paired with their job)
    for key, obj in obj_container.items():
        objs = [obj]
        if _type == self.Run:
            j = self._get_jobs()[obj.job_id]
            objs.append(j)
        if condition(objs):
            if len(objs) == 1:
                obj_list.append(objs[0])
            else:
                obj_list.append(objs)
    return obj_list
def insert(self, zeoobject, keep_state=False):
    """Insert a Job or Run copy into the study containers.

    When keep_state is False a fresh id is assigned from the relevant
    counter; runs additionally get state 'CREATED' and are linked into
    their job's ``runs`` tree.

    NOTE(review): indentation was lost in this patch; the run-linking
    statements are assumed to sit inside the ``if not keep_state``
    branch — confirm against the repository.
    """
    if isinstance(zeoobject, self.Job):
        objs = self.jobs
        zeoobject = zeoobject.copy()
        zeoobject.evalFunctorEntries()
        logger.debug(zeoobject)
        if not keep_state:
            zeoobject['id'] = self.jobs_counter
            self.jobs_counter += 1
        # keep the params -> id index in sync
        params = zeoobject.get_params()
        self.jobs_index[params] = zeoobject['id']
    elif isinstance(zeoobject, self.Run):
        objs = self.runs
        zeoobject = zeoobject.copy()
        if not keep_state:
            zeoobject["id"] = self.runs_counter
            zeoobject["state"] = 'CREATED'
            job_id = zeoobject['job_id']
            run_id = zeoobject['id']
            job = self._get_jobs()[job_id]
            if not hasattr(job, 'runs'):
                job.runs = PBTree(key_string='runs_')
            job.runs[run_id] = zeoobject
            self.runs_counter += 1
    else:
        raise RuntimeError(
            f'cannot insert object of type {type(zeoobject)}')
    objs[zeoobject.id] = zeoobject.copy()
def setObjectItemTypes(self, zeoobject):
    """Attach the study's declared entry types to *zeoobject*."""
    study = self.root.schemas[self.schema]
    if isinstance(zeoobject, self.Job):
        zeoobject.types = study['job_desc'].types
    elif isinstance(zeoobject, self.Run):
        zeoobject.types = study['run_desc'].types
    else:
        raise RuntimeError(f'{type(zeoobject)}')
def commit(self):
    """Commit the current ZODB transaction."""
    import transaction
    transaction.commit()
def pack(self):
    """Pack the underlying ZODB storage (reclaims old object revisions)."""
    self.connection.db().pack()
def close(self):
    """Abort any pending transaction.

    NOTE(review): unlike the PSQL backend, this does not close the ZEO
    connection — confirm whether leaving it open is intentional.
    """
    import transaction
    transaction.abort()
def get_state_summary(self, params=None):
    """Count runs per (run_name, state).

    :param params: selection parameters forwarded to RunSelector
        (None-sentinel instead of a shared mutable default list).
    :returns: {run_name: [(state, count), ...]} — same shape as the
        PSQL backend's implementation.
    """
    if params is None:
        params = []
    runSelector = runselector.RunSelector(self)
    run_list = runSelector.selectRuns(params, quiet=True)
    # tally (run_name, state) pairs in a single pass
    counts = {}
    for run, _job in run_list:
        key = (run.run_name, run.state)
        counts[key] = counts.get(key, 0) + 1
    stats = {}
    for (run_name, state), count in counts.items():
        stats.setdefault(run_name, []).append((state, count))
    return stats
+
################################################################
diff --git a/scripts/getRunInfo.py b/scripts/getRunInfo.py
index 3375a69..1c2119d 100755
--- a/scripts/getRunInfo.py
+++ b/scripts/getRunInfo.py
@@ -1,267 +1,248 @@
#!/usr/bin/env python3
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import BlackDynamite as BD
import sys
import datetime
################################################################
def printSummary(mybase, params):
    """Print per-run_name state counts for the study, then exit(0).

    Restores two behaviors lost in the get_state_summary refactor:
    the study header line and the "no runs found" message when the
    selection is empty.
    """
    stats = mybase.get_state_summary(params)
    print("*"*6 + " run summary => study {0} ".format(mybase.schema) + "*"*6)
    if not stats:
        print("no runs found")
        sys.exit(0)
    for run_name, st in stats.items():
        # total over all states of this run_name, for the percentage
        tot = sum(count for _state, count in st)
        for state, count in st:
            print("{:20} {:>20} => {:5} ({:>5.1f}%)".format(
                run_name, state, count, 100.*count/tot))
        print("")
    sys.exit(0)
################################################################
def getRunInfo(run_id, mybase):
    """Print the full description of one run: its job's parameters, the
    run parameters, attached config files and registered quantities.

    Exits(1) when the run or its job cannot be found.
    """
    myrun = mybase.Run()
    myrun["id"] = run_id
    myrun.id = run_id
    run_list = myrun.getMatchedObjectList()
    if (len(run_list) == 0):
        print("no run found with id " + str(run_id))
        sys.exit(1)
    myrun = run_list[0]
    if isinstance(myrun, mybase.Run):
        # plain run object: fetch the matching job separately
        myjob = mybase.Job(mybase)
        myjob.id = myrun["job_id"]
        myjob["id"] = myrun["job_id"]
        job_list = myjob.getMatchedObjectList()
        if (len(job_list) == 0):
            print("no job found with id " + myjob.id)
            sys.exit(1)
        myjob = job_list[0]
    else:
        # (run, job) tuple was returned
        myjob = myrun[1]
        myrun = myrun[0]
    list_entries = myjob.entries.keys()
    print("*"*6 + " job info " + "*"*6)
    for entry in list_entries:
        if (myjob[entry]):
            print(entry + ": " + str(myjob[entry]))
    print("*"*6 + " run info " + "*"*6)
    list_entries = list(myrun.entries.keys())
    # well-known entries are printed first, in a fixed order
    regular_run_entries = ("run_name",
                           "job_id",
                           "state",
                           "start_time",
                           "machine_name",
                           "exec",
                           "nproc",
                           "wait_id")
    for entry in regular_run_entries:
        if (myrun[entry]):
            print(entry + ": " + str(myrun[entry]))
            list_entries.remove(entry)
    for entry in list_entries:
        if (myrun[entry]):
            print(entry + ": " + str(myrun[entry]))
    print("*"*6 + " config files " + "*"*6)
    conffiles = myrun.getConfigFiles()
    for conf in conffiles:
        print("file #" + str(conf.id) + ": " + conf["filename"])
        print("*"*6)
        print(conf["file"])
    list_quantities = list(myrun.quantities.keys())
    if len(list_quantities) > 0:
        print("*"*6 + " quantities " + "*"*6)
        for q in list_quantities:
            print(q)
    else:
        print("*"*6 + " no registered quantities " + "*"*6)
################################################################
def getInfoNames():
    """Column names for the run listing (reads the global ``params``)."""
    infos = ["run_name", "id", "job_id"]
    if "infos" in params:
        infos += params['infos']
    else:
        infos += ["state", "nproc", "machine_name"]
    infos += ["start_time", "last step", "last update",
              "Time/step", "Total Time"]
    return infos
################################################################
def getFormatString(infos):
    """Build the row format string for the listing table.

    NOTE(review): *infos* is not used; column widths depend only on
    the global ``params`` — kept for interface compatibility.
    """
    parts = [" {:<20} | {:^6} | {:^6} |"]
    if "infos" in params:
        parts.append(" {:^10} |" * len(params['infos']))
    else:
        parts.append(" {:<15} | {:^5} | {:<20} |")
    parts.append(" {:14} | {:>9} | {:>16} | {:>10} | {:>16} |")
    return "".join(parts)
################################################################
def formatTimeDelta(t):
    """Render a timedelta compactly: 'Nμs' below 1ms, 'X.Yms' below 1s,
    otherwise the default H:MM:SS form with sub-second digits dropped."""
    one_second = datetime.timedelta(seconds=1)
    one_millisecond = datetime.timedelta(microseconds=1000)
    if t >= one_second:
        # drop the sub-second remainder, then let timedelta format itself
        truncated = t - datetime.timedelta(microseconds=t.microseconds)
        return str(truncated)
    if t < one_millisecond:
        return str(t.microseconds) + "μs"
    return str(1./1000.*t.microseconds) + 'ms'
################################################################
def getTimeInfos(r):
    """Timing columns for run *r*:
    [start_time, last step, last update, time/step, total time]."""
    step, steptime = r.getLastStep()
    start_time = r['start_time']
    time_perstep = None
    total_time = None
    if step is not None and steptime and start_time:
        elapsed = steptime - start_time
        time_perstep = formatTimeDelta(elapsed/(step+1))
        total_time = formatTimeDelta(elapsed)
    if start_time:
        start_time = start_time.strftime("%H:%M %d/%m/%y")
    if steptime:
        steptime = steptime.strftime("%H:%M %d/%m/%y")
    return [start_time, step, steptime, time_perstep, total_time]
################################################################
def getRunInfos(r, j):
    """Collect display values for run *r* / job *j*.

    Reads the global ``info_names``; the last five columns are timing
    values supplied by getTimeInfos. Column keys may carry a '%r.' or
    '%j.' prefix selecting run vs job entries.
    # NOTE(review): indentation was lost in this patch; the nesting
    # below (run entry first, job entry as fallback, raise otherwise)
    # is the assumed structure — confirm against the repository.
    """
    run_infos = []
    for col in info_names[:-5]:
        key_run = col.replace('%r.', '').strip()
        if not key_run == 'start_time':
            if key_run in r.entries:
                run_infos.append(r[key_run])
            else:
                key_job = col.replace('%j.', '').strip()
                if key_job in j.entries:
                    run_infos.append(j[key_job])
                else:
                    raise Exception('Key {0} is not a valid parameter'.format(
                        key_run))
    run_infos += getTimeInfos(r)
    return run_infos
################################################################
# Command-line entry point: parse BlackDynamite parameters, then either
# print a study summary, describe a single run, or list all runs.
parser = BD.BDParser()
parser.register_params(
    group="getRunInfo",
    params={"run_id": int, "order": str,
            "summary": bool,
            "infos": [str]},
    defaults={"order": "id"},
    help={"run_id": "Select a run_id for complete output",
          "summary": "Output a summary of the completeness of the study",
          "order": "specify the column which serves to order the lines"})
params = parser.parseBDParameters()
mybase = BD.Base(**params)
if params["summary"] is True:
    # printSummary exits the process itself
    printSummary(mybase, params)
if ("run_id" in params):
    getRunInfo(params["run_id"], mybase)
else:
    # tabular listing of every selected run
    info_names = getInfoNames()
    format_string = getFormatString(info_names)
    header = format_string.format(*info_names)
    separator = "-" * len(header)
    print(separator)
    print(header)
    print(separator)
    runSelector = BD.RunSelector(mybase)
    run_list = runSelector.selectRuns(params,
                                      sort_by="runs." + params["order"],
                                      quiet=True)
    for r, j in run_list:
        try:
            infos = getRunInfos(r, j)

            def transform_None(x):
                # format_string cannot render None values
                if x is None:
                    return 'None'
                else:
                    return x
            infos = [transform_None(x) for x in infos]
            line = format_string.format(*infos)
            print(line)
        except Exception as e:
            # fall back to the raw values when formatting fails
            print(getRunInfos(r, j))
            print(e)