Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91451258
webinterface_handler_flask_utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Nov 11, 06:17
Size
15 KB
Mime Type
text/x-python
Expires
Wed, Nov 13, 06:17 (2 d)
Engine
blob
Format
Raw Data
Handle
22263340
Attached To
R3600 invenio-infoscience
webinterface_handler_flask_utils.py
View Options
# -*- coding: utf-8 -*-
## This file is part of Invenio.
## Copyright (C) 2012 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Invenio -> Flask adapter utilities
"""
from
functools
import
wraps
from
flask
import
Blueprint
,
current_app
,
request
,
session
,
redirect
,
abort
,
g
,
\
render_template
,
jsonify
,
get_flashed_messages
,
flash
,
\
Response
,
_request_ctx_stack
,
stream_with_context
from
invenio.webuser_flask
import
current_user
,
login_required
from
invenio.urlutils
import
create_url
from
invenio.sqlalchemyutils
import
db
from
invenio.cache
import
cache
from
sqlalchemy.sql
import
operators
## Placemark for the i18n function
_
=
lambda
x
:
x
def
register_template_context_processor
(
f
):
g
.
_template_context_processor
.
append
(
f
)
class
InvenioBlueprint
(
Blueprint
):
## Cache decorator alias
#TODO language independent cache
def
invenio_cached
(
self
,
*
cargs
,
**
ckwargs
):
try
:
return
cache
.
cached
(
*
cargs
,
**
ckwargs
)
except
:
def
decorator
(
f
):
@wraps
(
f
)
def
decorated_func
(
*
args
,
**
kwargs
):
current_app
.
logger
.
error
(
'Check the cache engine'
)
return
f
(
*
args
,
**
kwargs
)
return
decorated_func
return
decorator
def
invenio_memoize
(
self
,
*
cargs
,
**
ckwargs
):
try
:
return
cache
.
memoize
(
*
cargs
,
**
ckwargs
)
except
:
def
decorator
(
f
):
@wraps
(
f
)
def
decorated_func
(
*
args
,
**
kwargs
):
current_app
.
logger
.
error
(
'Check the cache engine'
)
return
f
(
*
args
,
**
kwargs
)
return
decorated_func
return
decorator
def
__init__
(
self
,
name
,
import_name
,
url_prefix
=
None
,
config
=
None
,
breadcrumbs
=
None
,
menubuilder
=
None
,
force_https
=
False
):
"""
Invenio extension of standard Flask blueprint.
@param name: blueprint unique text identifier
@param import_name: class name (usually __name__)
@param url_prefix: URL prefix for all blueprints' view functions
@param config: importable config class
@param breadcrumbs: list of breadcrumbs
@param menubuilder: list of menus
@param force_https: requires blueprint to be accessible only via https
"""
Blueprint
.
__init__
(
self
,
name
,
import_name
,
url_prefix
=
url_prefix
)
self
.
config
=
config
self
.
breadcrumbs
=
breadcrumbs
or
[]
self
.
breadcrumbs_map
=
{}
self
.
menubuilder
=
menubuilder
or
[]
self
.
menubuilder_map
=
{}
self
.
_force_https
=
force_https
def
invenio_set_breadcrumb
(
self
,
label
,
name
=
None
):
def
decorator
(
f
):
endpoint
=
'.'
.
join
([
self
.
name
,
name
or
f
.
__name__
])
self
.
breadcrumbs_map
[
endpoint
]
=
[(
_
(
'Home'
),
''
)]
+
self
.
breadcrumbs
+
[(
label
,
endpoint
)]
return
f
return
decorator
def
invenio_add_menuitem
(
self
,
name
=
None
,
*
args
):
def
decorator
(
f
):
endpoint
=
'.'
.
join
([
self
.
name
,
name
or
f
.
__name__
])
self
.
menubuilder_map
[
endpoint
]
=
args
return
decorator
def
invenio_templated
(
self
,
template
=
None
,
stream
=
False
,
mimetype
=
'text/html'
):
"""
The idea of this decorator is that you return a dictionary with the
values passed to the template from the view function and the template
is automatically rendered or a JSON object is generated if it is reply
to an AJAX request.
@note: user password ... TODO
@see: http://flask.pocoo.org/docs/patterns/viewdecorators/
"""
def
stream_template
(
template_name
,
**
context
):
"""
The jinja2 template supports rendering templates piece by piece,
however the request context is not kept during whole time of
template rendering process.
@ see: http://flask.pocoo.org/docs/patterns/streaming/
"""
current_app
.
update_template_context
(
context
)
t
=
current_app
.
jinja_env
.
get_template
(
template_name
)
rv
=
t
.
stream
(
context
)
return
stream_with_context
(
rv
)
### Get real objects from Werkzeug proxy objects.
#app = current_app._get_current_object()
#rq = request._get_current_object()
#def wrap_context():
# """
# This iterator wrapper tries to solve problem of keeping
# request context until the template is fully rendered.
# """
# with app.request_context(rq.environ):
# for r in rv:
# yield r
#return wrap_context()
if
stream
:
render
=
lambda
template
,
**
ctx
:
\
Response
(
stream_template
(
template
,
**
ctx
),
mimetype
=
mimetype
)
else
:
render
=
render_template
def
decorator
(
f
):
@wraps
(
f
)
def
decorated_function
(
*
args
,
**
kwargs
):
template_name
=
template
if
template_name
is
None
:
template_name
=
request
.
endpoint
\
.
replace
(
'.'
,
'/'
)
+
'.html'
ctx
=
f
(
*
args
,
**
kwargs
)
if
ctx
is
None
:
ctx
=
{}
elif
not
isinstance
(
ctx
,
dict
):
return
ctx
if
request
.
is_xhr
:
#FIXME think about more possible types
for
k
,
v
in
ctx
.
iteritems
():
if
isinstance
(
v
,
list
):
try
:
ctx
[
k
]
=
[
dict
(
zip
(
x
.
keys
(),
[
dict
(
i
)
for
i
in
x
]))
for
x
in
v
]
except
:
ctx
[
k
]
=
v
else
:
try
:
ctx
[
k
]
=
dict
(
v
)
except
:
ctx
[
k
]
=
v
ctx
[
'_messages'
]
=
get_flashed_messages
(
with_categories
=
True
)
return
jsonify
(
**
ctx
)
return
render
(
template_name
,
**
ctx
)
return
decorated_function
return
decorator
def
invenio_sorted
(
self
,
model
=
None
,
cols
=
None
):
"""
This decorator fills `sort` argument with `ORDER BY` expression used by
sqlalchemy for defined URL arguments `sort_by` and `order`.
"""
def
decorator
(
f
):
@wraps
(
f
)
def
decorated_function
(
*
args
,
**
kwargs
):
sort_by
=
request
.
args
.
get
(
'sort_by'
,
None
)
order_fn
=
{
'asc'
:
db
.
asc
,
'desc'
:
db
.
desc
}
.
get
(
request
.
args
.
get
(
'order'
,
'asc'
),
db
.
asc
)
sort
=
False
if
model
is
not
None
and
sort_by
is
not
None
and
(
cols
is
None
or
sort_by
in
cols
):
try
:
sort_keys
=
sort_by
.
split
(
'.'
)
if
hasattr
(
model
,
sort_keys
[
0
]):
sort
=
order_fn
(
reduce
(
lambda
x
,
y
:
getattr
(
x
.
property
.
table
.
columns
,
y
),
sort_keys
[
1
:],
getattr
(
model
,
sort_keys
[
0
])))
except
:
flash
(
_
(
"Invalid sorting key '
%s
'."
)
%
sort_by
)
kwargs
[
'sort'
]
=
sort
return
f
(
*
args
,
**
kwargs
)
return
decorated_function
return
decorator
def
invenio_filtered
(
self
,
model
=
None
,
columns
=
None
,
form
=
None
,
filter_empty
=
False
):
"""
This decorator
The `filter_form` is also injected to template context
"""
def
decorator
(
f
):
@wraps
(
f
)
def
decorated_function
(
*
args
,
**
kwargs
):
if
not
model
or
not
columns
:
return
f
(
*
args
,
**
kwargs
)
where
=
[]
for
column
,
op
in
columns
.
iteritems
():
try
:
values
=
request
.
values
.
getlist
(
column
)
if
not
values
:
continue
column_keys
=
column
.
split
(
'.'
)
if
hasattr
(
model
,
column_keys
[
0
]):
cond
=
reduce
(
lambda
x
,
y
:
getattr
(
x
.
property
.
table
.
columns
,
y
),
column_keys
[
1
:],
getattr
(
model
,
column_keys
[
0
]))
current_app
.
logger
.
info
(
"Filtering by:
%s
=
%s
"
%
\
(
cond
,
values
))
# Multi-values support
if
len
(
values
)
>
0
:
# Ignore empty values when using start with,
# contains or similar.
# FIXME: add per field configuration
values
=
[
value
for
value
in
values
if
len
(
value
)
>
0
or
filter_empty
]
if
op
==
operators
.
eq
:
where
.
append
(
db
.
in_
(
values
))
else
:
or_list
=
[]
for
value
in
values
:
or_list
.
append
(
op
(
cond
,
value
))
where
.
append
(
db
.
or_
(
*
or_list
))
else
:
where
.
append
(
op
(
cond
,
value
))
except
:
flash
(
_
(
"Invalid filtering key '
%s
'."
)
%
column
)
if
form
is
not
None
:
filter_form
=
form
(
request
.
values
)
@register_template_context_processor
def
inject_filter_form
():
return
dict
(
filter_form
=
filter_form
)
# Generate ClauseElement for filtered columns.
kwargs
[
'filter'
]
=
db
.
and_
(
*
where
)
return
f
(
*
args
,
**
kwargs
)
return
decorated_function
return
decorator
@property
def
invenio_force_https
(
self
):
"""
Decorator: This forces the view function be available only through
HTTPS.
"""
def
decorator
(
f
):
f
.
_force_https
=
True
return
f
return
decorator
def
invenio_authenticated
(
self
,
f
):
"""
Decorator: This requires user to be logged in otherwise login manager
redirects request to defined view or returns http error 401.
"""
return
login_required
(
f
)
def
invenio_authorized
(
self
,
action
,
**
params
):
"""
Decorator: This checks is current user is authorized to the action.
When not authorized returns http error 401.
"""
def
decorator
(
f
):
@wraps
(
f
)
def
inner
(
*
a
,
**
kw
):
try
:
from
invenio.access_control_engine
import
acc_authorize_action
auth
,
message
=
acc_authorize_action
(
current_user
.
get_id
(),
action
,
**
dict
((
k
,
v
()
if
callable
(
v
)
else
v
)
\
for
(
k
,
v
)
in
params
.
iteritems
()))
if
auth
==
1
:
current_app
.
logger
.
info
(
message
)
abort
(
401
)
except
:
#FIXME add some better message to log
current_app
.
logger
.
info
(
"NOT KEY FOR AUTHORIZATION"
)
abort
(
401
)
return
f
(
*
a
,
**
kw
)
return
inner
return
decorator
def
invenio_wash_urlargd
(
self
,
config
):
def
_invenio_wash_urlargd
(
f
):
@wraps
(
f
)
def
decorator
():
argd
=
wash_urlargd
(
request
.
values
,
config
)
return
f
(
**
argd
)
return
decorator
return
_invenio_wash_urlargd
def
unicodifier
(
obj
):
"""
Tries to (recursively) convert the given object into unicode, assuming
a UTF-8 encoding)
:Parameters:
- `obj`: the object to convert (can be e.g. unicode, str, list, tuple, dict...
"""
if
isinstance
(
obj
,
unicode
):
return
obj
elif
isinstance
(
obj
,
str
):
return
obj
.
decode
(
'utf8'
)
elif
isinstance
(
obj
,
list
):
return
[
unicodifier
(
elem
)
for
elem
in
obj
]
elif
isinstance
(
obj
,
tuple
):
return
tuple
(
unicodifier
(
elem
)
for
elem
in
obj
)
elif
isinstance
(
obj
,
dict
):
return
dict
((
key
,
unicodifier
(
value
))
for
key
,
value
in
obj
.
iteritems
())
return
obj
def
wash_urlargd
(
form
,
content
):
"""
Wash the complete form based on the specification in
content. Content is a dictionary containing the field names as a
key, and a tuple (type, default) as value.
'type' can be list, unicode, invenio.webinterface_handler_wsgi_utils.StringField, int, tuple, or
invenio.webinterface_handler_wsgi_utils.Field (for
file uploads).
The specification automatically includes the 'ln' field, which is
common to all queries.
Arguments that are not defined in 'content' are discarded.
Note that in case {list,tuple} were asked for, we assume that
{list,tuple} of strings is to be returned. Therefore beware when
you want to use wash_urlargd() for multiple file upload forms.
@Return: argd dictionary that can be used for passing function
parameters by keywords.
"""
result
=
{}
for
k
,
(
dst_type
,
default
)
in
content
.
items
():
try
:
value
=
form
[
k
]
except
KeyError
:
result
[
k
]
=
default
continue
src_type
=
type
(
value
)
# First, handle the case where we want all the results. In
# this case, we need to ensure all the elements are strings,
# and not Field instances.
if
src_type
in
(
list
,
tuple
):
if
dst_type
is
list
:
result
[
k
]
=
[
x
for
x
in
value
]
continue
if
dst_type
is
tuple
:
result
[
k
]
=
tuple
([
x
for
x
in
value
])
continue
# in all the other cases, we are only interested in the
# first value.
value
=
value
[
0
]
# Maybe we already have what is expected? Then don't change
# anything.
if
isinstance
(
value
,
dst_type
):
result
[
k
]
=
value
continue
# Since we got here, 'value' is sure to be a single symbol,
# not a list kind of structure anymore.
if
dst_type
in
(
int
,
float
,
long
):
try
:
result
[
k
]
=
dst_type
(
value
)
except
:
result
[
k
]
=
default
elif
dst_type
is
tuple
:
result
[
k
]
=
(
value
,
)
elif
dst_type
is
list
:
result
[
k
]
=
[
value
]
else
:
raise
ValueError
(
'cannot cast form value
%s
of type
%r
into type
%r
'
%
(
value
,
src_type
,
dst_type
))
return
result
Event Timeline
Log In to Comment