Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91064892
external_authentication_oauth1.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Nov 7, 12:15
Size
8 KB
Mime Type
text/x-python
Expires
Sat, Nov 9, 12:15 (2 d)
Engine
blob
Format
Raw Data
Handle
22189083
Attached To
R3600 invenio-infoscience
external_authentication_oauth1.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2012 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
This module contains functions and methods to authenticate with OAuth1
providers.
"""
__revision__
=
\
"$Id$"
from
invenio.containerutils
import
get_substructure
from
invenio.dbquery
import
run_sql
from
invenio.external_authentication
import
ExternalAuth
class
ExternalOAuth1
(
ExternalAuth
):
"""
Contains methods for authenticate with an OpenID provider.
"""
@staticmethod
def
__init_req
(
req
):
req
.
g
[
'oauth1_provider_name'
]
=
''
req
.
g
[
'oauth1_debug'
]
=
0
req
.
g
[
'oauth1_msg'
]
=
''
req
.
g
[
'oauth1_debug_msg'
]
=
''
req
.
g
[
'oauth1_response'
]
=
None
def
auth_user
(
self
,
username
,
password
,
req
=
None
):
"""
Tries to find email and identity of the user from OAuth1 provider. If it
doesn't find any of them, returns (None, None)
@param username: Isn't used in this function
@type username: str
@param password: Isn't used in this function
@type password: str
@param req: request
@type req: invenio.webinterface_handler_wsgi.SimulatedModPythonRequest
@rtype: str|NoneType, str|NoneType
"""
from
invenio.access_control_config
import
CFG_OAUTH1_CONFIGURATIONS
from
invenio.access_control_config
import
CFG_OAUTH1_PROVIDERS
from
invenio.webinterface_handler
import
wash_urlargd
from
rauth.service
import
OAuth1Service
self
.
__init_req
(
req
)
args
=
wash_urlargd
(
req
.
form
,
{
'provider'
:
(
str
,
''
),
'login_method'
:
(
str
,
''
),
'oauth_token'
:
(
str
,
''
),
'oauth_verifier'
:
(
str
,
''
),
'denied'
:
(
str
,
''
)
})
provider_name
=
req
.
g
[
'oauth1_provider_name'
]
=
args
[
'provider'
]
if
not
provider_name
in
CFG_OAUTH1_PROVIDERS
:
req
.
g
[
'oauth1_msg'
]
=
22
return
None
,
None
# Load the configurations to construct OAuth1 service
config
=
CFG_OAUTH1_CONFIGURATIONS
[
args
[
'provider'
]]
req
.
g
[
'oauth1_debug'
]
=
config
.
get
(
'debug'
,
0
)
if
not
args
[
'oauth_token'
]:
# In case of an error, display corresponding message
if
args
[
'denied'
]:
req
.
g
[
'oauth1_msg'
]
=
21
return
None
,
None
else
:
req
.
g
[
'oauth1_msg'
]
=
22
return
None
,
None
provider
=
OAuth1Service
(
name
=
req
.
g
[
'oauth1_provider_name'
],
consumer_key
=
config
[
'consumer_key'
],
consumer_secret
=
config
[
'consumer_secret'
],
request_token_url
=
config
[
'request_token_url'
],
access_token_url
=
config
[
'access_token_url'
],
authorize_url
=
config
[
'authorize_url'
],
header_auth
=
True
)
# Get the request token secret from database and exchange it with the
# access token.
query
=
"""SELECT secret FROM oauth1_storage WHERE token = %s"""
params
=
(
args
[
'oauth_token'
],)
try
:
# If the request token is already used, return
request_token_secret
=
run_sql
(
query
,
params
)[
0
][
0
]
except
IndexError
:
req
.
g
[
'oauth1_msg'
]
=
22
return
None
,
None
response
=
provider
.
get_access_token
(
'GET'
,
request_token
=
args
[
'oauth_token'
],
request_token_secret
=
request_token_secret
,
params
=
{
'oauth_verifier'
:
args
[
'oauth_verifier'
]
}
)
if
req
.
g
[
'oauth1_debug'
]:
req
.
g
[
'oauth1_debug_msg'
]
=
str
(
response
.
content
)
+
"<br/>"
# Some providers send the identity and access token together.
email
,
identity
=
self
.
_get_user_email_and_id
(
response
.
content
,
req
)
if
not
identity
and
config
.
has_key
(
'request_url'
):
# For some providers, to reach user profile we need to make request
# to a specific url.
params
=
config
.
get
(
'request_parameters'
,
{})
response
=
provider
.
get
(
config
[
'request_url'
],
params
=
params
,
access_token
=
response
.
content
[
'oauth_token'
],
access_token_secret
=
response
.
content
[
'oauth_token_secret'
]
)
if
req
.
oauth1_debug
:
req
.
g
[
'oauth1_debug_msg'
]
+=
str
(
response
.
content
)
+
"<br/>"
email
,
identity
=
self
.
_get_user_email_and_id
(
response
.
content
,
req
)
if
identity
:
# If identity is found, add the name of the provider at the
# beginning of the identity because different providers may have
# different users with same id.
identity
=
"
%s
:
%s
"
%
(
req
.
g
[
'oauth1_provider_name'
],
identity
)
else
:
req
.
g
[
'oauth1_msg'
]
=
23
# Delete the token saved in the database since it is useless now.
query
=
"""
DELETE FROM oauth1_storage
WHERE token=%s
OR date_creation < DATE_SUB(NOW(), INTERVAL 1 HOUR)
"""
params
=
(
args
[
'oauth_token'
],)
run_sql
(
query
,
params
)
if
req
.
g
[
'oauth1_debug'
]:
req
.
g
[
'oauth1_msg'
]
=
"<code>
%s
</code>"
%
req
.
g
[
'oauth1_debug_msg'
]
.
replace
(
"
\n
"
,
"<br/>"
)
return
None
,
None
return
email
,
identity
def
fetch_user_nickname
(
self
,
username
,
password
=
None
,
req
=
None
):
"""
Fetches the OAuth1 provider for nickname of the user. If it doesn't
find any, returns None.
This function doesn't need username, password or req. They are exist
just because this class is derived from ExternalAuth
@param username: Isn't used in this function
@type username: str
@param password: Isn't used in this function
@type password: str
@param req: Isn't used in this function
@type req: invenio.webinterface_handler_wsgi.SimulatedModPythonRequest
@rtype: str or NoneType
"""
from
invenio.access_control_config
import
CFG_OAUTH1_CONFIGURATIONS
if
req
.
g
[
'oauth1_provider_name'
]:
path
=
None
if
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]]
.
has_key
(
'nickname'
):
path
=
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]][
'nickname'
]
if
path
:
return
get_substructure
(
req
.
oauth1_response
,
path
)
else
:
return
None
def
_get_user_email_and_id
(
self
,
container
,
req
):
"""
Returns external identity and email address together. Since identity is
essential for OAuth1 authentication, if it doesn't find external
identity returns None, None.
@param container: container which contains email and id
@type container: list|dict
@rtype str|NoneType, str|NoneType
"""
from
invenio.access_control_config
import
CFG_OAUTH1_CONFIGURATIONS
identity
=
None
email
=
None
if
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]]
.
has_key
(
'id'
):
path
=
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]][
'id'
]
identity
=
get_substructure
(
container
,
path
)
if
identity
:
if
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]]
.
has_key
(
'email'
):
path
=
CFG_OAUTH1_CONFIGURATIONS
[
req
.
g
[
'oauth1_provider_name'
]][
'email'
]
email
=
get_substructure
(
container
,
path
)
req
.
g
[
'oauth1_response'
]
=
container
return
email
,
identity
@staticmethod
def
get_msg
(
req
):
return
req
.
g
[
'oauth1_msg'
]
Event Timeline
Log In to Comment