Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F77462561
import_user_db.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Aug 14, 15:26
Size
10 KB
Mime Type
text/x-python
Expires
Fri, Aug 16, 15:26 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
19872140
Attached To
rPHAPI Phabricator API scripts
import_user_db.py
View Options
# -*- coding: utf-8 -*-
import
logging
import
yaml
from
..
import
export
from
..
import
colored
from
.group_importer
import
GroupImporter
# Module metadata (authorship, licensing, version).
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class ImportUserDB:
    """Cache mapping users and groups of an input directory to an output one.

    ``self._users`` maps an input-directory user id to a dict that may hold
    the keys ``'id'``, ``'name'``, ``'email'``, ``'in_login'`` and ``'oid'``
    (the id of the matching user in the output directory).
    ``self._imported_groups`` maps an input-directory group id to a dict with
    the keys ``'out_id'``, ``'in_name'`` and ``'out_name'``.

    Both mappings can be persisted to / restored from a YAML cache file.
    """

    def __init__(self, in_directory, out_directory, **kwargs):
        """Create the database.

        :param in_directory: directory backend users/groups are read from.
        :param out_directory: directory backend users/groups are mapped to.
        :param kwargs: optional ``cache_file`` (path of the YAML cache,
            default ``None``), ``keyring`` (forwarded to the importers,
            default ``None``) and ``dry_run`` (default ``False``; when true
            :meth:`save` does nothing). Other keys are ignored.
        """
        self._in_directory = in_directory
        self._out_directory = out_directory
        self._users = {}
        self._imported_groups = {}
        self._default_importer_config = {}

        self._cache_file = kwargs.pop('cache_file', None)
        self._keyring = kwargs.pop('keyring', None)
        self._dry_run = kwargs.pop('dry_run', False)

        # Every attribute is set before the cache is read, so the instance is
        # fully initialised even if loading the cache raises.
        if self._cache_file is not None:
            self._populate()

    def _populate(self):
        """Load users and groups from the cache file, if it exists.

        A missing or empty cache file leaves the in-memory state untouched.
        """
        try:
            # Read-only access is enough here ('r+' needlessly required write
            # permission on the cache file).
            with open(self._cache_file, 'r') as _cache_file:
                # ``yaml.load`` without an explicit Loader can build arbitrary
                # Python objects and is rejected by PyYAML >= 6;
                # ``safe_load`` is sufficient for the plain dicts that
                # :meth:`save` writes.
                _data = yaml.safe_load(_cache_file)
        except FileNotFoundError:
            return

        # An empty file is parsed as ``None``.
        if not isinstance(_data, dict):
            return

        _users = _data.get('users')
        if _users:
            self._users = _users

        _groups = _data.get('groups')
        if _groups:
            self._imported_groups = _groups

    def save(self):
        """Write users and groups to the cache file.

        Does nothing when no cache file was configured or in dry-run mode.
        """
        if self._cache_file is None or self._dry_run:
            return

        with open(self._cache_file, 'w') as _cache_file:
            yaml.dump({'users': self._users,
                       'groups': self._imported_groups},
                      _cache_file,
                      default_flow_style=False)

    def import_group(self, _id, _imported_id, _in_name, _out_name):
        """Record that input group ``_id`` maps to output group ``_imported_id``.

        :param _id: id of the group in the input directory.
        :param _imported_id: id of the group in the output directory.
        :param _in_name: name of the group in the input directory.
        :param _out_name: name of the group in the output directory.
        """
        self._imported_groups[_id] = {'out_id': _imported_id,
                                      'in_name': _in_name,
                                      'out_name': _out_name}

    def add_users(self, users):
        """Resolve and cache every user of ``users`` not already resolved.

        A user is stored in the cache only when a matching user (looked up by
        e-mail) is found in the output directory.

        :param users: iterable of input-directory user ids.
        """
        for _user in users:
            if _user in self._users and 'oid' in self._users[_user]:
                _logger.debug(
                    'User {0} ({1}) from {2} already in cache [{3} - {4}]'.format(
                        self._in_directory.color_name(
                            self._users[_user]['name'], type='user'),
                        self._in_directory.color_name(_user),
                        self._in_directory.backend_name,
                        self._out_directory.color_name(
                            self._users[_user]['oid']),
                        self._out_directory.backend_name))
                continue

            _user_info = {'id': _user}
            try:
                _mail = self._in_directory.get_user_email(_user)
                _user_info['email'] = _mail
                _user_info['name'] = self._in_directory.get_user_name(_user)
                _user_info['in_login'] = \
                    self._in_directory.get_user_login(_user)

                _out_id = self._out_directory.get_user_unique_id(_mail)
                if _out_id is not None:
                    _logger.debug(
                        'Found {0} ({1}) in {2} as {3} in {4}'.format(
                            self._in_directory.color_name(
                                _user_info['name'], type='user'),
                            self._in_directory.color_name(_user),
                            self._in_directory.backend_name,
                            self._out_directory.color_name(_out_id),
                            self._out_directory.backend_name))
                    _user_info['oid'] = _out_id
                    self._users[_user] = _user_info
                else:
                    _logger.debug(
                        'Did not find {0} ({1}) from {2} in {3}'.format(
                            self._in_directory.color_name(
                                _user_info['name'], type='user'),
                            self._in_directory.color_name(_user),
                            self._in_directory.backend_name,
                            self._out_directory.backend_name))
            except Exception:
                # Best effort: a failing lookup must not abort the whole
                # batch. ``Exception`` (rather than a bare ``except``) lets
                # KeyboardInterrupt / SystemExit propagate.
                _logger.warning(
                    "The user {0} does not exists in {1}".format(
                        self._in_directory.color_name(_user),
                        colored(self._in_directory.backend_name,
                                attrs=['bold'])))
                # Keep the real cause available for debugging.
                _logger.debug('Lookup of user %s failed', _user,
                              exc_info=True)

    def get_user_oids(self, users_ids):
        """Return the output ids of the given users that are resolved in cache.

        Users that are unknown, or known without an ``'oid'``, are skipped.

        :param users_ids: iterable of input-directory user ids.
        :return: list of output-directory user ids, in input order.
        """
        return [self._users[_user]['oid']
                for _user in users_ids
                if (_user in self._users) and ('oid' in self._users[_user])]

    def is_id_in_cache(self, _id):
        """Return ``True`` if user ``_id`` has an entry in the user cache."""
        return _id in self._users

    def get_user_name(self, _id, directory=None):
        """Return the name of user ``_id``.

        The cached name is used when available; otherwise ``directory`` is
        queried.

        :param _id: user id.
        :param directory: directory to query on a cache miss; defaults to the
            input directory (the former ``None`` default raised
            ``AttributeError``).
        """
        if _id in self._users and 'name' in self._users[_id]:
            return self._users[_id]['name']

        if directory is None:
            directory = self._in_directory
        return directory.get_user_name(_id)

    def get_group_name(self, _id, directory=None):
        """Return the name of group ``_id`` as reported by ``directory``.

        :param _id: group id.
        :param directory: directory to query; defaults to the input
            directory.
        """
        if directory is None:
            directory = self._in_directory
        # The group lookup is used here: the previous code called
        # ``get_user_name`` with a group id, which looks like a copy/paste
        # slip (directories expose ``get_group_name``, see :meth:`group`).
        return directory.get_group_name(_id)

    @property
    def default_importer(self):
        """Importer configuration, keyed by group name (see get_importer)."""
        return self._default_importer_config

    @default_importer.setter
    def default_importer(self, importer_config):
        self._default_importer_config = importer_config

    @property
    def users(self):
        """The user cache (input user id -> user info dict)."""
        return self._users

    @property
    def directory(self):
        """The output directory."""
        return self._out_directory

    def get_importer(self, name):
        """Build a :class:`GroupImporter` for the group called ``name``.

        The configuration registered under ``name`` is used; when there is
        none, the ``'__all__'`` entry is used instead (a ``KeyError`` is
        raised if that entry is missing too).
        """
        if name not in self._default_importer_config:
            name = '__all__'

        return GroupImporter(
            name,
            self._default_importer_config[name],
            backend_in={'directory': self._in_directory},
            backend_out={'directory': self._out_directory},
            user_db=self,
            keyring=self._keyring,
            dry_run=self._dry_run)

    @property
    def in_directory(self):
        """The input directory."""
        return self._in_directory

    def group(self, _id, create=False):
        """Return the output-directory id matching input group ``_id``.

        :param _id: id of the group in the input directory.
        :param create: forwarded to :meth:`get_group_unique_id`; when true a
            missing output group is created through the importer.
        :return: the output group id, or ``None`` when it cannot be resolved.
        """
        if _id in self._imported_groups:
            _cached = self._imported_groups[_id]
            _logger.debug('Found group {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(_cached['out_name'])))
            return _cached['out_id']

        _name = self._in_directory.get_group_name(_id)
        if _name is None:
            _logger.error(' Could not find {0} in directory {1}'.format(
                _id,
                self._in_directory.backend_name))
            return None

        _importer = self.get_importer(_name)
        _t_name = _importer.transfered_name(_name)
        _out = self.get_group_unique_id(_t_name, create=create,
                                        in_name=_name)
        if _out is None:
            _logger.error(
                ' Could not find {1} ({0}) in directory {2} {3}'.format(
                    self._in_directory.color_name(_id),
                    self._in_directory.color_name(_t_name, type='group'),
                    self._out_directory.backend_name,
                    create))
        return _out

    def group_by_in_name(self, _name):
        """Return the output id of the cached group whose input name is ``_name``.

        :return: the output group id, or ``None`` if no cached group matches.
        """
        for data in self._imported_groups.values():
            if data['in_name'] == _name:
                return data['out_id']
        return None

    def group_by_out_name(self, _name):
        """Return the output id of the cached group whose output name is ``_name``.

        :return: the output group id, or ``None`` if no cached group matches.
        """
        for data in self._imported_groups.values():
            if data['out_name'] == _name:
                return data['out_id']
        return None

    def get_group_unique_id(self, _name, create=False, in_name=None,
                            members=None, transfer_options=None):
        """Return the output-directory id of the group named ``_name``.

        Lookup order: the group cache, then the output directory, then (only
        if ``create`` is true) creation through the importer.

        :param _name: name of the group in the output directory.
        :param create: create the group when it does not exist.
        :param in_name: name of the group in the input directory, used to
            pick the importer; defaults to ``_name``.
        :param members: optional set of members to assign to the group. When
            the group already exists in the output directory this set is
            updated in place with the existing members before being applied.
        :param transfer_options: optional keyword arguments forwarded to the
            importer's ``transfer``.
        :return: the group id, or ``None`` when not found and not created.
        """
        # ``None`` defaults instead of shared mutable ``set()`` / ``{}``.
        if transfer_options is None:
            transfer_options = {}

        _gid = self.group_by_out_name(_name)
        if _gid is not None:
            return _gid

        _gid = self._out_directory.get_group_unique_id(_name)
        if _gid is not None:
            _logger.debug("Found group {0} in {1}: {2}".format(
                self._out_directory.color_name(_name),
                self._out_directory.backend_name,
                self._out_directory.color_name(_gid)))
            if members:
                members.update(
                    self._out_directory.get_users_from_group(_gid))
                self._out_directory.set_group_users(_gid, members)
            return _gid

        if not create:
            return None

        if in_name is None:
            in_name = _name

        _logger.debug("Group {0} not in {1}, try to create it".format(
            self._out_directory.color_name(_name),
            self._out_directory.backend_name))

        _importer = self.get_importer(in_name)
        _gid = _importer.transfer(in_name, **transfer_options)
        if members:
            self._out_directory.set_group_users(_gid, members)
        return _gid

    def user_by_in_login(self, login):
        """Return the cached info dict of the user whose input login is ``login``.

        On a cache miss the input directory is queried and the user is added
        to the cache (without ``'oid'``).

        :return: the user info dict, or ``None`` if the login is unknown to
            the input directory.
        """
        for data in self._users.values():
            # ``.get``: tolerate entries without an 'in_login' key.
            if data.get('in_login') == login:
                return data

        _id = self._in_directory.get_user_unique_id_from_login(login)
        if _id is None:
            return None

        _name = self._in_directory.get_user_name(_id)
        _email = self._in_directory.get_user_email(_id)

        # Merge into an existing entry instead of replacing it, so that an
        # already resolved 'oid' is not thrown away.
        _entry = self._users.setdefault(_id, {})
        _entry.update({'name': _name,
                       'email': _email,
                       'in_login': login})
        return _entry

    def user(self, _id, **kwargs):
        """Return the output-directory id matching input user ``_id``.

        The cache is used first; otherwise the user's e-mail (cached or
        fetched from the input directory) is looked up in the output
        directory and a match is stored in the cache.

        :param _id: id of the user in the input directory.
        :param kwargs: accepted for call compatibility; not used here.
        :return: the output user id, or ``None`` / a falsy value when no
            match is found.
        """
        _known = self._users.get(_id)

        if _known is not None and 'oid' in _known:
            _logger.debug('Found user {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(_known['oid'])))
            return _known['oid']

        if _known is not None and 'email' in _known:
            _logger.debug('Partially found user {0} in cache: {1}'.format(
                self._in_directory.color_name(_id),
                self._out_directory.color_name(_known['email'])))
            _email = _known['email']
        else:
            _email = self._in_directory.get_user_email(_id)

        if _email is None:
            return None

        _oid = self._out_directory.get_user_unique_id(_email)
        if _oid:
            _logger.debug('Found a match for user {0}: {1}'.format(
                self._in_directory.color_name(_email),
                self._out_directory.color_name(_oid)))
            if _id not in self._users:
                self._users[_id] = {'name': '',
                                    'email': _email,
                                    'in_login': _email,  # Dirty fix
                                    'oid': _oid}
            else:
                # Remember the match for partially cached users too, so the
                # output directory is not queried again on every call.
                self._users[_id]['oid'] = _oid
        return _oid
Event Timeline
Log In to Comment