# -*- coding: utf-8 -*-
from ... import export
from ...directory import Directory
import ldap3 as ldap
# Module metadata (author, licence, version) consumed by packaging tools.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"
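@export
class LDAPDirectory(Directory):
    """Directory backed by an LDAP server, queried through ``ldap3``.

    Users and groups are located with configurable filter templates in
    which ``{attr}`` and ``{value}`` are substituted.  Every value that
    comes from a caller (ids, names, e-mails, member references) is
    escaped per RFC 4515 before substitution, so it cannot change the
    structure of the filter it is inserted into.

    Keyword arguments consumed here (all optional; anything else is
    forwarded to :class:`Directory`):

    basedn               search base (default '')
    scope                search scope (default ``ldap.SUBTREE``)
    uidNumber            attribute holding the user's unique id
    gecos                attribute holding the user's display name
    uid                  attribute holding the user's login
    email                attribute holding the user's e-mail
    user_filter          template selecting one user
    user_group_attrs     stored, not used by any method here
    gidNumber            attribute holding the group's unique id
    gid                  attribute holding the group's name (default 'cn')
    group_filter         template selecting one group
    group_member_filter  user attribute that group member values refer to
    group_user_attrs     group attribute listing its members
    """

    # RFC 4515 section 3: characters that must be escaped inside an
    # assertion value.  The backslash comes first so that the escape
    # sequences produced by the later replacements are not re-escaped.
    _FILTER_ESCAPES = (
        ('\\', '\\5c'),
        ('*', '\\2a'),
        ('(', '\\28'),
        (')', '\\29'),
        ('\x00', '\\00'),
    )

    def __init__(self, uri, *args, **kwargs):
        """Store the lookup configuration and connect to the server at *uri*.

        The LDAP-specific keywords are popped before the remaining
        arguments reach :class:`Directory`.  No credentials are handed to
        :class:`ldap.Connection`, so the bind is anonymous, and transport
        security depends solely on the scheme carried by *uri*.  Whatever
        ``ldap3`` raises when ``auto_bind`` fails propagates to the caller.
        """
        pop = kwargs.pop
        self.__ldap_basedn = pop('basedn', '')
        self.__ldap_scope = pop('scope', ldap.SUBTREE)
        self.__ldap_user_unique_id = pop('uidNumber', 'uidNumber')
        self.__ldap_user_gecos = pop('gecos', 'gecos')
        self.__ldap_user_id = pop('uid', 'uid')
        self.__ldap_user_email = pop('email', 'email')
        self.__ldap_user_filter = pop(
            'user_filter',
            '(&(objectClass=posixAccount)({attr}={value}))')
        self.__ldap_user_group_attrs = pop('user_group_attrs', 'memberOf')
        self.__ldap_group_unique_id = pop('gidNumber', 'gidNumber')
        self.__ldap_group_id = pop('gid', 'cn')
        self.__ldap_group_filter = pop(
            'group_filter',
            '(&(objectClass=posixGroup)({attr}={value}))')
        self.__ldap_group_member_filter = pop('group_member_filter',
                                              'uidNumber')
        self.__ldap_group_user_attrs = pop('group_user_attrs', 'memberUid')
        # Must run after the pops so Directory only sees its own keywords.
        super(LDAPDirectory, self).__init__(*args, **kwargs)
        self.__ldap_uri = uri
        self.__server = ldap.Server(self.__ldap_uri)
        self.__ldap = ldap.Connection(self.__server, auto_bind=True)

    @classmethod
    def _escape_filter_value(cls, value):
        """Return *value* rendered as text and escaped for use as an
        assertion value inside an LDAP search filter (RFC 4515).

        The text conversion uses ``str.format`` semantics, matching what
        the filter templates would have done with the raw value.
        """
        text = '{}'.format(value)
        for raw, escaped in cls._FILTER_ESCAPES:
            text = text.replace(raw, escaped)
        return text

    def __user_filter(self, attr, value):
        """Instantiate the user filter template for ``attr == value``.

        *attr* is configuration and is inserted as-is; *value* is escaped.
        """
        return self.__ldap_user_filter.format(
            attr=attr, value=self._escape_filter_value(value))

    def __group_filter(self, attr, value):
        """Instantiate the group filter template for ``attr == value``.

        *attr* is configuration and is inserted as-is; *value* is escaped.
        """
        return self.__ldap_group_filter.format(
            attr=attr, value=self._escape_filter_value(value))

    def __get_one(self, fltr, attr):
        """Return the first entry matching *fltr*, fetching *attr*.

        Raises IndexError when nothing matches; use
        :meth:`__get_one_attr` for a non-raising lookup.
        """
        return self.__get_all(fltr, attr)[0]

    def __get_one_attr(self, fltr, attr):
        """Return the value of *attr* on the first entry matching *fltr*,
        or '' when no entry matches."""
        entries = self.__get_all(fltr, attr)
        if not entries:
            return ''
        return entries[0][attr].value

    def __get_all(self, fltr, attr):
        """Search for *fltr* under the configured base and scope.

        *attr* is one attribute name or a list of names to fetch.
        Returns the connection's entries when the search reports a
        result, an empty list otherwise.
        """
        attrs = attr if isinstance(attr, list) else [attr]
        found = self.__ldap.search(search_base=self.__ldap_basedn,
                                   search_scope=self.__ldap_scope,
                                   search_filter=fltr,
                                   attributes=attrs)
        return self.__ldap.entries if found else []

    def is_valid_user(self, id):
        """Return True when a user with unique id *id* exists."""
        entries = self.__get_all(
            self.__user_filter(self.__ldap_user_unique_id, id),
            self.__ldap_user_unique_id)
        return len(entries) != 0

    def is_valid_group(self, id):
        """Return True when a group with unique id *id* exists.

        Uses the *group* filter template: the previous implementation
        formatted the user template, which with the default templates
        selects posixAccount entries and so never finds a group, and it
        indexed ``[0]`` into the result, raising IndexError instead of
        returning False on a miss.
        """
        entries = self.__get_all(
            self.__group_filter(self.__ldap_group_unique_id, id),
            self.__ldap_group_unique_id)
        return len(entries) != 0

    def get_users_from_group(self, id):
        """Return the unique ids of the members of the group with unique
        id *id*.

        Members are read from the group's ``group_user_attrs`` attribute.
        When ``group_member_filter`` is the user unique-id attribute the
        members already are ids and are only checked for existence;
        otherwise each member is resolved to an id through the user
        filter and members that do not resolve are dropped.
        """
        members = self.__get_one_attr(
            self.__group_filter(self.__ldap_group_unique_id, id),
            self.__ldap_group_user_attrs)
        if not members:
            return []
        if not isinstance(members, (list, tuple)):
            # A single-valued attribute may come back as a bare scalar;
            # iterating a string would walk it character by character.
            members = [members]
        if self.__ldap_group_member_filter == self.__ldap_user_unique_id:
            return [m for m in members if self.is_valid_user(m)]
        users = []
        for member in members:
            uid = self.__get_one_attr(
                self.__user_filter(self.__ldap_group_member_filter, member),
                self.__ldap_user_unique_id)
            if uid:
                users.append(uid)
        return users

    def get_group_unique_id(self, name):
        """Return the unique id of the group named *name*, '' if absent."""
        return self.__get_one_attr(
            self.__group_filter(self.__ldap_group_id, name),
            self.__ldap_group_unique_id)

    def get_user_unique_id(self, email):
        """Return the unique id of the user with e-mail *email*, '' if
        absent."""
        return self.__get_one_attr(
            self.__user_filter(self.__ldap_user_email, email),
            self.__ldap_user_unique_id)

    def get_group_name(self, id):
        """Return the name of the group with unique id *id*, '' if absent."""
        return self.__get_one_attr(
            self.__group_filter(self.__ldap_group_unique_id, id),
            self.__ldap_group_id)

    def get_user_name(self, id):
        """Return the display name (gecos) of the user with unique id *id*,
        '' if absent."""
        return self.__get_one_attr(
            self.__user_filter(self.__ldap_user_unique_id, id),
            self.__ldap_user_gecos)

    def get_user_email(self, id):
        """Return the e-mail of the user with unique id *id*, '' if absent."""
        return self.__get_one_attr(
            self.__user_filter(self.__ldap_user_unique_id, id),
            self.__ldap_user_email)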
