Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F61851592
phabricator.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, May 9, 09:07
Size
8 KB
Mime Type
text/x-python
Expires
Sat, May 11, 09:07 (2 d)
Engine
blob
Format
Raw Data
Handle
17567272
Attached To
rPHAPI Phabricator API scripts
phabricator.py
View Options
# -*- coding: utf-8 -*-
import
logging
import
copy
import
re
import
time
from
...
import
export
from
...
import
dry_do
from
...repo
import
Repo
from
...utils
import
get_phabricator_instance
from
..
import
color_phid
# Module metadata.
__author__ = "Nicolas Richart"
__copyright__ = (
    "Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
    "de Lausanne) - SCITAS (Scientific IT and Application "
    "Support)"
)
__credits__ = [__author__]
__license__ = "BSD"
__version__ = "0.1"
__maintainer__ = __author__
__email__ = "nicolas.richart@epfl.ch"

# Logger shared by everything defined in this module.
_logger = logging.getLogger(__name__)
@export
class PhabRepo(Repo):
    """Repository hosted in the Diffusion application of a Phabricator server.

    The constructor connects to the Conduit API and looks the repository up
    by name; the other methods create it, activate it, wait for activation
    and set its view/edit/push policies. Every method that would modify the
    server honours ``self._dry_run`` and only reports the action through
    ``dry_do`` in that case.
    """

    # Extracts the four-letter object type out of a PHID ("PHID-USER-...").
    _phid_re = re.compile(r'PHID-([A-Z]{4})-.+')

    def __init__(self, *args, host=None, username=None, token=None,
                 **kwargs):
        """Connect to Phabricator and search for the repository by name.

        Args:
            *args: accepted but not forwarded to the parent constructor
                (same as the historical behaviour).
            host: base URL of the Phabricator instance; it has to start with
                ``http://`` or ``https://``.
            username: forwarded to the parent constructor and to
                ``get_phabricator_instance``.
            token: forwarded to ``get_phabricator_instance``.
            **kwargs: forwarded to the parent constructor; the optional
                ``repo_type`` entry (default ``'git'``) is also read here.

        Raises:
            RuntimeError: if ``host`` is missing or the server name cannot
                be extracted from it.
        """
        super().__init__(**kwargs, username=username)

        self._repo_type = kwargs.get('repo_type', 'git')

        if host is None:
            # Fail with an explicit message instead of the AttributeError
            # that ``None.rstrip`` would raise below.
            raise RuntimeError(
                'No Phabricator host given for repo {0}'.format(
                    self._colored_name))

        _base_url = host.rstrip('/')
        self._host = '{0}/api/'.format(_base_url)

        # The server name is the URL without its scheme.
        self._server = None
        _match = re.match(r'https?://(.*)', _base_url)
        if _match is not None:
            self._server = _match.group(1)

        if self._server is None:
            raise RuntimeError(
                'Cannot extract the server name for repo {0} from {1}'.format(
                    self._colored_name, host))

        self._phab = get_phabricator_instance(host=self._host,
                                              username=username,
                                              token=token)

        _data = self._phab.diffusion.repository.search(
            queryKey="all",
            constraints={"name": self._name})['data']

        # Only an entry whose name is exactly self._name is accepted.
        # NOTE: self._id is assigned here only when the repository is found.
        self._phab_id = None
        for _repo in _data:
            if _repo['fields']['name'] == self._name:
                self._phab_id = _repo['phid']
                self._id = _repo['id']
                _logger.debug('Repositories {0} has id {1}'.format(
                    self._colored_name, self._phab_id))

        if self._phab_id is None:
            _logger.debug('Repositories {0} not in phabricator'.format(
                self._colored_name))

    def create(self):
        """Create the repository on the server unless it already exists.

        On success ``self._phab_id`` and ``self._id`` are set. In dry-run
        mode nothing is sent to the server and placeholder identifiers are
        stored instead.
        """
        if self._phab_id is not None:
            # Deliberately not raising: an existing repository is reported
            # and left untouched.
            _logger.error('The repository {0}:{1} already exists'.format(
                self._colored_name, self._phab_id))
            return

        if self._dry_run:
            self._phab_id = "PHID-REPO-notarealrepo"
            _repo_id = 666  # placeholder numeric id for dry runs
        else:
            _data = self._phab.diffusion.repository.edit(
                transactions=[{'type': 'name', 'value': self._name},
                              {'type': 'vcs', 'value': self._repo_type}])
            self._creation_data = _data['object']
            self._phab_id = self._creation_data['phid']
            _repo_id = self._creation_data['id']

        _msg = 'Created repository {0} id {1}'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)
        if self._dry_run:
            dry_do(_msg)
        self._id = _repo_id

    @property
    def url(self):
        """Clone URL of the repository, built from server, id and name.

        The value is also cached in ``self._url``. For a repository type
        other than ``'git'`` or ``'svn'`` the previously stored ``self._url``
        is returned unchanged.
        """
        # NOTE(review): ``repo_type`` is not defined in this class; it is
        # presumably an accessor provided by ``Repo`` -- confirm.
        if self.repo_type == 'git':
            self._url = 'git@{0}:/diffusion/{1}/{2}.git'.format(
                self._server, self._id, self._name)
        elif self.repo_type == 'svn':
            # The template has three fields; the name used to be missing
            # from the arguments, which made this branch raise IndexError.
            self._url = 'svn+ssh://git@{0}/diffusion/{1}/{2}.git'.format(
                self._server, self._id, self._name)
        return self._url

    def enable(self):
        """Set the status of the repository to ``active`` on the server."""
        _msg = 'Activating repository {0} [{1}]'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)
        if self._dry_run:
            dry_do(_msg)
            return

        self._phab.diffusion.repository.edit(
            transactions=[{'type': 'status', 'value': 'active'}],
            objectIdentifier=self._phab_id)

    def wait_enabled(self, timeout=3600):
        """Poll the server once per second until the repository is active.

        Args:
            timeout: maximum time to wait, in seconds. When it expires a
                warning is logged and the method returns normally.

        Raises:
            RuntimeError: if the search does not return exactly one
                repository for ``self._phab_id``.
        """
        _msg = 'Checking if {0} [{1}] is activated'.format(
            self._colored_name, color_phid(self._phab_id))
        _logger.info(_msg)
        if self._dry_run:
            dry_do(_msg)
            return

        # Measure real elapsed time: counting sleeps would ignore the time
        # spent in the API calls themselves.
        _start = time.monotonic()
        while True:
            _data = self._phab.diffusion.repository.search(
                queryKey="all",
                constraints={"phids": [self._phab_id]})['data']
            if len(_data) != 1:
                raise RuntimeError('Cannot find the repo {0}'.format(
                    self._colored_name))

            if _data[0]['fields']['status'] == 'active':
                return

            if time.monotonic() - _start > timeout:
                _logger.warning(
                    'Repository {0} [{1}] still not active after {2}s'.format(
                        self._colored_name, color_phid(self._phab_id),
                        timeout))
                return

            time.sleep(1)

    def _create_custom_policy(self, entries):
        """Create a policy allowing the users/projects in *entries*.

        PHIDs are sorted into users and projects by their type field; the
        special ``'obj.repository.author'`` entry is replaced by
        ``self._directory.whoami``. Returns the PHID of the new policy, or a
        placeholder PHID in dry-run mode.
        """
        _lists = {'PROJ': [], 'USER': []}
        for _entry in entries:
            _match = self._phid_re.match(_entry)
            if _match is not None:
                _lists[_match.group(1)].append(_entry)
            elif _entry == 'obj.repository.author':
                _lists['USER'].append(self._directory.whoami)

        _policy = [
            {"action": "allow",
             "rule": 'PhabricatorUsersPolicyRule',
             "value": _lists['USER']},
            {"action": "allow",
             "rule": 'PhabricatorProjectsPolicyRule',
             "value": _lists['PROJ']},
        ]

        _msg = 'Creating policy for users [{0}] and projects [{1}]'.format(
            ', '.join([color_phid(_id) for _id in _lists['USER']]),
            ', '.join([color_phid(_id) for _id in _lists['PROJ']]))
        _logger.debug(_msg)

        if self._dry_run:
            dry_do(_msg)
            return 'PHID-PLCY-notapolicy'

        _data = self._phab.policy.create(objectType='REPO',
                                         default='deny',
                                         policy=_policy)
        return _data['phid']

    def set_permissions(self, permissions):
        """Translate *permissions* into view/edit/push policies and apply them.

        Args:
            permissions: object exposing ``groups`` and ``users`` (iterables
                of mappings with an ``'id'`` and a ``'perm'`` bit mask
                combined from ``Repo.EDIT``/``Repo.VIEW``/``Repo.PUSH``) and
                an ``anonymous`` flag that forces the view policy to
                ``'public'``.

        For each capability: a single entry is used directly; several
        entries collapse to ``'public'`` or ``'users'`` when one of them is
        present, otherwise a custom policy is created. A capability without
        any entry is left unchanged on the server.
        """
        _perms = {'edit': [], 'push': [], 'view': []}
        _equivalent = {'edit': Repo.EDIT,
                       'view': Repo.VIEW,
                       'push': Repo.PUSH}
        _phab_perms = {'edit': 'edit',
                       'view': 'view',
                       'push': 'policy.push'}
        _special_perms = {'_author_': 'obj.repository.author',
                          '_users_': 'users',
                          '_public_': 'public'}

        # Collect, per capability, the ids of every group/user holding it.
        for _kind in ('group', 'user'):
            for _entity in getattr(permissions, '{0}s'.format(_kind)):
                _id = _special_perms.get(_entity['id'], _entity['id'])
                for _phab, _gen in _equivalent.items():
                    if _entity['perm'] & _gen:
                        _perms[_phab].append(_id)

        if permissions.anonymous:
            _perms['view'] = ['public']

        for _type in ['push', 'view', 'edit']:
            # Deduplicate; sorting keeps logs and policies deterministic.
            _entries = sorted(set(_perms[_type]))

            if not _entries:
                # Nothing requested for this capability: indexing the empty
                # list used to raise IndexError.
                _logger.warning(
                    "No '{0}' permissions given for {1}, "
                    "leaving the policy unchanged".format(
                        _type, self._colored_name))
                continue

            if len(_entries) == 1:
                _value = _entries[0]
            elif 'public' in _entries:
                # The widest policy wins. No ``continue`` here: the value
                # still has to be sent to the server below.
                _value = 'public'
            elif 'users' in _entries:
                _value = 'users'
            else:
                # Several explicit principals: replace the list by a
                # custom policy.
                _value = self._create_custom_policy(_entries)
                _logger.info('Replacing list {0} by policy {1}'.format(
                    _entries, _value))

            _perms[_type] = _value

            _msg = "Setting '{0}' permissions for {1} to {2}:".format(
                _type, self._colored_name, color_phid(_value))
            _logger.info(_msg)

            if self._dry_run:
                dry_do(_msg)
            else:
                self._phab.diffusion.repository.edit(
                    transactions=[{'type': _phab_perms[_type],
                                   'value': _value}],
                    objectIdentifier=self._phab_id)
Event Timeline
Log In to Comment