Page MenuHomec4science

bdparser.py
No OneTemporary

File Metadata

Created
Sat, Jun 22, 06:03

bdparser.py

#!/usr/bin/env python
from __future__ import print_function
__all__ = [ "BDParser", "RunParser" ]
import BlackDynamite as BD
import sys
import re
import os
import pwd
import base
import argcomplete, argparse
from argcomplete.completers import EnvironCompleter
import traceback
from types import ModuleType
import imp
class BDParser(object):
    """Option parser for BlackDynamite command-line tools.

    The parser is driven by a set of tables held on the instance:

    * ``admissible_params``: option name -> type (``str``, ``int``, ``bool``,
      a one-element list such as ``[str]`` for comma separated lists, or
      ``ModuleType`` for options naming a python script to load);
    * ``default_params``: option name -> default value;
    * ``help``: option name -> help text;
    * ``mandatory``: option name -> bool;
    * ``group_params``: group title -> list of option names.

    From these it builds an ``argparse`` parser, converts the parsed values
    to the declared types and supports ``argcomplete`` shell completion.
    """

    def __init__(self):
        # Set to a file name (e.g. "debug-completion") to trace the parser.
        self.debugfname = None

        self.admissible_params = {}
        self.help = {}
        self.default_params = {}
        self.group_params = {}
        self.mandatory = {}

        # (name, type, help) of the options every BlackDynamite tool accepts.
        builtin_options = [
            ("study", str,
             "Specify the study from the BlackDynamite database. This refers to the schemas in PostgreSQL language"),
            ("host", str,
             "Specify data base server address"),
            ("port", int,
             "Specify data base server port"),
            ("user", str,
             "Specify user name to connect to data base server"),
            ("password", bool,
             "Flag to request prompt for typing password"),
            ("BDconf", str,
             "Path to a BlackDynamite file (*.bd) configuring current optons"),
            ("truerun", bool,
             "Set this flag if you want to truly perform the action on base. If not set all action are mainly dryrun"),
            ("job_constraints", [str],
             "This allows to constraint run selections by job properties"),
            ("run_constraints", [str],
             "This allows to constraint run selections by run properties"),
            ("list_parameters", bool,
             "Request to list the possible job/run parameters"),
        ]
        for name, typ, text in builtin_options:
            self.admissible_params[name] = typ
            self.help[name] = text

        self.default_params["truerun"] = False
        self.default_params["job_constraints"] = None
        self.default_params["run_constraints"] = None

        self.group_params["BDParser"] = [name for name, _, _ in builtin_options]

    def debug(self, mesg, mode='a'):
        """Write ``mesg`` to the debug file; no-op when ``debugfname`` is unset."""
        if not self.debugfname:
            return
        with open(self.debugfname, mode) as fh:
            print(mesg, file=fh)

    def completer(self, prefix, **kwargs):
        """Return completion candidates for the option being completed.

        Installed as the ``completer`` attribute of the argparse actions.
        ``kwargs`` must provide ``action`` (the argparse action being
        completed) and ``parsed_args`` (the namespace parsed so far).
        """
        self.debug("BDparser prefix " + str(prefix) + "\n")
        for k, v in kwargs.items():
            self.debug("kwargs[" + str(k) + "] = " + str(v) + "\n")

        key = vars(kwargs["action"])["dest"]
        self.debug("dest " + str(key) + "\n")

        # Work on a filtered copy: deleting entries while iterating the live
        # dict fails on Python 3 and would also alter the caller's namespace.
        params = dict((k, v) for k, v in vars(kwargs["parsed_args"]).items()
                      if v is not None)
        self.debug("key " + str(key) + "\n")

        if key == "BDconf":
            return self.listPossibleConf()

        if key == "host":
            return ["localhost", "lsmssrv1.epfl.ch"]

        if key == "study":
            params.pop("password", None)
            params["should_not_check_study"] = True
            params.setdefault("host", "lsmssrv1.epfl.ch")
            try:
                mybase = base.Base(**params)
            except Exception as e:
                self.debug("params " + str(params) + "\n")
                self.debug("Exception :" + str(e) + "\n")
                self.debug("trace :" + traceback.format_exc())
                return ["ConnectionError"]
            return mybase.getSchemaList()

        return []

    def listPossibleConf(self):
        """Return the names of the '.bd' files found in './' and '~/.blackdynamite'."""
        files = []
        for dirname in ["./", os.path.expanduser("~/.blackdynamite")]:
            # The per-user directory is optional: skip it when missing
            # instead of letting os.listdir raise.
            if not os.path.isdir(dirname):
                continue
            for filename in os.listdir(dirname):
                if os.path.splitext(filename)[1] == ".bd":
                    files.append(filename)
        return files

    def readConfFile(self, read_params, fname):
        """Read ``name = value`` lines from a configuration file into ``read_params``.

        ``fname`` is looked up in './' and then in '~/.blackdynamite'; when
        found in neither it is opened as given.  Only names present in
        ``admissible_params`` are stored, and values are stored as the raw
        (stripped) strings.  Blank lines and lines starting with '#' are
        ignored.  A line without '=' prints a message and exits with -1.
        """
        for dirname in ["./", os.path.expanduser("~/.blackdynamite")]:
            fullpath = os.path.join(dirname, fname)
            if os.path.isfile(fullpath):
                fname = fullpath
                break

        with open(fname) as fh:
            lines = [line.strip() for line in fh]

        for line in lines:
            if not line or line.startswith("#"):
                continue
            # Split on the FIRST '=' so that values may themselves contain
            # '=' (e.g. "run_constraints = state = FINISHED").
            param, sep, val = line.partition("=")
            if not sep:
                print("malformed line:" + line)
                sys.exit(-1)
            param = param.strip()
            val = val.strip()
            if param in self.admissible_params:
                read_params[param] = val

    def checkParam(self, p, dico):
        """Obsolete entry point: report the misuse and abort the program."""
        print("****************")
        print("Obsolete: should use the mandatory argument for the declare_params function")
        print("It was used by object " + str(self) + " for keyword " + p)
        print("FATAL => ABORT")
        print("****************")
        sys.exit(-1)

    def _loadModule(self, modname, pre_args):
        """Load the script ``modname`` + '.py' from the module search paths.

        The search paths are, in order: the entries of $PYTHONPATH, the
        entries of ``pre_args["module_path"]`` (when set), the BlackDynamite
        package path(s) and their 'coating' sub-directories.  Every path is
        tried, so the last successful load wins.  Raises ``Exception`` when
        the module could not be loaded from any of them.
        """
        paths = []
        if "PYTHONPATH" in os.environ:
            paths = os.environ["PYTHONPATH"].split(':')
        # argparse stores None for an option that was not given: only split
        # an actual value.
        module_path = pre_args.get("module_path")
        if module_path:
            paths += module_path.split(':')
        paths += BD.__path__
        paths += [path + "/coating" for path in BD.__path__]

        mymod = None
        for p in paths:
            modfile = os.path.join(p, modname + ".py")
            try:
                mymod = imp.load_source(modname, modfile)
            except Exception:
                # Best effort: most paths will not contain the script.  Keep
                # a trace in the debug log rather than dropping it silently.
                self.debug("cannot load " + modfile + "\n" + traceback.format_exc())

        if mymod is None:
            raise Exception("cannot find module '" + modname + "' from paths " + str(paths))
        return mymod

    def createParamsMap(self, pre_args):
        """Convert raw option values to their declared types.

        ``pre_args`` maps option names to raw values (typically
        ``vars(namespace)``).  ``None`` values are skipped, a ``BDconf``
        value is read as a configuration file, and any other admissible
        option is converted according to ``admissible_params``.  Returns the
        dict of converted parameters.
        """
        read_params = {}
        self.debug(str(pre_args) + "\n")

        for opt, args in pre_args.items():
            if args is None:
                continue
            if not isinstance(args, list):
                args = [args]

            for arg in args:
                if arg is None:
                    continue

                if opt == "BDconf":
                    self.readConfFile(read_params, arg)
                    continue

                if opt not in self.admissible_params:
                    continue
                typ = self.admissible_params[opt]

                if typ == ModuleType:
                    read_params[opt] = self._loadModule(arg, pre_args)
                elif isinstance(typ, list):
                    # Comma separated list; a one-element declaration such
                    # as [int] gives the item type, anything else means str.
                    subtype = typ[0] if len(typ) == 1 else str
                    values = read_params.get(opt)
                    if not isinstance(values, list):
                        # Absent, or a raw string left by a conf file.
                        values = read_params[opt] = []
                    values.extend(subtype(item) for item in arg.split(","))
                else:
                    read_params[opt] = typ(arg)

        return read_params

    def addModulesAdmissibleParameters(self, read_params):
        """Merge the option tables declared by loaded modules into this parser.

        For every entry of ``read_params`` whose declared type is
        ``ModuleType``, the module-level dicts ``admissible_params``,
        ``default_params``, ``help`` and ``mandatory`` (when present) are
        merged into the corresponding tables of the parser.
        """
        for k in list(read_params):
            if self.admissible_params.get(k) != ModuleType:
                continue
            mymod = read_params[k]
            modname = mymod.__name__
            declared = mymod.__dict__
            if "admissible_params" in declared:
                self.admissible_params.update(declared["admissible_params"])
                self.group_params["'" + modname + "' module options"] = list(declared["admissible_params"])
            if "default_params" in declared:
                self.default_params.update(declared["default_params"])
            if "help" in declared:
                self.help.update(declared["help"])
            if "mandatory" in declared:
                self.mandatory.update(declared["mandatory"])

        self.debug("BBBBBB " + str(self.admissible_params) + "\n")

    def addModulesAdmissibleParametersForComplete(self, read_params):
        """Load module options named on the command line being completed.

        Does nothing unless running under argcomplete (``_ARGCOMPLETE`` set
        in the environment).  Otherwise the line in ``COMP_LINE`` is scanned
        for ``--<module option> <value>`` pairs, the named modules are
        loaded and their options added to the parser tables.
        """
        # Check first: reading the variable before testing for it raised
        # KeyError on every non-completion run.
        if "_ARGCOMPLETE" not in os.environ:
            return
        self.debug("arg complete ? " + os.environ["_ARGCOMPLETE"] + "\n")

        comp_line = os.environ.get("COMP_LINE", "")
        breaks = " |=|&|<|>|;"
        self.debug("break line " + comp_line + "\n")
        all_args = re.split(breaks, comp_line)
        self.debug("break line " + str(all_args) + "\n")

        tmp_read_params = {}
        for i, word in enumerate(all_args):
            res = re.match("--(.*)", word)
            if res is None:
                continue
            name = res.group(1)
            if self.admissible_params.get(name) != ModuleType:
                continue
            if i + 1 >= len(all_args):
                continue
            value = all_args[i + 1]
            # The next word is another option: no value was given yet.
            if re.match("--(.*)", value) is not None:
                continue
            tmp_read_params[name] = value

        if "module_path" in read_params:
            tmp_read_params["module_path"] = read_params["module_path"]

        tmp_read_params = self.createParamsMap(tmp_read_params)
        self.debug("AAAAAAAAA " + str(tmp_read_params) + "\n")
        self.addModulesAdmissibleParameters(tmp_read_params)
        self.debug("CCCCCCCCCC" + str(self.admissible_params) + "\n")

    def constructArgParser(self, add_help=True, add_mandatory=True):
        """Build the argparse parser described by the parameter tables.

        ``add_help`` is forwarded to ``argparse.ArgumentParser``;
        ``add_mandatory`` set to False makes every option optional.
        Raises ``Exception`` when an option is declared with type ``None``.
        """
        parser = argparse.ArgumentParser(
            description="BlackDynamite option parser",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
            add_help=add_help)

        # Maps "General" and every grouped option name to its argparse group.
        self.params_group = {"General": parser.add_argument_group("General")}
        for title, param_list in self.group_params.items():
            group = parser.add_argument_group(title)
            for p in param_list:
                self.params_group[p] = group

        for param, typ in self.admissible_params.items():
            if typ is None:
                raise Exception("Deprectated option type for " + param + " : should be changed to 'bool'")

            is_mandatory = bool(add_mandatory and self.mandatory.get(param) == True)
            p_help = self.help.get(param, "help TODO")
            grp = self.params_group.get(param, self.params_group["General"])
            option = "--" + param

            if typ is bool:
                # A flag whose default is True switches the value off.
                if self.default_params.get(param) == True:
                    action = 'store_false'
                else:
                    action = 'store_true'
                grp.add_argument(option, help=p_help, action=action,
                                 required=is_mandatory)
            elif typ is list:
                # NOTE(review): this only matches the bare ``list`` type; the
                # [str]-style declarations take the branch below and are
                # split on ',' in createParamsMap - confirm this is intended.
                grp.add_argument(option, action='append', help=p_help,
                                 required=is_mandatory).completer = self.completer
            else:
                grp.add_argument(option, help=p_help,
                                 required=is_mandatory).completer = self.completer

        parser.set_defaults(**self.default_params)
        return parser

    def register_params(self, group="General", params=None, defaults=None, help=None, mandatory=None):
        """Declare additional options.

        ``params`` maps option names to types and is attached to ``group``;
        ``defaults``, ``help`` and ``mandatory`` update the matching tables.
        An option given a default is marked non-mandatory (unless
        ``mandatory`` says otherwise), and every ``bool`` option without a
        default gets ``False``.
        """
        if params is not None:
            self.admissible_params.update(params)
            if group not in self.group_params:
                self.group_params[group] = []
            self.group_params[group] += list(params)

        if defaults is not None:
            self.default_params.update(defaults)
            for key in defaults:
                self.mandatory[key] = False

        if help is not None:
            self.help.update(help)

        if mandatory is not None:
            self.mandatory.update(mandatory)

        for param, typ in self.admissible_params.items():
            if typ == bool and param not in self.default_params:
                self.default_params[param] = False

    def parseBDParameters(self, argv=None):
        """Parse the command line and return the dict of typed parameters.

        ``argv`` is the list of arguments to parse (``None`` lets argparse
        use ``sys.argv``).  Parsing is done in two passes: a first lenient
        pass discovers module options (whose modules may declare further
        options), then the complete parser is built, hooked to argcomplete
        and used for the real parse.  When no ``user`` was given, the login
        name of the current uid is used.
        """
        self.debug("program called with " + str(len(sys.argv)) + " args " + str(sys.argv) + "\n")
        self.debug("env is\n\n")
        for k, v in os.environ.items():
            self.debug("export " + k + "='" + v + "'\n")

        self.debug("constructArgParser\n")
        parser = self.constructArgParser(add_help=False, add_mandatory=False)
        self.debug("parse_known_args\n")
        pre_args = parser.parse_known_args(args=argv)[0]
        self.debug("createParamsMap\n")
        read_params = self.createParamsMap(vars(pre_args))
        self.debug("addModuleAdmissibleParameters\n")
        self.addModulesAdmissibleParameters(read_params)

        self.debug("addModulesAdmissibleParametersForComplete\n")
        try:
            self.addModulesAdmissibleParametersForComplete(read_params)
        except Exception:
            # Completion helpers must never prevent the real parse.
            self.debug("trace :" + traceback.format_exc())

        self.debug("constructArgParser\n")
        parser = self.constructArgParser()
        argcomplete.autocomplete(parser)
        pre_args = parser.parse_args(args=argv)
        read_params = self.createParamsMap(vars(pre_args))

        if "user" not in read_params:
            read_params["user"] = pwd.getpwuid(os.getuid())[0]

        return read_params
################################################################
def filterParams(sub_list, total_list):
    """Return the entries of ``total_list`` selected by ``sub_list``.

    An entry is kept when its key appears in ``sub_list``, exists in
    ``total_list`` and its value is not the ``False`` object (other falsy
    values such as ``0`` or ``None`` are kept).
    """
    return dict(
        (key, total_list[key])
        for key in sub_list
        if key in total_list and total_list[key] is not False
    )
################################################################
class RunParser(BDParser):
    """BDParser extended with the options needed to launch a run.

    Adds the mandatory ``machine_name``, ``nproc`` and ``run_name`` options
    in a "RunParser" group.
    """

    def __init__(self):
        BDParser.__init__(self)

        self.mandatory["machine_name"] = True
        self.mandatory["nproc"] = True
        self.mandatory["run_name"] = True

        self.admissible_params["machine_name"] = str
        self.help["machine_name"] = "Specify the name of the machine where the job is to be launched"

        self.admissible_params["nproc"] = int
        self.help["nproc"] = "Specify the number of processors onto which this run is supposed to be launched"

        self.admissible_params["run_name"] = str
        self.help["run_name"] = "User friendly name given to this run. This is usually helpful to recall a run kind"

        # The defaults installed by BDParser.__init__ are kept (they used to
        # be wiped here); only the one this class sets is (re)assigned.
        self.default_params["job_constraints"] = None

        self.group_params["RunParser"] = [
            "machine_name",
            "nproc",
            "run_name"]

    def parseBDParameters(self, argv=None):
        """Parse the options and normalise the run name.

        Same contract as ``BDParser.parseBDParameters`` (``argv`` is
        forwarded to it); in addition every whitespace character of
        ``run_name`` is replaced by an underscore.
        """
        params = BDParser.parseBDParameters(self, argv=argv)
        params['run_name'] = re.sub(r'\s', '_', params['run_name'])
        return params

Event Timeline