Snakefile (F99305804, attached to R2915 eSCT pipeline interoperability)
##############################
## MelArray - main pipeline ##
##############################
#
# @author: Phil Cheng, USZ (for original pipeline)
# @author: Balazs Laurenczy, ETHZ (for Snakemake pipeline)
# @date: 2017-2018
##########
# IMPORT #
##########
# import entire packages
import os, logging, platform, re
# import some specific packages
from timeit import default_timer as t
from datetime import datetime
# import the load_config function from the scripts folder
from pipeline.scripts.load_config import load_config
from pipeline.scripts.overwrite_config import overwrite_config
from pipeline.scripts.replace_in_config import replace_in_config
from pipeline.scripts.parse_ids import parse_ids
from pipeline.scripts.get_tumor_normal_dict import get_tumor_normal_dict
########
# INIT #
########
## LOGGING
logging_format = '#%(asctime)s | %(levelname)-7s | %(filename)-20s: %(message)s'
# get verbosity
verbosity = str(config.get('verbosity', 'INFO')).upper()
# if a bad verbosity was specified
if verbosity not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
    logging.basicConfig(level = logging.INFO, format = logging_format)
    logging.warning("Bad verbosity specified ('%s')", verbosity)
# otherwise set up logging with specified verbosity
else:
    logging.basicConfig(level = getattr(logging, verbosity), format = logging_format)
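# a log line produced with this format would look like (illustrative example, not actual output):
#   #2018-01-23 07:25:00,123 | INFO    | Snakefile           : Starting MelArray Snakemake pipeline ...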
## CONFIGURATION LOADING
# store the original "config" dictionary that is created from the input parameters
input_params = config
# load the basic (default) configuration file
config_file_path = "{}/pipeline/configs/config.yaml".format(os.getcwd())
config = load_config(config_file_path)
# load the configuration file from the specified configuration file, if any specified
config_file_name = input_params.get('config', None)
if config_file_name is not None:
config_file_path = "{}/pipeline/configs/config_{}.yaml".format(os.getcwd(), config_file_name)
config = overwrite_config(config, config_file_path)
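# illustrative usage (the configuration name "hg38" is hypothetical): running
#   snakemake --config config=hg38
# would load pipeline/configs/config_hg38.yaml on top of the default configuration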
# do some replacements and cleaning up in the paths
config['path'] = replace_in_config(config['path'])
## SUBWORKFLOWS
# load the subworkflow list from the input parameter, if any
subworkflows_from_input = input_params.get('subworkflows', [])
if len(subworkflows_from_input) > 0:
    subworkflows_from_input = subworkflows_from_input.split(',')
## REFERENCE GENOME
# initialize the paths for the selected/configured reference genome
ref_name = config['main']['ref']
logging.debug("Selected reference genome: %s", ref_name)
# make sure it is a valid reference
if ref_name not in ['hg19', 'hg38', 'b37']:
    logging.warning('Unknown reference genome name ("%s"). Switching to default "hg19".', ref_name)
    ref_name = "hg19"
# replace each reference-related path with the path corresponding to the selected reference genome
# iterate over a copy of the items so that overwriting target keys does not disturb the iteration
for key, value in list(config['path'].items()):
    # if the current key looks like "XXXXX_hg19" (e.g. "ref_genome_hg19" or "dbsnp_hg19")
    if key.split('_')[-1] == ref_name:
        # find the corresponding 'target' key ("ref_genome" or "dbsnp")
        target_key = '_'.join(key.split('_')[:-1])
        # replace the path at the target key with the one from the reference
        config['path'][target_key] = value
        logging.debug('Found reference-related path: "%s" = "%s". Setting as current path for "%s" ...',
                      key, value, target_key)
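# illustrative effect: with ref_name == "hg19", an entry like config['path']['ref_genome_hg19']
# (if present in the configuration file) overwrites config['path']['ref_genome']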
## VARIABLES
# store the path dictionary as a separate variable
paths = config['path']
# store the paths relative to the rule's context (might be in a container)
rule_paths = config['path']
# store the software dictionary as a separate variable
soft = config['software']
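# for reference, the config['path'] keys used further down in this Snakefile are:
#   data_root / host_data_root, ref_root / host_ref_root, container, fastq,
#   sample_pattern and tumor_normal_match (values come from the YAML configuration files)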
############
# RUN MODE #
############
# check the run mode, i.e. whether or not we should execute commands inside a container
# if the pipeline is fully run on the host or fully run inside a single container, no command has to be
# prefixed to the calls
if config['main']['run_mode'] == 'host' or config['main']['run_mode'] == 'run_inside_single':
    logging.debug('Running without Singularity calls (run_mode = "{}") ...'.format(config['main']['run_mode']))
    SINGULARITY_CMD = ''
# if the pipeline runs on the host but calls software inside a single container, always call that container
elif config['main']['run_mode'] == 'run_outside_single':
    SINGULARITY_CMD = 'singularity exec -B {host_data_root}:/data -B {host_ref_root}:/ref {container} '\
        .format(**paths)
    logging.debug('Running with a single Singularity container (located at {} on host).'.format(paths['container']))
    logging.debug('Command prefix: "{}" ...'.format(SINGULARITY_CMD))
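    # illustrative resolved prefix, assuming hypothetical host paths (not from the original config):
    #   singularity exec -B /scratch/melarray/data:/data -B /scratch/melarray/ref:/ref /scratch/melarray/container.simg <command>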
    # change the paths to be relative to the host (Snakemake itself runs outside the container here;
    # rule_paths keeps the original container-relative paths for the commands)
    paths = { name: (re.sub('^' + paths['data_root'], paths['host_data_root'], value)\
                     if not re.match('^host', name) else value) for name, value in paths.items() }
    paths = { name: (re.sub('^' + paths['ref_root'], paths['host_ref_root'], value)\
                     if not re.match('^host', name) else value) for name, value in paths.items() }
# other run modes are not yet supported
else:
logging.error("Run mode '{}' not supported. Aborting.".format(config['main']['run_mode']))
exit(0)
#####################
# FASTQ FILES LISTS #
#####################
logging.info('Looking for files in %s ...', paths['fastq'])
# get the list of files to process using the regexp stored in the paths['sample_pattern']
# variable, which is something like:
# [0-9]*.A-\([A-Za-z0-9-]*\)_[A-Za-z0-9_]*_R1.fastq.gz
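# for illustration only (hypothetical file name, not from the original data): a file such as
# "20170101.A-MEL-001-T_S1_L001_R1.fastq.gz" would yield the sample ID "MEL-001-T"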
sample_wildcards = glob_wildcards(paths['fastq'] + "/" + paths['sample_pattern'])
# extract the IDs which will be the main sample identifiers over the whole pipeline
IDs = sample_wildcards.ID
logging.info("Found {} file(s) to process ...".format(len(IDs)))
# parse the ID list depending on the input parameters (for sub-indexing for example) and
# get the new list of IDs and of FASTQ file names based on the IDs. Also get "IDs_MD5",
# which is a unique MD5 hash based on the current list of IDs
IDs, IDs_MD5, FASTQ_R1_FILES, FASTQ_R2_FILES = parse_ids(IDs, sample_wildcards, input_params, paths)
########################
# TUMOR / NORMAL LISTS #
########################
# check if the tumor normal match file is present
if not os.path.exists(paths['tumor_normal_match']):
    logging.error('Cannot find tumor_normal match file at "{}". Aborting.'.format(paths['tumor_normal_match']))
    exit(1)
# get the dictionary containing the mapping of the matching tumor and normal samples, which
# is extracted from the file located where "paths['tumor_normal_match']" points to
TUMOR_NORMAL_DICT, NORMAL_LIST, TUMOR_LIST = get_tumor_normal_dict(IDs, paths)
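# illustrative shape of the returned values (sample IDs are hypothetical; see
# pipeline/scripts/get_tumor_normal_dict.py for the actual structure):
#   TUMOR_NORMAL_DICT maps matched samples, e.g. {"MEL-001-T": "MEL-001-N"}, while
#   TUMOR_LIST and NORMAL_LIST hold the tumor and normal sample IDs, e.g. ["MEL-001-T"] and ["MEL-001-N"]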
#################################
## INCLUDE OTHER SUB WORKFLOWS ##
#################################
# use either the subworkflows required by the config file or what comes from the command line arguments
subworkflows_to_use = config['main']['subworkflows']
if subworkflows_to_use is None:
    subworkflows_to_use = []
logging.info("Subworkflows from config: %s", subworkflows_to_use)
logging.info("Subworkflows from input: %s", subworkflows_from_input)
if subworkflows_from_input is not None and len(subworkflows_from_input) > 0:
    subworkflows_to_use = subworkflows_from_input
# check if all subworkflows are requested from the input
if len(subworkflows_from_input) == 1 and\
        (subworkflows_from_input[0] == 'all' or subworkflows_from_input[0] == '*'):
    # get the list of subworkflows from the appropriate folder
    subworkflows_to_use = [ subw.replace('.snake', '') for subw in os.listdir('pipeline/subworkflows')\
                            if subw.endswith('.snake') ]
logging.info("Selected subworkflows: %s", subworkflows_to_use)
# this will store the required outputs from all the subworkflows, either as a dictionary or as an array
subw_outputs_dict = {}
subw_outputs = []
# loop through all the needed subworkflows and get their outputs
for subworkflow in set(subworkflows_to_use):
    # get the path to the Snakefile of the subworkflow
    subworkflow_path = 'pipeline/subworkflows/{}.snake'.format(subworkflow)
    # check if the Snakefile actually exists
    if not os.path.exists(subworkflow_path):
        logging.warning('Cannot find subworkflow "%s" at "%s". Skipping.', subworkflow, subworkflow_path)
        continue
    # initialize the required final outputs to an empty list
    subw_outputs_dict[subworkflow] = []
    # if the Snakefile exists, load it. This adds the rules of that subworkflow to the main workflow but also
    # generates a list of required outputs that will be stored in the dictionary "subw_outputs_dict".
    include: subworkflow_path
    # if no outputs were assigned, skip this subworkflow
    if not isinstance(subw_outputs_dict[subworkflow], list) or len(subw_outputs_dict[subworkflow]) == 0:
        logging.warning('No final output specified by the Snakefile at "%s". Skipping.', subworkflow_path)
        continue
    # if some outputs were assigned, add them to the list
    subw_outputs.extend(subw_outputs_dict[subworkflow])
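# illustrative contract for a subworkflow file (rule targets and paths are hypothetical): an included
# pipeline/subworkflows/<name>.snake is expected to register its final targets, for example:
#   subw_outputs_dict['<name>'].extend(expand(paths['data_root'] + '/<name>/{id}.done', id = IDs))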
logging.info("IDs:")
logging.info(IDs)
logging.info('All subworkflow outputs:')
logging.info(subw_outputs)
###########
# ONSTART #
###########
# piece of code to be executed before the pipeline starts
onstart:
    logging.info('Starting MelArray Snakemake pipeline ...')
    logging.info("IDs to process: " + str(IDs))
    # check the run mode, i.e. whether or not we should execute commands inside a container:
    # running directly on the host with all dependencies available: no call to Singularity
    if config['main']['run_mode'] == 'host':
        logging.info('Running directly on the host, without any container ...')
    # running inside a single container with all dependencies available: no call to Singularity
    elif config['main']['run_mode'] == 'run_inside_single':
        logging.info('Running inside a container (located at {} on host) ...'.format(paths['container']))
    # if the pipeline runs on the host but calls software inside a single container, always call that container
    elif config['main']['run_mode'] == 'run_outside_single':
        logging.info('Running with a single Singularity container (located at {} on host).'
                     .format(paths['container']))
        logging.info('Command prefix: "{}" ...'.format(SINGULARITY_CMD))
    else:
        logging.error('Not implemented. Aborting.')
        exit(1)
#########
# RULES #
#########
# final step that will gather all the required files
rule all:
    input:
        # the "subw_outputs" list contains the outputs of the selected subworkflows
        subworkflow_outputs = subw_outputs
    run:
        logging.info('Pipeline completed.')
# piece of code to be executed after the pipeline ends *without* error
onsuccess:
    logging.info('Pipeline completed successfully.')
# piece of code to be executed after the pipeline ends *with* an error
onerror:
    logging.error('Pipeline failed.')