diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9b568f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*.swp
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a7d09bc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,102 @@
+#########################################################################
+###### Welcome to Gbids - a GC3Pie app to parallelize BIDS apps #########
+#########################################################################
+
+########################################
+### Install standalone gbids app #######
+########################################
+
+Install GC3Pie and gbids.
+
+# Grab the latest master branch
+$ wget https://raw.githubusercontent.com/uzh/gc3pie/master/install.py
+# Run the script
+$ python ~/install.py --develop -y
+# Activate the virtualenv and generate a gc3pie.conf file
+$ source ~/gc3pie/bin/activate
+$ gservers   # this will generate a config file ~/.gc3/gc3pie.conf
+# Install additional python packages
+$ source ~/gc3pie/bin/activate && pip install -r requirements.txt
+# Install gbids
+$ cd ~ && git clone https://c4science.ch/source/gbids.git
+# Make sure the required Debian packages are installed
+$ sudo apt-get update && sudo apt-get install -y \
+    libffi-dev \
+    libssl-dev \
+    python-dev
+
+########################################
+### Run gbids ##########################
+########################################
+
+Remember to:
+
+-) configure your gc3pie.conf file
+-) identify yourself with your cloud provider
+-) activate your virtualenv
+-) export the gbids directory to the PYTHONPATH, e.g.: cd path/to/gbids && export PYTHONPATH=$PWD
+
+Run gbids at the different analysis levels:
+
+Participant level
+  $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ participant -s {session name} -N [options] -vvvv
+
+Group level
+  $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ group -s {session name} -N [options] -vvvv
+
+Note:
+  The current gbids app only supports BIDS apps whose license requirement
+  is a FreeSurfer license. Use the -FL option to pass the license file:
+
+  $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ -FL path/to/freesurfer_license.txt participant -s {session name} -N [options] -vvvv
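+
+Note:
+  This version also provides a -PL/--participant_label option to restrict a
+  participant-level analysis to selected subjects. The labels below are
+  illustrative; use your own dataset's labels, written without the "sub-"
+  prefix (gbids validates them against the subjects found in the input
+  folder):
+
+  $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ participant -PL 01 02 -s {session name} -N -vvvv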
+
+########################################
+### Gbids modes ########################
+########################################
+
+Gbids can be used in TRANSFER or FILE-SERVER mode.
+
+-) TRANSFER mode
+
+  - description
+
+    The data are located on the control node (where the gbids app runs) and
+    will be transferred to the job-running nodes.
+
+  - example
+
+    To use the transfer mode, pass the -F flag and call the gbids app as follows:
+
+    $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ participant -s gs -N -F -vvvv
+
+-) FILE-SERVER mode
+
+  - description
+
+    The data are located on a file-server. The file-server, e.g. an NFS
+    server, needs to export the directory containing the data to all nodes,
+    i.e. the control node as well as the job-running nodes. In order for
+    those data to be used by the running nodes, the gc3pie.conf file needs
+    to be modified accordingly.
+
+  - example
+
+    Assuming you are using an Ubuntu image and the NFS share is mounted at
+    "/home/ubuntu/mnt/" on your control machine, you can add the following
+    code at the end of the gc3pie.conf file:
+
+    gbids_user_data = !/bin/bash
+      apt-get update
+      apt-get install -y nfs-common
+      mkdir -p /home/ubuntu/mnt/
+      chown -R 1000:1000 /home/ubuntu/mnt/
+      mount -t nfs "CHANGE_WITH_YOUR_NFS_IP_ADDRESS":/data /home/ubuntu/mnt/
+
+    This script assumes that the data are located on the NFS server under
+    the "/data" directory. You can use the local filesystem as reference,
+    i.e. mirror your local mount point path on the control and job-running
+    machines.
+
+    To use the file-server mode, call the gbids app as follows:
+
+    $ python gbids.py {docker image} path/to/local/input_dir/ path/to/local/output_dir/ participant -s gs -N -vvvv
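+
+    Note: gbids does not configure the NFS server itself. Assuming a stock
+    Linux NFS server exporting the hypothetical "/data" directory used
+    above, an /etc/exports entry along these lines (adjust the subnet to
+    your cloud network) is one way to make the data visible to all nodes:
+
+      # /etc/exports on the file-server
+      /data 10.0.0.0/24(ro,sync,no_subtree_check)
+
+    followed by: sudo exportfs -ra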
+
+########################################
+### Deploy gbids environment ###########
+########################################
+
+It is possible to set up a ready-to-go environment with multiple virtual
+machines (VMs), e.g. a gbids control VM, an NFS server to store your data,
+and a private Docker registry to store your own private Docker images.
+Currently this solution has been tested on ScienceCloud. Please contact us
+if you are interested in this solution.
diff --git a/gbids.py b/gbids.py
index b46f72e..754ac1d 100644
--- a/gbids.py
+++ b/gbids.py
@@ -1,393 +1,451 @@
 #! /usr/bin/env python
 #
 # gbids.py -- Front-end script for running Docker BIDS apps
 # over a large dataset.
 #
 # Copyright (C) 2018, 2019 S3IT, University of Zurich
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 #
 """
 Front-end script for submitting multiple `BIDS apps` jobs, fetching docker
 images from the `BIDS apps` repository.

 It uses the generic `gc3libs.cmdline.SessionBasedScript` framework.

 See the output of ``gbids.py --help`` for program usage instructions.

 Example of docker execution:
 docker run -i --rm -v /mnt/filo/data/ds005:/bids_dataset:ro bids/fmriprep /bids_dataset /outputs participant --participant_label 01

 gbids takes BIDS files as input.
 """

 # summary of user-visible changes
 __changelog__ = """
   2018-01-10:
   * added support for a FreeSurfer license file to be passed as part of
     the docker invocation.
     see: https://fmriprep.readthedocs.io/en/latest/installation.html#the-freesurfer-license
   2017-04-18:
   * Initial version
 """
-__author__ = 'Sergio Maffioletti '
+__author__ = 'Sergio Maffioletti , Diego Villamaina '
 __docformat__ = 'reStructuredText'
-__version__ = '1.0'
+__version__ = '1.1'

 import os
 import shutil
 import subprocess
 import tempfile

 # GC3Pie specific libraries
 import gc3libs
 import gc3libs.exceptions
 from gc3libs import Application
 from gc3libs.workflow import RetryableTask
 from gc3libs.cmdline import SessionBasedScript, existing_file, existing_directory
 import gc3libs.utils
 from gc3libs.quantity import GB
 from gc3libs.utils import write_contents

 # 3rd-party dependencies
 from bids.grabbids import BIDSLayout

 # Defaults
 RUN_DOCKER = "./run_docker.sh"
 MAX_MEMORY = 32*GB
-DEFAULT_BIDS_FOLDER = "$PWD/data/"
+DEFAULT_BIDS_FOLDER = "data/"
 DEFAULT_RESULT_FOLDER_LOCAL = "output"
-DEFAULT_RESULT_FOLDER_REMOTE = "$PWD/output/"
+DEFAULT_RESULT_FOLDER_REMOTE = "output/"
 DEFAULT_DOCKER_BIDS_ARGS = "--no-submm-recon"
 DEFAULT_FREESURFER_LICENSE_FILE = "license.txt"
 DEFAULT_DOCKER_BIDS_APP = "poldracklab/fmriprep " + DEFAULT_DOCKER_BIDS_ARGS
 ANALYSIS_LEVELS = ["participant", "group1", "group2", "group"]
 DOCKER_RUN_COMMAND = "sudo docker run -i --rm {DOCKER_MOUNT} {DOCKER_TO_RUN} /bids /output {ANALYSIS} "
 COPY_COMMAND = "cp {0}/* {1} -Rf"

 RUN_DOCKER_SCRIPT="""#!/bin/bash
 echo "[`date`]: Start processing for subject {subject}"
 group=`id -g -n`
-sudo docker run -i --rm -v {data}:/bids -v {output}:/output {container} /bids /output {analysis} --participant_label {subject}
+# Pull the image first, retrying up to $retry times to survive transient
+# network/registry failures.
+i=1
+retry=3
+while [ $i -le $retry ];
+do
+    docker pull {container}
+    if [ $? -ne 0 ]
+    then
+        sleep 20s
+        i=$((i+1))
+    else
+        break
+    fi
+done
+echo "Running the container..."
+sudo docker run -i --rm -v {data}:/bids -v $PWD/{output}:/output {license_var} {license_map} {container} /bids /output {analysis} --participant_label {subject}
 RET=$?
 echo "fixing local filesystem permission"
 sudo chown -R $USER:$group {output}
 echo "[`date`]: Done with code $RET"
 exit $RET
 """

 # Utility methods
-def _make_temp_run_docker_file(location, docker, data, output, analysis, subject):
+def _make_temp_run_docker_file(location, docker, data, output, license_var, license_map, analysis, subject):
     """
     Create execution script to control docker execution and post-process
     """
     try:
         (fd, tmp_filename) = tempfile.mkstemp(prefix="sbj{0}-".format(subject), dir=location)
         write_contents(tmp_filename, RUN_DOCKER_SCRIPT.format(data=data,
                                                               output=output,
+                                                              license_var=license_var,
+                                                              license_map=license_map,
                                                               container=docker,
                                                               analysis=analysis,
                                                               subject=subject))
         os.chmod(tmp_filename, 0755)
         return tmp_filename
     except Exception, ex:
         gc3libs.log.debug("Error creating execution script. "
                           "Error type: %s. Message: %s" % (type(ex), ex.message))
         raise

-def _get_subjects(root_input_folder):
+def _get_subjects(root_input_folder, participant_label=None):
     """
     Build the subject list from the input data in `root_input_folder`; if
     `participant_label` is given, restrict the list to those labels after
     checking that they all exist in the dataset.
     """
     layout = BIDSLayout(root_input_folder)
+    subjects = layout.get_subjects()
+    if participant_label:
+        if not set(participant_label).issubset(set(subjects)):
+            raise ValueError('One or more participant labels in: %s do not exist. '
+                             'Please choose among: %s' % (participant_label, subjects))
+        subjects = participant_label
+
     return [(os.path.abspath(os.path.join(root_input_folder, "sub-{}".format(subject))),
-             subject) for subject in layout.get_subjects()]
+             subject) for subject in subjects]
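+
+# Illustrative note (not used by the code): for a hypothetical BIDS tree
+# containing `sub-01/` and `sub-02/`, pybids 0.6.x returns the labels
+# without the "sub-" prefix:
+#
+#     >>> BIDSLayout('/path/to/bids_root').get_subjects()
+#     ['01', '02']
+#
+# which is why values passed via --participant_label must omit the prefix too.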

 def _get_control_files(input_folder):
     """
     Return the list of .json and .tsv files found in the root input folder.

     Assumptions:
     * each sub-folder contains a valid subject's data
     * each .json and .tsv file found in the root folder will be made
       available to all Applications.
     """
     return [os.path.abspath(os.path.join(input_folder, control))
             for control in os.listdir(input_folder)
             if control.endswith(".json") or control.endswith(".tsv")]

 def _is_participant_analysis(analysis_level):
     return analysis_level == ANALYSIS_LEVELS[0]

+def _is_group_analysis(analysis_level):
+    return analysis_level == ANALYSIS_LEVELS[3]
+

 # Custom application class
 class GbidsApplication(Application):
     """
     Custom class to wrap the execution of a single BIDS-app Docker
     container through the generated `run_docker.sh` control script.
     """
     application_name = 'gbids'

     def __init__(self, docker_run, subject, subject_name, control_files,
                  analysis_level, **extra_args):

         executables = []
         inputs = dict()
         outputs = []

         self.subject_dir = subject
         self.subject_name = subject_name
         self.data_output_dir = extra_args['data_output_dir']
+        self.license = extra_args['freesurfer_license']
+
+        # Transfer FREESURFER license file if given
+        if extra_args['freesurfer_license']:
+            license_file = os.path.basename(self.license)
+            license_file_location = '/tmp/license.txt'
+            inputs[extra_args['freesurfer_license']] = license_file
+            ## Create the $FS_LICENSE env var and pass it to the docker container
+            license_var = '-e FS_LICENSE=' + license_file_location
+            ## Map the license.txt to the file location within the docker container
+            license_map = '-v ' + os.path.join('$PWD', license_file) + ':' + license_file_location
+        else:
+            license_var = ''
+            license_map = ''
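+
+        ## Illustrative example (assumed values, not taken from a real run):
+        ## for a license file named "license.txt", the two settings above
+        ## expand the docker command line with
+        ##   -e FS_LICENSE=/tmp/license.txt -v $PWD/license.txt:/tmp/license.txt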

         if extra_args['transfer_data']:
             # Input data need to be transferred to the compute node:
             # include them in the `inputs` list and adapt the
             # container execution command
-            inputs[subject] = os.path.join(DEFAULT_BIDS_FOLDER, os.path.basename(subject))
-            # add all control files to 'data' folder
-            for element in control_files:
-                inputs[element] = os.path.join(DEFAULT_BIDS_FOLDER,
-                                               os.path.basename(element))
+            # for the "group" level there is no need to include control_files:
+            # the whole input directory tree is transferred.
+            if control_files:
+                for element in control_files:
+                    inputs[element] = os.path.join(DEFAULT_BIDS_FOLDER,
+                                                   os.path.basename(element))
             inputs[extra_args['local_result_folder']] = DEFAULT_RESULT_FOLDER_REMOTE
             outputs.append(DEFAULT_RESULT_FOLDER_REMOTE)
-            run_docker_input_data = DEFAULT_BIDS_FOLDER
-            run_docker_output_data = DEFAULT_RESULT_FOLDER_REMOTE
+            run_docker_input_data = os.path.join('$PWD', DEFAULT_BIDS_FOLDER)
+            run_docker_output_data = DEFAULT_RESULT_FOLDER_REMOTE
+
+            if _is_group_analysis(analysis_level):
+                run_docker_input_data = os.path.join('$PWD', DEFAULT_BIDS_FOLDER,
+                                                     extra_args['bids_input_folder'])
+
         else:
             # Use local filesystem as reference
             # Define mount points
             run_docker_input_data = subject
-            run_docker_output_data = extra_args['data_output_dir']
+            run_docker_output_data = DEFAULT_RESULT_FOLDER_REMOTE
+            outputs.append(DEFAULT_RESULT_FOLDER_REMOTE)

         self.run_script = _make_temp_run_docker_file(extra_args['session'],
                                                      docker_run,
                                                      run_docker_input_data,
                                                      run_docker_output_data,
+                                                     license_var,
+                                                     license_map,
                                                      analysis_level,
                                                      subject_name)

         inputs[self.run_script] = "./run_docker.sh"
         executables.append(inputs[self.run_script])

         Application.__init__(
             self,
             arguments="./run_docker.sh",
             inputs=inputs,
             outputs=outputs,
             stdout='{0}.log'.format(subject_name),
             join=True,
             executables=executables,
             **extra_args)

     def terminated(self):
         """
         Check the exit code. If out-of-memory is somehow detected (e.g. exit
         code 137), re-submit with an increased memory allocation.
         :return: None
         """
         if self.execution.returncode == 137:
             if self.requested_memory and self.requested_memory < MAX_MEMORY:
                 # quadruple the memory request for the re-submission
                 self.requested_memory *= 4
                 self.execution.returncode = (0, 99)
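+
+        # Note: exit status 137 corresponds to 128 + 9, i.e. the process was
+        # killed by SIGKILL; this is the status a Docker container reports
+        # when the kernel out-of-memory killer terminates it, hence the
+        # re-submission with a larger memory request.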
"Allowed values: {0}.".format(ANALYSIS_LEVELS)) def setup_options(self): self.add_param("-F", "--datatransfer", dest="transfer_data", action="store_true", default=False, help="Transfer input data to compute nodes. " "If False, data will be assumed be already visible on " "compute nodes - e.g. shared filesystem. " "Default: %(default)s.") - self.add_param("-L", "--license", metavar="[PATH]", + self.add_param("-FL", "--freesurfer_license", metavar="[PATH]", type=existing_file, dest="freesurfer_license", default=None, help="Location of freesurfer license file. Default: %(default)s.") + self.add_param("-PL","--participant_label", dest="participant_label", nargs='+', + help="Run analysis for a single or multiple participants" ) + + def parse_args(self): """ Check for valid analysis level. merge bids_app and related execution arguments """ assert self.params.analysis_level in ANALYSIS_LEVELS, "Unknown analysis level {0}. " \ "Allowed values are {1}".format(self.params.analysis_level, ANALYSIS_LEVELS) self.bids_app_execution = self.params.bids_app self.params.bids_output_folder = os.path.abspath(self.params.bids_output_folder) - self.params.bids_input_folder = os.path.abspath(self.params.bids_input_folder) + self.params.bids_input_folder = os.path.abspath(self.params.bids_input_folder) def new_tasks(self, extra): """ if analysis type is 'group' create single gbidsApplication with all input data if analysis type is 'participants' for each valid input file create a new GbidsApplication """ tasks = [] control_files = _get_control_files(self.params.bids_input_folder) local_result_folder = os.path.join(self.session.path, DEFAULT_RESULT_FOLDER_LOCAL) for folder in [self.params.bids_output_folder]: if not os.path.isdir(folder): try: os.mkdir(folder) except OSError, osx: gc3libs.log.error("Failed to create folder {0}. 

     def new_tasks(self, extra):
         """
         If analysis type is 'group', create a single GbidsApplication with
         all input data; if analysis type is 'participant', create a new
         GbidsApplication for each valid input subject.
         """
         tasks = []
         control_files = _get_control_files(self.params.bids_input_folder)
         local_result_folder = os.path.join(self.session.path, DEFAULT_RESULT_FOLDER_LOCAL)

         for folder in [self.params.bids_output_folder]:
             if not os.path.isdir(folder):
                 try:
                     os.mkdir(folder)
                 except OSError, osx:
                     gc3libs.log.error("Failed to create folder {0}. "
                                       "Reason: '{1}'".format(folder, osx))

         if self.params.transfer_data and not os.path.isdir(local_result_folder):
             os.mkdir(os.path.join(local_result_folder))

         if _is_participant_analysis(self.params.analysis_level):
+            # participant level analysis
-            for (subject_dir, subject_name) in _get_subjects(self.params.bids_input_folder):
+            for (subject_dir, subject_name) in _get_subjects(self.params.bids_input_folder,
+                                                             self.params.participant_label):
                 job_name = subject_name
                 extra_args = extra.copy()

                 if not self.params.transfer_data:
                     # Use root BIDS folder and set participant label for each task
                     subject_dir = self.params.bids_input_folder

                 if self.params.transfer_data:
                     extra_args['local_result_folder'] = local_result_folder

                 extra_args['session'] = self.session.path
                 extra_args['transfer_data'] = self.params.transfer_data
                 extra_args['jobname'] = job_name
                 extra_args['output_dir'] = os.path.join(os.path.abspath(self.session.path),
                                                         '.compute',
                                                         subject_name)
                 extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.bids_output_folder),
                                                              subject_name)
                 extra_args['freesurfer_license'] = self.params.freesurfer_license

                 self.log.debug("Creating Application for subject {0}".format(subject_name))

                 tasks.append(GbidsApplication(
                     self.bids_app_execution,
                     subject_dir,
                     subject_name,
                     control_files,
                     self.params.analysis_level,
                     **extra_args))
         else:
             # Group level analysis
             subject_name = self.params.analysis_level
             extra_args = extra.copy()
+
+            extra_args['bids_input_folder'] = os.path.basename(os.path.normpath(self.params.bids_input_folder))
+
+            if self.params.transfer_data:
+                extra_args['local_result_folder'] = local_result_folder
+
+            extra_args['session'] = self.session.path
             extra_args['jobname'] = self.params.analysis_level
-            extra_args['data-transfer'] = self.params.transfer_data
-            extra_args['output_dir'] = os.path.join(self.params.bids_output_folder,
-                                                    '.compute')
-            extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.bids_output_folder),
-                                                         subject_name)
+            extra_args['transfer_data'] = self.params.transfer_data
+            extra_args['output_dir'] = os.path.join(os.path.abspath(self.session.path), '.compute',
+                                                    subject_name)
+            extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.bids_output_folder),
+                                                         subject_name)
             extra_args['freesurfer_license'] = self.params.freesurfer_license

             self.log.debug("Creating Application for analysis {0}".format(self.params.analysis_level))

             tasks.append(GbidsApplication(
                 self.bids_app_execution,
                 self.params.bids_input_folder,
+                subject_name,
                 None,
-                control_files,
                 self.params.analysis_level,
                 **extra_args))
         return tasks

     def after_main_loop(self):
         """
         Merge the results of all successful tasks into `bids_output_folder`.
         """
         for task in self.session:
-            if isinstance(task, GbidsApplication) and task.execution.returncode == 0:
+            if isinstance(task, GbidsApplication) and task.execution.returncode == 0 \
+               and os.path.exists(task.output_dir):
                 gc3libs.log.debug("Moving task {0} results from {1} to {2}".format(task.subject_name,
-                                                                                   task.data_output_dir,
+                                                                                   task.output_dir,
                                                                                    self.params.bids_output_folder))
-                gc3libs.utils.movetree(task.data_output_dir, self.params.bids_output_folder)
-                # Cleanup data_output_dir folder
-                gc3libs.log.debug("Removing task data_output_dir '{0}'".format(task.data_output_dir))
-                shutil.rmtree(task.data_output_dir)
+                gc3libs.utils.movetree(task.output_dir, self.params.bids_output_folder)
+                # Cleanup output_dir folder
+                gc3libs.log.debug("Removing task output_dir '{0}'".format(task.output_dir))
+                shutil.rmtree(task.output_dir)

 # run script, but allow GC3Pie persistence module to access classes defined here;
 # for details, see: http://code.google.com/p/gc3pie/issues/detail?id=95
 if __name__ == "__main__":
     import gbids
     gbids.GbidsScript().run()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c37243e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+python-neutronclient==6.10.0
+python-novaclient==11.0.0
+python-openstackclient==3.16.1
+pybids==0.6.5
+os-client-config==1.31.2