Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91009199
phabricator.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Nov 6, 21:42
Size
5 KB
Mime Type
text/x-python
Expires
Fri, Nov 8, 21:42 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
22178680
Attached To
rPHAPI Phabricator API scripts
phabricator.py
View Options
# -*- coding: utf-8 -*-
import
logging
from
...
import
colored
from
...
import
export
from
...
import
dry_do
from
...
import
Directory
from
...utils
import
get_phabricator_instance
__author__
=
"Nicolas Richart"
__copyright__
=
"Copyright (C) 2016, EPFL (Ecole Polytechnique Fédérale "
\
"de Lausanne) - SCITAS (Scientific IT and Application "
\
"Support)"
__credits__
=
[
"Nicolas Richart"
]
__license__
=
"BSD"
__version__
=
"0.1"
__maintainer__
=
"Nicolas Richart"
__email__
=
"nicolas.richart@epfl.ch"
_logger
=
logging
.
getLogger
(
__name__
)
@export
class
PhabDirectory
(
Directory
):
def
__init__
(
self
,
*
args
,
host
=
None
,
username
=
None
,
token
=
None
,
**
kwargs
):
super
()
.
__init__
(
**
kwargs
)
self
.
_phab
=
get_phabricator_instance
(
host
=
host
,
username
=
username
,
token
=
token
)
def
_set_default_policy
(
self
,
phid
):
me
=
self
.
_phab
.
user
.
whoami
()
_default_policy
=
[{
"type"
:
"edit"
,
"value"
:
me
[
'phid'
]},
{
"type"
:
"view"
,
"value"
:
"obj.project.members"
},
{
"type"
:
"join"
,
"value"
:
"no-one"
}]
if
self
.
_dry_run
:
phid
=
"PHID-PROJ-notarealproject"
_msg
=
"Setting default policy for project {0} to {1}"
.
format
(
colored
(
phid
,
'red'
,
attrs
=
[
'bold'
]),
', '
.
join
([
"{0}: {1}"
.
format
(
colored
(
m
[
"type"
],
attrs
=
[
'bold'
]),
colored
(
m
[
'value'
],
'blue'
))
for
m
in
_default_policy
]))
_logger
.
debug
(
_msg
)
if
not
self
.
_dry_run
:
self
.
_phab
.
project
.
edit
(
transactions
=
_default_policy
,
objectIdentifier
=
phid
)
else
:
dry_do
(
_msg
)
def
is_valid_user
(
self
,
id
):
return
self
.
get_user_name
(
id
)
!=
''
def
is_valid_group
(
self
,
id
):
return
self
.
get_group_name
(
id
)
!=
''
def
get_users_from_group
(
self
,
id
):
_res
=
self
.
_phab
.
project
.
search
(
constraints
=
{
'phids'
:
[
id
]},
attachments
=
{
'members'
:
True
})
if
_res
[
'data'
]:
return
[
member
[
'phid'
]
for
member
in
_res
[
'data'
][
0
][
'attachments'
][
'members'
][
'members'
]]
return
[]
def
get_group_unique_id
(
self
,
name
):
_res
=
self
.
_phab
.
project
.
query
(
names
=
[
name
])
if
_res
[
'data'
]:
return
list
(
_res
[
'data'
]
.
keys
())[
0
]
return
''
def
get_user_unique_id
(
self
,
email
):
_res
=
self
.
_phab
.
user
.
query
(
emails
=
[
email
])
if
_res
:
return
_res
[
0
][
'phid'
]
return
''
def
get_group_name
(
self
,
gid
):
_res
=
self
.
_phab
.
project
.
query
(
phid
=
[
gid
])
if
_res
[
'data'
]:
return
_res
[
0
][
'data'
][
gid
][
'name'
]
return
''
def
get_user_name
(
self
,
uid
):
_res
=
self
.
_phab
.
user
.
query
(
phid
=
[
uid
])
if
_res
:
return
_res
[
0
][
'realName'
]
return
''
def
get_user_email
(
self
,
uid
):
raise
RuntimeError
(
"This information is not accessible"
)
def
create_group
(
self
,
name
,
members
=
[]):
_unique_members
=
list
(
set
(
members
))
_msg
=
'Creating group {0} with members {1}'
.
format
(
colored
(
name
,
'red'
,
attrs
=
[
'bold'
]),
colored
(
_unique_members
,
attrs
=
[
'bold'
]))
_logger
.
debug
(
_msg
)
if
not
self
.
_dry_run
:
_res
=
self
.
_phab
.
project
.
create
(
name
=
name
,
members
=
_unique_members
)
_phid
=
_res
[
'phid'
]
self
.
_set_default_policy
(
_phid
)
return
_phid
else
:
dry_do
(
_msg
)
self
.
_set_default_policy
(
None
)
return
None
def
set_group_users
(
self
,
gid
,
uids
):
_unique_uids
=
list
(
set
(
uids
))
_msg
=
'Setting users {0} as members of group {1}'
.
format
(
colored
(
_unique_uids
,
attrs
=
[
'bold'
]),
colored
(
gid
,
'red'
,
attrs
=
[
'bold'
]))
_logger
.
debug
(
_msg
)
transactions
=
[{
"type"
:
"members.set"
,
"value"
:
_unique_uids
}]
if
not
self
.
_dry_run
:
self
.
_phab
.
project
.
edit
(
transactions
=
transactions
,
objectIdentifier
=
gid
)
else
:
dry_do
(
_msg
)
def
create_subgroup
(
self
,
name
,
pgid
,
members
=
None
):
_msg
=
'Creating group {0} as a subgroup of {1}'
.
format
(
colored
(
name
,
'red'
,
attrs
=
[
'bold'
]),
colored
(
pgid
,
attrs
=
[
'bold'
]))
_logger
.
debug
(
_msg
)
transactions
=
[{
"type"
:
"parent"
,
"value"
:
pgid
},
{
"type"
:
"name"
,
"value"
:
name
}]
if
members
is
not
None
:
_unique_members
=
list
(
set
(
members
))
transactions
.
append
({
"type"
:
"members.set"
,
"value"
:
_unique_members
})
if
not
self
.
_dry_run
:
_res
=
self
.
_phab
.
project
.
edit
(
transactions
=
transactions
)
_phid
=
_res
[
'object'
][
'phid'
]
self
.
_set_default_policy
(
_phid
)
return
_phid
else
:
dry_do
(
_msg
)
self
.
_set_default_policy
(
None
)
return
None
Event Timeline
Log In to Comment