diff --git a/gbids.py b/gbids.py
index 4d19b42..b46f72e 100644
--- a/gbids.py
+++ b/gbids.py
@@ -1,392 +1,393 @@
#! /usr/bin/env python
#
# gbids.py -- Front-end script for running Docker BIDS apps
# function over a large dataset.
#
# Copyright (C) 2018, 2019 S3IT, University of Zurich
#
# This program is free software: you can redistribute it and/or
# modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Front-end script for submitting multiple `BIDS Apps` jobs, fetching the
Docker images from the `BIDS Apps` repository.

It uses the generic `gc3libs.cmdline.SessionBasedScript` framework.
See the output of ``gbids.py --help`` for program usage instructions.

Example of the docker execution each job performs:

  docker run -i --rm -v /mnt/filo/data/ds005:/bids_dataset:ro bids/fmriprep /bids_dataset /outputs participant --participant_label 01

``gbids`` takes a dataset in BIDS format as input.
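
Example of a full ``gbids`` invocation (paths, session name and polling
interval below are purely illustrative):

  ./gbids.py bids/fmriprep /data/ds005 /data/ds005-results participant -L /data/freesurfer/license.txt -s fmriprep_session -C 60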
"""
# summary of user-visible changes
__changelog__ = """
2018-01-10:
* added support for freesurfer license file to be passed as part of the docker invocation. see: https://fmriprep.readthedocs.io/en/latest/installation.html#the-freesurfer-license
2017-04-18:
* Initial version
"""
__author__ = 'Sergio Maffioletti '
__docformat__ = 'reStructuredText'
__version__ = '1.0'
import os
import shutil
import subprocess
import tempfile
# GC3Pie specific libraries
import gc3libs
import gc3libs.exceptions
from gc3libs import Application
from gc3libs.workflow import RetryableTask
from gc3libs.cmdline import SessionBasedScript, existing_file, existing_directory
import gc3libs.utils
from gc3libs.quantity import GB
from gc3libs.utils import write_contents
# 3rd-party dependencies
from bids.grabbids import BIDSLayout
# Defaults
RUN_DOCKER = "./run_docker.sh"
MAX_MEMORY = 32*GB
DEFAULT_BIDS_FOLDER = "$PWD/data/"
DEFAULT_RESULT_FOLDER_LOCAL = "output"
DEFAULT_RESULT_FOLDER_REMOTE = "$PWD/output/"
DEFAULT_DOCKER_BIDS_ARGS = "--no-submm-recon"
DEFAULT_FREESURFER_LICENSE_FILE = "license.txt"
DEFAULT_DOCKER_BIDS_APP = "poldracklab/fmriprep " + DEFAULT_DOCKER_BIDS_ARGS
ANALYSIS_LEVELS = ["participant", "group1", "group2", "group"]
DOCKER_RUN_COMMAND = "sudo docker run -i --rm {DOCKER_MOUNT} {DOCKER_TO_RUN} /bids /output {ANALYSIS} "
COPY_COMMAND = "cp {0}/* {1} -Rf"
RUN_DOCKER_SCRIPT="""#!/bin/bash
echo "[`date`]: Start processing for subject {subject}"
group=`id -g -n`
sudo docker run -i --rm -v {data}:/bids -v {output}:/output {container} /bids /output {analysis} --participant_label {subject}
RET=$?
echo "fixing local filesystem permission"
sudo chown -R $USER:$group {output}
echo "[`date`]: Done with code $RET"
exit $RET
"""
# Utility methods
def _make_temp_run_docker_file(location, docker, data, output, analysis, subject):
"""
Create execution script to control docker execution and post-process
"""
try:
(fd, tmp_filename) = tempfile.mkstemp(prefix="sbj{0}-".format(subject),dir=location)
write_contents(tmp_filename, RUN_DOCKER_SCRIPT.format(data=data,
output=output,
container=docker,
analysis=analysis,
subject=subject))
os.chmod(tmp_filename, 0755)
return tmp_filename
except Exception, ex:
gc3libs.log.debug("Error creating execution script."
"Error type: %s. Message: %s" % (type(ex), ex.message))
raise
def _get_subjects(root_input_folder):
"""
    Build the list of (subject_directory, subject_label) pairs by scanning
    `root_input_folder` with `BIDSLayout`. For a dataset containing `sub-01/`
    and `sub-02/`, for example, this returns the absolute paths of those two
    folders paired with the labels "01" and "02".
"""
layout = BIDSLayout(root_input_folder)
return [(os.path.abspath(os.path.join(root_input_folder, "sub-{}".format(subject))),
subject) for subject in layout.get_subjects()]
def _get_control_files(input_folder):
"""
    Return the list of `.json` and `.tsv` files found in the root of
    `input_folder` (e.g. `dataset_description.json` and `participants.tsv`).
    Assumptions:
    * each sub-folder contains a valid subject's data;
    * each `.json` and `.tsv` file found in the root folder will be made
      available to all Applications.
"""
return [os.path.abspath(os.path.join(input_folder, control)) for control in os.listdir(input_folder)
if control.endswith(".json") or control.endswith(".tsv")]
def _is_participant_analysis(analysis_level):
return analysis_level == ANALYSIS_LEVELS[0]
# Custom application class
class GbidsApplication(Application):
"""
    Custom class wrapping the execution of one BIDS App run: the generated
    `run_docker.sh` wrapper invokes the selected Docker image on a single
    subject (or on the whole dataset for group-level analyses).
"""
application_name = 'gbids'
def __init__(self, docker_run, subject, subject_name, control_files,
analysis_level, **extra_args):
executables = []
inputs = dict()
outputs = []
self.subject_dir = subject
self.subject_name = subject_name
self.data_output_dir = extra_args['data_output_dir']
if extra_args['transfer_data']:
# Input data need to be transferred to compute node
# include them in the `inputs` list and adapt
# container execution command
inputs[subject] = os.path.join(DEFAULT_BIDS_FOLDER,
os.path.basename(subject))
# add all control files to 'data' folder
for element in control_files:
inputs[element] = os.path.join(DEFAULT_BIDS_FOLDER,
os.path.basename(element))
inputs[extra_args['local_result_folder']] = DEFAULT_RESULT_FOLDER_REMOTE
outputs.append(DEFAULT_RESULT_FOLDER_REMOTE)
run_docker_input_data = DEFAULT_BIDS_FOLDER
run_docker_output_data = DEFAULT_RESULT_FOLDER_REMOTE
else:
# Use local filesystem as reference
# Define mount points
run_docker_input_data = subject
run_docker_output_data = extra_args['data_output_dir']
self.run_script = _make_temp_run_docker_file(extra_args['session'],
docker_run,
run_docker_input_data,
run_docker_output_data,
analysis_level,
subject_name)
        inputs[self.run_script] = RUN_DOCKER
        executables.append(inputs[self.run_script])
        Application.__init__(
            self,
            arguments=RUN_DOCKER,
inputs=inputs,
outputs=outputs,
stdout='{0}.log'.format(subject_name),
join=True,
executables=executables,
**extra_args)
def terminated(self):
"""
        Check the exit code: if an out-of-memory kill is detected (exit code
        137), increase the requested memory so that a re-submission of the
        task has a chance to succeed.
:return: None
"""
if self.execution.returncode == 137:
if self.requested_memory and self.requested_memory < MAX_MEMORY:
                # quadruple the memory request for the next attempt
                self.requested_memory *= 4
                # mark the task with a distinctive non-zero exit code so it is
                # not considered successfully completed
                self.execution.returncode = (0, 99)
class GbidsRetriableTask(RetryableTask):
    def __init__(self, docker_run, subject, subject_name, control_files,
                 analysis_level, **extra_args):
        RetryableTask.__init__(
            self,
            GbidsApplication(docker_run, subject, subject_name, control_files,
                             analysis_level, **extra_args),
            **extra_args)
class GbidsScript(SessionBasedScript):
"""
The ``gbids`` command keeps a record of jobs (submitted, executed
and pending) in a session file (set name with the ``-s`` option); at
each invocation of the command, the status of all recorded jobs is
updated, output from finished jobs is collected, and a summary table
of all known jobs is printed.
Options can specify a maximum number of jobs that should be in
'SUBMITTED' or 'RUNNING' state; ``gbids`` will delay submission of
newly-created jobs so that this limit is never exceeded.
    Once all tasks have completed successfully, ``gbids`` merges the
    per-subject results into the BIDS output folder given on the command
    line.
"""
def __init__(self):
self.bids_app_execution = DEFAULT_DOCKER_BIDS_APP
SessionBasedScript.__init__(
self,
version=__version__,
application=GbidsApplication,
stats_only_for=GbidsApplication,
)
def setup_args(self):
self.add_param("bids_app", type=str,
help="Name of BIDS App to run. "
" Images are listed at: http://bids-apps.neuroimaging.io/apps/ ")
self.add_param("bids_input_folder", type=existing_directory,
help="Root location of input data. Note: expects folder in "
"BIDS format.")
self.add_param("bids_output_folder", type=str, help="Location of the "
" results.")
self.add_param("analysis_level", type=str,
help="analysis_level: participant: 1st level\n"
"group: second level. Bids-Apps specs allow for multiple substeps "
"(e.g., group1, group2."
"Allowed values: {0}.".format(ANALYSIS_LEVELS))
def setup_options(self):
self.add_param("-F", "--datatransfer", dest="transfer_data",
action="store_true", default=False,
help="Transfer input data to compute nodes. "
"If False, data will be assumed be already visible on "
"compute nodes - e.g. shared filesystem. "
"Default: %(default)s.")
self.add_param("-L", "--license", metavar="[PATH]",
type=existing_file,
dest="freesurfer_license", default=None,
help="Location of freesurfer license file. Default: %(default)s.")
def parse_args(self):
"""
        Check for a valid analysis level, record the BIDS App to run, and
        normalize the input and output folder locations to absolute paths.
"""
assert self.params.analysis_level in ANALYSIS_LEVELS, "Unknown analysis level {0}. " \
"Allowed values are {1}".format(self.params.analysis_level,
ANALYSIS_LEVELS)
self.bids_app_execution = self.params.bids_app
self.params.bids_output_folder = os.path.abspath(self.params.bids_output_folder)
-
+ self.params.bids_input_folder = os.path.abspath(self.params.bids_input_folder)
+
def new_tasks(self, extra):
"""
        If the analysis level is 'participant', create one GbidsApplication
        per subject found in the BIDS input folder.
        For any group-level analysis, create a single GbidsApplication over
        the whole input dataset.
"""
tasks = []
control_files = _get_control_files(self.params.bids_input_folder)
local_result_folder = os.path.join(self.session.path,
DEFAULT_RESULT_FOLDER_LOCAL)
for folder in [self.params.bids_output_folder]:
if not os.path.isdir(folder):
try:
os.mkdir(folder)
except OSError, osx:
gc3libs.log.error("Failed to create folder {0}. reason: '{1}'".format(folder,
osx))
if self.params.transfer_data and not os.path.isdir(local_result_folder):
            os.mkdir(local_result_folder)
if _is_participant_analysis(self.params.analysis_level):
# participant level analysis
for (subject_dir, subject_name) in _get_subjects(self.params.bids_input_folder):
job_name = subject_name
extra_args = extra.copy()
if not self.params.transfer_data:
# Use root BIDS folder and set participant label for each task
subject_dir = self.params.bids_input_folder
if self.params.transfer_data:
extra_args['local_result_folder'] = local_result_folder
extra_args['session'] = self.session.path
extra_args['transfer_data'] = self.params.transfer_data
extra_args['jobname'] = job_name
extra_args['output_dir'] = os.path.join(os.path.abspath(self.session.path),
'.compute',
subject_name)
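                # Note: `output_dir` is where GC3Pie stages the task's own
                # output (the per-subject log), while `data_output_dir` is the
                # per-subject folder under the BIDS output folder where the
                # BIDS App results are expected to end up.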
extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.bids_output_folder),
subject_name)
extra_args['freesurfer_license'] = self.params.freesurfer_license
self.log.debug("Creating Application for subject {0}".format(subject_name))
tasks.append(GbidsApplication(
self.bids_app_execution,
subject_dir,
subject_name,
control_files,
self.params.analysis_level,
**extra_args))
else:
# Group level analysis
subject_name = self.params.analysis_level
extra_args = extra.copy()
extra_args['jobname'] = self.params.analysis_level
            extra_args['session'] = self.session.path
            extra_args['transfer_data'] = self.params.transfer_data
            if self.params.transfer_data:
                extra_args['local_result_folder'] = local_result_folder
extra_args['output_dir'] = os.path.join(self.params.bids_output_folder,
'.compute')
extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.bids_output_folder),
subject_name)
extra_args['freesurfer_license'] = self.params.freesurfer_license
self.log.debug("Creating Application for analysis {0}".format(self.params.analysis_level))
tasks.append(GbidsApplication(
self.bids_app_execution,
self.params.bids_input_folder,
                                          subject_name,
control_files,
self.params.analysis_level,
**extra_args))
return tasks
def after_main_loop(self):
"""
        Merge the per-subject results of all successfully completed tasks
        into the BIDS output folder, then remove the per-task output folders.
"""
for task in self.session:
if isinstance(task, GbidsApplication) and task.execution.returncode == 0:
gc3libs.log.debug("Moving tasks {0} results from {1} to {2}".format(task.subject_name,
task.data_output_dir,
self.params.bids_output_folder))
gc3libs.utils.movetree(task.data_output_dir, self.params.bids_output_folder)
# Cleanup data_output_dir folder
gc3libs.log.debug("Removing task data_output_dir '{0}'".format(task.data_output_dir))
shutil.rmtree(task.data_output_dir)
# run script, but allow GC3Pie persistence module to access classes defined here;
# for details, see: http://code.google.com/p/gc3pie/issues/detail?id=95
if __name__ == "__main__":
import gbids
gbids.GbidsScript().run()