Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F87377445
repo_importer.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Oct 12, 07:55
Size
6 KB
Mime Type
text/x-python
Expires
Mon, Oct 14, 07:55 (2 d)
Engine
blob
Format
Raw Data
Handle
21586954
Attached To
rPHAPI Phabricator API scripts
repo_importer.py
View Options
# -*- coding: utf-8 -*-
import
logging
import
copy
from
..
import
export
from
..
import
colored
from
..
import
Repo
from
.
import
Importer
from
.
import
GroupImporter
# Module metadata.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = ["Nicolas Richart"]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = "Nicolas Richart"
__email__ = "nicolas.richart@epfl.ch"

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
@export
class RepoImporter(Importer):
    """Importer that recreates a repository on the output backend.

    For a given repository name, :meth:`transfer` opens the repository on
    the input backend, creates its counterpart on the output backend and
    sets the 'edit', 'view' and 'push' permissions of the new repository
    according to the ``permissions`` entry of the import scheme.
    """

    # Default import scheme handed to the base ``Importer`` constructor.
    __default_import_scheme = {
        'type': 'git',
        'permissions': {'scheme': 'import'},
    }

    def __init__(self, name, config, **kwargs):
        """Set up the importer and the group importer it delegates to.

        Args:
            name: Forwarded to ``Importer.__init__``.
            config: Forwarded to ``Importer.__init__``.
            **kwargs: Forwarded to ``Importer.__init__``. If it contains a
                non-``None`` ``group_importer``, that object is used to
                transfer groups; otherwise a catch-all ``GroupImporter``
                is created.
        """
        super().__init__(name, config,
                         self.__default_import_scheme,
                         **kwargs)

        _group_importer = kwargs.get('group_importer')
        if _group_importer is None:
            # NOTE(review): the original passed ``self._users_db`` here,
            # while transfer() reads ``self._user_db`` everywhere; aligned
            # on ``_user_db`` -- confirm against the ``Importer`` base class.
            _group_importer = GroupImporter(
                '__all__', {},
                keyring=self._keyring,
                dry_run=self._dry_run,
                backend_in=self._backend_in,
                backend_out=self._backend_out,
                user_db=self._user_db)
        self._group_importer = _group_importer

        _logger.info('Initializing importer for Repo {0}'
                     ' with configuration: {1}'.format(self._colored_name,
                                                        self._config))

    def transfer(self, name):
        """Create the output repository for ``name`` and set its permissions.

        The permissions are chosen by
        ``config['import-scheme']['permissions']['scheme']``:

        * ``'import'``: replicate the groups and users of the input
          repository, transferring unknown groups through the group
          importer and adding unknown users to the user database.
        * ``'project'``: give all three permissions to the group named by
          the ``'project'`` entry of the permissions scheme.
        * ``'user'``: give all three permissions to
          ``'obj.repository.author'``.
        * ``'static'``: requires explicit ``'edit'``, ``'view'`` and
          ``'push'`` entries in the permissions scheme.

        Any other scheme value leaves the permission lists empty.
        Explicit ``'edit'``/``'view'``/``'push'`` entries are applied last
        and replace whatever the scheme computed.

        Args:
            name: Name of the repository on the input backend.

        Raises:
            RuntimeError: If the ``'project'`` scheme has no ``'project'``
                entry, or the ``'static'`` scheme lacks one of ``'edit'``,
                ``'view'`` or ``'push'``.
        """
        _colored_name = colored(name, 'red', attrs=['bold'])
        _logger.info('Locking for repo: {0} ({1})'.format(
            _colored_name, self._colored_name))

        # Shallow copies, so the pop() calls below do not alter self._config.
        _import_scheme = copy.copy(self._config['import-scheme'])
        _logger.debug(' --> repo info {0}'.format(
            colored(self._config, attrs=['bold'])))

        _config = copy.copy(self._config)
        _type = _config.pop('type', self.__default_import_scheme['type'])
        _in_repo = Repo(name=name,
                        keyring=self._keyring,
                        dry_run=self._dry_run,
                        type=_type,
                        **self._backend_in)

        # 'same' means: reuse the type reported by the input repository.
        _type = _import_scheme['type']
        if _type == 'same':
            _type = _in_repo.repo_type

        # The configured name may reference '{original_name}'.
        _name = _import_scheme.pop('name', self._name).format(
            original_name=self._name)

        _out_repo = Repo(name=_name,
                         keyring=self._keyring,
                         dry_run=self._dry_run,
                         type=_type,
                         **self._backend_out)
        _out_repo.create()

        _permissions_scheme = _import_scheme['permissions']
        _scheme = _permissions_scheme['scheme']
        _perms = {'edit': [], 'view': [], 'push': []}

        if _scheme == 'import':
            _in_perms = _in_repo.permissions
            _logger.debug("Replicating permissions {0}".format(_in_perms))

            def _add_to_perms(_perms, ug_perm, _id):
                # EDIT also grants push and view; PUSH also grants view.
                if ug_perm == Repo.EDIT:
                    _perms['edit'].append(_id)
                    _perms['push'].append(_id)
                    _perms['view'].append(_id)
                if ug_perm == Repo.PUSH:
                    _perms['push'].append(_id)
                    _perms['view'].append(_id)
                if ug_perm == Repo.VIEW:
                    _perms['view'].append(_id)

            for _group in _in_perms.groups:
                _in_gid = _group['id']
                _out_gid = self._user_db.group(_in_gid)
                if _out_gid is None:
                    # Unknown group: try to transfer it by name. A separate
                    # local keeps the ``name`` parameter intact.
                    _group_name = _in_repo.directory.get_group_name(_in_gid)
                    if _group_name is not None:
                        _out_gid = self._group_importer.transfer(_group_name)

                if _out_gid is not None:
                    _add_to_perms(_perms, _group['perm'], _out_gid)
                else:
                    _logger.error(
                        ' Could not find {0} in directory {1}'.format(
                            _in_gid,
                            self._user_db.directory.backend_name))

            for user in _in_perms.users:
                _in_id = user['id']
                _out_id = self._user_db.user(_in_id)
                if _out_id is None:
                    # Unknown user: add it to the user database, then retry.
                    self._user_db.add_users([_in_id], _in_repo.directory)
                    _out_id = self._user_db.user(_in_id)

                if _out_id is not None:
                    _add_to_perms(_perms, user['perm'], _out_id)
                else:
                    _logger.error(
                        ' Could not find {0} in directory {1}'.format(
                            colored(_in_id, attrs=['bold']),
                            self._user_db.directory.backend_name))

            if _in_perms.anonymous:
                _perms['view'] = ['public']

        elif _scheme == 'project':
            if 'project' in _permissions_scheme:
                _gid = self._user_db.directory.get_group_unique_id(
                    _permissions_scheme['project'])
                # If the group is not found, the lists are left empty.
                if _gid is not None:
                    _perms['edit'] = [_gid]
                    _perms['view'] = [_gid]
                    _perms['push'] = [_gid]
            else:
                # Parenthesised so that format() applies to the whole
                # message, not only to its last fragment.
                _msg = ('You should specify a project name in the '
                        'permissions of repo {0} to be able to use the '
                        '\'project\' import scheme').format(_colored_name)
                _logger.error(_msg)
                raise RuntimeError(_msg)

        elif _scheme == 'user':
            _perms['edit'] = ['obj.repository.author']
            _perms['view'] = ['obj.repository.author']
            _perms['push'] = ['obj.repository.author']

        elif _scheme == 'static':
            for _perm_type in ['edit', 'view', 'push']:
                if _perm_type not in _permissions_scheme:
                    _msg = ('You should specify a \'{0}\' in the '
                            'permissions of repo {1} to be able to use the '
                            '\'static\' import scheme').format(
                                _perm_type, _colored_name)
                    _logger.error(_msg)
                    raise RuntimeError(_msg)

        # Explicit per-permission entries replace the computed lists.
        # NOTE(review): applied for every scheme, not only 'static' --
        # confirm this matches the intended nesting.
        for _perm_type in ['edit', 'view', 'push']:
            if _perm_type in _permissions_scheme:
                _perm = _permissions_scheme[_perm_type]
                if _perm == 'user':
                    _perm = 'obj.repository.author'
                _perms[_perm_type] = [_perm]

        _out_repo.set_permissions(**_perms)
Event Timeline
Log In to Comment