Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F92013581
pluginutils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Nov 16, 15:12
Size
33 KB
Mime Type
text/x-python
Expires
Mon, Nov 18, 15:12 (2 d)
Engine
blob
Format
Raw Data
Handle
22361310
Attached To
R3600 invenio-infoscience
pluginutils.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2009, 2010, 2011 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
This module implement a generic plugin container facility.
"""
import
sys
import
os
import
glob
import
inspect
import
imp
from
invenio.config
import
CFG_PYLIBDIR
from
invenio.textutils
import
wrap_text_in_a_box
class
InvenioPluginContainerError
(
Exception
):
"""
Exception raised when some error happens during plugin loading.
"""
pass
class
PluginContainer
(
object
):
"""
This class implements a I{plugin container}.
This class implements part of the dict interface with the condition
that only correctly enabled plugins can be retrieved by their plugin_name.
>>> ## Loading all the plugin within a directory.
>>> websubmit_functions = PluginContainer(
... os.path.join(CFG_PYLIBDIR,
... 'invenio', 'websubmit_functions', '*.py')
... )
>>> ## Loading an explicit plugin.
>>> case_eds = websubmit_functions['CaseEDS']
@param plugin_pathnames: zero or more plugins_pathnames from where to load
the plugins.
@type plugin_pathnames: string/list
@param plugin_builder: a callable with the signature
C{plugin_builder(plugin_name, plugin_code)} that will be called
to extract the actual plugin from the module stored in plugin_code.
@type plugin_builder: callable
@param api_version: the API version of the plugin. If specified, plugins
which specify different versions will fail to be loaded. Default value
is C{None} which turns off the version checking.
@type api_version: integer
@param plugin_signature: a stub to be used in order to check if a loaded
plugin respect a particular signature or not.
@type plugin_signature: class/function
@ivar _plugin_map: a map between plugin_name and a dict with keys
"error", "plugin", "plugin_path", "enabled", "api_version"
@type _plugin_map: dict
@ivar _plugin_pathnames: the list of normalized plugin pathnames
corresponding to the plugins to be loaded.
@type _plugin_pathnames: list
@ivar _plugin_builder: the plugin builder as passed to the constructor.
@type plugin_builder: function
@ivar api_version: the version as provided to the constructor.
@type api_version: integer
@group Mapping interface: __contains__,__getitem__,get,has_key,items,
iteritems,iterkeys,itervalues,keys,values,__len__
@group Main API: __init__,add_plugin_pathnames,get_enabled_plugins,
get_broken_plugins,get_plugin,reload_plugins
"""
def
__init__
(
self
,
plugin_pathnames
=
None
,
plugin_builder
=
None
,
api_version
=
None
,
plugin_signature
=
None
,
external
=
False
):
self
.
_plugin_map
=
{}
self
.
_plugin_pathnames
=
[]
self
.
_external
=
external
self
.
api_version
=
api_version
if
plugin_builder
is
None
:
self
.
_plugin_builder
=
self
.
default_plugin_builder
else
:
self
.
_plugin_builder
=
plugin_builder
self
.
_plugin_signature
=
plugin_signature
if
plugin_pathnames
:
self
.
add_plugin_pathnames
(
plugin_pathnames
)
def
default_plugin_builder
(
plugin_name
,
plugin_code
):
"""
Default plugin builder used to extract the plugin from the module
that contains it.
@note: By default it will look for a class or function with the same
name of the plugin.
@param plugin_name: the name of the plugin.
@type plugin_name: string
@param plugin_code: the code of the module as just read from
filesystem.
@type plugin_code: module
@return: the plugin
"""
return
getattr
(
plugin_code
,
plugin_name
)
default_plugin_builder
=
staticmethod
(
default_plugin_builder
)
def
add_plugin_pathnames
(
self
,
plugin_pathnames
):
"""
Add a one or more plugin pathnames, i.e. full plugin path exploiting
wildcards, e.g. "bibformat_elements/bfe_*.py".
@note: these plugins_pathnames will be added to the current list of
plugin_pathnames, and all the plugins will be reloaded.
@param plugin_pathnames: one or more plugins_pathnames
@type plugin_pathnames: string/list
"""
if
type
(
plugin_pathnames
)
is
str
:
self
.
_plugin_pathnames
.
append
(
plugin_pathnames
)
else
:
self
.
_plugin_pathnames
.
extend
(
plugin_pathnames
)
self
.
reload_plugins
()
def
enable_plugin
(
self
,
plugin_name
):
"""
Enable plugin_name.
@param plugin_name: the plugin name.
@type plugin_name: string
@raise KeyError: if the plugin does not exists.
"""
self
.
_plugin_map
[
plugin_name
][
'enabled'
]
=
True
def
disable_plugin
(
self
,
plugin_name
):
"""
Disable plugin_name.
@param plugin_name: the plugin name.
@type plugin_name: string
@raise KeyError: if the plugin does not exists.
"""
self
.
_plugin_map
[
plugin_name
][
'enabled'
]
=
False
def
plugin_enabled_p
(
self
,
plugin_name
):
"""
Returns True if the plugin is correctly enabled.
@param plugin_name: the plugin name.
@type plugin_name: string
@return: True if the plugin is correctly enabled..
@rtype: bool
@raise KeyError: if the plugin does not exists.
"""
return
self
.
_plugin_map
[
plugin_name
][
'enabled'
]
def
get_plugin_filesystem_path
(
self
,
plugin_name
):
"""
Returns the filesystem path from where the plugin was loaded.
@param plugin_name: the plugin name.
@type plugin_name: string
@return: the filesystem path.
@rtype: string
@raise KeyError: if the plugin does not exists.
"""
return
self
.
_plugin_map
[
plugin_name
][
'plugin_path'
]
def
get_plugin
(
self
,
plugin_name
):
"""
Returns the plugin corresponding to plugin_name.
@param plugin_name: the plugin name,
@type plugin_name: string
@return: the plugin
@raise KeyError: if the plugin does not exists or is not enabled.
"""
if
self
.
_plugin_map
[
plugin_name
][
'enabled'
]:
return
self
.
_plugin_map
[
plugin_name
][
'plugin'
]
else
:
raise
KeyError
(
'"
%s
" is not enabled'
%
plugin_name
)
def
get_broken_plugins
(
self
):
"""
Returns a map between plugin names and errors, in the form of
C{sys.exc_info} structure.
@return: plugin_name -> sys.exc_info().
@rtype: dict
"""
ret
=
{}
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'error'
]:
ret
[
plugin_name
]
=
plugin
[
'error'
]
return
ret
def
reload_plugins
(
self
,
reload
=
False
):
"""
For the plugins found through iterating in the plugin_pathnames, loads
and working plugin.
@note: if a plugin has the same plugin_name of an already loaded
plugin, the former will override the latter (provided that the
former had a compatible signature to the latter).
@note: any plugin that fails to load will be added to the plugin
map as disabled and the sys.exc_info() captured during the
Exception will be stored. (if the failed plugin was supposed to
override an existing one, the latter will be overridden by
the failed former).
"""
# The reload keyword argument exists for backwards compatibility.
# Previously, reload_plugins, would not reload a module due to a bug.
for
plugin_path
in
self
.
_plugin_pathnames_iterator
():
self
.
_load_plugin
(
plugin_path
,
reload
=
reload
)
def
normalize_plugin_path
(
self
,
plugin_path
):
"""
Returns a normalized plugin_path.
@param plugin_path: the plugin path.
@type plugin_path: string
@return: the normalized plugin path.
@rtype: string
@raise ValueError: if the path is not under CFG_PYLIBDIR/invenio
"""
invenio_path
=
os
.
path
.
abspath
(
os
.
path
.
join
(
CFG_PYLIBDIR
,
'invenio'
))
plugin_path
=
os
.
path
.
abspath
(
plugin_path
)
if
not
self
.
_external
and
not
os
.
path
.
abspath
(
plugin_path
)
.
startswith
(
invenio_path
):
raise
ValueError
(
'A plugin should be stored under "
%s
" ("
%s
" was'
' specified)'
%
(
invenio_path
,
plugin_path
))
return
plugin_path
def
_plugin_pathnames_iterator
(
self
):
"""
Returns an iterator over all the normalized plugin path.
@note: older plugin_pathnames are considered first, and newer
plugin_pathnames later, so that plugin overriding is possible.
@return: the iterator over plugin paths.
@rtype: iterator
"""
for
plugin_pathname
in
self
.
_plugin_pathnames
:
for
plugin_path
in
glob
.
glob
(
plugin_pathname
):
yield
self
.
normalize_plugin_path
(
plugin_path
)
def
get_plugin_name
(
plugin_path
):
"""
Returns the name of the plugin after the plugin_path.
@param plugin_path: the filesystem path to the plugin code.
@type plugin_path: string
@return: the plugin name.
@rtype: string
"""
plugin_name
=
os
.
path
.
basename
(
plugin_path
)
if
plugin_name
.
endswith
(
'.py'
):
plugin_name
=
plugin_name
[:
-
len
(
'.py'
)]
return
plugin_name
get_plugin_name
=
staticmethod
(
get_plugin_name
)
def
_load_plugin
(
self
,
plugin_path
,
reload
=
False
):
"""
Load a plugin in the plugin map.
@note: if the plugin_name calculated from plugin_path corresponds to
an already existing plugin, the old plugin will be overridden and
if the old plugin was correctly loaded but disabled also the
new plugin will be disabled.
@param plugin_path: the plugin path.
@type plugin_path: string
"""
api_version
=
None
try
:
plugin_name
=
self
.
get_plugin_name
(
plugin_path
)
# Let's see if the module is already loaded
plugin
=
None
if
plugin_name
in
sys
.
modules
:
mod
=
sys
.
modules
[
plugin_name
]
if
os
.
path
.
splitext
(
mod
.
__file__
)[
0
]
==
os
.
path
.
splitext
(
plugin_path
)[
0
]:
plugin
=
mod
if
not
plugin
or
reload
:
# Let's load the plugin module.
plugin_fp
,
plugin_path
,
plugin_desc
=
imp
.
find_module
(
plugin_name
,
[
os
.
path
.
dirname
(
plugin_path
)]
)
try
:
plugin
=
imp
.
load_module
(
plugin_name
,
plugin_fp
,
plugin_path
,
plugin_desc
)
finally
:
if
plugin_fp
:
plugin_fp
.
close
()
## Let's check for API version.
api_version
=
getattr
(
plugin
,
'__plugin_version__'
,
None
)
if
self
.
api_version
and
api_version
!=
self
.
api_version
:
raise
InvenioPluginContainerError
(
"Plugin version mismatch."
" Expected
%s
, found
%s
"
%
(
self
.
api_version
,
api_version
))
## Let's load the actual plugin
plugin
=
self
.
_plugin_builder
(
plugin_name
,
plugin
)
## Are we overriding an already loaded plugin?
enabled
=
True
if
plugin_name
in
self
.
_plugin_map
:
old_plugin
=
self
.
_plugin_map
[
plugin_name
]
if
old_plugin
[
'error'
]
is
None
:
enabled
=
old_plugin
[
'enabled'
]
check_signature
(
plugin_name
,
old_plugin
[
'plugin'
],
plugin
)
## Let's check the plugin signature.
if
self
.
_plugin_signature
:
check_signature
(
plugin_name
,
self
.
_plugin_signature
,
plugin
)
self
.
_plugin_map
[
plugin_name
]
=
{
'plugin'
:
plugin
,
'error'
:
None
,
'plugin_path'
:
plugin_path
,
'enabled'
:
enabled
,
'api_version'
:
api_version
,
}
except
Exception
:
self
.
_plugin_map
[
plugin_name
]
=
{
'plugin'
:
None
,
'error'
:
sys
.
exc_info
(),
'plugin_path'
:
plugin_path
,
'enabled'
:
False
,
'api_version'
:
api_version
,
}
def
__getitem__
(
self
,
plugin_name
):
"""
As in C{dict.__getitem__} but apply plugin name normalization and check
if the plugin is correctly enabled.
@param plugin_name: the name of the plugin
@type plugin_name: string
@return: the plugin.
@raise KeyError: if the corresponding plugin is not enabled or there
were some errors.
"""
plugin_name
=
self
.
get_plugin_name
(
plugin_name
)
if
plugin_name
in
self
.
_plugin_map
and
\
self
.
_plugin_map
[
plugin_name
][
'enabled'
]
is
True
:
return
self
.
_plugin_map
[
plugin_name
][
'plugin'
]
else
:
raise
KeyError
(
'"
%s
" does not exists or is not correctly enabled'
%
plugin_name
)
def
__contains__
(
self
,
plugin_name
):
"""
As in C{dict.__contains__} but apply plugin name normalization and
check if the plugin is correctly enabled.
@param plugin_name: the name of the plugin
@type plugin_name: string
@return: True if plugin_name is correctly there.
@rtype: bool
"""
plugin_name
=
self
.
get_plugin_name
(
plugin_name
)
return
plugin_name
in
self
.
_plugin_map
and
\
self
.
_plugin_map
[
plugin_name
][
'enabled'
]
is
True
def
__len__
(
self
):
"""
As in C{dict.__len__} but consider only correctly enabled plugins.
@return: the total number of plugins correctly enabled.
@rtype: integer
"""
count
=
0
for
plugin
in
self
.
_plugin_map
.
values
():
if
plugin
[
'enabled'
]:
count
+=
1
return
count
def
get
(
self
,
plugin_name
,
default
=
None
):
"""
As in C{dict.get} but consider only correctly enabled plugins.
@param plugin_name: the name of the plugin
@type plugin_name: string
@param default: the default value to return if plugin_name does not
correspond to a correctly enabled plugin.
@return: the total number of plugins correctly enabled.
@rtype: integer
"""
try
:
return
self
.
__getitem__
(
plugin_name
)
except
KeyError
:
return
default
def
has_key
(
self
,
plugin_name
):
"""
As in C{dict.has_key} but apply plugin name normalization and check
if the plugin is correctly enabled.
@param plugin_name: the name of the plugin
@type plugin_name: string
@return: True if plugin_name is correctly there.
@rtype: bool
"""
return
self
.
__contains__
(
plugin_name
)
def
items
(
self
):
"""
As in C{dict.items} but checks if the plugin are correctly enabled.
@return: list of (plugin_name, plugin).
@rtype: [(plugin_name, plugin), ...]
"""
ret
=
[]
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'enabled'
]:
ret
.
append
((
plugin_name
,
plugin
[
'plugin'
]))
return
ret
def
iteritems
(
self
):
"""
As in C{dict.iteritems} but checks if the plugin are correctly enabled.
@return: an iterator over the (plugin_name, plugin) items.
"""
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'enabled'
]:
yield
(
plugin_name
,
plugin
[
'plugin'
])
def
iterkeys
(
self
):
"""
As in C{dict.iterkeys} but checks if the plugin are correctly enabled.
@return: an iterator over the plugin_names.
"""
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'enabled'
]:
yield
plugin_name
__iter__
=
iterkeys
def
itervalues
(
self
):
"""
As in C{dict.itervalues} but checks if the plugin are correctly
enabled.
@return: an iterator over the plugins.
"""
for
plugin
in
self
.
_plugin_map
.
itervalues
():
if
plugin
[
'enabled'
]:
yield
plugin
[
'plugin'
]
def
keys
(
self
):
"""
As in C{dict.keys} but checks if the plugin are correctly enabled.
@return: the list of enabled plugin_names.
@rtype: list of strings
"""
ret
=
[]
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'enabled'
]:
ret
.
append
(
plugin_name
)
return
ret
def
values
(
self
):
"""
As in C{dict.values} but checks if the plugin are correctly enabled.
@return: the list of enabled plugin codes.
"""
return
[
plugin
[
'plugin'
]
\
for
plugin
in
self
.
_plugin_map
.
values
()
if
plugin
[
'enabled'
]]
def
get_enabled_plugins
(
self
):
"""
Return a map of the correctly enabled plugins.
@return: a map plugin_name -> plugin
@rtype: dict
"""
ret
=
{}
for
plugin_name
,
plugin
in
self
.
_plugin_map
.
iteritems
():
if
plugin
[
'enabled'
]:
ret
[
plugin_name
]
=
plugin
[
'plugin'
]
return
ret
def
check_signature
(
object_name
,
reference_object
,
other_object
):
"""
Given a reference class or function check if an other class or function
could be substituted without causing any instantiation/usage issues.
@param object_name: the name of the object being checked.
@type object_name: string
@param reference_object: the reference class or function.
@type reference_object: class/function
@param other_object: the other class or function to be checked.
@type other_object: class/function
@raise InvenioPluginContainerError: in case the other object is not
compatible with the reference object.
"""
try
:
if
inspect
.
isclass
(
reference_object
):
## if the reference_object is a class
if
inspect
.
isclass
(
other_object
):
## if the other_object is a class
if
issubclass
(
other_object
,
reference_object
):
## if the other_object is derived from the reference we
## should check for all the method in the former that
## exists in the the latter, wethever they recursively have
## the same signature.
reference_object_map
=
dict
(
inspect
.
getmembers
(
reference_object
,
inspect
.
isroutine
))
for
other_method_name
,
other_method_code
in
\
inspect
.
getmembers
(
other_object
,
inspect
.
isroutine
):
if
other_method_name
in
reference_object_map
:
check_signature
(
object_name
,
reference_object_map
[
other_method_name
],
other_method_code
)
else
:
## if the other_object is not derived from the
## reference_object then all the method declared in the
## latter should exist in the former and they should
## recursively have the same signature.
other_object_map
=
dict
(
inspect
.
getmembers
(
other_object
,
inspect
.
isroutine
))
for
reference_method_name
,
reference_method_code
in
\
inspect
.
getmembers
(
reference_object
,
inspect
.
isroutine
):
if
reference_method_name
in
other_object_map
:
check_signature
(
object_name
,
reference_method_code
,
other_method_code
)
else
:
raise
InvenioPluginContainerError
(
'"
%s
", which'
' exists in the reference class, does not'
' exist in the other class, and the reference'
' class is not an anchestor of the other'
%
reference_method_name
)
else
:
## We are comparing apples and oranges!
raise
InvenioPluginContainerError
(
"
%s
(the reference object)"
" is a class while
%s
(the other object) is not a class"
%
(
reference_object
,
other_object
))
elif
inspect
.
isroutine
(
reference_object
):
## if the reference_object is a function
if
inspect
.
isroutine
(
other_object
):
## if the other_object is a function we will compare the
## reference_object and other_object function signautre i.e.
## their parameters.
reference_args
,
reference_varargs
,
reference_varkw
,
\
reference_defaults
=
inspect
.
getargspec
(
reference_object
)
other_args
,
other_varargs
,
other_varkw
,
\
other_defaults
=
inspect
.
getargspec
(
other_object
)
## We normalize the reference_defaults to be a list
if
reference_defaults
is
not
None
:
reference_defaults
=
list
(
reference_defaults
)
else
:
reference_defaults
=
[]
## We normalize the other_defaults to be a list
if
other_defaults
is
not
None
:
other_defaults
=
list
(
other_defaults
)
else
:
other_defaults
=
[]
## Check for presence of missing parameters in other function
if
not
(
other_varargs
or
other_varkw
):
for
reference_arg
in
reference_args
:
if
reference_arg
not
in
other_args
:
raise
InvenioPluginContainerError
(
'Argument "
%s
"'
' in reference function
%s
does not exist in'
' the other function
%s
'
%
(
reference_arg
,
reference_object
,
other_object
))
## Check for presence of additional parameters in other
## function
if
not
(
reference_varargs
or
reference_varkw
):
for
other_arg
in
other_args
:
if
other_arg
not
in
reference_args
:
raise
InvenioPluginContainerError
(
'Argument "
%s
"'
' in other function
%s
does not exist in the'
' reference function
%s
'
%
(
other_arg
,
other_object
,
reference_object
))
## Check sorting of arguments
for
reference_arg
,
other_arg
in
map
(
None
,
reference_args
,
other_args
):
if
not
((
reference_arg
==
other_arg
)
or
(
reference_arg
is
None
and
(
reference_varargs
or
reference_varkw
))
or
(
other_arg
is
None
and
(
other_args
or
other_varargs
))):
raise
InvenioPluginContainerError
(
'Argument "
%s
" in'
' the other function is in the position of'
' argument "
%s
" in the reference function, i.e.'
' the order of arguments is not respected'
%
(
other_arg
,
reference_arg
))
if
len
(
reference_defaults
)
!=
len
(
other_defaults
)
and
\
not
(
reference_args
or
reference_varargs
or
other_args
or
other_varargs
):
raise
InvenioPluginContainerError
(
"Default parameters in"
" the other function are not corresponding to the"
" default of parameters of the reference function"
)
else
:
## We are comparing apples and oranges!
raise
InvenioPluginContainerError
(
'
%s
(the reference object)'
' is a function while
%s
(the other object) is not a'
' function'
%
(
reference_object
,
other_object
))
except
InvenioPluginContainerError
,
err
:
try
:
sourcefile
=
inspect
.
getsourcefile
(
other_object
)
sourceline
=
inspect
.
getsourcelines
(
other_object
)[
1
]
except
IOError
:
## other_object is not loaded from a real file
sourcefile
=
'N/A'
sourceline
=
'N/A'
raise
InvenioPluginContainerError
(
'Error in checking signature for'
' "
%s
" as defined at "
%s
" (line
%s
):
%s
'
%
(
object_name
,
sourcefile
,
sourceline
,
err
))
def
create_enhanced_plugin_builder
(
compulsory_objects
=
None
,
optional_objects
=
None
,
other_data
=
None
):
"""
Creates a plugin_builder function suitable to extract some specific
objects (either compulsory or optional) and other simpler data
>>> def dummy_needed_funct1(foo, bar):
... pass
>>> class dummy_needed_class1:
... def __init__(self, baz):
... pass
>>> def dummy_optional_funct2(boo):
... pass
>>> create_enhanced_plugin_builder(
... compulsory_objects={
... 'needed_funct1' : dummy_needed_funct1,
... 'needed_class1' : dummy_needed_class1
... },
... optional_objects={
... 'optional_funct2' : dummy_optional_funct2,
... },
... other_data={
... 'CFG_SOME_DATA' : (str, ''),
... 'CFG_SOME_INT' : (int, 0),
... })
<function plugin_builder at 0xb7812064>
@param compulsory_objects: map of name of an object to look for inside
the C{plugin_code} and a I{signature} for a class or callable. Every
name specified in this map B{must exists} in the plugin_code, otherwise
the plugin will fail to load.
@type compulsory_objects: dict
@param optional_objects: map of name of an object to look for inside
the C{plugin_code} and a I{signature} for a class or callable. Every
name specified in this map must B{can exists} in the plugin_code.
@type optional_objects: dict
@param other_data: map of other simple data that can be loaded from
the plugin_code. The map has the same format of the C{content}
parameter of L{invenio.webinterface_handler.wash_urlargd}.
@type other_data: dict
@return: a I{plugin_builder} function that can be used with the
C{PluginContainer} constructor. Such function will build the plugin
in the form of a map, where every key is one of the keys inside
the three maps provided as parameters and the corresponding value
is the expected class or callable or simple data.
"""
from
invenio.webinterface_handler
import
wash_urlargd
def
plugin_builder
(
plugin_name
,
plugin_code
):
"""
Enhanced plugin_builder created by L{create_enhanced_plugin_builder}.
@param plugin_name: the name of the plugin.
@type plugin_name: string
@param plugin_code: the code of the module as just read from
filesystem.
@type plugin_code: module
@return: the plugin in the form of a map.
"""
plugin
=
{}
if
compulsory_objects
:
for
object_name
,
object_signature
in
\
compulsory_objects
.
iteritems
():
the_object
=
getattr
(
plugin_code
,
object_name
,
None
)
if
the_object
is
None
:
raise
InvenioPluginContainerError
(
'Plugin "
%s
" does not '
'contain compulsory object "
%s
"'
%
(
plugin_name
,
object_name
))
try
:
check_signature
(
object_name
,
the_object
,
object_signature
)
except
InvenioPluginContainerError
,
err
:
raise
InvenioPluginContainerError
(
'Plugin "
%s
" contains '
'object "
%s
" with a wrong signature:
%s
'
%
(
plugin_name
,
object_name
,
err
))
plugin
[
object_name
]
=
the_object
if
optional_objects
:
for
object_name
,
object_signature
in
optional_objects
.
iteritems
():
the_object
=
getattr
(
plugin_code
,
object_name
,
None
)
if
the_object
is
not
None
:
try
:
check_signature
(
object_name
,
the_object
,
object_signature
)
except
InvenioPluginContainerError
,
err
:
raise
InvenioPluginContainerError
(
'Plugin "
%s
" '
'contains object "
%s
" with a wrong signature:
%s
'
%
(
plugin_name
,
object_name
,
err
))
plugin
[
object_name
]
=
the_object
if
other_data
:
the_other_data
=
{}
for
data_name
,
(
dummy
,
data_default
)
in
other_data
.
iteritems
():
the_other_data
[
data_name
]
=
getattr
(
plugin_code
,
data_name
,
data_default
)
try
:
the_other_data
=
wash_urlargd
(
the_other_data
,
other_data
)
except
Exception
,
err
:
raise
InvenioPluginContainerError
(
'Plugin "
%s
" contains other '
'data with problems:
%s
'
%
(
plugin_name
,
err
))
plugin
.
update
(
the_other_data
)
return
plugin
return
plugin_builder
def
get_callable_signature_as_string
(
the_callable
):
"""
Returns a string representing a callable as if it would have been
declared on the prompt.
>>> def foo(arg1, arg2, arg3='val1', arg4='val2', *args, **argd):
... pass
>>> get_callable_signature_as_string(foo)
def foo(arg1, arg2, arg3='val1', arg4='val2', *args, **argd)
@param the_callable: the callable to be analyzed.
@type the_callable: function/callable.
@return: the signature.
@rtype: string
"""
args
,
varargs
,
varkw
,
defaults
=
inspect
.
getargspec
(
the_callable
)
tmp_args
=
list
(
args
)
args_dict
=
{}
if
defaults
:
defaults
=
list
(
defaults
)
else
:
defaults
=
[]
while
defaults
:
args_dict
[
tmp_args
.
pop
()]
=
defaults
.
pop
()
while
tmp_args
:
args_dict
[
tmp_args
.
pop
()]
=
None
args_list
=
[]
for
arg
in
args
:
if
args_dict
[
arg
]
is
not
None
:
args_list
.
append
(
"
%s
=
%s
"
%
(
arg
,
repr
(
args_dict
[
arg
])))
else
:
args_list
.
append
(
arg
)
if
varargs
:
args_list
.
append
(
"*
%s
"
%
varargs
)
if
varkw
:
args_list
.
append
(
"**
%s
"
%
varkw
)
args_string
=
', '
.
join
(
args_list
)
return
"def
%s
(
%s
)"
%
(
the_callable
.
__name__
,
args_string
)
def
get_callable_documentation
(
the_callable
):
"""
Returns a string with the callable signature and its docstring.
@param the_callable: the callable to be analyzed.
@type the_callable: function/callable.
@return: the signature.
@rtype: string
"""
return
wrap_text_in_a_box
(
title
=
get_callable_signature_as_string
(
the_callable
),
body
=
(
getattr
(
the_callable
,
'__doc__'
)
or
'No documentation'
)
.
replace
(
'
\n
'
,
'
\n\n
'
),
style
=
'ascii_double'
)
def
check_arguments_compatibility
(
the_callable
,
argd
):
"""
Check if calling the_callable with the given arguments would be correct
or not.
>>> def foo(arg1, arg2, arg3='val1', arg4='val2', *args, **argd):
... pass
>>> try: check_arguments_compatibility(foo, {'arg1': 'bla', 'arg2': 'blo'})
... except ValueError, err: print 'failed'
... else: print 'ok'
ok
>>> try: check_arguments_compatibility(foo, {'arg1': 'bla'})
... except ValueError, err: print 'failed'
... else: print 'ok'
failed
Basically this function is simulating the call:
>>> the_callable(**argd)
but it only checks for the correctness of the arguments, without
actually calling the_callable.
@param the_callable: the callable to be analyzed.
@type the_callable: function/callable
@param argd: the arguments to be passed.
@type argd: dict
@raise ValueError: in case of uncompatibility
"""
if
not
argd
:
argd
=
{}
args
,
dummy
,
varkw
,
defaults
=
inspect
.
getargspec
(
the_callable
)
tmp_args
=
list
(
args
)
optional_args
=
[]
args_dict
=
{}
if
defaults
:
defaults
=
list
(
defaults
)
else
:
defaults
=
[]
while
defaults
:
arg
=
tmp_args
.
pop
()
optional_args
.
append
(
arg
)
args_dict
[
arg
]
=
defaults
.
pop
()
while
tmp_args
:
args_dict
[
tmp_args
.
pop
()]
=
None
for
arg
,
dummy_value
in
argd
.
iteritems
():
if
arg
in
args_dict
:
del
args_dict
[
arg
]
elif
not
varkw
:
raise
ValueError
(
'Argument
%s
not expected when calling callable '
'"
%s
" with arguments
%s
'
%
(
arg
,
get_callable_signature_as_string
(
the_callable
),
argd
))
for
arg
in
args_dict
.
keys
():
if
arg
in
optional_args
:
del
args_dict
[
arg
]
if
args_dict
:
raise
ValueError
(
'Arguments
%s
not specified when calling callable '
'"
%s
" with arguments
%s
'
%
(
', '
.
join
(
args_dict
.
keys
()),
get_callable_signature_as_string
(
the_callable
),
argd
))
Event Timeline
Log In to Comment