diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c9b568f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+*.pyc
+*.swp
diff --git a/Snakefile b/Snakefile
new file mode 100644
index 0000000..abaa82c
--- /dev/null
+++ b/Snakefile
@@ -0,0 +1,93 @@
+import os
+from snakemake.remote.HTTP import RemoteProvider as HTTPRemoteProvider
+
+HTTP = HTTPRemoteProvider()
+
+SAMPLES = ['DATASET_NAME']
+
+for smp in SAMPLES:
+    print("Sample " + smp + " will be processed")
+
+rule final:
+    input:
+#        directory('/tmp/MOUSE_GRCm38.p6'),
+        expand('/output/{sample}_R1_fastqc.html', sample=SAMPLES),
+        expand('/output/{sample}_R2_fastqc.html', sample=SAMPLES),
+        expand('/output/{sample}_trimmed_R1.fastq', sample=SAMPLES),
+        expand('/output/{sample}_trimmed_R2.fastq', sample=SAMPLES),
+        expand('/output/{sample}_trimmed_R1_fastqc.html', sample=SAMPLES),
+        expand('/output/{sample}_trimmed_R2_fastqc.html', sample=SAMPLES),
+        expand('/output/{sample}_trimmed.fastq', sample=SAMPLES),
+
+rule get_genome:
+    input:
+        HTTP.remote("cloud.s3it.uzh.ch:8080/v1/AUTH_576f87a2a18948bdb2da11fdcad29ae2/RNA-genome/GENOME.zip", keep_local=True)
+    output:
+        directory('/tmp/MOUSE_GRCm38.p6')
+    priority: 1
+    run:
+        outputName = os.path.join('/tmp', os.path.basename(input[0]))
+        shell("mv {input} {outputName} && unzip {outputName} -d /tmp && rm {outputName}")
+
+rule perform_qc:
+    input:
+        R1='/input/{sample}_R1.fastq',
+        R2='/input/{sample}_R2.fastq',
+    params:
+        out_dir='/output/'
+    output:
+        '/output/{sample}_R1_fastqc.html',
+        '/output/{sample}_R1_fastqc.zip',
+        '/output/{sample}_R2_fastqc.html',
+        '/output/{sample}_R2_fastqc.zip',
+    shell:
+        r'''
+        fastqc -o {params.out_dir} -f fastq {input.R1} {input.R2}
+        '''
+
+rule trimmomatic_run:
+    input:
+        R1_read='/input/{sample}_R1.fastq',
+        R2_read='/input/{sample}_R2.fastq',
+    output:
+        R1_trimmed='/output/{sample}_trimmed_R1.fastq',
+        R2_trimmed='/output/{sample}_trimmed_R2.fastq',
+        R1_unpaired='/output/{sample}_trimmed_unpaired_R1.fastq',
+        R2_unpaired='/output/{sample}_trimmed_unpaired_R2.fastq',
+
+    message: """---TRIMMOMATIC---"""
+    shell:
+        r'''
+        java -jar /usr/share/java/trimmomatic-0.35.jar PE -phred33 {input.R1_read} {input.R2_read} {output.R1_trimmed} {output.R1_unpaired} {output.R2_trimmed} {output.R2_unpaired} LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15 MINLEN:30
+        '''
+
+rule perform_qc_trimmed:
+    input:
+        R1_trimmed_qc='/output/{sample}_trimmed_R1.fastq',
+        R2_trimmed_qc='/output/{sample}_trimmed_R2.fastq',
+    params:
+        out_dir='/output/'
+    output:
+        '/output/{sample}_trimmed_R1_fastqc.html',
+        '/output/{sample}_trimmed_R1_fastqc.zip',
+        '/output/{sample}_trimmed_R2_fastqc.html',
+        '/output/{sample}_trimmed_R2_fastqc.zip',
+
+    message: """---QC trimmed Data---"""
+    shell:
+        r'''
+        fastqc -o {params.out_dir} -f fastq {input.R1_trimmed_qc} {input.R2_trimmed_qc}
+        '''
+
+rule merge_trimmed_R1_and_R2:
+    input:
+        R1_trimmed_qc='/output/{sample}_trimmed_R1.fastq',
+        R2_trimmed_qc='/output/{sample}_trimmed_R2.fastq',
+    output:
+        merged='/output/{sample}_trimmed.fastq',
+
+    message: """---Merge trimmed R1 and R2---"""
+    shell:
+        r'''
+        cat {input.R1_trimmed_qc} {input.R2_trimmed_qc} > {output.merged}
+        '''
diff --git a/Snakefile_Localhost b/Snakefile_Localhost
new file mode 100644
index 0000000..47d3223
--- /dev/null
+++ b/Snakefile_Localhost
@@ -0,0 +1,86 @@
+import os
+
+SAMPLES = ['DATASET_NAME']
+
+group = '-'.join(SAMPLES[0].split('-')[1:])
+dataset = SAMPLES[0]
+
+# This is for localhost execution
+PATH_TO_INPUT_FILES = os.path.join('/input/', group, dataset)
+PATH_TO_OUTPUT = os.path.join('/output/', dataset)
+
+for smp in SAMPLES:
+    print("Sample " + smp + " will be processed")
+
+rule final:
+    input:
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_R1_fastqc.html'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_R2_fastqc.html'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1.fastq'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2.fastq'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1_fastqc.html'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2_fastqc.html'), sample=SAMPLES),
+        expand(os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed.fastq'), sample=SAMPLES),
+
+rule perform_qc:
+    input:
+        R1=os.path.join(PATH_TO_INPUT_FILES, '{sample}_R1.fastq'),
+        R2=os.path.join(PATH_TO_INPUT_FILES, '{sample}_R2.fastq'),
+    params:
+        out_dir=PATH_TO_OUTPUT,
+    output:
+        os.path.join(PATH_TO_OUTPUT, '{sample}_R1_fastqc.html'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_R1_fastqc.zip'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_R2_fastqc.html'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_R2_fastqc.zip'),
+    shell:
+        r'''
+        fastqc -o {params.out_dir} -f fastq {input.R1} {input.R2}
+        '''
+
+rule trimmomatic_run:
+    input:
+        R1_read=os.path.join(PATH_TO_INPUT_FILES, '{sample}_R1.fastq'),
+        R2_read=os.path.join(PATH_TO_INPUT_FILES, '{sample}_R2.fastq'),
+    output:
+        R1_trimmed=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1.fastq'),
+        R2_trimmed=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2.fastq'),
+        R1_unpaired=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_unpaired_R1.fastq'),
+        R2_unpaired=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_unpaired_R2.fastq'),
+
+    message: """---TRIMMOMATIC---"""
+    shell:
+        r'''
+        java -jar /usr/share/java/trimmomatic-0.35.jar PE -phred33 {input.R1_read} {input.R2_read} {output.R1_trimmed} {output.R1_unpaired} {output.R2_trimmed} {output.R2_unpaired} LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15 MINLEN:30
+        '''
+
+rule perform_qc_trimmed:
+    input:
+        R1_trimmed_qc=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1.fastq'),
+        R2_trimmed_qc=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2.fastq'),
+    params:
+        out_dir=PATH_TO_OUTPUT
+    output:
+        os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1_fastqc.html'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1_fastqc.zip'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2_fastqc.html'),
+        os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2_fastqc.zip'),
+
+    message: """---QC trimmed Data---"""
+    shell:
+        r'''
+        fastqc -o {params.out_dir} -f fastq {input.R1_trimmed_qc} {input.R2_trimmed_qc}
+        '''
+
+rule merge_trimmed_R1_and_R2:
+    input:
+        R1_trimmed_qc=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R1.fastq'),
+        R2_trimmed_qc=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed_R2.fastq'),
+    output:
+        merged=os.path.join(PATH_TO_OUTPUT, '{sample}_trimmed.fastq'),
+
+    message: """---Merge trimmed R1 and R2---"""
+    shell:
+        r'''
+        cat {input.R1_trimmed_qc} {input.R2_trimmed_qc} > {output.merged}
+        '''
diff --git a/gtn.py b/gtn.py
new file mode 100644
index 0000000..160ddc7
--- /dev/null
+++ b/gtn.py
@@ -0,0 +1,502 @@
+#! /usr/bin/env python
+#
+# gtn.py -- Front-end script for running Dockerized tn-seq pipelines.
+#
+# Copyright (C) 2018, 2019 S3IT, University of Zurich
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Front-end script for submitting multiple `tn-seq pipeline` jobs, fetching
+docker images from the `sparkvilla/tnseq` Docker Hub repository. It uses
+Snakemake as workflow manager.
+
+It uses the generic `gc3libs.cmdline.SessionBasedScript` framework.
+
+See the output of ``gtn.py --help`` for program usage instructions.
+
+Example of docker execution:
+docker run -i -v /path/to/tn_data/:/input:ro -v /path/to/tn_output/:/output -v /path/to/tn_setup/Snakefile:/setup/ --entrypoint snakemake sparkvilla/tnseq -s /setup/Snakefile
+
+gtn-seq takes "--- files" as input.
+"""
+
+# summary of user-visible changes
+__changelog__ = """
+  2018-11-06:
+  * Initial version
+"""
+__author__ = 'Diego Villamaina '
+__docformat__ = 'reStructuredText'
+__version__ = '1.0'
+
+import os
+import shutil
+import subprocess
+import tempfile
+import re
+
+# GC3Pie specific libraries
+import gc3libs
+import gc3libs.exceptions
+from gc3libs import Application
+from gc3libs.workflow import RetryableTask
+from gc3libs.cmdline import SessionBasedScript, existing_file, existing_directory, positive_int
+import gc3libs.utils
+from gc3libs.quantity import GB
+from gc3libs.utils import write_contents
+
+# Defaults
+RUN_DOCKER = "./run_docker.sh"
+MAX_MEMORY = 32 * GB
+DEFAULT_GTN_FOLDER = "data/"
+DEFAULT_RESULT_FOLDER_LOCAL = "output"
+DEFAULT_RESULT_FOLDER_REMOTE = "output/"
+DEFAULT_DOCKER_IMAGE = "sparkvilla/tnseq"
+DEFAULT_SNAKEFILE_TEMPLATE = "./Snakefile"
+DEFAULT_SNAKEFILE_TEMPLATE_LOCALHOST = "./Snakefile_Localhost"
+COPY_COMMAND = "cp {0}/* {1} -Rf"
+RUN_DOCKER_SCRIPT = """#!/bin/bash
+
+echo "[`date`]: Start processing for pipeline {dataset}"
+group=`id -g -n`
+i=1
+retry=3
+while [ $i -le $retry ];
+do
+    docker pull {container}
+    if [ $? -ne 0 ]
+    then
+        sleep 20s
+        i=$((i+1))
+    else
+        break
+    fi
+done
+echo "Running the container..."
+sudo docker run -i --rm -v {data}:/input -v {output}:/output -v {snakefile}:/setup/Snakefile --entrypoint snakemake {container} -s /setup/Snakefile
+
+RET=$?
+echo "fixing local filesystem permission"
+sudo chown -R $USER:$group {output}
+echo "[`date`]: Done with code $RET"
+exit $RET
+"""
+
+# Utility methods
+class Dataset():
+    """
+    Class to verify the naming of gtn group, dataset, and FASTQ files.
+
+    A group folder must be named as:
+        g-{'group_name'}
+
+    A dataset folder must be named as:
+        d{number}-g-{'group_name'}
+
+    2 FASTQ files for each dataset are expected, named as:
+        d{number}-g-{'group_name'}_R1.fastq
+        d{number}-g-{'group_name'}_R2.fastq
+
+    Attributes
+    ----------
+
+    name: dataset name
+    location: full path to the dataset
+    g_name: group name
+    g_location: full path to the group
+
+    """
+    def __init__(self, name, location):
+        self.name = name
+        self.location = location
+        self.g_location = os.path.split(location)[0]
+        self.g_name = os.path.split(self.g_location)[1]
+        self._verify_dataset(self.g_name, self.g_location)
+        self._verify_filename(self.name, self.location)
+
+    def _verify_dataset(self, g_name, g_location):
+        valid_dataset = [data for data in os.listdir(g_location) if re.search(r"d\d-{0}".format(g_name), data)]
+        assert len(valid_dataset) > 0, \
+            "Marker d-{0} not found. Location {1} not a valid dataset.".format(g_name,
+                                                                               g_location)
+
+    def _verify_filename(self, name, location):
+        valid_filenames = [f for f in os.listdir(location) if os.path.isfile(os.path.join(location, f)) and
+                           re.search(r"{0}_R\d.fastq".format(name), f)]
+        # 2 FASTQ files expected: R1 and R2
+        assert len(valid_filenames) > 1, \
+            "Marker {0}. Location: {1} not a valid dataset.".format(name,
+                                                                    location)
+
+
+def _make_temp_run_docker_file(location, dataset_name, data, output, snakefile, docker):
+    """
+    Create execution script to control docker execution and post-processing.
+    """
+    try:
+        (fd, tmp_filename) = tempfile.mkstemp(prefix="{}_drun_".format(dataset_name), dir=location)
+        write_contents(tmp_filename, RUN_DOCKER_SCRIPT.format(dataset=dataset_name,
+                                                              data=data,
+                                                              output=output,
+                                                              snakefile=snakefile,
+                                                              container=docker))
+        os.chmod(tmp_filename, 0755)
+        return tmp_filename
+    except Exception, ex:
+        gc3libs.log.debug("Error creating execution script. "
+                          "Error type: %s. Message: %s" % (type(ex), ex.message))
+        raise
+
+def _get_groups(gtn_input):
+    """
+    Build a list of (group_path, group_name) from the gtn root folder.
+    A group folder must start with: g-
+    """
+    dirs = os.listdir(gtn_input)
+    return [(os.path.join(gtn_input, d), d) for d in dirs if d.startswith('g-')]
+
+def _get_datasets(gtn_input):
+    """
+    Yield all dataset objects.
+    """
+    for g_path, g_name in _get_groups(gtn_input):
+        for d_name in os.listdir(g_path):
+            d_path = os.path.join(g_path, d_name)
+            yield Dataset(d_name, d_path)
+
+def _get_group_name(dataset_name):
+    split_name = dataset_name.split('-')[1:]
+    return '-'.join(split_name)
+
+def __in_place_change(filename, new_string, old_string):
+    """
+    Helper:
+    Replace an old_string with a new_string in a file. If the old_string
+    is not found, a ValueError is raised.
+    """
+    # Read the input filename
+    with open(filename) as f:
+        s = f.read()
+        if old_string not in s:
+            raise ValueError('%s not found in: %s' % (old_string, filename))
+
+    # Safely write the changed content, if file and old_string were found
+    with open(filename, 'w') as f:
+        gc3libs.log.debug("Changing %s to %s" % (old_string, new_string))
+        s = s.replace(old_string, new_string)
+        f.write(s)
+
+def _make_tmp_snakefile(template_snakefile, location, dataset):
+    """
+    Create a Snakefile for a specific dataset.
+
+    The template_snakefile is expected to have a line:
+        SAMPLES = ['DATASET_NAME']
+    with the 'DATASET_NAME' string to be replaced by dataset.
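+
+    For example, for a hypothetical dataset named 'd1-g-smith' (following
+    the d{number}-g-{'group_name'} convention above), the line becomes:
+        SAMPLES = ['d1-g-smith']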
+    """
+    (fd, tmp_filename) = tempfile.mkstemp(prefix="{}_snakef_".format(dataset), dir=location)
+    gc3libs.log.debug("Generate a tmp snakefile %s" % (tmp_filename))
+    shutil.copy2(template_snakefile, tmp_filename)
+    __in_place_change(tmp_filename, new_string=dataset, old_string='DATASET_NAME')
+    return tmp_filename
+
+# Custom application class
+class GtnApplication(Application):
+    """
+    Custom class to wrap the execution of the Snakemake pipeline.
+    """
+    application_name = 'gtn'
+
+    def __init__(self, docker_run, dataset, dataset_name, data_output_dir, snakefile, **extra_args):
+
+        executables = []
+        inputs = dict()
+        outputs = []
+
+        self.dataset_path = dataset
+        self.dataset_name = dataset_name
+        self.data_output_dir = data_output_dir
+        self.snakefile = snakefile
+        self.group_name = _get_group_name(dataset_name)
+
+        # Transfer the Snakefile
+        inputs[self.snakefile] = os.path.join(DEFAULT_GTN_FOLDER,
+                                              os.path.basename(self.snakefile))
+
+        if 'transfer_data' in extra_args.keys() and extra_args['transfer_data']:
+            # Input data need to be transferred to the compute node:
+            # include them in the `inputs` list and adapt the
+            # container execution command
+            inputs[self.dataset_path] = os.path.join(DEFAULT_GTN_FOLDER,
+                                                     os.path.basename(self.dataset_path))
+            outputs.append(DEFAULT_RESULT_FOLDER_REMOTE)
+
+            run_docker_input_data = os.path.join('$PWD', DEFAULT_GTN_FOLDER, self.dataset_name)
+            run_docker_output_data = os.path.join('$PWD', DEFAULT_RESULT_FOLDER_REMOTE)
+            run_docker_snakefile = os.path.join('$PWD',
+                                                DEFAULT_GTN_FOLDER,
+                                                os.path.basename(self.snakefile))
+        else:
+            # Use local filesystem as reference
+            # Define mount points
+            run_docker_input_data = os.path.join(self.dataset_path, self.group_name, self.dataset_name)
+            run_docker_output_data = os.path.join('$PWD', DEFAULT_RESULT_FOLDER_REMOTE)
+            run_docker_snakefile = os.path.join('$PWD',
+                                                DEFAULT_GTN_FOLDER,
+                                                os.path.basename(self.snakefile))
+
+            outputs.append(DEFAULT_RESULT_FOLDER_REMOTE)
+
+        self.run_script = _make_temp_run_docker_file(extra_args['session'],
+                                                     dataset_name,
+                                                     run_docker_input_data,
+                                                     run_docker_output_data,
+                                                     run_docker_snakefile,
+                                                     docker_run)
+
+        inputs[self.run_script] = "./run_docker.sh"
+        executables.append(inputs[self.run_script])
+
+        Application.__init__(
+            self,
+            arguments="./run_docker.sh",
+            inputs=inputs,
+            outputs=outputs,
+            stdout='{0}.log'.format(dataset_name),
+            join=True,
+            executables=executables,
+            **extra_args)
+
+    def terminated(self):
+        """
+        Check the exit code. If out-of-memory is detected (e.g. exit
+        code 137), re-submit with an increased memory allocation.
+        :return: None
+        """
+        if self.execution.returncode == 137:
+            if self.requested_memory and self.requested_memory < MAX_MEMORY:
+                self.requested_memory *= 4
+                self.execution.returncode = (0, 99)
+
+class GtnLocalhostApplication(Application):
+    """
+    Custom class for execution of the Snakemake pipeline on localhost.
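+
+    Note: unlike GtnApplication, no Docker container is involved here;
+    `snakemake` is invoked directly on the generated snakefile, which
+    reads its input data from the local filesystem.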
+ """ + application_name = 'gtnlocalhost' + + def __init__(self, dataset, dataset_name, data_output_dir, snakefile, **extra_args): + + self.dataset_path = dataset + self.dataset_name = dataset_name + self.data_output_dir = data_output_dir # extra_args['data_output_dir'] + self.snakefile = snakefile # extra_args['snakefile'] + self.group_name = _get_group_name(dataset_name) + + inp = os.path.basename(snakefile) + out = os.path.basename(os.path.join(self.data_output_dir,self.group_name)) + + + Application.__init__( + self, + arguments=[ + "snakemake", "-s", inp ], + stdout='{0}.log'.format(dataset_name), + inputs=[snakefile], + outputs=[out], + output_dir=("g-"+inp)) + +class GtnRetriableTask(RetryableTask): + def __init__(self, docker_run, dataset, dataset_name,**extra_args): + return GtnApplication(docker_run, dataset, dataset_name,**extra_args) + + +class GtnScript(SessionBasedScript): + """ + The ``gtn`` command keeps a record of jobs (submitted, executed + and pending) in a session file (set name with the ``-s`` option); at + each invocation of the command, the status of all recorded jobs is + updated, output from finished jobs is collected, and a summary table + of all known jobs is printed. + + Options can specify a maximum number of jobs that should be in + 'SUBMITTED' or 'RUNNING' state; ``gbids`` will delay submission of + newly-created jobs so that this limit is never exceeded. + + Once the processing of all chunked files has been completed, ``gtn`` + aggregates them into a single larger output file located in + 'self.params.output'. + """ + + def __init__(self): + self.gtn_app_execution = DEFAULT_DOCKER_IMAGE + SessionBasedScript.__init__( + self, + version=__version__, + application=GtnApplication, + stats_only_for=GtnApplication, + ) + + # def setup(self): + # SessionBasedScript.setup(self) + + # self.add_param("-C", "--continuous", "--watch", + # type=positive_int, dest="wait", + # default=10, metavar="NUM", + # help="Keep running, monitoring jobs and possibly submitting" + # " new ones every NUM seconds. Default: %(default)s seconds." + # ) + + # self.add_param("-c", "--cpu-cores", dest="ncores", + # type=positive_int, default=8, # 8 core + # metavar="NUM", + # help="Set the number of CPU cores required for each job" + # " (default: %(default)s). NUM must be a whole number." + # ) + + def setup_args(self): + + self.add_param("gtn_input_folder", type=existing_directory, + help="Root location of input data. Note: expects folder in " + "gtn format") + + self.add_param("gtn_output_folder", type=str, help="Location of the " + " results.") + + def setup_options(self): + self.add_param("-F", "--datatransfer", dest="transfer_data", + action="store_true", default=False, + help="Transfer input data to compute nodes. " + "If False, data will be assumed be already visible on " + "compute nodes - e.g. shared filesystem. " + "Default: %(default)s.") + + self.add_param("-app", dest="gtn_app", type=str, + default=False, + help="Name docker image to use. " + " Default: sparkvilla/gtnseq") + + self.add_param("-sfile", metavar="[PATH]", + type=existing_file, + dest="snakefile_template", default=False, + help="Location of the Snakefile template") + + def parse_args(self): + """ + Declare command line arguments. 
+ """ + self.params.gtn_output_folder = os.path.abspath(self.params.gtn_output_folder) + self.params.gtn_input_folder = os.path.abspath(self.params.gtn_input_folder) + + def new_tasks(self, extra): + """ + create single gtnApplication with all input data + for each valid input file create a new GtnApplication + """ + tasks = [] + local_result_folder = os.path.join(self.session.path, + DEFAULT_RESULT_FOLDER_LOCAL) + + snakefile_template = os.path.abspath(DEFAULT_SNAKEFILE_TEMPLATE) + + if self.params.resource_name == "localhost": + snakefile_template = os.path.abspath(DEFAULT_SNAKEFILE_TEMPLATE_LOCALHOST) + + for folder in [self.params.gtn_output_folder]: + if not os.path.isdir(folder): + try: + os.mkdir(folder) + except OSError, osx: + gc3libs.log.error("Failed to create folder {0}. reason: '{1}'".format(folder, + osx)) + if self.params.transfer_data and not os.path.isdir(local_result_folder): + os.mkdir(os.path.join(local_result_folder)) + + + #Creates a tn pipeline for each existing dataset + for dataset in _get_datasets(self.params.gtn_input_folder): + + gc3libs.log.info("Dataset: {} was found. Pipeline starting out..".format(dataset.name)) + + extra_args = extra.copy() + + if not self.params.transfer_data: + # Use root BIDS folder and set participant label for each task + dataset.location = self.params.gtn_input_folder + + if self.params.transfer_data: + extra_args['local_result_folder'] = local_result_folder + + if self.params.gtn_app: + self.gtn_app_execution = self.params.gtn_app + + if self.params.snakefile_template: + snakefile_template = os.path.abspath(self.params.snakefile) + + snakefile_tmp_path = _make_tmp_snakefile(snakefile_template, + self.session.path, + dataset.name) + + extra_args['snakefile'] = snakefile_tmp_path + extra_args['session'] = self.session.path + extra_args['transfer_data'] = self.params.transfer_data + #extra_args['jobname'] = job_name + extra_args['output_dir'] = os.path.join(os.path.abspath(self.session.path), + '.compute', + dataset.name) + extra_args['data_output_dir'] = os.path.join(os.path.abspath(self.params.gtn_output_folder), + dataset.name) + + self.log.debug("Creating Application for dataset: {}".format(dataset.name)) + + if self.params.resource_name == "localhost": + tasks.append(GtnLocalhostApplication( + dataset.location, + dataset.name, + **extra_args)) + + else: + tasks.append(GtnApplication( + self.gtn_app_execution, + dataset.location, + dataset.name, + **extra_args)) + + return tasks + + def after_main_loop(self): + """ + Merge all results from all subjects into `results` folder + """ + + for task in self.session: + if isinstance(task, GtnApplication) and task.execution.returncode == 0 and os.path.exists(task.output_dir): + gc3libs.log.debug("Moving tasks {0} results from {1} to {2}".format(task.dataset_name, + task.output_dir, + self.params.gtn_output_folder)) + + gc3libs.utils.movetree(task.output_dir, os.path.join(self.params.gtn_output_folder,task.dataset_name)) + # Cleanup data_output_dir folder + gc3libs.log.debug("Removing task data_output_dir '{0}'".format(task.output_dir)) + shutil.rmtree(task.output_dir) + +# run script, but allow GC3Pie persistence module to access classes defined here; +# for details, see: http://code.google.com/p/gc3pie/issues/detail?id=95 +if __name__ == "__main__": + import gtn + + gtn.GtnScript().run()