Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F84238982
CorrelationMatrixCreatorParallel.cpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Sep 21, 13:47
Size
17 KB
Mime Type
text/x-c
Expires
Mon, Sep 23, 13:47 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
20967005
Attached To
R8820 scATAC-seq
CorrelationMatrixCreatorParallel.cpp
View Options
#include <string>
#include <vector>
#include <list>
#include <stdexcept>
// std::runtime_error
#include <utility>
// std::pair, std::make_pair(), std::move()
#include <functional>
// std::ref(), std::bind()
#include <seqan/bam_io.h>
// BamFileIn, BamAlignmentRecord
#include <ThreadPool.hpp>
#include <CorrelationMatrixCreator.hpp>
#include <Matrix2D.hpp>
template
<
class
T
>
std
::
ostream
&
operator
<<
(
std
::
ostream
&
stream
,
const
std
::
list
<
T
>&
l
)
{
for
(
const
auto
&
p
:
l
)
{
stream
<<
p
<<
" "
;
}
return
stream
;
}
template
<
class
T
>
std
::
ostream
&
operator
<<
(
std
::
ostream
&
stream
,
const
std
::
vector
<
T
>&
v
)
{
for
(
const
auto
&
p
:
v
)
{
stream
<<
p
<<
" "
;
}
return
stream
;
}
template
<
class
T
,
class
U
>
std
::
ostream
&
operator
<<
(
std
::
ostream
&
stream
,
const
std
::
pair
<
T
,
U
>&
p
)
{
stream
<<
"["
<<
p
.
first
<<
" "
<<
p
.
second
<<
"] "
;
return
stream
;
}
template
<
class
T
,
class
U
>
std
::
ostream
&
operator
<<
(
std
::
ostream
&
stream
,
const
std
::
unordered_map
<
T
,
U
>&
m
)
{
for
(
const
auto
&
p
:
m
)
{
stream
<<
p
<<
" "
<<
std
::
endl
;
}
return
stream
;
}
/* A lambda to sort GenomeRegion by ascending starting coordinate
*/
auto
sortByStartPos
=
[](
const
GenomeRegion
&
r1
,
const
GenomeRegion
&
r2
)
->
bool
{
return
r1
<
r2
;
}
;
CorrelationMatrixCreator
::
CorrelationMatrixCreator
(
const
std
::
string
&
bed_file_path
,
const
std
::
string
&
bam_file_path
,
const
std
::
string
&
bai_file_path
,
int
from
,
int
to
,
int
bin_size
,
MatrixCreator
::
methods
method
,
size_t
n_threads
)
:
MatrixCreator
(
bed_file_path
,
bam_file_path
,
bai_file_path
,
from
,
to
,
bin_size
,
method
,
n_threads
),
target_lists_fw
(
n_threads
),
target_lists_rv
(
n_threads
)
{
seqan
::
BedRecord
<
seqan
::
Bed3
>
bed_line
;
// compute coordinates relative to each region
this
->
compute_relative_bin_coord
()
;
size_t
n_col
=
this
->
relative_bin_coord
.
size
()
;
// compute number of regions and get valid chromosomes names
this
->
open_bed_files
()
;
this
->
open_bam_files
()
;
seqan
::
BamHeader
header
;
seqan
::
readHeader
(
header
,
this
->
bam_files
[
0
])
;
size_t
n_row
=
0
;
while
(
not
seqan
::
atEnd
(
this
->
bed_files
[
0
]))
{
seqan
::
readRecord
(
bed_line
,
this
->
bed_files
[
0
])
;
std
::
string
chrom_name
=
seqan
::
toCString
(
bed_line
.
ref
)
;
// new chromosome
if
(
this
->
chrom_map_names
.
find
(
chrom_name
)
==
this
->
chrom_map_names
.
end
())
{
int
chrom_idx
=
-
1
;
seqan
::
getIdByName
(
chrom_idx
,
seqan
::
contigNamesCache
(
seqan
::
context
(
this
->
bam_files
[
0
])),
chrom_name
)
;
this
->
chrom_map_names
[
chrom_name
]
=
chrom_idx
;
}
n_row
++
;
}
this
->
close_bed_files
()
;
// create the count matrix
this
->
matrix_counts
=
Matrix2D
<
int
>
(
n_row
,
n_col
,
0
)
;
// create the region matrix
this
->
matrix_bins
=
std
::
vector
<
std
::
vector
<
GenomeRegion
>>
(
n_row
,
std
::
vector
<
GenomeRegion
>
(
n_col
))
;
this
->
open_bed_files
()
;
size_t
i
=
0
;
while
(
not
seqan
::
atEnd
(
this
->
bed_files
[
0
]))
{
seqan
::
readRecord
(
bed_line
,
this
->
bed_files
[
0
])
;
// find the region limits
std
::
string
region_chr
=
seqan
::
toCString
(
bed_line
.
ref
)
;
int
region_len
=
bed_line
.
endPos
-
bed_line
.
beginPos
;
int
region_mid
=
bed_line
.
beginPos
+
(
region_len
/
2
)
;
// compute the absolute bins coordinates for this region
// and create the bins in this region
for
(
size_t
j
=
0
;
j
<
n_col
;
j
++
)
{
const
auto
&
relative_coord
=
this
->
relative_bin_coord
[
j
]
;
this
->
matrix_bins
[
i
][
j
]
=
GenomeRegion
(
region_chr
,
this
->
chrom_map_names
[
region_chr
],
region_mid
+
relative_coord
.
first
,
region_mid
+
relative_coord
.
second
)
;
}
i
++
;
}
this
->
close_bed_files
()
;
this
->
close_bam_files
()
;
}
CorrelationMatrixCreator
::~
CorrelationMatrixCreator
()
{
this
->
threads
.
join
()
;
this
->
close_bam_files
()
;
this
->
close_bed_files
()
;
}
/*
Matrix2D<int> CorrelationMatrixCreator::create_matrix()
{
this->open_bam_files() ;
this->open_bai_files() ;
// read BAM header
seqan::BamHeader bam_header ;
seqan::readHeader(bam_header, this->bam_files) ;
for(size_t i=0; i<this->matrix_counts.get_nrow(); i++)
{
const auto& row = this->matrix_bins[i] ;
GenomeRegion region(row.front().chromosome,
row.front().chromosome_idx,
row.front().start,
row.back().end) ;
bool jump = this->jump_upstream(region, 600) ;
if(not jump)
{ continue ; }
// read all relevant targets
this->to_downstream_target(region) ;
// update count matrix row
this->update_count_matrix(i) ;
// clean buffers
this->clear_target_lists() ;
}
this->close_bam_files() ;
return this->matrix_counts ;
}*/
/*
Matrix2D<int> CorrelationMatrixCreator::create_matrix()
{
// compute the slices on which each thread will work
std::vector<std::pair<size_t,size_t>> slices =
ThreadPool::split_range(0,
this->matrix_counts.get_nrow(),
this->n_threads) ;
// prepare the futures and promises
std::vector<std::promise<bool>> promises(this->n_threads) ;
std::vector<std::future<bool>> futures(this->n_threads) ;
for(size_t i=0; i<this->n_threads; i++)
{ futures[i] = promises[i].get_future() ; }
// open all streams
this->open_bam_files() ;
this->open_bai_files() ;
// ----------------------- threads start -----------------------
for(size_t i=0; i<this->n_threads; i++)
{ auto slice = slices[i] ;
std::cerr << "CorrelationMatrixCreator::create_matrix " << i << " [" << slice.first << " " << slice.second << ")" << std::endl ;
this->threads.addJob(std::move(
std::bind(&CorrelationMatrixCreator::create_matrix_routine,
this,
slice.first,
slice.second,
i,
std::ref(promises[i])))) ;
}
// wait for all threads to be done
for(auto& future : futures)
{ future.get() ; }
// ----------------------- threads stop ------------------------
// close all streams
this->close_bam_files() ;
return this->matrix_counts ;
}
*/
Matrix2D
<
int
>
CorrelationMatrixCreator
::
create_matrix
()
{
// prepare the futures and promises
std
::
promise
<
bool
>
promise
;
std
::
future
<
bool
>
future
=
promise
.
get_future
()
;
// open all streams
this
->
open_bam_files
()
;
this
->
open_bai_files
()
;
this
->
threads
.
addJob
(
std
::
move
(
std
::
bind
(
&
CorrelationMatrixCreator
::
create_matrix_routine
,
this
,
0
,
this
->
matrix_counts
.
get_nrow
(),
0
,
std
::
ref
(
promise
))))
;
// wait for all threads to be done
future
.
get
()
;
// ----------------------- threads stop ------------------------
// close all streams
this
->
close_bam_files
()
;
return
this
->
matrix_counts
;
}
void
CorrelationMatrixCreator
::
create_matrix_routine
(
size_t
row_from
,
size_t
row_to
,
size_t
thread_idx
,
std
::
promise
<
bool
>&
done
)
{
std
::
cerr
<<
"CorrelationMatrixCreator::create_matrix_routine "
<<
thread_idx
<<
std
::
endl
;
// read BAM header
seqan
::
BamHeader
bam_header
;
seqan
::
readHeader
(
bam_header
,
this
->
bam_files
[
thread_idx
])
;
for
(
size_t
i
=
row_from
;
i
<
row_to
;
i
++
)
{
const
auto
&
row
=
this
->
matrix_bins
[
i
]
;
GenomeRegion
region
(
row
.
front
().
chromosome
,
row
.
front
().
chromosome_idx
,
row
.
front
().
start
,
row
.
back
().
end
)
;
bool
jump
=
this
->
jump_upstream
(
region
,
600
,
thread_idx
)
;
if
(
not
jump
)
{
continue
;
}
// read all relevant targets
this
->
to_downstream_target
(
region
,
thread_idx
)
;
// update count matrix row
this
->
update_count_matrix
(
i
,
thread_idx
)
;
// clean buffers
this
->
clear_target_lists
(
thread_idx
)
;
}
// signal done working
done
.
set_value
(
true
)
;
}
bool
CorrelationMatrixCreator
::
jump_upstream
(
const
GenomeRegion
&
region
,
int
margin
,
size_t
thread_idx
)
{
bool
has_alignment
=
false
;
int
rID
=
-
10
;
if
(
this
->
chrom_map_names
.
find
(
region
.
chromosome
)
!=
this
->
chrom_map_names
.
end
())
{
rID
=
this
->
chrom_map_names
[
region
.
chromosome
]
;
}
else
{
char
msg
[
4096
]
;
sprintf
(
msg
,
"Error! chromosome %s is not linked with a valid ID in BAM file"
,
region
.
chromosome
.
c_str
())
;
std
::
cerr
<<
msg
<<
std
::
endl
;
return
false
;
}
int
start
=
std
::
max
(
0
,
region
.
start
-
margin
)
;
int
end
=
start
+
1
;
bool
jump
=
seqan
::
jumpToRegion
(
this
->
bam_files
[
thread_idx
],
has_alignment
,
rID
,
start
,
end
,
this
->
bai_files
[
thread_idx
])
;
return
jump
;
}
void
CorrelationMatrixCreator
::
to_downstream_target
(
const
GenomeRegion
&
region
,
size_t
thread_idx
)
{
if
(
this
->
method
==
CorrelationMatrixCreator
::
methods
::
READ
or
this
->
method
==
CorrelationMatrixCreator
::
methods
::
READ_ATAC
)
{
this
->
to_downstream_read
(
region
,
thread_idx
)
;
}
else
{
this
->
to_downstream_fragment
(
region
,
thread_idx
)
;
}
}
void
CorrelationMatrixCreator
::
to_downstream_read
(
const
GenomeRegion
&
region
,
size_t
thread_idx
)
{
bool
done
=
false
;
seqan
::
BamAlignmentRecord
record
;
while
(
not
seqan
::
atEnd
(
this
->
bam_files
[
thread_idx
])
and
not
done
)
{
// QC check and transform record
seqan
::
readRecord
(
record
,
this
->
bam_files
[
thread_idx
])
;
if
(
not
CorrelationMatrixCreator
::
is_good_read
(
record
)
or
not
this
->
is_valid_chromosome
(
record
,
thread_idx
))
{
continue
;
}
GenomeRegion
target
;
try
{
if
(
this
->
method
==
CorrelationMatrixCreator
::
methods
::
READ
)
{
target
=
GenomeRegion
::
constructRead
(
record
,
this
->
bam_files
[
thread_idx
])
;
}
else
{
target
=
GenomeRegion
::
constructReadATAC
(
record
,
this
->
bam_files
[
thread_idx
])
;
}
}
catch
(
std
::
invalid_argument
&
e
)
{
// connect to cerr to write in SAM
seqan
::
BamFileOut
samFileOut
(
seqan
::
context
(
this
->
bam_files
[
thread_idx
]),
std
::
cerr
,
seqan
::
Sam
())
;
std
::
cerr
<<
"std::invalid_argument caught! could not use "
"this record as read: "
<<
std
::
endl
;
writeRecord
(
samFileOut
,
record
)
;
std
::
cerr
<<
"message was : "
<<
e
.
what
()
<<
std
::
endl
<<
std
::
endl
;
continue
;
}
// upstream -> continue
if
(
target
<
region
)
{
continue
;
}
// overlap -> store
else
if
(
target
|
region
)
{
if
(
not
seqan
::
hasFlagRC
(
record
))
{
this
->
target_lists_fw
[
thread_idx
].
push_back
(
target
)
;
}
else
{
this
->
target_lists_rv
[
thread_idx
].
push_back
(
target
)
;
}
}
// downstream -> stop
else
{
done
=
true
;
}
}
}
void
CorrelationMatrixCreator
::
to_downstream_fragment
(
const
GenomeRegion
&
region
,
size_t
thread_idx
)
{
bool
done
=
false
;
seqan
::
BamAlignmentRecord
record
;
while
(
not
seqan
::
atEnd
(
this
->
bam_files
[
thread_idx
])
and
not
done
)
{
// QC check and transform record
seqan
::
readRecord
(
record
,
this
->
bam_files
[
thread_idx
])
;
if
(
not
CorrelationMatrixCreator
::
is_good_pair
(
record
)
or
not
this
->
is_valid_chromosome
(
record
,
thread_idx
))
{
continue
;
}
GenomeRegion
target
;
try
{
target
=
GenomeRegion
::
constructFragment
(
record
,
this
->
bam_files
[
thread_idx
])
;
}
catch
(
std
::
invalid_argument
&
e
)
{
// connect to cerr to write in SAM
seqan
::
BamFileOut
samFileOut
(
seqan
::
context
(
this
->
bam_files
[
thread_idx
]),
std
::
cerr
,
seqan
::
Sam
())
;
std
::
cerr
<<
"std::invalid_argument caught! could not use "
"this record as fragment: "
<<
std
::
endl
;
writeRecord
(
samFileOut
,
record
)
;
std
::
cerr
<<
"message was : "
<<
e
.
what
()
<<
std
::
endl
<<
std
::
endl
;
continue
;
}
// upstream -> continue
if
(
target
<
region
)
{
continue
;
}
// overlap -> store
else
if
(
target
|
region
)
{
if
(
this
->
method
==
CorrelationMatrixCreator
::
methods
::
FRAGMENT_CENTER
)
{
target
=
GenomeRegion
::
constructFragmentCenter
(
record
,
this
->
bam_files
[
thread_idx
])
;
if
(
target
|
region
)
{
this
->
target_lists_fw
[
thread_idx
].
push_back
(
target
)
;
}
}
else
{
this
->
target_lists_fw
[
thread_idx
].
push_back
(
target
)
;
}
}
// downstream -> stop
else
if
(
target
>
region
)
{
// std::cerr << std::endl ;
done
=
true
;
}
}
// std::cerr << "to_downstream_fragment END" << std::endl ;
}
void
CorrelationMatrixCreator
::
clear_target_lists
(
size_t
thread_idx
)
{
this
->
target_lists_fw
[
thread_idx
].
clear
()
;
this
->
target_lists_rv
[
thread_idx
].
clear
()
;
}
/*
void CorrelationMatrixCreator::remove_upstream_targets(const GenomeRegion& region)
{ // forward targets
auto iter_fw = this->target_list_fw.cbegin() ;
while(iter_fw != this->target_list_fw.end())
{ // remove upstream reads
if(*iter_fw < region)
{ iter_fw = this->target_list_fw.erase(iter_fw) ; }
// keep overlapping reads, don't stop here
else if(*iter_fw | region)
{ iter_fw++ ; }
// stop at first read downstream
else
{ break ; }
}
// reverse targets
auto iter_rv = this->target_list_rv.cbegin() ;
while(iter_rv != this->target_list_rv.end())
{ // remove upstream reads
if(*iter_rv < region)
{ iter_rv = this->target_list_rv.erase(iter_rv) ; }
// keep overlapping reads
else if(*iter_rv | region)
{ iter_rv++ ; }
// stop at first read downstream
else
{ break ; }
}
}
*/
void
CorrelationMatrixCreator
::
update_count_matrix
(
size_t
row_index
,
size_t
thread_idx
)
{
// forward targets
for
(
const
auto
&
iter
:
this
->
target_lists_fw
[
thread_idx
])
{
auto
bin_start_end
=
CorrelationMatrixCreator
::
get_bin_indices
(
iter
,
this
->
matrix_bins
[
row_index
])
;
for
(
int
j
=
bin_start_end
.
first
;
j
<
bin_start_end
.
second
;
j
++
)
{
this
->
matrix_counts
(
row_index
,
j
)
+=
iter
.
overlap_len
(
this
->
matrix_bins
[
row_index
][
j
])
;
}
}
// reverse targets
for
(
const
auto
&
iter
:
this
->
target_lists_rv
[
thread_idx
])
{
auto
bin_start_end
=
CorrelationMatrixCreator
::
get_bin_indices
(
iter
,
this
->
matrix_bins
[
row_index
])
;
for
(
int
j
=
bin_start_end
.
first
;
j
<
bin_start_end
.
second
;
j
++
)
{
this
->
matrix_counts
(
row_index
,
j
)
+=
iter
.
overlap_len
(
this
->
matrix_bins
[
row_index
][
j
])
;
}
}
}
/*
void CorrelationMatrixCreator::update_count_matrix_naive(size_t row_index)
{ // forward targets
for(const auto& iter : target_list_fw)
{ for(size_t j=0; j<this->matrix_counts.get_ncol(); j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
// reverse targets
for(const auto& iter : target_list_rv)
{ for(size_t j=0; j<this->matrix_counts.get_ncol(); j++)
{ this->matrix_counts(row_index, j) +=
iter.overlap_len(this->matrix_bins[row_index][j]) ;
}
}
}
*/
Event Timeline
Log In to Comment