diff --git a/config/invenio.conf b/config/invenio.conf index 49dcfea9a..5ca727fac 100644 --- a/config/invenio.conf +++ b/config/invenio.conf @@ -1,874 +1,881 @@ ## $Id$ ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ################################################### ## About 'invenio.conf' and 'invenio-local.conf' ## ################################################### ## The 'invenio.conf' file contains the vanilla default configuration ## parameters of a CDS Invenio installation, as coming from the ## distribution. The file should be self-explanatory. Once installed ## in its usual location (usually /opt/cds-invenio/etc), you could in ## principle go ahead and change the values according to your local ## needs. ## ## However, you can also create a file named 'invenio-local.conf' in ## the same directory where 'invenio.conf' lives and put there only ## the localizations you need to have different from the default ones. ## For example: ## ## $ cat /opt/cds-invenio/etc/invenio-local.conf ## [Invenio] ## CFG_SITE_URL = http://your.site.com ## CFG_SITE_SECURE_URL = https://your.site.com ## CFG_SITE_ADMIN_EMAIL = john.doe@your.site.com ## CFG_SITE_SUPPORT_EMAIL = john.doe@your.site.com ## ## The Invenio system will then read both the default invenio.conf ## file and your customized invenio-local.conf file and it will ## override any default options with the ones you have set in your ## local file. This cascading of configuration parameters will ease ## you future upgrades. [Invenio] ################################### ## Part 1: Essential parameters ## ################################### ## This part defines essential CDS Invenio internal parameters that ## everybody should override, like the name of the server or the email ## address of the local CDS Invenio administrator. ## CFG_DATABASE_* - specify which MySQL server to use, the name of the ## database to use, and the database access credentials. CFG_DATABASE_HOST = localhost CFG_DATABASE_PORT = 3306 CFG_DATABASE_NAME = cdsinvenio CFG_DATABASE_USER = cdsinvenio CFG_DATABASE_PASS = my123p$ss ## CFG_SITE_URL - specify URL under which your installation will be ## visible. For example, use "http://your.site.com". Do not leave ## trailing slash. CFG_SITE_URL = http://localhost ## CFG_SITE_SECURE_URL - specify secure URL under which your ## installation secure pages such as login or registration will be ## visible. For example, use "https://your.site.com". Do not leave ## trailing slash. If you don't plan on using HTTPS, then you may ## leave this empty. CFG_SITE_SECURE_URL = https://localhost ## CFG_SITE_NAME -- the visible name of your CDS Invenio installation. CFG_SITE_NAME = Atlantis Institute of Fictive Science ## CFG_SITE_NAME_INTL -- the international versions of CFG_SITE_NAME ## in various languages. (See also CFG_SITE_LANGS below.) CFG_SITE_NAME_INTL_en = Atlantis Institute of Fictive Science CFG_SITE_NAME_INTL_fr = Atlantis Institut des Sciences Fictives CFG_SITE_NAME_INTL_de = Atlantis Institut der fiktiven Wissenschaft CFG_SITE_NAME_INTL_es = Atlantis Instituto de la Ciencia Fictive CFG_SITE_NAME_INTL_ca = Institut Atlantis de Ciència Fictícia CFG_SITE_NAME_INTL_pt = Instituto Atlantis de Ciência Fictícia CFG_SITE_NAME_INTL_it = Atlantis Istituto di Scienza Fittizia CFG_SITE_NAME_INTL_ru = Атлантис Институт фиктивных Наук CFG_SITE_NAME_INTL_sk = Atlantis Inštitút Fiktívnych Vied CFG_SITE_NAME_INTL_cs = Atlantis Institut Fiktivních Věd CFG_SITE_NAME_INTL_no = Atlantis Institutt for Fiktiv Vitenskap CFG_SITE_NAME_INTL_sv = Atlantis Institut för Fiktiv Vetenskap CFG_SITE_NAME_INTL_el = Ινστιτούτο Φανταστικών Επιστημών Ατλαντίδος CFG_SITE_NAME_INTL_uk = Інститут вигаданих наук в Атлантісі CFG_SITE_NAME_INTL_ja = Fictive 科学のAtlantis の協会 CFG_SITE_NAME_INTL_pl = Instytut Fikcyjnej Nauki Atlantis CFG_SITE_NAME_INTL_bg = Институт за фиктивни науки Атлантис CFG_SITE_NAME_INTL_hr = Institut Fiktivnih Znanosti Atlantis CFG_SITE_NAME_INTL_zh_CN = 阿特兰提斯虚拟科学学院 CFG_SITE_NAME_INTL_zh_TW = 阿特蘭提斯虛擬科學學院 CFG_SITE_NAME_INTL_hu = Kitalált Tudományok Atlantiszi Intézete CFG_SITE_NAME_INTL_af = Atlantis Instituut van Fiktiewe Wetenskap CFG_SITE_NAME_INTL_gl = Instituto Atlantis de Ciencia Fictive ## CFG_SITE_LANG -- the default language of the interface: CFG_SITE_LANG = en ## CFG_SITE_LANGS -- list of all languages the user interface should ## be available in, separated by commas. The order specified below ## will be respected on the interface pages. A good default would be ## to use the alphabetical order. Currently supported languages ## include Afrikaans, Bulgarian, Catalan, Czech, German, Greek, ## English, Spanish, French, Croatian, Hungarian, Galician, Italian, ## Japanese, Norwegian, Polish, Portuguese, Russian, Slovak, Swedish, ## Ukrainian, Chinese (China), Chinese (Taiwan), so that the current ## eventual maximum you can currently select is ## "af,bg,ca,cs,de,el,en,es,fr,hr,gl,it,hu,ja,no,pl,pt,ru,sk,sv,uk,zh_CN,zh_TW". CFG_SITE_LANGS = af,bg,ca,cs,de,el,en,es,fr,hr,gl,it,hu,ja,no,pl,pt,ru,sk,sv,uk,zh_CN,zh_TW ## CFG_SITE_SUPPORT_EMAIL -- the email address of the support team for ## this installation: CFG_SITE_SUPPORT_EMAIL = cds.support@cern.ch ## CFG_SITE_ADMIN_EMAIL -- the email address of the 'superuser' for ## this installation. Enter your email address below and login with ## this address when using CDS Invenio administration modules. You ## will then be automatically recognized as superuser of the system. CFG_SITE_ADMIN_EMAIL = cds.support@cern.ch ## CFG_SITE_EMERGENCY_PHONE_NUMBERS -- list of mobile phone numbers to ## which an sms should be sent in case of emergency (e.g. bibsched queue ## has been stopped because of an error). ## Note that in order to use this function, if CFG_CERN_SITE is set to 0, ## the function send_sms in errorlib should be reimplemented. CFG_SITE_EMERGENCY_PHONE_NUMBERS = ## CFG_CERN_SITE -- do we want to enable CERN-specific code? ## Put "1" for "yes" and "0" for "no". CFG_CERN_SITE = 0 ## CFG_INSPIRE_SITE -- do we want to enable INSPIRE-specific code? ## Put "1" for "yes" and "0" for "no". CFG_INSPIRE_SITE = 0 ## CFG_DEVEL_SITE -- is this a development site? If it is, you might ## prefer that it doesn't do certain things. For example, you might ## not want WebSubmit to send certain emails or trigger certain ## processes on a development site. ## Put "1" for "yes" (this is a development site) or "0" for "no" ## (this isn't a development site.) CFG_DEVEL_SITE = 0 ################################ ## Part 2: Web page style ## ################################ ## The variables affecting the page style. The most important one is ## the 'template skin' you would like to use and the obfuscation mode ## for your email addresses. Please refer to the WebStyle Admin Guide ## for more explanation. The other variables are listed here mostly ## for backwards compatibility purposes only. ## CFG_WEBSTYLE_TEMPLATE_SKIN -- what template skin do you want to ## use? CFG_WEBSTYLE_TEMPLATE_SKIN = default ## CFG_WEBSTYLE_EMAIL_ADDRESSES_OBFUSCATION_MODE. How do we "protect" ## email addresses from undesired automated email harvesters? This ## setting will not affect 'support' and 'admin' emails. ## NOTE: there is no ultimate solution to protect against email ## harvesting. All have drawbacks and can more or less be ## circumvented. Choose you preferred mode ([t] means "transparent" ## for the user): ## -1: hide all emails. ## [t] 0 : no protection, email returned as is. ## foo@example.com => foo@example.com ## 1 : basic email munging: replaces @ by [at] and . by [dot] ## foo@example.com => foo [at] example [dot] com ## [t] 2 : transparent name mangling: characters are replaced by ## equivalent HTML entities. ## foo@example.com => foo@example.com ## [t] 3 : javascript insertion. Requires Javascript enabled on client ## side. ## 4 : replaces @ and . characters by gif equivalents. ## foo@example.com => foo [at] example [dot] com CFG_WEBSTYLE_EMAIL_ADDRESSES_OBFUSCATION_MODE = 2 ## (deprecated) CFG_WEBSTYLE_CDSPAGEBOXLEFTTOP -- eventual global HTML ## left top box: CFG_WEBSTYLE_CDSPAGEBOXLEFTTOP = ## (deprecated) CFG_WEBSTYLE_CDSPAGEBOXLEFTBOTTOM -- eventual global ## HTML left bottom box: CFG_WEBSTYLE_CDSPAGEBOXLEFTBOTTOM = ## (deprecated) CFG_WEBSTYLE_CDSPAGEBOXRIGHTTOP -- eventual global ## HTML right top box: CFG_WEBSTYLE_CDSPAGEBOXRIGHTTOP = ## (deprecated) CFG_WEBSTYLE_CDSPAGEBOXRIGHTBOTTOM -- eventual global ## HTML right bottom box: CFG_WEBSTYLE_CDSPAGEBOXRIGHTBOTTOM = ################################## ## Part 3: WebSearch parameters ## ################################## ## This section contains some configuration parameters for WebSearch ## module. Please note that WebSearch is mostly configured on ## run-time via its WebSearch Admin web interface. The parameters ## below are the ones that you do not probably want to modify very ## often during the runtime. (Note that you may modify them ## afterwards too, though.) ## CFG_WEBSEARCH_SEARCH_CACHE_SIZE -- how many queries we want to ## cache in memory per one Apache httpd process? This cache is used ## mainly for "next/previous page" functionality, but it caches also ## "popular" user queries if more than one user happen to search for ## the same thing. Note that large numbers may lead to great memory ## consumption. We recommend a value not greater than 100. CFG_WEBSEARCH_SEARCH_CACHE_SIZE = 100 ## CFG_WEBSEARCH_FIELDS_CONVERT -- if you migrate from an older ## system, you may want to map field codes of your old system (such as ## 'ti') to CDS Invenio/MySQL ("title"). Use Python dictionary syntax ## for the translation table, e.g. {'wau':'author', 'wti':'title'}. ## Usually you don't want to do that, and you would use empty dict {}. CFG_WEBSEARCH_FIELDS_CONVERT = {} ## CFG_WEBSEARCH_LIGHTSEARCH_PATTERN_BOX_WIDTH -- width of the ## search pattern window in the light search interface, in ## characters. CFG_WEBSEARCH_LIGHTSEARCH_PATTERN_BOX_WIDTH = 60 CFG_WEBSEARCH_LIGHTSEARCH_PATTERN_BOX_WIDTH = 60 ## CFG_WEBSEARCH_SIMPLESEARCH_PATTERN_BOX_WIDTH -- width of the search ## pattern window in the simple search interface, in characters. CFG_WEBSEARCH_SIMPLESEARCH_PATTERN_BOX_WIDTH = 40 ## CFG_WEBSEARCH_ADVANCEDSEARCH_PATTERN_BOX_WIDTH -- width of the ## search pattern window in the advanced search interface, in ## characters. CFG_WEBSEARCH_ADVANCEDSEARCH_PATTERN_BOX_WIDTH = 30 ## CFG_WEBSEARCH_NB_RECORDS_TO_SORT -- how many records do we still ## want to sort? For higher numbers we print only a warning and won't ## perform any sorting other than default 'latest records first', as ## sorting would be very time consuming then. We recommend a value of ## not more than a couple of thousands. CFG_WEBSEARCH_NB_RECORDS_TO_SORT = 1000 ## CFG_WEBSEARCH_CALL_BIBFORMAT -- if a record is being displayed but ## it was not preformatted in the "HTML brief" format, do we want to ## call BibFormatting on the fly? Put "1" for "yes" and "0" for "no". ## Note that "1" will display the record exactly as if it were fully ## preformatted, but it may be slow due to on-the-fly processing; "0" ## will display a default format very fast, but it may not have all ## the fields as in the fully preformatted HTML brief format. Note ## also that this option is active only for old (PHP) formats; the new ## (Python) formats are called on the fly by default anyway, since ## they are much faster. When usure, please set "0" here. CFG_WEBSEARCH_CALL_BIBFORMAT = 0 ## CFG_WEBSEARCH_USE_ALEPH_SYSNOS -- do we want to make old SYSNOs ## visible rather than MySQL's record IDs? You may use this if you ## migrate from a different e-doc system, and you store your old ## system numbers into 970__a. Put "1" for "yes" and "0" for ## "no". Usually you don't want to do that, though. CFG_WEBSEARCH_USE_ALEPH_SYSNOS = 0 ## CFG_WEBSEARCH_I18N_LATEST_ADDITIONS -- Put "1" if you want the ## "Latest Additions" in the web collection pages to show ## internationalized records. Useful only if your brief BibFormat ## templates contains internationalized strings. Otherwise put "0" in ## order not to slow down the creation of latest additions by WebColl. CFG_WEBSEARCH_I18N_LATEST_ADDITIONS = 0 ## CFG_WEBSEARCH_INSTANT_BROWSE -- the number of records to display ## under 'Latest Additions' in the web collection pages. CFG_WEBSEARCH_INSTANT_BROWSE = 10 ## CFG_WEBSEARCH_INSTANT_BROWSE_RSS -- the number of records to ## display in the RSS feed. CFG_WEBSEARCH_INSTANT_BROWSE_RSS = 25 ## CFG_WEBSEARCH_RSS_TTL -- number of minutes that indicates how long ## a feed cache is valid. CFG_WEBSEARCH_RSS_TTL = 360 ## CFG_WEBSEARCH_RSS_MAX_CACHED_REQUESTS -- maximum number of request kept ## in cache. If the cache is filled, following request are not cached. CFG_WEBSEARCH_RSS_MAX_CACHED_REQUESTS = 1000 ## CFG_WEBSEARCH_AUTHOR_ET_AL_THRESHOLD -- up to how many author names ## to print explicitely; for more print "et al". Note that this is ## used in default formatting that is seldomly used, as usually ## BibFormat defines all the format. The value below is only used ## when BibFormat fails, for example. CFG_WEBSEARCH_AUTHOR_ET_AL_THRESHOLD = 3 ## CFG_WEBSEARCH_NARROW_SEARCH_SHOW_GRANDSONS -- whether to show or ## not collection grandsons in Narrow Search boxes (sons are shown by ## default, grandsons are configurable here). Use 0 for no and 1 for ## yes. CFG_WEBSEARCH_NARROW_SEARCH_SHOW_GRANDSONS = 1 ## CFG_WEBSEARCH_CREATE_SIMILARLY_NAMED_AUTHORS_LINK_BOX -- shall we ## create help links for Ellis, Nick or Ellis, Nicholas and friends ## when Ellis, N was searched for? Useful if you have one author ## stored in the database under several name formats, namely surname ## comma firstname and surname comma initial cataloging policy. Use 0 ## for no and 1 for yes. CFG_WEBSEARCH_CREATE_SIMILARLY_NAMED_AUTHORS_LINK_BOX = 1 ## CFG_WEBSEARCH_USE_JSMATH_FOR_FORMATS -- jsMath is a JavaScript ## library that renders (La)TeX mathematical formulas in the client ## browser. This parameter must contain a comma-separated list of ## output formats for which to apply the jsMath rendering, for example ## "hb,hd". If the list is empty, jsMath is disabled. CFG_WEBSEARCH_USE_JSMATH_FOR_FORMATS = ## CFG_WEBSEARCH_EXTERNAL_COLLECTION_SEARCH_TIMEOUT -- when searching ## external collections (e.g. SPIRES, CiteSeer, etc), how many seconds ## do we wait for reply before abandonning? CFG_WEBSEARCH_EXTERNAL_COLLECTION_SEARCH_TIMEOUT = 5 ## CFG_WEBSEARCH_EXTERNAL_COLLECTION_SEARCH_MAXRESULTS -- how many ## results do we fetch? CFG_WEBSEARCH_EXTERNAL_COLLECTION_SEARCH_MAXRESULTS = 10 ## CFG_WEBSEARCH_SPLIT_BY_COLLECTION -- do we want to split the search ## results by collection or not? Use 0 for not, 1 for yes. CFG_WEBSEARCH_SPLIT_BY_COLLECTION = 1 ## CFG_WEBSEARCH_MAX_RECORDS_IN_GROUPS -- in order to limit denial of ## service attacks the total number of records per group displayed as a ## result of a search query will be limited to this number. Only the superuser ## queries will not be affected by this limit. CFG_WEBSEARCH_MAX_RECORDS_IN_GROUPS = 200 ####################################### ## Part 4: BibHarvest OAI parameters ## ####################################### ## This part defines parameters for the CDS Invenio OAI gateway. ## Useful if you are running CDS Invenio as OAI data provider. ## CFG_OAI_ID_FIELD -- OAI identifier MARC field: CFG_OAI_ID_FIELD = 909COo ## CFG_OAI_SET_FIELD -- OAI set MARC field: CFG_OAI_SET_FIELD = 909COp ## CFG_OAI_DELETED_POLICY -- OAI deletedrecordspolicy ## (no/transient/persistent). CFG_OAI_DELETED_POLICY = no ## CFG_OAI_ID_PREFIX -- OAI identifier prefix: CFG_OAI_ID_PREFIX = atlantis.cern.ch ## CFG_OAI_SAMPLE_IDENTIFIER -- OAI sample identifier: CFG_OAI_SAMPLE_IDENTIFIER = oai:atlantis.cern.ch:CERN-TH-4036 ## CFG_OAI_IDENTIFY_DESCRIPTION -- description for the OAI Identify verb: CFG_OAI_IDENTIFY_DESCRIPTION = oai atlantis.cern.ch : oai:atlantis.cern.ch:CERN-TH-4036 http://atlantis.cern.ch/ Free and unlimited use by anybody with obligation to refer to original record Full content, i.e. preprints may not be harvested by robots Submission restricted. Submitted documents are subject of approval by OAI repository admins. ## CFG_OAI_LOAD -- OAI number of records in a response: CFG_OAI_LOAD = 1000 ## CFG_OAI_EXPIRE -- OAI resumptionToken expiration time: CFG_OAI_EXPIRE = 90000 ## CFG_OAI_SLEEP -- service unavailable between two consecutive ## requests for CFG_OAI_SLEEP seconds: CFG_OAI_SLEEP = 10 ################################## ## Part 5: WebSubmit parameters ## ################################## ## This section contains some configuration parameters for WebSubmit ## module. Please note that WebSubmit is mostly configured on ## run-time via its WebSubmit Admin web interface. The parameters ## below are the ones that you do not probably want to modify during ## the runtime. ## CFG_WEBSUBMIT_FILESYSTEM_BIBDOC_GROUP_LIMIT -- the fulltext ## documents are stored under "/opt/cds-invenio/var/data/files/gX/Y" ## directories where X is 0,1,... and Y stands for bibdoc ID. Thusly ## documents Y are grouped into directories X and this variable ## indicates the maximum number of documents Y stored in each ## directory X. This limit is imposed solely for filesystem ## performance reasons in order not to have too many subdirectories in ## a given directory. CFG_WEBSUBMIT_FILESYSTEM_BIBDOC_GROUP_LIMIT = 5000 ## CFG_WEBSUBMIT_ADDITIONAL_KNOWN_FILE_EXTENSIONS -- a comma-separated ## list of document extensions not listed in Python standard mimetype ## library that should be recognized by Invenio. CFG_WEBSUBMIT_ADDITIONAL_KNOWN_FILE_EXTENSIONS = hpg,link,lis,llb,mat,mpp,msg,docx,docm,xlsx,xlsm,xlsb,pptx,pptm,ppsx,ppsm ################################# ## Part 6: BibIndex parameters ## ################################# ## This section contains some configuration parameters for BibIndex ## module. Please note that BibIndex is mostly configured on run-time ## via its BibIndex Admin web interface. The parameters below are the ## ones that you do not probably want to modify very often during the ## runtime. ## CFG_BIBINDEX_FULLTEXT_INDEX_LOCAL_FILES_ONLY -- when fulltext indexing, do ## you want to index locally stored files only, or also external URLs? ## Use "0" to say "no" and "1" to say "yes". CFG_BIBINDEX_FULLTEXT_INDEX_LOCAL_FILES_ONLY = 0 ## CFG_BIBINDEX_REMOVE_STOPWORDS -- when indexing, do we want to remove ## stopwords? Use "0" to say "no" and "1" to say "yes". CFG_BIBINDEX_REMOVE_STOPWORDS = 0 ## CFG_BIBINDEX_CHARS_ALPHANUMERIC_SEPARATORS -- characters considered as ## alphanumeric separators of word-blocks inside words. You probably ## don't want to change this. CFG_BIBINDEX_CHARS_ALPHANUMERIC_SEPARATORS = \!\"\#\$\%\&\'\(\)\*\+\,\-\.\/\:\;\<\=\>\?\@\[\\\]\^\_\`\{\|\}\~ ## CFG_BIBINDEX_CHARS_PUNCTUATION -- characters considered as punctuation ## between word-blocks inside words. You probably don't want to ## change this. CFG_BIBINDEX_CHARS_PUNCTUATION = \.\,\:\;\?\!\" ## CFG_BIBINDEX_REMOVE_HTML_MARKUP -- should we attempt to remove HTML markup ## before indexing? Use 1 if you have HTML markup inside metadata ## (e.g. in abstracts), use 0 otherwise. CFG_BIBINDEX_REMOVE_HTML_MARKUP = 0 ## CFG_BIBINDEX_REMOVE_LATEX_MARKUP -- should we attempt to remove LATEX markup ## before indexing? Use 1 if you have LATEX markup inside metadata ## (e.g. in abstracts), use 0 otherwise. CFG_BIBINDEX_REMOVE_LATEX_MARKUP = 0 ## CFG_BIBINDEX_MIN_WORD_LENGTH -- minimum word length allowed to be added to ## index. The terms smaller then this amount will be discarded. ## Useful to keep the database clean, however you can safely leave ## this value on 0 for up to 1,000,000 documents. CFG_BIBINDEX_MIN_WORD_LENGTH = 0 ## CFG_BIBINDEX_URLOPENER_USERNAME and CFG_BIBINDEX_URLOPENER_PASSWORD -- ## access credentials to access restricted URLs, interesting only if ## you are fulltext-indexing files located on a remote server that is ## only available via username/password. But it's probably better to ## handle this case via IP or some convention; the current scheme is ## mostly there for demo only. CFG_BIBINDEX_URLOPENER_USERNAME = mysuperuser CFG_BIBINDEX_URLOPENER_PASSWORD = mysuperpass ## CFG_INTBITSET_ENABLE_SANITY_CHECKS -- ## Enable sanity checks for integers passed to the intbitset data ## structures. It is good to enable this during debugging ## and to disable this value for speed improvements. CFG_INTBITSET_ENABLE_SANITY_CHECKS = False ####################################### ## Part 7: Access control parameters ## ####################################### ## This section contains some configuration parameters for the access ## control system. Please note that WebAccess is mostly configured on ## run-time via its WebAccess Admin web interface. The parameters ## below are the ones that you do not probably want to modify very ## often during the runtime. (If you do want to modify them during ## runtime, for example te deny access temporarily because of backups, ## you can edit access_control_config.py directly, no need to get back ## here and no need to redo the make process.) ## CFG_ACCESS_CONTROL_LEVEL_SITE -- defines how open this site is. ## Use 0 for normal operation of the site, 1 for read-only site (all ## write operations temporarily closed), 2 for site fully closed, ## 3 for also disabling any database connection. ## Useful for site maintenance. CFG_ACCESS_CONTROL_LEVEL_SITE = 0 ## CFG_ACCESS_CONTROL_LEVEL_GUESTS -- guest users access policy. Use ## 0 to allow guest users, 1 not to allow them (all users must login). CFG_ACCESS_CONTROL_LEVEL_GUESTS = 0 ## CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS -- account registration and ## activation policy. When 0, users can register and accounts are ## automatically activated. When 1, users can register but admin must ## activate the accounts. When 2, users cannot register nor update ## their email address, only admin can register accounts. When 3, ## users cannot register nor update email address nor password, only ## admin can register accounts. When 4, the same as 3 applies, nor ## user cannot change his login method. CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS = 0 ## CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN -- limit account ## registration to certain email addresses? If wanted, give domain ## name below, e.g. "cern.ch". If not wanted, leave it empty. CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN = ## CFG_ACCESS_CONTROL_NOTIFY_ADMIN_ABOUT_NEW_ACCOUNTS -- send a ## notification email to the administrator when a new account is ## created? Use 0 for no, 1 for yes. CFG_ACCESS_CONTROL_NOTIFY_ADMIN_ABOUT_NEW_ACCOUNTS = 0 ## CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT -- send a ## notification email to the user when a new account is created in order to ## to verify the validity of the provided email address? Use ## 0 for no, 1 for yes. CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT = 1 ## CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_ACTIVATION -- send a ## notification email to the user when a new account is activated? ## Use 0 for no, 1 for yes. CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_ACTIVATION = 0 ## CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_DELETION -- send a ## notification email to the user when a new account is deleted or ## account demand rejected? Use 0 for no, 1 for yes. CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_DELETION = 0 ## CFG_APACHE_PASSWORD_FILE -- the file where Apache user credentials ## are stored. Must be an absolute pathname. If the value does not ## start by a slash, it is considered to be the filename of a file ## located under prefix/var/tmp directory. This is useful for the ## demo site testing purposes. For the production site, if you plan ## to restrict access to some collections based on the Apache user ## authentication mechanism, you should put here an absolute path to ## your Apache password file. CFG_APACHE_PASSWORD_FILE = demo-site-apache-user-passwords ## CFG_APACHE_GROUP_FILE -- the file where Apache user groups are ## defined. See the documentation of the preceding config variable. CFG_APACHE_GROUP_FILE = demo-site-apache-user-groups ################################### ## Part 8: WebSession parameters ## ################################### ## This section contains some configuration parameters for tweaking ## session handling. ## CFG_WEBSESSION_EXPIRY_LIMIT_DEFAULT -- number of days after which a session ## and the corresponding cookie is considered expired. CFG_WEBSESSION_EXPIRY_LIMIT_DEFAULT = 2 ## CFG_WEBSESSION_EXPIRY_LIMIT_REMEMBER -- number of days after which a session ## and the corresponding cookie is considered expired, when the user has ## requested to permanently stay logged in. CFG_WEBSESSION_EXPIRY_LIMIT_REMEMBER = 365 ## CFG_WEBSESSION_RESET_PASSWORD_EXPIRE_IN_DAYS -- when user requested ## a password reset, for how many days is the URL valid? CFG_WEBSESSION_RESET_PASSWORD_EXPIRE_IN_DAYS = 3 ## CFG_WEBSESSION_ADDRESS_ACTIVATION_EXPIRE_IN_DAYS -- when an account ## activation email was sent, for how many days is the URL valid? CFG_WEBSESSION_ADDRESS_ACTIVATION_EXPIRE_IN_DAYS = 3 ## CFG_WEBSESSION_NOT_CONFIRMED_EMAIL_ADDRESS_EXPIRE_IN_DAYS -- when ## user won't confirm his email address and not complete ## registeration, after how many days will it expire? CFG_WEBSESSION_NOT_CONFIRMED_EMAIL_ADDRESS_EXPIRE_IN_DAYS = 10 ## CFG_WEBSESSION_DIFFERENTIATE_BETWEEN_GUESTS -- when set to 1, the session ## system allocates the same uid=0 to all guests users regardless of where they ## come from. 0 allocate a unique uid to each guest. CFG_WEBSESSION_DIFFERENTIATE_BETWEEN_GUESTS = 0 ################################ ## Part 9: BibRank parameters ## ################################ ## This section contains some configuration parameters for the ranking ## system. ## CFG_BIBRANK_SHOW_READING_STATS -- do we want to show reading ## similarity stats? ('People who viewed this page also viewed') CFG_BIBRANK_SHOW_READING_STATS = 1 ## CFG_BIBRANK_SHOW_DOWNLOAD_STATS -- do we want to show the download ## similarity stats? ('People who downloaded this document also ## downloaded') CFG_BIBRANK_SHOW_DOWNLOAD_STATS = 1 ## CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS -- do we want to show download ## history graph? CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS = 1 ## CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS_CLIENT_IP_DISTRIBUTION -- do we ## want to show a graph representing the distribution of client IPs ## downloading given document? CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS_CLIENT_IP_DISTRIBUTION = 0 ## CFG_BIBRANK_SHOW_CITATION_LINKS -- do we want to show the 'Cited ## by' links? (useful only when you have citations in the metadata) CFG_BIBRANK_SHOW_CITATION_LINKS = 1 ## CFG_BIBRANK_SHOW_CITATION_STATS -- de we want to show citation ## stats? ('Cited by M recors', 'Co-cited with N records') CFG_BIBRANK_SHOW_CITATION_STATS = 1 ## CFG_BIBRANK_SHOW_CITATION_GRAPHS -- do we want to show citation ## history graph? CFG_BIBRANK_SHOW_CITATION_GRAPHS = 1 #################################### ## Part 10: WebComment parameters ## #################################### ## This section contains some configuration parameters for the ## commenting and reviewing facilities. ## CFG_WEBCOMMENT_ALLOW_COMMENTS -- do we want to allow users write ## public comments on records? CFG_WEBCOMMENT_ALLOW_COMMENTS = 1 ## CFG_WEBCOMMENT_ALLOW_REVIEWS -- do we want to allow users write ## public reviews of records? CFG_WEBCOMMENT_ALLOW_REVIEWS = 1 ## CFG_WEBCOMMENT_ALLOW_SHORT_REVIEWS -- do we want to allow short ## reviews, that is just the attribution of stars without submitting ## detailed review text? CFG_WEBCOMMENT_ALLOW_SHORT_REVIEWS = 0 ## CFG_WEBCOMMENT_NB_REPORTS_BEFORE_SEND_EMAIL_TO_ADMIN -- if users ## report a comment to be abusive, how many they have to be before the ## site admin is alerted? CFG_WEBCOMMENT_NB_REPORTS_BEFORE_SEND_EMAIL_TO_ADMIN = 5 ## CFG_WEBCOMMENT_NB_COMMENTS_IN_DETAILED_VIEW -- how many comments do ## we display in the detailed record page upon welcome? CFG_WEBCOMMENT_NB_COMMENTS_IN_DETAILED_VIEW = 1 ## CFG_WEBCOMMENT_NB_REVIEWS_IN_DETAILED_VIEW -- how many reviews do ## we display in the detailed record page upon welcome? CFG_WEBCOMMENT_NB_REVIEWS_IN_DETAILED_VIEW = 1 ## CFG_WEBCOMMENT_ADMIN_NOTIFICATION_LEVEL -- do we notify the site ## admin after every comment? CFG_WEBCOMMENT_ADMIN_NOTIFICATION_LEVEL = 1 ## CFG_WEBCOMMENT_TIMELIMIT_PROCESSING_COMMENTS_IN_SECONDS -- how many ## elapsed seconds do we consider enough when checking for possible ## multiple comment submissions by a user? CFG_WEBCOMMENT_TIMELIMIT_PROCESSING_COMMENTS_IN_SECONDS = 20 ## CFG_WEBCOMMENT_TIMELIMIT_PROCESSING_REVIEWS_IN_SECONDS -- how many ## elapsed seconds do we consider enough when checking for possible ## multiple review submissions by a user? CFG_WEBCOMMENT_TIMELIMIT_PROCESSING_REVIEWS_IN_SECONDS = 20 ## CFG_WEBCOMMENT_USE_RICH_EDITOR -- enable the WYSIWYG ## Javascript-based editor when user edits comments? CFG_WEBCOMMENT_USE_RICH_TEXT_EDITOR = False ################################## ## Part 11: BibSched parameters ## ################################## ## This section contains some configuration parameters for the ## bibliographic task scheduler. ## CFG_BIBSCHED_REFRESHTIME -- how often do we want to refresh ## bibsched monitor? (in seconds) CFG_BIBSCHED_REFRESHTIME = 5 ## CFG_BIBSCHED_LOG_PAGER -- what pager to use to view bibsched task ## logs? CFG_BIBSCHED_LOG_PAGER = /bin/more ## CFG_BIBSCHED_GC_TASKS_OLDER_THAN -- after how many days to perform the ## gargbage collector of BibSched queue (i.e. removing/moving task to archive). CFG_BIBSCHED_GC_TASKS_OLDER_THAN = 30 ## CFG_BIBSCHED_GC_TASKS_TO_REMOVE -- list of BibTask that can be safely ## removed from the BibSched queue once they are DONE. CFG_BIBSCHED_GC_TASKS_TO_REMOVE = bibindex,bibreformat,webcoll,bibrank,inveniogc ## CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE -- list of BibTasks that should be safely ## archived out of the BibSched queue once they are DONE. CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE = bibupload,oaiarchive ## CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS -- maximum number of BibTasks ## that can run concurrently. ## NOTE: concurrent tasks are still considered as an experimental ## feature. Please keep this value set to 1 on production environments. CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS = 1 +## CFG_BIBSCHED_PROCESS_USER -- bibsched and bibtasks processes must run with +## the credential of the user that should able to create or modify files in a +## compatible way with the user owning the Apache web server processes. +## If this variable is empty, the Apache user will be auto-discovered and +## enforced to be the bibsched/bibtasks proccesses owner +CFG_BIBSCHED_PROCESS_USER = + ################################### ## Part 12: WebBasket parameters ## ################################### ## CFG_WEBBASKET_MAX_NUMBER_OF_DISPLAYED_BASKETS -- a safety limit for ## a maximum number of displayed baskets CFG_WEBBASKET_MAX_NUMBER_OF_DISPLAYED_BASKETS = 20 ## CFG_WEBBASKET_USE_RICH_TEXT_EDITOR -- enable the WYSIWYG ## Javascript-based editor when user edits comments in WebBasket? CFG_WEBBASKET_USE_RICH_TEXT_EDITOR = False ################################## ## Part 13: WebAlert parameters ## ################################## ## This section contains some configuration parameters for the ## automatic email notification alert system. ## CFG_WEBALERT_ALERT_ENGINE_EMAIL -- the email address from which the ## alert emails will appear to be sent: CFG_WEBALERT_ALERT_ENGINE_EMAIL = cds.alert@cdsdev.cern.ch ## CFG_WEBALERT_MAX_NUM_OF_RECORDS_IN_ALERT_EMAIL -- how many records ## at most do we send in an outgoing alert email? CFG_WEBALERT_MAX_NUM_OF_RECORDS_IN_ALERT_EMAIL = 20 ## CFG_WEBALERT_MAX_NUM_OF_CHARS_PER_LINE_IN_ALERT_EMAIL -- number of ## chars per line in an outgoing alert email? CFG_WEBALERT_MAX_NUM_OF_CHARS_PER_LINE_IN_ALERT_EMAIL = 72 ## CFG_WEBALERT_SEND_EMAIL_NUMBER_OF_TRIES -- when sending alert ## emails fails, how many times we retry? CFG_WEBALERT_SEND_EMAIL_NUMBER_OF_TRIES = 3 ## CFG_WEBALERT_SEND_EMAIL_SLEEPTIME_BETWEEN_TRIES -- when sending ## alert emails fails, what is the sleeptime between tries? (in ## seconds) CFG_WEBALERT_SEND_EMAIL_SLEEPTIME_BETWEEN_TRIES = 300 #################################### ## Part 14: WebMessage parameters ## #################################### ## CFG_WEBMESSAGE_MAX_SIZE_OF_MESSAGE -- how large web messages do we ## allow? CFG_WEBMESSAGE_MAX_SIZE_OF_MESSAGE = 20000 ## CFG_WEBMESSAGE_MAX_NB_OF_MESSAGES -- how many messages for a ## regular user do we allow in its inbox? CFG_WEBMESSAGE_MAX_NB_OF_MESSAGES = 30 ## CFG_WEBMESSAGE_DAYS_BEFORE_DELETE_ORPHANS -- how many days before ## we delete orphaned messages? CFG_WEBMESSAGE_DAYS_BEFORE_DELETE_ORPHANS = 60 ################################## ## Part 15: MiscUtil parameters ## ################################## ## CFG_MISCUTIL_SQL_MAX_CACHED_QUERIES -- maximum number of cached SQL ## queries possible. After reaching this number the cache is pruned ## by deleting half of the older queries. CFG_MISCUTIL_SQL_MAX_CACHED_QUERIES = 10000 ## CFG_MISCUTIL_SQL_USE_SQLALCHEMY -- whether to use SQLAlchemy.pool ## in the DB engine of CDS Invenio. It is okay to enable this flag ## even if you have not installed SQLAlchemy. Note that Invenio will ## loose some perfomance if this option is enabled. CFG_MISCUTIL_SQL_USE_SQLALCHEMY = False ## CFG_MISCUTIL_SMTP_HOST -- which server to use as outgoing mail server to ## send outgoing emails generated by the system, for example concerning ## submissions or email notification alerts. CFG_MISCUTIL_SMTP_HOST = localhost ## CFG_MISCUTIL_SMTP_PORT -- which port to use on the outgoing mail server ## defined in the previous step. CFG_MISCUTIL_SMTP_PORT = 25 ################################# ## Part 16: BibEdit parameters ## ################################# ## CFG_BIBEDIT_TIMEOUT -- when a user edits a record, this record is ## locked to prevent other users to edit it at the same time. After ## how many seconds the locked record will be again free for other ## people to edit? CFG_BIBEDIT_TIMEOUT = 3600 ## CFG_BIBEDIT_LOCKLEVEL -- when a user tries to edit a record being edited by ## another user, the lock level determines when it is permitted to do so. ## Level 0 - permits editing if there are no recent edit sessions in tmp directory ## (unsafe, use only if you know what you are doing) ## Level 1 - permits editing if there are no queued bibedit tasks for this record ## (safe with respect to bibedit, but not for other bibupload maintenance jobs) ## Level 2 - permits editing if there are no queued bibupload tasks of any sort ## (safe, but may lock more than necessary if many cataloguers around) ## Level 3 - permits editing if no queued bibupload task concerns given record ## (safe, most precise locking, but slow, ## checks for 001/EXTERNAL_SYSNO_TAG/EXTERNAL_OAIID_TAG) ## The recommended level is 3 (default) or 2 (if you use maintenance jobs often). CFG_BIBEDIT_LOCKLEVEL = 3 ################################### ## Part 17: BibUpload parameters ## ################################### ## CFG_BIBUPLOAD_REFERENCE_TAG -- where do we store references? CFG_BIBUPLOAD_REFERENCE_TAG = 999 ## CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG -- where do we store external ## system numbers? Useful for matching when our records come from an ## external digital library system. CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG = 970__a ## CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG -- where do we store OAI ID tags ## of harvested records? Useful for matching when we harvest stuff ## via OAI that we do not want to reexport via Invenio OAI; so records ## may have only the source OAI ID stored in this tag (kind of like ## external system number too). CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG = 035__a ## CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG -- where do we store OAI SRC ## tags of harvested records? Useful for matching when we harvest stuff ## via OAI that we do not want to reexport via Invenio OAI; so records ## may have only the source OAI SRC stored in this tag (kind of like ## external system number too). Note that the field should be the same of ## CFG_BIBUPLOAD_EXTERNAL_OAIID_TAG. CFG_BIBUPLOAD_EXTERNAL_OAIID_PROVENANCE_TAG = 035__9 ## CFG_BIBUPLOAD_STRONG_TAGS -- a comma-separated list of tags that ## are strong enough to resist the replace mode. Useful for tags that ## might be created from an external non-metadata-like source, ## e.g. the information about the number of copies left. CFG_BIBUPLOAD_STRONG_TAGS = 964 ## CFG_BIBUPLOAD_FFT_ALLOWED_LOCAL_PATHS -- a comma-separated list of system ## paths from which it is allowed to take fulltextes that will be uploaded via ## FFT (CFG_TMPDIR is included by default). CFG_BIBUPLOAD_FFT_ALLOWED_LOCAL_PATHS = /tmp,/home ########################## ## THAT's ALL, FOLKS! ## ########################## diff --git a/modules/bibsched/lib/bibsched.py b/modules/bibsched/lib/bibsched.py index e35bb8129..347b2159d 100644 --- a/modules/bibsched/lib/bibsched.py +++ b/modules/bibsched/lib/bibsched.py @@ -1,1272 +1,1275 @@ # -*- coding: utf-8 -*- ## ## $Id$ ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """BibSched - task management, scheduling and executing system for CDS Invenio """ __revision__ = "$Id$" import os import string import sys import time import re import marshal import getopt import copy from socket import gethostname import signal from invenio.bibtask_config import CFG_BIBTASK_VALID_TASKS from invenio.config import \ CFG_PREFIX, \ CFG_BIBSCHED_REFRESHTIME, \ CFG_BIBSCHED_LOG_PAGER, \ CFG_BINDIR, \ CFG_LOGDIR, \ CFG_BIBSCHED_GC_TASKS_OLDER_THAN, \ CFG_BIBSCHED_GC_TASKS_TO_REMOVE, \ CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE, \ CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS, \ CFG_SITE_URL from invenio.dbquery import run_sql, escape_string from invenio.textutils import wrap_text_in_a_box from invenio.errorlib import register_exception, register_emergency CFG_VALID_STATUS = ('WAITING', 'SCHEDULED', 'RUNNING', 'CONTINUING', 'DELETED %', 'ABOUT TO STOP', 'ABOUT TO SLEEP', 'STOPPED', 'SLEEPING', 'KILLED') shift_re = re.compile("([-\+]{0,1})([\d]+)([dhms])") def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" try: date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date except: return None def get_my_pid(process, args=''): if sys.platform.startswith('freebsd'): COMMAND = "ps -o pid,args | grep '%s %s' | grep -v 'grep' | sed -n 1p" % (process, args) else: COMMAND = "ps -C %s o '%%p%%a' | grep '%s %s' | grep -v 'grep' | sed -n 1p" % (process, process, args) answer = string.strip(os.popen(COMMAND).read()) if answer == '': answer = 0 else: answer = answer[:string.find(answer,' ')] return int(answer) def get_task_pid(task_name, task_id, ignore_error=False): """Return the pid of task_name/task_id""" try: pid = int(open(os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched_task_%d.pid' % task_id)).read()) os.kill(pid, signal.SIGUSR2) return pid except (OSError, IOError): if ignore_error: return None register_exception() return get_my_pid(task_name, str(task_id)) def get_output_channelnames(task_id): "Construct and return filename for stdout and stderr for the task 'task_id'." filenamebase = "%s/bibsched_task_%d" % (CFG_LOGDIR, task_id) return [filenamebase + ".log", filenamebase + ".err"] def is_task_scheduled(task_name): """Check if a certain task_name is due for execution (WAITING or RUNNING)""" sql = "SELECT COUNT(proc) FROM schTASK WHERE proc = %s AND (status='WAITING' OR status='RUNNING')" return run_sql(sql, (task_name,))[0][0] > 0 def get_task_ids_by_descending_date(task_name, statuses=['SCHEDULED']): """Returns list of task ids, ordered by descending runtime.""" sql = "SELECT id FROM schTASK WHERE proc=%s AND (" + \ " OR ".join(["status = '%s'" % x for x in statuses]) + ") ORDER BY runtime DESC" return [x[0] for x in run_sql(sql, (task_name,))] def get_task_options(task_id): """Returns options for task_id read from the BibSched task queue table.""" res = run_sql("SELECT arguments FROM schTASK WHERE id=%s", (task_id,)) try: return marshal.loads(res[0][0]) except IndexError: return list() def gc_tasks(verbose=False, statuses=None, since=None, tasks=None): """Garbage collect the task queue.""" if tasks is None: tasks = CFG_BIBSCHED_GC_TASKS_TO_REMOVE + CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE if since is None: since = '-%id' % CFG_BIBSCHED_GC_TASKS_OLDER_THAN if statuses is None: statuses = ['DONE'] statuses = [status.upper() for status in statuses if status.upper() != 'RUNNING'] date = get_datetime(since) status_query = 'status in (%s)' % ','.join([repr(escape_string(status)) for status in statuses]) for task in tasks: if task in CFG_BIBSCHED_GC_TASKS_TO_REMOVE: res = run_sql("""DELETE FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) write_message('Deleted %s %s tasks (created before %s) with %s' % (res, task, date, status_query)) elif task in CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE: run_sql("""INSERT INTO hstTASK(id,proc,host,user, runtime,sleeptime,arguments,status,progress) SELECT id,proc,host,user, runtime,sleeptime,arguments,status,progress FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) res = run_sql("""DELETE FROM schTASK WHERE proc=%%s AND %s AND runtime<%%s""" % status_query, (task, date)) write_message('Archived %s %s tasks (created before %s) with %s' % (res, task, date, status_query)) def bibsched_set_status(task_id, status): """Update the status of task_id.""" return run_sql("UPDATE schTASK SET status=%s WHERE id=%s", (status, task_id)) def bibsched_set_progress(task_id, progress): """Update the progress of task_id.""" return run_sql("UPDATE schTASK SET progress=%s WHERE id=%s", (progress, task_id)) def bibsched_set_priority(task_id, priority): """Update the priority of task_id.""" return run_sql("UPDATE schTASK SET priority=%s WHERE id=%s", (priority, task_id)) def bibsched_send_signal(proc, task_id, signal): """Send a signal to a given task.""" try: pid = get_task_pid(proc, task_id, True) os.kill(pid, signal) except OSError: pass except KeyError: register_exception() class Manager: def __init__(self, old_stdout): import curses import curses.panel from curses.wrapper import wrapper self.old_stdout = old_stdout self.curses = curses self.helper_modules = CFG_BIBTASK_VALID_TASKS self.running = 1 #self.footer_move_mode = "[KeyUp/KeyDown Move] [M Select mode] [Q Quit]" self.footer_auto_mode = "Automatic Mode [A Manual] [1/2/3 Display] [P Purge] [l/L Log] [O Opts] [Q Quit]" self.footer_select_mode = "Manual Mode [A Automatic] [1/2/3 Display Type] [P Purge] [l/L Log] [O Opts] [Q Quit]" self.footer_waiting_item = "[R Run] [D Delete] [N Priority]" self.footer_running_item = "[S Sleep] [T Stop] [K Kill]" self.footer_stopped_item = "[I Initialise] [D Delete] [K Acknowledge]" self.footer_sleeping_item = "[W Wake Up]" self.item_status = "" self.selected_line = 2 self.rows = [] self.panel = None self.display = 2 self.first_visible_line = 0 #self.move_mode = 0 self.auto_mode = 0 self.currentrow = None self.current_attr = 0 wrapper(self.start) def handle_keys(self, chr): if chr == -1: return if self.auto_mode and (chr not in (self.curses.KEY_UP, self.curses.KEY_DOWN, self.curses.KEY_PPAGE, self.curses.KEY_NPAGE, ord("q"), ord("Q"), ord("a"), ord("A"), ord("1"), ord("2"), ord("3"), ord("p"), ord("P"), ord("o"), ord("O"), ord("l"), ord("L"))): self.display_in_footer("in automatic mode") self.stdscr.refresh() #elif self.move_mode and (chr not in (self.curses.KEY_UP, #self.curses.KEY_DOWN, #ord("m"), ord("M"), ord("q"), #ord("Q"))): #self.display_in_footer("in move mode") #self.stdscr.refresh() else: status = self.currentrow and self.currentrow[5] or None if chr == self.curses.KEY_UP: #if self.move_mode: #self.move_up() #else: self.selected_line = max(self.selected_line - 1, 2) self.repaint() if chr == self.curses.KEY_PPAGE: self.selected_line = max(self.selected_line - 10, 2) self.repaint() elif chr == self.curses.KEY_DOWN: #if self.move_mode: #self.move_down() #else: self.selected_line = min(self.selected_line + 1, len(self.rows) + 1 ) self.repaint() elif chr == self.curses.KEY_NPAGE: self.selected_line = min(self.selected_line + 10, len(self.rows) + 1 ) self.repaint() elif chr == self.curses.KEY_HOME: self.first_visible_line = 0 self.selected_line = 2 elif chr in (ord("a"), ord("A")): self.change_auto_mode() elif chr == ord("l"): self.openlog() elif chr == ord("L"): self.openlog(err=True) elif chr in (ord("w"), ord("W")): self.wakeup() elif chr in (ord("n"), ord("N")): self.change_priority() elif chr in (ord("r"), ord("R")): if status in ('WAITING', 'SCHEDULED'): self.run() elif chr in (ord("s"), ord("S")): self.sleep() elif chr in (ord("k"), ord("K")): if status in ('ERROR', 'DONE WITH ERRORS'): self.acknowledge() elif status is not None: self.kill() elif chr in (ord("t"), ord("T")): self.stop() elif chr in (ord("d"), ord("D")): self.delete() elif chr in (ord("i"), ord("I")): self.init() #elif chr in (ord("m"), ord("M")): #self.change_select_mode() elif chr in (ord("p"), ord("P")): self.purge_done() elif chr in (ord("o"), ord("O")): self.display_task_options() elif chr == ord("1"): self.display = 1 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only done processes are displayed") elif chr == ord("2"): self.display = 2 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only not done processes are displayed") elif chr == ord("3"): self.display = 3 self.first_visible_line = 0 self.selected_line = 2 self.display_in_footer("only archived processes are displayed") elif chr in (ord("q"), ord("Q")): if self.curses.panel.top_panel() == self.panel: self.panel.bottom() self.curses.panel.update_panels() else: self.running = 0 return def openlog(self, err=False): task_id = self.currentrow[0] status = self.currentrow[5] if err: logname = os.path.join(CFG_LOGDIR, 'bibsched_task_%d.err' % task_id) else: logname = os.path.join(CFG_LOGDIR, 'bibsched_task_%d.log' % task_id) if os.path.exists(logname): pager = CFG_BIBSCHED_LOG_PAGER or os.environ.get('PAGER', '/bin/more') if os.path.exists(pager): self.curses.endwin() os.system('%s %s' % (pager, logname)) print >> self.old_stdout, "\rPress ENTER to continue", self.old_stdout.flush() raw_input() self.curses.panel.update_panels() def display_task_options(self): """Nicely display information about current process.""" msg = ' id: %i\n\n' % self.currentrow[0] pid = get_task_pid(self.currentrow[1], self.currentrow[0], True) if pid is not None: msg += ' pid: %s\n\n' % pid msg += ' priority: %s\n\n' % self.currentrow[8] msg += ' proc: %s\n\n' % self.currentrow[1] msg += ' user: %s\n\n' % self.currentrow[2] msg += ' runtime: %s\n\n' % self.currentrow[3].strftime("%Y-%m-%d %H:%M:%S") msg += ' sleeptime: %s\n\n' % self.currentrow[4] msg += ' status: %s\n\n' % self.currentrow[5] msg += ' progress: %s\n\n' % self.currentrow[6] arguments = marshal.loads(self.currentrow[7]) if type(arguments) is dict: # FIXME: REMOVE AFTER MAJOR RELEASE 1.0 msg += ' options : %s\n\n' % arguments else: msg += 'executable : %s\n\n' % arguments[0] msg += ' arguments : %s\n\n' % ' '.join(arguments[1:]) msg += '\n\nPress a key to continue...' msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 2 width = max([len(row) for row in rows]) + 4 try: self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) except self.curses.error: return self.panel = self.curses.panel.new_panel(self.win) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() self.win.getch() def count_processes(self, status): out = 0 res = run_sql("SELECT COUNT(id) FROM schTASK WHERE status=%s GROUP BY status", (status,)) try: out = res[0][0] except: pass return out def change_priority(self): task_id = self.currentrow[0] priority = self.currentrow[8] new_priority = self._display_ask_number_box("Insert the desired priority for task %s. The smaller the number the less the priority. Note that a negative number will mean to always postpone the task while a number bigger than 10 will mean some tasks with less priority could be stopped in order to let this task run. The current priority is %s. New value:" % (task_id, priority)) try: new_priority = int(new_priority) except ValueError: return bibsched_set_priority(task_id, new_priority) def wakeup(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1: #self.display_in_footer("a process is already running!") if status == "SLEEPING": bibsched_send_signal(process, task_id, signal.SIGCONT) self.display_in_footer("process woken up") else: self.display_in_footer("process is not sleeping") self.stdscr.refresh() def _display_YN_box(self, msg): """Utility to display confirmation boxes.""" msg += ' (Y/N)' msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 2 width = max([len(row) for row in rows]) + 4 self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) self.panel = self.curses.panel.new_panel( self.win ) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() while 1: c = self.win.getch() if c in (ord('y'), ord('Y')): self.curses.panel.update_panels() return True elif c in (ord('n'), ord('N')): self.curses.panel.update_panels() return False def _display_ask_number_box(self, msg): """Utility to display confirmation boxes.""" msg = wrap_text_in_a_box(msg, style='no_border') rows = msg.split('\n') height = len(rows) + 3 width = max([len(row) for row in rows]) + 4 self.win = self.curses.newwin( height, width, (self.height - height) / 2 + 1, (self.width - width) / 2 + 1 ) self.panel = self.curses.panel.new_panel( self.win ) self.panel.top() self.win.border() i = 1 for row in rows: self.win.addstr(i, 2, row, self.current_attr) i += 1 self.win.refresh() self.win.move(height - 2, 2) self.curses.echo() ret = self.win.getstr() self.curses.noecho() return ret def purge_done(self): """Garbage collector.""" if self._display_YN_box("You are going to purge the list of DONE tasks.\n\n" "%s tasks, submitted since %s days, will be archived.\n\n" "%s tasks, submitted since %s days, will be deleted.\n\n" "Are you sure?" % ( ','.join(CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE), CFG_BIBSCHED_GC_TASKS_OLDER_THAN, ','.join(CFG_BIBSCHED_GC_TASKS_TO_REMOVE), CFG_BIBSCHED_GC_TASKS_OLDER_THAN)): gc_tasks() self.display_in_footer("DONE processes purged") def run(self): task_id = self.currentrow[0] process = self.currentrow[1].split(':')[0] status = self.currentrow[5] #if self.count_processes('RUNNING') + self.count_processes('CONTINUING') >= 1: #self.display_in_footer("a process is already running!") if status in ("SCHEDULED", "WAITING"): if process in self.helper_modules: program = os.path.join(CFG_BINDIR, process) fdout, fderr = get_output_channelnames(task_id) COMMAND = "%s %s >> %s 2>> %s &" % (program, str(task_id), fdout, fderr) os.system(COMMAND) Log("manually running task #%d (%s)" % (task_id, process)) else: self.display_in_footer("Process %s is not in the list of allowed processes." % process) else: self.display_in_footer("Process status should be SCHEDULED or WAITING!") def acknowledge(self): task_id = self.currentrow[0] status = self.currentrow[5] if status in ('ERROR', 'DONE WITH ERRORS'): bibsched_set_status(task_id, 'ACK ' + status) self.display_in_footer("Acknowledged error") def sleep(self): task_id = self.currentrow[0] process = self.currentrow[1].split(':')[0] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING'): bibsched_send_signal(process, task_id, signal.SIGUSR1) self.display_in_footer("SLEEP signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot put to sleep non-running processes") def kill(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING', 'ABOUT TO STOP', 'ABOUT TO SLEEP'): if self._display_YN_box("Are you sure you want to kill the %s process %s?" % (process, task_id)): bibsched_send_signal(process, task_id, signal.SIGKILL) bibsched_set_status(task_id, 'KILLED') self.display_in_footer("KILL signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot kill non-running processes") def stop(self): task_id = self.currentrow[0] process = self.currentrow[1] status = self.currentrow[5] if status in ('RUNNING', 'CONTINUING'): bibsched_send_signal(process, task_id, signal.SIGTERM) self.display_in_footer("TERM signal sent to task #%s" % task_id) else: self.display_in_footer("Cannot stop non-running processes") def delete(self): task_id = self.currentrow[0] status = self.currentrow[5] if status not in ('RUNNING', 'CONTINUING', 'SLEEPING', 'SCHEDULED', 'ABOUT TO STOP', 'ABOUT TO SLEEP'): bibsched_set_status(task_id, "%s_DELETED" % status) self.display_in_footer("process deleted") self.selected_line = max(self.selected_line, 2) else: self.display_in_footer("Cannot delete running processes") def init(self): task_id = self.currentrow[0] status = self.currentrow[5] if status not in ('RUNNING', 'CONTINUING', 'SLEEPING'): bibsched_set_status(task_id, "WAITING") bibsched_set_progress(task_id, "") self.display_in_footer("process initialised") else: self.display_in_footer("Cannot initialise running processes") #def change_select_mode(self): #if self.move_mode: #self.move_mode = 0 #else: #status = self.currentrow[5] #if status in ("RUNNING" , "CONTINUING" , "SLEEPING"): #self.display_in_footer("cannot move running processes!") #else: #self.move_mode = 1 #self.stdscr.refresh() def change_auto_mode(self): if self.auto_mode: program = os.path.join(CFG_BINDIR, "bibsched") COMMAND = "%s -q stop" % program os.system(COMMAND) self.auto_mode = 0 else: program = os.path.join( CFG_BINDIR, "bibsched") COMMAND = "%s -q start" % program os.system(COMMAND) self.auto_mode = 1 self.move_mode = 0 self.stdscr.refresh() #def move_up(self): #self.display_in_footer("not implemented yet") #self.stdscr.refresh() #def move_down(self): #self.display_in_footer("not implemented yet") #self.stdscr.refresh() def put_line(self, row, header=False): col_w = [7 , 15, 10, 21, 7, 11, 25] maxx = self.width if self.y == self.selected_line - self.first_visible_line and self.y > 1: #if self.auto_mode: #attr = self.curses.color_pair(2) + self.curses.A_STANDOUT + self.curses.A_BOLD + self.current.A_REVERSE ##elif self.move_mode: ##attr = self.curses.color_pair(7) + self.curses.A_STANDOUT + self.curses.A_BOLD #else: #attr = self.curses.color_pair(8) + self.curses.A_STANDOUT + self.curses.A_BOLD + self.current.A_REVERSE self.item_status = row[5] self.currentrow = row if self.y == 0: if self.auto_mode: attr = self.curses.color_pair(2) + self.curses.A_STANDOUT + self.curses.A_BOLD #elif self.move_mode: #attr = self.curses.color_pair(7) + self.curses.A_STANDOUT + self.curses.A_BOLD else: attr = self.curses.color_pair(8) + self.curses.A_STANDOUT + self.curses.A_BOLD elif row[5] == "DONE": attr = self.curses.color_pair(5) + self.curses.A_BOLD elif row[5] == "STOPPED": attr = self.curses.color_pair(6) + self.curses.A_BOLD elif row[5].find("ERROR") > -1: attr = self.curses.color_pair(4) + self.curses.A_BOLD elif row[5] == "WAITING": attr = self.curses.color_pair(3) + self.curses.A_BOLD elif row[5] in ("RUNNING","CONTINUING") : attr = self.curses.color_pair(2) + self.curses.A_BOLD elif not header and row[8]: attr = self.curses.A_BOLD else: attr = self.curses.A_NORMAL if self.y == self.selected_line - self.first_visible_line and self.y > 1: self.current_attr = attr attr += self.curses.A_REVERSE if header: # Dirty hack. put_line should be better refactored. # row contains one less element: arguments myline = str(row[0]).ljust(col_w[0]) myline += str(row[1]).ljust(col_w[1]) myline += str(row[2])[:19].ljust(col_w[2]) myline += str(row[3]).ljust(col_w[3]) myline += str(row[4]).ljust(col_w[4]) myline += str(row[5]).ljust(col_w[5]) myline += str(row[6]).ljust(col_w[6]) else: myline = str(row[0]).ljust(col_w[0]) myline += (str(row[1]) + (row[8] and ' [%s]' % row[8] or '')).ljust(col_w[1]) myline += str(row[2]).ljust(col_w[2]) myline += str(row[3])[:19].ljust(col_w[3]) myline += str(row[4]).ljust(col_w[4]) myline += str(row[5]).ljust(col_w[5]) myline += str(row[6]).ljust(col_w[6]) myline = myline.ljust(maxx) try: self.stdscr.addnstr(self.y, 0, myline, maxx, attr) except self.curses.error: pass self.y = self.y+1 def display_in_footer(self, footer, i = 0, print_time_p=0): if print_time_p: footer = "%s %s" % (footer, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) maxx = self.stdscr.getmaxyx()[1] footer = footer.ljust(maxx) if self.auto_mode: colorpair = 2 #elif self.move_mode: #colorpair = 7 else: colorpair = 1 try: self.stdscr.addnstr(self.y - i, 0, footer, maxx - 1, self.curses.A_STANDOUT + self.curses.color_pair(colorpair) + self.curses.A_BOLD ) except self.curses.error: pass def repaint(self): if server_pid(): self.auto_mode = 1 else: if self.auto_mode == 1: self.curses.beep() self.auto_mode = 0 self.y = 0 self.stdscr.clear() self.height, self.width = self.stdscr.getmaxyx() maxy = self.height - 2 #maxx = self.width self.put_line(("ID", "PROC [PRI]", "USER", "RUNTIME", "SLEEP", "STATUS", "PROGRESS"), True) self.put_line(("------", "---------", "----", "-------------------", "-----", "-----", "--------"), True) if self.selected_line > maxy + self.first_visible_line - 1: self.first_visible_line = self.selected_line - maxy + 1 if self.selected_line < self.first_visible_line + 2: self.first_visible_line = self.selected_line - 2 for row in self.rows[self.first_visible_line:self.first_visible_line+maxy-2]: self.put_line(row) self.y = self.stdscr.getmaxyx()[0] - 1 if self.auto_mode: self.display_in_footer(self.footer_auto_mode, print_time_p=1) #elif self.move_mode: #self.display_in_footer(self.footer_move_mode, print_time_p=1) else: self.display_in_footer(self.footer_select_mode, print_time_p=1) footer2 = "" if self.item_status.find("DONE") > -1 or self.item_status in ("ERROR", "STOPPED", "KILLED"): footer2 += self.footer_stopped_item elif self.item_status in ("RUNNING", "CONTINUING", "ABOUT TO STOP", "ABOUT TO SLEEP"): footer2 += self.footer_running_item elif self.item_status == "SLEEPING": footer2 += self.footer_sleeping_item elif self.item_status == "WAITING": footer2 += self.footer_waiting_item self.display_in_footer(footer2, 1) self.stdscr.refresh() def start(self, stdscr): if self.curses.has_colors(): self.curses.start_color() self.curses.init_pair(8, self.curses.COLOR_WHITE, self.curses.COLOR_BLACK) self.curses.init_pair(1, self.curses.COLOR_WHITE, self.curses.COLOR_RED) self.curses.init_pair(2, self.curses.COLOR_GREEN, self.curses.COLOR_BLACK) self.curses.init_pair(3, self.curses.COLOR_MAGENTA, self.curses.COLOR_BLACK) self.curses.init_pair(4, self.curses.COLOR_RED, self.curses.COLOR_BLACK) self.curses.init_pair(5, self.curses.COLOR_BLUE, self.curses.COLOR_BLACK) self.curses.init_pair(6, self.curses.COLOR_CYAN, self.curses.COLOR_BLACK) self.curses.init_pair(7, self.curses.COLOR_YELLOW, self.curses.COLOR_BLACK) self.stdscr = stdscr self.base_panel = self.curses.panel.new_panel( self.stdscr ) self.base_panel.bottom() self.curses.panel.update_panels() self.height, self.width = stdscr.getmaxyx() self.stdscr.clear() if server_pid(): self.auto_mode = 1 ring = 4 while self.running: if ring == 4: if self.display == 1: table = "schTASK" where = "and status='DONE'" order = "runtime DESC" elif self.display == 2: table = "schTASK" where = "and status<>'DONE'" order = "runtime ASC" else: table = "hstTASK" order = "runtime DESC" where = '' self.rows = run_sql("""SELECT id,proc,user,runtime,sleeptime,status,progress,arguments,priority FROM %s WHERE status NOT LIKE '%%DELETED%%' %s ORDER BY %s""" % (table, where, order)) ring = 0 self.repaint() ring += 1 char = -1 try: char = timed_out(self.stdscr.getch, 1) if char == 27: # escaping sequence char = self.stdscr.getch() if char == 79: # arrow char = self.stdscr.getch() if char == 65: #arrow up char = self.curses.KEY_UP elif char == 66: #arrow down char = self.curses.KEY_DOWN elif char == 72: char = self.curses.KEY_PPAGE elif char == 70: char = self.curses.KEY_NPAGE elif char == 91: char = self.stdscr.getch() if char == 53: char = self.stdscr.getch() if char == 126: char = self.curses.KEY_HOME except TimedOutExc: char = -1 self.handle_keys(char) _refresh_tasks = True def _bibsched_sig_info(sig, frame): """Signal handler for the 'USR2' signal sent by a finished task.""" global _refresh_tasks _refresh_tasks = True write_message('A task has terminated. Refreshing the task list.') class BibSched: def __init__(self): self.helper_modules = CFG_BIBTASK_VALID_TASKS self.scheduled = None signal.signal(signal.SIGUSR2, _bibsched_sig_info) def tasks_safe_p(self, proc1, proc2): """Return True when the two tasks can run concurrently.""" return proc1 != proc2 and not proc1.startswith('bibupload') and not proc2.startswith('bibupload') def get_tasks_to_sleep_and_stop(self, proc, task_set): """Among the task_set, return the dict of task to stop and the dict of task to sleep. """ min_prio = None min_task_id = None min_proc = None to_stop = {} for this_task_id, (this_proc, this_priority) in task_set.iteritems(): if self.tasks_safe_p(proc, this_proc): if min_prio is None or this_priority < min_prio: min_prio = this_priority min_task_id = this_task_id min_proc = this_proc else: to_stop[this_task_id] = (this_proc, this_priority) if len(task_set) < CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS and not to_stop: ## All the task are safe and there are enough resources return {}, {} else: if to_stop: return to_stop, {} else: return {}, {min_task_id : (min_proc, min_prio)} def get_running_tasks(self, task_status): running_tasks = {} for status in ('RUNNING', 'CONTINUING', 'SLEEPING', 'ABOUT TO SLEEP', 'ABOUT TO STOP', 'SCHEDULED'): running_tasks.update(copy.deepcopy(task_status.get(status, {}))) return running_tasks def split_running_tasks_by_priority(self, task_status, task_id, priority): """Return two sets (by dict): the set of task_ids with lower priority and those with higher or equal priority.""" higher = {} lower = {} for other_task_id, (task_proc, dummy, task_priority) in self.get_running_tasks(task_status).iteritems(): if task_id == other_task_id: continue if task_priority < priority: lower[other_task_id] = (task_proc, task_priority) else: higher[other_task_id] = (task_proc, task_priority) return lower, higher def bibupload_in_the_queue(self, task_id, runtime): """Check if bibupload is scheduled/running before runtime. This is useful in order to enforce bibupload order.""" return run_sql("SELECT id, status FROM schTASK WHERE proc='bibupload' AND runtime<=%s AND id<%s AND (status='RUNNING' OR status='WAITING' OR status='CONTINUING' OR status='SLEEPING' OR status='SCHEDULED' OR status='ABOUT TO SLEEP' OR status='ABOUT TO STOP' OR status='ERROR' OR status='DONE WITH ERRORS')", (runtime, task_id)) def task_really_running_p(self, proc, task_id): """Ping the task and update its status to error if necessary.""" if run_sql("SELECT id FROM schTASK WHERE id=%s AND status in ('CONTINUING', 'RUNNING', 'ABOUT TO STOP', 'SLEEPING', 'ABOUT TO SLEEP')", (task_id, )): if not get_task_pid(proc, task_id): bibsched_set_status(task_id, "ERROR") return False return True return False def handle_row(self, task_status, task_id, proc, runtime, status, priority): """Perform needed action of the row representing a task. Return True when task_status need to be refreshed""" #write_message('%s id: %s, proc: %s, runtime: %s, status: %s, priority: %s' % (task_status, task_id, proc, runtime, status, priority)) #write_message("task_id: %s, proc: %s, runtime: %s, status: %s, priority: %s" % (task_id, proc, runtime, status, priority)) if task_id in task_status['RUNNING'] or task_id in task_status['CONTINUING']: if not self.task_really_running_p(proc, task_id): #write_message('update required') return True elif task_id in task_status['WAITING'] or task_id in task_status['SLEEPING']: #write_message("Trying to run %s" % task_id) if self.scheduled is not None and task_id != self.scheduled: ## Another task is scheduled for running. #write_message("cannot run because %s is already scheduled" % self.scheduled) return False nothing_was_scheduled = self.scheduled is None res = self.bibupload_in_the_queue(task_id, runtime) if _refresh_tasks: ## Some tasks have finished. Better refresh things... return True if priority < 0: return False if res: ## All bibupload must finish before. for (atask_id, astatus) in res: if astatus in ('ERROR', 'DONE WITH ERRORS'): raise StandardError('BibSched had to halt because a bibupload with id %s has status %s. Please do your checks and delete/reinitialize the failed bibupload.' % (atask_id, astatus)) #write_message("cannot run because these bibupload are scheduled: %s" % res) Log("Task #%d (%s) not yet run because there is a bibupload in the queue" % (task_id, proc)) return False self.scheduled = task_id if nothing_was_scheduled: Log("Task #%d (%s) scheduled for running" % (task_id, proc)) #write_message('Scheduled task %s' % self.scheduled) ## Schedule the task for running. lower, higher = self.split_running_tasks_by_priority(task_status, task_id, priority) #write_message('lower: %s' % lower) #write_message('higher: %s' % higher) for other_task_id, (other_proc, dummy) in higher.iteritems(): if not self.tasks_safe_p(proc, other_proc): ## There's at least a higher priority task running that ## cannot run at the same time of the given task. ## We give up #write_message("cannot run because task_id: %s, proc: %s is the queue and incompatible" % (other_task_id, other_proc)) return False ## No higer priority task have issue with the given task. if len(higher) >= CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS: ## Not enough resources. #write_message("cannot run because all resource (%s) are used (%s), higher: %s" % (CFG_BIBSCHED_MAX_NUMBER_CONCURRENT_TASKS, len(higher), higher)) return False ## We check if it is necessary to stop/put to sleep some lower priority ## task. tasks_to_stop, tasks_to_sleep = self.get_tasks_to_sleep_and_stop(proc, lower) #write_message('tasks_to_stop: %s' % tasks_to_stop) #write_message('tasks_to_sleep: %s' % tasks_to_sleep) if tasks_to_stop and priority < 10: ## Only tasks with priority higher than 10 have the power ## to put task to stop. #write_message("cannot run because there are task to stop: %s and priority < 10" % tasks_to_stop) return False procname = proc.split(':')[0] if not tasks_to_stop and not tasks_to_sleep: self.scheduled = None if status in ("SLEEPING", "ABOUT TO SLEEP"): bibsched_send_signal(proc, task_id, signal.SIGCONT) Log("Task #%d (%s) woken up" % (task_id, proc)) return True elif procname in self.helper_modules: program = os.path.join(CFG_BINDIR, procname) fdout, fderr = get_output_channelnames(task_id) ## Trick to log in bibsched.log the task exiting exit_str = '&& echo "`date "+%%Y-%%m-%%d %%H:%%M:%%S"` --> Task #%d (%s) exited" >> %s/bibsched.log' % (task_id, proc, CFG_LOGDIR) COMMAND = "%s %s >> %s 2>> %s %s &" % (program, str(task_id), fdout, fderr, exit_str) bibsched_set_status(task_id, "SCHEDULED") Log("Task #%d (%s) started" % (task_id, proc)) os.system(COMMAND) return True else: ## It's not still safe to run the task. for other_task_id in tasks_to_stop: if other_task_id not in task_status['ABOUT TO STOP']: bibsched_set_status(other_task_id, 'ABOUT TO STOP') bibsched_send_signal(proc, other_task_id, signal.SIGTERM) for other_task_id in tasks_to_sleep: if other_task_id not in task_status['ABOUT TO SLEEP']: bibsched_set_status(other_task_id, 'ABOUT TO SLEEP') bibsched_send_signal(proc, other_task_id, signal.SIGUSR1) return True def watch_loop(self): global _refresh_tasks def get_rows(): """Return all the rows to work on.""" return run_sql("SELECT id,proc,runtime,status,priority FROM schTASK WHERE status NOT LIKE 'DONE' AND status NOT LIKE '%%DELETED%%' AND (runtime<=NOW() OR status='RUNNING' OR status='ABOUT TO STOP' OR status='ABOUT TO SLEEP' OR status='SLEEPING' OR status='SCHEDULED') ORDER BY priority DESC, runtime ASC, id ASC") def get_task_status(rows): """Return a handy data structure to analize the task status.""" ret = { 'RUNNING' : {}, 'CONTINUING' : {}, 'SLEEPING' : {}, 'WAITING' : {}, 'ABOUT TO STOP' : {}, 'ABOUT TO SLEEP' : {}, 'ERROR' : {}, 'DONE WITH ERRORS' : {}, 'SCHEDULED' : {} } for (id, proc, runtime, status, priority) in rows: if status not in ret: ret[status] = {} ret[status][id] = (proc, runtime, priority) return ret ## Cleaning up scheduled task not run because of bibsched being ## interrupted in the middle. run_sql("UPDATE schTASK SET status='WAITING' WHERE status='SCHEDULED'") try: while True: rows = get_rows() _refresh_tasks = False task_status = get_task_status(rows) if task_status['ERROR'] or task_status['DONE WITH ERRORS']: raise StandardError('BibSched had to halt because at least a task is in status ERROR (%s) or DONE WITH ERRORS (%s)' % (task_status['ERROR'], task_status['DONE WITH ERRORS'])) for row in rows: if _refresh_tasks or self.handle_row(task_status, *row): # Things have changed let's restart break time.sleep(CFG_BIBSCHED_REFRESHTIME) except: register_emergency('Emergency from %s: BibSched had to halt!' % CFG_SITE_URL) register_exception(alert_admin=True) raise class TimedOutExc(Exception): def __init__(self, value = "Timed Out"): Exception.__init__(self) self.value = value def __str__(self): return repr(self.value) def timed_out(f, timeout, *args, **kwargs): def handler(signum, frame): raise TimedOutExc() old = signal.signal(signal.SIGALRM, handler) signal.alarm(timeout) try: result = f(*args, **kwargs) finally: signal.signal(signal.SIGALRM, old) signal.alarm(0) return result def Log(message): log = open(CFG_LOGDIR + "/bibsched.log","a") log.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) log.write(message) log.write("\n") log.close() def redirect_stdout_and_stderr(): "This function redirects stdout and stderr to bibsched.log and bibsched.err file." old_stdout = sys.stdout sys.stdout = open(CFG_LOGDIR + "/bibsched.log", "a") sys.stderr = open(CFG_LOGDIR + "/bibsched.err", "a") return old_stdout def usage(exitcode=1, msg=""): """Prints usage info.""" if msg: sys.stderr.write("Error: %s.\n" % msg) sys.stderr.write("""\ Usage: %s [options] [start|stop|restart|monitor|status] The following commands are available for bibsched: start start bibsched in background stop stop a running bibsched restart restart a running bibsched monitor enter the interactive monitor status get report about current status of the queue purge purge the scheduler queue from old tasks Command options: -d, --daemon \t Launch BibSched in the daemon mode (deprecated, use 'start') General options: -h, --help \t Print this help. -V, --version \t Print version information. Status options: -s, --status=LIST\t Which BibTask status should be considered (default is Running,waiting) -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default is all) -t, --tasks=LIST\t Comma separated list of BibTask to consider (default \t is all) Purge options: -s, --status=LIST\t Which BibTask status should be considered (default is DONE) -S, --since=TIME\t Since how long time to consider tasks e.g.: 30m, 2h, 1d (default is %s days) -t, --tasks=LIST\t Comma separated list of BibTask to consider (default \t is %s) """ % (sys.argv[0], CFG_BIBSCHED_GC_TASKS_OLDER_THAN, ','.join(CFG_BIBSCHED_GC_TASKS_TO_REMOVE + CFG_BIBSCHED_GC_TASKS_TO_ARCHIVE))) #sys.stderr.write(" -v, --verbose=LEVEL \t Verbose level (0=min, 1=default, 9=max).\n") sys.exit(exitcode) pidfile = os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched.pid') def error(msg): print >> sys.stderr, "error: " + msg sys.exit(1) def server_pid(): # The pid must be stored on the filesystem try: pid = int(open(pidfile).read()) except IOError: return None # Even if the pid is available, we check if it corresponds to an # actual process, as it might have been killed externally try: os.kill(pid, signal.SIGCONT) except OSError: return None return pid def start(verbose = True): """ Fork this process in the background and start processing requests. The process PID is stored in a pid file, so that it can be stopped later on.""" if verbose: sys.stdout.write("starting bibsched: ") sys.stdout.flush() pid = server_pid() if pid: error("another instance of bibsched (pid %d) is running" % pid) # start the child process using the "double fork" technique pid = os.fork() if pid > 0: sys.exit(0) os.setsid() os.chdir('/') pid = os.fork() if pid > 0: if verbose: sys.stdout.write('pid %d\n' % pid) Log("daemon started (pid %d)" % pid) open(pidfile, 'w').write('%d' % pid) return sys.stdin.close() redirect_stdout_and_stderr() sched = BibSched() sched.watch_loop() return def stop(verbose=True, soft=False): pid = server_pid() if not pid: if soft: print >> sys.stderr, 'bibsched seems not to be running.' return else: error('bibsched seems not to be running.') try: os.kill(pid, signal.SIGKILL) except OSError: print >> sys.stderr, 'no bibsched process found' Log("daemon stopped (pid %d)" % pid) if verbose: print "stopping bibsched: pid %d" % pid os.unlink(pidfile) return def monitor(verbose = True): old_stdout = redirect_stdout_and_stderr() manager = Manager(old_stdout) return def write_message(msg, stream=None, verbose=1): """Write message and flush output stream (may be sys.stdout or sys.stderr). Useful for debugging stuff.""" if stream is None: stream = sys.stdout if msg: if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) try: stream.write("%s\n" % msg) except UnicodeEncodeError: stream.write("%s\n" % msg.encode('ascii', 'backslashreplace')) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) def report_queue_status(verbose=True, status=None, since=None, tasks=None): """ Report about the current status of BibSched queue on standard output. """ def report_about_processes(status='RUNNING', since=None, tasks=None): """ Helper function to report about processes with the given status. """ if tasks is None: task_query = '' else: task_query = 'AND proc IN (%s)' % ( ','.join([repr(escape_string(task)) for task in tasks])) if since is None: since_query = '' else: # We're not interested in future task if since.startswith('+') or since.startswith('-'): since = since[1:] since = '-' + since since_query = "AND runtime >= '%s'" % get_datetime(since) res = run_sql("""SELECT id,proc,user,runtime,sleeptime,status,progress,priority FROM schTASK WHERE status=%%s %(task_query)s %(since_query)s ORDER BY id ASC""" % { 'task_query' : task_query, 'since_query' : since_query }, (status,)) write_message("%s processes: %d" % (status, len(res))) for (proc_id, proc_proc, proc_user, proc_runtime, proc_sleeptime, proc_status, proc_progress, proc_priority) in res: write_message(' * ID="%s" PRIORITY="%s" PROC="%s" USER="%s" RUNTIME="%s" SLEEPTIME="%s" STATUS="%s" PROGRESS="%s"' % \ (proc_id, proc_priority, proc_proc, proc_user, proc_runtime, proc_sleeptime, proc_status, proc_progress)) return write_message("BibSched queue status report for %s:" % gethostname()) if status is None: report_about_processes('Running', since, tasks) report_about_processes('Waiting', since, tasks) else: for state in status: report_about_processes(state, since, tasks) write_message("Done.") return def restart(verbose = True): stop(verbose, soft=True) start(verbose) return def main(): + from invenio.bibtask import check_running_as_apache_process_user + check_running_as_apache_process_user() + verbose = True status = None since = None tasks = None try: opts, args = getopt.gnu_getopt(sys.argv[1:], "hVdqS:s:t:", [ "help","version","daemon", "quiet", "since=", "status=", "task="]) except getopt.GetoptError, err: Log("Error: %s" % err) usage(1, err) for opt, arg in opts: if opt in ["-h", "--help"]: usage(0) elif opt in ["-V", "--version"]: print __revision__ sys.exit(0) elif opt in ["-d", "--daemon"]: redirect_stdout_and_stderr() sched = BibSched() Log("daemon started") sched.watch_loop() elif opt in ['-q', '--quiet']: verbose = False elif opt in ['-s', '--status']: status = arg.split(',') elif opt in ['-S', '--since']: since = arg elif opt in ['-t', '--task']: tasks = arg.split(',') else: usage(1) try: cmd = args [0] except IndexError: cmd = 'monitor' try: if cmd in ('status', 'purge'): { 'status' : report_queue_status, 'purge' : gc_tasks, } [cmd] (verbose, status, since, tasks) else: {'start': start, 'stop': stop, 'restart': restart, 'monitor': monitor} [cmd] (verbose) except KeyError: usage(1, 'unkown command: %s' % cmd) return if __name__ == '__main__': main() diff --git a/modules/bibsched/lib/bibtask.py b/modules/bibsched/lib/bibtask.py index d6f2c7c86..5523e6d7b 100644 --- a/modules/bibsched/lib/bibtask.py +++ b/modules/bibsched/lib/bibtask.py @@ -1,704 +1,746 @@ # -*- coding: utf-8 -*- ## ## $Id$ ## ## This file is part of CDS Invenio. ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008 CERN. ## ## CDS Invenio is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License as ## published by the Free Software Foundation; either version 2 of the ## License, or (at your option) any later version. ## ## CDS Invenio is distributed in the hope that it will be useful, but ## WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with CDS Invenio; if not, write to the Free Software Foundation, Inc., ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """CDS Invenio Bibliographic Task Class. BibTask class. A BibTask is an executable under CFG_BINDIR, whose name is stored in bibtask_config.CFG_BIBTASK_VALID_TASKS. A valid task must call the task_init function with the proper parameters. Generic task related parameters (user, sleeptime, runtime, task_id, task_name verbose) go to _task_params global dictionary accessible through task_get_task_param. Option specific to the particular BibTask go to _options global dictionary and are accessible via task_get_option/task_set_option. In order to log something properly, just use write_message(s) with the desired verbose level. task_update_status and task_update_progress can be used to update the status of the task (DONE, FAILED, DONE WITH ERRORS...) and it's progress (1 out 100..) within the bibsched monitor. It is possible to enqueue a BibTask via API call by means of task_low_level_submission. """ __revision__ = "$Id$" import getopt import getpass import marshal import os +import pwd import re import signal import sys import time +import popen2 import traceback from invenio.dbquery import run_sql, _db_login from invenio.access_control_engine import acc_authorize_action -from invenio.config import CFG_PREFIX, CFG_BINDIR +from invenio.config import CFG_PREFIX, CFG_BINDIR, CFG_BIBSCHED_PROCESS_USER from invenio.errorlib import register_exception from invenio.access_control_config import CFG_EXTERNAL_AUTH_USING_SSO, \ CFG_EXTERNAL_AUTHENTICATION from invenio.webuser import get_user_preferences, get_email from invenio.bibtask_config import CFG_BIBTASK_VALID_TASKS, \ CFG_BIBTASK_DEFAULT_TASK_SETTINGS # Which tasks don't need to ask the user for authorization? CFG_VALID_PROCESSES_NO_AUTH_NEEDED = ("bibupload", ) CFG_TASK_IS_NOT_A_DEAMON = ("bibupload", ) def fix_argv_paths(paths, argv=None): """Given the argv vector of cli parameters, and a list of path that can be relative and may have been specified within argv, it substitute all the occurencies of these paths in argv. argv is changed in place and returned. """ if argv is None: argv = sys.argv for path in paths: for count in xrange(len(argv)): if path == argv[count]: argv[count] = os.path.realpath(path) return argv def task_low_level_submission(name, user, *argv): """Let special lowlevel enqueuing of a task on the bibsche queue. @param name is the name of the bibtask. It must be a valid executable under CFG_BINDIR. @param user is a string that will appear as the "user" submitting the task. Since task are submitted via API it make sense to set the user to the name of the module/function that called task_low_level_submission. @param argv will be merged with the default setting of the given task as they can be found in bibtask_config. In order to know which variable are valid and which is the semantic, please have a glimpse at bibtask_config and to the source of the task_submit_elaborate_specific_parameter function of the desired bibtask. @return the task_id when the task is correctly enqueued. Use with care! Please use absolute paths in argv! """ def get_priority(argv): """Try to get the priority by analysing the arguments.""" priority = 0 try: stripped_argv = [arg for arg in argv if not arg.startswith('-') or arg.startswith('-P') or arg.startswith('--priority')] opts, args = getopt.gnu_getopt(stripped_argv, 'P:', ['priority=']) for opt in opts: if opt[0] in ('-P', '--priority'): priority = opt[1] except: pass return priority task_id = None try: if not name in CFG_BIBTASK_VALID_TASKS: raise StandardError('%s is not a valid task name' % name) priority = get_priority(argv) argv = tuple([os.path.join(CFG_BINDIR, name)] + list(argv)) ## submit task: task_id = run_sql("""INSERT INTO schTASK (proc,user, runtime,sleeptime,status,progress,arguments,priority) VALUES (%s,%s,NOW(),'','WAITING','',%s,%s)""", (name, user, marshal.dumps(argv), priority)) except Exception: register_exception() if task_id: run_sql("""DELETE FROM schTASK WHERE id=%s""", (task_id, )) raise return task_id def task_init( authorization_action="", authorization_msg="", description="", help_specific_usage="", version=__revision__, specific_params=("", []), task_stop_helper_fnc=None, task_submit_elaborate_specific_parameter_fnc=None, task_submit_check_options_fnc=None, task_run_fnc=None): """ Initialize a BibTask. @param authorization_action is the name of the authorization action connected with this task; @param authorization_msg is the header printed when asking for an authorization password; @param description is the generic description printed in the usage page; @param help_specific_usage is the specific parameter help @param task_stop_fnc is a function that will be called whenever the task is stopped @param task_submit_elaborate_specific_parameter_fnc will be called passing a key and a value, for parsing specific cli parameters. Must return True if it has recognized the parameter. Must eventually update the options with bibtask_set_option; @param task_submit_check_options must check the validity of options (via bibtask_get_option) once all the options where parsed; @param task_run_fnc will be called as the main core function. Must return False in case of errors. """ global _task_params, _options _task_params = { "version" : version, "task_stop_helper_fnc" : task_stop_helper_fnc, "task_name" : os.path.basename(sys.argv[0]), "task_specific_name" : '', "user" : '', "verbose" : 1, "sleeptime" : '', "runtime" : time.strftime("%Y-%m-%d %H:%M:%S"), "priority" : 0, } to_be_submitted = True if len(sys.argv) == 2 and sys.argv[1].isdigit(): _task_params['task_id'] = int(sys.argv[1]) argv = _task_get_options(_task_params['task_id'], _task_params['task_name']) to_be_submitted = False else: argv = sys.argv if type(argv) is dict: # FIXME: REMOVE AFTER MAJOR RELEASE 1.0 # This is needed for old task submitted before CLI parameters # where stored in DB and _options dictionary was stored instead. _options = argv else: _task_build_params(_task_params['task_name'], argv, description, help_specific_usage, version, specific_params, task_submit_elaborate_specific_parameter_fnc, task_submit_check_options_fnc) write_message('argv=%s' % (argv, ), verbose=9) write_message('_options=%s' % (_options, ), verbose=9) write_message('_task_params=%s' % (_task_params, ), verbose=9) if to_be_submitted: _task_submit(argv, authorization_action, authorization_msg) else: try: if not _task_run(task_run_fnc): write_message("Error occurred. Exiting.", sys.stderr) except Exception, e: write_message("Unexpected error occurred: %s." % e, sys.stderr) write_message("Traceback is:", sys.stderr) traceback.print_tb(sys.exc_info()[2]) write_message("Exiting.", sys.stderr) task_update_status("ERROR") def _task_build_params( task_name, argv, description="", help_specific_usage="", version=__revision__, specific_params=("", []), task_submit_elaborate_specific_parameter_fnc=None, task_submit_check_options_fnc=None): """ Build the BibTask params. @param argv a list of string as in sys.argv @param description is the generic description printed in the usage page; @param help_specific_usage is the specific parameter help @param task_submit_elaborate_specific_parameter_fnc will be called passing a key and a value, for parsing specific cli parameters. Must return True if it has recognized the parameter. Must eventually update the options with bibtask_set_option; @param task_submit_check_options must check the validity of options (via bibtask_get_option) once all the options where parsed; """ global _task_params, _options _options = {} if task_name in CFG_BIBTASK_DEFAULT_TASK_SETTINGS: _options.update(CFG_BIBTASK_DEFAULT_TASK_SETTINGS[task_name]) # set user-defined options: try: (short_params, long_params) = specific_params opts, args = getopt.gnu_getopt(argv[1:], "hVv:u:s:t:P:N:" + short_params, [ "help", "version", "verbose=", "user=", "sleep=", "time=", "priority=" "task_specific_name=" ] + long_params) except getopt.GetoptError, err: _usage(1, err, help_specific_usage=help_specific_usage, description=description) try: for opt in opts: if opt[0] in ["-h", "--help"]: _usage(0, help_specific_usage=help_specific_usage, description=description) elif opt[0] in ["-V", "--version"]: print _task_params["version"] sys.exit(0) elif opt[0] in [ "-u", "--user"]: _task_params["user"] = opt[1] elif opt[0] in ["-v", "--verbose"]: _task_params["verbose"] = int(opt[1]) elif opt[0] in [ "-s", "--sleeptime" ]: if task_name not in CFG_TASK_IS_NOT_A_DEAMON: get_datetime(opt[1]) # see if it is a valid shift _task_params["sleeptime"] = opt[1] elif opt[0] in [ "-t", "--runtime" ]: _task_params["runtime"] = get_datetime(opt[1]) elif opt[0] in ("-P", "--priority"): _task_params["priority"] = int(opt[1]) elif opt[0] in ("-N", "--task_name"): _task_params["task_specific_name"] = opt[1] elif not callable(task_submit_elaborate_specific_parameter_fnc) or \ not task_submit_elaborate_specific_parameter_fnc(opt[0], opt[1], opts, args): _usage(1, help_specific_usage=help_specific_usage, description=description) except StandardError, e: _usage(e, help_specific_usage=help_specific_usage, description=description) if callable(task_submit_check_options_fnc): if not task_submit_check_options_fnc(): _usage(1, help_specific_usage=help_specific_usage, description=description) def task_set_option(key, value): """Set an value to key in the option dictionary of the task""" global _options try: _options[key] = value except NameError: _options = {key : value} def task_get_option(key, default=None): """Returns the value corresponding to key in the option dictionary of the task""" try: return _options.get(key, default) except NameError: return default def task_has_option(key): """Map the has_key query to _options""" try: return _options.has_key(key) except NameError: return False def task_get_task_param(key, default=None): """Returns the value corresponding to the particular task param""" try: return _task_params.get(key, default) except NameError: return default def task_set_task_param(key, value): """Set the value corresponding to the particular task param""" global _task_params try: _task_params[key] = value except NameError: _task_params = {key : value} def task_update_progress(msg): """Updates progress information in the BibSched task table.""" write_message("Updating task progress to %s." % msg, verbose=9) return run_sql("UPDATE schTASK SET progress=%s where id=%s", (msg, _task_params["task_id"])) def task_update_status(val): """Updates status information in the BibSched task table.""" write_message("Updating task status to %s." % val, verbose=9) return run_sql("UPDATE schTASK SET status=%s where id=%s", (val, _task_params["task_id"])) def task_read_status(): """Read status information in the BibSched task table.""" res = run_sql("SELECT status FROM schTASK where id=%s", (_task_params['task_id'],), 1) try: out = res[0][0] except: out = 'UNKNOWN' return out def write_messages(msgs, stream=sys.stdout, verbose=1): """Write many messages through write_message""" for msg in msgs.split('\n'): write_message(msg, stream, verbose) def write_message(msg, stream=sys.stdout, verbose=1): """Write message and flush output stream (may be sys.stdout or sys.stderr). Useful for debugging stuff.""" if msg and _task_params['verbose'] >= verbose: if stream == sys.stdout or stream == sys.stderr: stream.write(time.strftime("%Y-%m-%d %H:%M:%S --> ", time.localtime())) try: stream.write("%s\n" % msg) except UnicodeEncodeError: stream.write("%s\n" % msg.encode('ascii', 'backslashreplace')) stream.flush() else: sys.stderr.write("Unknown stream %s. [must be sys.stdout or sys.stderr]\n" % stream) _shift_re = re.compile("([-\+]{0,1})([\d]+)([dhms])") def get_datetime(var, format_string="%Y-%m-%d %H:%M:%S"): """Returns a date string according to the format string. It can handle normal date strings and shifts with respect to now.""" date = time.time() factors = {"d":24*3600, "h":3600, "m":60, "s":1} m = _shift_re.match(var) if m: sign = m.groups()[0] == "-" and -1 or 1 factor = factors[m.groups()[2]] value = float(m.groups()[1]) date = time.localtime(date + sign * factor * value) date = time.strftime(format_string, date) else: date = time.strptime(var, format_string) date = time.strftime(format_string, date) return date def task_sleep_now_if_required(can_stop_too=False): """This function should be called during safe state of BibTask, e.g. after flushing caches or outside of run_sql calls. """ write_message('Entering task_sleep_now_if_required with signal_request=%s' % _task_params['signal_request'], verbose=9) if _task_params['signal_request'] == 'sleep': _task_params['signal_request'] = None write_message("sleeping...") task_update_status("SLEEPING") signal.pause() # wait for wake-up signal elif _task_params['signal_request'] == 'ctrlz': _task_params['signal_request'] = None signal.signal(signal.SIGTSTP, signal.SIG_DFL) write_message("sleeping...") task_update_status("SLEEPING") os.kill(os.getpid(), signal.SIGTSTP) time.sleep(1) elif _task_params['signal_request'] == 'ctrlc' and can_stop_too: _task_params['signal_request'] = None signal.signal(signal.SIGINT, signal.SIG_DFL) write_message("stopped") task_update_status("STOPPED") os.kill(os.getpid(), signal.SIGINT) time.sleep(1) elif _task_params['signal_request'] == 'stop' and can_stop_too: _task_params['signal_request'] = None write_message("stopped") task_update_status("STOPPED") sys.exit(0) def authenticate(user, authorization_action, authorization_msg=""): """Authenticate the user against the user database. Check for its password, if it exists. Check for authorization_action access rights. Return user name upon authorization success, do system exit upon authorization failure. """ # With SSO it's impossible to check for pwd if CFG_EXTERNAL_AUTH_USING_SSO or os.path.basename(sys.argv[0]) in CFG_VALID_PROCESSES_NO_AUTH_NEEDED: return user if authorization_msg: print authorization_msg print "=" * len(authorization_msg) if user == "": print >> sys.stdout, "\rUsername: ", try: user = sys.stdin.readline().lower().strip() except EOFError: sys.stderr.write("\n") sys.exit(1) except KeyboardInterrupt: sys.stderr.write("\n") sys.exit(1) else: print >> sys.stdout, "\rUsername:", user ## first check user: # p_un passed may be an email or a nickname: res = run_sql("select id from user where email=%s", (user,), 1) + \ run_sql("select id from user where nickname=%s", (user,), 1) if not res: print "Sorry, %s does not exist." % user sys.exit(1) else: uid = res[0][0] ok = False login_method = get_user_preferences(uid)['login_method'] if not CFG_EXTERNAL_AUTHENTICATION[login_method][0]: #Local authentication, let's see if we want passwords. res = run_sql("select id from user where id=%s " "and password=AES_ENCRYPT(email,'')", (uid,), 1) if res: ok = True if not ok: try: password_entered = getpass.getpass() except EOFError: sys.stderr.write("\n") sys.exit(1) except KeyboardInterrupt: sys.stderr.write("\n") sys.exit(1) if not CFG_EXTERNAL_AUTHENTICATION[login_method][0]: res = run_sql("select id from user where id=%s " "and password=AES_ENCRYPT(email, %s)", (uid, password_entered), 1) if res: ok = True else: if CFG_EXTERNAL_AUTHENTICATION[login_method][0].auth_user(get_email(uid), password_entered): ok = True if not ok: print "Sorry, wrong credentials for %s." % user sys.exit(1) else: ## secondly check authorization for the authorization_action: (auth_code, auth_message) = acc_authorize_action(uid, authorization_action) if auth_code != 0: print auth_message sys.exit(1) return user def _task_submit(argv, authorization_action, authorization_msg): """Submits task to the BibSched task queue. This is what people will be invoking via command line.""" ## sanity check: remove eventual "task" option: ## authenticate user: _task_params['user'] = authenticate(_task_params["user"], authorization_action, authorization_msg) ## submit task: if _task_params['task_specific_name']: task_name = '%s:%s' % (_task_params['task_name'], _task_params['task_specific_name']) else: task_name = _task_params['task_name'] write_message("storing task options %s\n" % argv, verbose=9) _task_params['task_id'] = run_sql("""INSERT INTO schTASK (proc,user, runtime,sleeptime,status,progress,arguments,priority) VALUES (%s,%s,%s,%s,'WAITING','',%s, %s)""", (task_name, _task_params['user'], _task_params["runtime"], _task_params["sleeptime"], marshal.dumps(argv), _task_params['priority'])) ## update task number: write_message("Task #%d submitted." % _task_params['task_id']) return _task_params['task_id'] def _task_get_options(task_id, task_name): """Returns options for the task 'id' read from the BibSched task queue table.""" out = {} res = run_sql("SELECT arguments FROM schTASK WHERE id=%s AND proc LIKE %s", (task_id, task_name+'%')) try: out = marshal.loads(res[0][0]) except: write_message("Error: %s task %d does not seem to exist." \ % (task_name, task_id), sys.stderr) task_update_status('ERROR') sys.exit(1) write_message('Options retrieved: %s' % (out, ), verbose=9) return out def _task_run(task_run_fnc): """Runs the task by fetching arguments from the BibSched task queue. This is what BibSched will be invoking via daemon call. The task prints Fibonacci numbers for up to NUM on the stdout, and some messages on stderr. @param task_run_fnc will be called as the main core function. Must return False in case of errors. Return True in case of success and False in case of failure.""" ## We prepare the pid file inside /prefix/var/run/taskname_id.pid + check_running_as_apache_process_user() try: pidfile_name = os.path.join(CFG_PREFIX, 'var', 'run', 'bibsched_task_%d.pid' % _task_params['task_id']) pidfile = open(pidfile_name, 'w') pidfile.write(str(os.getpid())) pidfile.close() except OSError: register_exception(alert_admin=True) task_update_status("ERROR") return False ## check task status: task_status = task_read_status() if task_status not in ("WAITING", "SCHEDULED"): write_message("Error: The task #%d is %s. I expected WAITING or SCHEDULED." % (_task_params['task_id'], task_status), sys.stderr) return False ## initialize signal handler: _task_params['signal_request'] = None signal.signal(signal.SIGUSR1, _task_sig_sleep) signal.signal(signal.SIGUSR2, signal.SIG_IGN) signal.signal(signal.SIGTSTP, _task_sig_ctrlz) signal.signal(signal.SIGTERM, _task_sig_stop) signal.signal(signal.SIGQUIT, _task_sig_stop) signal.signal(signal.SIGABRT, _task_sig_suicide) signal.signal(signal.SIGCONT, _task_sig_wakeup) signal.signal(signal.SIGINT, _task_sig_ctrlc) ## we can run the task now: write_message("Task #%d started." % _task_params['task_id']) task_update_status("RUNNING") ## run the task: _task_params['task_starting_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) sleeptime = _task_params['sleeptime'] try: try: if callable(task_run_fnc) and task_run_fnc(): task_update_status("DONE") else: task_update_status("DONE WITH ERRORS") except SystemExit: pass except: register_exception(alert_admin=True) task_update_status("ERROR") finally: task_status = task_read_status() if sleeptime: new_runtime = get_datetime(sleeptime) ## The task is a daemon. We resubmit it if task_status == 'DONE': ## It has finished in a good way. We recycle the database row run_sql("UPDATE schTASK SET runtime=%s, status='WAITING', progress='' WHERE id=%s", (new_runtime, _task_params['task_id'])) write_message("Task #%d finished and resubmitted." % _task_params['task_id']) else: ## We keep the bad result and we resubmit with another id. #res = run_sql('SELECT proc,user,sleeptime,arguments,priority FROM schTASK WHERE id=%s', (_task_params['task_id'], )) #proc, user, sleeptime, arguments, priority = res[0] #run_sql("""INSERT INTO schTASK (proc,user, #runtime,sleeptime,status,arguments,priority) #VALUES (%s,%s,%s,%s,'WAITING',%s, %s)""", #(proc, user, new_runtime, sleeptime, arguments, priority)) write_message("Task #%d finished but not resubmitted. [%s]" % (_task_params['task_id'], task_status)) else: ## we are done: write_message("Task #%d finished. [%s]" % (_task_params['task_id'], task_status)) ## Removing the pid os.remove(pidfile_name) try: # Let's signal bibsched that we have finished. from invenio.bibsched import pidfile os.kill(int(open(pidfile).read()), signal.SIGUSR2) except: pass return True def _usage(exitcode=1, msg="", help_specific_usage="", description=""): """Prints usage info.""" if msg: sys.stderr.write("Error: %s.\n" % msg) sys.stderr.write("Usage: %s [options]\n" % sys.argv[0]) if help_specific_usage: sys.stderr.write("Command options:\n") sys.stderr.write(help_specific_usage) sys.stderr.write("Scheduling options:\n") sys.stderr.write(" -u, --user=USER\tUser name to submit the" " task as, password needed.\n") sys.stderr.write(" -t, --runtime=TIME\tTime to execute the" " task (now), e.g.: +15s, 5m, 3h, 2002-10-27 13:57:26\n") sys.stderr.write(" -s, --sleeptime=SLEEP\tSleeping frequency after" " which to repeat task (no), e.g.: 30m, 2h, 1d\n") sys.stderr.write(" -P, --priority=PRIORITY\tPriority level (an integer, 0 is default)\n") sys.stderr.write(" -N, --task_specific_name=TASK_SPECIFIC_NAME\tAdvanced option\n") sys.stderr.write("General options:\n") sys.stderr.write(" -h, --help\t\tPrint this help.\n") sys.stderr.write(" -V, --version\t\tPrint version information.\n") sys.stderr.write(" -v, --verbose=LEVEL\tVerbose level (0=min," " 1=default, 9=max).\n") if description: sys.stderr.write(description) sys.exit(exitcode) def _task_sig_sleep(sig, frame): """Signal handler for the 'sleep' signal sent by BibSched.""" write_message("task_sig_sleep(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("sleeping as soon as possible...") _task_params['signal_request'] = 'sleep' _db_login(1) task_update_status("ABOUT TO SLEEP") def _task_sig_ctrlz(sig, frame): """Signal handler for the 'ctrlz' signal sent by BibSched.""" write_message("task_sig_ctrlz(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("sleeping as soon as possible...") _task_params['signal_request'] = 'ctrlz' _db_login(1) task_update_status("ABOUT TO STOP") def _task_sig_wakeup(sig, frame): """Signal handler for the 'wakeup' signal sent by BibSched.""" signal.signal(signal.SIGTSTP, _task_sig_ctrlz) write_message("task_sig_wakeup(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("continuing...") _task_params['signal_request'] = None _db_login(1) task_update_status("CONTINUING") def _task_sig_ctrlc(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" write_message("task_sig_ctrlc(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("stopping as soon as possible...") _db_login(1) # To avoid concurrency with an interrupted run_sql call task_update_status("STOPPING") _task_params['signal_request'] = 'ctrlc' def _task_sig_stop(sig, frame): """Signal handler for the 'stop' signal sent by BibSched.""" write_message("task_sig_stop(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("stopping as soon as possible...") _db_login(1) # To avoid concurrency with an interrupted run_sql call task_update_status("STOPPING") _task_params['signal_request'] = 'stop' def _task_sig_suicide(sig, frame): """Signal handler for the 'suicide' signal sent by BibSched.""" write_message("task_sig_suicide(), got signal %s frame %s" % (sig, frame), verbose=9) write_message("suiciding myself now...") task_update_status("SUICIDING") write_message("suicided") _db_login(1) task_update_status("SUICIDED") sys.exit(0) def _task_sig_unknown(sig, frame): """Signal handler for the other unknown signals sent by shell or user.""" # do nothing for unknown signals: write_message("unknown signal %d (frame %s) ignored" % (sig, frame)) +_RE_PSLINE = re.compile('^\s*(.+?)\s+(.+?)\s*$') +def guess_apache_process_user_from_ps(): + """Parse the ps call looking for the Apache process user.""" + apache_users = [] + try: + # Tested on Linux, Sun and MacOS X + for line in os.popen('ps -A -o user,comm').readlines(): + g = _RE_PSLINE.match(line) + if g: + username = g.group(2) + process = os.path.basename(g.group(1)) + if process in ('apache', 'apache2', 'httpd') : + if username not in apache_users and username != 'root': + apache_users.append(username) + except Exception, e: + print >> sys.stderr, "WARNING: %s" % e + return tuple(apache_users) + +def guess_apache_process_user(): + """Return the possible name of the user running the Apache server.""" + if CFG_BIBSCHED_PROCESS_USER: + apache_users = (CFG_BIBSCHED_PROCESS_USER, ) + else: + apache_users = guess_apache_process_user_from_ps() + ('apache2', 'apache', 'www-data') + for username in apache_users: + try: + userline = pwd.getpwnam(username) + return userline[2] + except KeyError: + pass + print >> sys.stderr, "ERROR: It's impossible to discover the name of the user running the Apache webserver process. Please set the correct value in CFG_BIBSCHED_PROCESS_USER" + sys.exit(1) + +def check_running_as_apache_process_user(): + """Check that the user running this program is the same of that running the + Apache webserver.""" + if os.getuid() != guess_apache_process_user(): + print >> sys.stderr,"ERROR: You must run \"%s\" with credentials compatible with the user running the Apache webserver process!" % os.path.basename(sys.argv[0]) + sys.exit(1)