Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F76011905
webgroup.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Aug 5, 17:35
Size
38 KB
Mime Type
text/x-python
Expires
Wed, Aug 7, 17:35 (2 d)
Engine
blob
Format
Raw Data
Handle
19648665
Attached To
R3600 invenio-infoscience
webgroup.py
View Options
# This file is part of Invenio.
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2015 CERN.
#
# Invenio is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""Group features."""
__revision__
=
"$Id$"
import
sys
from
invenio.config
import
CFG_SITE_LANG
from
invenio.base.i18n
import
gettext_set_language
from
invenio.legacy.websession.websession_config
import
CFG_WEBSESSION_INFO_MESSAGES
,
\
CFG_WEBSESSION_USERGROUP_STATUS
,
\
CFG_WEBSESSION_GROUP_JOIN_POLICY
,
\
InvenioWebSessionError
,
\
InvenioWebSessionWarning
from
invenio.legacy.webuser
import
nickname_valid_p
,
get_user_info
from
invenio.legacy.webmessage.api
import
perform_request_send
from
invenio.ext.logging
import
register_exception
import
invenio.legacy.websession.dblayer
as
db
try
:
import
invenio.legacy.template
websession_templates
=
invenio
.
legacy
.
template
.
load
(
'websession'
)
except
ImportError
:
pass
if
sys
.
hexversion
<
0x2040000
:
# pylint: disable=W0622
from
sets
import
Set
as
set
# pylint: enable=W0622
from
sqlalchemy.exc
import
IntegrityError
def
perform_request_groups_display
(
uid
,
infos
=
[],
warnings
=
[],
\
ln
=
CFG_SITE_LANG
):
"""Display all the groups the user belongs to.
@param uid: user id
@param info: info about last user action
@param ln: language
@return: body with warnings
"""
_
=
gettext_set_language
(
ln
)
body
=
""
body_admin
=
display_admin_groups
(
uid
,
ln
)
body_member
=
display_member_groups
(
uid
,
ln
)
body_external
=
display_external_groups
(
uid
,
ln
)
body
=
websession_templates
.
tmpl_display_all_groups
(
infos
=
infos
,
admin_group_html
=
body_admin
,
member_group_html
=
body_member
,
external_group_html
=
body_external
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
display_admin_groups
(
uid
,
ln
=
CFG_SITE_LANG
):
"""Display groups the user is admin of.
@param uid: user id
@param ln: language
@return: body
return html groups representation the user is admin of
"""
body
=
""
record
=
db
.
get_groups_by_user_status
(
uid
=
uid
,
user_status
=
CFG_WEBSESSION_USERGROUP_STATUS
[
"ADMIN"
])
body
=
websession_templates
.
tmpl_display_admin_groups
(
groups
=
record
,
ln
=
ln
)
return
body
def
display_member_groups
(
uid
,
ln
=
CFG_SITE_LANG
):
"""Display groups the user is member of.
@param uid: user id
@param ln: language
@return: body
body : html groups representation the user is member of
"""
body
=
""
records
=
db
.
get_groups_by_user_status
(
uid
,
user_status
=
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
]
)
body
=
websession_templates
.
tmpl_display_member_groups
(
groups
=
records
,
ln
=
ln
)
return
body
def
display_external_groups
(
uid
,
ln
=
CFG_SITE_LANG
):
"""Display groups the user is admin of.
@param uid: user id
@param ln: language
@return: body
return html groups representation the user is admin of
"""
body
=
""
record
=
db
.
get_external_groups
(
uid
)
if
record
:
body
=
websession_templates
.
tmpl_display_external_groups
(
groups
=
record
,
ln
=
ln
)
else
:
body
=
None
return
body
def
perform_request_input_create_group
(
group_name
,
group_description
,
join_policy
,
warnings
=
[],
ln
=
CFG_SITE_LANG
):
"""Display form for creating new group.
@param group_name: name of the group entered if the page has been reloaded
@param group_description: description entered if the page has been reloaded
@param join_policy: join policy chosen if the page has been reloaded
@param warnings: warnings
@param ln: language
@return: body with warnings
body: html for group creation page
"""
body
=
""
body
=
websession_templates
.
tmpl_display_input_group_info
(
group_name
,
group_description
,
join_policy
,
act_type
=
"create"
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_create_group
(
uid
,
group_name
,
group_description
,
join_policy
,
ln
=
CFG_SITE_LANG
):
"""Create new group.
@param group_name: name of the group entered
@param group_description: description of the group entered
@param join_policy: join policy of the group entered
@param ln: language
@return: body with warnings
warning != [] if group_name or join_policy are not valid
or if the name already exists in the database
body="1" if succeed in order to display info on the main page
"""
_
=
gettext_set_language
(
ln
)
body
=
""
warnings
=
[]
infos
=
[]
if
group_name
==
""
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please enter a group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_create_group
(
group_name
,
group_description
,
join_policy
,
warnings
=
warnings
)
elif
not
group_name_valid_p
(
group_name
):
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please enter a valid group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_create_group
(
group_name
,
group_description
,
join_policy
,
warnings
=
warnings
)
elif
join_policy
==
"-1"
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please choose a group join policy.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_create_group
(
group_name
,
group_description
,
join_policy
,
warnings
=
warnings
)
elif
db
.
group_name_exist
(
group_name
):
try
:
raise
InvenioWebSessionWarning
(
_
(
'Group name already exists. Please choose another group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_create_group
(
group_name
,
group_description
,
join_policy
,
warnings
=
warnings
)
else
:
db
.
insert_new_group
(
uid
,
group_name
,
group_description
,
join_policy
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"GROUP_CREATED"
])
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_input_join_group
(
uid
,
group_name
,
search
,
warnings
=
[],
ln
=
CFG_SITE_LANG
):
"""Return html for joining new group.
@param group_name: name of the group entered if user is looking for a group
@param search=1 if search performed else 0
@param warnings: warnings coming from perform_request_join_group
@param ln: language
@return: body with warnings
"""
group_from_search
=
{}
records
=
db
.
get_visible_group_list
(
uid
=
uid
)
if
search
:
group_from_search
=
db
.
get_visible_group_list
(
uid
,
group_name
)
body
=
websession_templates
.
tmpl_display_input_join_group
(
records
.
items
(),
group_name
,
group_from_search
.
items
(),
search
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_join_group
(
uid
,
grpID
,
group_name
,
search
,
ln
=
CFG_SITE_LANG
):
"""Join group.
@param grpID: list of the groups the user wants to join,
only one value must be selected among the two group lists
(default group list, group list resulting from the search)
@param group_name: name of the group entered if search on name performed
@param search=1 if search performed else 0
@param ln: language
@return: body with warnings
warnings != [] if 0 or more than one group is selected
"""
_
=
gettext_set_language
(
ln
)
body
=
""
warnings
=
[]
infos
=
[]
if
"-1"
in
grpID
:
grpID
=
filter
(
lambda
x
:
x
!=
'-1'
,
grpID
)
if
len
(
grpID
)
==
1
:
grpID
=
int
(
grpID
[
0
])
# test if user is already member or pending
status
=
db
.
get_user_status
(
uid
,
grpID
)
if
status
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'You are already member of the group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
# insert new user of group
else
:
group_infos
=
db
.
get_group_infos
(
grpID
)
group_type
=
group_infos
[
0
][
3
]
if
group_type
==
CFG_WEBSESSION_GROUP_JOIN_POLICY
[
"VISIBLEMAIL"
]:
db
.
insert_new_member
(
uid
,
grpID
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"PENDING"
])
admin
=
db
.
get_users_by_status
(
grpID
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"ADMIN"
])[
0
][
1
]
group_name
=
group_infos
[
0
][
1
]
msg_subjet
,
msg_body
=
websession_templates
.
tmpl_admin_msg
(
group_name
=
group_name
,
grpID
=
grpID
,
ln
=
ln
)
(
body
,
dummy
,
dummy
)
=
\
perform_request_send
(
uid
,
msg_to_user
=
admin
,
msg_to_group
=
""
,
msg_subject
=
msg_subjet
,
msg_body
=
msg_body
,
ln
=
ln
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"JOIN_REQUEST"
])
elif
group_type
==
CFG_WEBSESSION_GROUP_JOIN_POLICY
[
"VISIBLEOPEN"
]:
db
.
insert_new_member
(
uid
,
grpID
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
])
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"JOIN_GROUP"
])
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
else
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please select only one group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_join_group
(
uid
,
group_name
,
search
,
warnings
,
ln
)
return
body
def
perform_request_input_leave_group
(
uid
,
warnings
=
[],
ln
=
CFG_SITE_LANG
):
"""Return html for leaving group.
@param uid: user ID
@param warnings: warnings != [] if 0 group is selected or if not admin
of the
@param ln: language
@return: body with warnings
"""
body
=
""
groups
=
[]
records
=
db
.
get_groups_by_user_status
(
uid
=
uid
,
user_status
=
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
])
map
(
lambda
x
:
groups
.
append
((
x
[
0
],
x
[
1
])),
records
)
body
=
websession_templates
.
tmpl_display_input_leave_group
(
groups
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_leave_group
(
uid
,
grpID
,
confirmed
=
0
,
ln
=
CFG_SITE_LANG
):
"""Leave group.
@param uid: user ID
@param grpID: ID of the group the user wants to leave
@param warnings: warnings != [] if 0 group is selected
@param confirmed: a confirmed page is first displayed
@param ln: language
@return: body with warnings
"""
_
=
gettext_set_language
(
ln
)
body
=
""
warnings
=
[]
infos
=
[]
if
not
grpID
==
-
1
:
if
confirmed
:
db
.
leave_group
(
grpID
,
uid
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"LEAVE_GROUP"
])
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
else
:
body
=
websession_templates
.
tmpl_confirm_leave
(
uid
,
grpID
,
ln
)
else
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please select one group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_input_leave_group
(
uid
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_edit_group
(
uid
,
grpID
,
warnings
=
[],
ln
=
CFG_SITE_LANG
):
"""Return html for group editing.
@param uid: user ID
@param grpID: ID of the group
@param warnings: warnings
@param ln: language
@return: body with warnings
"""
body
=
""
_
=
gettext_set_language
(
ln
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
elif
user_status
[
0
][
0
]
!=
CFG_WEBSESSION_USERGROUP_STATUS
[
'ADMIN'
]:
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, you do not have sufficient rights on this group.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
group_infos
=
db
.
get_group_infos
(
grpID
)[
0
]
if
not
len
(
group_infos
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
body
=
websession_templates
.
tmpl_display_input_group_info
(
group_name
=
group_infos
[
1
],
group_description
=
group_infos
[
2
],
join_policy
=
group_infos
[
3
],
act_type
=
"update"
,
grpID
=
grpID
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_update_group
(
uid
,
grpID
,
group_name
,
group_description
,
join_policy
,
ln
=
CFG_SITE_LANG
):
"""Update group datas in database.
@param uid: user ID
@param grpID: ID of the group
@param group_name: name of the group
@param group_description: description of the group
@param join_policy: join policy of the group
@param ln: language
@return: body with warnings
"""
body
=
''
warnings
=
[]
infos
=
[]
_
=
gettext_set_language
(
ln
)
group_name_available
=
db
.
group_name_exist
(
group_name
)
if
group_name
==
""
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please enter a group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_edit_group
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
elif
not
group_name_valid_p
(
group_name
):
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please enter a valid group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_edit_group
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
elif
join_policy
==
"-1"
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please choose a group join policy.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_edit_group
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
elif
(
group_name_available
and
group_name_available
[
0
][
0
]
!=
grpID
):
try
:
raise
InvenioWebSessionWarning
(
_
(
'Group name already exists. Please choose another group name.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_edit_group
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
else
:
grpID
=
db
.
update_group_infos
(
grpID
,
group_name
,
group_description
,
join_policy
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"GROUP_UPDATED"
])
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
CFG_SITE_LANG
)
return
body
def
perform_request_delete_group
(
uid
,
grpID
,
confirmed
=
0
,
ln
=
CFG_SITE_LANG
):
"""First display confirm message(confirmed=0).
then(confirmed=1) delete group and all its members
@param uid: user ID
@param grpID: ID of the group
@param confirmed: =1 if confirmed message has been previously displayed
@param ln: language
@return: body with warnings
"""
body
=
""
warnings
=
[]
infos
=
[]
_
=
gettext_set_language
(
ln
)
group_infos
=
db
.
get_group_infos
(
grpID
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
group_infos
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'The group has already been deleted.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
CFG_SITE_LANG
)
else
:
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
elif
confirmed
:
group_infos
=
db
.
get_group_infos
(
grpID
)
group_name
=
group_infos
[
0
][
1
]
msg_subjet
,
msg_body
=
websession_templates
.
tmpl_delete_msg
(
group_name
=
group_name
,
ln
=
ln
)
(
body
,
dummy
,
dummy
)
=
perform_request_send
(
uid
,
msg_to_user
=
""
,
msg_to_group
=
group_name
,
msg_subject
=
msg_subjet
,
msg_body
=
msg_body
,
ln
=
ln
)
db
.
delete_group_and_members
(
grpID
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"GROUP_DELETED"
])
body
=
perform_request_groups_display
(
uid
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
CFG_SITE_LANG
)
else
:
body
=
websession_templates
.
tmpl_confirm_delete
(
grpID
,
ln
)
return
body
def
perform_request_manage_member
(
uid
,
grpID
,
infos
=
[],
warnings
=
[],
ln
=
CFG_SITE_LANG
):
"""Return html for managing group's members.
@param uid: user ID
@param grpID: ID of the group
@param info: info about last user action
@param warnings: warnings
@param ln: language
@return: body with warnings
"""
body
=
''
_
=
gettext_set_language
(
ln
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
elif
user_status
[
0
][
0
]
!=
CFG_WEBSESSION_USERGROUP_STATUS
[
'ADMIN'
]:
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, you do not have sufficient rights on this group.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
group_infos
=
db
.
get_group_infos
(
grpID
)
if
not
len
(
group_infos
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
members
=
db
.
get_users_by_status
(
grpID
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
])
pending_members
=
db
.
get_users_by_status
(
grpID
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"PENDING"
])
body
=
websession_templates
.
tmpl_display_manage_member
(
grpID
=
grpID
,
group_name
=
group_infos
[
0
][
1
],
members
=
members
,
pending_members
=
pending_members
,
warnings
=
warnings
,
infos
=
infos
,
ln
=
ln
)
return
body
def
perform_request_remove_member
(
uid
,
grpID
,
member_id
,
ln
=
CFG_SITE_LANG
):
"""Remove member from a group.
@param uid: user ID
@param grpID: ID of the group
@param member_id: selected member ID
@param ln: language
@return: body with warnings
"""
body
=
''
warnings
=
[]
infos
=
[]
_
=
gettext_set_language
(
ln
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
if
member_id
==
-
1
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please choose a member if you want to remove him from the group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
else
:
db
.
delete_member
(
grpID
,
member_id
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"MEMBER_DELETED"
])
body
=
perform_request_manage_member
(
uid
,
grpID
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_add_member
(
uid
,
grpID
,
user_id
,
ln
=
CFG_SITE_LANG
):
"""Add waiting member to a group.
@param uid: user ID
@param grpID: ID of the group
@param user_id: selected member ID
@param ln: language
@return: body with warnings
"""
body
=
''
warnings
=
[]
infos
=
[]
_
=
gettext_set_language
(
ln
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
if
user_id
==
-
1
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please choose a user from the list if you want him to be added to the group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
else
:
# test if user is already member or pending
status
=
db
.
get_user_status
(
user_id
,
grpID
)
if
status
and
status
[
0
][
0
]
==
'M'
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'The user is already member of the group.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
else
:
db
.
add_pending_member
(
grpID
,
user_id
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
])
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"MEMBER_ADDED"
])
group_infos
=
db
.
get_group_infos
(
grpID
)
group_name
=
group_infos
[
0
][
1
]
user
=
get_user_info
(
user_id
,
ln
)[
2
]
msg_subjet
,
msg_body
=
websession_templates
.
tmpl_member_msg
(
group_name
=
group_name
,
accepted
=
1
,
ln
=
ln
)
(
body
,
dummy
,
dummy
)
=
perform_request_send
(
uid
,
msg_to_user
=
user
,
msg_to_group
=
""
,
msg_subject
=
msg_subjet
,
msg_body
=
msg_body
,
ln
=
ln
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
perform_request_reject_member
(
uid
,
grpID
,
user_id
,
ln
=
CFG_SITE_LANG
):
"""Reject waiting member and delete it from the list.
@param uid: user ID
@param grpID: ID of the group
@param member_id: selected member ID
@param ln: language
@return: body with warnings
"""
body
=
''
warnings
=
[]
infos
=
[]
_
=
gettext_set_language
(
ln
)
user_status
=
db
.
get_user_status
(
uid
,
grpID
)
if
not
len
(
user_status
):
try
:
raise
InvenioWebSessionError
(
_
(
'Sorry, there was an error with the database.'
))
except
InvenioWebSessionError
as
exc
:
register_exception
()
body
=
websession_templates
.
tmpl_error
(
exc
.
message
,
ln
)
return
body
if
user_id
==
-
1
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'Please choose a user from the list if you want him to be removed from waiting list.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
warnings
=
warnings
,
ln
=
ln
)
else
:
# test if user is already member or pending
status
=
db
.
get_user_status
(
user_id
,
grpID
)
if
not
status
:
try
:
raise
InvenioWebSessionWarning
(
_
(
'The user request for joining group has already been rejected.'
))
except
InvenioWebSessionWarning
as
exc
:
register_exception
(
stream
=
'warning'
)
warnings
.
append
(
exc
.
message
)
body
=
perform_request_manage_member
(
uid
,
grpID
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
else
:
db
.
delete_member
(
grpID
,
user_id
)
group_infos
=
db
.
get_group_infos
(
grpID
)
group_name
=
group_infos
[
0
][
1
]
user
=
get_user_info
(
user_id
,
ln
)[
2
]
msg_subjet
,
msg_body
=
websession_templates
.
tmpl_member_msg
(
group_name
=
group_name
,
accepted
=
0
,
ln
=
ln
)
(
body
,
dummy
,
dummy
)
=
perform_request_send
(
uid
,
msg_to_user
=
user
,
msg_to_group
=
""
,
msg_subject
=
msg_subjet
,
msg_body
=
msg_body
,
ln
=
ln
)
infos
.
append
(
CFG_WEBSESSION_INFO_MESSAGES
[
"MEMBER_REJECTED"
])
body
=
perform_request_manage_member
(
uid
,
grpID
,
infos
=
infos
,
warnings
=
warnings
,
ln
=
ln
)
return
body
def
account_group
(
uid
,
ln
=
CFG_SITE_LANG
):
"""Display group info for myaccount.py page.
@param uid: user id (int)
@param ln: language
@return: html body
"""
nb_admin_groups
=
db
.
count_nb_group_user
(
uid
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"ADMIN"
])
nb_member_groups
=
db
.
count_nb_group_user
(
uid
,
CFG_WEBSESSION_USERGROUP_STATUS
[
"MEMBER"
])
nb_total_groups
=
nb_admin_groups
+
nb_member_groups
return
websession_templates
.
tmpl_group_info
(
nb_admin_groups
,
nb_member_groups
,
nb_total_groups
,
ln
=
ln
)
def
get_navtrail
(
ln
=
CFG_SITE_LANG
,
title
=
""
):
"""Gets the navtrail for title.
@param title: title of the page
@param ln: language
@return: HTML output
"""
navtrail
=
websession_templates
.
tmpl_navtrail
(
ln
,
title
)
return
navtrail
def
synchronize_external_groups
(
userid
,
groups
,
login_method
):
"""Synchronize external groups adding new groups that aren't already
added, adding subscription for userid to groups, for groups that the user
isn't already subsribed to, removing subscription to groups the user is no
more subsribed to.
@param userid, the intger representing the user inside the db
@param groups, a dictionary of group_name : group_description
@param login_method, a string unique to the type of authentication
the groups are associated to, to be used inside the db
"""
groups_already_known
=
db
.
get_login_method_groups
(
userid
,
login_method
)
group_dict
=
{}
for
name
,
groupid
in
groups_already_known
:
group_dict
[
name
]
=
groupid
groups_already_known_name
=
set
([
g
[
0
]
for
g
in
groups_already_known
])
groups_name
=
set
(
groups
.
keys
())
nomore_groups
=
groups_already_known_name
-
groups_name
for
group
in
nomore_groups
:
# delete the user from no more affiliated group
db
.
delete_member
(
group_dict
[
group
],
userid
)
potential_new_groups
=
groups_name
-
groups_already_known_name
for
group
in
potential_new_groups
:
groupid
=
db
.
get_group_id
(
group
,
login_method
)
if
groupid
:
# Adding the user to an already existent group
db
.
insert_new_member
(
userid
,
groupid
[
0
][
0
],
CFG_WEBSESSION_USERGROUP_STATUS
[
'MEMBER'
])
else
:
# Adding a new group
try
:
groupid
=
db
.
insert_new_group
(
userid
,
group
,
groups
[
group
],
\
CFG_WEBSESSION_GROUP_JOIN_POLICY
[
'VISIBLEEXTERNAL'
],
\
login_method
)
db
.
add_pending_member
(
groupid
,
userid
,
CFG_WEBSESSION_USERGROUP_STATUS
[
'MEMBER'
])
except
IntegrityError
:
## The group already exists? Maybe because of concurrency?
groupid
=
db
.
get_group_id
(
group
,
login_method
)
if
groupid
:
# Adding the user to an already existent group
db
.
insert_new_member
(
userid
,
groupid
[
0
][
0
],
CFG_WEBSESSION_USERGROUP_STATUS
[
'MEMBER'
])
def
synchronize_groups_with_login_method
():
"""For each login_method, if possible, synchronize groups in a bulk fashion
(i.e. when fetch_all_users_groups_membership is implemented in the
external_authentication class). Otherwise, for each user that belong to at
least one external group for a given login_method, ask, if possible, for
his group memberships and merge them.
"""
from
invenio.modules.access.local_config
import
CFG_EXTERNAL_AUTHENTICATION
for
login_method
,
authorizer
in
CFG_EXTERNAL_AUTHENTICATION
.
items
():
if
authorizer
:
try
:
usersgroups
=
authorizer
.
fetch_all_users_groups_membership
()
synchronize_all_external_groups
(
usersgroups
,
login_method
)
except
(
NotImplementedError
,
NameError
):
users
=
db
.
get_all_users_with_groups_with_login_method
(
login_method
)
for
email
,
uid
in
users
.
items
():
try
:
groups
=
authorizer
.
fetch_user_groups_membership
(
email
)
synchronize_external_groups
(
uid
,
groups
,
login_method
)
except
(
NotImplementedError
,
NameError
):
pass
def
synchronize_all_external_groups
(
usersgroups
,
login_method
):
"""Merges all the groups vs users memberships.
@param usersgroups: is {'mygroup': ('description',
['email1', 'email2', ...]), ...}
@return: True in case everythings is ok, False otherwise
"""
db_users
=
db
.
get_all_users
()
# All users of the database {email:uid, ...}
db_users_set
=
set
(
db_users
.
keys
())
# Set of all users set('email1',
# 'email2', ...)
for
key
,
value
in
usersgroups
.
items
():
# cleaning users not in db
cleaned_user_list
=
set
()
for
username
in
value
[
1
]:
username
=
username
.
upper
()
if
username
in
db_users_set
:
cleaned_user_list
.
add
(
db_users
[
username
])
if
cleaned_user_list
:
usersgroups
[
key
]
=
(
value
[
0
],
cleaned_user_list
)
else
:
# List of user now is empty
del
usersgroups
[
key
]
# cleaning not interesting groups
# now for each group we got a description and a set of uid
groups_already_known
=
db
.
get_all_login_method_groups
(
login_method
)
# groups in the db {groupname: id}
groups_already_known_set
=
set
(
groups_already_known
.
keys
())
# set of the groupnames in db
usersgroups_set
=
set
(
usersgroups
.
keys
())
# set of groupnames to be merged
# deleted groups!
nomore_groups
=
groups_already_known_set
-
usersgroups_set
for
group_name
in
nomore_groups
:
db
.
delete_group_and_members
(
groups_already_known
[
group_name
])
# new groups!
new_groups
=
usersgroups_set
-
groups_already_known_set
for
group_name
in
new_groups
:
groupid
=
db
.
insert_only_new_group
(
group_name
,
usersgroups
[
group_name
][
0
],
# description
CFG_WEBSESSION_GROUP_JOIN_POLICY
[
'VISIBLEEXTERNAL'
],
login_method
)
for
uid
in
usersgroups
[
group_name
][
1
]:
db
.
insert_new_member
(
uid
,
groupid
,
CFG_WEBSESSION_USERGROUP_STATUS
[
'MEMBER'
])
# changed groups?
changed_groups
=
usersgroups_set
&
groups_already_known_set
groups_description
=
db
.
get_all_groups_description
(
login_method
)
for
group_name
in
changed_groups
:
users_already_in_group
=
db
.
get_users_in_group
(
groups_already_known
[
group_name
])
users_already_in_group_set
=
set
(
users_already_in_group
)
users_in_group_set
=
usersgroups
[
group_name
][
1
]
# no more affiliation
nomore_users
=
users_already_in_group_set
-
users_in_group_set
for
uid
in
nomore_users
:
db
.
delete_member
(
groups_already_known
[
group_name
],
uid
)
# new affiliation
new_users
=
users_in_group_set
-
users_already_in_group_set
for
uid
in
new_users
:
db
.
insert_new_member
(
uid
,
groups_already_known
[
group_name
],
CFG_WEBSESSION_USERGROUP_STATUS
[
'MEMBER'
])
# check description
if
groups_description
[
group_name
]
!=
usersgroups
[
group_name
][
0
]:
db
.
update_group_infos
(
groups_already_known
[
group_name
],
group_name
,
usersgroups
[
group_name
][
0
],
CFG_WEBSESSION_GROUP_JOIN_POLICY
[
'VISIBLEEXTERNAL'
])
def
group_name_valid_p
(
group_name
):
"""Test if the group's name is valid."""
return
nickname_valid_p
(
group_name
)
Event Timeline
Log In to Comment