Snakefile

##############################
## MelArray - main pipeline ##
##############################
#
# @author: Phil Cheng, USZ (for original pipeline)
# @author: Balazs Laurenczy, ETHZ (for Snakemake pipeline)
# @date: 2017-2018
##########
# IMPORT #
##########
# import entire packages
import os, logging, platform, re
# import some specific packages
from timeit import default_timer as t
from datetime import datetime
# import the load_config function from the scripts folder
from pipeline.scripts.load_config import load_config
from pipeline.scripts.overwrite_config import overwrite_config
from pipeline.scripts.replace_in_config import replace_in_config
from pipeline.scripts.parse_ids import parse_ids
from pipeline.scripts.get_tumor_normal_dict import get_tumor_normal_dict
########
# INIT #
########
## LOGGING
logging_format = '#%(asctime)s | %(levelname)-7s | %(filename)-20s: %(message)s'
# get verbosity
verbosity = str(config.get('verbosity', 'INFO')).upper()
# if a bad verbosity was specified
if verbosity not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
    logging.basicConfig(level = logging.INFO, format = logging_format)
    logging.warning("Bad verbosity specified ('%s'), falling back to INFO", verbosity)
# otherwise set up logging with the specified verbosity
else:
    logging.basicConfig(level = getattr(logging, verbosity), format = logging_format)
## CONFIGURATION LOADING
# store the original "config" dictionary that is created from the input parameters
input_params = config
# load the basic (default) configuration file
config_file_path = "{}/pipeline/configs/config.yaml".format(os.getcwd())
config = load_config(config_file_path)
# load the configuration file from the specified configuration file, if any specified
config_file_name = input_params.get('config', None)
if config_file_name is not None:
    config_file_path = "{}/pipeline/configs/config_{}.yaml".format(os.getcwd(), config_file_name)
    config = overwrite_config(config, config_file_path)
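# NOTE: hypothetical usage example (the config name "melanoma_batch2" is a placeholder):
#   snakemake --config config=melanoma_batch2
# would load "pipeline/configs/config_melanoma_batch2.yaml" here and overwrite the matching keys
# of the default "pipeline/configs/config.yaml" loaded above.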
# do some replacements and cleaning up in the paths
config['path'] = replace_in_config(config['path'])
## SUBWORKFLOWS
# load the subworkflow list from the input parameter, if any
subworkflows_from_input = input_params.get('subworkflows', [])
if len(subworkflows_from_input) > 0:
    subworkflows_from_input = subworkflows_from_input.split(',')
## REFERENCE GENOME
# initialize the paths for the selected/configured reference genome
ref_name = config['main']['ref']
logging.debug("Selected reference genome: %s", ref_name)
# make sure it is a valid reference
if ref_name not in ['hg19', 'hg38', 'b37']:
    logging.warning('Unknown reference genome name ("%s"). Switching to default "hg19".', ref_name)
    ref_name = "hg19"
# replace each reference-related path with the path corresponding to the selected reference genome
# iterate over a copy of the items so that the dictionary can be safely updated during the loop
for key, value in list(config['path'].items()):
    # if the current key looks like "XXXXX_hg19" (e.g. "ref_genome_hg19" or "dbsnp_hg19")
    if key.split('_')[-1] == ref_name:
        # find the corresponding 'target' key ("ref_genome" or "dbsnp")
        target_key = '_'.join(key.split('_')[:-1])
        # replace the path at the target key with the one from the selected reference
        config['path'][target_key] = value
        logging.debug('Found reference-related path: "%s" = "%s". Setting as current path for "%s" ...',
                      key, value, target_key)
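# Illustration of the remapping above (the value is a hypothetical example path): with
# ref_name == "hg19", an entry such as
#   config['path']['ref_genome_hg19'] = '/ref/hg19/genome.fa'
# is copied onto the generic key, i.e.
#   config['path']['ref_genome'] = '/ref/hg19/genome.fa'
# so that downstream rules only ever refer to the generic "ref_genome" / "dbsnp" keys.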
## VARIABLES
# store the path dictionary as a separate variable
paths = config['path']
# store the paths relative to the rule's context (might be in a container)
rule_paths = config['path']
# store the software dictionary as a separate variable
soft = config['software']
############
# RUN MODE #
############
# check the run mode, i.e. whether or not commands should be executed inside a container.
# If the pipeline is fully run on the host or fully run inside a single container, no command has to be
# prefixed to the calls
if config['main']['run_mode'] == 'host' or config['main']['run_mode'] == 'run_inside_single':
    logging.debug('Running without Singularity calls (run_mode = "{}") ...'.format(config['main']['run_mode']))
    SINGULARITY_CMD = ''
# if the pipeline runs on the host but calls software inside a single container, always call that container
elif config['main']['run_mode'] == 'run_outside_single':
    SINGULARITY_CMD = 'singularity exec -B {host_data_root}:/data -B {host_ref_root}:/ref {container} '\
                      .format(**paths)
    logging.debug('Running with a single Singularity container (located at {} on host).'.format(paths['container']))
    logging.debug('Command prefix: "{}" ...'.format(SINGULARITY_CMD))
    # remap the paths to their locations on the host ("rule_paths" keeps the in-container paths)
    paths = { name: (re.sub('^' + paths['data_root'], paths['host_data_root'], value)
                     if not re.match('^host', name) else value) for name, value in paths.items() }
    paths = { name: (re.sub('^' + paths['ref_root'], paths['host_ref_root'], value)
                     if not re.match('^host', name) else value) for name, value in paths.items() }
# other run modes are not yet supported
else:
    logging.error("Run mode '{}' not supported. Aborting.".format(config['main']['run_mode']))
    exit(1)
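# Illustration for 'run_outside_single' (hypothetical host locations, assuming data_root == '/data'
# and ref_root == '/ref' inside the container): the command prefix would look roughly like
#   singularity exec -B /home/user/melarray/data:/data -B /home/user/references:/ref container.simg <tool> ...
# and a path such as '/data/fastq' in "paths" would be remapped to '/home/user/melarray/data/fastq',
# while "rule_paths" keeps the in-container '/data/fastq' for commands executed through SINGULARITY_CMD.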
#####################
# FASTQ FILES LISTS #
#####################
logging.info('Looking for files in %s ...', paths['fastq'])
# get the list of files to process using the regexp stored in the paths['sample_pattern']
# variable, which is something like:
# [0-9]*.A-\([A-Za-z0-9-]*\)_[A-Za-z0-9_]*_R1.fastq.gz
sample_wildcards = glob_wildcards(paths['fastq'] + "/" + paths['sample_pattern'])
# extract the IDs which will be the main sample identifiers over the whole pipeline
IDs = sample_wildcards.ID
logging.info("Found {} file(s) to process ...".format(len(IDs)))
# parse the ID list depending on the input parameters (for sub-indexing for example) and
# get the new list of IDs and of FASTQ file names based on the IDs. Also get "IDs_MD5",
# a unique MD5 hash based on the current list of IDs
IDs, IDs_MD5, FASTQ_R1_FILES, FASTQ_R2_FILES = parse_ids(IDs, sample_wildcards, input_params, paths)
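# Illustration (hypothetical file name, assuming paths['sample_pattern'] contains the "{ID}"
# wildcard that glob_wildcards() extracts): a file named
#   0123.A-PATIENT-01-T_S5_L001_R1.fastq.gz
# would yield the ID "PATIENT-01-T", which then identifies the sample throughout the pipeline.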
########################
# TUMOR / NORMAL LISTS #
########################
# check if the tumor normal match file is present
if not os.path.exists(paths['tumor_normal_match']):
    logging.error('Cannot find tumor_normal match file at "{}". Aborting.'.format(paths['tumor_normal_match']))
    exit(1)
# get the dictionary containing the mapping of the matching tumor and normal samples, which
# is extracted from the file located where "paths['tumor_normal_match']" points to
TUMOR_NORMAL_DICT, NORMAL_LIST, TUMOR_LIST = get_tumor_normal_dict(IDs, paths)
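# Illustration (hypothetical IDs; the exact file format is defined in get_tumor_normal_dict()):
# a line pairing a tumor sample with its matched normal, e.g.
#   PATIENT-01-T    PATIENT-01-N
# would result in TUMOR_NORMAL_DICT mapping 'PATIENT-01-T' -> 'PATIENT-01-N', with 'PATIENT-01-T'
# in TUMOR_LIST and 'PATIENT-01-N' in NORMAL_LIST (restricted to samples present in "IDs").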
#################################
## INCLUDE OTHER SUB WORKFLOWS ##
#################################
# use either the subworkflows required by the config file or the ones passed on the command line
subworkflows_to_use = config['main']['subworkflows']
if subworkflows_to_use is None:
    subworkflows_to_use = []
logging.info("Subworkflows from config: %s", subworkflows_to_use)
logging.info("Subworkflows from input: %s", subworkflows_from_input)
if subworkflows_from_input is not None and len(subworkflows_from_input) > 0:
    subworkflows_to_use = subworkflows_from_input
    # check if all subworkflows are requested from the input
    if len(subworkflows_from_input) == 1 and\
       (subworkflows_from_input[0] == 'all' or subworkflows_from_input[0] == '*'):
        # get the list of subworkflows from the appropriate folder
        subworkflows_to_use = [ subw.replace('.snake', '') for subw in os.listdir('pipeline/subworkflows')
                                if subw.endswith('.snake') ]
logging.info("Selected subworkflows: %s", subworkflows_to_use)
# this will store the required outputs of all the subworkflows, both per subworkflow (dictionary)
# and as a flat list
subw_outputs_dict = {}
subw_outputs = []
# loop through all the needed subworkflows and get their outputs
for subworkflow in set(subworkflows_to_use):
    # get the path to the Snakefile of the subworkflow
    subworkflow_path = 'pipeline/subworkflows/{}.snake'.format(subworkflow)
    # check if the Snakefile actually exists
    if not os.path.exists(subworkflow_path):
        logging.warning('Cannot find subworkflow "%s" at "%s". Skipping.', subworkflow, subworkflow_path)
        continue
    # initialize the list of required final outputs for this subworkflow
    subw_outputs_dict[subworkflow] = []
    # if the Snakefile exists, include it. This adds the rules of that subworkflow to the main workflow but
    # also fills in the list of required outputs stored in the dictionary "subw_outputs_dict".
    include: subworkflow_path
    # if no outputs were assigned, skip this subworkflow
    if not isinstance(subw_outputs_dict[subworkflow], list) or len(subw_outputs_dict[subworkflow]) == 0:
        logging.warning('No final output specified by the Snakefile at "%s". Skipping.', subworkflow_path)
        continue
    # if some outputs were assigned, add them to the list
    subw_outputs.extend(subw_outputs_dict[subworkflow])
logging.info("IDs:")
logging.info(IDs)
logging.info('All subworkflow outputs:')
logging.info(subw_outputs)
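# Illustration of the convention expected from a subworkflow (hypothetical "alignment" example;
# the output path and rule name are placeholders): a file "pipeline/subworkflows/alignment.snake"
# is expected to register its required final outputs in the dictionary prepared above, e.g.
#   subw_outputs_dict['alignment'] = expand(paths['bam'] + '/{id}.sorted.bam', id = IDs)
# before defining its rules, so that its outputs end up in "subw_outputs" and are requested
# by the "all" rule below.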
###########
# ONSTART #
###########
# piece of code to be executed before the pipeline starts
onstart:
    logging.info('Starting MelArray Snakemake pipeline ...')
    logging.info("IDs to process: " + str(IDs))
    # check the run mode, i.e. whether or not commands are executed inside a container:
    # running entirely on the host: nothing container-related to report
    if config['main']['run_mode'] == 'host':
        logging.info('Running directly on the host, without a container ...')
    # running inside a single container with all dependencies available: no call to Singularity
    elif config['main']['run_mode'] == 'run_inside_single':
        logging.info('Running inside a container (located at {} on host) ...'.format(paths['container']))
    # if the pipeline runs on the host but calls software inside a single container, always call that container
    elif config['main']['run_mode'] == 'run_outside_single':
        logging.info('Running with a single Singularity container (located at {} on host).'
                     .format(paths['container']))
        logging.info('Command prefix: "{}" ...'.format(SINGULARITY_CMD))
    else:
        logging.error('Not implemented. Aborting.')
        exit(1)
#########
# RULES #
#########
# final step that will gather all the required files
rule all:
    input:
        # the "subw_outputs" list contains the outputs of the selected subworkflows
        subworkflow_outputs = subw_outputs
    run:
        logging.info('Pipeline completed.')
# piece of code to be executed after the pipeline ends *without* error
onsuccess:
    logging.info('Pipeline completed successfully.')
# piece of code to be executed after the pipeline ends *with* an error
onerror:
    logging.error('Pipeline failed.')
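# Hypothetical invocations (the config and subworkflow names are placeholders):
#   snakemake --cores 16 --config config=melanoma_batch2 subworkflows=alignment,variant_calling
# or, to run every subworkflow found in pipeline/subworkflows:
#   snakemake --cores 16 --config subworkflows=all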
