Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F112282017
pipeline.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, May 9, 15:16
Size
17 KB
Mime Type
text/x-python
Expires
Sun, May 11, 15:16 (2 d)
Engine
blob
Format
Raw Data
Handle
26068690
Attached To
R7078 singularity-pipeline
pipeline.py
View Options
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Pipeline, a wrapper around Singularity to build, run and test scientific pipelines."""
from
__future__
import
print_function
import
colorama
import
argparse
import
os
import
string
import
subprocess
import
sys
import
yaml
import
re
from
eprint
import
EPrint
# Oldest Singularity release this wrapper accepts (compared in check_singularity).
SUPPORTED_VERSION = "2.4"

# Value the "format_version" key of a pipeline description must carry.
FORMAT_VERSION = 1
class Pipeline(object):
    """Build, run and test a Singularity pipeline described by a YAML document.

    Attributes set while constructing an instance:
        eprint: reporter object used for every user-facing message.
        dry_run: when true, shell commands are displayed but not executed.
        description: the parsed pipeline description (a dict).
        binds: list of (source, destination) tuples parsed from "binds".
        imagefile: sanitized path of the image file to build / run.
    """

    def __init__(self, source, imagefile=None, eprint_instance=None, dry_run=False):
        """Initialize a pipeline instance.

        Args:
            source: YAML-formatted description (string or file handle).
            imagefile: image path; when omitted, "<name>-<version>.img" is
                derived from the description.
            eprint_instance: reporter to use; a new EPrint is created when
                omitted.
            dry_run: display commands instead of executing them.

        Raises:
            LoadError: if the description cannot be parsed or is invalid.
        """
        self.eprint = eprint_instance if eprint_instance else EPrint()
        self.dry_run = dry_run

        self.load_description(source)

        if not imagefile:
            imagefile = "{}-{}.img".format(
                self.description.get("name"),
                self.description.get("version")
            )
        self.imagefile = make_safe_filename(imagefile)

        self.eprint.normal("Target image file: {}\n".format(self.imagefile))

    def load_description(self, source):
        """Load pipeline description from a file or a YAML string.

        Sets ``self.description`` and ``self.binds``.

        Raises:
            LoadError: on YAML syntax errors or a non-conforming description
                (the specific reason is reported through ``self.eprint``).
        """
        self.eprint.bold("# Loading pipeline description...")
        try:
            self.description = yaml.safe_load(source)
            self.validate_description(self.description)
            self.binds = self.__parse_binds(self.description.get("binds"))

            self.eprint.normal("Pipeline '{name}' version {version} loaded.".format(
                name=self.description.get("name"),
                version=self.description.get("version")
            ))
        except yaml.YAMLError as e:
            self.eprint.red("\nError parsing pipeline description: {0}".format(e))
            raise LoadError()
        except FormatError as e:
            self.eprint.red("\nPipeline description error: {0}".format(e))
            raise LoadError()

    @staticmethod
    def __parse_binds(bind_specs):
        """Turn "source:destination" strings into (source, destination) tuples.

        A missing or empty "binds" entry yields an empty list.

        Raises:
            FormatError: if the entry is not a list, or a specification is
                not a string containing a ':' separator.
        """
        if not bind_specs:
            return []
        if not isinstance(bind_specs, list):
            raise FormatError("Bind specifications must be a list")

        binds = []
        for spec in bind_specs:
            parts = spec.split(":") if hasattr(spec, "split") else []
            if len(parts) < 2:
                # Previously surfaced as an uncaught IndexError/AttributeError.
                raise FormatError(
                    "Bind specification '{}' is not in 'source:destination' format".format(spec)
                )
            # Anything after the second field is ignored, as before.
            binds.append((parts[0], parts[1]))
        return binds

    def validate_description(self, description):
        """Validate dict-parsed pipeline description.

        Raises:
            FormatError: if the document is not a mapping, the format version
                is missing or unsupported, a mandatory attribute is missing,
                or the "build"/"run"/"test" sections are not mappings.
        """
        # yaml.safe_load returns None for an empty document and may return a
        # scalar or list; everything below needs a mapping.
        if not isinstance(description, dict):
            raise FormatError("Pipeline description must be a mapping")

        format_version = description.get("format_version")
        if not format_version:
            raise FormatError("Format version not specified; expected {}".format(FORMAT_VERSION))
        if format_version != FORMAT_VERSION:
            raise FormatError(
                "Incompatible format version {}; expected {}".format(format_version, FORMAT_VERSION)
            )

        for attribute in ["name", "version", "build", "run", "test"]:
            if not description.get(attribute):
                raise FormatError("Missing attribute '{}'".format(attribute))

        # The other methods call .get() on these sections.
        for section in ["build", "run", "test"]:
            if not isinstance(description.get(section), dict):
                raise FormatError("Attribute '{}' must be a mapping".format(section))

    def build(self, force=False):
        """Build pipeline according to description.

        Args:
            force: delete an already existing image file and rebuild;
                otherwise an existing image makes the build a no-op.

        Raises:
            NotImplementedError: for an unknown build type.
            RuntimeError: if a build command exits with a non-zero code.
        """
        self.eprint.bold("# Building pipeline...\n")

        if not self.dry_run and os.path.exists(self.imagefile):
            if not force:
                self.eprint.yellow(
                    "Image file {} already exists! Skipping build.".format(self.imagefile)
                )
                return
            self.eprint.normal("Deleting existing image file {}.".format(self.imagefile))
            os.remove(self.imagefile)

        build_spec = self.description.get("build")

        # Docker registry credentials are handed to Singularity via environment.
        credentials = build_spec.get("credentials")
        if credentials:
            if credentials.get("username"):
                os.environ["SINGULARITY_DOCKER_USERNAME"] = credentials.get("username")
            if credentials.get("password"):
                os.environ["SINGULARITY_DOCKER_PASSWORD"] = credentials.get("password")

        build_type = build_spec.get("type")
        source = build_spec.get("source")
        # "or ''" also covers a key that is present but empty in the YAML.
        options = build_spec.get("options") or ""
        size = build_spec.get("size")
        # Size is optional; without this the literal text "None" used to end
        # up in the generated command line.
        size = "--size {}".format(size) if size else ""

        if build_type == "pull":
            build_calls = ["singularity pull {size} {options} --name {image} {source}"]
        elif build_type == "bootstrap":
            build_calls = [
                "singularity create -F {size} {image}",
                "sudo singularity bootstrap {options} {image} {source}"
            ]
        elif build_type == "docker2singularity":
            build_calls = [
                "sudo docker build -t {docker_name} -f {source} .",
                ("sudo docker run -v /var/run/docker.sock:/var/run/docker.sock -v $(pwd):/output "
                 "--privileged -t --rm singularityware/docker2singularity {docker_name}"),
                "mv {docker_name}-*.img {image}"
            ]
        elif build_type == "custom":
            build_calls = build_spec.get("commands")
        else:
            raise NotImplementedError("Build type {} not implemented.".format(build_type))

        ret_code, _ = self.__run_batch(build_calls, {
            "source": source,
            "options": options,
            "size": size,
            "docker_name": make_safe_filename(self.description.get("name"), lower=True)
        })

        if ret_code:
            raise RuntimeError("Singularity build failed (exit code {})".format(ret_code))

        if self.dry_run:
            self.eprint.bold("# Dry-run of building image {} complete.\n".format(self.imagefile))
        else:
            self.eprint.bold("# Successfully built image {}.\n".format(self.imagefile))

    def run(self):
        """Run built pipeline according to description.

        Raises:
            RuntimeError: if the image file or a bind source directory is
                missing, or a run command exits with a non-zero code.
        """
        self.eprint.bold("# Running pipeline...\n")

        if not self.dry_run:
            # The original test was inverted and rejected images that exist.
            if not os.path.isfile(self.imagefile):
                raise RuntimeError("Image {} does not exist".format(self.imagefile))

            for bind_source, _ in self.binds:
                if not os.path.isdir(bind_source):
                    raise RuntimeError("Bind source {} does not exist".format(bind_source))

        commands = self.description.get("run").get("commands")
        ret_code, step = self.__run_batch(commands)

        if ret_code:
            # __run_batch already returns a 1-based step number.
            raise RuntimeError(
                "Singularity run failed (step {}, exit code {})".format(step, ret_code)
            )

        if self.dry_run:
            self.eprint.bold("# Dry-run of running image {} complete.\n".format(self.imagefile))
        else:
            self.eprint.bold("# Successfully ran {}.\n".format(self.description.get("name")))

    def test(self, force=False, skip_run=False):
        """Run defined tests against the pipeline according to description.

        Args:
            force: re-run the prepare commands even if the test files exist.
            skip_run: skip the run stage and only validate existing output.

        Raises:
            RuntimeError: if preparing test files, running the pipeline or
                validating its output fails.
        """
        self.eprint.bold("# Testing pipeline...\n")

        test_spec = self.description.get("test")
        test_files = test_spec.get("test_files")

        if not self.__check_files_exist(test_files) or force:
            self.eprint.bold("(Re)creating test files...")
            ret_code, step = self.__run_batch(test_spec.get("prepare_commands"))
            if ret_code:
                # The exit code used to be silently dropped.
                raise RuntimeError(
                    "Singularity test preparation failed (step {}, exit code {})".format(step, ret_code)
                )
            if not self.dry_run and not self.__check_files_exist(test_files):
                raise RuntimeError("Test files not generated by prepare commands")
        else:
            self.eprint.yellow("Test files already exist and will be reused.\n")

        if skip_run:
            self.eprint.bold("# Skipping run stage.\n")
        else:
            self.run()

        self.eprint.bold("# Running validation stage...\n")
        ret_code, step = self.__run_batch(test_spec.get("validate_commands"))

        if ret_code:
            # __run_batch already returns a 1-based step number.
            raise RuntimeError(
                "Singularity test validation failed (step {}, exit code {})".format(step, ret_code)
            )

        if self.dry_run:
            self.eprint.bold("# Dry-run of validating image {} complete.\n".format(self.imagefile))
        else:
            self.eprint.bold("# Pipeline {} validated successfully!\n".format(self.imagefile))

    def __run_batch(self, commands, substitutions=None):
        """Format and execute a list of shell commands, stopping at the first failure.

        Args:
            commands: list of command templates for str.format().
            substitutions: extra substitutions for this batch (optional).

        Returns:
            A tuple (exit_code, step): (0, 0) when nothing failed (always the
            case in dry-run mode), otherwise the non-zero exit code and the
            1-based number of the failing command.

        Raises:
            FormatError: if commands is not a list or a template cannot be
                formatted with the available substitutions.
        """
        if not isinstance(commands, list):
            raise FormatError("Run commands must be a list")

        subs = self.substitution_dictionary(**(substitutions or {}))

        # Applied once more so description-level substitutions take
        # precedence over the preset ones in the final dictionary.
        user_subs = self.description.get("substitutions")
        if user_subs:
            subs.update(user_subs)

        if self.dry_run:
            action = "Displaying"
            self.eprint.yellow("DRY RUN: Commands only displayed, not run.\n")
        else:
            action = "Executing"

        for number, template in enumerate(commands, 1):
            try:
                command = template.format(**subs)
            except (KeyError, IndexError, ValueError) as e:
                raise FormatError(
                    "Cannot apply substitutions to command '{}': {}".format(template, e)
                )

            self.eprint.bold("{action} step {step}:\n{command}\n".format(
                action=action,
                step=number,
                command=command
            ))

            if not self.dry_run:
                # Commands come from the pipeline author and rely on shell
                # features (redirection, globbing), hence shell=True.
                ret_code = subprocess.call(command, shell=True)
                if ret_code:
                    return ret_code, number  # Failure code + step number

        return 0, 0  # No failures

    def check(self):
        """Validate the pipeline description file (not implemented yet)."""
        self.eprint.bold("# Checking pipeline file!\n")
        raise NotImplementedError("Checking still in the works.")

    def __bind_flags(self):
        """Return all bind flags for singularity as a string.

        Will contain trailing space if non-empty."""
        return "".join(
            "-B {source}:{dest} ".format(source=source, dest=dest)
            for source, dest in self.binds
        )

    def check_binds_exist(self):
        """Check that all source folders in binds exist.

        Returns:
            True if every bind source exists (or no binds are defined),
            False otherwise.
        """
        # A missing "binds" key used to crash in list(None).
        binds = self.description.get("binds") or []
        return self.__check_files_exist([bind.split(":")[0] for bind in binds])

    def __check_files_exist(self, file_list):
        """Check that a list of files/folders exists.

        An empty or missing list counts as satisfied.

        Raises:
            FormatError: if file_list is non-empty but not a list.
        """
        if not file_list:
            return True
        if not isinstance(file_list, list):
            raise FormatError("List of files to check is not a list")
        return all(os.path.exists(path) for path in file_list)

    def substitution_dictionary(self, **extra):
        """Compile a dictionary of substitutions to be passed to .format() for shell commands.

        Args:
            extra: additional substitutions to include.

        Returns:
            A dict holding the extra values, the description's own
            "substitutions" and the presets "image", "binds", "exec" and
            "run" (the presets are written last).
        """
        subs = dict(extra)

        user_subs = self.description.get("substitutions")
        if user_subs:
            subs.update(user_subs)

        subs["image"] = self.imagefile
        subs["binds"] = self.__bind_flags()
        subs["exec"] = "singularity exec {binds}{image}".format(**subs)
        subs["run"] = "singularity run {binds}{image}".format(**subs)

        return subs
def check_singularity():
    """Check that Singularity is installed and is of >= SUPPORTED_VERSION version.

    Runs ``singularity --version`` and compares the first dotted number found
    in its output against SUPPORTED_VERSION.

    Raises:
        ToolError: if the executable cannot be run, exits with an error,
            prints no recognizable version, or is older than
            SUPPORTED_VERSION.
    """
    def to_text(raw):
        """Return process output as text (check_output yields bytes on Python 3)."""
        if isinstance(raw, bytes):
            return raw.decode("utf-8", "replace")
        return raw or ""

    def version_numbers(text):
        """Return the first dotted number in text as a list of ints, or None."""
        # Can't use e.g. semver as Singularity is not following SemVer spec.
        # Searching (instead of splitting the whole string) also copes with
        # output that carries a textual prefix or suffix around the number.
        found = re.search(r"\d+(?:\.\d+)*", text)
        if not found:
            return None
        return [int(part) for part in found.group(0).split(".")]

    try:
        output = subprocess.check_output(["singularity", "--version"])
    except subprocess.CalledProcessError as e:
        raise ToolError(to_text(e.output).strip() or "exit code {}".format(e.returncode))
    except OSError as e:
        raise ToolError(e.strerror or str(e))

    version = to_text(output).strip()
    installed = version_numbers(version)

    if installed is None:
        raise ToolError("Unexpected format for Singularity version string ({})".format(version))

    if installed < version_numbers(SUPPORTED_VERSION):
        raise ToolError("Singularity version {} is less than minimum supported ({})".format(
            version,
            SUPPORTED_VERSION
        ))
def __main():
    """Main method to be called when running directly.

    Expects CLI arguments in sys.argv. Exits with status 0 after printing the
    template, and with status 1 when Singularity is unusable, the pipeline
    description cannot be loaded, or the requested command fails.
    """
    colorama.init()
    eprint = EPrint()

    args = parse_args(sys.argv[1:])

    # The template can be printed without Singularity or a description file.
    if args.command == "template":
        print(template_pipeline)
        # sys.exit rather than the site-provided exit() helper, which is not
        # guaranteed to exist (e.g. under "python -S").
        sys.exit(0)

    try:
        check_singularity()
    except ToolError as e:
        eprint.red("Error when running `singularity`: {}".format(e.error))
        eprint.yellow("Check your Singularity installation!")
        sys.exit(1)

    try:
        with open(args.pipeline) as f:
            pipeline = Pipeline(
                f,
                imagefile=args.image,
                eprint_instance=eprint,
                dry_run=args.dry_run
            )
    except IOError as e:
        eprint.red("\nCannot open pipeline description {0}: {1}".format(args.pipeline, e.strerror))
        eprint.yellow("\nUnable to load pipeline description. Aborting.")
        sys.exit(1)
    except LoadError:
        eprint.yellow("\nUnable to load pipeline description. Aborting.")
        sys.exit(1)

    try:
        if args.command == "build":
            pipeline.build(force=args.force)
        elif args.command == "run":
            pipeline.run()
        elif args.command == "test":
            pipeline.test(force=args.force, skip_run=args.skip_run)
        elif args.command == "check":
            pipeline.check()
        else:
            raise RuntimeError("Unknown command specified")
    except (RuntimeError, FormatError) as e:
        # FormatError derives from ValueError, so it has to be listed
        # explicitly; otherwise a malformed command list ends in a traceback.
        eprint.red("ERROR: {}".format(e))
        sys.exit(1)
def parse_args(args):
    """Parse the command-line arguments (program name excluded).

    Returns the argparse namespace with ``command``, ``pipeline``, ``image``,
    ``force``, ``no_bind``, ``skip_run`` and ``dry_run`` attributes.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Pipeline, a wrapper around Singularity to build, run and test scientific pipelines."
    )
    add = parser.add_argument

    add(
        "command",
        help="Command to execute",
        choices=['build', 'run', 'test', 'check', 'template']
    )
    add(
        "-p", "--pipeline",
        default="pipeline.yaml",
        help="Pipeline description file (default: '%(default)s')"
    )
    add(
        "-i", "--image",
        help="Singularity image file (default: as defined in pipeline description)"
    )

    # Boolean switches share the same action and only differ in flags/help.
    switches = (
        (("-f", "--force"),
         "Force rebuilding the image or test data for validation (default: no)"),
        (("--no-bind",),
         "Omit runtime bind arguments (default: no). Useful in cases when user-supplied path binding is not allowed."),
        (("--skip-run",),
         "For testing, skip the run phase, only validating existing output (default: no)"),
        (("--dry-run",),
         "Output the intended command sequence without executing it (default: no)"),
    )
    for flags, help_text in switches:
        add(*flags, action="store_true", help=help_text)

    return parser.parse_args(args)
def make_safe_filename(name, lower=False):
    """Replace every character that is unsafe in a file name with '_'.

    Letters, digits and the characters ``_ - . /`` are kept as they are.

    Parameter:
        lower (bool) - whether to lowercase the filename first."""
    allowed = string.ascii_letters + string.digits + "_-./"

    candidate = name.lower() if lower else name

    return "".join(char if char in allowed else '_' for char in candidate)
class LoadError(RuntimeError):
    """Raised when a pipeline description cannot be read or parsed."""
class FormatError(ValueError):
    """Exception class for non-conforming file format.

    Attributes:
        error: description of the specific problem.
    """

    def __init__(self, error):
        """Store specific error description."""
        # Initialize the base class too, so that args/repr carry the message.
        super(FormatError, self).__init__(error)
        self.error = error

    def __str__(self):
        """Print out specific error description as string representation."""
        # __str__ must return a string even if a non-string was stored.
        return str(self.error)
class ToolError(RuntimeError):
    """Exception class for unexpected response from external tools.

    Attributes:
        error: description of the specific problem.
    """

    def __init__(self, error):
        """Store specific error description."""
        # Initialize the base class too, so that args/repr carry the message.
        super(ToolError, self).__init__(error)
        self.error = error

    def __str__(self):
        """Print out specific error description as string representation."""
        # The stored value may be None or bytes (e.g. captured process
        # output); __str__ must still return a string.
        return str(self.error)
# Annotated example description printed by the "template" command.
# The nesting below is significant: "build", "run" and "test" must be
# mappings for the description to pass validation.
template_pipeline = r'''## Those will be used in default image name
format_version: 1
name: CowSay
version: 1
## Those are purely informative at the moment
author: Alexander Kashev
author_org: UniBe
source:
## Extra substitutions for commands
## {image}, {binds}, {exec} and {run} are always available; build commands
## additionally get {source}, {size}, {options} and {docker_name}
## To use literal {foo} in commands, double the braces: {{foo}}
substitutions:
  text: "Moo"
## Bind specifications (source:destination) to be passed to Singularity
binds:
  - "/var/tmp:/var/tmp"
## Build instructions
build:
  ## Currently supported: bootstrap (will run sudo), pull, docker2singularity, custom
  type: pull
  ## Size in MB; optional for pull
  size: 512
  ## Extra options to pass to corresponding singularity build command; string
  # options: "--some-option"
  ## For bootstrap, should be a local Singularity file
  ## For pull, shub / docker URL
  ## For docker2singularity, should be a local Dockerfile file
  source: docker://chuanwen/cowsay
  ## Only for build type "custom".
  ## Additional substitutions: {source}, {size} (as "--size XXX") and {options}
  # commands:
  #   - "singularity ..."
  ## Credentials for docker registries
  ## Passed to singularity as environment variables
  # credentials:
  #   username: foo
  #   password: bar
## Run instructions
run:
  ## An array of scripts to be executed in shell
  ## Preset substitutions:
  ## * {exec} for "singularity exec [-B <bind specification>] <image name>"
  ## * {run} for "singularity run [-B <bind specification>] <image name>"
  ## * {binds} for "[-B <bind specification>]"
  ## * {image} for container file name
  ## will be substituted; for literal {} (e.g. shell) use {{}}
  commands:
    - "{exec} /usr/games/cowsay {text} > cowsay.txt 2> /dev/null"
## Test instructions
test:
  ## Files required for testing; will run prepare_commands if any doesn't exist or --force specified
  test_files:
    - cowsay.md5
  ## An array of scripts to be executed in shell to prepare test_files
  prepare_commands:
    - "echo '548c5e52a6c1abc728a6b8e27f5abdd4 cowsay.txt' > cowsay.md5"
  ## An array of scripts to be executed in shell after running
  validate_commands:
    - "md5sum -c cowsay.md5"'''
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    __main()
Event Timeline
Log In to Comment