Page MenuHomec4science

CorrelationMatrixCreatorParallel.cpp
No OneTemporary

File Metadata

Created
Sat, Sep 21, 13:47

CorrelationMatrixCreatorParallel.cpp

#include <string>
#include <vector>
#include <list>
#include <stdexcept> // std::runtime_error
#include <utility> // std::pair, std::make_pair(), std::move()
#include <functional> // std::ref(), std::bind()
#include <seqan/bam_io.h> // BamFileIn, BamAlignmentRecord
#include <ThreadPool.hpp>
#include <CorrelationMatrixCreator.hpp>
#include <Matrix2D.hpp>
template<class T>
std::ostream& operator << (std::ostream& stream, const std::list<T>& l)
{
for(const auto& p : l)
{ stream << p << " " ; }
return stream ;
}
template<class T>
std::ostream& operator << (std::ostream& stream, const std::vector<T>& v)
{
for(const auto& p : v)
{ stream << p << " " ; }
return stream ;
}
template<class T, class U>
std::ostream& operator << (std::ostream& stream, const std::pair<T,U>& p)
{
stream << "[" << p.first << " " << p.second << "] " ;
return stream ;
}
template<class T, class U>
std::ostream& operator << (std::ostream& stream, const std::unordered_map<T,U>& m)
{
for(const auto& p : m)
{ stream << p << " " << std::endl; }
return stream ;
}
/* A lambda to sort GenomeRegion by ascending starting coordinate
*/
auto sortByStartPos = [](const GenomeRegion& r1, const GenomeRegion& r2) -> bool
{ return r1 < r2 ;
} ;
CorrelationMatrixCreator::CorrelationMatrixCreator(const std::string& bed_file_path,
const std::string& bam_file_path,
const std::string& bai_file_path,
int from,
int to,
int bin_size,
MatrixCreator::methods method,
size_t n_threads)
: MatrixCreator(bed_file_path,
bam_file_path,
bai_file_path,
from,
to,
bin_size,
method,
n_threads),
target_lists_fw(n_threads),
target_lists_rv(n_threads)
{
seqan::BedRecord<seqan::Bed3> bed_line ;
// compute coordinates relative to each region
this->compute_relative_bin_coord() ;
size_t n_col = this->relative_bin_coord.size() ;
// compute number of regions and get valid chromosomes names
this->open_bed_files() ;
this->open_bam_files() ;
seqan::BamHeader header ;
seqan::readHeader(header, this->bam_files[0]) ;
size_t n_row = 0 ;
while(not seqan::atEnd(this->bed_files[0]))
{ seqan::readRecord(bed_line, this->bed_files[0]) ;
std::string chrom_name = seqan::toCString(bed_line.ref) ;
// new chromosome
if(this->chrom_map_names.find(chrom_name) ==
this->chrom_map_names.end())
{ int chrom_idx = -1 ;
seqan::getIdByName(chrom_idx,
seqan::contigNamesCache(seqan::context(this->bam_files[0])),
chrom_name) ;
this->chrom_map_names[chrom_name] = chrom_idx ;
}
n_row++ ;
}
this->close_bed_files() ;
// create the count matrix
this->matrix_counts = Matrix2D<int>(n_row, n_col, 0) ;
// create the region matrix
this->matrix_bins =
std::vector<std::vector<GenomeRegion>>
(n_row,std::vector<GenomeRegion>(n_col)) ;
this->open_bed_files() ;
size_t i = 0 ;
while(not seqan::atEnd(this->bed_files[0]))
{ seqan::readRecord(bed_line, this->bed_files[0]) ;
// find the region limits
std::string region_chr = seqan::toCString(bed_line.ref) ;
int region_len = bed_line.endPos - bed_line.beginPos ;
int region_mid = bed_line.beginPos + (region_len / 2) ;
// compute the absolute bins coordinates for this region
// and create the bins in this region
for(size_t j=0; j<n_col; j++)
{ const auto& relative_coord = this->relative_bin_coord[j] ;
this->matrix_bins[i][j] =
GenomeRegion(region_chr,
this->chrom_map_names[region_chr],
region_mid + relative_coord.first,
region_mid + relative_coord.second) ;
}
i++ ;
}
this->close_bed_files() ;
this->close_bam_files() ;
}
CorrelationMatrixCreator::~CorrelationMatrixCreator()
{ this->threads.join() ;
this->close_bam_files() ;
this->close_bed_files() ;
}
/*
Matrix2D<int> CorrelationMatrixCreator::create_matrix()
{
this->open_bam_files() ;
this->open_bai_files() ;
// read BAM header
seqan::BamHeader bam_header ;
seqan::readHeader(bam_header, this->bam_files) ;
for(size_t i=0; i<this->matrix_counts.get_nrow(); i++)
{
const auto& row = this->matrix_bins[i] ;
GenomeRegion region(row.front().chromosome,
row.front().chromosome_idx,
row.front().start,
row.back().end) ;
bool jump = this->jump_upstream(region, 600) ;
if(not jump)
{ continue ; }
// read all relevant targets
this->to_downstream_target(region) ;
// update count matrix row
this->update_count_matrix(i) ;
// clean buffers
this->clear_target_lists() ;
}
this->close_bam_files() ;
return this->matrix_counts ;
}*/
/*
Matrix2D<int> CorrelationMatrixCreator::create_matrix()
{
// compute the slices on which each thread will work
std::vector<std::pair<size_t,size_t>> slices =
ThreadPool::split_range(0,
this->matrix_counts.get_nrow(),
this->n_threads) ;
// prepare the futures and promises
std::vector<std::promise<bool>> promises(this->n_threads) ;
std::vector<std::future<bool>> futures(this->n_threads) ;
for(size_t i=0; i<this->n_threads; i++)
{ futures[i] = promises[i].get_future() ; }
// open all streams
this->open_bam_files() ;
this->open_bai_files() ;
// ----------------------- threads start -----------------------
for(size_t i=0; i<this->n_threads; i++)
{ auto slice = slices[i] ;
std::cerr << "CorrelationMatrixCreator::create_matrix " << i << " [" << slice.first << " " << slice.second << ")" << std::endl ;
this->threads.addJob(std::move(
std::bind(&CorrelationMatrixCreator::create_matrix_routine,
this,
slice.first,
slice.second,
i,
std::ref(promises[i])))) ;
}
// wait for all threads to be done
for(auto& future : futures)
{ future.get() ; }
// ----------------------- threads stop ------------------------
// close all streams
this->close_bam_files() ;
return this->matrix_counts ;
}
*/
Matrix2D<int> CorrelationMatrixCreator::create_matrix()
{
// prepare the futures and promises
std::promise<bool> promise ;
std::future<bool> future = promise.get_future() ;
// open all streams
this->open_bam_files() ;
this->open_bai_files() ;
this->threads.addJob(std::move(
std::bind(&CorrelationMatrixCreator::create_matrix_routine,
this,
0,
this->matrix_counts.get_nrow(),
0,
std::ref(promise)))) ;
// wait for all threads to be done
future.get() ;
// ----------------------- threads stop ------------------------
// close all streams
this->close_bam_files() ;
return this->matrix_counts ;
}
void CorrelationMatrixCreator::create_matrix_routine(size_t row_from,
size_t row_to,
size_t thread_idx,
std::promise<bool>& done)
{ std::cerr << "CorrelationMatrixCreator::create_matrix_routine " << thread_idx << std::endl ;
// read BAM header
seqan::BamHeader bam_header ;
seqan::readHeader(bam_header, this->bam_files[thread_idx]) ;
for(size_t i=row_from; i<row_to; i++)
{
const auto& row = this->matrix_bins[i] ;
GenomeRegion region(row.front().chromosome,
row.front().chromosome_idx,
row.front().start,
row.back().end) ;
bool jump = this->jump_upstream(region, 600, thread_idx) ;
if(not jump)
{ continue ; }
// read all relevant targets
this->to_downstream_target(region, thread_idx) ;
// update count matrix row
this->update_count_matrix(i, thread_idx) ;
// clean buffers
this->clear_target_lists(thread_idx) ;
}
// signal done working
done.set_value(true) ;
}
bool CorrelationMatrixCreator::jump_upstream(const GenomeRegion& region,
int margin,
size_t thread_idx)
{ bool has_alignment = false ;
int rID = -10 ;
if(this->chrom_map_names.find(region.chromosome) !=
this->chrom_map_names.end())
{ rID = this->chrom_map_names[region.chromosome] ; }
else
{ char msg[4096] ;
sprintf(msg, "Error! chromosome %s is not linked with a valid ID in BAM file",
region.chromosome.c_str()) ;
std::cerr << msg << std::endl ;
return false ;
}
int start = std::max(0, region.start - margin) ;
int end = start + 1 ;
bool jump = seqan::jumpToRegion(this->bam_files[thread_idx],
has_alignment,
rID,
start,
end,
this->bai_files[thread_idx]) ;
return jump ;
}
void CorrelationMatrixCreator::to_downstream_target(const GenomeRegion& region,
size_t thread_idx)
{ if(this->method == CorrelationMatrixCreator::methods::READ or
this->method == CorrelationMatrixCreator::methods::READ_ATAC)
{ this->to_downstream_read(region, thread_idx) ; }
else
{ this->to_downstream_fragment(region, thread_idx) ; }
}
void CorrelationMatrixCreator::to_downstream_read(const GenomeRegion& region,
size_t thread_idx)
{ bool done = false ;
seqan::BamAlignmentRecord record ;
while(not seqan::atEnd(this->bam_files[thread_idx]) and
not done)
{ // QC check and transform record
seqan::readRecord(record, this->bam_files[thread_idx]) ;
if(not CorrelationMatrixCreator::is_good_read(record) or
not this->is_valid_chromosome(record, thread_idx))
{ continue ; }
GenomeRegion target ;
try
{ if(this->method == CorrelationMatrixCreator::methods::READ)
{ target = GenomeRegion::constructRead(record, this->bam_files[thread_idx]) ; }
else
{ target = GenomeRegion::constructReadATAC(record, this->bam_files[thread_idx]) ; }
}
catch(std::invalid_argument& e)
{ // connect to cerr to write in SAM
seqan::BamFileOut samFileOut(seqan::context(this->bam_files[thread_idx]),
std::cerr,
seqan::Sam()) ;
std::cerr << "std::invalid_argument caught! could not use "
"this record as read: " << std::endl ;
writeRecord(samFileOut, record) ;
std::cerr << "message was : " << e.what() << std::endl << std::endl ;
continue ;
}
// upstream -> continue
if(target < region)
{ continue ; }
// overlap -> store
else if(target | region)
{ if(not seqan::hasFlagRC(record))
{ this->target_lists_fw[thread_idx].push_back(target) ; }
else
{ this->target_lists_rv[thread_idx].push_back(target) ; }
}
// downstream -> stop
else
{ done = true ; }
}
}
void CorrelationMatrixCreator::to_downstream_fragment(const GenomeRegion& region,
size_t thread_idx)
{
bool done = false ;
seqan::BamAlignmentRecord record ;
while(not seqan::atEnd(this->bam_files[thread_idx]) and
not done)
{ // QC check and transform record
seqan::readRecord(record, this->bam_files[thread_idx]) ;
if(not CorrelationMatrixCreator::is_good_pair(record) or
not this->is_valid_chromosome(record, thread_idx))
{ continue ; }
GenomeRegion target ;
try
{ target = GenomeRegion::constructFragment(record, this->bam_files[thread_idx]) ; }
catch(std::invalid_argument& e)
{ // connect to cerr to write in SAM
seqan::BamFileOut samFileOut(seqan::context(this->bam_files[thread_idx]),
std::cerr,
seqan::Sam()) ;
std::cerr << "std::invalid_argument caught! could not use "
"this record as fragment: " << std::endl ;
writeRecord(samFileOut, record) ;
std::cerr << "message was : " << e.what() << std::endl << std::endl ;
continue ;
}
// upstream -> continue
if(target < region)
{ continue ; }
// overlap -> store
else if(target | region)
{ if(this->method == CorrelationMatrixCreator::methods::FRAGMENT_CENTER)
{ target = GenomeRegion::constructFragmentCenter(record,
this->bam_files[thread_idx]) ;
if(target | region)
{ this->target_lists_fw[thread_idx].push_back(target) ; }
}
else
{ this->target_lists_fw[thread_idx].push_back(target) ; }
}
// downstream -> stop
else if(target > region)
{ // std::cerr << std::endl ;
done = true ;
}
}
// std::cerr << "to_downstream_fragment END" << std::endl ;
}
void CorrelationMatrixCreator::clear_target_lists(size_t thread_idx)
{ this->target_lists_fw[thread_idx].clear() ;
this->target_lists_rv[thread_idx].clear() ;
}
/*
void CorrelationMatrixCreator::remove_upstream_targets(const GenomeRegion& region)
{ // forward targets
auto iter_fw = this->target_list_fw.cbegin() ;
while(iter_fw != this->target_list_fw.end())
{ // remove upstream reads
if(*iter_fw < region)
{ iter_fw = this->target_list_fw.erase(iter_fw) ; }
// keep overlapping reads, don't stop here
else if(*iter_fw | region)
{ iter_fw++ ; }
// stop at first read downstream
else
{ break ; }
}
// reverse targets
auto iter_rv = this->target_list_rv.cbegin() ;
while(iter_rv != this->target_list_rv.end())
{ // remove upstream reads
if(*iter_rv < region)
{ iter_rv = this->target_list_rv.erase(iter_rv) ; }
// keep overlapping reads
else if(*iter_rv | region)
{ iter_rv++ ; }
// stop at first read downstream
else
{ break ; }
}
}
*/
void CorrelationMatrixCreator::update_count_matrix(size_t row_index,
size_t thread_idx)
{
// forward targets
for(const auto& iter : this->target_lists_fw[thread_idx])
{ auto bin_start_end = CorrelationMatrixCreator::
get_bin_indices(iter, this->matrix_bins[row_index]) ;
for(int j=bin_start_end.first; j<bin_start_end.second; j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
// reverse targets
for(const auto& iter : this->target_lists_rv[thread_idx])
{ auto bin_start_end = CorrelationMatrixCreator::
get_bin_indices(iter, this->matrix_bins[row_index]) ;
for(int j=bin_start_end.first; j<bin_start_end.second; j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
}
/*
void CorrelationMatrixCreator::update_count_matrix_naive(size_t row_index)
{ // forward targets
for(const auto& iter : target_list_fw)
{ for(size_t j=0; j<this->matrix_counts.get_ncol(); j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
// reverse targets
for(const auto& iter : target_list_rv)
{ for(size_t j=0; j<this->matrix_counts.get_ncol(); j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
}
*/

Event Timeline