Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F74876356
bibencode_utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Jul 30, 04:23
Size
12 KB
Mime Type
text/x-python
Expires
Thu, Aug 1, 04:23 (2 d)
Engine
blob
Format
Raw Data
Handle
19446510
Attached To
R3600 invenio-infoscience
bibencode_utils.py
View Options
# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2011 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""BibEncode helper functions.
Functions that are used throughout the BibEncode module
"""
import
os
import
subprocess
import
unicodedata
import
re
import
sys
import
time
import
uuid
from
invenio.bibencode_config
import
(
CFG_BIBENCODE_FFMPEG_PROBE_LOG
,
CFG_BIBENCODE_FFMPEG_PROBE_COMMAND
,
CFD_BIBENCODE_FFMPEG_OUT_RE_CONFIGURATION
,
CFG_BIBENCODE_FFMPEG_CONFIGURATION_REQUIRED
,
CFG_BIBENCODE_MEDIAINFO_COMMAND
)
from
invenio.config
import
CFG_PATH_FFPROBE
## The timestamp for the process. Used to identify Logfiles.
def
generate_timestamp
():
""" Generates a timestamp for the logfile to make it unique
"""
return
"
%s
-
%s
"
%
(
time
.
strftime
(
"%Y%m
%d
%H%M%S"
,
time
.
gmtime
()),
str
(
uuid
.
uuid4
()))
## The following functions are for ffmpeg specific timecodes
## The format is HH:MM:SS.ss
def
timecode_to_seconds
(
timecode
):
""" Converts a timecode to a total duration in seconds
"""
if
type
(
timecode
)
==
type
(
str
()):
try
:
hours
,
minutes
,
seconds
=
timecode
.
split
(
':'
)
total_seconds
=
int
(
hours
)
*
3600
+
int
(
minutes
)
*
60
+
float
(
seconds
)
except
ValueError
:
raise
ValueError
(
timecode
+
" is not a valid timecode, the format is hh:mm:ss.ss or hh:mm:ss"
)
else
:
raise
ValueError
(
timecode
+
" is not a valid timecode, the format is hh:mm:ss.ss or hh:mm:ss"
)
return
total_seconds
def
seconds_to_timecode
(
total_seconds
):
""" Converts seconds to a timecode
"""
## Cast to float
if
(
type
(
total_seconds
)
==
type
(
int
())
or
type
(
total_seconds
)
==
type
(
float
())
or
type
(
total_seconds
)
==
type
(
str
())
):
try
:
total_seconds
=
float
(
total_seconds
)
except
ValueError
:
ValueError
(
"string must be of format '1.1' or '1'"
)
hours
=
int
(
total_seconds
/
3600
)
minutes
=
int
(
total_seconds
/
60
)
-
hours
*
60
seconds
=
total_seconds
%
60
else
:
raise
TypeError
(
"seconds must be given as integer or float or string values"
)
return
"
%02d
:
%02d
:
%05.2f
"
%
(
hours
,
minutes
,
seconds
)
def
is_timecode
(
value
):
""" Checks if the given string is a timecode
"""
if
type
(
value
)
==
type
(
str
())
or
type
(
value
)
==
type
(
unicode
()):
pattern
=
re
.
compile
(
"^\d\d:\d\d:\d\d(\.\d+)?$"
)
if
pattern
.
match
(
value
):
return
True
else
:
return
False
else
:
return
False
def
is_seconds
(
value
):
""" Checks if the given value represents seconds
Integer, Floats and Strings of the right format are valid
"""
if
type
(
value
)
==
type
(
float
())
or
type
(
value
)
==
type
(
int
()):
return
True
elif
type
(
value
)
==
type
(
str
()):
pattern
=
re
.
compile
(
"^\d+(\.\d+)?$"
)
if
pattern
.
match
(
value
):
return
True
else
:
return
False
else
:
return
False
## Try to parse anything to unicode
## http://www.codigomanso.com/en/2010/05/una-de-python-force_unicode/
def
force_unicode
(
seq
,
encoding
=
'utf-8'
,
errors
=
'ignore'
):
"""
Returns a unicode object representing 'seq'. Treats bytestrings using the
'encoding' codec.
"""
import
codecs
if
seq
is
None
:
return
''
try
:
if
not
isinstance
(
seq
,
basestring
,):
if
hasattr
(
seq
,
'__unicode__'
):
seq
=
unicode
(
seq
)
else
:
try
:
seq
=
unicode
(
str
(
seq
),
encoding
,
errors
)
except
UnicodeEncodeError
:
if
not
isinstance
(
seq
,
Exception
):
raise
# If we get to here, the caller has passed in an Exception
# subclass populated with non-ASCII data without special
# handling to display as a string. We need to handle this
# without raising a further exception. We do an
# approximation to what the Exception's standard str()
# output should be.
seq
=
' '
.
join
([
force_unicode
(
arg
,
encoding
,
errors
)
for
arg
in
seq
])
elif
not
isinstance
(
seq
,
unicode
):
# Note: We use .decode() here, instead of unicode(seq, encoding,
# errors), so that if seq is a SafeString, it ends up being a
# SafeUnicode at the end.
seq
=
seq
.
decode
(
encoding
,
errors
)
except
UnicodeDecodeError
,
e
:
if
not
isinstance
(
seq
,
Exception
):
raise
UnicodeDecodeError
(
seq
,
*
e
.
args
)
else
:
# If we get to here, the caller has passed in an Exception
# subclass populated with non-ASCII bytestring data without a
# working unicode method. Try to handle this without raising a
# further exception by individually forcing the exception args
# to unicode.
seq
=
' '
.
join
([
force_unicode
(
arg
,
encoding
,
errors
)
for
arg
in
seq
])
return
seq
def
normalize_string
(
value
):
"""
Normalizes string, converts to lowercase, removes non-alpha characters,
and converts spaces to hyphens.
Returns a unicode object
"""
value
=
force_unicode
(
value
)
value
=
unicodedata
.
normalize
(
'NFKD'
,
value
)
.
encode
(
'ascii'
,
'ignore'
)
value
=
unicode
(
re
.
sub
(
'[^\w\s-]'
,
''
,
value
)
.
strip
()
.
lower
())
return
re
.
sub
(
'[-\s]+'
,
'-'
,
value
)
def
filename_convert
(
input_file
):
""" Converts /foo/bar/Example.mp4 to Example_mp4
Used for the generation of log filenames
"""
path
,
filename
=
os
.
path
.
split
(
input_file
)
fname
,
ext
=
os
.
path
.
splitext
(
filename
)
return
fname
+
"_"
+
ext
[
1
:]
def
get_probe_log_filename
(
input_file
):
""" generates the filename for the ffprobe logfile
"""
return
CFG_BIBENCODE_FFMPEG_PROBE_LOG
%
(
filename_convert
(
input_file
))
def
get_lines_from_probe_log
(
input_file
):
""" Probes the file using FFprobe and returns the lines of the probe log
This will create a log file.
"""
## Create Log file
log_file
=
open
(
get_probe_log_filename
(
input_file
),
'w'
)
## Build command for ffprobe execution
command
=
(
CFG_BIBENCODE_FFMPEG_PROBE_COMMAND
%
input_file
)
.
split
()
## Start process and wait until it finishes
process
=
subprocess
.
Popen
(
command
,
stderr
=
log_file
)
returncode
=
process
.
wait
()
## If the process ends normal parse log file
if
returncode
==
0
:
## Read the Log
log_file
=
open
(
get_probe_log_filename
(
input_file
))
finput
=
log_file
.
read
()
lines
=
finput
.
splitlines
()
log_file
.
close
()
return
lines
## If there was a problem during execution
if
returncode
==
-
15
or
returncode
==
1
:
return
None
else
:
return
None
## Simple function to receive ffprobe results
def
probe
(
input_file
,
parsable
=
False
):
""" Probes the file using FFprobe and returns the output as a string
"""
command
=
(
CFG_BIBENCODE_FFMPEG_PROBE_COMMAND
%
input_file
)
.
split
()
process
=
subprocess
.
Popen
(
command
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
)
returncode
=
process
.
wait
()
if
returncode
==
0
:
if
not
parsable
:
return
process
.
communicate
()[
1
]
else
:
return
process
.
communicate
()[
0
]
else
:
return
None
def
mediainfo
(
input_file
):
""" Receives XML output from mediainfo CLI
"""
command
=
(
CFG_BIBENCODE_MEDIAINFO_COMMAND
%
input_file
)
.
split
()
process
=
subprocess
.
Popen
(
command
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
)
returncode
=
process
.
wait
()
if
returncode
==
0
:
return
process
.
communicate
()[
0
]
else
:
return
None
def
check_ffmpeg_configuration
():
""" Uses ffprobe to check if ffmpeg is compiled with the right
options to integrate in BibEncode
@return: Returns a list of missing options
@rtype: set
"""
## Use ffprobe to get the current ffmpeg configuration
try
:
process
=
subprocess
.
Popen
(
CFG_PATH_FFPROBE
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
)
except
OSError
:
return
[
"FFMPEG/FFPROBE does not seem to be installed!"
]
returncode
=
process
.
wait
()
## This reads the stream from PIPE
output
=
process
.
communicate
()[
1
]
## find the configuration text inside ffprobes output
## and parse all the configuration arguments
## 'options' is a list configuration flags
options
=
CFD_BIBENCODE_FFMPEG_OUT_RE_CONFIGURATION
.
findall
(
output
)
## check if the neccessary configuration is availables
## This should be at least --enable-libvpx, ...libtheora, ...libvorbis
## ...gpl, ..version3, ...nonfree,
## libx264 and libfaac should be recommended in the manual but with
## regards about the licensing and patenting issues
## !!! Warn: For old Python versions, there is the sets module
## For newer ones, this is deprecated, set is a build in type now
if
sys
.
version_info
<
(
2
,
6
):
import
sets
o
=
sets
.
Set
(
options
)
s
=
sets
.
Set
(
CFG_BIBENCODE_FFMPEG_CONFIGURATION_REQUIRED
)
if
not
s
.
issubset
(
o
):
return
o
.
difference
(
s
)
else
:
if
not
set
(
CFG_BIBENCODE_FFMPEG_CONFIGURATION_REQUIRED
)
.
issubset
(
options
):
return
set
(
CFG_BIBENCODE_FFMPEG_CONFIGURATION_REQUIRED
)
.
difference
(
options
)
def
check_mediainfo_configuration
():
""" Checks if mediainfo lib is installed
@return: Returns a list of missing options or simply False if no missings
@rtype: set
"""
try
:
process
=
subprocess
.
Popen
(
'mediainfo'
,
stderr
=
subprocess
.
PIPE
,
stdout
=
subprocess
.
PIPE
)
except
OSError
:
return
[
"MEDIAINFO does not seem to be installed!"
]
return
False
def
getval
(
dictionary
,
key
,
fallback
=
None
):
""" Returns a value from a dict. If the key doesn't exist, returns fallback
@param dictionary: a dictionary with the value to access
@type dictionary: dict
@param key: key of the value to access
@type key: object
@param fallback: a fallback value if the key does not exist
@type fallback: object
@return: the value of the key or the fallback
@rtype: object
"""
if
type
(
dictionary
)
==
type
(
dict
())
and
key
in
dictionary
:
return
dictionary
[
key
]
else
:
return
fallback
def
chose
(
primary
,
fallback_key
,
fallback_dict
):
""" Returns a fallback from a dictionary if the primary is not available
@param primary: value to take first
@type primary: object
@param fallback_key: key of the fallback
@type fallback_key: object
@param fallback_dict: the dictinary where the fallback key is stored
@type fallback_dict: dict
@return: primary, fallback value or None
@rtype: object
"""
if
not
primary
:
return
getval
(
fallback_dict
,
fallback_key
)
else
:
return
primary
def
chose2
(
key
,
primary_dict
,
fallback_dict
):
""" Tries to receive a key from one dictionary and falls back to another
"""
return
getval
(
primary_dict
,
key
,
getval
(
fallback_dict
,
key
))
def
aspect_string_to_float
(
aspect_as_string
):
"""
Transforms a string containing an aspect ratio to a float
@param aspect_as_string: Aspect ratio as a String eg '16:9'
@type aspect_as_string: string
@return: Aspect ratio as a float eg '1.77777'
@rtype: float
"""
try
:
aspect_x
,
aspect_y
=
aspect_as_string
.
split
(
':'
)
aspect_x
=
float
(
aspect_x
)
aspect_y
=
float
(
aspect_y
)
except
:
raise
return
aspect_x
/
aspect_y
Event Timeline
Log In to Comment