Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F93162065
bibfield_utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Nov 26, 16:42
Size
21 KB
Mime Type
text/x-python
Expires
Thu, Nov 28, 16:42 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
22582509
Attached To
R3600 invenio-infoscience
bibfield_utils.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2004, 2005, 2006, 2007, 2008, 2010, 2011, 2013 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
BibField Utils
Helper classes and functions to work with BibField
"""
import
re
__revision__
=
"$Id$"
import
os
import
datetime
from
invenio.config
import
CFG_PYLIBDIR
from
invenio.pluginutils
import
PluginContainer
CFG_BIBFIELD_FUNCTIONS
=
PluginContainer
(
os
.
path
.
join
(
CFG_PYLIBDIR
,
'invenio'
,
'bibfield_functions'
,
'*.py'
))
class
BibFieldException
(
Exception
):
"""
General exception to use within BibField
"""
pass
class
BibFieldCheckerException
(
Exception
):
"""
Exception raised when some error happens during checking
"""
pass
class
BibFieldDict
(
object
):
"""
This class implements a I{dict} mostly and uses special key naming for
accessing as describe in __getitem__
>>> #Creating a dictionary
>>> d = BibFieldDict()
>>> #Filling up the dictionary
>>> d['foo'] = {'a': 'world', 'b':'hello'}
>>> d['a'] = [ {'b':1}, {'b':2}, {'b':3} ]
>>> d['_c'] = "random.randint(1,100)"
>>> #Accessing data inside the dictionary
>>> d['a']
>>> d['a[0]']
>>> d['a.b']
>>> d['a[1:]']
>>> d['_c'] #this value will be calculated on the fly
"""
def
__init__
(
self
):
self
.
rec_json
=
{}
self
.
_aliases
=
{}
self
.
_do_not_cache
=
[]
self
.
_init_fase
=
True
def
__getitem__
(
self
,
key
):
"""
As in C{dict.__getitem__} but using BibField name convention.
@param key: String containing the name of the field and subfield.
For e.g. lest work with:
{'a': [ {'b':1}, {'b':2}, {'b':3} ], '_c': [42, random.randint(1,100)"] }
- 'a' -> All the 'a' field info
[{'b': 1}, {'b': 2}, {'b': 3}]
- 'a[0]' -> All the info of the first element inside 'a'
{'b': 1}
- 'a[0].b' -> Field 'b' for the first element in 'a'
1
- 'a[1:]' -> All the 'a' field info but the first
[{'b': 2}, {'b': 3}]
- 'a.b' -> All the 'b' inside 'b'
[1, 2, 3]
- '_c- -> will give us the random number that is cached
42
- ... any other combination ...
- ... as deep as the dictionary is ...
NOTE: accessing one value in a normal way, meaning d['a'], is almost as
fast as accessing a regular dictionary. But using the special name
convention is a bit slower than using the regular access.
d['a[0].b'] -> 10000 loops, best of 3: 18.4 us per loop
d['a'][0]['b'] -> 1000000 loops, best of 3: 591 ns per loop
@return: The value of the field, this might be, a dictionary, a list,
a string, or any combination of the three depending on the value of
field
"""
try
:
if
'.'
not
in
key
and
'['
not
in
key
:
dict_part
=
self
.
rec_json
[
key
]
else
:
dict_part
=
self
.
rec_json
for
group
in
prepare_field_keys
(
key
):
dict_part
=
self
.
_get_intermediate_value
(
dict_part
,
group
)
except
KeyError
,
err
:
return
self
[
key
.
replace
(
err
.
message
,
self
.
_aliases
[
err
.
message
]
.
replace
(
'[n]'
,
'[1:]'
),
1
)]
if
re
.
search
(
'^_[a-zA-Z0-9]'
,
key
):
if
key
in
self
.
_do_not_cache
:
self
.
update_field_cache
(
key
)
dict_part
=
dict_part
[
0
]
return
dict_part
def
__setitem__
(
self
,
key
,
value
):
"""
As in C{dict.__setitem__} but using BibField name convention.
@note: at creation time dict['a[-1]'] = 'something' will mean
dict['a'].append('something') and if the field already exists and is
not a list, then this method will create a list with the existing value
and append the new one,
dict['a'] = 'first value' -> {'a':'first value'}
dict['a'] = 'second value' -> {'a':['first value', 'second value']}
There is one class variable self._init_fase for that matter.
@param key: String containing the name of the field and subfield.
@param value: The new value
"""
if
self
.
_init_fase
:
if
'.'
not
in
key
and
'['
not
in
key
:
if
not
key
in
self
.
rec_json
:
self
.
rec_json
[
key
]
=
value
return
tmp
=
self
.
rec_json
[
key
]
if
tmp
is
None
:
self
.
rec_json
[
key
]
=
value
else
:
if
not
isinstance
(
tmp
,
list
):
self
.
rec_json
[
key
]
=
[
tmp
]
self
.
rec_json
[
key
]
.
append
(
value
)
else
:
try
:
dict_part
=
eval
(
"self.rec_json
%s
"
%
(
''
.
join
(
prepare_field_keys
(
key
)),))
# kwalitee: disable=eval
except
:
build_data_structure
(
self
.
rec_json
,
key
)
dict_part
=
eval
(
"self.rec_json
%s
"
%
(
''
.
join
(
prepare_field_keys
(
key
)),))
if
dict_part
:
exec
(
"self.rec_json
%s
.append(value)"
%
(
''
.
join
(
prepare_field_keys
(
key
,
write
=
True
)[:
-
1
]),))
else
:
exec
(
"self.rec_json
%s
= value"
%
(
''
.
join
(
prepare_field_keys
(
key
)),))
else
:
if
'.'
not
in
key
and
'['
not
in
key
:
self
.
rec_json
[
key
]
=
value
else
:
try
:
exec
(
"self.rec_json
%s
= value"
%
(
''
.
join
(
prepare_field_keys
(
key
)),))
except
:
build_data_structure
(
self
.
rec_json
,
key
)
exec
(
"self.rec_json
%s
= value"
%
(
''
.
join
(
prepare_field_keys
(
key
)),))
def
__delitem__
(
self
,
key
):
"""
As in C{dict.__delitem__}.
@note: It only works with first keys
"""
del
self
.
rec_json
[
key
]
def
__contains__
(
self
,
key
):
"""
As in C{dict.__contains__} but using BibField name convention.
@param key: Name of the key
@return: True if the dictionary contains the special key
"""
if
'.'
not
in
key
and
'['
not
in
key
:
return
key
in
self
.
rec_json
try
:
self
[
key
]
except
:
return
False
return
True
def
__eq__
(
self
,
other
):
"""@see C{dict.__eq__}"""
try
:
return
dict
.
__eq__
(
self
.
rec_json
,
other
.
rec_json
)
except
:
return
False
def
__repr__
(
self
):
"""@see C{dict.__repr__}"""
return
repr
(
self
.
rec_json
)
def
__iter__
(
self
):
"""@see C{dict.__iter__}"""
return
iter
(
self
.
rec_json
)
def
__len__
(
self
):
"""@see C{dict.__len__}"""
return
len
(
self
.
rec_json
)
def
keys
(
self
):
"""@see C{dict.keys}"""
return
self
.
rec_json
.
keys
()
def
iteritems
(
self
):
"""@see C{dict.iteritems}"""
return
self
.
rec_json
.
iteritems
()
def
iterkeys
(
self
):
"""@see C{dict.iterkeys}"""
return
self
.
rec_json
.
iterkeys
()
def
itervalues
(
self
):
"""@see C{dict.itervalues}"""
return
self
.
rec_json
.
itervalues
()
def
has_key
(
self
,
key
):
"""
As in C{dict.has_key} but using BibField name convention.
@see __contains__(self, key)
"""
return
self
.
__contains__
(
key
)
def
get
(
self
,
field
=
None
,
default
=
None
,
reset_cache
=
False
,
formatstring
=
None
,
formatfunction
=
None
):
"""
As in C{dict.get} it Retrieves the value of field from the json structure
but using BibField name convention and also applies some formating if
present.
@see __getitem__(self, key)
@param field: Name of the field/s to retrieve. If it is None then it
will return the entire dictionary.
@param default: in case of error this value will be returned
@param formatstring: Optional parameter to format the output value.
This could be a format string, like this example:
>>> d['foo'] = {'a': 'world', 'b':'hello'}
>>> get('foo', formatstring="{0[b]} {0[a]}!")
>>> 'hello world!'
@param formatfunction: Optional parameter to format the output value.
This parameter must be function and must handle all the possible
parameter types (strin, dict or list)
@return: The value of the field, this might be, a dictionary, a list,
a string, or any combination of the three depending on the value of
field. If any formating parameter is present, then the return value
will be the formated value.
"""
if
field
.
startswith
(
'_'
)
and
reset_cache
:
self
.
update_field_cache
(
field
)
value
=
self
.
rec_json
if
field
:
try
:
value
=
self
.
__getitem__
(
field
)
except
:
return
default
if
not
value
:
return
default
if
formatstring
:
value
=
self
.
_apply_formatstring
(
value
,
formatstring
)
if
formatfunction
:
value
=
formatfunction
(
value
)
return
value
def
update_field_cache
(
self
,
field
):
"""
Updates the value of the cache for the given calculated field
"""
calculated_field
=
self
.
rec_json
.
get
(
field
)
if
calculated_field
and
field
.
startswith
(
'_'
):
calculated_field
[
0
]
=
self
.
_try_to_eval
(
calculated_field
[
1
])
def
update_all_fields_cache
(
self
):
"""
Update the cache of all the calculated fields
@see: update_field_cache()
"""
for
field
in
[
key
for
key
in
self
.
keys
()
if
key
.
startswith
(
'_'
)]:
self
.
update_field_cache
(
field
)
def
_try_to_eval
(
self
,
string
,
bibfield_functions_only
=
False
,
**
context
):
"""
This method takes care of evaluating the python expression, and, if an
exception happens, it tries to import the needed module from bibfield_functions
or from the python path using plugin utils
@param string: String to evaluate
@param context: Context needed, in some cases, to evaluate the string
@return: The value of the expression inside string
"""
if
not
string
:
return
None
res
=
None
imports
=
[]
while
(
True
):
try
:
res
=
eval
(
string
,
globals
()
.
update
(
context
),
locals
())
# kwalitee: disable=eval
except
NameError
,
err
:
import_name
=
err
.
message
.
split
(
"'"
)[
1
]
if
not
import_name
in
imports
:
if
import_name
in
CFG_BIBFIELD_FUNCTIONS
:
globals
()[
import_name
]
=
CFG_BIBFIELD_FUNCTIONS
[
import_name
]
elif
not
bibfield_functions_only
:
globals
()[
import_name
]
=
__import__
(
import_name
)
imports
.
append
(
import_name
)
continue
assert
False
,
'Error not expected when trying to import bibfield function module'
return
res
def
_apply_formatstring
(
self
,
value
,
formatstring
):
"""
Helper function that simply formats the result of get() using a
format string
If the value is of type datetime it tries to apply the format using
strftime(formatstring).
@see: get(self, field=None, formatstring=None, formatfunction=None)
@param value: String, dict or list to apply the format string
@param formatstring: formatstring
@return: Formated value of "value"
"""
if
not
value
:
return
''
if
isinstance
(
value
,
datetime
.
datetime
):
if
formatstring
==
value
.
strftime
(
formatstring
):
value
=
value
.
isoformat
()
else
:
return
value
.
strftime
(
formatstring
)
if
isinstance
(
value
,
list
):
tmp
=
''
for
element
in
value
:
tmp
+=
self
.
_apply_formatstring
(
element
,
formatstring
)
return
tmp
elif
isinstance
(
value
,
dict
)
or
isinstance
(
value
,
basestring
):
return
formatstring
.
format
(
value
)
else
:
assert
False
,
'String, Dictionay or List expected'
def
_get_intermediate_value
(
self
,
dict_part
,
field
):
"""
Helper function that fetch the value of some field from dict_part
@see: get(self, field=None, formatstring=None, formatfunction=None)
@param dict_part: Dictionary or list containing all the information from
this method will fetch field.
@param field: Name or index of the field to fetch from dict_part
@return: The value of the field, this might be, a dictionary, a list,
a string, or any combination of the three depending on the value of
field
"""
if
isinstance
(
dict_part
,
dict
):
return
eval
(
'dict_part
%s
'
%
field
)
# kwalitee: disable=eval
elif
isinstance
(
dict_part
,
list
):
tmp
=
[]
for
element
in
dict_part
:
tmp
.
append
(
self
.
_get_intermediate_value
(
element
,
field
))
return
tmp
else
:
assert
False
,
'Dictionay or List expected get
%s
'
%
(
type
(
dict_part
),)
class
BlobWrapper
(
object
):
"""
Wrapper class to work easily with the blob and the information related to it
inside the *Reader
"""
def
__init__
(
self
,
blob
,
**
kw
):
self
.
__info
=
kw
self
.
blob
=
blob
def
__getattr__
(
self
,
name
):
"""Trick to access the information inside self.__info using dot syntax"""
try
:
return
self
.
__info
[
name
]
except
KeyError
:
raise
AttributeError
(
"
%r
object has no attribute
%r
"
%
(
type
(
self
)
.
__name__
,
name
))
class
CoolDict
(
dict
):
"""
C{dict} but it keeps track of which elements has been consumed/accessed
and which not
"""
def
__init__
(
self
,
*
args
,
**
kwargs
):
dict
.
__init__
(
self
,
*
args
,
**
kwargs
)
self
.
_consumed
=
{}
if
self
:
for
key
,
value
in
dict
.
iteritems
(
self
):
self
[
key
]
=
value
def
__getitem__
(
self
,
key
):
"""
As in C{dict} but in this case the key could be a compiled regular expression.
Also update the consumed list in case the item is not a list or other
dictionary.
@return: Like in C{dict.__getitem__} or, if a regular expression is used,
a list containing all the items inside the dictionary which key matches
the regular expression ([] if none)
"""
if
isinstance
(
key
,
re
.
_pattern_type
):
keys
=
filter
(
key
.
match
,
self
.
keys
())
values
=
[]
for
key
in
keys
:
value
=
dict
.
get
(
self
,
key
)
values
.
append
(
dict
.
get
(
self
,
key
))
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
self
.
_consumed
[
key
]
=
True
return
values
else
:
try
:
value
=
dict
.
get
(
self
,
key
)
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
self
.
_consumed
[
key
]
=
True
return
value
except
:
return
None
def
__setitem__
(
self
,
key
,
value
):
"""
As in C{dict} but in this case it takes care of updating the consumed
value for each element inside value depending on its type.
"""
if
isinstance
(
value
,
dict
):
dict
.
__setitem__
(
self
,
key
,
CoolDict
(
value
))
self
.
_consumed
[
key
]
=
self
[
key
]
.
_consumed
elif
isinstance
(
value
,
list
):
dict
.
__setitem__
(
self
,
key
,
CoolList
(
value
))
self
.
_consumed
[
key
]
=
self
[
key
]
.
_consumed
else
:
dict
.
__setitem__
(
self
,
key
,
value
)
self
.
_consumed
[
key
]
=
False
def
extend
(
self
,
key
,
value
):
"""
If the key is present inside the dictionary it creates a list (it not
present) and extends it with the new value. Almost as in C{list.extend}
"""
if
key
in
self
:
current_value
=
dict
.
get
(
self
,
key
)
if
not
isinstance
(
current_value
,
list
):
current_value
=
CoolList
([
current_value
])
current_value
.
append
(
value
)
value
=
current_value
self
[
key
]
=
value
def
iteritems
(
self
):
""" As in C{dict} but it updates the consumed value if needed"""
for
key
,
value
in
dict
.
iteritems
(
self
):
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
self
.
_consumed
[
key
]
=
True
yield
key
,
value
raise
StopIteration
@property
def
consumed
(
self
):
for
key
,
value
in
self
.
_consumed
.
iteritems
():
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
if
not
value
:
return
False
elif
not
dict
.
get
(
self
,
key
)
.
consumed
:
return
False
return
True
class
CoolList
(
list
):
"""
C{list} but it keeps track of which elements has been consumed/accessed and
which not
"""
def
__init__
(
self
,
*
args
,
**
kwargs
):
list
.
__init__
(
self
,
*
args
,
**
kwargs
)
self
.
_consumed
=
[]
if
self
:
for
i
,
value
in
enumerate
(
list
.
__iter__
(
self
)):
self
.
_consumed
.
append
(
None
)
self
[
i
]
=
value
def
__getitem__
(
self
,
index
):
"""As in C{list}, also update the consumed list in case the item is not
a dictionary or other list.
@return: Like in C{list.__getitem__}
"""
value
=
list
.
__getitem__
(
self
,
index
)
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
self
.
_consumed
[
index
]
=
True
return
value
def
__setitem__
(
self
,
index
,
value
):
"""
As in C{list} but in this case it takes care of updating the consumed
value for each element inside value depending on its type
"""
if
isinstance
(
value
,
dict
):
list
.
__setitem__
(
self
,
index
,
CoolDict
(
value
))
self
.
_consumed
[
index
]
=
self
[
index
]
.
_consumed
elif
isinstance
(
value
,
list
):
list
.
__setitem__
(
self
,
index
,
CoolList
(
value
))
self
.
_consumed
[
index
]
=
self
[
index
]
.
_consumed
else
:
list
.
__setitem__
(
self
,
index
,
value
)
self
.
_consumed
[
index
]
=
False
def
__iter__
(
self
,
*
args
,
**
kwargs
):
""" As in C{dict} but it updates the consumed value if needed"""
for
index
,
value
in
enumerate
(
list
.
__iter__
(
self
)):
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
self
.
_consumed
[
index
]
=
True
yield
value
raise
StopIteration
def
append
(
self
,
element
):
"""@see __setitem__() """
self
+=
[
None
]
self
.
_consumed
+=
[
None
]
self
[
len
(
self
)
-
1
]
=
element
@property
def
consumed
(
self
):
for
index
,
value
in
enumerate
(
self
.
_consumed
):
if
not
isinstance
(
value
,
dict
)
and
not
isinstance
(
value
,
list
):
if
not
value
:
return
False
elif
not
list
.
__getitem__
(
self
,
index
)
.
consumed
:
return
False
return
True
def
prepare_field_keys
(
field
,
write
=
False
):
"""
Helper function to split the name of the fields and the indexes in a
proper way to be used by eval function
@see: bibfield.get()
@param field: String containing all the names and indexes
@param write: If the fields are use to write inside the record then the
granularity is lower for convenience
@return: List of string that can be evaluated by eval function
"""
parts
=
field
.
split
(
'.'
)
keys
=
[]
for
part
in
parts
:
if
'['
in
part
:
if
write
:
keys
.
append
(
'["
%s
"]'
%
(
part
[:
part
.
find
(
'['
)]))
keys
.
append
(
part
[
part
.
find
(
'['
):]
.
replace
(
'n'
,
'-1'
))
else
:
keys
.
append
(
'["
%s
"]
%s
'
%
(
part
[:
part
.
find
(
'['
)],
part
[
part
.
find
(
'['
):]
.
replace
(
'n'
,
'-1'
)))
else
:
keys
.
append
(
'["
%s
"]'
%
part
)
return
keys
def
build_data_structure
(
record
,
field
):
"""
Helper functions that builds the record structure
@param record: Existing data structure
@param field: New field to add to the structure
"""
eval_string
=
''
for
key
in
prepare_field_keys
(
field
,
write
=
True
):
if
key
==
'[-1]'
:
try
:
eval
(
"record
%s
.append(None)"
%
(
eval_string
,))
# kwalitee: disable=eval
except
AttributeError
:
exec
(
"record
%s
=[None]"
%
(
eval_string
,))
elif
key
==
'[0]'
:
try
:
eval
(
"record
%s
"
%
(
eval_string
+
key
,))
# kwalitee: disable=eval
rec_part
=
eval
(
"record
%s
"
%
(
eval_string
,))
# kwalitee: disable=eval
if
not
isinstance
(
rec_part
,
list
):
pass
rec_part
.
insert
(
0
,
None
)
except
TypeError
:
exec
(
"record
%s
=list([None])"
%
(
eval_string
,))
else
:
try
:
eval
(
"record
%s
"
%
(
eval_string
+
key
,))
# kwalitee: disable=eval
except
KeyError
:
exec
(
"record
%s
=None"
%
(
eval_string
+
key
,))
except
TypeError
:
exec
(
"record
%s
={}"
%
(
eval_string
,))
exec
(
"record
%s
=None"
%
(
eval_string
+
key
,))
eval_string
+=
key
Event Timeline
Log In to Comment