Page MenuHomec4science

import_user_db.py
No OneTemporary

File Metadata

Created
Thu, May 2, 10:49

import_user_db.py

# -*- coding: utf-8 -*-
import logging
import yaml
from .. import export
from .. import colored
from .group_importer import GroupImporter
# Module metadata (authorship, licensing and version information).
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class ImportUserDB:
    """Map users and groups of an input directory to an output directory.

    Resolved users and imported groups are kept in in-memory dictionaries
    that can optionally be loaded from / saved to a YAML cache file.
    """

    def __init__(self, in_directory, out_directory, **kwargs):
        """Create the database and load the cache file if one is given.

        Args:
            in_directory: directory backend the users/groups come from.
            out_directory: directory backend the users/groups are mapped to.
            **kwargs: optional settings:
                ``cache_file`` (path of the YAML cache, default ``None``),
                ``keyring`` (forwarded to the group importers),
                ``dry_run`` (when true, :meth:`save` writes nothing;
                also forwarded to the group importers).
        """
        self._in_directory = in_directory
        self._out_directory = out_directory
        self._users = {}
        self._imported_groups = {}
        self._default_importer_config = {}
        self._keyring = kwargs.pop('keyring', None)
        self._dry_run = kwargs.pop('dry_run', False)
        self._cache_file = kwargs.pop('cache_file', None)
        if self._cache_file is not None:
            self._populate()

    def _populate(self):
        """Load the ``users`` and ``groups`` sections from the cache file.

        A missing, empty or non-mapping cache file leaves the in-memory
        state untouched.
        """
        try:
            # Read-only access is enough here; writing is done by save().
            with open(self._cache_file, 'r') as _cache_file:
                # safe_load: yaml.load() without an explicit Loader is
                # unsafe and is rejected by recent PyYAML releases.
                # NOTE(review): assumes the cache only holds plain YAML
                # types (str/int/dict/list) -- confirm against the values
                # returned by the directory backends.
                _data = yaml.safe_load(_cache_file)
        except FileNotFoundError:
            return

        if _data is None:
            # Empty cache file.
            return
        if not isinstance(_data, dict):
            _logger.warning(
                'Ignoring cache file {0}: unexpected content'.format(
                    self._cache_file))
            return

        _users = _data.get('users')
        if _users is not None:
            self._users = _users
        _groups = _data.get('groups')
        if _groups is not None:
            self._imported_groups = _groups

    def save(self):
        """Dump users and imported groups to the cache file.

        Does nothing when no cache file is configured or in dry-run mode.
        """
        if self._cache_file is None or self._dry_run:
            return
        with open(self._cache_file, 'w') as _cache_file:
            yaml.dump(
                {'users': self._users,
                 'groups': self._imported_groups},
                _cache_file,
                default_flow_style=False)

    def import_group(self, _id, _imported_id, _in_name, _out_name):
        """Record that input group ``_id`` was imported as ``_imported_id``.

        Args:
            _id: identifier of the group in the input directory.
            _imported_id: identifier of the group in the output directory.
            _in_name: name of the group in the input directory.
            _out_name: name of the group in the output directory.
        """
        self._imported_groups[_id] = {'out_id': _imported_id,
                                      'in_name': _in_name,
                                      'out_name': _out_name}

    def add_users(self, users):
        """Resolve input users in the output directory and cache the matches.

        For every user not yet cached with an ``oid``, the email, name and
        login are fetched from the input directory and the email is looked
        up in the output directory. Only users found in the output
        directory are stored. Lookup failures are logged as warnings and
        do not abort the loop.

        Args:
            users: iterable of user identifiers of the input directory.
        """
        for _user in users:
            if _user in self._users and 'oid' in self._users[_user]:
                _logger.debug(
                    'User {0} ({1}) from {2} already in cache [{3} - {4}]'
                    .format(
                        self._in_directory.color_name(
                            self._users[_user]['name'], type='user'),
                        self._in_directory.color_name(_user),
                        self._in_directory.backend_name,
                        self._out_directory.color_name(
                            self._users[_user]['oid']),
                        self._out_directory.backend_name))
                continue

            _user_info = {'id': _user}
            try:
                _mail = self._in_directory.get_user_email(_user)
                _user_info['email'] = _mail
                _user_info['name'] = self._in_directory.get_user_name(_user)
                _user_info['in_login'] = self._in_directory.get_user_login(
                    _user)
                _out_id = self._out_directory.get_user_unique_id(_mail)
                if _out_id is not None:
                    _logger.debug(
                        'Found {0} ({1}) in {2} as {3} in {4}'.format(
                            self._in_directory.color_name(
                                _user_info['name'], type='user'),
                            self._in_directory.color_name(_user),
                            self._in_directory.backend_name,
                            self._out_directory.color_name(_out_id),
                            self._out_directory.backend_name))
                    _user_info['oid'] = _out_id
                    self._users[_user] = _user_info
                else:
                    _logger.debug(
                        'Did not find {0} ({1}) from {2} in {3}'.format(
                            self._in_directory.color_name(
                                _user_info['name'], type='user'),
                            self._in_directory.color_name(_user),
                            self._in_directory.backend_name,
                            self._out_directory.backend_name))
            except Exception:
                # Best effort: a failing lookup must not stop the import,
                # but KeyboardInterrupt/SystemExit are no longer swallowed.
                _logger.warning(
                    "The user {0} does not exists in {1}".format(
                        self._in_directory.color_name(_user),
                        colored(self._in_directory.backend_name,
                                attrs=['bold'])))

    def get_user_oids(self, users_ids):
        """Return the cached output ids of the given input user ids.

        Users that are not cached, or cached without an ``oid``, are
        skipped.
        """
        return [self._users[_user]['oid'] for _user in users_ids
                if (_user in self._users) and ('oid' in self._users[_user])]

    def is_id_in_cache(self, _id):
        """Return whether the input user id ``_id`` is in the user cache."""
        return _id in self._users

    def get_user_name(self, _id, directory=None):
        """Return the name of user ``_id``.

        The cached name is used when available; otherwise ``directory`` is
        queried. When ``directory`` is ``None`` the input directory is
        used instead of failing on ``None``.
        """
        if _id in self._users and 'name' in self._users[_id]:
            return self._users[_id]['name']
        if directory is None:
            directory = self._in_directory
        return directory.get_user_name(_id)

    def get_group_name(self, _id, directory=None):
        """Return the name of group ``_id`` as known by ``directory``.

        When ``directory`` is ``None`` the input directory is queried.
        """
        if directory is None:
            directory = self._in_directory
        # Query the group API (the original mistakenly asked for a user).
        return directory.get_group_name(_id)

    @property
    def default_importer(self):
        """Importer configurations, indexed by importer name."""
        return self._default_importer_config

    @default_importer.setter
    def default_importer(self, importer_config):
        self._default_importer_config = importer_config

    @property
    def users(self):
        """Dictionary of cached users, indexed by input user id."""
        return self._users

    @property
    def directory(self):
        """The output directory."""
        return self._out_directory

    def get_importer(self, name):
        """Build a :class:`GroupImporter` for ``name``.

        Falls back to the ``'__all__'`` configuration when ``name`` has no
        dedicated entry in the default importer configuration.

        Raises:
            KeyError: if neither ``name`` nor ``'__all__'`` is configured.
        """
        if name not in self._default_importer_config:
            name = '__all__'
        return GroupImporter(
            name, self._default_importer_config[name],
            backend_in={'directory': self._in_directory},
            backend_out={'directory': self._out_directory},
            user_db=self,
            keyring=self._keyring,
            dry_run=self._dry_run)

    @property
    def in_directory(self):
        """The input directory."""
        return self._in_directory

    def group(self, _id, create=False):
        """Return the output id matching the input group ``_id``.

        The imported-groups cache is checked first. Otherwise the group
        name is fetched from the input directory, translated through the
        matching importer and resolved with :meth:`get_group_unique_id`.

        Args:
            _id: identifier of the group in the input directory.
            create: forwarded to :meth:`get_group_unique_id`.

        Returns:
            The output group id, or ``None`` when it cannot be resolved.
        """
        if _id in self._imported_groups:
            _cached = self._imported_groups[_id]
            _logger.debug('Found group {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(_cached['out_name'])))
            return _cached['out_id']

        _name = self._in_directory.get_group_name(_id)
        if _name is None:
            _logger.error(
                ' Could not find {0} in directory {1}'.format(
                    _id, self._in_directory.backend_name))
            return None

        _importer = self.get_importer(_name)
        _t_name = _importer.transfered_name(_name)
        _out = self.get_group_unique_id(
            _t_name, create=create, in_name=_name)
        if _out is None:
            _logger.error(
                ' Could not find {1} ({0}) in directory {2} {3}'
                .format(
                    self._in_directory.color_name(_id),
                    self._in_directory.color_name(
                        _t_name, type='group'),
                    self._out_directory.backend_name,
                    create))
        return _out

    def group_by_in_name(self, _name):
        """Return the output id of the imported group whose input name is
        ``_name``, or ``None`` if there is none."""
        for data in self._imported_groups.values():
            if data['in_name'] == _name:
                return data['out_id']
        return None

    def group_by_out_name(self, _name):
        """Return the output id of the imported group whose output name is
        ``_name``, or ``None`` if there is none."""
        for data in self._imported_groups.values():
            if data['out_name'] == _name:
                return data['out_id']
        return None

    def get_group_unique_id(self, _name, create=False,
                            in_name=None, members=None,
                            transfer_options=None):
        """Return the output id of group ``_name``, optionally creating it.

        Lookup order: imported-groups cache, then the output directory.
        If the group is still unknown and ``create`` is true, it is
        transferred with the importer selected by ``in_name``.

        Args:
            _name: name of the group in the output directory.
            create: transfer the group when it is not found.
            in_name: name of the group in the input directory; defaults
                to ``_name``.
            members: optional iterable of users. For a group found in the
                output directory they are merged with its current users;
                for a newly transferred group they are set as given. The
                passed object is never modified.
            transfer_options: optional keyword arguments forwarded to the
                importer ``transfer`` call.

        Returns:
            The group id, or ``None`` when not found and ``create`` is
            false.
        """
        # None defaults avoid sharing mutable default objects between calls.
        if transfer_options is None:
            transfer_options = {}

        _gid = self.group_by_out_name(_name)
        if _gid is not None:
            return _gid

        _gid = self._out_directory.get_group_unique_id(_name)
        if _gid is not None:
            _logger.debug("Found group {0} in {1}: {2}".format(
                self._out_directory.color_name(_name),
                self._out_directory.backend_name,
                self._out_directory.color_name(_gid)
            ))
            if members:
                # Merge on a copy so the caller's collection is untouched.
                _merged = set(members)
                _merged.update(
                    self._out_directory.get_users_from_group(_gid))
                self._out_directory.set_group_users(_gid, _merged)
            return _gid

        if not create:
            return None

        if in_name is None:
            in_name = _name
        _logger.debug("Group {0} not in {1}, try to create it".format(
            self._out_directory.color_name(_name),
            self._out_directory.backend_name
        ))
        _importer = self.get_importer(in_name)
        _gid = _importer.transfer(in_name, **transfer_options)
        if members:
            self._out_directory.set_group_users(_gid, members)
        return _gid

    def user_by_in_login(self, login):
        """Return the cached record of the user whose input login is
        ``login``.

        When the login is not cached, the input directory is queried and a
        new record (name, email, login) is stored under the user id.

        Returns:
            The user record, or ``None`` if the input directory does not
            know the login.
        """
        for data in self._users.values():
            if data['in_login'] == login:
                return data

        _id = self._in_directory.get_user_unique_id_from_login(login)
        if _id is None:
            return None

        self._users[_id] = {
            'name': self._in_directory.get_user_name(_id),
            'email': self._in_directory.get_user_email(_id),
            'in_login': login
        }
        return self._users[_id]

    def user(self, _id, **kwargs):
        """Return the output id matching the input user ``_id``.

        The cached ``oid`` is returned when present. Otherwise the user's
        email (cached, or fetched from the input directory) is looked up
        in the output directory and a successful match is cached.

        Args:
            _id: identifier of the user in the input directory.
            **kwargs: accepted and ignored.

        Returns:
            The output user id, or ``None`` when no match is found.
        """
        if _id in self._users and 'oid' in self._users[_id]:
            _logger.debug('Found user {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(self._users[_id]['oid'])))
            return self._users[_id]['oid']

        if _id in self._users and 'email' in self._users[_id]:
            _logger.debug('Partially found user {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(self._users[_id]['email'])))
            _email = self._users[_id]['email']
        else:
            _email = self._in_directory.get_user_email(_id)

        _oid = None
        if _email is not None:
            _oid = self._out_directory.get_user_unique_id(_email)
            if _oid:
                _logger.debug('Found a match for user {0}: {1}'.format(
                    self._in_directory.color_name(_email),
                    self._out_directory.color_name(_oid)))
                if _id not in self._users:
                    self._users[_id] = {  # dirty fix
                        'name': '',
                        'email': _email,
                        'in_login': _email,
                        'oid': _oid
                    }
                else:
                    self._users[_id]['oid'] = _oid
        return _oid

Event Timeline