Page MenuHomec4science

import_user_db.py
No OneTemporary

File Metadata

Created
Thu, Nov 7, 07:36

import_user_db.py

# -*- coding: utf-8 -*-
import logging
import yaml
from .. import export
from .. import colored
__author__ = "Nicolas Richart"
__copyright__ = "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale " \
"de Lausanne) - SCITAS (Scientific IT and Application " \
"Support)"
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"
_logger = logging.getLogger(__name__)
@export
class ImportUserDB:
def __init__(self, in_directory, out_directory, cache_file=None):
self._in_directory = in_directory
self._out_directory = out_directory
self._users = {}
self._imported_groups = {}
self._default_importer = None
self._cache_file = cache_file
if self._cache_file is not None:
self._populate()
def _populate(self):
try:
with open(self._cache_file, 'r+') as _cache_file:
_data = yaml.load(_cache_file)
if 'users' in _data:
self._users = _data['users']
if 'groups' in _data:
self._imported_groups = _data['groups']
except FileNotFoundError:
pass
def save(self):
if self._cache_file is None:
return
with open(self._cache_file, 'w') as _cache_file:
yaml.dump(
{'users': self._users,
'groups': self._imported_groups},
_cache_file,
default_flow_style=False)
def import_group(self, _id, _imported_id, _in_name, _out_name):
self._imported_groups[_id] = {'out_id': _imported_id,
'in_name': _in_name,
'out_name': _out_name}
def add_users(self, users):
for _user in users:
if _user in self._users:
_logger.debug(
'User {0} ({1}) from {2} already in cache [{3} - {4}]'.format(
self._in_directory.color_name(
self._users[_user]['name'], type='user'),
self._in_directory.color_name(_user),
self._in_directory.backend_name,
self._out_directory.color_name(
self._users[_user]['oid']),
self._out_directory.backend_name))
continue
_user_info = {'id': _user}
try:
_mail = self._in_directory.get_user_email(_user)
_user_info['email'] = _mail
_user_info['name'] = self._in_directory.get_user_name(_user)
_user_info['in_login'] = self._in_directory.get_user_login(
_user)
_out_id = self._out_directory.get_user_unique_id(_mail)
if _out_id is not None:
_logger.debug(
'Found {0} ({1}) in {2} as {3} in {4}'.format(
self._in_directory.color_name(_user_info['name'],
type='user'),
self._in_directory.color_name(_user),
self._in_directory.backend_name,
self._out_directory.color_name(_out_id),
self._out_directory.backend_name))
_user_info['oid'] = _out_id
self._users[_user] = _user_info
else:
_logger.debug(
'Did not find {0} ({1}) from {2} in {3}'.format(
self._in_directory.color_name(_user_info['name'],
type='user'),
self._in_directory.color_name(_user),
self._in_directory.backend_name,
self._out_directory.backend_name))
except:
_logger.warning(
"The user {0} does not exists in {1}".format(
self._in_directory.color_name(_user),
colored(self._in_directory.backend_name,
attrs=['bold'])))
def get_user_oids(self, users_ids):
return [self._users[_user]['oid'] for _user in users_ids
if (_user in self._users) and ('oid' in self._users[_user])]
@property
def default_importer(self):
return self._default_importer
@default_importer.setter
def default_importer(self, importer):
self._default_importer = importer
@property
def users(self):
return self._users
@property
def directory(self):
return self._out_directory
@property
def in_directory(self):
return self._in_directory
def group(self, _id, create=False):
if _id in self._imported_groups:
_logger.debug('Found group {0} in cache: {1}'.format(
self._in_directory.color_name(_id),
self._out_directory.color_name(self._imported_groups[_id])))
return self._imported_groups[_id]['out_id']
else:
_name = self._in_directory.get_group_name(_id)
if _name is not None:
_t_name = self._default_importer.transfered_name(_name)
_out = self._out_directory.get_group_unique_id(_t_name)
if (_out is None) and create:
_out = self._default_importer.transfer(_name)
else:
_logger.error(
' Could not find {1} ({0}) in directory {2} {3}'.format(
self._in_directory.color_name(_id),
self._in_directory.color_name(_t_name, type='group'),
self._out_directory.backend_name,
create))
return _out
else:
_logger.error(
' Could not find {0} in directory {1}'.format(
_id, self._in_directory.backend_name))
return None
def group_by_in_name(self, _name):
for _id, data in self._imported_groups.items():
if data['in_name'] == _name:
return data['out_id']
return None
def group_by_out_name(self, _name):
for _id, data in self._imported_groups.items():
if data['out_name'] == _name:
return data['out_id']
return None
def get_group_unique_id(self, _name):
_gid = self.group_by_out_name(_name)
if _gid is None:
return self._out_directory.get_group_unique_id(_name)
return _gid
def user_by_in_login(self, login):
for _id, data in self._users.items():
if data['in_login'] == login:
return data
_id = self._in_directory.get_user_unique_id_from_login(login)
if _id is not None:
_name = self._in_directory.get_user_name(_id)
_email = self._in_directory.get_user_email(_id)
self._users[_id] = {'name': _name, 'email': _email, 'in_login': login}
return self._users[_id]
return None
def user(self, _id, **kwargs):
if _id in self._users:
_logger.debug('Found user {0} in cache: {1}'.format(
self._in_directory.color_name(_id),
self._out_directory.color_name(self._users[_id]['oid'])))
return self._users[_id]['oid']
else:
_email = self._in_directory.get_user_email(_id)
if _email is not None:
return self._out_directory.get_user_unique_id(_email)
return None

Event Timeline