Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F90669629
webgroup.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sun, Nov 3, 18:02
Size
31 KB
Mime Type
text/x-python
Expires
Tue, Nov 5, 18:02 (2 d)
Engine
blob
Format
Raw Data
Handle
22117284
Attached To
R3600 invenio-infoscience
webgroup.py
View Options
## This file is part of Invenio.
## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""Group features."""
__revision__
=
"$Id$"
import
sys
from
invenio.config
import
CFG_SITE_LANG
from
invenio.messages
import
gettext_set_language
from
invenio.websession_config
import
CFG_WEBSESSION_INFO_MESSAGES
,
\
CFG_WEBSESSION_USERGROUP_STATUS
,
\
CFG_WEBSESSION_GROUP_JOIN_POLICY
from
invenio.webuser
import
nickname_valid_p
,
get_user_info
from
invenio.webmessage
import
perform_request_send
import
invenio.webgroup_dblayer
as
db
from
invenio.dbquery
import
IntegrityError
try
:
import
invenio.template
websession_templates
=
invenio
.
template
.
load
(
'websession'
)
except
ImportError
:
pass
if
sys
.
hexversion
<
0x2040000
:
# pylint: disable=W0622
from
sets
import
Set
as
set
# pylint: enable=W0622
def perform_request_groups_display(uid, infos=None, errors=None, warnings=None,
                                   ln=CFG_SITE_LANG):
    """Display all the groups the user belongs to.

    @param uid: user id
    @param infos: info messages about the last user action (default: none)
    @param errors: errors collected so far; errors reported by the admin,
        member and external sub-displays are appended to this list in place
        (default: a new empty list)
    @param warnings: warnings handed to the template (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    # The former ``[]`` defaults were created once and shared by every call;
    # since ``errors`` is extended in place below, entries could leak from
    # one request into the next.  Build fresh lists per call instead.
    if infos is None:
        infos = []
    if errors is None:
        errors = []
    if warnings is None:
        warnings = []
    # Result unused in this function; the call is kept so that behaviour for
    # an unexpected ``ln`` stays exactly what it was.
    _ = gettext_set_language(ln)

    (body_admin, errors_admin) = display_admin_groups(uid, ln)
    (body_member, errors_member) = display_member_groups(uid, ln)
    (body_external, errors_external) = display_external_groups(uid, ln)

    for sub_errors in (errors_admin, errors_member, errors_external):
        if sub_errors:
            errors.extend(sub_errors)

    body = websession_templates.tmpl_display_all_groups(
        infos=infos,
        admin_group_html=body_admin,
        member_group_html=body_member,
        external_group_html=body_external,
        warnings=warnings,
        ln=ln)
    return (body, errors, warnings)
def display_admin_groups(uid, ln=CFG_SITE_LANG):
    """Render the groups the user administers.

    @param uid: user id
    @param ln: language
    @return: a (body, errors[]) formed tuple; body is the HTML produced by
        the admin-groups template, errors is always an empty list here
    """
    admin_groups = db.get_groups_by_user_status(
        uid=uid,
        user_status=CFG_WEBSESSION_USERGROUP_STATUS["ADMIN"])
    html = websession_templates.tmpl_display_admin_groups(groups=admin_groups,
                                                          ln=ln)
    return (html, [])
def display_member_groups(uid, ln=CFG_SITE_LANG):
    """Render the groups the user is a plain member of.

    @param uid: user id
    @param ln: language
    @return: a (body, errors[]) formed tuple; body is the HTML produced by
        the member-groups template, errors is always an empty list here
    """
    member_groups = db.get_groups_by_user_status(
        uid,
        user_status=CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
    html = websession_templates.tmpl_display_member_groups(
        groups=member_groups,
        ln=ln)
    return (html, [])
def display_external_groups(uid, ln=CFG_SITE_LANG):
    """Render the external groups the user belongs to.

    @param uid: user id
    @param ln: language
    @return: a (body, errors[]) formed tuple; body is the HTML produced by
        the external-groups template, or None when the lookup returns
        nothing; errors is always an empty list here
    """
    external_groups = db.get_external_groups(uid)
    if not external_groups:
        # No external membership: signal "nothing to show" with None.
        return (None, [])
    html = websession_templates.tmpl_display_external_groups(
        groups=external_groups,
        ln=ln)
    return (html, [])
def perform_request_input_create_group(group_name, group_description,
                                       join_policy, warnings=None,
                                       ln=CFG_SITE_LANG):
    """Display the form for creating a new group.

    @param group_name: name of the group entered if the page has been reloaded
    @param group_description: description entered if the page has been reloaded
    @param join_policy: join policy chosen if the page has been reloaded
    @param warnings: warnings to show on the form (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
        body: html for group creation page
    """
    # ``warnings`` is handed back to the caller, so a shared ``[]`` default
    # could be modified by one caller and seen by the next; use a fresh list.
    if warnings is None:
        warnings = []
    errors = []
    body = websession_templates.tmpl_display_input_group_info(
        group_name,
        group_description,
        join_policy,
        act_type="create",
        warnings=warnings,
        ln=ln)
    return (body, errors, warnings)
def perform_request_create_group(uid, group_name, group_description,
                                 join_policy, ln=CFG_SITE_LANG):
    """Create a new group.

    The input is validated in this order, stopping at the first failure:
    empty name, invalid name, no join policy chosen ("-1"), name already
    present in the database.  On failure the creation form is shown again
    with the matching warning; on success the group is inserted and the
    user's group overview is shown with a "group created" info message.

    @param uid: user id of the creator
    @param group_name: name of the group entered
    @param group_description: description of the group entered
    @param join_policy: join policy of the group entered
    @param ln: language
    @return: a (body, errors, warnings) formed tuple
        warnings != [] if group_name or join_policy are not valid
        or if the name already exists in the database
    """
    _ = gettext_set_language(ln)
    warnings = []
    errors = []
    infos = []

    # NOTE(review): the empty-name warning is a 1-tuple while the others are
    # bare strings.  This mirrors the original code; confirm which form the
    # warning renderer expects before normalising.
    if group_name == "":
        warning = ('WRN_WEBSESSION_NO_GROUP_NAME',)
    elif not group_name_valid_p(group_name):
        warning = 'WRN_WEBSESSION_NOT_VALID_GROUP_NAME'
    elif join_policy == "-1":
        warning = 'WRN_WEBSESSION_NO_JOIN_POLICY'
    elif db.group_name_exist(group_name):
        # The database is only queried once the cheaper checks have passed.
        warning = 'WRN_WEBSESSION_GROUP_NAME_EXISTS'
    else:
        warning = None

    if warning is not None:
        warnings.append(warning)
        # Pass ``ln`` along: it used to be dropped here, so the form was
        # re-rendered in the site default language after a validation error.
        (body, errors, warnings) = perform_request_input_create_group(
            group_name,
            group_description,
            join_policy,
            warnings=warnings,
            ln=ln)
    else:
        db.insert_new_group(uid, group_name, group_description, join_policy)
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["GROUP_CREATED"])
        (body, errors, warnings) = perform_request_groups_display(
            uid,
            infos=infos,
            errors=errors,
            warnings=warnings,
            ln=ln)
    return (body, errors, warnings)
def perform_request_input_join_group(uid, group_name, search, warnings=None,
                                     ln=CFG_SITE_LANG):
    """Return html for joining a new group.

    @param uid: user id
    @param group_name: name of the group entered if user is looking for a group
    @param search: 1 if a search was performed, else 0
    @param warnings: warnings coming from perform_request_join_group
        (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    # ``warnings`` is handed back to the caller, so a shared ``[]`` default
    # could be modified by one caller and seen by the next; use a fresh list.
    if warnings is None:
        warnings = []
    errors = []

    # Groups the user can see; both lookups are used through ``.items()``,
    # so they are expected to be mappings.
    records = db.get_visible_group_list(uid=uid)
    if search:
        group_from_search = db.get_visible_group_list(uid, group_name)
    else:
        group_from_search = {}

    body = websession_templates.tmpl_display_input_join_group(
        records.items(),
        group_name,
        group_from_search.items(),
        search,
        warnings=warnings,
        ln=ln)
    return (body, errors, warnings)
def perform_request_join_group(uid, grpID, group_name, search,
                               ln=CFG_SITE_LANG):
    """Join a group.

    @param uid: user id
    @param grpID: list of the groups the user wants to join,
        only one value must be selected among the two group lists
        (default group list, group list resulting from the search);
        "-1" entries are ignored
    @param group_name: name of the group entered if search on name performed
    @param search: 1 if a search was performed, else 0
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
        warnings != [] if 0 or more than one group is selected
    """
    _ = gettext_set_language(ln)
    body = ""
    warnings = []
    errors = []
    infos = []

    if "-1" in grpID:
        grpID = [choice for choice in grpID if choice != '-1']

    if len(grpID) != 1:
        # Zero or several groups selected: show the join form again.
        warnings.append('WRN_WEBSESSION_MULTIPLE_GROUPS')
        (body, errors, warnings) = perform_request_input_join_group(
            uid, group_name, search, warnings, ln)
        return (body, errors, warnings)

    grpID = int(grpID[0])
    # Any existing status row means the user is already member or pending.
    current_status = db.get_user_status(uid, grpID)
    if current_status:
        warnings.append('WRN_WEBSESSION_ALREADY_MEMBER')
    else:
        group_infos = db.get_group_infos(grpID)
        join_policy = group_infos[0][3]
        if join_policy == CFG_WEBSESSION_GROUP_JOIN_POLICY["VISIBLEMAIL"]:
            # Membership needs approval: register as pending and notify
            # the first administrator returned by the database.
            db.insert_new_member(uid, grpID,
                                 CFG_WEBSESSION_USERGROUP_STATUS["PENDING"])
            admin = db.get_users_by_status(
                grpID, CFG_WEBSESSION_USERGROUP_STATUS["ADMIN"])[0][1]
            group_name = group_infos[0][1]
            subject, text = websession_templates.tmpl_admin_msg(
                group_name=group_name, grpID=grpID, ln=ln)
            (body, errors, warnings, dummy, dummy) = perform_request_send(
                uid,
                msg_to_user=admin,
                msg_to_group="",
                msg_subject=subject,
                msg_body=text,
                ln=ln)
            infos.append(CFG_WEBSESSION_INFO_MESSAGES["JOIN_REQUEST"])
        elif join_policy == CFG_WEBSESSION_GROUP_JOIN_POLICY["VISIBLEOPEN"]:
            db.insert_new_member(uid, grpID,
                                 CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
            infos.append(CFG_WEBSESSION_INFO_MESSAGES["JOIN_GROUP"])

    # Every single-selection outcome ends on the group overview page.
    (body, errors, warnings) = perform_request_groups_display(
        uid, infos=infos, errors=errors, warnings=warnings, ln=ln)
    return (body, errors, warnings)
def perform_request_input_leave_group(uid, warnings=None, ln=CFG_SITE_LANG):
    """Return html for leaving a group.

    Only groups in which the user has the MEMBER status are offered.

    @param uid: user ID
    @param warnings: warnings to show on the form, e.g. when no group was
        selected (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    # ``warnings`` is handed back to the caller, so a shared ``[]`` default
    # could be modified by one caller and seen by the next; use a fresh list.
    if warnings is None:
        warnings = []
    errors = []
    records = db.get_groups_by_user_status(
        uid=uid,
        user_status=CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
    # Keep the first two columns of each row.  This used to be a
    # ``map(lambda ...: groups.append(...))`` run purely for its side effect,
    # which silently does nothing where ``map`` is lazy.
    groups = [(record[0], record[1]) for record in records]
    body = websession_templates.tmpl_display_input_leave_group(
        groups,
        warnings=warnings,
        ln=ln)
    return (body, errors, warnings)
def perform_request_leave_group(uid, grpID, confirmed=0, ln=CFG_SITE_LANG):
    """Leave a group.

    @param uid: user ID
    @param grpID: ID of the group the user wants to leave; -1 means that
        no group was selected
    @param confirmed: a confirmation page is displayed first; the group is
        only left once this flag is set
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    _ = gettext_set_language(ln)
    body = ""
    warnings = []
    errors = []
    infos = []

    if grpID == -1:
        # Nothing selected: back to the selection form with a warning.
        warnings.append('WRN_WEBSESSION_NO_GROUP_SELECTED')
        (body, errors, warnings) = perform_request_input_leave_group(
            uid, warnings=warnings, ln=ln)
    elif confirmed:
        db.leave_group(grpID, uid)
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["LEAVE_GROUP"])
        (body, errors, warnings) = perform_request_groups_display(
            uid, infos=infos, errors=errors, warnings=warnings, ln=ln)
    else:
        body = websession_templates.tmpl_confirm_leave(uid, grpID, ln)
    return (body, errors, warnings)
def perform_request_edit_group(uid, grpID, warnings=None, ln=CFG_SITE_LANG):
    """Return html for group editing.

    The form is only produced when the user has the ADMIN status in the
    group; otherwise body is empty and an error is reported.

    @param uid: user ID
    @param grpID: ID of the group
    @param warnings: warnings to show on the form (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    # ``warnings`` is handed back to the caller, so a shared ``[]`` default
    # could be modified by one caller and seen by the next; use a fresh list.
    if warnings is None:
        warnings = []
    body = ''
    errors = []

    user_status = db.get_user_status(uid, grpID)
    if not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    elif user_status[0][0] != CFG_WEBSESSION_USERGROUP_STATUS['ADMIN']:
        errors.append(('ERR_WEBSESSION_GROUP_NO_RIGHTS',))
        return (body, errors, warnings)

    # Check for an empty result *before* taking the first row: the old code
    # indexed ``[0]`` first, so a missing group raised IndexError and the
    # DB-error branch below could never be reached for that case.
    group_rows = db.get_group_infos(grpID)
    if not len(group_rows) or not len(group_rows[0]):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    group_infos = group_rows[0]

    body = websession_templates.tmpl_display_input_group_info(
        group_name=group_infos[1],
        group_description=group_infos[2],
        join_policy=group_infos[3],
        act_type="update",
        grpID=grpID,
        warnings=warnings,
        ln=ln)
    return (body, errors, warnings)
def perform_request_update_group(uid, grpID, group_name, group_description,
                                 join_policy, ln=CFG_SITE_LANG):
    """Update group data in the database.

    The input is validated in this order, stopping at the first failure:
    empty name, invalid name, no join policy chosen ("-1"), name already
    used by a different group.  On failure the edit form is shown again
    with the matching warning; on success the group is updated and the
    user's group overview is shown with a "group updated" info message.

    @param uid: user ID
    @param grpID: ID of the group
    @param group_name: name of the group
    @param group_description: description of the group
    @param join_policy: join policy of the group
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    errors = []
    warnings = []
    infos = []
    _ = gettext_set_language(ln)

    warning = None
    if group_name == "":
        warning = 'WRN_WEBSESSION_NO_GROUP_NAME'
    elif not group_name_valid_p(group_name):
        warning = 'WRN_WEBSESSION_NOT_VALID_GROUP_NAME'
    elif join_policy == "-1":
        warning = 'WRN_WEBSESSION_NO_JOIN_POLICY'
    else:
        # Only hit the database once the cheap checks have passed.  Keeping
        # the group's own current name is allowed.
        # NOTE(review): the comparison assumes grpID has the same type as
        # the id returned by the database -- confirm against the caller.
        name_owner = db.group_name_exist(group_name)
        if name_owner and name_owner[0][0] != grpID:
            warning = 'WRN_WEBSESSION_GROUP_NAME_EXISTS'

    if warning is not None:
        warnings.append(warning)
        (body, errors, warnings) = perform_request_edit_group(
            uid, grpID, warnings=warnings, ln=ln)
        return (body, errors, warnings)

    # NOTE(review): unlike perform_request_edit_group, this success path
    # does not verify that ``uid`` is an administrator of the group before
    # writing.  Confirm whether the caller enforces that.
    db.update_group_infos(grpID, group_name, group_description, join_policy)
    infos.append(CFG_WEBSESSION_INFO_MESSAGES["GROUP_UPDATED"])
    # Render in the requested language; this call used to pass
    # CFG_SITE_LANG and therefore ignored ``ln``.
    (body, errors, warnings) = perform_request_groups_display(
        uid, infos=infos, errors=errors, warnings=warnings, ln=ln)
    return (body, errors, warnings)
def perform_request_delete_group(uid, grpID, confirmed=0, ln=CFG_SITE_LANG):
    """First display a confirmation message (confirmed=0), then
    (confirmed=1) delete the group and all its members.

    @param uid: user ID
    @param grpID: ID of the group
    @param confirmed: =1 if the confirmation message has been previously
        displayed
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    body = ""
    warnings = []
    errors = []
    infos = []
    _ = gettext_set_language(ln)

    group_infos = db.get_group_infos(grpID)
    user_status = db.get_user_status(uid, grpID)

    if not group_infos:
        warnings.append('WRN_WEBSESSION_GROUP_ALREADY_DELETED')
        # Render in the requested language (was hard-wired to CFG_SITE_LANG).
        (body, errors, warnings) = perform_request_groups_display(
            uid, infos=infos, errors=errors, warnings=warnings, ln=ln)
    elif not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
    elif confirmed:
        # NOTE(review): only the existence of a status row is checked, not
        # that ``uid`` is an administrator of the group.  Confirm whether
        # the caller enforces that before tightening it here.
        #
        # Reuse the row fetched above instead of querying a second time.
        group_name = group_infos[0][1]
        msg_subject, msg_body = websession_templates.tmpl_delete_msg(
            group_name=group_name, ln=ln)
        # The message is addressed to the group by name, so it is sent
        # before the group and its memberships are removed.
        (body, errors, warnings, dummy, dummy) = perform_request_send(
            uid,
            msg_to_user="",
            msg_to_group=group_name,
            msg_subject=msg_subject,
            msg_body=msg_body,
            ln=ln)
        db.delete_group_and_members(grpID)
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["GROUP_DELETED"])
        # Render in the requested language (was hard-wired to CFG_SITE_LANG).
        (body, errors, warnings) = perform_request_groups_display(
            uid, infos=infos, errors=errors, warnings=warnings, ln=ln)
    else:
        body = websession_templates.tmpl_confirm_delete(grpID, ln)
    return (body, errors, warnings)
def perform_request_manage_member(uid, grpID, infos=None, warnings=None,
                                  ln=CFG_SITE_LANG):
    """Return html for managing a group's members.

    The page is only produced when the user has the ADMIN status in the
    group; otherwise body is empty and an error is reported.

    @param uid: user ID
    @param grpID: ID of the group
    @param infos: info messages about the last user action (default: none)
    @param warnings: warnings to show on the page (default: none)
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    # Avoid shared mutable defaults: ``warnings`` is handed back to the
    # caller and both lists are passed on to the template.
    if infos is None:
        infos = []
    if warnings is None:
        warnings = []
    body = ''
    errors = []
    _ = gettext_set_language(ln)

    user_status = db.get_user_status(uid, grpID)
    if not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    elif user_status[0][0] != CFG_WEBSESSION_USERGROUP_STATUS['ADMIN']:
        errors.append(('ERR_WEBSESSION_GROUP_NO_RIGHTS',))
        return (body, errors, warnings)

    group_infos = db.get_group_infos(grpID)
    if not len(group_infos):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)

    members = db.get_users_by_status(
        grpID, CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
    pending_members = db.get_users_by_status(
        grpID, CFG_WEBSESSION_USERGROUP_STATUS["PENDING"])

    body = websession_templates.tmpl_display_manage_member(
        grpID=grpID,
        group_name=group_infos[0][1],
        members=members,
        pending_members=pending_members,
        warnings=warnings,
        infos=infos,
        ln=ln)
    return (body, errors, warnings)
def perform_request_remove_member(uid, grpID, member_id, ln=CFG_SITE_LANG):
    """Remove a member from a group.

    @param uid: user ID of the person performing the action; must have the
        ADMIN status in the group
    @param grpID: ID of the group
    @param member_id: selected member ID; -1 means nobody was selected
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    body = ''
    errors = []
    warnings = []
    infos = []
    _ = gettext_set_language(ln)

    user_status = db.get_user_status(uid, grpID)
    if not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    # Refuse non-administrators *before* touching the database.  Previously
    # the member was deleted first and the missing rights were only reported
    # afterwards by perform_request_manage_member.
    if user_status[0][0] != CFG_WEBSESSION_USERGROUP_STATUS['ADMIN']:
        errors.append(('ERR_WEBSESSION_GROUP_NO_RIGHTS',))
        return (body, errors, warnings)

    if member_id == -1:
        warnings.append('WRN_WEBSESSION_NO_MEMBER_SELECTED')
        (body, errors, warnings) = perform_request_manage_member(
            uid, grpID, warnings=warnings, ln=ln)
    else:
        db.delete_member(grpID, member_id)
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["MEMBER_DELETED"])
        (body, errors, warnings) = perform_request_manage_member(
            uid, grpID, infos=infos, warnings=warnings, ln=ln)
    return (body, errors, warnings)
def perform_request_add_member(uid, grpID, user_id, ln=CFG_SITE_LANG):
    """Add a waiting member to a group.

    @param uid: user ID of the person performing the action; must have the
        ADMIN status in the group
    @param grpID: ID of the group
    @param user_id: selected user ID; -1 means nobody was selected
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    body = ''
    errors = []
    warnings = []
    infos = []
    _ = gettext_set_language(ln)

    user_status = db.get_user_status(uid, grpID)
    if not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    # Refuse non-administrators *before* touching the database.  Previously
    # the membership was changed and the acceptance message sent first, and
    # the missing rights were only reported afterwards by
    # perform_request_manage_member.
    if user_status[0][0] != CFG_WEBSESSION_USERGROUP_STATUS['ADMIN']:
        errors.append(('ERR_WEBSESSION_GROUP_NO_RIGHTS',))
        return (body, errors, warnings)

    if user_id == -1:
        warnings.append('WRN_WEBSESSION_NO_USER_SELECTED_ADD')
        (body, errors, warnings) = perform_request_manage_member(
            uid, grpID, warnings=warnings, ln=ln)
        return (body, errors, warnings)

    # Test if the selected user is already a full member.
    # NOTE(review): the literal 'M' is presumably the value of
    # CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"] -- confirm before replacing.
    status = db.get_user_status(user_id, grpID)
    if status and status[0][0] == 'M':
        warnings.append('WRN_WEBSESSION_ALREADY_MEMBER_ADD')
    else:
        db.add_pending_member(grpID, user_id,
                              CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["MEMBER_ADDED"])
        group_infos = db.get_group_infos(grpID)
        group_name = group_infos[0][1]
        # Third field of the user info is used as the message recipient.
        user = get_user_info(user_id, ln)[2]
        msg_subject, msg_body = websession_templates.tmpl_member_msg(
            group_name=group_name, accepted=1, ln=ln)
        # Only the warnings of the send step survive: body and errors are
        # replaced by the manage-member page rendered below.
        (body, errors, warnings, dummy, dummy) = perform_request_send(
            uid,
            msg_to_user=user,
            msg_to_group="",
            msg_subject=msg_subject,
            msg_body=msg_body,
            ln=ln)

    (body, errors, warnings) = perform_request_manage_member(
        uid, grpID, infos=infos, warnings=warnings, ln=ln)
    return (body, errors, warnings)
def perform_request_reject_member(uid, grpID, user_id, ln=CFG_SITE_LANG):
    """Reject a waiting member and delete it from the list.

    @param uid: user ID of the person performing the action; must have the
        ADMIN status in the group
    @param grpID: ID of the group
    @param user_id: selected user ID; -1 means nobody was selected
    @param ln: language
    @return: a (body, errors[], warnings[]) formed tuple
    """
    body = ''
    errors = []
    warnings = []
    infos = []
    _ = gettext_set_language(ln)

    user_status = db.get_user_status(uid, grpID)
    if not len(user_status):
        errors.append('ERR_WEBSESSION_DB_ERROR')
        return (body, errors, warnings)
    # Refuse non-administrators *before* touching the database.  Previously
    # the user was removed and the rejection message sent first, and the
    # missing rights were only reported afterwards by
    # perform_request_manage_member.
    if user_status[0][0] != CFG_WEBSESSION_USERGROUP_STATUS['ADMIN']:
        errors.append(('ERR_WEBSESSION_GROUP_NO_RIGHTS',))
        return (body, errors, warnings)

    if user_id == -1:
        warnings.append('WRN_WEBSESSION_NO_USER_SELECTED_DEL')
        (body, errors, warnings) = perform_request_manage_member(
            uid, grpID, warnings=warnings, ln=ln)
        return (body, errors, warnings)

    # No status row at all: the user is no longer attached to the group.
    status = db.get_user_status(user_id, grpID)
    if not status:
        warnings.append('WRN_WEBSESSION_ALREADY_MEMBER_REJECT')
    else:
        db.delete_member(grpID, user_id)
        group_infos = db.get_group_infos(grpID)
        group_name = group_infos[0][1]
        # Third field of the user info is used as the message recipient.
        user = get_user_info(user_id, ln)[2]
        msg_subject, msg_body = websession_templates.tmpl_member_msg(
            group_name=group_name, accepted=0, ln=ln)
        # Only the warnings of the send step survive: body and errors are
        # replaced by the manage-member page rendered below.
        (body, errors, warnings, dummy, dummy) = perform_request_send(
            uid,
            msg_to_user=user,
            msg_to_group="",
            msg_subject=msg_subject,
            msg_body=msg_body,
            ln=ln)
        infos.append(CFG_WEBSESSION_INFO_MESSAGES["MEMBER_REJECTED"])

    (body, errors, warnings) = perform_request_manage_member(
        uid, grpID, infos=infos, warnings=warnings, ln=ln)
    return (body, errors, warnings)
def account_group(uid, ln=CFG_SITE_LANG):
    """Build the group summary shown on the account page.

    @param uid: user id (int)
    @param ln: language
    @return: html body produced by the group-info template from the number
        of administered groups, the number of member groups and their sum
    """
    admin_count = db.count_nb_group_user(
        uid, CFG_WEBSESSION_USERGROUP_STATUS["ADMIN"])
    member_count = db.count_nb_group_user(
        uid, CFG_WEBSESSION_USERGROUP_STATUS["MEMBER"])
    return websession_templates.tmpl_group_info(admin_count,
                                                member_count,
                                                admin_count + member_count,
                                                ln=ln)
def get_navtrail(ln=CFG_SITE_LANG, title=""):
    """Return the navigation trail for a page.

    @param ln: language
    @param title: title of the page
    @return: HTML output of the navtrail template
    """
    return websession_templates.tmpl_navtrail(ln, title)
def synchronize_external_groups(userid, groups, login_method):
    """Bring one user's external group memberships in line with `groups`.

    Memberships of groups that are no longer listed are removed; groups the
    user is not yet subscribed to are joined, and created first when they
    do not exist yet.

    @param userid: the integer representing the user inside the db
    @param groups: a dictionary of group_name : group_description
    @param login_method: a string unique to the type of authentication
        the groups are associated to, to be used inside the db
    """
    known_rows = db.get_login_method_groups(userid, login_method)
    # name -> group id, for the groups the user is currently attached to
    id_by_name = dict([(name, groupid) for (name, groupid) in known_rows])
    known_names = set([row[0] for row in known_rows])
    wanted_names = set(groups.keys())

    # Drop the user from groups he is no longer affiliated with.
    for stale_name in known_names - wanted_names:
        db.delete_member(id_by_name[stale_name], userid)

    for new_name in wanted_names - known_names:
        found = db.get_group_id(new_name, login_method)
        if not found:
            # The group does not exist yet: create it, then register the
            # user with the MEMBER status.
            try:
                created_id = db.insert_new_group(
                    userid,
                    new_name,
                    groups[new_name],
                    CFG_WEBSESSION_GROUP_JOIN_POLICY['VISIBLEEXTERNAL'],
                    login_method)
                db.add_pending_member(
                    created_id,
                    userid,
                    CFG_WEBSESSION_USERGROUP_STATUS['MEMBER'])
            except IntegrityError:
                # The group already exists? Maybe because of concurrency?
                # Look it up again and simply join it.
                found = db.get_group_id(new_name, login_method)
                if found:
                    db.insert_new_member(
                        userid,
                        found[0][0],
                        CFG_WEBSESSION_USERGROUP_STATUS['MEMBER'])
            continue
        # Adding the user to an already existing group.
        db.insert_new_member(userid,
                             found[0][0],
                             CFG_WEBSESSION_USERGROUP_STATUS['MEMBER'])
def synchronize_groups_with_login_method():
    """Synchronize external groups for every configured login method.

    Where the authorizer can deliver all memberships at once
    (fetch_all_users_groups_membership), a bulk synchronization is done.
    When that raises NotImplementedError or NameError, every user who
    already belongs to at least one external group of that login method is
    synchronized individually; users whose lookup raises one of the same
    two exceptions are skipped.
    """
    from invenio.access_control_config import CFG_EXTERNAL_AUTHENTICATION

    for login_method, authorizer in CFG_EXTERNAL_AUTHENTICATION.items():
        if not authorizer:
            # Login method without an external authorizer: nothing to sync.
            continue
        try:
            all_memberships = authorizer.fetch_all_users_groups_membership()
            synchronize_all_external_groups(all_memberships, login_method)
        except (NotImplementedError, NameError):
            # Bulk path unavailable: fall back to one request per user.
            known_users = db.get_all_users_with_groups_with_login_method(
                login_method)
            for email, uid in known_users.items():
                try:
                    memberships = authorizer.fetch_user_groups_membership(
                        email)
                    synchronize_external_groups(uid, memberships,
                                                login_method)
                except (NotImplementedError, NameError):
                    pass
def synchronize_all_external_groups(usersgroups, login_method):
    """Merge all the groups vs users memberships.

    `usersgroups` is cleaned IN PLACE: each member list is replaced by the
    set of uids of the members found in the database, and groups left
    without any such member are removed.  Then, for `login_method`, groups
    unknown to the remaining input are deleted, new groups are created with
    their members, and for groups present on both sides the members and the
    description are brought in line with the input.

    @param usersgroups: is {'mygroup': ('description',
        ['email1', 'email2', ...]), ...}
    @param login_method: a string identifying the authentication type the
        groups are associated to
    @return: None
    """
    db_users = db.get_all_users()            # {email: uid, ...}
    db_users_set = set(db_users.keys())      # set of all known e-mails

    # Iterate over a snapshot: entries are replaced/deleted in the loop,
    # which is only legal while iterating when items() returns a list.
    for key, value in list(usersgroups.items()):
        # Keep only users that exist in the db, translated to their uid.
        cleaned_user_list = set()
        for username in value[1]:
            # NOTE(review): presumably db.get_all_users() keys are
            # upper-cased e-mails -- confirm.
            username = username.upper()
            if username in db_users_set:
                cleaned_user_list.add(db_users[username])
        if cleaned_user_list:
            usersgroups[key] = (value[0], cleaned_user_list)
        else:
            # No known user left: the group is not interesting.
            del usersgroups[key]

    # From here on every group maps to (description, set of uid).
    groups_already_known = db.get_all_login_method_groups(login_method)
    # groups in the db {groupname: id}
    groups_already_known_set = set(groups_already_known.keys())
    usersgroups_set = set(usersgroups.keys())   # groupnames to be merged

    # Deleted groups: known to the db but absent from the input.
    for group_name in groups_already_known_set - usersgroups_set:
        db.delete_group_and_members(groups_already_known[group_name])

    # New groups: create them and subscribe their users.
    for group_name in usersgroups_set - groups_already_known_set:
        groupid = db.insert_only_new_group(
            group_name,
            usersgroups[group_name][0],   # description
            CFG_WEBSESSION_GROUP_JOIN_POLICY['VISIBLEEXTERNAL'],
            login_method)
        for uid in usersgroups[group_name][1]:
            db.insert_new_member(uid,
                                 groupid,
                                 CFG_WEBSESSION_USERGROUP_STATUS['MEMBER'])

    # Groups present on both sides: reconcile members and description.
    changed_groups = usersgroups_set & groups_already_known_set
    groups_description = db.get_all_groups_description(login_method)
    for group_name in changed_groups:
        groupid = groups_already_known[group_name]
        users_already_in_group_set = set(db.get_users_in_group(groupid))
        users_in_group_set = usersgroups[group_name][1]

        # No more affiliation.
        for uid in users_already_in_group_set - users_in_group_set:
            db.delete_member(groupid, uid)

        # New affiliation.
        for uid in users_in_group_set - users_already_in_group_set:
            db.insert_new_member(uid,
                                 groupid,
                                 CFG_WEBSESSION_USERGROUP_STATUS['MEMBER'])

        # Check description.
        if groups_description[group_name] != usersgroups[group_name][0]:
            db.update_group_infos(
                groupid,
                group_name,
                usersgroups[group_name][0],
                CFG_WEBSESSION_GROUP_JOIN_POLICY['VISIBLEEXTERNAL'])
def group_name_valid_p(group_name):
    """Tell whether `group_name` is acceptable as a group name.

    Group names follow exactly the rule applied to nicknames.
    """
    is_valid = nickname_valid_p(group_name)
    return is_valid
Event Timeline
Log In to Comment