Page MenuHomec4science

cern_ldap.py
No OneTemporary

File Metadata

Created
Mon, Jun 17, 18:57

cern_ldap.py

## This file is part of Invenio.
## Copyright (C) 2009, 2010, 2011 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""Invenio LDAP interface for BibCirculation at CERN. """
from time import sleep
from thread import get_ident
from invenio.config import CFG_CERN_SITE
## python-ldap is an optional dependency.  The LDAP lookups are flagged as
## available only when the module can be imported, in which case the flag
## takes the value of CFG_CERN_SITE.
try:
    import ldap
    import ldap.filter
    ## Credentials import, currently disabled:
    #from invenio.external_authentication_cern_wrapper import _cern_nice_soap_auth
except (ImportError, IOError):
    CFG_BIBCIRCULATION_HAS_LDAP = False
else:
    CFG_BIBCIRCULATION_HAS_LDAP = CFG_CERN_SITE
# from base64 import decodestring

# This is the old configuration (kept commented out for reference only):
# CFG_CERN_LDAP_URI = "ldaps://ldap.cern.ch:636"
# CFG_CERN_LDAP_BIND = "n=%s,ou=users,o=cern,c=ch"
# CFG_CERN_LDAP_BASE = "O=CERN,C=CH"

# Active configuration: the URI is handed to ldap.initialize() by
# _cern_ldap_login(), and the base DN is the root of the subtree searched by
# get_user_info_from_ldap().
CFG_CERN_LDAP_URI = "ldap://xldap.cern.ch:389"
# Narrower search base, currently disabled:
#CFG_CERN_LDAP_BASE = "ou=users,ou=organic units,dc=cern,dc=ch"
CFG_CERN_LDAP_BASE = "dc=cern,dc=ch"

# This one also works but the previous one is recommended
# CFG_CERN_LDAP_URI = "ldap://ldap.cern.ch"
# CFG_CERN_LDAP_BIND = "cn=%s,ou=users,ou=organic units,dc=cern,dc=ch"
# CFG_CERN_LDAP_BASE = "O=CERN,C=CH"

# Cache of LDAP connection objects keyed by the value of get_ident(), so
# that each thread reuses the connection it created.
_ldap_connection_pool = {}
def _cern_ldap_login():
    """Create and return a connection object for CFG_CERN_LDAP_URI.

    No bind is performed here.  The former authenticated simple_bind,
    which used credentials decoded from _cern_nice_soap_auth, is disabled.
    """
    return ldap.initialize(CFG_CERN_LDAP_URI)
def get_user_info_from_ldap(nickname="", email="", ccid=""):
    """Look up a single user in the CERN LDAP directory.

    Exactly one criterion is used, chosen as the first non-empty argument
    in this order: nickname (matched against displayName), email (matched
    against mail), ccid (matched against employeeID).  The search is
    restricted to entries whose employeetype is primary, external or
    ExCern.

    Return the attribute dictionary of the selected entry, or an empty
    dictionary when no criterion is given or nothing matches.  If the
    search fails with ldap.LDAPError it is retried once on a fresh
    connection; an error raised by the retry is not caught here.
    """
    ## One connection is kept per thread and created on first use.
    thread_id = get_ident()
    if thread_id not in _ldap_connection_pool:
        _ldap_connection_pool[thread_id] = _cern_ldap_login()
    ldap_conn = _ldap_connection_pool[thread_id]

    ## Pick the attribute to search on; only ccid is coerced to a string.
    if nickname:
        attribute, wanted = 'displayName', nickname
    elif email:
        attribute, wanted = 'mail', email
    elif ccid:
        attribute, wanted = 'employeeID', str(ccid)
    else:
        return {}
    criterion = '(%s=%s)' % (attribute,
                             ldap.filter.escape_filter_chars(wanted))
    query_filter = ("(& %s (| (employeetype=primary) (employeetype=external) "
                    "(employeetype=ExCern) ) )") % criterion

    try:
        entries = ldap_conn.search_st(CFG_CERN_LDAP_BASE,
                                      ldap.SCOPE_SUBTREE,
                                      query_filter,
                                      timeout=5)
    except ldap.LDAPError:
        ## Possibly a broken connection: pause, reconnect and try once more.
        sleep(1)
        ldap_conn = _cern_ldap_login()
        _ldap_connection_pool[thread_id] = ldap_conn
        entries = ldap_conn.search_st(CFG_CERN_LDAP_BASE,
                                      ldap.SCOPE_SUBTREE,
                                      query_filter,
                                      timeout=5)

    if len(entries) > 1:
        ## Several matches: index the attribute dictionaries by employee
        ## type (a later entry of the same type replaces an earlier one).
        by_type = dict((entry[1]['employeeType'][0], entry[1])
                       for entry in entries)
        if 'Primary' in by_type:
            ## A person may be ExCern and Primary at the same time; in
            ## that case ExCern takes precedence.
            if 'ExCern' in by_type:
                return by_type['ExCern']
            return by_type['Primary']
        ## No Primary entry: fall through and take the first match.

    if not entries:
        return {}
    return entries[0][1]

Event Timeline