Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F97277557
webuser_flask.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, Jan 3, 23:57
Size
11 KB
Mime Type
text/x-python
Expires
Sun, Jan 5, 23:57 (2 d)
Engine
blob
Format
Raw Data
Handle
23371911
Attached To
R3600 invenio-infoscience
webuser_flask.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2012 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Flask-sqlalchemy re-implementation of webuser.
"""
from
flask
import
Request
,
Flask
,
logging
,
session
,
request
,
g
,
url_for
,
current_app
from
invenio.config
import
\
CFG_ACCESS_CONTROL_LEVEL_ACCOUNTS
,
\
CFG_ACCESS_CONTROL_LEVEL_GUESTS
,
\
CFG_ACCESS_CONTROL_LEVEL_SITE
,
\
CFG_ACCESS_CONTROL_LIMIT_REGISTRATION_TO_DOMAIN
,
\
CFG_ACCESS_CONTROL_NOTIFY_ADMIN_ABOUT_NEW_ACCOUNTS
,
\
CFG_ACCESS_CONTROL_NOTIFY_USER_ABOUT_NEW_ACCOUNT
,
\
CFG_SITE_ADMIN_EMAIL
,
\
CFG_SITE_LANG
,
\
CFG_SITE_NAME
,
\
CFG_SITE_NAME_INTL
,
\
CFG_SITE_SUPPORT_EMAIL
,
\
CFG_SITE_SECURE_URL
,
\
CFG_SITE_URL
,
\
CFG_WEBSESSION_EXPIRY_LIMIT_DEFAULT
,
\
CFG_WEBSESSION_DIFFERENTIATE_BETWEEN_GUESTS
,
\
CFG_CERN_SITE
,
\
CFG_INSPIRE_SITE
,
\
CFG_BIBAUTHORID_ENABLED
,
\
CFG_SITE_RECORD
from
invenio.cache
import
cache
from
invenio.messages
import
gettext_set_language
,
wash_languages
,
wash_language
from
invenio.mailutils
import
send_email
from
invenio.errorlib
import
register_exception
from
invenio.external_authentication
import
InvenioWebAccessExternalAuthError
from
invenio.access_control_config
import
CFG_EXTERNAL_AUTHENTICATION
,
\
CFG_WEBACCESS_MSGS
,
CFG_WEBACCESS_WARNING_MSGS
,
CFG_EXTERNAL_AUTH_DEFAULT
from
functools
import
wraps
#from invenio.webinterface_handler_flask_utils import _
from
werkzeug.local
import
LocalProxy
from
werkzeug.datastructures
import
CallbackDict
,
CombinedMultiDict
from
flask
import
current_app
,
session
,
_request_ctx_stack
,
redirect
,
url_for
,
\
request
,
flash
,
abort
CFG_USER_DEFAULT_INFO
=
{
'remote_ip'
:
''
,
'remote_host'
:
''
,
'referer'
:
''
,
'uri'
:
''
,
'agent'
:
''
,
'uid'
:
-
1
,
'nickname'
:
''
,
'email'
:
''
,
'group'
:
[],
'guest'
:
'1'
,
'session'
:
None
,
'precached_permitted_restricted_collections'
:
[],
'precached_usebaskets'
:
False
,
'precached_useloans'
:
False
,
'precached_usegroups'
:
False
,
'precached_usealerts'
:
False
,
'precached_usemessages'
:
False
,
'precached_viewsubmissions'
:
False
,
'precached_useapprove'
:
False
,
'precached_useadmin'
:
False
,
'precached_usestats'
:
False
,
'precached_viewclaimlink'
:
False
,
'precached_usepaperclaim'
:
False
,
'precached_usepaperattribution'
:
False
,
'precached_canseehiddenmarctags'
:
False
,
}
class
UserInfo
(
CombinedMultiDict
):
def
__init__
(
self
,
uid
=
None
):
"""
Keeps information about user.
"""
def
on_update
(
self
):
self
.
modified
=
True
self
.
modified
=
False
self
.
uid
=
uid
if
uid
>
0
:
data
=
cache
.
get
(
self
.
get_key
())
if
data
is
None
:
data
=
self
.
_login
(
uid
,
precache
=
True
)
else
:
data
=
self
.
_create_guest
()
self
.
req
=
self
.
_get_request_info
()
self
.
info
=
CallbackDict
(
data
,
on_update
)
#FIXME remove req after everybody start using flask request.
CombinedMultiDict
.
__init__
(
self
,
[
self
.
info
,
self
.
req
,
dict
(
CFG_USER_DEFAULT_INFO
)])
self
.
save
()
def
get_key
(
self
):
key
=
'current_user::'
+
str
(
self
.
uid
)
+
\
'::'
+
str
(
request
.
remote_addr
)
return
key
def
save
(
self
):
"""
Saves modified data pernamently for logged users.
"""
if
not
self
.
is_guest
()
and
self
.
modified
:
cache
.
set
(
self
.
get_key
(),
dict
(
self
.
info
),
timeout
=
CFG_WEBSESSION_EXPIRY_LIMIT_DEFAULT
*
3600
)
def
reload
(
self
):
"""
Reloads user login information and saves them.
"""
data
=
self
.
_login
(
self
.
uid
)
self
.
info
.
update
(
data
)
CombinedMultiDict
.
__init__
(
self
,
[
self
.
info
,
self
.
req
,
dict
(
CFG_USER_DEFAULT_INFO
)])
self
.
save
()
def
_get_request_info
(
self
):
"""
Get request information.
"""
#FIXME: we should support IPV6 too. (hint for FireRole)
data
=
{}
data
[
'remote_ip'
]
=
request
.
remote_addr
or
''
data
[
'remote_host'
]
=
request
.
environ
.
get
(
'REMOTE_HOST'
,
''
)
data
[
'referer'
]
=
request
.
referrer
data
[
'uri'
]
=
request
.
url
or
''
data
[
'agent'
]
=
request
.
user_agent
or
'N/A'
#data['session'] = session.sid
return
data
def
_create_guest
(
self
):
data
=
{}
if
CFG_WEBSESSION_DIFFERENTIATE_BETWEEN_GUESTS
:
from
invenio.sqlalchemyutils
import
db
from
invenio.websession_model
import
User
note
=
'1'
if
CFG_ACCESS_CONTROL_LEVEL_GUESTS
==
0
else
'0'
u
=
User
(
email
=
''
,
note
=
note
,
password
=
'guest'
)
db
.
session
.
add
(
u
)
db
.
session
.
commit
()
data
.
update
(
u
.
__dict__
)
else
:
# Minimal information about user.
data
[
'id'
]
=
data
[
'uid'
]
=
0
return
data
def
_login
(
self
,
uid
,
precache
=
False
):
"""
Get account information about currently logged user from database.
Should raise an exception when session.uid is not valid User.id.
"""
from
invenio.websession_model
import
User
data
=
{}
try
:
user
=
User
.
query
.
get
(
uid
)
data
[
'id'
]
=
data
[
'uid'
]
=
user
.
id
or
-
1
data
[
'nickname'
]
=
user
.
nickname
or
''
data
[
'email'
]
=
user
.
email
or
''
data
[
'note'
]
=
user
.
note
or
''
data
.
update
(
user
.
settings
or
{})
data
[
'settings'
]
=
user
.
settings
or
{}
data
[
'guest'
]
=
str
(
int
(
user
.
guest
))
# '1' or '0'
self
.
modified
=
True
if
precache
:
self
.
_precache
(
data
)
except
:
data
=
self
.
_create_guest
()
return
data
def
_precache
(
self
,
data
):
"""
Calculate prermitions for user actions.
"""
from
invenio.webuser
import
isUserSubmitter
,
isUserReferee
,
isUserAdmin
from
invenio.access_control_engine
import
acc_authorize_action
from
invenio.access_control_admin
import
acc_get_role_id
,
acc_get_action_roles
,
acc_get_action_id
,
acc_is_user_in_role
,
acc_find_possible_activities
from
invenio.search_engine
import
get_permitted_restricted_collections
data
[
'precached_permitted_restricted_collections'
]
=
\
get_permitted_restricted_collections
(
data
)
data
[
'precached_usebaskets'
]
=
acc_authorize_action
(
data
,
'usebaskets'
)[
0
]
==
0
data
[
'precached_useloans'
]
=
acc_authorize_action
(
data
,
'useloans'
)[
0
]
==
0
data
[
'precached_usegroups'
]
=
acc_authorize_action
(
data
,
'usegroups'
)[
0
]
==
0
data
[
'precached_usealerts'
]
=
acc_authorize_action
(
data
,
'usealerts'
)[
0
]
==
0
data
[
'precached_usemessages'
]
=
acc_authorize_action
(
data
,
'usemessages'
)[
0
]
==
0
data
[
'precached_usestats'
]
=
acc_authorize_action
(
data
,
'runwebstatadmin'
)[
0
]
==
0
data
[
'precached_viewsubmissions'
]
=
isUserSubmitter
(
data
)
data
[
'precached_useapprove'
]
=
isUserReferee
(
data
)
data
[
'precached_useadmin'
]
=
isUserAdmin
(
data
)
data
[
'precached_canseehiddenmarctags'
]
=
acc_authorize_action
(
data
,
'runbibedit'
)[
0
]
==
0
usepaperclaim
=
False
usepaperattribution
=
False
viewclaimlink
=
False
if
(
CFG_BIBAUTHORID_ENABLED
and
acc_is_user_in_role
(
data
,
acc_get_role_id
(
"paperclaimviewers"
))):
usepaperclaim
=
True
if
(
CFG_BIBAUTHORID_ENABLED
and
acc_is_user_in_role
(
data
,
acc_get_role_id
(
"paperattributionviewers"
))):
usepaperattribution
=
True
viewlink
=
False
try
:
viewlink
=
session
[
'personinfo'
][
'claim_in_process'
]
except
(
KeyError
,
TypeError
):
pass
if
(
CFG_BIBAUTHORID_ENABLED
and
usepaperattribution
and
viewlink
):
viewclaimlink
=
True
# if (CFG_BIBAUTHORID_ENABLED
# and ((usepaperclaim or usepaperattribution)
# and acc_is_user_in_role(data, acc_get_role_id("paperattributionlinkviewers")))):
# viewclaimlink = True
data
[
'precached_viewclaimlink'
]
=
viewclaimlink
data
[
'precached_usepaperclaim'
]
=
usepaperclaim
data
[
'precached_usepaperattribution'
]
=
usepaperattribution
def
is_authenticated
(
self
):
return
not
self
.
is_guest
()
def
is_authorized
(
self
,
name
,
**
kwargs
):
from
invenio.access_control_engine
import
acc_authorize_action
return
acc_authorize_action
(
self
,
name
)[
0
]
==
0
def
is_active
(
self
):
return
not
self
.
is_guest
()
def
is_guest
(
self
):
return
True
if
self
[
'email'
]
==
''
else
False
def
get_id
(
self
):
return
self
.
get
(
'id'
,
None
)
KEY_USER_ID
=
'_uid'
class
InvenioLoginManager
(
object
):
def
__init__
(
self
):
self
.
key_user_id
=
KEY_USER_ID
self
.
guest_user
=
UserInfo
self
.
login_view
=
None
self
.
user_callback
=
None
self
.
unauthorized_callback
=
None
def
user_loader
(
self
,
callback
):
self
.
user_callback
=
callback
def
setup_app
(
self
,
app
):
app
.
login_manager
=
self
app
.
before_request
(
self
.
_load_user
)
def
save_user
(
response
):
current_user
.
save
()
return
response
app
.
after_request
(
save_user
)
#app.after_request(self._update_remember_cookie)
def
unauthorized_handler
(
self
,
callback
):
self
.
unauthorized_callback
=
callback
def
unauthorized
(
self
):
if
self
.
unauthorized_callback
:
return
self
.
unauthorized_callback
()
if
not
self
.
login_view
:
abort
(
401
)
return
redirect
(
url_for
(
self
.
login_view
,
referer
=
request
.
url
))
def
_load_user
(
self
):
#FIXME add remember me
self
.
reload_user
()
def
reload_user
(
self
):
ctx
=
_request_ctx_stack
.
top
uid
=
session
.
get
(
self
.
key_user_id
,
None
)
current_app
.
logger
.
info
(
"loading user:
%s
"
%
str
(
uid
))
if
uid
is
None
:
ctx
.
user
=
self
.
guest_user
()
else
:
user
=
self
.
user_callback
(
uid
)
if
user
is
None
:
logout_user
()
else
:
ctx
.
user
=
user
ctx
.
user
.
save
()
#.reload(update_session=True)
# A proxy for current user
current_user
=
LocalProxy
(
lambda
:
_request_ctx_stack
.
top
.
user
)
def
login_user
(
uid
,
remember_me
=
False
,
force
=
False
):
#if (not force) and (not user.is_active()):
# return False
current_app
.
logger
.
info
(
"logging user
%d
"
%
uid
)
session
.
uid
=
uid
session
.
set_remember_me
(
remember_me
)
current_app
.
login_manager
.
reload_user
()
return
True
def
logout_user
():
session
.
uid
=
None
current_app
.
login_manager
.
reload_user
()
return
True
def
login_required
(
fn
):
@wraps
(
fn
)
def
decorated_view
(
*
args
,
**
kwargs
):
current_app
.
logger
.
info
(
current_user
.
get_id
())
if
not
current_user
.
is_authenticated
():
return
current_app
.
login_manager
.
unauthorized
()
return
fn
(
*
args
,
**
kwargs
)
return
decorated_view
Event Timeline
Log In to Comment