Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60533886
EMEngine.cpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Apr 30, 22:17
Size
17 KB
Mime Type
text/x-c
Expires
Thu, May 2, 22:17 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17369103
Attached To
R8820 scATAC-seq
EMEngine.cpp
View Options
#include <EMEngine.hpp>
#include <cmath> // log(), exp(), pow()
#include <future> // std::promise, std::future
#include <utility> // std::pair, std::move()
#include <functional> // std::bind(), std::ref()
#include <Random.hpp> // rand_int_uniform()
#include <RandomNumberGenerator.hpp> // getRandomNumberGenerator()
#include <Statistics.hpp> // poisson_pmf(), normal_pmf(), sd()
#include <ConsoleProgressBar.hpp> // ConsoleProgressBar
#include <matrices.hpp>
EMEngine::EMEngine(const std::vector<matrix2d_i>& read_matrices,
const std::vector<matrix2d_i>& seq_matrices,
size_t n_class,
size_t n_iter,
size_t n_shift,
bool flip,
EMEngine::seeding_codes seeding,
const std::string& seed,
size_t n_threads)
: read_layer_list(),
sequence_layer_list(),
threads(nullptr)
{ // nb of layers
size_t n_layer_read = read_matrices.size() ;
size_t n_layer_seq = seq_matrices.size() ;
this->n_layer = n_layer_read + n_layer_seq ;
if(this->n_layer == 0)
{ throw std::invalid_argument("Error! No data layer given!") ; }
// matrices dimensions
size_t n_row = 0 ;
size_t n_col = 0 ;
if(n_layer_read)
{ n_row = read_matrices[0].size() ;
n_col = read_matrices[0][0].size() ;
}
else
{ n_row = seq_matrices[0].size() ;
n_col = seq_matrices[0][0].size() ;
}
for(const auto& matrix : read_matrices)
{ if(matrix.size() != n_row)
{ char msg[4096] ;
sprintf(msg, "Error! A read layer row number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_row) ;
throw std::invalid_argument(msg) ;
}
else if(matrix[0].size() != n_col)
{ char msg[4096] ;
sprintf(msg, "Error! A read layer column number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_col) ;
throw std::invalid_argument(msg) ;
}
}
for(const auto& matrix : seq_matrices)
{ if(matrix.size() != n_row)
{ char msg[4096] ;
sprintf(msg, "Error! A sequence layer row number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_row) ;
throw std::invalid_argument(msg) ;
}
else if(matrix[0].size() != n_col)
{ char msg[4096] ;
sprintf(msg, "Error! A sequence layes column number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_col) ;
throw std::invalid_argument(msg) ;
}
}
this->n_row = n_row ;
this->n_col = n_col ;
// class, shift, flip, iter
this->n_class = n_class ;
this->n_shift = n_shift ;
this->n_flip = flip+1 ;
this->flip = flip ;
this->n_iter = n_iter ;
// model lenth
if(this->n_col < this->n_shift)
{ char msg[4096] ;
sprintf(msg, "Error! Shift is bigger than data column number "
"(%zu / %zu)!",
this->n_shift, this->n_col) ;
throw std::invalid_argument(msg) ;
}
this->l_model = n_col - n_shift + 1 ;
// data structures
this->loglikelihood =
std::vector<matrix4d_d>(this->n_layer,
matrix4d_d(n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0))))) ;
this->loglikelihood_max = matrix2d_d(this->n_layer,
vector_d(this->n_row, 0)) ;
this->loglikelihood_joint =
matrix4d_d(this->n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0)))) ;
this->post_prob =
matrix4d_d(this->n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0)))) ;
this->post_state_prob =
matrix3d_d(n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0))) ;
this->post_class_prob = vector_d(n_class, 0) ;
this->post_prob_rowsum = vector_d(n_row, 0) ;
this->post_prob_colsum = vector_d(n_class, 0) ;
this->post_prob_tot = 0 ;
// set random number generator seed
getRandomGenerator(seed) ;
// threads
if(n_threads)
{ this->threads = new ThreadPool(n_threads) ; }
// create read layers with initialised models
for(const auto& matrix : read_matrices)
{ // create the layer
this->read_layer_list.push_back(new ReadLayer(matrix,
this->n_class,
this->n_shift,
flip,
this->threads)) ;
// seed the models
if(seeding == EMEngine::RANDOM)
{ this->read_layer_list.back()->seed_model_randomly() ; }
else if(seeding == EMEngine::SAMPLING)
{ this->read_layer_list.back()->seed_model_sampling() ; }
else if(seeding == EMEngine::TOY)
{ this->read_layer_list.back()->seed_model_toy() ; }
}
// create read layers with initialised models
for(const auto& matrix : seq_matrices)
{ // create the layer
this->sequence_layer_list.push_back(new SequenceLayer(matrix,
this->n_class,
this->n_shift,
flip)) ;
// seed the models
if(seeding == EMEngine::RANDOM)
{ this->sequence_layer_list.back()->seed_model_randomly() ; }
else if(seeding == EMEngine::SAMPLING)
{ this->sequence_layer_list.back()->seed_model_sampling() ; }
else if(seeding == EMEngine::TOY)
{ this->sequence_layer_list.back()->seed_model_toy() ; }
}
// set the class probabilities to a uniform distribution
this->set_state_prob_uniform() ;
}
EMEngine::~EMEngine()
{ // threads
if(this->threads != nullptr)
{ this->threads->join() ;
delete this->threads ;
this->threads = nullptr ;
}
// read data and models
for(auto& ptr : this->read_layer_list)
{ if(ptr != nullptr)
{ delete ptr ;
ptr = nullptr ;
}
}
// sequence data and models
for(auto& ptr : this->sequence_layer_list)
{ if(ptr != nullptr)
{ delete ptr ;
ptr = nullptr ;
}
}
}
std::vector<matrix3d_d> EMEngine::get_read_models() const
{ std::vector<matrix3d_d> models ;
for(const auto& ptr : this->read_layer_list)
{ models.push_back(ptr->get_model()) ; }
return models ;
}
std::vector<matrix3d_d> EMEngine::get_sequence_models() const
{ std::vector<matrix3d_d> models ;
for(const auto& ptr : this->sequence_layer_list)
{ models.push_back(ptr->get_model()) ; }
return models ;
}
matrix4d_d EMEngine::get_post_prob() const
{ return this->post_prob ; }
vector_d EMEngine::get_post_class_prob() const
{ return this->post_class_prob ; }
EMEngine::exit_codes EMEngine::classify()
{
size_t bar_update_n = this->n_iter ;
ConsoleProgressBar bar(std::cerr, bar_update_n, 60, "classifying") ;
// optimize the partition
for(size_t n_iter=0; n_iter<this->n_iter; n_iter++)
{
// E-step
this->compute_loglikelihood() ;
this->compute_post_prob() ;
// M-step
this->compute_class_prob() ;
this->update_models() ;
this->center_post_state_prob() ;
bar.update() ;
}
bar.update() ; std::cerr << std::endl ;
return EMEngine::exit_codes::SUCCESS ;
}
void EMEngine::set_state_prob_uniform()
{ double sum = this->n_class * this->n_shift * this->n_flip ;
for(size_t i=0; i<this->n_class; i++)
{ for(size_t j=0; j<this->n_shift; j++)
{ for(size_t k=0; k<this->n_flip; k++)
{ this->post_state_prob[i][j][k] = 1./sum ; }
}
}
}
void EMEngine::compute_loglikelihood()
{ // compute the loglikelihood for each layer
size_t i = 0 ;
for(auto& ptr : this->read_layer_list)
{ ptr->compute_loglikelihoods(this->loglikelihood[i],
this->loglikelihood_max[i],
this->threads) ;
i++ ;
}
for(auto& ptr : this->sequence_layer_list)
{ ptr->compute_loglikelihoods(this->loglikelihood[i],
this->loglikelihood_max[i],
this->threads) ;
i++ ;
}
// sum the likelihood for each state, over all layers
// this is the "joint likelihood"
for(size_t i=0; i<this->n_row; i++)
{ for(size_t j=0; j<this->n_class; j++)
{ for(size_t k=0; k<this->n_shift; k++)
{ for(size_t l=0; l<this->n_flip; l++)
{ // reset
this->loglikelihood_joint[i][j][k][l] = 0. ;
// sum
for(size_t m=0; m<this->n_layer; m++)
{ this->loglikelihood_joint[i][j][k][l] +=
(this->loglikelihood[m][i][j][k][l] -
this->loglikelihood_max[m][i]) ;
}
}
}
}
}
}
void EMEngine::compute_post_prob()
{ // don't parallelize
if(this->threads == nullptr)
{ std::promise<vector_d> promise ;
std::future<vector_d> future = promise.get_future() ;
this->compute_post_prob_routine(0, this->n_row, promise) ;
// compute the sum of post prob and the per class sum of post prob
// from the partial results computed on each slice
this->post_prob_tot = 0. ;
this->post_prob_colsum = future.get() ;
for(const auto& prob : this->post_prob_colsum)
{ this->post_prob_tot += prob ; }
}
// parallelize
else
{ size_t n_threads = this->threads->getNThread() ;
// compute the slices on which each thread will work
std::vector<std::pair<size_t,size_t>> slices =
ThreadPool::split_range(0, this->n_row,n_threads) ;
// get promises and futures
// the function run by the threads will compute
// the partial sum per class of post_prob for the given slice
// this should be used to compute the complete sum of post_prob
// and the complete sum per class of post_prob
std::vector<std::promise<vector_d>> promises(n_threads) ;
std::vector<std::future<vector_d>> futures(n_threads) ;
for(size_t i=0; i<n_threads; i++)
{ futures[i] = promises[i].get_future() ; }
// distribute work to threads
// -------------------------- threads start --------------------------
for(size_t i=0; i<n_threads; i++)
{ auto slice = slices[i] ;
this->threads->addJob(std::move(
std::bind(&EMEngine::compute_post_prob_routine,
this,
slice.first,
slice.second,
std::ref(promises[i])))) ;
}
// wait until all threads are done working
// compute the sum of post prob and the per class sum of post prob
// from the partial results computed on each slice
this->post_prob_tot = 0. ;
this->post_prob_colsum = vector_d(this->n_class, 0.) ;
for(auto& future : futures)
{ auto probs = future.get() ;
for(size_t i=0; i<this->n_class; i++)
{ double prob = probs[i] ;
this->post_prob_colsum[i] += prob ;
this->post_prob_tot += prob ;
}
}
// -------------------------- threads stop ---------------------------
}
}
void EMEngine::compute_post_prob_routine(size_t from,
size_t to,
std::promise<vector_d>& post_prob_colsum)
{ vector_d colsums(this->n_class, 0.) ;
// reset grand total
// this->post_prob_tot = 0 ;
// this->post_prob_colsum = vector_d(n_class, 0) ;
// post prob
for(size_t i=from; i<to; i++)
{ // reset row sum to 0
this->post_prob_rowsum[i] = 0. ;
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t n_flip=0; n_flip<this->n_flip; n_flip++)
{
double p = std::max(exp(this->loglikelihood_joint[i][n_class][n_shift][n_flip]) *
this->post_state_prob[n_class][n_shift][n_flip],
DataLayer::p_min) ;
this->post_prob[i][n_class][n_shift][n_flip] = p ;
this->post_prob_rowsum[i] += p ;
}
}
}
// normalize
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t n_flip=0; n_flip<this->n_flip; n_flip++)
{ this->post_prob[i][n_class][n_shift][n_flip] /=
this->post_prob_rowsum[i] ;
double p = this->post_prob[i][n_class][n_shift][n_flip] ;
colsums[n_class] += p ;
// this->post_prob_colsum[n_class] += p ;
// this->post_prob_tot += p ;
}
}
}
}
post_prob_colsum.set_value(colsums) ;
}
void EMEngine::compute_class_prob()
{
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ // reset total
this->post_class_prob[n_class] = 0. ;
for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t flip=0; flip<this->n_flip; flip++)
{ // sum
this->post_state_prob[n_class][n_shift][flip] = 0. ;
for(size_t i=0; i<this->n_row; i++)
{ this->post_state_prob[n_class][n_shift][flip] +=
this->post_prob[i][n_class][n_shift][flip] ;
}
// normalize
this->post_state_prob[n_class][n_shift][flip] /= this->post_prob_tot ;
this->post_class_prob[n_class] += this->post_state_prob[n_class][n_shift][flip] ;
}
}
}
}
void EMEngine::update_models()
{ // read data and models
for(auto& ptr : this->read_layer_list)
{ ptr->update_model(this->post_prob,
this->post_prob_colsum,
this->threads) ;
}
// sequence data and models
for(auto& ptr : this->sequence_layer_list)
{ ptr->update_model(this->post_prob,
this->threads) ;
}
}
void EMEngine::center_post_state_prob()
{
if(this->n_shift == 1)
{ return ; }
// the possible shift states
vector_d shifts(this->n_shift) ;
std::iota(shifts.begin(), shifts.end(), 1.) ;
// the shift probabilities and the class probabilies
// (no need to norm., class_prob sums to 1)
double shifts_prob_measured_tot = 0. ;
std::vector<double> shifts_prob_measured(this->n_shift) ;
for(size_t s=0; s<this->n_shift; s++)
{ for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ shifts_prob_measured[s] += this->post_state_prob[k][s][f] ;
shifts_prob_measured_tot += this->post_state_prob[k][s][f] ;
}
}
}
// the shift mean and (biased) standard deviation
double shifts_sd = sd(shifts, shifts_prob_measured, false) ;
// the shift probabilities under the assumption that is
// distributed as a gaussian centered on
// the central shift state with sd and mean as in the data
// sd as the data
vector_d shifts_prob_centered(shifts.size(), 0.) ;
double shifts_prob_centered_tot = 0. ;
for(size_t i=0; i<shifts.size(); i++)
{ shifts_prob_centered[i] = normal_pmf(shifts[i],
(this->n_shift/2)+1, shifts_sd) ;
shifts_prob_centered_tot += shifts_prob_centered[i] ;
}
for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ for(size_t s=0; s<this->n_shift; s++)
{ this->post_state_prob[k][s][f] = this->post_class_prob[k] *
shifts_prob_centered[s] /
(this->n_flip * shifts_prob_centered_tot) ;
}
}
}
// shifts_prob_measured_tot = 0. ;
shifts_prob_measured.clear() ;
shifts_prob_measured.resize(this->n_shift) ;
for(size_t s=0; s<this->n_shift; s++)
{ for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ shifts_prob_measured[s] +=
this->post_state_prob[k][s][f] ;
}
}
}
}
Event Timeline
Log In to Comment