diff --git a/src/Applications/ChIPPartitioningApplication.cpp b/src/Applications/ChIPPartitioningApplication.cpp index c49b26f..f54893e 100644 --- a/src/Applications/ChIPPartitioningApplication.cpp +++ b/src/Applications/ChIPPartitioningApplication.cpp @@ -1,172 +1,170 @@ #include #include #include #include #include // std::invalid_argument #include #include // namespace po = boost::program_options ; ChIPPartitioningApplication::ChIPPartitioningApplication(int argn, char** argv) : file_read(""), file_sequence(""), n_class(0), n_iter(0), n_shift(0), flip(false), n_threads(0), seeding(EMEngine::seeding_codes::RANDOM), seed(""), runnable(true) { // parse command line options and set the fields this->parseOptions(argn, argv) ; } int ChIPPartitioningApplication::run() { if(this->runnable) { // read data std::vector read_paths ; boost::split(read_paths, this->file_read, [](char c){return c == ',';}); std::vector data_read ; for(const auto& path : read_paths) { data_read.push_back(read_matrix2d_i(path)) ; } // sequence data std::vector data_seq ; if(this->file_sequence != "") { data_seq.push_back(read_matrix2d_i(this->file_sequence)) ; } EMEngine em(data_read, data_seq, this->n_class, this->n_iter, this->n_shift, this->flip, this->seeding, - this->seed) ; + this->seed, + this->n_threads) ; em.classify() ; std::cout << em.get_post_prob() << std::endl ; std::cerr << em.get_post_class_prob() << std::endl ; return EXIT_SUCCESS ; } else { return EXIT_FAILURE ; } } void ChIPPartitioningApplication::parseOptions(int argn, char** argv) { // no option to parse if(argv == nullptr) { std::string message = "no options to parse!" ; throw std::invalid_argument(message) ; } // help messages std::string desc_msg = "\n" "ChIPPartitioning is a probabilistic partitioning algorithm that \n" "sofetly assigns genomic regions to classes given their shape \n" "of the signal over the region. The assignment probabilities \n" "are returned through stdout.\n\n" ; std::string opt_help_msg = "Produces this help message." ; - std::string opt_parallel_msg = "The number of threads dedicated to the " - "computations, by default 1." ; + std::string opt_thread_msg = "The number of threads dedicated to parallelize the computations,\n " + "by default 0 (no parallelization)." ; std::string opt_read_msg = "A coma separated list of paths to the file containing the " "read density data" ; std::string opt_seq_msg = "The path to the file containing the sequence data" ; std::string opt_iter_msg = "The number of iterations." ; std::string opt_class_msg = "The number of classes to find." ; std::string opt_shift_msg = "Enables this number of column of shifting " "freedom. By default, shifting is " "disabled (equivalent to --shift 1)." ; std::string opt_flip_msg = "Enables flipping."; std::string opt_seeding_msg = "Specify which method should be used to initialise the " "cluster references." ; std::string opt_seed_msg = "A value to seed the random number generator."; // option parser boost::program_options::variables_map vm ; boost::program_options::options_description desc(desc_msg) ; std::string seeding_tmp ; desc.add_options() - ("help,h", opt_help_msg.c_str()) + ("help,h", opt_help_msg.c_str()) - ("read", po::value(&(this->file_read)), opt_read_msg.c_str()) - ("seq", po::value(&(this->file_sequence)), opt_read_msg.c_str()) + ("read", po::value(&(this->file_read)), opt_read_msg.c_str()) + ("seq", po::value(&(this->file_sequence)), opt_read_msg.c_str()) - ("iter,i", po::value(&(this->n_iter)), opt_iter_msg.c_str()) - ("class,c", po::value(&(this->n_class)), opt_class_msg.c_str()) - ("shift,s", po::value(&(this->n_shift)), opt_shift_msg.c_str()) - ("flip", opt_flip_msg.c_str()) + ("iter,i", po::value(&(this->n_iter)), opt_iter_msg.c_str()) + ("class,c", po::value(&(this->n_class)), opt_class_msg.c_str()) + ("shift,s", po::value(&(this->n_shift)), opt_shift_msg.c_str()) + ("flip", opt_flip_msg.c_str()) - ("seeding", po::value(&(seeding_tmp)), opt_seeding_msg.c_str()) - ("seed", po::value(&(this->seed)), opt_seed_msg.c_str()) - ("parallel,p", po::value(&(this->n_threads)), opt_parallel_msg.c_str()) ; + ("seeding", po::value(&(seeding_tmp)), opt_seeding_msg.c_str()) + ("seed", po::value(&(this->seed)), opt_seed_msg.c_str()) + ("thread", po::value(&(this->n_threads)), opt_thread_msg.c_str()) ; // parse try { po::store(po::parse_command_line(argn, argv, desc), vm) ; po::notify(vm) ; } catch(std::invalid_argument& e) { std::string msg = std::string("Error! Invalid option given!\n") + std::string(e.what()) ; throw std::invalid_argument(msg) ; } catch(...) { throw std::invalid_argument("An unknown error occured while parsing the options") ; } bool help = vm.count("help") ; // checks unproper option settings if(this->file_read == "" and (not help)) { std::string msg("Error! No read density data were given (--read)!") ; throw std::invalid_argument(msg) ; } else if((seeding_tmp != "random") and (seeding_tmp != "sampling") and (seeding_tmp != "toy") and (not help)) { std::string msg("Error! Unrecognized seeding method (--seeding)!") ; throw std::invalid_argument(msg) ; } // no class given -> 1 iter if(this->n_iter == 0) { this->n_iter = 1 ; } // no shift class given -> 1 class if(this->n_class == 0) { this->n_class = 1 ; } // no shift given, value of 1 -> no shift if(this->n_shift == 0) { this->n_shift = 1 ; } // set seeding if(seeding_tmp == "random") { this->seeding = EMEngine::seeding_codes::RANDOM ; } else if(seeding_tmp == "sampling") { this->seeding = EMEngine::seeding_codes::SAMPLING ; } else if(seeding_tmp == "toy") { this->seeding = EMEngine::seeding_codes::TOY ; } // set flip if(vm.count("flip")) { this->flip = true ; } - // threads - if(this->n_threads == 0) - { this->n_threads = 1 ; } // help invoked, run() cannot be invoked if(help) { std::cout << desc << std::endl ; this->runnable = false ; return ; } // everything fine, run() can be called else { this->runnable = true ; return ; } } int main(int argn, char** argv) { ChIPPartitioningApplication app(argn, argv) ; return app.run() ; } diff --git a/src/Applications/ProbToModelApplication.cpp b/src/Applications/ProbToModelApplication.cpp index a082c3c..2f6e7f7 100644 --- a/src/Applications/ProbToModelApplication.cpp +++ b/src/Applications/ProbToModelApplication.cpp @@ -1,207 +1,203 @@ #include #include #include #include #include // std::invalid_argument, std::runtime_error #include #include #include #include namespace po = boost::program_options ; ProbToModelApplication::ProbToModelApplication(int argn, char** argv) : file_read(""), file_seq(""), file_prob(""), n_threads(0), runnable(false) { this->parseOptions(argn, argv) ; } ProbToModelApplication::~ProbToModelApplication() {} int ProbToModelApplication::run() { if(this->runnable) { // load data std::string file_data ; bool read_data = false ; bool seq_data = false ; if(this->file_read != "") { file_data = this->file_read ; read_data = true ; seq_data = false ; } else if(this->file_seq != "") { file_data = this->file_seq ; read_data = false ; seq_data = true ; } else { std::string msg("Error! Could not determine the type of the data!") ; throw std::runtime_error(msg) ; } matrix2d_i data = read_matrix2d_i(file_data) ; matrix4d_d prob = read_matrix4d_d(this->file_prob) ; if(data.size() != prob.size()) { char msg[4096] ; sprintf(msg, "Error! data and prob matrices have unequal " "row numbers (%zu / %zu)!", data.size(), prob.size()) ; throw std::runtime_error(msg) ; } else if(data[0].size() < prob[0][0].size()) { char msg[4096] ; sprintf(msg, "Error! too many shift states for the data!" "%zu shift states and %zu columns in data)!", prob[0][0].size(), data[0].size()) ; throw std::runtime_error(msg) ; } // get the data model ModelComputer* ptr = nullptr ; if(read_data) - { ptr = new ReadModelComputer(data, prob) ; } + { ptr = new ReadModelComputer(data, prob, this->n_threads) ; } else if(seq_data) - { ptr = new SequenceModelComputer(data, prob) ; } + { ptr = new SequenceModelComputer(data, prob, this->n_threads) ; } matrix2d_d model = ptr->get_model() ; delete ptr ; ptr = nullptr ; // compute the class prob size_t n_row = prob.size() ; size_t n_class = prob[0].size() ; size_t n_shift = prob[0][0].size() ; size_t n_flip = prob[0][0][0].size() ; vector_d class_prob(n_class, 0.) ; double p_tot = 0. ; for(size_t i=0; i(&(this->file_read)), opt_read_msg.c_str()) ("seq,", po::value(&(this->file_seq)), opt_seq_msg.c_str()) ("prob,", po::value(&(this->file_prob)), opt_prob_msg.c_str()) - ("parallel", po::value(&(this->n_threads)), opt_parallel_msg.c_str()) ; + ("thread", po::value(&(this->n_threads)), opt_thread_msg.c_str()) ; // parse try { po::store(po::parse_command_line(argn, argv, desc), vm) ; po::notify(vm) ; } catch(std::invalid_argument& e) { std::string msg = std::string("Error! Invalid option given!\n") + std::string(e.what()) ; throw std::invalid_argument(msg) ; } catch(...) { throw std::invalid_argument("An unknown error occured while parsing the options") ; } bool help = vm.count("help") ; // checks unproper option settings if((this->file_read == "") and (this->file_seq == "") and (not help)) { std::string msg("Error! No data file was given (--read or --seq)!") ; throw std::invalid_argument(msg) ; } else if((this->file_read != "") and (this->file_seq != "") and (not help)) { std::string msg("Error! --read and --seq are mutually exclusive!") ; throw std::invalid_argument(msg) ; } else if(this->file_prob == "" and (not help)) { std::string msg("Error! No posterior probabily file was given (--prob)!") ; throw std::invalid_argument(msg) ; } - // threads - if(this->n_threads == 0) - { this->n_threads = 1 ; } - // help invoked, run() cannot be invoked if(help) { std::cout << desc << std::endl ; this->runnable = false ; return ; } // everything fine, run() can be called else { this->runnable = true ; return ; } } int main(int argn, char** argv) { ProbToModelApplication app(argn, argv) ; return app.run() ; } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f4c66a3..9c38267 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,93 +1,98 @@ # compiler options add_compile_options(-std=c++14) add_compile_options(-O3) add_compile_options(-Wall) add_compile_options(-Wextra) add_compile_options(-Werror) add_compile_options(-Wfatal-errors) add_compile_options(-pedantic) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SEQAN_CXX_FLAGS}") add_definitions (${SEQAN_DEFINITIONS}) # include file location include_directories (${SEQAN_INCLUDE_DIRS}) include_directories("${scATACseq_SOURCE_DIR}/src/Matrix") include_directories("${scATACseq_SOURCE_DIR}/src/Clustering") include_directories("${scATACseq_SOURCE_DIR}/src/Random") include_directories("${scATACseq_SOURCE_DIR}/src/Parallel") include_directories("${scATACseq_SOURCE_DIR}/src/Statistics") include_directories("${scATACseq_SOURCE_DIR}/src/GUI") include_directories("${scATACseq_SOURCE_DIR}/src/Applications") include_directories("${scATACseq_SOURCE_DIR}/src/Matrix") include_directories("${scATACseq_SOURCE_DIR}/src/GenomicTools") include_directories("${scATACseq_SOURCE_DIR}/src/Utility") # compile modules into static libraries ## set output directory set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/lib") ## build instructions add_library(Clustering "Clustering/DataLayer.cpp" "Clustering/ReadLayer.cpp" "Clustering/SequenceLayer.cpp" "Clustering/EMEngine.cpp" "Clustering/ModelComputer.cpp" "Clustering/ReadModelComputer.cpp" "Clustering/SequenceModelComputer.cpp") add_library(Random "Random/Random.cpp" "Random/RandomNumberGenerator.cpp") add_library(Parallel "Parallel/ThreadPool.cpp") add_library(Statistics "Statistics/Statistics.cpp") add_library(GUI "GUI/ConsoleProgressBar.cpp" "GUI/Diplayable.cpp" "GUI/Updatable.cpp") add_library(GenomicTools "GenomicTools/MatrixCreator.cpp" "GenomicTools/CorrelationMatrixCreator.cpp" "GenomicTools/GenomeRegion.cpp") add_library(Utility "Utility/matrices.cpp") ## resolve dependencies -target_link_libraries(Clustering Random Statistics GUI Parallel) +target_link_libraries(Clustering Random Statistics GUI Parallel ${SEQAN_LIBRARIES}) target_link_libraries(Parallel Threads::Threads) target_link_libraries(GenomicTools ${SEQAN_LIBRARIES}) # executables +## a toy for seqan +set(EXE_MAIN_SEQAN "main_seqan") +add_executable(${EXE_MAIN_SEQAN} "main_seqan.cpp") +target_link_libraries(${EXE_MAIN_SEQAN} ${SEQAN_LIBRARIES} GenomicTools Clustering) +set_target_properties(${EXE_MAIN_SEQAN} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## a toy for correlation matrix set(EXE_MAIN_CORMAT "main_cormat") add_executable(${EXE_MAIN_CORMAT} "main_cormat.cpp") -target_link_libraries(${EXE_MAIN_CORMAT} ${SEQAN_LIBRARIES} GenomicTools Clustering) +target_link_libraries(${EXE_MAIN_CORMAT} ${SEQAN_LIBRARIES} GenomicTools) set_target_properties(${EXE_MAIN_CORMAT} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## a toy for EM usage set(EXE_MAIN_EM "main_em") add_executable(${EXE_MAIN_EM} "main_em.cpp") target_link_libraries(${EXE_MAIN_EM} Clustering Utility) set_target_properties(${EXE_MAIN_EM} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## a 2nd toy for EM usage set(EXE_MAIN_EM2 "main_em2") add_executable(${EXE_MAIN_EM2} "main_em2.cpp") target_link_libraries(${EXE_MAIN_EM2} Clustering Utility) set_target_properties(${EXE_MAIN_EM2} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## an application to create a matrix from BED and a BAM file set(EXE_MAIN_BAMMATRIX "CorrelationMatrixCreator") add_executable(${EXE_MAIN_BAMMATRIX} "Applications/CorrelationMatrixCreatorApplication.cpp" "Applications/ApplicationInterface.cpp") target_link_libraries(${EXE_MAIN_BAMMATRIX} GenomicTools Boost::program_options) set_target_properties(${EXE_MAIN_BAMMATRIX} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## an ChIPPartitioning standalone set(EXE_CHIPPART "ChIPPartitioning") add_executable(${EXE_CHIPPART} "Applications/ChIPPartitioningApplication.cpp" "Applications/ApplicationInterface.cpp") target_link_libraries(${EXE_CHIPPART} Clustering Utility Boost::program_options) set_target_properties(${EXE_CHIPPART} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## an executable to compute classes references from the data and the post prob of ChIPPartitioning set(EXE_PROB2REF "probToModel") add_executable(${EXE_PROB2REF} "Applications/ProbToModelApplication.cpp" "Applications/ApplicationInterface.cpp") target_link_libraries(${EXE_PROB2REF} Clustering Utility Boost::program_options) set_target_properties(${EXE_PROB2REF} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") ## a test suite set(EXE_TESTS "unittests") add_executable(${EXE_TESTS} "unittests.cpp" "Unittests/unittests_matrix.cpp" "Unittests/unittests_genomictools.cpp") target_link_libraries(${EXE_TESTS} ${UNITTEST_LIB} ${SEQAN_LIBRARIES} GenomicTools) set_target_properties(${EXE_TESTS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${scATACseq_SOURCE_DIR}/bin") diff --git a/src/Clustering/DataLayer.cpp b/src/Clustering/DataLayer.cpp index 5a84ba7..21340b2 100644 --- a/src/Clustering/DataLayer.cpp +++ b/src/Clustering/DataLayer.cpp @@ -1,141 +1,142 @@ #include #include // std::invalid_argument #include // log() #include +#include DataLayer::DataLayer() {} DataLayer::DataLayer(const matrix2d_i& data, size_t n_class, size_t n_shift, bool flip) :data(data), flip(flip), n_row(data.size()), n_col(data[0].size()), n_class(n_class), l_model(n_col - n_shift + 1), n_shift(n_shift), n_flip(flip + 1) { // models cannot be initialise here // as the number of categories depend // on the exact class } DataLayer::DataLayer(const matrix2d_i& data, const matrix3d_d& model, bool flip) : data(data), model(model), flip(flip), n_row(data.size()), n_col(data[0].size()), n_class(model.size()), l_model(model[0].size()), n_category(model[0][0].size()), n_shift(n_col - l_model + 1), n_flip(flip + 1) { // check if model is not too long if(this->n_col < this->l_model) { char msg[4096] ; sprintf(msg, "Error! model is longer than data : %zu / %zu", this->l_model, this->n_col) ; throw std::invalid_argument(msg) ; } this->n_shift = this->n_col - this->l_model + 1 ; } DataLayer::~DataLayer() {} matrix3d_d DataLayer::get_model() const { return this->model ; } void DataLayer::check_loglikelihood_dim(const matrix4d_d& loglikelihood) const { if(loglikelihood.size() != this->n_row) { char msg[4096] ; sprintf(msg, "Error! loglikelihood matrix 1st dimension is not " "equal to data row number : %zu / %zu", loglikelihood.size(), this->n_row) ; throw std::invalid_argument(msg) ; } else if(loglikelihood[0].size() != this->n_class) { char msg[4096] ; sprintf(msg, "Error! loglikelihood matrix 2nd dimension is not " "equal to model class number : %zu / %zu", loglikelihood[0].size(), this->n_class) ; throw std::invalid_argument(msg) ; } else if(loglikelihood[0][0].size() != this->n_shift) { char msg[4096] ; sprintf(msg, "Error! loglikelihood matrix 3rd dimension is not " "equal to model shift state number : %zu / %zu", loglikelihood[0][0].size(), this->n_shift) ; throw std::invalid_argument(msg) ; } else if(loglikelihood[0][0][0].size() != this->n_flip) { char msg[4096] ; sprintf(msg, "Error! loglikelihood matrix 4th dimension is not " "equal to model flip state number : %zu / %zu", loglikelihood[0][0][0].size(), this->n_flip) ; throw std::invalid_argument(msg) ; } } void DataLayer::check_loglikelihood_max_dim(const vector_d& loglikelihood_max) const { if(loglikelihood_max.size() != this->n_row) { char msg[4096] ; sprintf(msg, "Error! loglikelihood_max length is not " "equal to data row number : %zu / %zu", loglikelihood_max.size(), this->n_flip) ; throw std::invalid_argument(msg) ; } } void DataLayer::check_posterior_prob_dim(const matrix4d_d& posterior_prob) const { if(posterior_prob.size() != this->n_row) { char msg[4096] ; sprintf(msg, "Error! posterior_prob matrix 1st dimension is not " "equal to data row number : %zu / %zu", posterior_prob.size(), this->n_row) ; throw std::invalid_argument(msg) ; } else if(posterior_prob[0].size() != this->n_class) { char msg[4096] ; sprintf(msg, "Error! posterior_prob matrix 2nd dimension is not " "equal to model class number : %zu / %zu", posterior_prob[0].size(), this->n_class) ; throw std::invalid_argument(msg) ; } else if(posterior_prob[0][0].size() != this->n_shift) { char msg[4096] ; sprintf(msg, "Error! posterior_prob matrix 3rd dimension is not " "equal to model shift state number : %zu / %zu", posterior_prob[0][0].size(), this->n_shift) ; throw std::invalid_argument(msg) ; } else if(posterior_prob[0][0][0].size() != this->n_flip) { char msg[4096] ; sprintf(msg, "Error! posterior_prob matrix 4th dimension is not " "equal to model flip state number : %zu / %zu", posterior_prob[0][0][0].size(), this->n_flip) ; throw std::invalid_argument(msg) ; } } const double DataLayer::p_min = 1e-100 ; const double DataLayer::p_min_log = log(DataLayer::p_min) ; diff --git a/src/Clustering/DataLayer.cpp b/src/Clustering/DataLayer.cpp.save similarity index 100% copy from src/Clustering/DataLayer.cpp copy to src/Clustering/DataLayer.cpp.save diff --git a/src/Clustering/DataLayer.hpp b/src/Clustering/DataLayer.hpp index 7657a83..cde2156 100644 --- a/src/Clustering/DataLayer.hpp +++ b/src/Clustering/DataLayer.hpp @@ -1,229 +1,239 @@ #ifndef DATALAYER_HPP #define DATALAYER_HPP #include +#include // std::promise, std::future #include +#include /*! * \brief The DataLayer class define the basic design * to handle probabilistic models together with * their data. * A DataLayer is made of two parts : * 1) a data matrix * 2) a model * The model contains the parameters of a probabilistic * model with one or more classes that fits the data. * The data likelihood given the model can be computed * and the model can be updated given a set of * posterior probabilities representing the data * assignments to the different classes. */ class DataLayer { public: /*! * \brief the smallest acceptable probability * for computations. */ static const double p_min ; /*! * \brief the log of the smallest probability. */ static const double p_min_log ; /*! * \brief The possible flip states. */ enum flip_states{FORWARD=0, REVERSE} ; /*! * \brief Default constructor. */ DataLayer() ; /*! * \brief Constructs an object with the * given data. * An empty model is not initialised yet * as the model number of categories * depends on the final class. * \param data the data. * \param n_class the number of classes * of the model. * \param n_shift the number of shift * states of the model. * \param flip whether flipping is allowed. */ DataLayer(const matrix2d_i& data, size_t n_class, size_t n_shift, bool flip) ; /*! * \brief Constructs an object with the * given data and model. * The model dimensions set the number of * classes and the shifting freedom. * \param data the data. * \param the model. * \param flip whether flipping is allowed. */ DataLayer(const matrix2d_i& data, const matrix3d_d& model, bool flip) ; /*! * \brief Destructor. */ virtual ~DataLayer() ; /*! * \brief Sets the model values randomly. */ virtual void seed_model_randomly() = 0 ; /*! * \brief Sets the model values by * sampling rows in the data and * assigning them as initial model * values. */ virtual void seed_model_sampling() = 0 ; /*! * \brief Sets the model values by * using the first n_class rows in data. */ virtual void seed_model_toy() = 0 ; /*! * \brief Computes the log likelihood of the data * given the current model parameters. * \param loglikelihood a matrix to store the * results. It should have the following dimensions : * 1st : same as the data number of row * 2nd : same as the model number of classes * 3rd : same as the number of shifts * 4th : same as the number of flip states * \param loglikelihood_max a vector containing the * max value for each row of log_likelihood. * Its length should be equal to the data row number. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ virtual void compute_loglikelihoods(matrix4d_d& loglikelihood, - vector_d& loglikelihood_max) const = 0 ; + vector_d& loglikelihood_max, + ThreadPool* threads=nullptr) const = 0 ; /*! * \brief Updates the model given the posterior * probabilities (the probabilities of each row * in the data to be assigned to each class, * for each shift and flip state). * \param posterior_prob the data assignment probabilities to * the different classes. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ - virtual void update_model(const matrix4d_d& posterior_prob) = 0 ; + virtual void update_model(const matrix4d_d& posterior_prob, + ThreadPool* threads=nullptr) = 0 ; /*! * \brief Returns a copy of the current model. * \return the current model. * 1st dim : the number of classes * 2nd dim : the model length * 3rd dim : the number of value categories. */ virtual matrix3d_d get_model() const ; protected: /*! * \brief Checks the argument has compatible * dimensions with the data and models. If this is * not the case, throw a std::invalid_argument with * a relevant message. * \param logliklihood a matrix to store the * results. It should have the following dimensions : * 1st : same as the data row number * 2nd : same as the model class number * 3rd : same as the shift state number * 4th : same as the flip state number * \throw std::invalid_argument if the dimensions are * incorrect. */ void check_loglikelihood_dim(const matrix4d_d& loglikelihood) const ; /*! * \brief Checks that the argument has compatible * dimensions with the data and models. If this is * not the case, throw a std::invalid_argument with * a relevant message. * \param loglikelihood_max a vector containing the * max value for each row of log_likelihood. * It should have a length equal to the number of * the data row number. * \throw std::invalid_argument if the dimensions are * incorrect. */ void check_loglikelihood_max_dim(const vector_d& loglikelihood_max) const ; /*! * \brief Checks the argument has compatible * dimensions with the data and models. If this is * not the case, throw a std::invalid_argument with * a relevant message. * \param posterior_prob a matrix to store the * results. It should have the following dimensions : * 1st : same as the data row number * 2nd : same as the model class number * 3rd : same as the shift state number * 4th : same as the flip state number * \throw std::invalid_argument if the dimensions are * incorrect. */ void check_posterior_prob_dim(const matrix4d_d& posterior_prob) const ; /*! * \brief the data. */ matrix2d_i data ; /*! * \brief the data model. */ matrix3d_d model ; /*! * \brief whether flip is enabled. */ bool flip ; /*! * \brief the number of row in the data. */ size_t n_row ; /*! * \brief the number of columns in the data. */ size_t n_col ; /*! * \brief the number of classes in the model. */ size_t n_class ; /*! * \brief the model length, its 2nd dimension. */ size_t l_model ; /*! * \brief the number of variable categories in * the data. This is also the model 3rd * dimension. * Read counts are quantitative values and * have a number of categories equal to one * whereas as DNA sequences are made of * A,C,G,T (at least) and have 4 different * categories. */ size_t n_category ; /*! * \brief the number of shift states. */ size_t n_shift ; /*! * \brief the number of flip states. */ size_t n_flip ; } ; #endif // DATALAYER_HPP diff --git a/src/Clustering/DataLayer.hpp b/src/Clustering/DataLayer.hpp.save similarity index 100% copy from src/Clustering/DataLayer.hpp copy to src/Clustering/DataLayer.hpp.save diff --git a/src/Clustering/EMEngine.cpp b/src/Clustering/EMEngine.cpp index 8e93d20..80afb99 100644 --- a/src/Clustering/EMEngine.cpp +++ b/src/Clustering/EMEngine.cpp @@ -1,319 +1,473 @@ #include -#include // log(), exp(), pow() +#include // log(), exp(), pow() +#include // std::promise, std::future +#include // std::pair, std::move() +#include // std::bind(), std::ref() -#include // rand_int_uniform() -#include // getRandomNumberGenerator() -#include // poisson_pmf(), normal_pmf(), sd() -#include // ConsoleProgressBar +#include // rand_int_uniform() +#include // getRandomNumberGenerator() +#include // poisson_pmf(), normal_pmf(), sd() +#include // ConsoleProgressBar #include + EMEngine::EMEngine(const std::vector& read_matrices, const std::vector& seq_matrices, size_t n_class, size_t n_iter, size_t n_shift, bool flip, EMEngine::seeding_codes seeding, - const std::string& seed) - : n_layer(read_matrices.size()+seq_matrices.size()), - n_row(read_matrices[0].size()), - n_col(read_matrices[0][0].size()), - n_class(n_class), - n_shift(n_shift), - n_flip(flip+1), - n_iter(n_iter), - flip(flip), - l_model(n_col - n_shift + 1), - loglikelihood(n_layer, - matrix4d_d(n_row, - matrix3d_d(n_class, - matrix2d_d(n_shift, - vector_d(n_flip, 0))))), - loglikelihood_max(n_layer, - vector_d(n_row, 0)), - loglikelihood_joint(n_row, - matrix3d_d(n_class, - matrix2d_d(n_shift, - vector_d(n_flip, 0)))), - post_prob(n_row, - matrix3d_d(n_class, - matrix2d_d(n_shift, - vector_d(n_flip, 0)))), - post_state_prob(n_class, - matrix2d_d(n_shift, - vector_d(n_flip, 0))), - post_class_prob(n_class, 0), - post_prob_rowsum(n_row, 0), - post_prob_colsum(n_class, 0), - post_prob_tot(0), - read_layer_list(), - sequence_layer_list() -{ - // set random number generator seed + const std::string& seed, + size_t n_threads) + : read_layer_list(), + sequence_layer_list(), + threads(nullptr) + +{ // nb of layers + size_t n_layer_read = read_matrices.size() ; + size_t n_layer_seq = seq_matrices.size() ; + this->n_layer = n_layer_read + n_layer_seq ; + if(this->n_layer == 0) + { throw std::invalid_argument("Error! No data layer given!") ; } + + // matrices dimensions + size_t n_row = 0 ; + size_t n_col = 0 ; + if(n_layer_read) + { n_row = read_matrices[0].size() ; + n_col = read_matrices[0][0].size() ; + } + else + { n_row = seq_matrices[0].size() ; + n_col = seq_matrices[0][0].size() ; + } + for(const auto& matrix : read_matrices) + { if(matrix.size() != n_row) + { char msg[4096] ; + sprintf(msg, "Error! A read layer row number is invalid " + "(found %zu, expected %zu)!", + matrix.size(), n_row) ; + throw std::invalid_argument(msg) ; + } + else if(matrix[0].size() != n_col) + { char msg[4096] ; + sprintf(msg, "Error! A read layer column number is invalid " + "(found %zu, expected %zu)!", + matrix.size(), n_col) ; + throw std::invalid_argument(msg) ; + } + } + for(const auto& matrix : seq_matrices) + { if(matrix.size() != n_row) + { char msg[4096] ; + sprintf(msg, "Error! A sequence layer row number is invalid " + "(found %zu, expected %zu)!", + matrix.size(), n_row) ; + throw std::invalid_argument(msg) ; + } + else if(matrix[0].size() != n_col) + { char msg[4096] ; + sprintf(msg, "Error! A sequence layes column number is invalid " + "(found %zu, expected %zu)!", + matrix.size(), n_col) ; + throw std::invalid_argument(msg) ; + } + } + this->n_row = n_row ; + this->n_col = n_col ; + + // class, shift, flip, iter + this->n_class = n_class ; + this->n_shift = n_shift ; + this->n_flip = flip+1 ; + this->flip = flip ; + this->n_iter = n_iter ; + + // model lenth + if(this->n_col < this->n_shift) + { char msg[4096] ; + sprintf(msg, "Error! Shift is bigger than data column number " + "(%zu / %zu)!", + this->n_shift, this->n_col) ; + throw std::invalid_argument(msg) ; + } + this->l_model = n_col - n_shift + 1 ; + + // data structures + this->loglikelihood = + std::vector(this->n_layer, + matrix4d_d(n_row, + matrix3d_d(this->n_class, + matrix2d_d(this->n_shift, + vector_d(this->n_flip, 0))))) ; + this->loglikelihood_max = matrix2d_d(this->n_layer, + vector_d(this->n_row, 0)) ; + this->loglikelihood_joint = + matrix4d_d(this->n_row, + matrix3d_d(this->n_class, + matrix2d_d(this->n_shift, + vector_d(this->n_flip, 0)))) ; + this->post_prob = + matrix4d_d(this->n_row, + matrix3d_d(this->n_class, + matrix2d_d(this->n_shift, + vector_d(this->n_flip, 0)))) ; + this->post_state_prob = + matrix3d_d(n_class, + matrix2d_d(this->n_shift, + vector_d(this->n_flip, 0))) ; + this->post_class_prob = vector_d(n_class, 0) ; + this->post_prob_rowsum = vector_d(n_row, 0) ; + this->post_prob_colsum = vector_d(n_class, 0) ; + this->post_prob_tot = 0 ; + + // set random number generator seed getRandomGenerator(seed) ; + // threads + if(n_threads) + { this->threads = new ThreadPool(n_threads) ; } + // create read layers with initialised models for(const auto& matrix : read_matrices) { // create the layer this->read_layer_list.push_back(new ReadLayer(matrix, this->n_class, this->n_shift, - flip)) ; + flip, + this->threads)) ; // seed the models if(seeding == EMEngine::RANDOM) { this->read_layer_list.back()->seed_model_randomly() ; } else if(seeding == EMEngine::SAMPLING) { this->read_layer_list.back()->seed_model_sampling() ; } else if(seeding == EMEngine::TOY) { this->read_layer_list.back()->seed_model_toy() ; } } // create read layers with initialised models for(const auto& matrix : seq_matrices) { // create the layer this->sequence_layer_list.push_back(new SequenceLayer(matrix, this->n_class, this->n_shift, flip)) ; // seed the models if(seeding == EMEngine::RANDOM) { this->sequence_layer_list.back()->seed_model_randomly() ; } else if(seeding == EMEngine::SAMPLING) { this->sequence_layer_list.back()->seed_model_sampling() ; } else if(seeding == EMEngine::TOY) { this->sequence_layer_list.back()->seed_model_toy() ; } } // set the class probabilities to a uniform distribution this->set_state_prob_uniform() ; } EMEngine::~EMEngine() -{ // read data and models +{ // threads + if(this->threads != nullptr) + { this->threads->join() ; + delete this->threads ; + this->threads = nullptr ; + } + // read data and models for(auto& ptr : this->read_layer_list) { if(ptr != nullptr) { delete ptr ; ptr = nullptr ; } } // sequence data and models for(auto& ptr : this->sequence_layer_list) { if(ptr != nullptr) { delete ptr ; ptr = nullptr ; } } } std::vector EMEngine::get_read_models() const { std::vector models ; for(const auto& ptr : this->read_layer_list) { models.push_back(ptr->get_model()) ; } return models ; } std::vector EMEngine::get_sequence_models() const { std::vector models ; for(const auto& ptr : this->sequence_layer_list) { models.push_back(ptr->get_model()) ; } return models ; } matrix4d_d EMEngine::get_post_prob() const { return this->post_prob ; } vector_d EMEngine::get_post_class_prob() const { return this->post_class_prob ; } EMEngine::exit_codes EMEngine::classify() { size_t bar_update_n = this->n_iter ; ConsoleProgressBar bar(std::cerr, bar_update_n, 60, "classifying") ; // optimize the partition for(size_t n_iter=0; n_itern_iter; n_iter++) { // E-step this->compute_loglikelihood() ; this->compute_post_prob() ; // M-step this->compute_class_prob() ; this->update_models() ; this->center_post_state_prob() ; bar.update() ; } bar.update() ; std::cerr << std::endl ; return EMEngine::exit_codes::SUCCESS ; } void EMEngine::set_state_prob_uniform() { double sum = this->n_class * this->n_shift * this->n_flip ; for(size_t i=0; in_class; i++) { for(size_t j=0; jn_shift; j++) { for(size_t k=0; kn_flip; k++) { this->post_state_prob[i][j][k] = 1./sum ; } } } } void EMEngine::compute_loglikelihood() { // compute the loglikelihood for each layer size_t i = 0 ; for(auto& ptr : this->read_layer_list) { ptr->compute_loglikelihoods(this->loglikelihood[i], - this->loglikelihood_max[i]) ; + this->loglikelihood_max[i], + this->threads) ; i++ ; } for(auto& ptr : this->sequence_layer_list) { ptr->compute_loglikelihoods(this->loglikelihood[i], - this->loglikelihood_max[i]) ; + this->loglikelihood_max[i], + this->threads) ; i++ ; } // sum the likelihood for each state, over all layers // this is the "joint likelihood" for(size_t i=0; in_row; i++) { for(size_t j=0; jn_class; j++) { for(size_t k=0; kn_shift; k++) { for(size_t l=0; ln_flip; l++) { // reset this->loglikelihood_joint[i][j][k][l] = 0. ; // sum for(size_t m=0; mn_layer; m++) { this->loglikelihood_joint[i][j][k][l] += (this->loglikelihood[m][i][j][k][l] - this->loglikelihood_max[m][i]) ; } } } } } } - void EMEngine::compute_post_prob() -{ // reset grand total - this->post_prob_tot = 0 ; - this->post_prob_colsum = vector_d(n_class, 0) ; +{ // don't parallelize + if(this->threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->compute_post_prob_routine(0, this->n_row, promise) ; + // compute the sum of post prob and the per class sum of post prob + // from the partial results computed on each slice + this->post_prob_tot = 0. ; + this->post_prob_colsum = future.get() ; + for(const auto& prob : this->post_prob_colsum) + { this->post_prob_tot += prob ; } + } + // parallelize + else + { size_t n_threads = this->threads->getNThread() ; + + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row,n_threads) ; + + // get promises and futures + // the function run by the threads will compute + // the partial sum per class of post_prob for the given slice + // this should be used to compute the complete sum of post_prob + // and the complete sum per class of post_prob + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; ithreads->addJob(std::move( + std::bind(&EMEngine::compute_post_prob_routine, + this, + slice.first, + slice.second, + std::ref(promises[i])))) ; + } + // wait until all threads are done working + // compute the sum of post prob and the per class sum of post prob + // from the partial results computed on each slice + this->post_prob_tot = 0. ; + this->post_prob_colsum = vector_d(this->n_class, 0.) ; + for(auto& future : futures) + { auto probs = future.get() ; + for(size_t i=0; in_class; i++) + { double prob = probs[i] ; + this->post_prob_colsum[i] += prob ; + this->post_prob_tot += prob ; + } + } + // -------------------------- threads stop --------------------------- + } +} + + +void EMEngine::compute_post_prob_routine(size_t from, + size_t to, + std::promise& post_prob_colsum) +{ vector_d colsums(this->n_class, 0.) ; + + // reset grand total + // this->post_prob_tot = 0 ; + // this->post_prob_colsum = vector_d(n_class, 0) ; + // post prob - for(size_t i=0; in_row; i++) + for(size_t i=from; ipost_prob_rowsum[i] = 0. ; for(size_t n_class=0; n_classn_class; n_class++) { for(size_t n_shift=0; n_shiftn_shift; n_shift++) { for(size_t n_flip=0; n_flipn_flip; n_flip++) { double p = std::max(exp(this->loglikelihood_joint[i][n_class][n_shift][n_flip]) * this->post_state_prob[n_class][n_shift][n_flip], DataLayer::p_min) ; this->post_prob[i][n_class][n_shift][n_flip] = p ; this->post_prob_rowsum[i] += p ; } } } // normalize for(size_t n_class=0; n_classn_class; n_class++) { for(size_t n_shift=0; n_shiftn_shift; n_shift++) { for(size_t n_flip=0; n_flipn_flip; n_flip++) { this->post_prob[i][n_class][n_shift][n_flip] /= this->post_prob_rowsum[i] ; double p = this->post_prob[i][n_class][n_shift][n_flip] ; - this->post_prob_colsum[n_class] += p ; - this->post_prob_tot += p ; + colsums[n_class] += p ; + // this->post_prob_colsum[n_class] += p ; + // this->post_prob_tot += p ; } } } } + post_prob_colsum.set_value(colsums) ; } void EMEngine::compute_class_prob() { for(size_t n_class=0; n_classn_class; n_class++) { // reset total this->post_class_prob[n_class] = 0. ; for(size_t n_shift=0; n_shiftn_shift; n_shift++) { for(size_t flip=0; flipn_flip; flip++) { // sum this->post_state_prob[n_class][n_shift][flip] = 0. ; for(size_t i=0; in_row; i++) { this->post_state_prob[n_class][n_shift][flip] += this->post_prob[i][n_class][n_shift][flip] ; } // normalize this->post_state_prob[n_class][n_shift][flip] /= this->post_prob_tot ; this->post_class_prob[n_class] += this->post_state_prob[n_class][n_shift][flip] ; } } } } void EMEngine::update_models() { // read data and models for(auto& ptr : this->read_layer_list) { ptr->update_model(this->post_prob, - this->post_prob_colsum) ; + this->post_prob_colsum, + this->threads) ; } // sequence data and models for(auto& ptr : this->sequence_layer_list) - { ptr->update_model(this->post_prob) ; + { ptr->update_model(this->post_prob, + this->threads) ; } } void EMEngine::center_post_state_prob() { if(this->n_shift == 1) { return ; } // the possible shift states vector_d shifts(this->n_shift) ; std::iota(shifts.begin(), shifts.end(), 1.) ; // the shift probabilities and the class probabilies // (no need to norm., class_prob sums to 1) double shifts_prob_measured_tot = 0. ; std::vector shifts_prob_measured(this->n_shift) ; for(size_t s=0; sn_shift; s++) { for(size_t k=0; kn_class; k++) { for(size_t f=0; fn_flip; f++) { shifts_prob_measured[s] += this->post_state_prob[k][s][f] ; shifts_prob_measured_tot += this->post_state_prob[k][s][f] ; } } } // the shift mean and (biased) standard deviation double shifts_sd = sd(shifts, shifts_prob_measured, false) ; // the shift probabilities under the assumption that is // distributed as a gaussian centered on // the central shift state with sd and mean as in the data // sd as the data vector_d shifts_prob_centered(shifts.size(), 0.) ; double shifts_prob_centered_tot = 0. ; for(size_t i=0; in_shift/2)+1, shifts_sd) ; shifts_prob_centered_tot += shifts_prob_centered[i] ; } for(size_t k=0; kn_class; k++) { for(size_t f=0; fn_flip; f++) { for(size_t s=0; sn_shift; s++) { this->post_state_prob[k][s][f] = this->post_class_prob[k] * shifts_prob_centered[s] / (this->n_flip * shifts_prob_centered_tot) ; } } } // shifts_prob_measured_tot = 0. ; shifts_prob_measured.clear() ; shifts_prob_measured.resize(this->n_shift) ; for(size_t s=0; sn_shift; s++) { for(size_t k=0; kn_class; k++) { for(size_t f=0; fn_flip; f++) { shifts_prob_measured[s] += this->post_state_prob[k][s][f] ; } } } } diff --git a/src/Clustering/EMEngine.hpp b/src/Clustering/EMEngine.hpp index 76b02e2..960ebc5 100644 --- a/src/Clustering/EMEngine.hpp +++ b/src/Clustering/EMEngine.hpp @@ -1,272 +1,298 @@ #ifndef EMENGINE_HPP #define EMENGINE_HPP +#include #include #include #include +#include // std::promise #include #include #include #include +#include /*! * \brief This class implements the iterative an expectation * maximization classification procedure to discover * patterns in ChIP-seq (and related data) data, as described * in Nair et al. 2014, Bioinformatics. * However, the classification procedure has been generalized * such that genomic regions can be partitioned according * to several different signal at the same time, instead * of just one as in the original paper. Additionally, it * is possible to include the underlying DNA sequence such * that the partitioning procedure will find i) ChIP-seq * data signal patterns and ii) DNA sequence motifs at * the same time. * To mitigate a miss-alignment of the signal/sequences in * the different regions - that is a same signal strech/sequence * motif is present in two regions but at different offsets - * the classification procedure can search protypic signals * shorter than a whole region, at each possible offset over the * region (named shift states). * To mitigate an inversion of the signal/sequence in the different * regions - that is a same signal strech/sequence motif is present * in two regions but in reverse orientation - the classification * procedure can search protypic signals in both orientation. */ class EMEngine { - public: /*! * \brief The possible seeding strategies. */ enum seeding_codes {RANDOM=0, SAMPLING, TOY} ; /*! * \brief The possible exit codes for the cluster method. * 0 the clustering procedure converged, 1 the clustering * procedure succeeded without converging, 2 the clustering * failed. */ enum exit_codes {CONVERGENCE=0, SUCCESS, FAILURE} ; public: /*! * \brief Constructs an object to partition the * region according to all the givend data layers * with the given shifting and flipping freedom. * \param read_matrices a vector containing all * the different different data densities (ChIP-seq * or related signal) for the regions of interest. * \param seq_matrices a vector containing the DNA * sequences for the regions of interest. * \param n_class the number of region classes * to search. * \param n_iter the number of optimization iterations. * \param n_shift the number of shift states allowed. * \param flip whether flipping is allowed. * \param seeding how to initialise the signal/sequence * models. * \param seed a seed to initialise the random number * generator. + * \param n_threads the number of parallel threads + * to run the computations. 0 means no parallel + * computing, everything is run on the main thread. */ EMEngine(const std::vector& read_matrices, const std::vector& seq_matrices, size_t n_class, size_t n_iter, size_t n_shift, bool flip, EMEngine::seeding_codes seeding, - const std::string& seed="") ; + const std::string& seed="", + size_t n_threads=0) ; /*! * Destructor. */ ~EMEngine() ; /*! * \brief Returns all read models. * The models are in the same order * as the data were given to the * constructor. * \return a vector containing the * models. */ std::vector get_read_models() const ; /*! * \brief Returns all sequence models. * The models are in the same order * as the data were given to the * constructor. * \return a vector containing the * models. */ std::vector get_sequence_models() const ; /*! * \brief Returns the posterior probability * of each point belonging to each class, for * each possible shift and flip state. * \return the posterior probability matrix, * with the following dimensions : * 1st dim : the data points * 2nd dim : the classes * 3rd dim : the shift states * 4th dim : the flip states */ matrix4d_d get_post_prob() const ; /*! * \brief Returns the posterior class * probabilities (the total class * probability over all shift and * flip states). * \return the posterior class * probabilities. */ vector_d get_post_class_prob() const ; /*! - * \brief Runs the model optimization and the + * \brief Runs the models optimization and the * data classification. * \return a code indicating how the optimization * ended. */ EMEngine::exit_codes classify() ; protected: /*! * \brief Sets all the state probabilities * (all shift and flip states in all classes) * to a uniform probability. */ void set_state_prob_uniform() ; /*! * \brief Computes the data log likelihood given the * current models, for all layers and the joint * likelihood for each state (the sum of the layer * likelihoods for all layers, for a given state). */ void compute_loglikelihood() ; /*! * \brief Computes the data posterior probabilties. */ void compute_post_prob() ; + /*! + * \brief The routine that effectively computes + * the posterior probabilties. + * \param from the index of the first row + * in the data to consider. + * \param to the index of the past last row + * in the data to consider. + * \param done the partial column (over the classes) + * sum of posterior probabilities. If several routines + * are running together, the colsums are retrieved by + * summing up the vectors together. + */ + void compute_post_prob_routine(size_t from, + size_t to, + std::promise& post_prob_colsum) ; + /*! * \brief Computes the class/state probabilities from the * posterior probabilities. */ void compute_class_prob() ; /*! * \brief Update the data models for all layers, given * the current posterior and class probabilities. */ void update_models() ; /*! * \brief Modifies the state probabilities in such a * way that the state probabilities are then normaly * distributed, centered on the middle shift state. * However, the overall class probabilities remain * unchanged. */ void center_post_state_prob() ; /*! * \brief the number of data layers. */ size_t n_layer ; /*! * \brief the number of rows in data. */ size_t n_row ; /*! * \brief the number of columns in data. */ size_t n_col ; /*! * \brief the number of classes. */ size_t n_class ; /*! * \brief the number of shift states. */ size_t n_shift ; /*! * \brief zhe number of flip states. */ size_t n_flip ; /*! * \brief the number of iterations. */ size_t n_iter ; /*! * \brief whther flip is allowed. */ bool flip ; /*! * \brief the length of the models. */ size_t l_model ; /*! * \brief the log likelihoods. * One per data layer. */ std::vector loglikelihood ; /*! * \brief the max log likelihood value for each row. * One per data layer. */ std::vector loglikelihood_max ; /*! * \brief the joint loglikelihood, through all * layers, state (each class for each shift * and flip state). */ matrix4d_d loglikelihood_joint ; /*! * \brief the posterior probabilities. */ matrix4d_d post_prob ; /*! * \brief the states (shift and flip in each class) * probabilities. */ matrix3d_d post_state_prob ; /*! * \brief the total prob per class. */ vector_d post_class_prob ; /*! * \brief the sum per row (data point) of post_prob. */ vector_d post_prob_rowsum ; /*! * \brief the sum per column (class) of post_prob. */ vector_d post_prob_colsum ; /*! * \brief the total of post_prob. */ double post_prob_tot ; /*! * \brief the read data and their models. */ std::list read_layer_list ; /*! * \brief the sequence data and their models. */ std::list sequence_layer_list ; + /*! + * \brief the threads. + */ + ThreadPool* threads ; } ; #endif // EMENGINE_HPP diff --git a/src/Clustering/ReadLayer.cpp b/src/Clustering/ReadLayer.cpp index 9b0e2ef..2467572 100644 --- a/src/Clustering/ReadLayer.cpp +++ b/src/Clustering/ReadLayer.cpp @@ -1,297 +1,516 @@ #include #include // std::invalid_argument #include // numeric_limits #include // log(), exp(), pow() #include +#include // std::promise, std::future +#include // std::pair, std::move() +#include // std::bind(), std::ref() #include // beta_pmf(), poisson_pmf() #include // rand_real_uniform(), rand_int_uniform() #include +#include ReadLayer::ReadLayer(const matrix2d_i& data, size_t n_class, size_t n_shift, - bool flip) + bool flip, + ThreadPool* threads) : DataLayer(data, n_class, n_shift, flip), window_means(n_row, vector_d(n_shift, 0.)) { this->n_category = 1 ; // initialise the empty model this->model = matrix3d_d(this->n_class, matrix2d_d(this->l_model, vector_d(this->n_category, 0))) ; // compute window means - this->compute_window_means() ; + this->compute_window_means(threads) ; } ReadLayer::ReadLayer(const matrix2d_i& data, const matrix3d_d& model, - bool flip) + bool flip, + ThreadPool* threads) : DataLayer(data, model, flip), window_means(n_row, vector_d(n_shift, 0.)) { // check that the model only has one category if(this->n_category > 1) { char msg[4096] ; sprintf(msg, "Error! model is expected to have length 1 on " "3rd dimension, not %zu", this->n_category) ; throw std::invalid_argument(msg) ; } // compute window means - this->compute_window_means() ; + this->compute_window_means(threads) ; } ReadLayer::~ReadLayer() {} void ReadLayer::seed_model_randomly() { // get random values from a beta distribution cannot be done using boost so // i) generate random number [0,1] x // ii) compute f(x) where f is beta distribution matrix2d_d prob(this->n_row, vector_d(this->n_class, 0.)) ; - vector_d prob_class(this->n_class, 0.) ; double tot_sum = 0. ; // sample the prob // beta distribution parameters double alpha = pow(this->n_row, -0.5) ; double beta = 1. ; for(size_t i=0; in_row; i++) { double row_sum = 0. ; for(size_t j=0; jn_class; j++) { double x = rand_real_uniform(0., 1.0) ; double p = std::max(ReadLayer::p_min, beta_pmf(x, alpha, beta)) ; prob[i][j] = p ; - prob_class[j] += p ; tot_sum += p ; row_sum += p ; } // normalize for(size_t j=0; jn_class; j++) { prob[i][j] /= row_sum ; } } - // class prob - for(auto& p : prob_class) - { p /= tot_sum ; } - // compute the refererences for(size_t i=0; in_row; i++) { for(size_t j=0; jn_class; j++) { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) { this->model[j][j_ref][0] += (this->data[i][j_dat] * prob[i][j]) ; } } } - // normalize - for(size_t i=0; in_class; i++) - { for(size_t j=0; jl_model; j++) - { this->model[i][j][0] ; } - } } void ReadLayer::seed_model_sampling() { std::vector choosen(this->n_row, false) ; for(size_t i=0; in_class; ) { size_t index = rand_int_uniform(size_t(0), size_t(this->n_row-1)) ; // already choose if(choosen[index]) { ; } // not yet choosen as reference else { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) { this->model[i][j_ref][0] = this->data[index][j_dat] ; } choosen[index] = true ; i++ ; } } } void ReadLayer::seed_model_toy() { // sample data to initialise the references std::vector choosen(this->n_row, false) ; for(size_t i=0; in_class; ) { size_t index = i ; // already choose if(choosen[index]) { ; } // not yet choosen as reference else { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) { this->model[i][j_ref][0] = this->data[index][j_dat] ; } choosen[index] = true ; i++ ; } } } void ReadLayer::compute_loglikelihoods(matrix4d_d& loglikelihood, - vector_d& loglikelihood_max) const + vector_d& loglikelihood_max, + ThreadPool* threads) const { // dimension checks this->check_loglikelihood_dim(loglikelihood) ; this->check_loglikelihood_max_dim(loglikelihood_max) ; + // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->compute_loglikelihoods_routine(0, this->n_row, + std::ref(loglikelihood), + std::ref(loglikelihood_max), + promise) ; + future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&ReadLayer::compute_loglikelihoods_routine, + this, + slice.first, + slice.second, + std::ref(loglikelihood), + std::ref(loglikelihood_max), + std::ref(promises[i])))) ; + } + // wait until all threads are done working + for(auto& future : futures) + { future.get() ; } + // -------------------------- threads stop --------------------------- + } +} + + +void ReadLayer::compute_loglikelihoods_routine(size_t from, + size_t to, + matrix4d_d& loglikelihood, + vector_d& loglikelihood_max, + std::promise& done) const +{ // normalize the models matrix3d_d model_norm = this->model ; for(size_t i=0; in_class; i++) { double mean = 0. ; for(size_t j=0; jl_model; j++) { mean += model_norm[i][j][0] ; } mean /= this->l_model ; for(size_t j=0; jl_model; j++) { model_norm[i][j][0] /= mean ; } } // compute log likelihood - for(size_t i=0; in_row; i++) + for(size_t i=from; i::lowest() ; for(size_t j=0; jn_class; j++) { for(size_t s_fw=0, s_rev=this->n_shift-1; s_fwn_shift; s_fw++, s_rev--) { // slice is [from_fw,to) // from_dat_fw to_dat_fw [from_dat_fw, to_dat_fw] // fw |---------->>>----------| // ----------------------------------> data // rev |----------<<<----------| [from_dat_rev, to_dat_rev] // to_dat_rev can be -1 -> int // to_dat_rev from_dat_rev // log likelihood double ll_fw = 0. ; double ll_rev = 0. ; // --------------- forward --------------- size_t from_dat_fw = s_fw ; size_t to_dat_fw = from_dat_fw + this->l_model - 1 ; // --------------- reverse --------------- size_t from_dat_rev = this->n_col - 1 - s_fw ; // size_t to_dat_rev = from_dat_rev - (this->l_model - 1) ; for(size_t j_dat_fw=from_dat_fw,j_ref_fw=0, j_dat_rev=from_dat_rev; j_dat_fwdata[i][j_dat_fw], model_norm[j][j_ref_fw][0]* this->window_means[i][s_fw])) ; ll_fw += std::max(ll, ReadLayer::p_min_log) ; // --------------- reverse --------------- if(this->flip) { ll = log(poisson_pmf(this->data[i][j_dat_rev], model_norm[j][j_ref_fw][0]* this->window_means[i][s_rev])) ; ll_rev += std::max(ll, ReadLayer::p_min_log) ; } } loglikelihood[i][j][from_dat_fw][flip_states::FORWARD] = ll_fw ; // keep track of the max per row if(ll_fw > loglikelihood_max[i]) { loglikelihood_max[i] = ll_fw ; } if(this->flip) { loglikelihood[i][j][from_dat_fw][flip_states::REVERSE] = ll_rev ; // keep track of the max per row if(ll_rev > loglikelihood_max[i]) { loglikelihood_max[i] = ll_rev ; } } } } } + done.set_value(true) ; } -void ReadLayer::update_model(const matrix4d_d& posterior_prob) + + + +void ReadLayer::update_model(const matrix4d_d& posterior_prob, + ThreadPool* threads) { // computing sum over the columns (classes) size_t n_row = posterior_prob.size() ; size_t n_class = posterior_prob[0].size() ; size_t n_shift = posterior_prob[0][0].size() ; size_t n_flip = posterior_prob[0][0][0].size() ; vector_d colsum(n_class, 0.) ; for(size_t i=0; iupdate_model(posterior_prob, colsum) ; + + // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->update_model_routine(0, + this->n_row, + posterior_prob, + colsum, + promise) ; + this->model = future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&ReadLayer::update_model_routine, + this, + slice.first, + slice.second, + posterior_prob, + colsum, + std::ref(promises[i])))) ; + } + // reinitialise the model + this->model = matrix3d_d(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0))) ; + // wait until all threads are done working + // and update the mode + for(auto& future : futures) + { matrix3d_d model_part = future.get() ; + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { for(size_t k=0; kn_category; k++) + { this->model[i][j][k] += + model_part[i][j][k] ; + } + } + } + } + // -------------------------- threads stop --------------------------- + } } void ReadLayer::update_model(const matrix4d_d& posterior_prob, - const vector_d& posterior_prob_colsum) -{ // dimension checks + const vector_d& posterior_prob_colsum, + ThreadPool* threads) +{ + // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->update_model_routine(0, + this->n_row, + posterior_prob, + posterior_prob_colsum, + promise) ; + this->model = future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&ReadLayer::update_model_routine, + this, + slice.first, + slice.second, + posterior_prob, + posterior_prob_colsum, + std::ref(promises[i])))) ; + } + // reinitialise the model + this->model = matrix3d_d(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0))) ; + // wait until all threads are done working + // and update the mode + for(auto& future : futures) + { matrix3d_d model_part = future.get() ; + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { for(size_t k=0; kn_category; k++) + { this->model[i][j][k] += + model_part[i][j][k] ; + } + } + } + } + // -------------------------- threads stop --------------------------- + } +} + +void ReadLayer::update_model_routine(size_t from, + size_t to, + const matrix4d_d& posterior_prob, + const vector_d& posterior_prob_colsum, + std::promise& promise) const +{ + // dimension checks this->check_posterior_prob_dim(posterior_prob) ; this->check_posterior_prob_colsum_dim(posterior_prob_colsum) ; - // update models - // reset the parameters - this->model = matrix3d_d(this->n_class, - matrix2d_d(this->l_model, - vector_d(this->n_category, 0.))) ; + // partial model + matrix3d_d model = matrix3d_d(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0.))) ; for(size_t n_class=0; n_class < this->n_class; n_class++) - { for(size_t i=0; in_row; i++) + { for(size_t i=from; in_shift; n_shift++) { // --------------- forward --------------- int from_dat_fw = n_shift ; int to_dat_fw = from_dat_fw + this->l_model - 1 ; for(int j_dat_fw=from_dat_fw, j_ref_fw=0; j_dat_fw<=to_dat_fw; j_dat_fw++, j_ref_fw++) - { this->model[n_class][j_ref_fw][0] += + { model[n_class][j_ref_fw][0] += (posterior_prob[i][n_class][n_shift][flip_states::FORWARD] * this->data[i][j_dat_fw]) / posterior_prob_colsum[n_class] ; } // --------------- reverse --------------- if(this->flip) { int from_dat_rev = this->n_col - 1 - n_shift ; int to_dat_rev = from_dat_rev - (this->l_model - 1) ; for(int j_dat_rev=from_dat_rev, j_ref_fw=0; j_dat_rev >= to_dat_rev; j_dat_rev--, j_ref_fw++) - { this->model[n_class][j_ref_fw][0] += + { model[n_class][j_ref_fw][0] += (posterior_prob[i][n_class][n_shift][flip_states::REVERSE] * this->data[i][j_dat_rev]) / posterior_prob_colsum[n_class] ; } } } } } + promise.set_value(model) ; } -void ReadLayer::compute_window_means() +void ReadLayer::compute_window_means(ThreadPool* threads) +{ // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->compute_window_means_routine(0, this->n_row, promise) ; + future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&ReadLayer::compute_window_means_routine, + this, + slice.first, + slice.second, + std::ref(promises[i])))) ; + } + // wait until all threads are done working + for(auto& future : futures) + { future.get() ; } + // -------------------------- threads stop --------------------------- + } +} + +void ReadLayer::compute_window_means_routine(size_t from, + size_t to, + std::promise& done) { double l_window = double(this->l_model) ; - for(size_t i=0; in_row; i++) + for(size_t i=from; in_shift; from++) { double sum = 0. ; // slice is [from,to) size_t to = from + this->l_model ; for(size_t j=from; jdata[i][j] ;} this->window_means[i][from] = sum / l_window ; } } + done.set_value(true) ; } void ReadLayer::check_posterior_prob_colsum_dim(const vector_d& posterior_prob_colsum) const { if(posterior_prob_colsum.size() != this->n_class) { char msg[4096] ; sprintf(msg, "Error! posterior_class_prob matrix size is not " "equal to model class number : %zu / %zu", posterior_prob_colsum.size(), this->n_class) ; throw std::invalid_argument(msg) ; } } diff --git a/src/Clustering/ReadLayer.hpp b/src/Clustering/ReadLayer.hpp index b5c701c..b0d7636 100644 --- a/src/Clustering/ReadLayer.hpp +++ b/src/Clustering/ReadLayer.hpp @@ -1,144 +1,227 @@ #ifndef READLAYER_HPP #define READLAYER_HPP #include #include +#include class ReadLayer : public DataLayer { public: /*! * \brief Constructs an object with the * given data and an empty (0 values) * model. * \param data the data. * \param n_class the number of classes * of the model. * \param n_shift the number of shift * states of the model. * \param flip whether flipping is allowed. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ ReadLayer(const matrix2d_i& data, size_t n_class, size_t n_shift, - bool flip) ; + bool flip, + ThreadPool* threads = nullptr) ; /*! * \brief Construct an object with the * given data and model. * \param data the data. * \param the model. * \param flip whether flipping is allowed. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ ReadLayer(const matrix2d_i& data, const matrix3d_d& model, - bool flip) ; + bool flip, + ThreadPool* threads = nullptr) ; /*! * Destructor */ virtual ~ReadLayer() override ; /*! * \brief Initialises the references randomly. * Generates the initial references by randomly * assigning the data to the classes using a beta * distribution. */ virtual void seed_model_randomly() override ; /*! * \brief Sets the model values by * sampling rows in the data and * assigning them as initial model * values. */ virtual void seed_model_sampling() override ; /*! * \brief Sets the model values by * using the first n_class rows in data. */ virtual void seed_model_toy() override ; /*! * \brief Computes the log likelihood of the data * given the current model parameters. * During this process, a normalized version of the * models, having a sum of signal of 1 count in average, * is used (a copy of the models is normalized, meaning * that the original models can still be retrieved the * dedicated getter). * \param logliklihood a matrix to store the * results. It should have the following dimensions : * 1st : same as the data number of row * 2nd : same as the model number of classes * 3rd : same as the number of shifts * 4th : same as the number of flip states * \param loglikelihood_max a vector containing the * max value for each row of loglikelihood. * Its length should be equal to the data row number. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. * \throw std::invalid_argument if the dimensions are * incorrect. */ virtual void compute_loglikelihoods(matrix4d_d& loglikelihood, - vector_d& loglikelihood_max) const override ; + vector_d& loglikelihood_max, + ThreadPool* threads=nullptr) const override ; /*! * \brief Updates the model given the posterior * probabilities (the probabilities of each row * in the data to be assigned to each class, * for each shift and flip state). * \param posterior_prob the data assignment probabilities to * the different classes. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ - virtual void update_model(const matrix4d_d& posterior_prob) override ; + virtual void update_model(const matrix4d_d& posterior_prob, + ThreadPool* threads=nullptr) override ; /*! * \brief Updates the model given the posterior * probabilities (the probabilities of each row * in the data to be assigned to each class, * for each shift and flip state). * This method does the same as the virtual method it * overloads. The only difference is that, for run time * gain, it is given the sum over the columns of the * posterior_prob matrix which is computed by the virtual * method. * \param posterior_prob the data assignment probabilities to * the different classes. * \param posterior_prob_colsum the sum over the columns * (classes) of the posterior_prob matrix. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. */ void update_model(const matrix4d_d& posterior_prob, - const vector_d& posterior_prob_colsum) ; + const vector_d& posterior_prob_colsum, + ThreadPool* threads=nullptr) ; protected: + /*! + * \brief The routine that effectively performs the + * loglikelihood computations. + * \param from the index of the first row of the data + * to considered. + * \param to the index of the past last row of the data + * to considered. + * \param loglikelihood a matrix to store the + * results. It should have the following dimensions : + * 1st : same as the data number of row + * 2nd : same as the model number of classes + * 3rd : same as the number of shifts + * 4th : same as the number of flip states + * \param loglikelihood_max a vector containing the + * max value for each row of log_likelihood. + * Its length should be equal to the data row number. + * \param done a promise to be filled when the routine + * is done running. + */ + void compute_loglikelihoods_routine(size_t from, + size_t to, + matrix4d_d& loglikelihood, + vector_d& loglikelihood_max, + std::promise& done) const ; + + /*! + * \brief The routine that effectively update the model. + * \param from the index of the first row of the + * posterior probabilities to considered. + * \param to the index of the past last row of the + * posterior probabilities to considered. + * \param posterior_prob the data assignment probabilities + * to the different classes. + * \param + * \param promise a promise containing the partial model + * computed from the given data slice. If several routines + * work together to update the model, the promise matrices + * need to be summed up to get the final model. + */ + void update_model_routine(size_t from, + size_t to, + const matrix4d_d& posterior_prob, + const vector_d& posterior_prob_colsum, + std::promise& promise) const ; + /*! * \brief Computes the mean number of reads present in * each slice (of length l_model), in each row * of the data and store them in this->window_means. + * \param threads a pointer to a thread pool to + * parallelize the computations. If nullptr is given, + * the computations are performed by the main thread. + */ + void compute_window_means(ThreadPool* threads) ; + + /*! + * \brief The routine that effectively computes the + * window means. + * \param from the index of the first row of the + * data to considered. + * \param to the index of the past last row of the + * data to considered. + * \param done a promise to fill when the routine + * is done running. */ - void compute_window_means() ; + void compute_window_means_routine(size_t from, + size_t to, + std::promise& done) ; /*! * \brief Checks that the argument has compatible * dimensions with the data and models. If this is * not the case, throw a std::invalid_argument with * a relevant message. * \param posterior_class_prob a vector containing the * class probabilities. * It should have a length equal to the number of * classes. * \throw std::invalid_argument if the dimensions are * incorrect. */ void check_posterior_prob_colsum_dim(const vector_d& posterior_prob_colsum) const ; /*! * \brief contains the data means, for * each window of size l_model. */ matrix2d_d window_means ; } ; #endif // READLAYER_HPP diff --git a/src/Clustering/ReadModelComputer.cpp b/src/Clustering/ReadModelComputer.cpp index 4dfc18d..dbfbd5f 100644 --- a/src/Clustering/ReadModelComputer.cpp +++ b/src/Clustering/ReadModelComputer.cpp @@ -1,27 +1,43 @@ #include #include +#include ReadModelComputer::ReadModelComputer(const matrix2d_i& data, - const matrix4d_d& post_prob) - : ModelComputer() + const matrix4d_d& post_prob, + size_t n_threads) + : ModelComputer(), + threads(nullptr) { // parameters size_t n_class = post_prob[0].size() ; size_t n_shift = post_prob[0][0].size() ; size_t n_flip = post_prob[0][0][0].size() ; bool flip = n_flip == 2 ; + // the threads + if(n_threads) + { this->threads = new ThreadPool(n_threads) ; } + + // the data and the model this->data_layer = new ReadLayer(data, n_class, n_shift, flip) ; - this->data_layer->update_model(post_prob) ; + this->data_layer->update_model(post_prob, + this->threads) ; } ReadModelComputer::~ReadModelComputer() -{ if(this->data_layer != nullptr) +{ // threads + if(this->threads != nullptr) + { this->threads->join() ; + delete this->threads ; + this->threads = nullptr ; + } + // data and model + if(this->data_layer != nullptr) { delete this->data_layer ; this->data_layer = nullptr ; } } diff --git a/src/Clustering/ReadModelComputer.hpp b/src/Clustering/ReadModelComputer.hpp index b66ee1b..0794ea1 100644 --- a/src/Clustering/ReadModelComputer.hpp +++ b/src/Clustering/ReadModelComputer.hpp @@ -1,30 +1,41 @@ #ifndef READMODELCOMPUTER_HPP #define READMODELCOMPUTER_HPP #include #include +#include class ReadModelComputer : public ModelComputer { public: /*! * \brief Constructs an object to retrieve * the read model given the data and their * classification results. * \param data the data. * \param post_prob the data class assignment * probabilities. + * \param n_threads the number of parallel threads + * to run the computations. 0 means no parallel + * computing, everything is run on the main thread. */ ReadModelComputer(const matrix2d_i& data, - const matrix4d_d& post_prob) ; + const matrix4d_d& post_prob, + size_t n_threads) ; /*! * \brief Destructor. */ virtual ~ReadModelComputer() override ; + protected: + /*! + * \brief the threads. + */ + ThreadPool* threads ; + } ; #endif // READMODELCOMPUTER_HPP diff --git a/src/Clustering/SequenceLayer.cpp b/src/Clustering/SequenceLayer.cpp index cd42222..8ef971e 100644 --- a/src/Clustering/SequenceLayer.cpp +++ b/src/Clustering/SequenceLayer.cpp @@ -1,156 +1,495 @@ #include #include // std::invalid_argument #include // numeric_limits -#include // log(), exp(), pow() +#include // log(), pow() #include + +#include // beta_pmf() +#include // rand_real_uniform(), rand_int_uniform() +#include // seqan::SeqFileIn #include +int SequenceLayer::char_to_int(char c) +{ switch (c) + { case 'A': + return 0 ; + break; + case 'C': + return 1 ; + break; + case 'G': + return 2 ; + break; + case 'T': + return 3 ; + break; + default: + char msg[4096] ; + sprintf(msg, "Error! Invalid DNA character %c", c) ; + throw std::invalid_argument(msg) ; + } +} + +matrix2d_i SequenceLayer::read_fasta(const std::string& file_address) +{ + // open file + seqan::SeqFileIn file_in; + if (not seqan::open(file_in, file_address.c_str())) + { char msg[4096] ; + sprintf(msg, "Error! Could not open %s", + file_address.c_str()) ; + throw std::invalid_argument(msg) ; + } + + // read + matrix2d_i seq_matrix ; + seqan::CharString id ; + seqan::Dna5String seq ; + size_t i = 0 ; + size_t seq_l = 0; + while(not seqan::atEnd(file_in)) + { seqan::readRecord(id, seq, file_in) ; + // get sequence length + if(i == 0) + { seq_l = seqan::length(seq) ; } + // sequence length should be constant + else if(seqan::length(seq) != seq_l) + { char msg[4096] ; + sprintf(msg, "Error! Sequences of variable length in %s", + file_address.c_str()) ; + throw std::invalid_argument(msg) ; + } + // store + seq_matrix.push_back(vector_i(seq_l)) ; + for(size_t j=0; j data[0].size()) + { char msg[4096] ; + sprintf(msg, "Error! given model is longer than sequences (%zu / %zu)", + model_log.size(), data[0].size()) ; + throw std::invalid_argument(msg) ; + } + else if(model_log[0].size() != 4) + { char msg[4096] ; + sprintf(msg, "Error! given model 2nd dimension is not 4 (%zu)", + model_log[0].size()) ; + throw std::invalid_argument(msg) ; + } + + size_t from = shift ; + size_t to = from + model_log.size() ; // will score [from, to) + + // assert(to <= sequences.get_ncol()) ; + + double ll = 0 ; + for(size_t i=from, j=0; in_category = 4 ; // initialise the empty model this->model = matrix3d_d(this->n_class, matrix2d_d(this->l_model, vector_d(this->n_category, 0))) ; } SequenceLayer::SequenceLayer(const matrix2d_i& data, const matrix3d_d& model, bool flip) :DataLayer(data, model,flip) {} -SequenceLayer::SequenceLayer(const matrix2d_c& data, - const matrix3d_d& model, - bool flip) -{ // parameters - this->flip = flip ; - this->n_row = data.size() ; - this->n_col = data[0].size() ; - this->n_class = model.size() ; - this->l_model = model[0].size() ; - this->n_category = model[0][0].size() ; - this->n_flip = this->flip + 1 ; - this->n_shift = this->n_col - this->l_model + 1 ; - - // check if model is not too long - if(this->n_col < this->l_model) - { char msg[4096] ; - sprintf(msg, - "Error! model is longer than data : %zu / %zu", - this->l_model, this->n_col) ; - throw std::invalid_argument(msg) ; +SequenceLayer::~SequenceLayer() +{} + +void SequenceLayer::seed_model_randomly() +{ + // get random values from a beta distribution cannot be done using boost so + // i) generate random number [0,1] x + // ii) compute f(x) where f is beta distribution + + matrix2d_d prob(this->n_row, vector_d(this->n_class, 0.)) ; + double tot_sum = 0. ; + + // sample the prob + // beta distribution parameters + double alpha = pow(this->n_row, -0.5) ; + double beta = 1. ; + for(size_t i=0; in_row; i++) + { double row_sum = 0. ; + for(size_t j=0; jn_class; j++) + { double x = rand_real_uniform(0., 1.0) ; + double p = std::max(SequenceLayer::p_min, beta_pmf(x, alpha, beta)) ; + prob[i][j] = p ; + tot_sum += p ; + row_sum += p ; + } + // normalize + for(size_t j=0; jn_class; j++) + { prob[i][j] /= row_sum ; } } - // convert DNA char to DNA int code - this->data = matrix2d_i(this->n_row, - vector_i(this->n_col)) ; + // compute the refererences for(size_t i=0; in_row; i++) - { for(size_t j=0; jn_col; j++) - { switch (data[i][j]) - { case 'A': - this->data[i][j] = 0 ; - break ; - case 'C': - this->data[i][j] = 1 ; - break ; - case 'G': - this->data[i][j] = 2 ; - break ; - case 'T': - this->data[i][j] = 3 ; - break ; - default: - this->data[i][j] = 4 ; - break; + { for(size_t j=0; jn_class; j++) + { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) + { size_t base = this->data[i][j_dat] ; + this->model[j][j_ref][base] += prob[i][j] ; + } + } + } + // normalize + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { // sum + double colsum = 0. ; + for(size_t k=0; kn_category; k++) + { colsum += this->model[i][j][k] ; } + // normalize + for(size_t k=0; kn_category; k++) + { double p = this->model[i][j][k] / colsum ; + this->model[i][j][k] = + std::max(p, SequenceLayer::p_min) ; } } } - - // model - this->model = model ; } -SequenceLayer::~SequenceLayer() -{} +void SequenceLayer::seed_model_sampling() +{ + std::vector choosen(this->n_row, false) ; -void SequenceLayer::seed_model_randomly() -{} + double minor_weight = 1. ; + double major_weight = 7. ; -void SequenceLayer::seed_model_sampling() -{} + for(size_t i=0; in_class; ) + { size_t index = rand_int_uniform(size_t(0), size_t(this->n_row-1)) ; + // already choose + if(choosen[index]) + { ; } + // not yet choosen as reference + else + { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) + { size_t base = this->data[index][j_dat] ; + double colsum = 0. ; + for(size_t k=0; kn_category; k++) + { if(k == base) + { this->model[i][j_ref][k] = major_weight ; } + else + { this->model[i][j_ref][k] = minor_weight ; } + colsum += this->model[i][j_ref][k] ; + } + // normalize + for(size_t k=0; kn_category; k++) + { this->model[i][j_ref][k] /= colsum ; } + } + choosen[index] = true ; + i++ ; + } + } +} void SequenceLayer::seed_model_toy() -{} +{ + // sample data to initialise the references + std::vector choosen(this->n_row, false) ; + + double minor_weight = 1. ; + double major_weight = 7. ; + + for(size_t i=0; in_class; ) + { size_t index = i ; + // already choose + if(choosen[index]) + { ; } + // not yet choosen as reference + else + { for(size_t j_ref=0, j_dat=this->n_shift/2; j_refl_model; j_ref++, j_dat++) + { size_t base = this->data[index][j_dat] ; + double colsum = 0. ; + for(size_t k=0; kn_category; k++) + { if(k == base) + { this->model[i][j_ref][k] = major_weight ; } + else + { this->model[i][j_ref][k] = minor_weight ; } + colsum += this->model[i][j_ref][k] ; + } + // normalize + for(size_t k=0; kn_category; k++) + { this->model[i][j_ref][k] /= colsum ; } + } + choosen[index] = true ; + i++ ; + } + } +} void SequenceLayer::compute_loglikelihoods(matrix4d_d& loglikelihood, - vector_d& loglikelihood_max) const -{ // dimension checks + vector_d& loglikelihood_max, + ThreadPool* threads) const +{ + // dimension checks this->check_loglikelihood_dim(loglikelihood) ; this->check_loglikelihood_max_dim(loglikelihood_max) ; + // 1) compute the log prob model and the log prob reverse-complement model + matrix3d_d model_log(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0.))) ; + matrix3d_d model_log_rev = model_log ; + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { for(size_t k=0; kn_category; k++) + { // forward + model_log[i][j][k] = log(this->model[i][j][k]) ; + // reverse + model_log_rev[i][this->l_model-j-1][this->n_category-k-1] + = log(this->model[i][j][k]) ; + } + } + } + + // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->compute_loglikelihoods_routine(0, this->n_row, + loglikelihood, + loglikelihood_max, + model_log, + model_log_rev, + promise) ; + future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&SequenceLayer::compute_loglikelihoods_routine, + this, + slice.first, + slice.second, + std::ref(loglikelihood), + std::ref(loglikelihood_max), + std::ref(model_log), + std::ref(model_log_rev), + std::ref(promises[i])))) ; + } + // wait until all threads are done working + for(auto& future : futures) + { future.get() ; } + // -------------------------- threads stop --------------------------- + } +} + +void SequenceLayer::compute_loglikelihoods_routine(size_t from, + size_t to, + matrix4d_d& loglikelihood, + vector_d& loglikelihood_max, + const matrix3d_d& model_log, + const matrix3d_d& model_log_rev, + std::promise& done) const +{ // compute log likelihood - for(size_t i=0; in_row; i++) + for(size_t i=from; i::lowest() ; for(size_t j=0; jn_class; j++) - { for(size_t s_fw=0, s_rev=this->n_shift-1; - s_fwn_shift; s_fw++, s_rev--) - { // slice is [from_fw,to) - // from_dat_fw to_dat_fw [from_dat_fw, to_dat_fw] - // fw |---------->>>----------| - // ----------------------------------> data - // rev |----------<<<----------| [from_dat_rev, to_dat_rev] - // to_dat_rev can be -1 -> int - // to_dat_rev from_dat_rev - - // log likelihood - double ll_fw = 0. ; - double ll_rev = 0. ; - // --------------- forward --------------- - size_t from_dat_fw = s_fw ; - size_t to_dat_fw = from_dat_fw + this->l_model - 1 ; - // --------------- reverse --------------- - size_t from_dat_rev = this->n_col - 1 - s_fw ; - // size_t to_dat_rev = from_dat_rev - (this->l_model - 1) ; - - for(size_t j_dat_fw=from_dat_fw,j_ref_fw=0, j_dat_rev=from_dat_rev; - j_dat_fwmodel[j][j_ref_fw][this->data[i][j_dat_fw]]) ; - ll_fw += std::max(ll, SequenceLayer::p_min_log) ; - // --------------- reverse --------------- - if(this->flip) - { ll = log(this->model[j][j_ref_fw][this->data[i][j_dat_rev]]) ; - ll_rev += std::max(ll, SequenceLayer::p_min_log) ; - } - } - loglikelihood[i][j][from_dat_fw][flip_states::FORWARD] = ll_fw ; - // keep track of the max per row - if(ll_fw > loglikelihood_max[i]) - { loglikelihood_max[i] = ll_fw ; } + { for(size_t s=0; sn_shift; s++) + { // forward strand + { double ll_fw = + std::max(SequenceLayer:: + score_subseq(this->data, i, s, model_log[j]), + SequenceLayer::p_min_log) ; + loglikelihood[i][j][s][flip_states::FORWARD] = ll_fw ; + // keep track of max per row + if(ll_fw > loglikelihood_max[i]) + { loglikelihood_max[i] = ll_fw ; } + } + // reverse if(this->flip) - { loglikelihood[i][j][from_dat_fw][flip_states::REVERSE] = ll_rev ; - // keep track of the max per row + { double ll_rev = + std::max(SequenceLayer:: + score_subseq(this->data, i, s, model_log_rev[j]), + SequenceLayer::p_min_log) ; + loglikelihood[i][j][s][flip_states::REVERSE] = ll_rev ; + // keep track of max per row if(ll_rev > loglikelihood_max[i]) { loglikelihood_max[i] = ll_rev ; } } } } } + done.set_value(true) ; +} + +void SequenceLayer::update_model(const matrix4d_d& posterior_prob, + ThreadPool* threads) +{ // don't parallelize + if(threads == nullptr) + { std::promise promise ; + std::future future = promise.get_future() ; + this->update_model_routine(0, + this->n_row, + posterior_prob, + promise) ; + this->model = future.get() ; + } + // parallelize + else + { size_t n_threads = threads->getNThread() ; + // compute the slices on which each thread will work + std::vector> slices = + ThreadPool::split_range(0, this->n_row, n_threads) ; + + // get promises and futures + // the function run by the threads will simply fill the promise with + // "true" to indicate that they are done + std::vector> promises(n_threads) ; + std::vector> futures(n_threads) ; + for(size_t i=0; iaddJob(std::move( + std::bind(&SequenceLayer::update_model_routine, + this, + slice.first, + slice.second, + std::ref(posterior_prob), + std::ref(promises[i])))) ; + } + // reinitialise the model + this->model = matrix3d_d(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0))) ; + // wait until all threads are done working + // and update the model + for(auto& future : futures) + { matrix3d_d model_part = future.get() ; + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { for(size_t k=0; kn_category; k++) + { this->model[i][j][k] += model_part[i][j][k] ; } + } + } + } + // -------------------------- threads stop --------------------------- + } + // normalize to get probs + for(size_t i=0; in_class; i++) + { for(size_t j=0; jl_model; j++) + { double sum = 0. ; + for(size_t k=0; kn_category; k++) + { sum += this->model[i][j][k] ; } + for(size_t k=0; kn_category; k++) + { double p = this->model[i][j][k] / sum ; + this->model[i][j][k] = + std::max(p, SequenceLayer::p_min) ; + } + } + } } -void SequenceLayer::update_model(const matrix4d_d& posterior_prob) -{ // dimension checks + +void SequenceLayer::update_model_routine(size_t from, + size_t to, + const matrix4d_d& posterior_prob, + std::promise& promise) const +{ + // dimension checks this->check_posterior_prob_dim(posterior_prob) ; + + matrix3d_d model = matrix3d_d(this->n_class, + matrix2d_d(this->l_model, + vector_d(this->n_category, 0))) ; + for(size_t k=0; k < this->n_class; k++) + { for(size_t s=0; sn_shift; s++) + { for(size_t j=0; jl_model; j++) + { + // base prob on fw and rv strand + vector_d base_prob(this->n_category, 0.) ; + vector_d base_prob_rev(this->n_category,0.) ; + + for(size_t i=from; idata[i][s+j] ; + // only treat A,C,G,T + if(base > 3) + { continue ; } + // --------------- forward --------------- + { base_prob[base] += + posterior_prob[i][k][s][SequenceLayer::FORWARD] ; + } + // --------------- reverse --------------- + if(this->flip) + { base_prob[base] += + posterior_prob[i][k][s][SequenceLayer::REVERSE] ; + } + } + // update this position of the model + for(size_t i=0,i_rev=base_prob.size()-1; iflip) + { model[k][this->l_model-j-1][i] += base_prob_rev[i] ; } + } + } + } + } + promise.set_value(model) ; } diff --git a/src/Clustering/SequenceLayer.hpp b/src/Clustering/SequenceLayer.hpp index 6012904..af3c77b 100644 --- a/src/Clustering/SequenceLayer.hpp +++ b/src/Clustering/SequenceLayer.hpp @@ -1,107 +1,206 @@ #ifndef SEQUENCELAYER_HPP #define SEQUENCELAYER_HPP #include #include +#include +#include // std::promise, std::future #include +#include class SequenceLayer : public DataLayer -{ public: +{ + public: + /*! + * \brief Converts a DNA character (A, C, + * G, T) to an integer. + * \param c the DNA character of interest. + * \return the character integer code. + * \throw std::invalid_argument if the + * given character is not a valid DNA + * character. + */ + static int char_to_int(char c) ; + + /*! + * \brief Loads the content of a fasta file and stores the + * data in a int matrix where each row contains one sequence. + * The sequence in the file should all have the same length. + * The DNA characters are converted using + * SequenceLayer::char_to_int(char) ; + * \param file_address the address of the file to load. + * \throw std::invalid_argument if the file cannot be read, + * if an invalid DNA character is detected in the sequences + * or if the sequences have variable lengths. + * \return a matrix containing the sequences on the rows and + * the characters over the columns. + */ + static matrix2d_i read_fasta(const std::string& file_address) ; + + /*! + * \brief Computes the log-likelihood of the sub- + * sequence starting at offset at row + * in the given data matrix with the given model. The + * subsequence length is determined by the model + * lenght. + * \param data the matrix containing the sequences + * in integer format. + * \param row the index of the row where the sub- + * sequence is. + * \param shift the offset at which the sub-sequence + * is starting. + * \param model_log a model containing the log + * probability model. + * \return the log-likelihood of the sub-sequence + * given the model. + * \throw std::invalid_argument if 1) the row index + * is invalid, 2) the shift is invalid, 3) the data + * and the model have incompatible dimensions or + * 4) the model 2n dimension is not 4 (A,C,G,T). + */ + static double score_subseq(const matrix2d_i& data, + size_t row, + size_t shift, + const matrix2d_d& model_log) ; + + public: /*! * \brief Constructs an object with the * given data and an empty (0 values) * model. * \param data the data. * \param n_class the number of classes * of the model. * \param n_shift the number of shift * states of the model. * \param flip whether flipping is allowed. */ SequenceLayer(const matrix2d_i& data, size_t n_class, size_t n_shift, bool flip) ; /*! * \brief Construct an object with the * given data and model. * \param data the data. The sequences * should be stored as integer values : * A:0, C:1, G:2, T:3, else:5. * \param the model. * \param flip whether flipping is allowed. */ SequenceLayer(const matrix2d_i& data, const matrix3d_d& model, bool flip) ; - /*! - * \brief Construct an object with the - * given data and model. - * \param data the data. The sequences - * will be transformed into integer values : - * A:0, C:1, G:2, T:3, else:5. - * \param the model. - * \param flip whether flipping is allowed. - */ - SequenceLayer(const matrix2d_c& data, - const matrix3d_d& model, - bool flip) ; - /*! * Destructor */ virtual ~SequenceLayer() override ; /*! * \brief Sets the model values randomly. */ virtual void seed_model_randomly() ; /*! * \brief Sets the model values by * sampling rows in the data and * assigning them as initial model * values. */ virtual void seed_model_sampling() ; /*! * \brief Sets the model values by * using the first n_class rows in data. */ virtual void seed_model_toy() ; /*! * \brief Computes the log likelihood of the data * given the current model parameters. * \param logliklihood a matrix to store the * results. It should have the following dimensions : * 1st : same as the data number of row * 2nd : same as the model number of classes * 3rd : same as the number of shifts * 4th : same as the number of flip states * \param loglikelihood_max a vector containing the * max value for each row of loglikelihood. * Its length should be equal to the data row number. * \throw std::invalid_argument if the dimensions are * incorrect. */ virtual void compute_loglikelihoods(matrix4d_d& loglikelihood, - vector_d& loglikelihood_max) const override ; + vector_d& loglikelihood_max, + ThreadPool* threads=nullptr) const override ; /*! * \brief Updates the model given the posterior * probabilities (the probabilities of each row * in the data to be assigned to each class, * for each shift and flip state). * \param posterior_prob the data assignment probabilities to * the different classes. */ - virtual void update_model(const matrix4d_d& posterior_prob) override ; + virtual void update_model(const matrix4d_d& posterior_prob, + ThreadPool* threads=nullptr) override ; + + protected: + /*! + * \brief The routine that effectively performs the + * loglikelihood computations. + * \param from the index of the first row of the data + * to considered. + * \param to the index of the past last row of the data + * to considered. + * \param loglikelihood a matrix to store the + * results. It should have the following dimensions : + * 1st : same as the data number of row + * 2nd : same as the model number of classes + * 3rd : same as the number of shifts + * 4th : same as the number of flip states + * \param loglikelihood_max a vector containing the + * max value for each row of log_likelihood. + * Its length should be equal to the data row number. + * \param model_log a matrix containing the log value + * of the model. + * \param model_log_rev a matrix containing the log values + * of the reverse strand model (the 1st position in the model + * becomes the last in the reverse model and probabilities are + * swapped A<->T and C<->G). + * \param done a promise to be filled when the routine + * is done running. + */ + void compute_loglikelihoods_routine(size_t from, + size_t to, + matrix4d_d& loglikelihood, + vector_d& loglikelihood_max, + const matrix3d_d& model_log, + const matrix3d_d& model_log_rev, + std::promise& done) const ; + /*! + * \brief The routine that effectively update the model. + * \param from the index of the first row of the + * posterior probabilities to considered. + * \param to the index of the past last row of the + * posterior probabilities to considered. + * \param posterior_prob the data assignment probabilities + * to the different classes. + * \param + * \param done a promise containing the partial model + * computed from the given data slice. If several routines + * work together at updating the model, they need to be + * summed and normalized (by the column sum) to get the + * final model. + */ + void update_model_routine(size_t from, + size_t to, + const matrix4d_d& posterior_prob, + std::promise& done) const ; } ; #endif // SEQUENCELAYER_HPP diff --git a/src/Clustering/SequenceModelComputer.cpp b/src/Clustering/SequenceModelComputer.cpp index 3aea168..b267a4a 100644 --- a/src/Clustering/SequenceModelComputer.cpp +++ b/src/Clustering/SequenceModelComputer.cpp @@ -1,27 +1,42 @@ #include #include SequenceModelComputer::SequenceModelComputer(const matrix2d_i& data, - const matrix4d_d& post_prob) - : ModelComputer() + const matrix4d_d& post_prob, + size_t n_threads) + : ModelComputer(), + threads(nullptr) { // parameters size_t n_class = post_prob[0].size() ; size_t n_shift = post_prob[0][0].size() ; size_t n_flip = post_prob[0][0][0].size() ; bool flip = n_flip == 2 ; + // the threads + if(n_threads) + { this->threads = new ThreadPool(n_threads) ; } + + // the data and the model this->data_layer = new SequenceLayer(data, n_class, n_shift, flip) ; - this->data_layer->update_model(post_prob) ; + this->data_layer->update_model(post_prob, + this->threads) ; } SequenceModelComputer::~SequenceModelComputer() -{ if(this->data_layer != nullptr) +{ // threads + if(this->threads != nullptr) + { this->threads->join() ; + delete this->threads ; + this->threads = nullptr ; + } + // data and model + if(this->data_layer != nullptr) { delete this->data_layer ; this->data_layer = nullptr ; } } diff --git a/src/Clustering/SequenceModelComputer.hpp b/src/Clustering/SequenceModelComputer.hpp index 6b22246..9c69b97 100644 --- a/src/Clustering/SequenceModelComputer.hpp +++ b/src/Clustering/SequenceModelComputer.hpp @@ -1,30 +1,41 @@ #ifndef SEQUENCEMODELCOMPUTER_HPP #define SEQUENCEMODELCOMPUTER_HPP #include #include +#include class SequenceModelComputer : public ModelComputer { public: /*! * \brief Constructs an object to retrieve * the sequence model given the data and their * classification results. * \param data the data. * \param post_prob the data class assignment * probabilities. + * \param n_threads the number of parallel threads + * to run the computations. 0 means no parallel + * computing, everything is run on the main thread. */ SequenceModelComputer(const matrix2d_i& data, - const matrix4d_d& post_prob) ; + const matrix4d_d& post_prob, + size_t n_threads) ; /*! * \brief Destructor. */ virtual ~SequenceModelComputer() override ; + protected: + /*! + * \brief the threads. + */ + ThreadPool* threads ; + } ; #endif // SEQUENCEMODELCOMPUTER_HPP diff --git a/src/GenomicTools/CorrelationMatrixCreator.cpp b/src/GenomicTools/CorrelationMatrixCreator.cpp index fbf6dfa..fb0cf0b 100644 --- a/src/GenomicTools/CorrelationMatrixCreator.cpp +++ b/src/GenomicTools/CorrelationMatrixCreator.cpp @@ -1,373 +1,376 @@ #include #include #include // std::runtime_error #include // BamFileIn #include // BedFileIn #include #include template std::ostream& operator << (std::ostream& stream, const std::list& l) { for(const auto& p : l) { stream << p << " " ; } return stream ; } template std::ostream& operator << (std::ostream& stream, const std::vector& v) { for(const auto& p : v) { stream << p << " " ; } return stream ; } template std::ostream& operator << (std::ostream& stream, const std::pair& p) { stream << "[" << p.first << " " << p.second << "] " ; return stream ; } template std::ostream& operator << (std::ostream& stream, const std::unordered_map& m) { for(const auto& p : m) { stream << p << " " << std::endl; } return stream ; } /* A lambda to sort GenomeRegion by ascending starting coordinate */ auto sortByStartPos = [](const GenomeRegion& r1, const GenomeRegion& r2) -> bool { return r1 < r2 ; } ; CorrelationMatrixCreator::CorrelationMatrixCreator(const std::string& bed_file_path, const std::string& bam_file_path, const std::string& bai_file_path, int from, int to, int bin_size, MatrixCreator::methods method) : MatrixCreator(bed_file_path, bam_file_path, bai_file_path, from, to, bin_size, method), target_list_fw(), target_list_rv() { seqan::BedRecord bed_line ; // compute coordinates relative to each region this->compute_relative_bin_coord() ; size_t n_col = this->relative_bin_coord.size() ; // compute number of regions and get valid chromosomes names this->open_bed_file() ; this->open_bam_file() ; seqan::BamHeader header ; seqan::readHeader(header, bam_file) ; size_t n_row = 0 ; while(not seqan::atEnd(this->bed_file)) { seqan::readRecord(bed_line, this->bed_file) ; std::string chrom_name = seqan::toCString(bed_line.ref) ; // new chromosome if(this->chrom_map_names.find(chrom_name) == this->chrom_map_names.end()) { int chrom_idx = -1 ; seqan::getIdByName(chrom_idx, seqan::contigNamesCache(seqan::context(this->bam_file)), chrom_name) ; this->chrom_map_names[chrom_name] = chrom_idx ; } n_row++ ; } this->close_bed_file() ; this->close_bam_file() ; // create the count matrix this->matrix_counts = Matrix2D(n_row, n_col, 0) ; // create the region matrix this->matrix_bins = std::vector> (n_row,std::vector(n_col)) ; this->open_bed_file() ; this->open_bam_file() ; size_t i = 0 ; while(not seqan::atEnd(this->bed_file)) { seqan::readRecord(bed_line, this->bed_file) ; // find the region limits std::string region_chr = seqan::toCString(bed_line.ref) ; int region_len = bed_line.endPos - bed_line.beginPos ; int region_mid = bed_line.beginPos + (region_len / 2) ; // compute the absolute bins coordinates for this region // and create the bins in this region for(size_t j=0; jrelative_bin_coord[j] ; this->matrix_bins[i][j] = GenomeRegion(region_chr, this->chrom_map_names[region_chr], region_mid + relative_coord.first, region_mid + relative_coord.second) ; } + std::cerr << region_chr << "\t" + << this->matrix_bins[i].front().start << "\t" + << this->matrix_bins[i].back().end << std::endl ; i++ ; } this->close_bed_file() ; this->close_bam_file() ; } CorrelationMatrixCreator::~CorrelationMatrixCreator() { this->close_bam_file() ; this->close_bed_file() ; } Matrix2D CorrelationMatrixCreator::create_matrix() { this->open_bam_file() ; this->open_bai_file() ; // read BAM header seqan::BamHeader bam_header ; seqan::readHeader(bam_header, this->bam_file) ; for(size_t i=0; imatrix_counts.get_nrow(); i++) { const auto& row = this->matrix_bins[i] ; GenomeRegion region(row.front().chromosome, row.front().chromosome_idx, row.front().start, row.back().end) ; bool jump = this->jump_upstream(region, 600) ; if(not jump) { continue ; } // read all relevant targets this->to_downstream_target(region) ; // update count matrix row this->update_count_matrix(i) ; // clean buffers this->clear_target_lists() ; } this->close_bam_file() ; return this->matrix_counts ; } bool CorrelationMatrixCreator::jump_upstream(const GenomeRegion& region, int margin) { bool has_alignment = false ; int rID = -10 ; if(this->chrom_map_names.find(region.chromosome) != this->chrom_map_names.end()) { rID = this->chrom_map_names[region.chromosome] ; } else { char msg[4096] ; sprintf(msg, "Error! chromosome %s is not linked with a valid ID in BAM file", region.chromosome.c_str()) ; std::cerr << msg << std::endl ; return false ; } int start = std::max(0, region.start - margin) ; int end = start + 1 ; bool jump = seqan::jumpToRegion(this->bam_file, has_alignment, rID, start, end, this->bai_file) ; return jump ; } void CorrelationMatrixCreator::to_downstream_target(const GenomeRegion& region) { if(this->method == CorrelationMatrixCreator::methods::READ or this->method == CorrelationMatrixCreator::methods::READ_ATAC) { this->to_downstream_read(region) ; } else { this->to_downstream_fragment(region) ; } } void CorrelationMatrixCreator::to_downstream_read(const GenomeRegion& region) { bool done = false ; seqan::BamAlignmentRecord record ; while(not seqan::atEnd(this->bam_file) and not done) { // QC check and transform record seqan::readRecord(record, this->bam_file) ; if(not CorrelationMatrixCreator::is_good_read(record) or not this->is_valid_chromosome(record)) { continue ; } GenomeRegion target ; try { if(this->method == CorrelationMatrixCreator::methods::READ) { target = GenomeRegion::constructRead(record, this->bam_file) ; } else { target = GenomeRegion::constructReadATAC(record, this->bam_file) ; } } catch(std::invalid_argument& e) { // connect to cerr to write in SAM seqan::BamFileOut samFileOut(seqan::context(this->bam_file), std::cerr, seqan::Sam()) ; std::cerr << "std::invalid_argument caught! could not use " "this record as read: " << std::endl ; writeRecord(samFileOut, record) ; std::cerr << "message was : " << e.what() << std::endl << std::endl ; continue ; } // upstream -> continue if(target < region) { continue ; } // overlap -> store else if(target | region) { if(not seqan::hasFlagRC(record)) { this->target_list_fw.push_back(target) ; } else { this->target_list_rv.push_back(target) ; } } // downstream -> stop else { done = true ; } } } void CorrelationMatrixCreator::to_downstream_fragment(const GenomeRegion& region) { bool done = false ; seqan::BamAlignmentRecord record ; while(not seqan::atEnd(this->bam_file) and not done) { // QC check and transform record seqan::readRecord(record, this->bam_file) ; if(not CorrelationMatrixCreator::is_good_pair(record) or not this->is_valid_chromosome(record)) { continue ; } GenomeRegion target ; try { target = GenomeRegion::constructFragment(record, this->bam_file) ; } catch(std::invalid_argument& e) { // connect to cerr to write in SAM seqan::BamFileOut samFileOut(seqan::context(this->bam_file), std::cerr, seqan::Sam()) ; std::cerr << "std::invalid_argument caught! could not use " "this record as fragment: " << std::endl ; writeRecord(samFileOut, record) ; std::cerr << "message was : " << e.what() << std::endl << std::endl ; continue ; } // upstream -> continue if(target < region) { continue ; } // overlap -> store else if(target | region) { if(this->method == CorrelationMatrixCreator::methods::FRAGMENT_CENTER) { target = GenomeRegion::constructFragmentCenter(record, this->bam_file) ; if(target | region) { this->target_list_fw.push_back(target) ; } } else { this->target_list_fw.push_back(target) ; } } // downstream -> stop else if(target > region) { // std::cerr << std::endl ; done = true ; } } // std::cerr << "to_downstream_fragment END" << std::endl ; } void CorrelationMatrixCreator::clear_target_lists() { this->target_list_fw.clear() ; this->target_list_rv.clear() ; } /* void CorrelationMatrixCreator::remove_upstream_targets(const GenomeRegion& region) { // forward targets auto iter_fw = this->target_list_fw.cbegin() ; while(iter_fw != this->target_list_fw.end()) { // remove upstream reads if(*iter_fw < region) { iter_fw = this->target_list_fw.erase(iter_fw) ; } // keep overlapping reads, don't stop here else if(*iter_fw | region) { iter_fw++ ; } // stop at first read downstream else { break ; } } // reverse targets auto iter_rv = this->target_list_rv.cbegin() ; while(iter_rv != this->target_list_rv.end()) { // remove upstream reads if(*iter_rv < region) { iter_rv = this->target_list_rv.erase(iter_rv) ; } // keep overlapping reads else if(*iter_rv | region) { iter_rv++ ; } // stop at first read downstream else { break ; } } } */ void CorrelationMatrixCreator::update_count_matrix(size_t row_index) { // forward targets for(const auto& iter : this->target_list_fw) { auto bin_start_end = CorrelationMatrixCreator:: get_bin_indices(iter, this->matrix_bins[row_index]) ; for(int j=bin_start_end.first; jmatrix_counts(row_index, j) += iter.overlap_len(this->matrix_bins[row_index][j]) ; } } // reverse targets for(const auto& iter : this->target_list_rv) { auto bin_start_end = CorrelationMatrixCreator:: get_bin_indices(iter, this->matrix_bins[row_index]) ; for(int j=bin_start_end.first; jmatrix_counts(row_index, j) += iter.overlap_len(this->matrix_bins[row_index][j]) ; } } } /* void CorrelationMatrixCreator::update_count_matrix_naive(size_t row_index) { // forward targets for(const auto& iter : target_list_fw) { for(size_t j=0; jmatrix_counts.get_ncol(); j++) { this->matrix_counts(row_index, j) += iter.overlap_len(this->matrix_bins[row_index][j]) ; } } // reverse targets for(const auto& iter : target_list_rv) { for(size_t j=0; jmatrix_counts.get_ncol(); j++) { this->matrix_counts(row_index, j) += iter.overlap_len(this->matrix_bins[row_index][j]) ; } } } */ diff --git a/src/main_cormat.cpp b/src/main_cormat.cpp index a3b01f5..b72dbaf 100644 --- a/src/main_cormat.cpp +++ b/src/main_cormat.cpp @@ -1,174 +1,174 @@ #include #include #include #include #include #include using namespace seqan; template std::ostream& operator << (std::ostream& stream, const std::vector& v) { for(const auto& p : v) { stream << p << " " ; } return stream ; } template std::ostream& operator << (std::ostream& stream, const std::pair& p) { stream << "[" << p.first << " " << p.second << "] " ; return stream ; } /* std::pair get_bin_indices(const GenomeRegion& target, const std::vector& bins) { // the bin range and chromosome int chromosome_idx = bins.front().chromosome_idx ; int bin_size = bins.front().end - bins.front().start ; int from = bins.front().start ; int to = bins.back().end ; // not on the same chromosome if(target.chromosome_idx != chromosome_idx) { return std::make_pair(0,0) ; } // target goes over all bins else if(target.start <= from and target.end >= to) { return std::make_pair(0, bins.size()) ; } // check if overlap else { // define whether target limits are inside int bin_start = -1 ; int bin_end = -1 ; // define whether target limits are inside bool target_start_in = false ; bool target_end_in = false ; if(target.start >= from and target.start < to) { target_start_in = true ; } if(target.end > from and target.end <= to) { target_end_in = true ; } // start if(not target_start_in) { bin_start = 0 ; } else { bin_start = (target.start - from) / bin_size ; } // end if(target_start_in and not target_end_in) { bin_end = bin_start + 1 ; } else if(not target_start_in and not target_end_in) { bin_end = 0 ; } else { bin_end = ((target.end - 1 - from) / bin_size) + 1 ; } return std::make_pair(bin_start, bin_end) ; } } */ std::pair get_bin_indices_naive(const GenomeRegion& target, const std::vector& bins) { int bin_start = 0 ; int bin_end = 0 ; GenomeRegion range(bins.front().chromosome, bins.front().chromosome_idx, bins.front().start, bins.back().end) ; // no overlap if(not (target | range)) { return std::make_pair(0,0) ; } else { // start if(target.start < bins.front().start) { bin_start = 0 ; } else { for(int i=0; i< (int)bins.size(); i++) { if(target.start >= bins[i].start and target.start < bins[i].end) { bin_start = i ; break ; } } } // end if(target.end > bins.back().end) { bin_end = bins.size() ; } else { for(int i=0; i<(int)bins.size(); i++) { if(target.end <= bins[i].end and target.end > bins[i].start) { bin_end = i+1 ; break ; } } } return std::make_pair(bin_start, bin_end) ; } } std::pair get_bin_indices(const GenomeRegion& target, const std::vector& bins) { // the bin range and chromosome GenomeRegion range(bins.front().chromosome, bins.front().chromosome_idx, bins.front().start, bins.back().end) ; // no overlap if(not (target | range)) { return std::make_pair(0,0) ; } // overlap else { // target goes over all bins if(target.start <= range.start and target.end >= range.end) { return std::make_pair(0, bins.size()) ; } else { int bin_start = -1 ; int bin_end = -1 ; int bin_size = bins.front().end - bins.front().start ; // start if(target.start <= range.start) { bin_start = 0 ; } else { bin_start = (target.start - range.start) / bin_size ; } // end if(target.end >= range.end) { bin_end = bins.size() ; } else { bin_end = ((target.end - 1 - range.start) / bin_size) + 1 ; } return std::make_pair(bin_start, bin_end) ; } } } int main() { std::string bed = "data/10xgenomics_PBMC_5k/ctcf_motifs_10e-6.bed" ; std::string bam = "data/10xgenomics_PBMC_5k/atac_v1_pbmc_5k_possorted_filtered_30-84bp.bam" ; std::string bai = "data/10xgenomics_PBMC_5k/atac_v1_pbmc_5k_possorted_filtered_30-84bp.bam.bai" ; CorrelationMatrixCreator mc(bed, bam, bai, - -100, - 100, - 10, + -400, + 400, + 1, CorrelationMatrixCreator::methods::READ_ATAC) ; mc.create_matrix() ; return 0; } diff --git a/src/main_em.cpp b/src/main_em.cpp index c09c5ad..145293a 100644 --- a/src/main_em.cpp +++ b/src/main_em.cpp @@ -1,71 +1,45 @@ #include #include #include #include +#include #include int main() { - std::vector read = {read_matrix2d_i("toy.mat")} ; - // std::vector read = {read_matrix2d_i("results/10xgenomics_PBMC_5k/" - // "ctcf_motifs_10e-6_open_bin1bp_read_atac.mat")} ; - std::vector seq = {} ; - EMEngine em(read, - seq, - 2, - 1, - 5, - true, - EMEngine::seeding_codes::RANDOM, - "20190710") ; - em.classify() ; - matrix4d_d post_prob1 = em.get_post_prob() ; + matrix2d_i seq = SequenceLayer::read_fasta("/local/groux/scATAC-seq/test.fasta") ; + matrix2d_i seq2(seq.size(), + vector_i(21, 0)) ; - // dump - std::ofstream f_out ; - f_out.open("prob.mat4d") ; - f_out << post_prob1 << std::endl ; - f_out.close() ; - - // read - matrix4d_d post_prob2 = read_matrix4d_d("prob.mat4d") ; - for(size_t i=0; i read_data = {}; + std::vector seq_data = {seq2} ; - vector_d class_prob1(post_prob1[0].size(), 0.) ; - vector_d class_prob2(post_prob2[0].size(), 0.) ; - double p_tot1 = 0. ; - double p_tot2 = 0. ; - for(size_t i=0; i +#include #include #include #include #include #include using namespace seqan; template std::ostream& operator << (std::ostream& o, const std::unordered_map& map) { for(const auto& pair : map) { o << "< " << pair.first << " " << pair.second << " >" << std::endl ; } return o ; } void bam_stat(const std::string& path_bam) { // CharString bamFileInName = path_bam.c_str() ; // Open input BAM file. BamFileIn bamFileIn; if (!open(bamFileIn, path_bam.c_str())) { char msg[1024] ; sprintf(msg, "ERROR: could not open input file %s", path_bam.c_str()) ; throw std::runtime_error(msg); } // read header. BamHeader header; try { readHeader(header, bamFileIn); } catch (ParseError const & e) { char msg[1024] ; sprintf(msg, "ERROR: input header is badly formatted. %s", e.what()) ; throw std::runtime_error(msg); } // counters int n_frag = 0 ; int n_frag_bad = 0 ; int n_read_fw_1 = 0 ; int n_read_fw_2 = 0 ; int n_read_rv_1 = 0 ; int n_read_rv_2 = 0 ; BamAlignmentRecord record; while (!atEnd(bamFileIn)) { readRecord(record, bamFileIn) ; n_frag++ ; if(not seqan::hasFlagAllProper(record)) { n_frag_bad++ ; continue ; } else if((seqan::hasFlagRC(record) and seqan::hasFlagNextRC(record)) or (not seqan::hasFlagRC(record) and not seqan::hasFlagNextRC(record))) { n_frag_bad++ ; continue ; } // read if(seqan::hasFlagFirst(record) and not seqan::hasFlagRC(record)) { n_read_fw_1++ ; } if(not seqan::hasFlagFirst(record) and not seqan::hasFlagRC(record)) { n_read_fw_2++ ; } if(seqan::hasFlagFirst(record) and seqan::hasFlagRC(record)) { n_read_rv_1++ ; } if(not seqan::hasFlagFirst(record) and seqan::hasFlagRC(record)) { n_read_rv_2++ ; } } close(bamFileIn) ; std::cout << path_bam << std::endl ; std::cout << "n frag : " << n_frag << std::endl ; std::cout << "n frag bad qual : " << n_frag_bad << std::endl ; std::cout << "n read fw 1st : " << n_read_fw_1 << std::endl ; std::cout << "n read fw 2nd : " << n_read_fw_2 << std::endl ; std::cout << "n read rv 1st : " << n_read_rv_1 << std::endl ; std::cout << "n read rv 2nd : " << n_read_rv_2 << std::endl << std::endl ; } bool is_good_read(const seqan::BamAlignmentRecord& record) { if((seqan::hasFlagUnmapped(record)) or // read unmapped flag seqan::hasFlagQCNoPass(record) or // not passing QC flag seqan::hasFlagDuplicate(record)) // PCR duplicate flag { return false ; } return true ; } bool is_good_pair(const seqan::BamAlignmentRecord& record) { if((not seqan::hasFlagMultiple(record)) or // is paired flag (not seqan::hasFlagAllProper(record))) // each read properly aligned flag { return false ; } if((not seqan::hasFlagFirst(record)) or // read 1st in pair flag seqan::hasFlagLast(record)) // mate 1st in pair flag { return false ; } // read info bool read_is_rev = seqan::hasFlagRC(record) ; // read is rev flag int read_start = record.beginPos ; // mate info bool mate_is_rev = seqan::hasFlagNextRC(record) ; // mate is rev flag int mate_start = record.pNext ; // qc if((not is_good_read(record)) or // --> --> (not read_is_rev and not mate_is_rev) or // <-- <-- (read_is_rev and mate_is_rev) or // <-- --> 1/2 ((read_is_rev and not mate_is_rev) and (read_start < mate_start)) or // <-- --> 2/2 ((not read_is_rev and mate_is_rev) and (read_start > mate_start))) { return false ; } return true ; } void read_bam(const std::string& path_bam) { BamFileIn bamFileIn; if (!open(bamFileIn, path_bam.c_str())) { char msg[1024] ; sprintf(msg, "ERROR: could not open input file %s", path_bam.c_str()) ; throw std::runtime_error(msg); } // read header. BamHeader header; try { readHeader(header, bamFileIn); } catch (ParseError const & e) { char msg[1024] ; sprintf(msg, "ERROR: input header is badly formatted. %s", e.what()) ; throw std::runtime_error(msg); } BamAlignmentRecord record; while (!atEnd(bamFileIn)) { readRecord(record, bamFileIn) ; bool read_rev = seqan::hasFlagRC(record) ; bool mate_rev = seqan::hasFlagNextRC(record) ; int read_start = record.beginPos ; int mate_start = record.pNext ; std::string chrom = seqan::toCString(seqan::getContigName(record, bamFileIn)) ; if(not is_good_pair(record)) { continue ; } char msg[1024] ; if(not read_rev and mate_rev) { sprintf(msg, "[fw %s %d] [rv %s %d]", chrom.c_str(), read_start, chrom.c_str(), mate_start) ; } else if(read_rev and not mate_rev) { sprintf(msg, "[rv %s %d] [fw %s %d]", chrom.c_str(), read_start, chrom.c_str(), mate_start) ; } std::cout << msg << std::endl ; } close(bamFileIn) ; } void count_record_bam(const std::string& path_bam) { BamFileIn bamFileIn; if (!open(bamFileIn, path_bam.c_str())) { char msg[1024] ; sprintf(msg, "ERROR: could not open input file %s", path_bam.c_str()) ; throw std::runtime_error(msg); } // read header. BamHeader header; try { readHeader(header, bamFileIn); } catch (ParseError const & e) { char msg[1024] ; sprintf(msg, "ERROR: input header is badly formatted. %s", e.what()) ; throw std::runtime_error(msg); } size_t n_rec = 0 ; BamAlignmentRecord record; while (!atEnd(bamFileIn)) { readRecord(record, bamFileIn) ; n_rec++ ; } close(bamFileIn) ; std::cout << "nber record : " << n_rec << std::endl ; } void check_chromosomes_bam(const std::string& path_bam) { BamFileIn bamFileIn; if (!open(bamFileIn, path_bam.c_str())) { char msg[1024] ; sprintf(msg, "ERROR: could not open input file %s", path_bam.c_str()) ; throw std::runtime_error(msg); } // read header. BamHeader header; try { readHeader(header, bamFileIn); } catch (ParseError const & e) { char msg[1024] ; sprintf(msg, "ERROR: input header is badly formatted. %s", e.what()) ; throw std::runtime_error(msg); } int chrom_n = 0 ; std::unordered_map map ; BamAlignmentRecord record; size_t i=0 ; while (!atEnd(bamFileIn)) { readRecord(record, bamFileIn) ; std::string chrom = seqan::toCString( seqan::getContigName( record, bamFileIn)) ; if(map.find(chrom) == map.end()) { map[chrom] = chrom_n ; chrom_n++ ; } /* else if(map.find(chrom) != map.end() and chrom_n-1 != map.find(chrom)->second) { auto chrom_tmp = map.find(chrom)->first ; auto chrom_n_tmp = map.find(chrom)->second ; std::cout << "sorting issue with " << chrom_tmp << " " << chrom_n_tmp << "/" << chrom_n << std::endl ; } */ std::cout << i << std::endl ; i++ ; } close(bamFileIn) ; std::cout << "chromosomes :" << std::endl << map << std::endl ; } void test(const std::string& path_bam) { BamFileIn bamFileIn; if (!open(bamFileIn, path_bam.c_str())) { char msg[1024] ; sprintf(msg, "ERROR: could not open input file %s", path_bam.c_str()) ; throw std::runtime_error(msg); } // Open output SAM which is the standard output. BamFileOut samFileOut(context(bamFileIn), std::cout, Sam()); // read header. BamHeader header; try { readHeader(header, bamFileIn) ; } catch (ParseError const & e) { char msg[1024] ; sprintf(msg, "ERROR: input header is badly formatted. %s", e.what()) ; throw std::runtime_error(msg); } size_t n_rec = 0 ; BamAlignmentRecord record; while (!atEnd(bamFileIn)) { readRecord(record, bamFileIn) ; if(n_rec == 165421293) { writeRecord(samFileOut, record) ; std::cout << is_good_read(record) << std::endl ; } else if(n_rec == 366090419) { writeRecord(samFileOut, record) ; std::cout << is_good_read(record) << std::endl ; // should bug /* std::string chrom = seqan::toCString( seqan::getContigName( record, bamFileIn)) ; */ } n_rec++ ; } close(bamFileIn) ; std::cout << "nber record : " << n_rec << std::endl ; } int copy_top_bam(const std::string& bam_path_in, const std::string& bam_path_out, int n_lines) { // Open input file, BamFileIn can read SAM and BAM files. BamFileIn bamFileIn; if (!open(bamFileIn, bam_path_in.c_str())) { std::cerr << "ERROR: Could not open " << bam_path_in << std::endl; return 1; } // Open output file, BamFileOut accepts also an ostream and a format tag. BamFileOut bamFileOut(bam_path_out.c_str()); try { // Copy header. BamHeader header; readHeader(header, bamFileIn); writeHeader(bamFileOut, header); // Copy records. BamAlignmentRecord record; int n=0 ; while (not atEnd(bamFileIn) and n