diff --git a/invenio/legacy/search_engine/__init__.py b/invenio/legacy/search_engine/__init__.py index bbe63d2df..ecb79ae8e 100644 --- a/invenio/legacy/search_engine/__init__.py +++ b/invenio/legacy/search_engine/__init__.py @@ -1,512 +1,408 @@ # -*- coding: utf-8 -*- # # This file is part of Invenio. # Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, # 2013, 2014, 2015 CERN. # # Invenio is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # Invenio is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Invenio; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. # pylint: disable=C0301,W0703 """Invenio Search Engine in mod_python.""" import warnings from invenio_utils.deprecation import RemovedInInvenio23Warning warnings.warn("Legacy search_engine will be removed in 2.3. Please check " "'invenio_search' module.", RemovedInInvenio23Warning) __lastupdated__ = """$Date$""" __revision__ = "$Id$" # import general modules: import cgi import re import urllib import urlparse import zlib +from flask_login import current_user + +from invenio.config import CFG_SITE_NAME, CFG_SITE_LANG, CFG_SITE_URL, \ + CFG_WEBSEARCH_DEF_RECORDS_IN_GROUPS + # import Invenio stuff: -from invenio.config import \ - CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS, \ - CFG_BASE_URL, \ - CFG_BIBFORMAT_HIDDEN_TAGS, \ - CFG_BIBINDEX_CHARS_PUNCTUATION, \ - CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS, \ - CFG_BIBSORT_BUCKETS, \ - CFG_BIBSORT_ENABLED, \ - CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG, \ - CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE, \ - CFG_CERN_SITE, \ - CFG_INSPIRE_SITE, \ - CFG_LOGDIR, \ - CFG_OAI_ID_FIELD, \ - CFG_WEBSEARCH_FIELDS_CONVERT, \ - CFG_WEBSEARCH_DEF_RECORDS_IN_GROUPS, \ - CFG_WEBSEARCH_FULLTEXT_SNIPPETS, \ - CFG_WEBSEARCH_DISPLAY_NEAREST_TERMS, \ - CFG_WEBSEARCH_WILDCARD_LIMIT, \ - CFG_WEBSEARCH_IDXPAIRS_FIELDS,\ - CFG_WEBSEARCH_IDXPAIRS_EXACT_SEARCH, \ - CFG_BIBUPLOAD_SERIALIZE_RECORD_STRUCTURE, \ - CFG_BIBUPLOAD_EXTERNAL_SYSNO_TAG, \ - CFG_BIBRANK_SHOW_DOWNLOAD_GRAPHS, \ - CFG_WEBSEARCH_SYNONYM_KBRS, \ - CFG_SITE_LANG, \ - CFG_SITE_NAME, \ - CFG_LOGDIR, \ - CFG_SITE_URL, \ - CFG_WEBSEARCH_DETAILED_META_FORMAT, \ - CFG_SITE_RECORD, \ - CFG_WEBSEARCH_PREV_NEXT_HIT_LIMIT, \ - CFG_WEBSEARCH_VIEWRESTRCOLL_POLICY, \ - CFG_WEBSEARCH_WILDCARD_LIMIT - -from invenio_search.utils import get_most_popular_field_values -from invenio_search.errors import \ - InvenioWebSearchUnknownCollectionError, \ - InvenioWebSearchWildcardLimitError, \ - InvenioWebSearchReferstoLimitError, \ - InvenioWebSearchCitedbyLimitError -from invenio.legacy.bibrecord import (get_fieldvalues, - get_fieldvalues_alephseq_like) +from invenio.legacy.bibrecord import get_fieldvalues from .utils import record_exists -from invenio.legacy.bibrecord import create_record, record_xml_output -from invenio.legacy.miscutil.data_cacher import DataCacher -from invenio_access.control import acc_get_action_id -from invenio_access.local_config import VIEWRESTRCOLL, \ - CFG_ACC_GRANT_AUTHOR_RIGHTS_TO_EMAILS_IN_TAGS, \ - CFG_ACC_GRANT_AUTHOR_RIGHTS_TO_USERIDS_IN_TAGS, \ - CFG_ACC_GRANT_VIEWER_RIGHTS_TO_EMAILS_IN_TAGS, \ - CFG_ACC_GRANT_VIEWER_RIGHTS_TO_USERIDS_IN_TAGS - -from intbitset import intbitset -from invenio.legacy.dbquery import InvenioDbQueryWildcardLimitError -from invenio_utils.serializers import deserialize_via_marshal -from invenio_access.engine import acc_authorize_action -from invenio.ext.logging import register_exception -from invenio_utils.text import encode_for_xml, wash_for_utf8, strip_accents -from invenio.legacy import bibrecord - -from invenio.legacy.dbquery import run_sql, run_sql_with_limit, \ - wash_table_column_name, get_table_update_time -from invenio.legacy.webuser import getUid, collect_user_info -from invenio_base.i18n import gettext_set_language - -from invenio_utils import apache - -from sqlalchemy.exc import DatabaseError - -VIEWRESTRCOLL_ID = acc_get_action_id(VIEWRESTRCOLL) + +from invenio.legacy.dbquery import run_sql + +from invenio.base.i18n import gettext_set_language +from invenio.modules.collections.models import Collection # em possible values EM_REPOSITORY={"body" : "B", "header" : "H", "footer" : "F", "search_box" : "S", "see_also_box" : "L", "basket" : "K", "alert" : "A", "search_info" : "I", "overview" : "O", "all_portalboxes" : "P", "te_portalbox" : "Pte", "tp_portalbox" : "Ptp", "np_portalbox" : "Pnp", "ne_portalbox" : "Pne", "lt_portalbox" : "Plt", "rt_portalbox" : "Prt", "search_services": "SER"}; -from invenio_collections.cache import collection_restricted_p -from invenio_collections.cache import restricted_collection_cache -from invenio_search.utils import get_permitted_restricted_collections - - -from invenio_records.access import check_user_can_view_record - -from invenio_collections.cache import get_coll_i18nname -from invenio_search.cache import get_field_i18nname def create_navtrail_links(cc=CFG_SITE_NAME, aas=0, ln=CFG_SITE_LANG, self_p=1, tab=''): """Creates navigation trail links, i.e. links to collection ancestors (except Home collection). If aas==1, then links to Advanced Search interfaces; otherwise Simple Search. """ return '' -from invenio_search.washers import * + +from invenio_search.washers import wash_pattern, wash_output_format, \ + wash_field, wash_dates def get_coll_ancestors(coll): "Returns a list of ancestors for collection 'coll'." coll_ancestors = [] coll_ancestor = coll while 1: res = run_sql("""SELECT c.name FROM collection AS c LEFT JOIN collection_collection AS cc ON c.id=cc.id_dad LEFT JOIN collection AS ccc ON ccc.id=cc.id_son WHERE ccc.name=%s ORDER BY cc.id_dad ASC LIMIT 1""", (coll_ancestor,)) if res: coll_name = res[0][0] coll_ancestors.append(coll_name) coll_ancestor = coll_name else: break # ancestors found, return reversed list: coll_ancestors.reverse() return coll_ancestors -from invenio_collections.cache import get_collection_allchildren - - def search_pattern(req=None, p=None, f=None, m=None, ap=0, of="id", verbose=0, ln=CFG_SITE_LANG, display_nearest_terms_box=True, wl=0): """Search for complex pattern 'p' within field 'f' according to matching type 'm'. Return hitset of recIDs. The function uses multi-stage searching algorithm in case of no exact match found. See the Search Internals document for detailed description. The 'ap' argument governs whether an alternative patterns are to be used in case there is no direct hit for (p,f,m). For example, whether to replace non-alphanumeric characters by spaces if it would give some hits. See the Search Internals document for detailed description. (ap=0 forbits the alternative pattern usage, ap=1 permits it.) 'ap' is also internally used for allowing hidden tag search (for requests coming from webcoll, for example). In this case ap=-9 The 'of' argument governs whether to print or not some information to the user in case of no match found. (Usually it prints the information in case of HTML formats, otherwise it's silent). The 'verbose' argument controls the level of debugging information to be printed (0=least, 9=most). All the parameters are assumed to have been previously washed. This function is suitable as a mid-level API. """ from invenio_search.api import Query results = Query(p).search() import warnings warnings.warn( 'Deprecated search_pattern(p={0}, f={1}, m={2}) = {3}.'.format( p, f, m, results), stacklevel=2 ) return results def guess_primary_collection_of_a_record(recID): """Return primary collection name a record recid belongs to, by testing 980 identifier. May lead to bad guesses when a collection is defined dynamically via dbquery. In that case, return 'CFG_SITE_NAME'.""" out = CFG_SITE_NAME dbcollids = get_fieldvalues(recID, "980__a") for dbcollid in dbcollids: variants = ("collection:" + dbcollid, 'collection:"' + dbcollid + '"', "980__a:" + dbcollid, '980__a:"' + dbcollid + '"', '980:' + dbcollid , '980:"' + dbcollid + '"') res = run_sql("SELECT name FROM collection WHERE dbquery IN (%s,%s,%s,%s,%s,%s)", variants) if res: out = res[0][0] break - if CFG_CERN_SITE: - recID = int(recID) - # dirty hack for ATLAS collections at CERN: - if out in ('ATLAS Communications', 'ATLAS Internal Notes'): - for alternative_collection in ('ATLAS Communications Physics', - 'ATLAS Communications General', - 'ATLAS Internal Notes Physics', - 'ATLAS Internal Notes General',): - if recID in get_collection_reclist(alternative_collection): - return alternative_collection - - # dirty hack for FP - FP_collections = {'DO': ['Current Price Enquiries', 'Archived Price Enquiries'], - 'IT': ['Current Invitation for Tenders', 'Archived Invitation for Tenders'], - 'MS': ['Current Market Surveys', 'Archived Market Surveys']} - fp_coll_ids = [coll for coll in dbcollids if coll in FP_collections] - for coll in fp_coll_ids: - for coll_name in FP_collections[coll]: - if recID in get_collection_reclist(coll_name): - return coll_name return out _re_collection_url = re.compile('/collection/(.+)') def guess_collection_of_a_record(recID, referer=None, recreate_cache_if_needed=True): """Return collection name a record recid belongs to, by first testing the referer URL if provided and otherwise returning the primary collection.""" if referer: dummy, hostname, path, dummy, query, dummy = urlparse.urlparse(referer) # requests can come from different invenio installations, with # different collections if CFG_SITE_URL.find(hostname) < 0: return guess_primary_collection_of_a_record(recID) g = _re_collection_url.match(path) if g: name = urllib.unquote_plus(g.group(1)) # check if this collection actually exist (also normalize the name # if case-insensitive) name = Collection.query.filter_by(name=name).value('name') - if name and recID in get_collection_reclist(name): - return name elif path.startswith('/search'): - if recreate_cache_if_needed: - collection_reclist_cache.recreate_cache_if_needed() query = cgi.parse_qs(query) for name in query.get('cc', []) + query.get('c', []): name = Collection.query.filter_by(name=name).value('name') - if name and recID in get_collection_reclist(name, recreate_cache_if_needed=False): - return name return guess_primary_collection_of_a_record(recID) def get_all_collections_of_a_record(recID, recreate_cache_if_needed=True): """Return all the collection names a record belongs to. Note this function is O(n_collections).""" ret = [] - if recreate_cache_if_needed: - collection_reclist_cache.recreate_cache_if_needed() - for name in collection_reclist_cache.cache.keys(): - if recID in get_collection_reclist(name, recreate_cache_if_needed=False): - ret.append(name) return ret - def slice_records(recIDs, jrec, rg): if not jrec: jrec = 1 if rg: recIDs = recIDs[jrec-1:jrec-1+rg] else: recIDs = recIDs[jrec-1:] return recIDs def get_interval_for_records_to_sort(nb_found, jrec=None, rg=None): """calculates in which interval should the sorted records be a value of 'rg=-9999' means to print all records: to be used with care.""" if not jrec: jrec = 1 if not rg: #return all return jrec-1, nb_found if rg == -9999: # print all records rg = nb_found else: rg = abs(rg) if jrec < 1: # sanity checks jrec = 1 if jrec > nb_found: jrec = max(nb_found-rg+1, 1) # will sort records from irec_min to irec_max excluded irec_min = jrec - 1 irec_max = irec_min + rg if irec_min < 0: irec_min = 0 if irec_max > nb_found: irec_max = nb_found return irec_min, irec_max def get_record(recid): """Directly the record object corresponding to the recid.""" import warnings warnings.warn('Deprecated get_record({}).'.format(str(recid)), stacklevel=2) from invenio_records import api try: return api.get_record(recid).legacy_create_recstruct() except AttributeError: return api.Record.create({'recid': recid}, 'json').legacy_create_recstruct() def print_record(recID, format='hb', ot='', ln=CFG_SITE_LANG, decompress=zlib.decompress, search_pattern=None, user_info=None, verbose=0, sf='', so='d', sp='', rm='', brief_links=True): """ Print record 'recID' formatted according to 'format'. 'sf' is sort field and 'rm' is ranking method that are passed here only for proper linking purposes: e.g. when a certain ranking method or a certain sort field was selected, keep it selected in any dynamic search links that may be printed. """ from invenio_formatter import format_record return format_record( recID, of=format, ln=ln, verbose=verbose, search_pattern=search_pattern ) if record_exists(recID) != 0 else "" def create_add_to_search_pattern(p, p1, f1, m1, op1): """Create the search pattern """ if not p1: return p init_search_pattern = p # operation: AND, OR, AND NOT if op1 == 'a' and p: # we don't want '+' at the begining of the query op = ' +' elif op1 == 'o': op = ' |' elif op1 == 'n': op = ' -' else: op = ' ' if p else '' # field field = '' if f1: field = f1 + ':' # type of search pattern = p1 start = '(' end = ')' if m1 == 'e': start = end = '"' elif m1 == 'p': start = end = "'" elif m1 == 'r': start = end = '/' else: # m1 == 'o' or m1 =='a' words = p1.strip().split(' ') if len(words) == 1: start = end = '' pattern = field + words[0] elif m1 == 'o': pattern = ' |'.join([field + word for word in words]) else: pattern = ' '.join([field + word for word in words]) #avoid having field:(word1 word2) since this is not currently correctly working return init_search_pattern + op + start + pattern + end if not pattern: return '' #avoid having field:(word1 word2) since this is not currently correctly working return init_search_pattern + op + field + start + pattern + end ### CALLABLES def perform_request_search(req=None, cc=CFG_SITE_NAME, c=None, p="", f="", rg=None, sf="", so="a", sp="", rm="", of="id", ot="", aas=0, p1="", f1="", m1="", op1="", p2="", f2="", m2="", op2="", p3="", f3="", m3="", sc=0, jrec=0, recid=-1, recidb=-1, sysno="", id=-1, idb=-1, sysnb="", action="", d1="", d1y=0, d1m=0, d1d=0, d2="", d2y=0, d2m=0, d2d=0, dt="", verbose=0, ap=0, ln=CFG_SITE_LANG, ec=None, tab="", wl=0, em=""): kwargs = prs_wash_arguments(req=req, cc=cc, c=c, p=p, f=f, rg=rg, sf=sf, so=so, sp=sp, rm=rm, of=of, ot=ot, aas=aas, p1=p1, f1=f1, m1=m1, op1=op1, p2=p2, f2=f2, m2=m2, op2=op2, p3=p3, f3=f3, m3=m3, sc=sc, jrec=jrec, recid=recid, recidb=recidb, sysno=sysno, id=id, idb=idb, sysnb=sysnb, action=action, d1=d1, d1y=d1y, d1m=d1m, d1d=d1d, d2=d2, d2y=d2y, d2m=d2m, d2d=d2d, dt=dt, verbose=verbose, ap=ap, ln=ln, ec=ec, tab=tab, wl=wl, em=em) import warnings warnings.warn('Deprecated perform_request_search({}).'.format(str(kwargs)), stacklevel=2) from invenio_search.api import Query p = create_add_to_search_pattern(p, p1, f1, m1, "") p = create_add_to_search_pattern(p, p2, f2, m2, op1) p = create_add_to_search_pattern(p, p3, f3, m3, op2) return Query(p).search(collection=cc) def prs_wash_arguments(req=None, cc=CFG_SITE_NAME, c=None, p="", f="", rg=CFG_WEBSEARCH_DEF_RECORDS_IN_GROUPS, sf="", so="d", sp="", rm="", of="id", ot="", aas=0, p1="", f1="", m1="", op1="", p2="", f2="", m2="", op2="", p3="", f3="", m3="", sc=0, jrec=0, recid=-1, recidb=-1, sysno="", id=-1, idb=-1, sysnb="", action="", d1="", d1y=0, d1m=0, d1d=0, d2="", d2y=0, d2m=0, d2d=0, dt="", verbose=0, ap=0, ln=CFG_SITE_LANG, ec=None, tab="", uid=None, wl=0, em="", **dummy): """ Sets the (default) values and checks others for the PRS call """ # wash output format: of = wash_output_format(of) # wash all arguments requiring special care p = wash_pattern(p) f = wash_field(f) p1 = wash_pattern(p1) f1 = wash_field(f1) p2 = wash_pattern(p2) f2 = wash_field(f2) p3 = wash_pattern(p3) f3 = wash_field(f3) (d1y, d1m, d1d, d2y, d2m, d2d) = map(int, (d1y, d1m, d1d, d2y, d2m, d2d)) datetext1, datetext2 = wash_dates(d1, d1y, d1m, d1d, d2, d2y, d2m, d2d) - # wash ranking method: - if not is_method_valid(None, rm): - rm = "" if id > 0 and recid == -1: recid = id if idb > 0 and recidb == -1: recidb = idb # deduce collection we are in (if applicable): if recid > 0: referer = None if req: referer = req.headers_in.get('Referer') cc = guess_collection_of_a_record(recid, referer) # deduce user id (if applicable): if uid is None: try: - uid = getUid(req) + uid = current_user.get_id() except: uid = 0 _ = gettext_set_language(ln) kwargs = {'req': req, 'cc': cc, 'c': c, 'p': p, 'f': f, 'rg': rg, 'sf': sf, 'so': so, 'sp': sp, 'rm': rm, 'of': of, 'ot': ot, 'aas': aas, 'p1': p1, 'f1': f1, 'm1': m1, 'op1': op1, 'p2': p2, 'f2': f2, 'm2': m2, 'op2': op2, 'p3': p3, 'f3': f3, 'm3': m3, 'sc': sc, 'jrec': jrec, 'recid': recid, 'recidb': recidb, 'sysno': sysno, 'id': id, 'idb': idb, 'sysnb': sysnb, 'action': action, 'd1': d1, 'd1y': d1y, 'd1m': d1m, 'd1d': d1d, 'd2': d2, 'd2y': d2y, 'd2m': d2m, 'd2d': d2d, 'dt': dt, 'verbose': verbose, 'ap': ap, 'ln': ln, 'ec': ec, 'tab': tab, 'wl': wl, 'em': em, 'datetext1': datetext1, 'datetext2': datetext2, 'uid': uid, '_': _, 'selected_external_collections_infos': None, } kwargs.update(**dummy) return kwargs diff --git a/invenio/legacy/webpage.py b/invenio/legacy/webpage.py index a40803d21..b926ae2c9 100644 --- a/invenio/legacy/webpage.py +++ b/invenio/legacy/webpage.py @@ -1,270 +1,298 @@ # This file is part of Invenio. # Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, # 2011, 2012, 2015 CERN. # # Invenio is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # Invenio is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Invenio; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. """Invenio Web Page Functions""" __revision__ = "$Id$" +from flask_login import current_user + from invenio.config import \ CFG_WEBSTYLE_CDSPAGEBOXLEFTBOTTOM, \ CFG_WEBSTYLE_CDSPAGEBOXLEFTTOP, \ CFG_WEBSTYLE_CDSPAGEBOXRIGHTBOTTOM, \ CFG_WEBSTYLE_CDSPAGEBOXRIGHTTOP, \ CFG_SITE_LANG, \ CFG_SITE_URL, \ CFG_SITE_NAME_INTL, \ - CFG_SITE_NAME + CFG_SITE_NAME, CFG_SITE_SECURE_URL from invenio_base.i18n import gettext_set_language -from invenio.legacy.webuser import \ - create_userinfobox_body, \ - getUid +from invenio.legacy.dbquery import run_sql import invenio.legacy.template webstyle_templates = invenio.legacy.template.load('webstyle') from xml.dom.minidom import getDOMImplementation + +def get_nickname_or_email(uid): + """Return nickname (preferred) or the email address of the user uid. + Return string 'guest' in case the user is not found.""" + out = "guest" + res = run_sql("SELECT nickname, email FROM user WHERE id=%s", (uid,), 1) + if res and res[0]: + if res[0][0]: + out = res[0][0] + elif res[0][1]: + out = res[0][1].lower() + return out + + +def create_userinfobox_body(req, uid, language="en"): + """Create user info box body for user UID in language LANGUAGE.""" + + if req: + if req.is_https(): + url_referer = CFG_SITE_SECURE_URL + req.unparsed_uri + else: + url_referer = CFG_SITE_URL + req.unparsed_uri + if '/youraccount/logout' in url_referer: + url_referer = '' + else: + url_referer = CFG_SITE_URL + + return "" + + def create_navtrailbox_body(title, previous_links, prolog="", separator=""" > """, epilog="", language=CFG_SITE_LANG): """Create navigation trail box body input: title = page title; previous_links = the trail content from site title until current page (both ends exclusive). output: text containing the navtrail """ return webstyle_templates.tmpl_navtrailbox_body(ln = language, title = title, previous_links = \ previous_links, separator = separator, prolog = prolog, epilog = epilog) def page(title, body, navtrail="", description="", keywords="", metaheaderadd="", uid=None, cdspageheaderadd="", cdspageboxlefttopadd="", cdspageboxleftbottomadd="", cdspageboxrighttopadd="", cdspageboxrightbottomadd="", cdspagefooteradd="", lastupdated="", language=CFG_SITE_LANG, verbose=1, titleprologue="", titleepilogue="", secure_page_p=0, req=None, errors=None, warnings=None, navmenuid="admin", navtrail_append_title_p=1, of="", rssurl=CFG_SITE_URL+"/rss", show_title_p=True, body_css_classes=None, show_header=True, show_footer=True): """page(): display CDS web page input: title of the page body of the page in html format description goes to the metadata in the header of the HTML page keywords goes to the metadata in the header of the html page metaheaderadd goes to further metadata in the header of the html page cdspageheaderadd is a message to be displayed just under the page header cdspageboxlefttopadd is a message to be displayed in the page body on left top cdspageboxleftbottomadd is a message to be displayed in the page body on left bottom cdspageboxrighttopadd is a message to be displayed in the page body on right top cdspageboxrightbottomadd is a message to be displayed in the page body on right bottom cdspagefooteradd is a message to be displayed on the top of the page footer lastupdated is a text containing the info on last update (optional) language is the language version of the page verbose is verbosity of the page (useful for debugging) titleprologue is to be printed right before page title titleepilogue is to be printed right after page title req is the mod_python request object log is the string of data that should be appended to the log file (errors automatically logged) secure_page_p is 0 or 1 and tells whether we are to use HTTPS friendly page elements or not navmenuid the section of the website this page belongs (search, submit, baskets, etc.) navtrail_append_title_p is 0 or 1 and tells whether page title is appended to navtrail of is an output format (use xx for xml output (e.g. AJAX)) rssfeed is the url of the RSS feed for this page show_title_p is 0 or 1 and tells whether page title should be displayed in body of the page show_header is 0 or 1 and tells whether page header should be displayed or not show_footer is 0 or 1 and tells whether page footer should be displayed or not output: the final cds page with header, footer, etc. """ - _ = gettext_set_language(language) if req is not None: if uid is None: - uid = getUid(req) + uid = current_user.get_id() secure_page_p = req.is_https() and 1 or 0 if uid is None: ## 0 means generic guest user. uid = 0 if of == 'xx': #xml output (e.g. AJAX calls) => of=xx req.content_type = 'text/xml' impl = getDOMImplementation() output = impl.createDocument(None, "invenio-message", None) root = output.documentElement body_node = output.createElement('body') body_text = output.createCDATASection(unicode(body, 'utf_8')) body_node.appendChild(body_text) root.appendChild(body_node) return output.toprettyxml(encoding="utf-8" ) else: return webstyle_templates.tmpl_page(req, ln=language, description = description, keywords = keywords, metaheaderadd = metaheaderadd, userinfobox = create_userinfobox_body(req, uid, language), navtrailbox = create_navtrailbox_body(navtrail_append_title_p \ and title or '', navtrail, language=language), uid = uid, secure_page_p = secure_page_p, pageheaderadd = cdspageheaderadd, boxlefttop = CFG_WEBSTYLE_CDSPAGEBOXLEFTTOP, boxlefttopadd = cdspageboxlefttopadd, boxleftbottomadd = cdspageboxleftbottomadd, boxleftbottom = CFG_WEBSTYLE_CDSPAGEBOXLEFTBOTTOM, boxrighttop = CFG_WEBSTYLE_CDSPAGEBOXRIGHTTOP, boxrighttopadd = cdspageboxrighttopadd, boxrightbottomadd = cdspageboxrightbottomadd, boxrightbottom = CFG_WEBSTYLE_CDSPAGEBOXRIGHTBOTTOM, titleprologue = titleprologue, title = title, titleepilogue = titleepilogue, body = body, lastupdated = lastupdated, pagefooteradd = cdspagefooteradd, navmenuid = navmenuid, rssurl = rssurl, show_title_p = show_title_p, body_css_classes=body_css_classes, show_header=show_header, show_footer=show_footer) def pageheaderonly(title, navtrail="", description="", keywords="", uid=0, cdspageheaderadd="", language=CFG_SITE_LANG, req=None, secure_page_p=0, verbose=1, navmenuid="admin", navtrail_append_title_p=1, metaheaderadd="", rssurl=CFG_SITE_URL+"/rss", body_css_classes=None): """Return just the beginning of page(), with full headers. Suitable for the search results page and any long-taking scripts.""" if req is not None: if uid is None: - uid = getUid(uid) + uid = current_user.get_id(uid) secure_page_p = req.is_https() and 1 or 0 return webstyle_templates.tmpl_pageheader(req, ln = language, headertitle = title, description = description, keywords = keywords, metaheaderadd = metaheaderadd, userinfobox = create_userinfobox_body(req, uid, language), navtrailbox = create_navtrailbox_body(navtrail_append_title_p \ and title or '', navtrail, language=language), uid = uid, secure_page_p = secure_page_p, pageheaderadd = cdspageheaderadd, navmenuid = navmenuid, rssurl = rssurl, body_css_classes=body_css_classes) def pagefooteronly(cdspagefooteradd="", lastupdated="", language=CFG_SITE_LANG, req=None, verbose=1): """Return just the ending of page(), with full footer. Suitable for the search results page and any long-taking scripts.""" return webstyle_templates.tmpl_pagefooter(req, ln=language, lastupdated = lastupdated, pagefooteradd = cdspagefooteradd) def create_error_box(req, title=None, verbose=1, ln=CFG_SITE_LANG, errors=None): """Analyse the req object and the sys traceback and return a text message box with internal information that would be suitful to display when something bad has happened. """ - _ = gettext_set_language(ln) return webstyle_templates.tmpl_error_box(title = title, ln = ln, verbose = verbose, req = req, errors = errors) def adderrorbox(header='', datalist=[]): """used to create table around main data on a page, row based""" try: perc = str(100 // len(datalist)) + '%' except ZeroDivisionError: perc = 1 output = '' output += '' % (len(datalist), header) output += '' for row in [datalist]: output += '' for data in row: output += '' output += '' output += '
%s
' % (perc, ) output += data output += '
' return output def error_page(title, req, ln=CFG_SITE_LANG): # load the right message language _ = gettext_set_language(ln) site_name = CFG_SITE_NAME_INTL.get(ln, CFG_SITE_NAME) return page(title = _("Error"), body = create_error_box(req, title=str(title), verbose=0, ln=ln), description="%s - Internal Error" % site_name, keywords="%s, Internal Error" % site_name, - uid = getUid(req), + uid = current_user.get_id(req), language=ln, req=req) def warning_page(title, req, ln=CFG_SITE_LANG): # load the right message language _ = gettext_set_language(ln) site_name = CFG_SITE_NAME_INTL.get(ln, CFG_SITE_NAME) return page(title = _("Warning"), body = title, description="%s - Internal Error" % site_name, keywords="%s, Internal Error" % site_name, - uid = getUid(req), + uid = current_user.get_id(req), language=ln, req=req) def write_warning(msg, type='', prologue='
', epilogue='
', req=None): """Prints warning message and flushes output.""" if msg: ret = webstyle_templates.tmpl_write_warning( msg = msg, type = type, prologue = prologue, epilogue = epilogue, ) if req is None: return ret else: req.write(ret) else: return '' diff --git a/invenio/legacy/webuser.py b/invenio/legacy/webuser.py deleted file mode 100644 index 22f90b9f5..000000000 --- a/invenio/legacy/webuser.py +++ /dev/null @@ -1,770 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of Invenio. -# Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, -# 2013, 2014, 2015 CERN. -# -# Invenio is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; either version 2 of the -# License, or (at your option) any later version. -# -# Invenio is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Invenio; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. - -""" -This file implements all methods necessary for working with users and -sessions in Invenio. Contains methods for logging/registration -when a user log/register into the system, checking if it is a guest -user or not. - -At the same time this presents all the stuff it could need with -sessions managements, working with websession. - -It also contains Apache-related user authentication stuff. -""" - -__revision__ = "$Id$" - -import cgi -import urllib -import urlparse -import socket -import smtplib -import re -import random -import datetime - -from flask import Request, current_app -from six import iteritems -from socket import gaierror -import os -import binascii -import time - -from invenio_base.wrappers import lazy_import -from invenio.config import \ - CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS, \ - CFG_ACCESS_CONTROL_LEVEL_GUESTS, \ - CFG_ACCESS_CONTROL_LEVEL_SITE, \ - CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN, \ - CFG_ACCESS_CONTROL_NOTIFY_ADMIN_ABOUT_NEW_ACCOUNTS, \ - CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT, \ - CFG_SITE_ADMIN_EMAIL, \ - CFG_SITE_LANG, \ - CFG_SITE_NAME, \ - CFG_SITE_NAME_INTL, \ - CFG_SITE_SUPPORT_EMAIL, \ - CFG_SITE_SECURE_URL, \ - CFG_SITE_URL, \ - CFG_WEBSESSION_ADDRESS_ACTIVATION_EXPIRE_IN_DAYS, \ - CFG_CERN_SITE, \ - CFG_INSPIRE_SITE, \ - CFG_BIBAUTHORID_ENABLED, \ - CFG_SITE_RECORD - -try: - from flask import session -except ImportError: - pass -from invenio.legacy.dbquery import run_sql -from invenio_utils.serializers import serialize_via_marshal - - -from invenio_base.i18n import gettext_set_language, wash_languages, wash_language -from invenio.ext.email import send_email -from invenio.ext.logging import register_exception -from invenio.ext.sqlalchemy import db -from invenio_accounts.models import User - -from sqlalchemy.exc import OperationalError - - -acc_get_role_id = lazy_import('invenio_access.control:acc_get_role_id') -acc_get_action_roles = lazy_import('invenio_access.control:acc_get_action_roles') -acc_get_action_id = lazy_import('invenio_access.control:acc_get_action_id') -acc_is_user_in_role = lazy_import('invenio_access.control:acc_is_user_in_role') -acc_find_possible_activities = lazy_import('invenio_access.control:acc_find_possible_activities') -mail_cookie_create_mail_activation = lazy_import('invenio_access.mailcookie:mail_cookie_create_mail_activation') -acc_firerole_check_user = lazy_import('invenio_access.firerole:acc_firerole_check_user') -load_role_definition = lazy_import('invenio_access.firerole:load_role_definition') -from invenio_access.local_config import SUPERADMINROLE -CFG_EXTERNAL_AUTH_USING_SSO = lazy_import('invenio_access.local_config:CFG_EXTERNAL_AUTH_USING_SSO') -CFG_EXTERNAL_AUTHENTICATION = lazy_import('invenio_access.local_config:CFG_EXTERNAL_AUTHENTICATION') -CFG_WEBACCESS_MSGS = lazy_import('invenio_access.local_config:CFG_WEBACCESS_MSGS') -CFG_WEBACCESS_WARNING_MSGS = lazy_import('invenio_access.local_config:CFG_WEBACCESS_WARNING_MSGS') -CFG_EXTERNAL_AUTH_DEFAULT = lazy_import('invenio_access.local_config:CFG_EXTERNAL_AUTH_DEFAULT') -CFG_TEMP_EMAIL_ADDRESS = lazy_import('invenio_access.local_config:CFG_TEMP_EMAIL_ADDRESS') - - -re_invalid_nickname = re.compile(""".*[,'@]+.*""") - -# pylint: disable=C0301 - -def createGuestUser(): - """Create a guest user , insert into user null values in all fields - - createGuestUser() -> GuestUserID - """ - if CFG_ACCESS_CONTROL_LEVEL_GUESTS == 0: - try: - return run_sql("""insert into "user" (email, note) values ('', '1')""") - except OperationalError: - return None - else: - try: - return run_sql("""insert into "user" (email, note) values ('', '0')""") - except OperationalError: - return None - -def page_not_authorized(req, referer='', uid='', text='', navtrail='', ln=CFG_SITE_LANG, - navmenuid=""): - """Show error message when user is not authorized to do something. - - @param referer: in case the displayed message propose a login link, this - is the url to return to after logging in. If not specified it is guessed - from req. - - @param uid: the uid of the user. If not specified it is guessed from req. - - @param text: the message to be displayed. If not specified it will be - guessed from the context. - """ - - from invenio.legacy.webpage import page - - _ = gettext_set_language(ln) - - if not referer: - referer = req.unparsed_uri - - if not CFG_ACCESS_CONTROL_LEVEL_SITE: - title = CFG_WEBACCESS_MSGS[5] - if not uid: - uid = getUid(req) - try: - res = run_sql("""SELECT email FROM "user" WHERE id=%s AND note=1""", (uid,)) - - if res and res[0][0]: - if text: - body = text - else: - body = "%s %s" % (CFG_WEBACCESS_WARNING_MSGS[9] % cgi.escape(res[0][0]), - ("%s %s" % (CFG_WEBACCESS_MSGS[0] % urllib.quote(referer), CFG_WEBACCESS_MSGS[1]))) - else: - if text: - body = text - else: - if CFG_ACCESS_CONTROL_LEVEL_GUESTS == 1: - body = CFG_WEBACCESS_MSGS[3] - else: - body = CFG_WEBACCESS_WARNING_MSGS[4] + CFG_WEBACCESS_MSGS[2] - - except OperationalError as e: - body = _("Database problem") + ': ' + str(e) - - - elif CFG_ACCESS_CONTROL_LEVEL_SITE == 1: - title = CFG_WEBACCESS_MSGS[8] - body = "%s %s" % (CFG_WEBACCESS_MSGS[7], CFG_WEBACCESS_MSGS[2]) - - elif CFG_ACCESS_CONTROL_LEVEL_SITE == 2: - title = CFG_WEBACCESS_MSGS[6] - body = "%s %s" % (CFG_WEBACCESS_MSGS[4], CFG_WEBACCESS_MSGS[2]) - - return page(title=title, - language=ln, - uid=getUid(req), - body=body, - navtrail=navtrail, - req=req, - navmenuid=navmenuid) - -def getUid(req): - """Return user ID taking it from the cookie of the request. - Includes control mechanism for the guest users, inserting in - the database table when need be, raising the cookie back to the - client. - - User ID is set to 0 when client refuses cookie or we are in the - read-only site operation mode. - - User ID is set to -1 when we are in the permission denied site - operation mode. - - getUid(req) -> userId - """ - #if hasattr(req, '_user_info'): - # return req._user_info['_uid'] - if CFG_ACCESS_CONTROL_LEVEL_SITE == 1: return 0 - if CFG_ACCESS_CONTROL_LEVEL_SITE == 2: return -1 - - guest = 0 - from flask import session - - uid = session.uid - if not session.need_https: - if uid == -1: # first time, so create a guest user - if CFG_ACCESS_CONTROL_LEVEL_GUESTS == 0: - session['uid'] = 0 - session.set_remember_me(False) - return 0 - else: - return -1 - else: - if not hasattr(req, '_user_info') and 'user_info' in session: - req._user_info = session['user_info'] - req._user_info = collect_user_info(req, refresh=True) - - if guest == 0: - guest = isGuestUser(uid) - - if guest: - if CFG_ACCESS_CONTROL_LEVEL_GUESTS == 0: - return uid - elif CFG_ACCESS_CONTROL_LEVEL_GUESTS >= 1: - return -1 - else: - res = run_sql("""SELECT note FROM "user" WHERE id=%s""", (uid,)) - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS == 0: - return uid - elif CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS >= 1 and res and res[0][0] in [1, "1"]: - return uid - else: - return -1 - - -from invenio.ext.login import current_user, login_user, logout_user -getUid = lambda req: current_user.get_id() - - -def setUid(req, uid, remember_me=False): - """It sets the userId into the session, and raise the cookie to the client. - """ - if uid > 0: - login_user(uid, remember_me) - else: - logout_user() - return uid - -def session_param_del(req, key): - """ - Remove a given key from the session. - """ - del session[key] - -def session_param_set(req, key, value): - """ - Set a VALUE for the session param KEY for the current session. - """ - session[key] = value - -def session_param_get(req, key, default = None): - """ - Return session parameter value associated with session parameter KEY for the current session. - If the key doesn't exists return the provided default. - """ - return session.get(key, default) - -def session_param_list(req): - """ - List all available session parameters. - """ - return session.keys() - -def get_last_login(uid): - """Return the last_login datetime for uid if any, otherwise return the Epoch.""" - res = run_sql('SELECT last_login FROM "user" WHERE id=%s', (uid,), 1) - if res and res[0][0]: - return res[0][0] - else: - return datetime.datetime(1970, 1, 1) - -def get_user_info(uid, ln=CFG_SITE_LANG): - """Get infos for a given user. - @param uid: user id (int) - @return: tuple: (uid, nickname, display_name) - """ - _ = gettext_set_language(ln) - query = """SELECT id, nickname - FROM "user" - WHERE id=%s""" - res = run_sql(query, (uid,)) - if res: - if res[0]: - user = list(res[0]) - if user[1]: - user.append(user[1]) - else: - user[1] = str(user[0]) - user.append(_("user") + ' #' + str(user[0])) - return tuple(user) - return (uid, '', _("N/A")) - -def get_uid_from_email(email): - """Return the uid corresponding to an email. - Return -1 when the email does not exists.""" - try: - res = run_sql("""SELECT id FROM "user" WHERE email=%s""", (email,)) - if res: - return res[0][0] - else: - return -1 - except OperationalError: - register_exception() - return -1 - -def isGuestUser(uid, run_on_slave=True): - """It Checks if the userId corresponds to a guestUser or not - - isGuestUser(uid) -> boolean - """ - out = 1 - try: - res = run_sql("""SELECT email FROM "user" WHERE id=%s LIMIT 1""", (uid,), 1, - run_on_slave=run_on_slave) - if res: - if res[0][0]: - out = 0 - except OperationalError: - register_exception() - return out - -def isUserSubmitter(user_info): - """Return True if the user is a submitter for something; False otherwise.""" - u_email = get_email(user_info['uid']) - res = run_sql("""SELECT email FROM "sbmSUBMISSIONS" WHERE email=%s LIMIT 1""", (u_email,), 1) - return len(res) > 0 - -def isUserReferee(user_info): - """Return True if the user is a referee for something; False otherwise.""" - if CFG_CERN_SITE: - return True - else: - for (role_id, role_name, role_description) in acc_get_action_roles(acc_get_action_id('referee')): - if acc_is_user_in_role(user_info, role_id): - return True - return False - -def isUserAdmin(user_info): - """Return True if the user has some admin rights; False otherwise.""" - user = User.query.get(user_info['uid']) - return user and user.has_admin_role - -def isUserSuperAdmin(user_info): - """Return True if the user is superadmin; False otherwise.""" - user = User.query.get(user_info['uid']) - if user and user.has_super_admin_role: - return True - return acc_firerole_check_user( - user_info, load_role_definition(acc_get_role_id(SUPERADMINROLE))) - -def nickname_valid_p(nickname): - """Check whether wanted NICKNAME supplied by the user is valid. - At the moment we just check whether it is not empty, does not - contain blanks or @, is not equal to `guest', etc. - - This check relies on re_invalid_nickname regexp (see above) - Return 1 if nickname is okay, return 0 if it is not. - """ - if nickname and \ - not(nickname.startswith(' ') or nickname.endswith(' ')) and \ - nickname.lower() != 'guest': - if not re_invalid_nickname.match(nickname): - return 1 - return 0 - -def email_valid_p(email): - """Check whether wanted EMAIL address supplied by the user is valid. - At the moment we just check whether it contains '@' and whether - it doesn't contain blanks. We also check the email domain if - CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN is set. - - Return 1 if email is okay, return 0 if it is not. - """ - if (email.find("@") <= 0) or (email.find(" ") > 0): - return 0 - elif CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN: - if not email.endswith(CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN): - return 0 - return 1 - -def confirm_email(email): - """Confirm the email. It returns None when there are problems, otherwise - it return the uid involved.""" - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS == 0: - activated = 1 - elif CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS == 1: - activated = 0 - elif CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS >= 2: - return -1 - run_sql('UPDATE "user" SET note=%s where email=%s', (activated, email)) - res = run_sql('SELECT id FROM "user" where email=%s', (email,)) - if res: - if CFG_ACCESS_CONTROL_NOTIFY_ADMIN_ABOUT_NEW_ACCOUNTS: - send_new_admin_account_warning(email, CFG_SITE_ADMIN_EMAIL) - return res[0][0] - else: - return None - - -def registerUser(req, email, passw, nickname, register_without_nickname=False, - login_method=None, ln=CFG_SITE_LANG): - """Register user with the desired values of NICKNAME, EMAIL and - PASSW. - - If REGISTER_WITHOUT_NICKNAME is set to True, then ignore - desired NICKNAME and do not set any. This is suitable for - external authentications so that people can login without - having to register an internal account first. - - Return 0 if the registration is successful, 1 if email is not - valid, 2 if nickname is not valid, 3 if email is already in the - database, 4 if nickname is already in the database, 5 when - users cannot register themselves because of the site policy, 6 when the - site is having problem contacting the user. - - If login_method is None or is equal to the key corresponding to local - authentication, then CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS is taken - in account for deciding the behaviour about registering. - """ - - # is email valid? - email = email.lower() - if not email_valid_p(email): - return 1 - - _ = gettext_set_language(ln) - - # is email already taken? - res = run_sql("""SELECT email FROM "user" WHERE email=%s""", (email,)) - if len(res) > 0: - return 3 - - if register_without_nickname: - # ignore desired nick and use default empty string one: - nickname = "" - else: - # is nickname valid? - if not nickname_valid_p(nickname): - return 2 - # is nickname already taken? - res = run_sql("""SELECT nickname FROM "user" WHERE nickname=%s""", (nickname,)) - if len(res) > 0: - return 4 - - activated = 1 # By default activated - - if not login_method or not CFG_EXTERNAL_AUTHENTICATION[login_method]: # local login - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS >= 2: - return 5 - elif CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT: - activated = 2 # Email confirmation required - elif CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS >= 1: - activated = 0 # Administrator confirmation required - - - # okay, go on and register the user: FIXME - user = User(nickname=nickname, - email=email, - password=passw, - note=activated, - last_login=datetime.datetime.now()) - - if CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT: - user.verify_email() - - try: - db.session.add(user) - db.session.commit() - except Exception: - current_app.logger.exception("Could not store user.") - db.session.rollback() - return 7 - if activated == 1: # Ok we consider the user as logged in :-) - setUid(req, uid) - return 0 - -def updateDataUser(uid, email, nickname): - """ - Update user data. Used when a user changed his email or password - or nickname. - """ - email = email.lower() - if email == 'guest': - return 0 - - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS < 2: - run_sql("""update "user" set email=%s where id=%s""", (email, uid)) - - if nickname and nickname != '': - run_sql("""update "user" set nickname=%s where id=%s""", (nickname, uid)) - - return 1 - -def updatePasswordUser(uid, password): - """Update the password of a user.""" - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS < 3: - u = User.query.filter_by(id=uid).first() - if u: - u.password = password - db.session.commit() - return 1 - - -def username_exists_p(username): - """Check if USERNAME exists in the system. Username may be either - nickname or email. - - Return 1 if it does exist, 0 if it does not. - """ - - if username == "": - # return not exists if asked for guest users - return 0 - res = run_sql("""SELECT email FROM "user" WHERE email=%s OR nicname=%s""", - (username, username)) - if len(res) > 0: - return 1 - return 0 - -def emailUnique(p_email): - """Check if the email address only exists once. If yes, return userid, if not, -1 - """ - - query_result = run_sql("""select id, email from "user" where email=%s""", (p_email,)) - if len(query_result) == 1: - return query_result[0][0] - elif len(query_result) == 0: - return 0 - return -1 - - -def update_Uid(req, p_email, remember_me=False): - """It updates the userId of the session. It is used when a guest user is logged in succesfully in the system with a given email and password. - As a side effect it will discover all the restricted collection to which the user has right to - """ - query_ID = int(run_sql("""select id from "user" where email=%s""", - (p_email,))[0][0]) - - setUid(req, query_ID, remember_me) - return query_ID - -def send_new_admin_account_warning(new_account_email, send_to, ln=CFG_SITE_LANG): - """Send an email to the address given by send_to about the new account new_account_email.""" - _ = gettext_set_language(ln) - sub = _("New account on") + " '%s'" % CFG_SITE_NAME - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS == 1: - sub += " - " + _("PLEASE ACTIVATE") - body = _("A new account has been created on") + " '%s'" % CFG_SITE_NAME - if CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS == 1: - body += _(" and is awaiting activation") - body += ":\n\n" - body += _(" Username/Email") + ": %s\n\n" % new_account_email - body += _("You can approve or reject this account request at") + ": %s/admin/webaccess/webaccessadmin.py/manageaccounts\n" % CFG_SITE_URL - return send_email(CFG_SITE_SUPPORT_EMAIL, send_to, subject=sub, content=body) - -def get_email(uid): - """Return email address of the user uid. Return string 'guest' in case - the user is not found.""" - out = "guest" - res = run_sql("""SELECT email FROM "user" WHERE id=%s""", (uid,), 1) - if res and res[0][0]: - out = res[0][0].lower() - return out - -def get_email_from_username(username): - """Return email address of the user corresponding to USERNAME. - The username may be either nickname or email. Return USERNAME - untouched if not found in the database or if found several - matching entries. - """ - if username == '': - return '' - out = username - res = run_sql("""SELECT email FROM "user" WHERE email=%s""", (username,), 1) + \ - run_sql("""SELECT email FROM "user" WHERE nickname=%s""", (username,), 1) - if res and len(res) == 1: - out = res[0][0].lower() - return out - -def get_nickname(uid): - """Return nickname of the user uid. Return None in case - the user is not found.""" - out = None - res = run_sql("""SELECT nickname FROM "user" WHERE id=%s""", (uid,), 1) - if res and res[0][0]: - out = res[0][0] - return out - -def get_nickname_or_email(uid): - """Return nickname (preferred) or the email address of the user uid. - Return string 'guest' in case the user is not found.""" - out = "guest" - res = run_sql("""SELECT nickname, email FROM "user" WHERE id=%s""", (uid,), 1) - if res and res[0]: - if res[0][0]: - out = res[0][0] - elif res[0][1]: - out = res[0][1].lower() - return out - - -def create_userinfobox_body(req, uid, language="en"): - """Create user info box body for user UID in language LANGUAGE.""" - # TODO return nothing. webuser'll be removed! - return "" - - -def list_registered_users(): - """List all registered users.""" - return run_sql("""SELECT id,email FROM "user" where email!=''""") - -def list_users_in_role(role): - """List all users of a given role (see table accROLE) - @param role: role of user (string) - @return: list of uids - """ - res = run_sql("""SELECT uacc.id_user - FROM user_accROLE uacc JOIN accROLE acc - ON uacc.id_accROLE=acc.id - WHERE acc.name=%s""", - (role,), run_on_slave=True) - if res: - return map(lambda x: int(x[0]), res) - return [] - -def list_users_in_roles(role_list): - """List all users of given roles (see table accROLE) - @param role_list: list of roles [string] - @return: list of uids - """ - if not(type(role_list) is list or type(role_list) is tuple): - role_list = [role_list] - query = """SELECT DISTINCT(uacc.id_user) - FROM user_accROLE uacc JOIN accROLE acc - ON uacc.id_accROLE=acc.id - """ - query_addons = "" - query_params = () - if len(role_list) > 0: - query_params = role_list - query_addons = " WHERE " - for role in role_list[:-1]: - query_addons += "acc.name=%s OR " - query_addons += "acc.name=%s" - res = run_sql(query + query_addons, query_params, run_on_slave=True) - if res: - return map(lambda x: int(x[0]), res) - return [] - -def get_user_preferences(uid): - user = User.query.get(uid) - if user is not None: - return user.settings - from invenio_accounts.models import get_default_user_preferences - return get_default_user_preferences() # empty dict mean no preferences - -def set_user_preferences(uid, pref): - assert isinstance(pref, dict) - run_sql("""UPDATE "user" SET settings=%s WHERE id=%s""", - (serialize_via_marshal(pref), uid)) - - -def collect_user_info(req, login_time=False, refresh=False): - """Given the mod_python request object rec or a uid it returns a dictionary - containing at least the keys uid, nickname, email, groups, plus any external keys in - the user preferences (collected at login time and built by the different - external authentication plugins) and if the mod_python request object is - provided, also the remote_ip, remote_host, referer, agent fields. - NOTE: if req is a mod_python request object, the user_info dictionary - is saved into req._user_info (for caching purpouses) - setApacheUser & setUid will properly reset it. - """ - from flask_login import current_user - from invenio.ext.login import UserInfo - - if (type(req) in [long, int] and current_user.get_id() != req) \ - or req is None: - return UserInfo(req) - - return current_user._get_current_object() - - -def generate_csrf_token(req): - """Generate a new CSRF token and store it in the user session. - - Generate random CSRF token for the current user and store it in - the current session. Also, store the time stamp when it was - generated. - - Return tuple (csrf_token, csrf_token_time). - - """ - csrf_token = binascii.hexlify(os.urandom(32)) - csrf_token_time = time.time() - session_param_set(req, 'csrf_token', csrf_token) - session_param_set(req, 'csrf_token_time', csrf_token_time) - return (csrf_token, csrf_token_time) - - -def regenerate_csrf_token_if_needed(req, token_expiry=300): - """Regenerate CSRF token, if necessary, and store it in session. - - Check whether user session has stored CSRF token, and whether it - is still not expired, i.e. whether not more than `token_expiry` - seconds elapsed since current session's CSRF token was created. - If not, then create new one. - - Return tuple (csrf_token, csrf_token_time). - """ - - csrf_token = session_param_get(req, 'csrf_token') - csrf_token_time = session_param_get(req, 'csrf_token_time') - - if not csrf_token or not csrf_token_time: - csrf_token, csrf_token_time = generate_csrf_token(req) - - if csrf_token_time + token_expiry < time.time(): - csrf_token, csrf_token_time = generate_csrf_token(req) - - return (csrf_token, csrf_token_time) - - -def is_csrf_token_valid(req, token_value, token_expiry=300): - """Check whether CSRF token is still valid. - - Take CSRF token value from current session and check whether it is - equal to the passed `token_value`. Also, check whether it has not - expired yet, i.e. whether not more than `token_expiry` seconds - elapsed since current session's CSRF token was created. - - Return True if everything is OK, False otherwise. - """ - - # retrieve CSRF token from session: - csrf_token = session_param_get(req, 'csrf_token') - if not csrf_token: - return False - - # retrieve CSRF token's timestamp from session: - csrf_token_time = session_param_get(req, 'csrf_token_time') - if not csrf_token_time: - return False - - # is session's CSRF token not yet expired? - if csrf_token_time + token_expiry < time.time(): - return False - - # is session's CSRF token equal to given value? - if not token_value or token_value != csrf_token: - return False - - # OK, every test passed, we are good: - return True