diff --git a/src/synchronizer/communicator.hh b/src/synchronizer/communicator.hh index 4a1c70153..be12339a7 100644 --- a/src/synchronizer/communicator.hh +++ b/src/synchronizer/communicator.hh @@ -1,540 +1,541 @@ /** * @file communicator.hh * * @author Nicolas Richart * * @date creation: Fri Jun 18 2010 * @date last modification: Wed Nov 15 2017 * * @brief Class handling the parallel communications * * @section LICENSE * * Copyright (©) 2010-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "aka_array.hh" #include "aka_common.hh" #include "aka_event_handler_manager.hh" #include "communication_buffer.hh" #include "communication_request.hh" #include "communicator_event_handler.hh" /* -------------------------------------------------------------------------- */ #ifndef __AKANTU_STATIC_COMMUNICATOR_HH__ #define __AKANTU_STATIC_COMMUNICATOR_HH__ namespace akantu { namespace debug { class CommunicationException : public Exception { public: CommunicationException() : Exception("An exception happen during a communication process.") {} }; } // namespace debug /// @enum SynchronizerOperation reduce operation that the synchronizer can /// perform enum class SynchronizerOperation { _sum, _min, _max, _prod, _land, _band, _lor, _bor, _lxor, _bxor, _min_loc, _max_loc, _null }; enum class CommunicationMode { _auto, _synchronous, _ready }; namespace { int _any_source = -1; } } // namespace akantu namespace akantu { struct CommunicatorInternalData { virtual ~CommunicatorInternalData() = default; }; /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ class Communicator : public EventHandlerManager { struct private_member {}; /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: Communicator(int & argc, char **& argv, const private_member &); + Communicator(const private_member & = private_member{}); ~Communicator() override; /* ------------------------------------------------------------------------ */ /* Methods */ /* ------------------------------------------------------------------------ */ public: /* ------------------------------------------------------------------------ */ /* Point to Point */ /* ------------------------------------------------------------------------ */ template void probe(Int sender, Int tag, CommunicationStatus & status) const; template bool asyncProbe(Int sender, Int tag, CommunicationStatus & status) const; /* ------------------------------------------------------------------------ */ template inline void receive(Array & values, Int sender, Int tag) const { return this->receiveImpl( values.storage(), values.size() * values.getNbComponent(), sender, tag); } template inline void receive(std::vector & values, Int sender, Int tag) const { return this->receiveImpl(values.data(), values.size(), sender, tag); } template inline void receive(Tensor & values, Int sender, Int tag, std::enable_if_t::value> * = nullptr) const { return this->receiveImpl(values.storage(), values.size(), sender, tag); } inline void receive(CommunicationBufferTemplated & values, Int sender, Int tag) const { return this->receiveImpl(values.storage(), values.size(), sender, tag); } inline void receive(CommunicationBufferTemplated & values, Int sender, Int tag) const { CommunicationStatus status; this->probe(sender, tag, status); values.reserve(status.size()); return this->receiveImpl(values.storage(), values.size(), sender, tag); } template inline void receive(T & values, Int sender, Int tag, std::enable_if_t::value> * = nullptr) const { return this->receiveImpl(&values, 1, sender, tag); } /* ------------------------------------------------------------------------ */ template inline void send(const Array & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->sendImpl(values.storage(), values.size() * values.getNbComponent(), receiver, tag, mode); } template inline void send(const std::vector & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->sendImpl(values.data(), values.size(), receiver, tag, mode); } template inline void send(const Tensor & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->sendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline void send(const CommunicationBufferTemplated & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->sendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline void send(const T & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->sendImpl(&values, 1, receiver, tag, mode); } /* ------------------------------------------------------------------------ */ template inline CommunicationRequest asyncSend(const Array & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.storage(), values.size() * values.getNbComponent(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const std::vector & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.data(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const Tensor & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->asyncSendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const CommunicationBufferTemplated & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const T & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->asyncSendImpl(&values, 1, receiver, tag, mode); } /* ------------------------------------------------------------------------ */ template inline CommunicationRequest asyncReceive(Array & values, Int sender, Int tag) const { return this->asyncReceiveImpl( values.storage(), values.size() * values.getNbComponent(), sender, tag); } template inline CommunicationRequest asyncReceive(std::vector & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.data(), values.size(), sender, tag); } template ::value>> inline CommunicationRequest asyncReceive(Tensor & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.storage(), values.size(), sender, tag); } template inline CommunicationRequest asyncReceive(CommunicationBufferTemplated & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.storage(), values.size(), sender, tag); } /* ------------------------------------------------------------------------ */ /* Collectives */ /* ------------------------------------------------------------------------ */ template inline void allReduce(Array & values, SynchronizerOperation op = SynchronizerOperation::_sum) const { this->allReduceImpl(values.storage(), values.size() * values.getNbComponent(), op); } template inline void allReduce(Tensor & values, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->allReduceImpl(values.storage(), values.size(), op); } template inline void allReduce(T & values, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->allReduceImpl(&values, 1, op); } template inline void scan(Array & values, SynchronizerOperation op = SynchronizerOperation::_sum) const { this->scanImpl(values.storage(), values.storage(), values.size() * values.getNbComponent(), op); } template inline void scan(Tensor & values, SynchronizerOperation op, std::enable_if_t::value> * = nullptr) const { this->scanImpl(values.storage(), values.storage(), values.size(), op); } template inline void scan(T & values, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->scanImpl(&values, &values, 1, op); } template inline void exclusiveScan(Array & values, SynchronizerOperation op = SynchronizerOperation::_sum) const { this->exclusiveScanImpl(values.storage(), values.storage(), values.size() * values.getNbComponent(), op); } template inline void exclusiveScan( Tensor & values, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->exclusiveScanImpl(values.storage(), values.storage(), values.size(), op); } template inline void exclusiveScan( T & values, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->exclusiveScanImpl(&values, &values, 1, op); } template inline void exclusiveScan( T & values, T & result, SynchronizerOperation op = SynchronizerOperation::_sum, std::enable_if_t::value> * = nullptr) const { this->exclusiveScanImpl(&values, &result, 1, op); } /* ------------------------------------------------------------------------ */ template inline void allGather(Array & values) const { AKANTU_DEBUG_ASSERT(UInt(psize) == values.size(), "The array size is not correct"); this->allGatherImpl(values.storage(), values.getNbComponent()); } template ::value>> inline void allGather(Tensor & values) const { AKANTU_DEBUG_ASSERT(values.size() / UInt(psize) > 0, "The vector size is not correct"); this->allGatherImpl(values.storage(), values.size() / UInt(psize)); } /* ------------------------------------------------------------------------ */ template inline void allGatherV(Array & values, const Array & sizes) const { this->allGatherVImpl(values.storage(), sizes.storage()); } /* ------------------------------------------------------------------------ */ template inline void reduce(Array & values, SynchronizerOperation op, int root = 0) const { this->reduceImpl(values.storage(), values.size() * values.getNbComponent(), op, root); } /* ------------------------------------------------------------------------ */ template inline void gather(Tensor & values, int root = 0, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(values.storage(), values.getNbComponent(), root); } template inline void gather(T values, int root = 0, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(&values, 1, root); } /* ------------------------------------------------------------------------ */ template inline void gather(Tensor & values, Array & gathered, std::enable_if_t::value> * = nullptr) const { AKANTU_DEBUG_ASSERT(values.size() == gathered.getNbComponent(), "The array size is not correct"); gathered.resize(psize); this->gatherImpl(values.data(), values.size(), gathered.storage(), gathered.getNbComponent()); } template inline void gather(T values, Array & gathered, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(&values, 1, gathered.storage(), 1); } /* ------------------------------------------------------------------------ */ template inline void gatherV(Array & values, const Array & sizes, int root = 0) const { this->gatherVImpl(values.storage(), sizes.storage(), root); } /* ------------------------------------------------------------------------ */ template inline void broadcast(Array & values, int root = 0) const { this->broadcastImpl(values.storage(), values.size() * values.getNbComponent(), root); } template inline void broadcast(std::vector & values, int root = 0) const { this->broadcastImpl(values.data(), values.size(), root); } inline void broadcast(CommunicationBufferTemplated & buffer, int root = 0) const { this->broadcastImpl(buffer.storage(), buffer.size(), root); } inline void broadcast(CommunicationBufferTemplated & buffer, int root = 0) const { UInt buffer_size = buffer.size(); this->broadcastImpl(&buffer_size, 1, root); if (prank != root) buffer.reserve(buffer_size); if (buffer_size == 0) return; this->broadcastImpl(buffer.storage(), buffer.size(), root); } template inline void broadcast(T & values, int root = 0) const { this->broadcastImpl(&values, 1, root); } /* ------------------------------------------------------------------------ */ void barrier() const; CommunicationRequest asyncBarrier() const; /* ------------------------------------------------------------------------ */ /* Request handling */ /* ------------------------------------------------------------------------ */ bool test(CommunicationRequest & request) const; bool testAll(std::vector & request) const; void wait(CommunicationRequest & request) const; void waitAll(std::vector & requests) const; UInt waitAny(std::vector & requests) const; inline void freeCommunicationRequest(CommunicationRequest & request) const; inline void freeCommunicationRequest(std::vector & requests) const; template inline void receiveAnyNumber(std::vector & send_requests, MsgProcessor && processor, Int tag) const; protected: template void sendImpl(const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const; template void receiveImpl(T * buffer, Int size, Int sender, Int tag) const; template CommunicationRequest asyncSendImpl( const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const; template CommunicationRequest asyncReceiveImpl(T * buffer, Int size, Int sender, Int tag) const; template void allReduceImpl(T * values, int nb_values, SynchronizerOperation op) const; template void scanImpl(T * values, T * results, int nb_values, SynchronizerOperation op) const; template void exclusiveScanImpl(T * values, T * results, int nb_values, SynchronizerOperation op) const; template void allGatherImpl(T * values, int nb_values) const; template void allGatherVImpl(T * values, int * nb_values) const; template void reduceImpl(T * values, int nb_values, SynchronizerOperation op, int root = 0) const; template void gatherImpl(T * values, int nb_values, int root = 0) const; template void gatherImpl(T * values, int nb_values, T * gathered, int nb_gathered = 0) const; template void gatherVImpl(T * values, int * nb_values, int root = 0) const; template void broadcastImpl(T * values, int nb_values, int root = 0) const; /* ------------------------------------------------------------------------ */ /* Accessors */ /* ------------------------------------------------------------------------ */ public: Int getNbProc() const { return psize; }; Int whoAmI() const { return prank; }; static Communicator & getStaticCommunicator(); static Communicator & getStaticCommunicator(int & argc, char **& argv); int getMaxTag() const; int getMinTag() const; AKANTU_GET_MACRO(CommunicatorData, (*communicator_data), decltype(auto)); /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ private: static std::unique_ptr static_communicator; protected: Int prank{0}; Int psize{1}; std::unique_ptr communicator_data; }; inline std::ostream & operator<<(std::ostream & stream, const CommunicationRequest & _this) { _this.printself(stream); return stream; } } // namespace akantu #include "communicator_inline_impl.hh" #endif /* __AKANTU_STATIC_COMMUNICATOR_HH__ */ diff --git a/src/synchronizer/communicator_dummy_inline_impl.cc b/src/synchronizer/communicator_dummy_inline_impl.cc index 24322d851..99320fb6d 100644 --- a/src/synchronizer/communicator_dummy_inline_impl.cc +++ b/src/synchronizer/communicator_dummy_inline_impl.cc @@ -1,130 +1,132 @@ /** * @file communicator_dummy_inline_impl.cc * * @author Nicolas Richart * * @date creation: Tue Nov 07 2017 * @date last modification: Fri Nov 10 2017 * * @brief Dummy communicator to make everything work im sequential * * @section LICENSE * * Copyright (©) 2016-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "communicator.hh" /* -------------------------------------------------------------------------- */ #include #include #include /* -------------------------------------------------------------------------- */ namespace akantu { Communicator::Communicator(int & /*argc*/, char **& /*argv*/, const private_member & /*unused*/) {} +Communicator::Communicator(const private_member & /*unused*/) {} + template void Communicator::sendImpl(const T *, Int, Int, Int, const CommunicationMode &) const {} template void Communicator::receiveImpl(T *, Int, Int, Int) const {} template CommunicationRequest Communicator::asyncSendImpl(const T *, Int, Int, Int, const CommunicationMode &) const { return std::shared_ptr( new InternalCommunicationRequest(0, 0)); } template CommunicationRequest Communicator::asyncReceiveImpl(T *, Int, Int, Int) const { return std::shared_ptr( new InternalCommunicationRequest(0, 0)); } template void Communicator::probe(Int, Int, CommunicationStatus &) const {} template bool Communicator::asyncProbe(Int, Int, CommunicationStatus &) const { return true; } bool Communicator::test(CommunicationRequest &) const { return true; } bool Communicator::testAll(std::vector &) const { return true; } void Communicator::wait(CommunicationRequest &) const {} void Communicator::waitAll(std::vector &) const {} UInt Communicator::waitAny(std::vector &) const { return UInt(-1); } void Communicator::barrier() const {} CommunicationRequest Communicator::asyncBarrier() const { return std::shared_ptr( new InternalCommunicationRequest(0, 0)); } template void Communicator::reduceImpl(T *, int, SynchronizerOperation, int) const {} template void Communicator::allReduceImpl(T *, int, SynchronizerOperation) const {} template void Communicator::scanImpl(T * values, T * result, int n, SynchronizerOperation) const { if (values == result) return; std::copy_n(values, n, result); } template void Communicator::exclusiveScanImpl(T * /*values*/, T * result, int n, SynchronizerOperation) const { std::fill_n(result, n, T()); } template inline void Communicator::allGatherImpl(T *, int) const {} template inline void Communicator::allGatherVImpl(T *, int *) const {} template inline void Communicator::gatherImpl(T *, int, int) const {} template void Communicator::gatherImpl(T * values, int nb_values, T * gathered, int) const { static_assert(std::is_trivially_copyable{}, "Cannot send this type of data"); std::memcpy(gathered, values, nb_values); } template inline void Communicator::gatherVImpl(T *, int *, int) const {} template inline void Communicator::broadcastImpl(T *, int, int) const {} int Communicator::getMaxTag() const { return std::numeric_limits::max(); } int Communicator::getMinTag() const { return 0; } } // namespace akantu diff --git a/src/synchronizer/communicator_mpi_inline_impl.cc b/src/synchronizer/communicator_mpi_inline_impl.cc index 9680efdba..619ffe19d 100644 --- a/src/synchronizer/communicator_mpi_inline_impl.cc +++ b/src/synchronizer/communicator_mpi_inline_impl.cc @@ -1,495 +1,499 @@ /** * @file communicator_mpi_inline_impl.cc * * @author Nicolas Richart * * @date creation: Tue Nov 07 2017 * @date last modification: Mon Dec 18 2017 * * @brief StaticCommunicatorMPI implementation * * @section LICENSE * * Copyright (©) 2016-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "aka_iterators.hh" #include "communicator.hh" #include "mpi_communicator_data.hh" /* -------------------------------------------------------------------------- */ #include #include #include #include /* -------------------------------------------------------------------------- */ #include /* -------------------------------------------------------------------------- */ #if (defined(__GNUC__) || defined(__GNUG__)) #if AKA_GCC_VERSION < 60000 namespace std { template <> struct hash { using argument_type = akantu::SynchronizerOperation; size_t operator()(const argument_type & e) const noexcept { auto ue = underlying_type_t(e); return uh(ue); } private: const hash> uh{}; }; } // namespace std #endif #endif namespace akantu { class CommunicationRequestMPI : public InternalCommunicationRequest { public: CommunicationRequestMPI(UInt source, UInt dest) : InternalCommunicationRequest(source, dest), request(std::make_unique()) {} MPI_Request & getMPIRequest() { return *request; }; private: std::unique_ptr request; }; namespace { template inline MPI_Datatype getMPIDatatype(); MPI_Op getMPISynchronizerOperation(SynchronizerOperation op) { std::unordered_map _operations{ {SynchronizerOperation::_sum, MPI_SUM}, {SynchronizerOperation::_min, MPI_MIN}, {SynchronizerOperation::_max, MPI_MAX}, {SynchronizerOperation::_prod, MPI_PROD}, {SynchronizerOperation::_land, MPI_LAND}, {SynchronizerOperation::_band, MPI_BAND}, {SynchronizerOperation::_lor, MPI_LOR}, {SynchronizerOperation::_bor, MPI_BOR}, {SynchronizerOperation::_lxor, MPI_LXOR}, {SynchronizerOperation::_bxor, MPI_BXOR}, {SynchronizerOperation::_min_loc, MPI_MINLOC}, {SynchronizerOperation::_max_loc, MPI_MAXLOC}, {SynchronizerOperation::_null, MPI_OP_NULL}}; return _operations[op]; } template MPI_Datatype inline getMPIDatatype() { return MPI_DATATYPE_NULL; } #define SPECIALIZE_MPI_DATATYPE(type, mpi_type) \ template <> MPI_Datatype inline getMPIDatatype() { return mpi_type; } #define COMMA , SPECIALIZE_MPI_DATATYPE(char, MPI_CHAR) SPECIALIZE_MPI_DATATYPE(std::uint8_t, MPI_UINT8_T) SPECIALIZE_MPI_DATATYPE(float, MPI_FLOAT) SPECIALIZE_MPI_DATATYPE(double, MPI_DOUBLE) SPECIALIZE_MPI_DATATYPE(long double, MPI_LONG_DOUBLE) SPECIALIZE_MPI_DATATYPE(signed int, MPI_INT) SPECIALIZE_MPI_DATATYPE(unsigned int, MPI_UNSIGNED) SPECIALIZE_MPI_DATATYPE(signed long int, MPI_LONG) SPECIALIZE_MPI_DATATYPE(unsigned long int, MPI_UNSIGNED_LONG) SPECIALIZE_MPI_DATATYPE(signed long long int, MPI_LONG_LONG) SPECIALIZE_MPI_DATATYPE(unsigned long long int, MPI_UNSIGNED_LONG_LONG) SPECIALIZE_MPI_DATATYPE(SCMinMaxLoc, MPI_DOUBLE_INT) SPECIALIZE_MPI_DATATYPE(SCMinMaxLoc, MPI_FLOAT_INT) SPECIALIZE_MPI_DATATYPE(bool, MPI_CXX_BOOL) template <> MPI_Datatype inline getMPIDatatype() { return getMPIDatatype>(); } inline int getMPISource(int src) { if (src == _any_source) return MPI_ANY_SOURCE; return src; } decltype(auto) convertRequests(std::vector & requests) { std::vector mpi_requests(requests.size()); for (auto && request_pair : zip(requests, mpi_requests)) { auto && req = std::get<0>(request_pair); auto && mpi_req = std::get<1>(request_pair); mpi_req = aka::as_type(req.getInternal()) .getMPIRequest(); } return mpi_requests; } } // namespace // this is ugly but shorten the code a lot #define MPIDATA \ (*reinterpret_cast(communicator_data.get())) /* -------------------------------------------------------------------------- */ /* Implementation */ /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ Communicator::Communicator(int & /*argc*/, char **& /*argv*/, - const private_member & /*unused*/) + const private_member & m) + : Communicator(m) {} + +/* -------------------------------------------------------------------------- */ +Communicator::Communicator(const private_member &) : communicator_data(std::make_unique()) { prank = MPIDATA.rank(); psize = MPIDATA.size(); } /* -------------------------------------------------------------------------- */ template void Communicator::sendImpl(const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); switch (mode) { case CommunicationMode::_auto: MPI_Send(buffer, size, type, receiver, tag, communicator); break; case CommunicationMode::_synchronous: MPI_Ssend(buffer, size, type, receiver, tag, communicator); break; case CommunicationMode::_ready: MPI_Rsend(buffer, size, type, receiver, tag, communicator); break; } } /* -------------------------------------------------------------------------- */ template void Communicator::receiveImpl(T * buffer, Int size, Int sender, Int tag) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Status status; MPI_Datatype type = getMPIDatatype(); MPI_Recv(buffer, size, type, getMPISource(sender), tag, communicator, &status); } /* -------------------------------------------------------------------------- */ template CommunicationRequest Communicator::asyncSendImpl(const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); auto * request = new CommunicationRequestMPI(prank, receiver); MPI_Request & req = request->getMPIRequest(); MPI_Datatype type = getMPIDatatype(); switch (mode) { case CommunicationMode::_auto: MPI_Isend(buffer, size, type, receiver, tag, communicator, &req); break; case CommunicationMode::_synchronous: MPI_Issend(buffer, size, type, receiver, tag, communicator, &req); break; case CommunicationMode::_ready: MPI_Irsend(buffer, size, type, receiver, tag, communicator, &req); break; } return std::shared_ptr(request); } /* -------------------------------------------------------------------------- */ template CommunicationRequest Communicator::asyncReceiveImpl(T * buffer, Int size, Int sender, Int tag) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); auto * request = new CommunicationRequestMPI(sender, prank); MPI_Datatype type = getMPIDatatype(); MPI_Request & req = request->getMPIRequest(); MPI_Irecv(buffer, size, type, getMPISource(sender), tag, communicator, &req); return std::shared_ptr(request); } /* -------------------------------------------------------------------------- */ template void Communicator::probe(Int sender, Int tag, CommunicationStatus & status) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Status mpi_status; MPI_Probe(getMPISource(sender), tag, communicator, &mpi_status); MPI_Datatype type = getMPIDatatype(); int count; MPI_Get_count(&mpi_status, type, &count); status.setSource(mpi_status.MPI_SOURCE); status.setTag(mpi_status.MPI_TAG); status.setSize(count); } /* -------------------------------------------------------------------------- */ template bool Communicator::asyncProbe(Int sender, Int tag, CommunicationStatus & status) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Status mpi_status; int test; MPI_Iprobe(getMPISource(sender), tag, communicator, &test, &mpi_status); if (not test) return false; MPI_Datatype type = getMPIDatatype(); int count; MPI_Get_count(&mpi_status, type, &count); status.setSource(mpi_status.MPI_SOURCE); status.setTag(mpi_status.MPI_TAG); status.setSize(count); return true; } /* -------------------------------------------------------------------------- */ bool Communicator::test(CommunicationRequest & request) const { MPI_Status status; int flag; auto & req_mpi = aka::as_type(request.getInternal()); MPI_Request & req = req_mpi.getMPIRequest(); MPI_Test(&req, &flag, &status); return flag; } /* -------------------------------------------------------------------------- */ bool Communicator::testAll(std::vector & requests) const { // int are_finished; // auto && mpi_requests = convertRequests(requests); // MPI_Testall(mpi_requests.size(), mpi_requests.data(), &are_finished, // MPI_STATUSES_IGNORE); // return are_finished; for (auto & request : requests) { if (not test(request)) return false; } return true; } /* -------------------------------------------------------------------------- */ void Communicator::wait(CommunicationRequest & request) const { MPI_Status status; auto & req_mpi = aka::as_type(request.getInternal()); MPI_Request & req = req_mpi.getMPIRequest(); MPI_Wait(&req, &status); } /* -------------------------------------------------------------------------- */ void Communicator::waitAll(std::vector & requests) const { auto && mpi_requests = convertRequests(requests); MPI_Waitall(mpi_requests.size(), mpi_requests.data(), MPI_STATUSES_IGNORE); } /* -------------------------------------------------------------------------- */ UInt Communicator::waitAny(std::vector & requests) const { auto && mpi_requests = convertRequests(requests); int pos; MPI_Waitany(mpi_requests.size(), mpi_requests.data(), &pos, MPI_STATUSES_IGNORE); if (pos != MPI_UNDEFINED) { return pos; } else { return UInt(-1); } } /* -------------------------------------------------------------------------- */ void Communicator::barrier() const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Barrier(communicator); } /* -------------------------------------------------------------------------- */ CommunicationRequest Communicator::asyncBarrier() const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); auto * request = new CommunicationRequestMPI(0, 0); MPI_Request & req = request->getMPIRequest(); MPI_Ibarrier(communicator, &req); return std::shared_ptr(request); } /* -------------------------------------------------------------------------- */ template void Communicator::reduceImpl(T * values, int nb_values, SynchronizerOperation op, int root) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); MPI_Reduce(MPI_IN_PLACE, values, nb_values, type, getMPISynchronizerOperation(op), root, communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::allReduceImpl(T * values, int nb_values, SynchronizerOperation op) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); MPI_Allreduce(MPI_IN_PLACE, values, nb_values, type, getMPISynchronizerOperation(op), communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::scanImpl(T * values, T * result, int nb_values, SynchronizerOperation op) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); if (values == result) { values = reinterpret_cast(MPI_IN_PLACE); } MPI_Scan(values, result, nb_values, type, getMPISynchronizerOperation(op), communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::exclusiveScanImpl(T * values, T * result, int nb_values, SynchronizerOperation op) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); if (values == result) { values = reinterpret_cast(MPI_IN_PLACE); } MPI_Exscan(values, result, nb_values, type, getMPISynchronizerOperation(op), communicator); if (prank == 0) { result[0] = T(); } } /* -------------------------------------------------------------------------- */ template void Communicator::allGatherImpl(T * values, int nb_values) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); MPI_Allgather(MPI_IN_PLACE, nb_values, type, values, nb_values, type, communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::allGatherVImpl(T * values, int * nb_values) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); std::vector displs(psize); displs[0] = 0; for (int i = 1; i < psize; ++i) { displs[i] = displs[i - 1] + nb_values[i - 1]; } MPI_Datatype type = getMPIDatatype(); MPI_Allgatherv(MPI_IN_PLACE, *nb_values, type, values, nb_values, displs.data(), type, communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::gatherImpl(T * values, int nb_values, int root) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); T *send_buf = nullptr, *recv_buf = nullptr; if (prank == root) { send_buf = (T *)MPI_IN_PLACE; recv_buf = values; } else { send_buf = values; } MPI_Datatype type = getMPIDatatype(); MPI_Gather(send_buf, nb_values, type, recv_buf, nb_values, type, root, communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::gatherImpl(T * values, int nb_values, T * gathered, int nb_gathered) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); T * send_buf = values; T * recv_buf = gathered; if (nb_gathered == 0) nb_gathered = nb_values; MPI_Datatype type = getMPIDatatype(); MPI_Gather(send_buf, nb_values, type, recv_buf, nb_gathered, type, this->prank, communicator); } /* -------------------------------------------------------------------------- */ template void Communicator::gatherVImpl(T * values, int * nb_values, int root) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); int * displs = nullptr; if (prank == root) { displs = new int[psize]; displs[0] = 0; for (int i = 1; i < psize; ++i) { displs[i] = displs[i - 1] + nb_values[i - 1]; } } T *send_buf = nullptr, *recv_buf = nullptr; if (prank == root) { send_buf = (T *)MPI_IN_PLACE; recv_buf = values; } else send_buf = values; MPI_Datatype type = getMPIDatatype(); MPI_Gatherv(send_buf, *nb_values, type, recv_buf, nb_values, displs, type, root, communicator); if (prank == root) { delete[] displs; } } /* -------------------------------------------------------------------------- */ template void Communicator::broadcastImpl(T * values, int nb_values, int root) const { MPI_Comm communicator = MPIDATA.getMPICommunicator(); MPI_Datatype type = getMPIDatatype(); MPI_Bcast(values, nb_values, type, root, communicator); } /* -------------------------------------------------------------------------- */ int Communicator::getMaxTag() const { return MPIDATA.getMaxTag(); } int Communicator::getMinTag() const { return 0; } /* -------------------------------------------------------------------------- */ } // namespace akantu