EMEngine.cpp

#include <EMEngine.hpp>
#include <cmath> // log(), exp(), pow()
#include <cstdio> // sprintf()
#include <algorithm> // std::max()
#include <numeric> // std::iota()
#include <stdexcept> // std::invalid_argument
#include <iostream> // std::cerr
#include <future> // std::promise, std::future
#include <utility> // std::pair, std::move()
#include <functional> // std::bind(), std::ref()
#include <Random.hpp> // rand_int_uniform()
#include <RandomNumberGenerator.hpp> // getRandomNumberGenerator()
#include <Statistics.hpp> // poisson_pmf(), normal_pmf(), sd()
#include <ConsoleProgressBar.hpp> // ConsoleProgressBar
#include <matrices.hpp>
EMEngine::EMEngine(const std::vector<matrix2d_i>& read_matrices,
const std::vector<matrix2d_i>& seq_matrices,
size_t n_class,
size_t n_iter,
size_t n_shift,
bool flip,
EMEngine::seeding_codes seeding,
const std::string& seed,
size_t n_threads)
: read_layer_list(),
sequence_layer_list(),
threads(nullptr)
{ // nb of layers
size_t n_layer_read = read_matrices.size() ;
size_t n_layer_seq = seq_matrices.size() ;
this->n_layer = n_layer_read + n_layer_seq ;
if(this->n_layer == 0)
{ throw std::invalid_argument("Error! No data layer given!") ; }
// matrices dimensions
size_t n_row = 0 ;
size_t n_col = 0 ;
if(n_layer_read)
{ n_row = read_matrices[0].size() ;
n_col = read_matrices[0][0].size() ;
}
else
{ n_row = seq_matrices[0].size() ;
n_col = seq_matrices[0][0].size() ;
}
for(const auto& matrix : read_matrices)
{ if(matrix.size() != n_row)
{ char msg[4096] ;
sprintf(msg, "Error! A read layer row number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_row) ;
throw std::invalid_argument(msg) ;
}
else if(matrix[0].size() != n_col)
{ char msg[4096] ;
sprintf(msg, "Error! A read layer column number is invalid "
"(found %zu, expected %zu)!",
matrix[0].size(), n_col) ;
throw std::invalid_argument(msg) ;
}
}
for(const auto& matrix : seq_matrices)
{ if(matrix.size() != n_row)
{ char msg[4096] ;
sprintf(msg, "Error! A sequence layer row number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_row) ;
throw std::invalid_argument(msg) ;
}
else if(matrix[0].size() != n_col)
{ char msg[4096] ;
sprintf(msg, "Error! A sequence layes column number is invalid "
"(found %zu, expected %zu)!",
matrix.size(), n_col) ;
throw std::invalid_argument(msg) ;
}
}
this->n_row = n_row ;
this->n_col = n_col ;
// class, shift, flip, iter
this->n_class = n_class ;
this->n_shift = n_shift ;
this->n_flip = flip+1 ;
this->flip = flip ;
this->n_iter = n_iter ;
// model length
if(this->n_col < this->n_shift)
{ char msg[4096] ;
sprintf(msg, "Error! Shift is bigger than data column number "
"(%zu / %zu)!",
this->n_shift, this->n_col) ;
throw std::invalid_argument(msg) ;
}
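// with n_shift possible shift states over n_col data columns, the
// model can span n_col - n_shift + 1 columns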
this->l_model = n_col - n_shift + 1 ;
// data structures
this->loglikelihood =
std::vector<matrix4d_d>(this->n_layer,
matrix4d_d(n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0))))) ;
this->loglikelihood_max = matrix2d_d(this->n_layer,
vector_d(this->n_row, 0)) ;
this->loglikelihood_joint =
matrix4d_d(this->n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0)))) ;
this->post_prob =
matrix4d_d(this->n_row,
matrix3d_d(this->n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0)))) ;
this->post_state_prob =
matrix3d_d(n_class,
matrix2d_d(this->n_shift,
vector_d(this->n_flip, 0))) ;
this->post_class_prob = vector_d(n_class, 0) ;
this->post_prob_rowsum = vector_d(n_row, 0) ;
this->post_prob_colsum = vector_d(n_class, 0) ;
this->post_prob_tot = 0 ;
// set random number generator seed
getRandomGenerator(seed) ;
// threads
if(n_threads)
{ this->threads = new ThreadPool(n_threads) ; }
// create read layers with initialised models
for(const auto& matrix : read_matrices)
{ // create the layer
this->read_layer_list.push_back(new ReadLayer(matrix,
this->n_class,
this->n_shift,
flip,
this->threads)) ;
// seed the models
if(seeding == EMEngine::RANDOM)
{ this->read_layer_list.back()->seed_model_randomly() ; }
else if(seeding == EMEngine::SAMPLING)
{ this->read_layer_list.back()->seed_model_sampling() ; }
else if(seeding == EMEngine::TOY)
{ this->read_layer_list.back()->seed_model_toy() ; }
}
// create sequence layers with initialised models
for(const auto& matrix : seq_matrices)
{ // create the layer
this->sequence_layer_list.push_back(new SequenceLayer(matrix,
this->n_class,
this->n_shift,
flip)) ;
// seed the models
if(seeding == EMEngine::RANDOM)
{ this->sequence_layer_list.back()->seed_model_randomly() ; }
else if(seeding == EMEngine::SAMPLING)
{ this->sequence_layer_list.back()->seed_model_sampling() ; }
else if(seeding == EMEngine::TOY)
{ this->sequence_layer_list.back()->seed_model_toy() ; }
}
// set the class probabilities to a uniform distribution
this->set_state_prob_uniform() ;
}
EMEngine::~EMEngine()
{ // threads
if(this->threads != nullptr)
{ this->threads->join() ;
delete this->threads ;
this->threads = nullptr ;
}
// read data and models
for(auto& ptr : this->read_layer_list)
{ if(ptr != nullptr)
{ delete ptr ;
ptr = nullptr ;
}
}
// sequence data and models
for(auto& ptr : this->sequence_layer_list)
{ if(ptr != nullptr)
{ delete ptr ;
ptr = nullptr ;
}
}
}
std::vector<matrix3d_d> EMEngine::get_read_models() const
{ std::vector<matrix3d_d> models ;
for(const auto& ptr : this->read_layer_list)
{ models.push_back(ptr->get_model()) ; }
return models ;
}
std::vector<matrix3d_d> EMEngine::get_sequence_models() const
{ std::vector<matrix3d_d> models ;
for(const auto& ptr : this->sequence_layer_list)
{ models.push_back(ptr->get_model()) ; }
return models ;
}
matrix4d_d EMEngine::get_post_prob() const
{ return this->post_prob ; }
vector_d EMEngine::get_post_class_prob() const
{ return this->post_class_prob ; }
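// Runs the EM optimisation : each iteration performs an E-step (per-layer
// log-likelihoods, posterior probabilities) followed by an M-step (state
// and class probabilities, model updates) and finally re-centers the
// shift probability distribution. Progress is reported on std::cerr.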
EMEngine::exit_codes EMEngine::classify()
{
size_t bar_update_n = this->n_iter ;
ConsoleProgressBar bar(std::cerr, bar_update_n, 60, "classifying") ;
// optimize the partition
for(size_t n_iter=0; n_iter<this->n_iter; n_iter++)
{
// E-step
this->compute_loglikelihood() ;
this->compute_post_prob() ;
// M-step
this->compute_class_prob() ;
this->update_models() ;
this->center_post_state_prob() ;
bar.update() ;
}
bar.update() ; std::cerr << std::endl ;
return EMEngine::exit_codes::SUCCESS ;
}
void EMEngine::set_state_prob_uniform()
{ double sum = this->n_class * this->n_shift * this->n_flip ;
for(size_t i=0; i<this->n_class; i++)
{ for(size_t j=0; j<this->n_shift; j++)
{ for(size_t k=0; k<this->n_flip; k++)
{ this->post_state_prob[i][j][k] = 1./sum ; }
}
}
}
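// Computes, for each layer, the log-likelihood of every (row, class,
// shift, flip) state and sums them (each rescaled by the per-row maximum
// of its layer, for numerical stability) into the joint log-likelihood.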
void EMEngine::compute_loglikelihood()
{ // compute the loglikelihood for each layer
size_t i = 0 ;
for(auto& ptr : this->read_layer_list)
{ ptr->compute_loglikelihoods(this->loglikelihood[i],
this->loglikelihood_max[i],
this->threads) ;
i++ ;
}
for(auto& ptr : this->sequence_layer_list)
{ ptr->compute_loglikelihoods(this->loglikelihood[i],
this->loglikelihood_max[i],
this->threads) ;
i++ ;
}
// sum the log-likelihoods for each state, over all layers, after
// rescaling by the per-layer maxima ; this gives the joint log-likelihood
for(size_t i=0; i<this->n_row; i++)
{ for(size_t j=0; j<this->n_class; j++)
{ for(size_t k=0; k<this->n_shift; k++)
{ for(size_t l=0; l<this->n_flip; l++)
{ // reset
this->loglikelihood_joint[i][j][k][l] = 0. ;
// sum
for(size_t m=0; m<this->n_layer; m++)
{ this->loglikelihood_joint[i][j][k][l] +=
(this->loglikelihood[m][i][j][k][l] -
this->loglikelihood_max[m][i]) ;
}
}
}
}
}
}
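// Computes the posterior probabilities, either sequentially or by
// splitting the rows into slices processed by the thread pool ; each
// slice returns its per-class sums through a promise and the partial
// results are merged into post_prob_colsum and post_prob_tot.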
void EMEngine::compute_post_prob()
{ // don't parallelize
if(this->threads == nullptr)
{ std::promise<vector_d> promise ;
std::future<vector_d> future = promise.get_future() ;
this->compute_post_prob_routine(0, this->n_row, promise) ;
// compute the sum of post prob and the per class sum of post prob
// from the partial results computed on each slice
this->post_prob_tot = 0. ;
this->post_prob_colsum = future.get() ;
for(const auto& prob : this->post_prob_colsum)
{ this->post_prob_tot += prob ; }
}
// parallelize
else
{ size_t n_threads = this->threads->getNThread() ;
// compute the slices on which each thread will work
std::vector<std::pair<size_t,size_t>> slices =
ThreadPool::split_range(0, this->n_row,n_threads) ;
// get promises and futures
// the function run by the threads will compute
// the partial sum per class of post_prob for the given slice
// this should be used to compute the complete sum of post_prob
// and the complete sum per class of post_prob
std::vector<std::promise<vector_d>> promises(n_threads) ;
std::vector<std::future<vector_d>> futures(n_threads) ;
for(size_t i=0; i<n_threads; i++)
{ futures[i] = promises[i].get_future() ; }
// distribute work to threads
// -------------------------- threads start --------------------------
for(size_t i=0; i<n_threads; i++)
{ auto slice = slices[i] ;
this->threads->addJob(std::move(
std::bind(&EMEngine::compute_post_prob_routine,
this,
slice.first,
slice.second,
std::ref(promises[i])))) ;
}
// wait until all threads are done working
// compute the sum of post prob and the per class sum of post prob
// from the partial results computed on each slice
this->post_prob_tot = 0. ;
this->post_prob_colsum = vector_d(this->n_class, 0.) ;
for(auto& future : futures)
{ auto probs = future.get() ;
for(size_t i=0; i<this->n_class; i++)
{ double prob = probs[i] ;
this->post_prob_colsum[i] += prob ;
this->post_prob_tot += prob ;
}
}
// -------------------------- threads stop ---------------------------
}
}
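// Posterior probability routine for rows [from, to) : for each state
// p(i,c,s,f) = max(exp(loglikelihood_joint[i][c][s][f]) *
//                  post_state_prob[c][s][f], p_min)
// which is then normalised by the row sum so that each row sums to 1.
// The per-class column sums of the slice are returned through the
// promise.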
void EMEngine::compute_post_prob_routine(size_t from,
size_t to,
std::promise<vector_d>& post_prob_colsum)
{ vector_d colsums(this->n_class, 0.) ;
// the grand totals (post_prob_tot, post_prob_colsum) are reset and
// accumulated by the caller from the per-slice sums returned below
// post prob
for(size_t i=from; i<to; i++)
{ // reset row sum to 0
this->post_prob_rowsum[i] = 0. ;
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t n_flip=0; n_flip<this->n_flip; n_flip++)
{
double p = std::max(exp(this->loglikelihood_joint[i][n_class][n_shift][n_flip]) *
this->post_state_prob[n_class][n_shift][n_flip],
DataLayer::p_min) ;
this->post_prob[i][n_class][n_shift][n_flip] = p ;
this->post_prob_rowsum[i] += p ;
}
}
}
// normalize
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t n_flip=0; n_flip<this->n_flip; n_flip++)
{ this->post_prob[i][n_class][n_shift][n_flip] /=
this->post_prob_rowsum[i] ;
double p = this->post_prob[i][n_class][n_shift][n_flip] ;
colsums[n_class] += p ;
// this->post_prob_colsum[n_class] += p ;
// this->post_prob_tot += p ;
}
}
}
}
post_prob_colsum.set_value(colsums) ;
}
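// M-step update of the state probabilities
// post_state_prob[c][s][f] = sum_i post_prob[i][c][s][f] / post_prob_tot
// and of the class probabilities as the sum of the state probabilities
// over all shift and flip states of each class.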
void EMEngine::compute_class_prob()
{
for(size_t n_class=0; n_class<this->n_class; n_class++)
{ // reset total
this->post_class_prob[n_class] = 0. ;
for(size_t n_shift=0; n_shift<this->n_shift; n_shift++)
{ for(size_t flip=0; flip<this->n_flip; flip++)
{ // sum
this->post_state_prob[n_class][n_shift][flip] = 0. ;
for(size_t i=0; i<this->n_row; i++)
{ this->post_state_prob[n_class][n_shift][flip] +=
this->post_prob[i][n_class][n_shift][flip] ;
}
// normalize
this->post_state_prob[n_class][n_shift][flip] /= this->post_prob_tot ;
this->post_class_prob[n_class] += this->post_state_prob[n_class][n_shift][flip] ;
}
}
}
}
void EMEngine::update_models()
{ // read data and models
for(auto& ptr : this->read_layer_list)
{ ptr->update_model(this->post_prob,
this->post_prob_colsum,
this->threads) ;
}
// sequence data and models
for(auto& ptr : this->sequence_layer_list)
{ ptr->update_model(this->post_prob,
this->threads) ;
}
}
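// Replaces the measured shift probability distribution with a gaussian
// centered on the central shift state, using the standard deviation
// measured in the data ; the new state probabilities are obtained by
// scaling this centered distribution by each class probability and
// splitting it uniformly over the flip states.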
void EMEngine::center_post_state_prob()
{
if(this->n_shift == 1)
{ return ; }
// the possible shift states
vector_d shifts(this->n_shift) ;
std::iota(shifts.begin(), shifts.end(), 1.) ;
// the measured shift probabilities
// (no need to normalise, the class probabilities sum to 1)
double shifts_prob_measured_tot = 0. ;
std::vector<double> shifts_prob_measured(this->n_shift) ;
for(size_t s=0; s<this->n_shift; s++)
{ for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ shifts_prob_measured[s] += this->post_state_prob[k][s][f] ;
shifts_prob_measured_tot += this->post_state_prob[k][s][f] ;
}
}
}
// the (biased) standard deviation of the measured shift distribution
double shifts_sd = sd(shifts, shifts_prob_measured, false) ;
// the shift probabilities under the assumption that the shift is
// distributed as a gaussian centered on the central shift state, with
// the same standard deviation as measured in the data
vector_d shifts_prob_centered(shifts.size(), 0.) ;
double shifts_prob_centered_tot = 0. ;
for(size_t i=0; i<shifts.size(); i++)
{ shifts_prob_centered[i] = normal_pmf(shifts[i],
(this->n_shift/2)+1, shifts_sd) ;
shifts_prob_centered_tot += shifts_prob_centered[i] ;
}
for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ for(size_t s=0; s<this->n_shift; s++)
{ this->post_state_prob[k][s][f] = this->post_class_prob[k] *
shifts_prob_centered[s] /
(this->n_flip * shifts_prob_centered_tot) ;
}
}
}
// shifts_prob_measured_tot = 0. ;
shifts_prob_measured.clear() ;
shifts_prob_measured.resize(this->n_shift) ;
for(size_t s=0; s<this->n_shift; s++)
{ for(size_t k=0; k<this->n_class; k++)
{ for(size_t f=0; f<this->n_flip; f++)
{ shifts_prob_measured[s] +=
this->post_state_prob[k][s][f] ;
}
}
}
}
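// A minimal usage sketch (the read_counts data below is hypothetical ;
// the types and seeding/exit codes come from EMEngine.hpp) :
//
// std::vector<matrix2d_i> reads = {read_counts} ; // one read layer
// std::vector<matrix2d_i> seqs ;                  // no sequence layer
// EMEngine em(reads, seqs,
//             2,                // n_class
//             20,               // n_iter
//             5,                // n_shift
//             true,             // flip
//             EMEngine::RANDOM, // model seeding
//             "seed",           // RNG seed
//             4) ;              // n_threads
// em.classify() ;
// matrix4d_d post_prob  = em.get_post_prob() ;
// vector_d   class_prob = em.get_post_class_prob() ;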
