Page MenuHomec4science

No OneTemporary

File Metadata

Mon, Mar 3, 20:47


* Copyright (©) 2010-2023 EPFL (Ecole Polytechnique Fédérale de Lausanne)
* Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
* This file is part of Akantu
* Akantu is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
* Akantu is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
* You should have received a copy of the GNU Lesser General Public License
* along with Akantu. If not, see <http://www.gnu.org/licenses/>.
/* -------------------------------------------------------------------------- */
#include "aka_array.hh"
#include "aka_common.hh"
#include "aka_event_handler_manager.hh"
#include "communication_buffer.hh"
#include "communication_request.hh"
#include "communicator_event_handler.hh"
/* -------------------------------------------------------------------------- */
namespace akantu {
namespace debug {
/// Exception type for errors occurring during a communication process; it
/// initialises the base Exception with a fixed message.
/// NOTE(review): the constructor name line and the closing of the class are
/// not visible in this copy of the file.
class CommunicationException : public Exception {
: Exception("An exception happen during a communication process.") {}
} // namespace debug
/// @enum SynchronizerOperation reduce operation that the synchronizer can
/// perform. `_sum` is the default operation of most allReduce / scan /
/// exclusiveScan wrappers declared in Communicator.
/// NOTE(review): the enumerator list and the closing brace are not visible in
/// this copy of the file.
enum class SynchronizerOperation {
/// Mode selector accepted by the send / asyncSend wrappers of Communicator;
/// those wrappers default to `_auto`.
/// NOTE(review): the exact semantics of `_synchronous` and `_ready` are decided
/// by the implementation, which is not visible here.
enum class CommunicationMode { _auto, _synchronous, _ready };
/// Sentinel rank value (-1).
/// NOTE(review): presumably passed as the `sender` argument to match a message
/// from any source -- confirm against the implementation.
inline constexpr int _any_source = -1;
} // namespace akantu
namespace akantu {
/// Polymorphic base type (virtual destructor) for the data a Communicator
/// owns through its `communicator_data` unique_ptr.
/// NOTE(review): the closing brace of the struct is not visible in this copy
/// of the file.
struct CommunicatorInternalData {
virtual ~CommunicatorInternalData() = default;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/// Communicator: an EventHandlerManager for CommunicatorEventHandler that
/// declares point-to-point and collective communication wrappers, request
/// handling helpers and rank/size accessors.
/// NOTE(review): the access specifiers and the closing brace of the class are
/// not visible in this copy of the file.
class Communicator : public EventHandlerManager<CommunicatorEventHandler> {
/// Empty tag type taken by both constructors.
/// NOTE(review): presumably a pass-key restricting who may construct a
/// Communicator -- confirm against the access specifiers.
struct private_member {};
/* ------------------------------------------------------------------------ */
/* Constructors/Destructors */
/* ------------------------------------------------------------------------ */
/// Construct from the program arguments, both taken by non-const reference.
Communicator(int & argc, char **& argv, const private_member & /*m*/);
/// Construct without program arguments; the tag argument is defaulted, so this
/// also acts as the default constructor.
Communicator(const private_member & /*unused*/ = private_member{});
/// Overrides the base-class destructor; defined outside this header section.
~Communicator() override;
/* ------------------------------------------------------------------------ */
/* Methods */
/* ------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------ */
/* Point to Point */
/* ------------------------------------------------------------------------ */
/// Probe for a message of element type T from `sender` with `tag`; the result
/// is reported through `status`. Only declared here.
/// NOTE(review): presumably blocks until a matching message is available --
/// confirm against the implementation.
template <typename T>
void probe(int sender, int tag, CommunicationStatus & status) const;
/// Variant of probe() that returns a bool.
/// NOTE(review): presumably non-blocking, returning true when a matching
/// message is pending and `status` was filled -- confirm.
template <typename T>
bool asyncProbe(int sender, int tag, CommunicationStatus & status) const;
/* ------------------------------------------------------------------------ */
/// receive() overloads: thin wrappers forwarding to receiveImpl(buffer, size,
/// sender, tag).
/// NOTE(review): in this copy of the file the closing braces, some
/// continuation lines and the first (buffer) argument of most receiveImpl
/// calls are missing; the comments describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline void receive(Array<T> & values, int sender, int tag) const {
return this->receiveImpl(, values.size() * values.getNbComponent(), sender, tag);
/// std::vector<T>: the unsigned size is converted to the matching signed type
/// because receiveImpl is constrained to signed integral counts.
template <typename T>
inline void receive(std::vector<T> & values, int sender, int tag) const {
using signed_int = std::make_signed_t<decltype(values.size())>;
return this->receiveImpl(, signed_int(values.size()), sender,
/// Tensor types (aka::is_tensor_v): the count is values.size().
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void receive(Tensor & values, int sender, int tag) const {
return this->receiveImpl(, values.size(), sender, tag);
/// CommunicationBufferTemplated<true>: the count is values.size().
inline void receive(CommunicationBufferTemplated<true> & values, int sender,
int tag) const {
return this->receiveImpl(, values.size(), sender, tag);
/// CommunicationBufferTemplated<false>: probes the incoming message (as char
/// data) before receiving.
/// NOTE(review): `status` is not used in the visible lines; presumably it
/// sizes the buffer before the receive -- confirm.
inline void receive(CommunicationBufferTemplated<false> & values, int sender,
int tag) const {
CommunicationStatus status;
this->probe<char>(sender, tag, status);
return this->receiveImpl(, values.size(), sender, tag);
/// Arithmetic scalar: receives exactly one value into `values`.
template <typename T>
requires std::is_arithmetic_v<T>
inline void receive(T & values, int sender, int tag) const {
return this->receiveImpl(&values, 1, sender, tag);
/* ------------------------------------------------------------------------ */
/// send() overloads: thin wrappers forwarding to sendImpl(buffer, size,
/// receiver, tag, mode); `mode` defaults to CommunicationMode::_auto.
/// NOTE(review): in this copy of the file the closing braces and the first
/// (buffer) argument of most sendImpl calls are missing; the comments describe
/// only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline void
send(const Array<T> & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(,
values.size() * values.getNbComponent(), receiver,
tag, mode);
/// std::vector<T>: the unsigned size is converted to the matching signed type
/// because sendImpl is constrained to signed integral counts.
template <typename T>
inline void
send(const std::vector<T> & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
using signed_int = std::make_signed_t<decltype(values.size())>;
return this->sendImpl(, signed_int(values.size()), receiver,
tag, mode);
/// Tensor types (aka::is_tensor_v): the count is values.size().
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void
send(const Tensor & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(, values.size(), receiver, tag, mode);
/// Communication buffers, static or dynamic: the count is values.size().
template <bool is_static>
inline void
send(const CommunicationBufferTemplated<is_static> & values, int receiver,
int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(, values.size(), receiver, tag, mode);
/// Arithmetic scalar: sends exactly one value.
template <typename T>
requires std::is_arithmetic_v<T>
inline void
send(const T & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(&values, 1, receiver, tag, mode);
/* ------------------------------------------------------------------------ */
/// asyncSend() overloads: same argument handling as send(), but forwarding to
/// asyncSendImpl and returning its CommunicationRequest, which is the type
/// accepted by the static test/wait helpers of this class.
/// NOTE(review): in this copy of the file the closing braces, some
/// continuation lines and the first (buffer) argument of most asyncSendImpl
/// calls are missing; the comments describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline CommunicationRequest
asyncSend(const Array<T> & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(,
values.size() * values.getNbComponent(),
receiver, tag, mode);
/// std::vector<T>: the size is converted to the matching signed type because
/// asyncSendImpl is constrained to signed integral counts.
template <typename T>
inline CommunicationRequest
asyncSend(const std::vector<T> & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
using signed_int = std::make_signed_t<decltype(values.size())>;
return this->asyncSendImpl(, signed_int(values.size()),
receiver, tag, mode);
/// Tensor types (aka::is_tensor_v): the count is values.size().
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline CommunicationRequest
asyncSend(const Tensor & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(, values.size(), receiver, tag,
/// Communication buffers, static or dynamic: the count is values.size().
template <bool is_static>
inline CommunicationRequest
asyncSend(const CommunicationBufferTemplated<is_static> & values,
int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(, values.size(), receiver, tag,
/// Arithmetic scalar: sends exactly one value, taken by address.
/// NOTE(review): presumably `values` must stay alive until the request
/// completes -- confirm against the implementation.
template <typename T>
requires std::is_arithmetic_v<T>
inline CommunicationRequest
asyncSend(const T & values, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(&values, 1, receiver, tag, mode);
/* ------------------------------------------------------------------------ */
/// asyncReceive() overloads: forward to asyncReceiveImpl(buffer, size, sender,
/// tag) and return its CommunicationRequest.
/// NOTE(review): in this copy of the file the closing braces and the first
/// (buffer) argument of the asyncReceiveImpl calls are missing; the comments
/// describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline CommunicationRequest asyncReceive(Array<T> & values, int sender,
int tag) const {
return this->asyncReceiveImpl(, values.size() * values.getNbComponent(), sender, tag);
/// std::vector<T>: the size is converted to the matching signed type because
/// asyncReceiveImpl is constrained to signed integral counts.
template <typename T>
inline CommunicationRequest asyncReceive(std::vector<T> & values, int sender,
int tag) const {
using signed_int = std::make_signed_t<decltype(values.size())>;
return this->asyncReceiveImpl(, signed_int(values.size()),
sender, tag);
/// Tensor types: the count is values.size(). This overload is constrained
/// with std::enable_if_t, whereas the sibling Tensor overloads in this class
/// use a requires-clause.
template <typename Tensor,
typename = std::enable_if_t<aka::is_tensor_v<Tensor>>>
inline CommunicationRequest asyncReceive(Tensor & values, int sender,
int tag) const {
return this->asyncReceiveImpl(, values.size(), sender, tag);
/// Communication buffers, static or dynamic: the count is values.size().
template <bool is_static>
inline CommunicationRequest
asyncReceive(CommunicationBufferTemplated<is_static> & values, int sender,
int tag) const {
return this->asyncReceiveImpl(, values.size(), sender, tag);
/* ------------------------------------------------------------------------ */
/* Collectives */
/* ------------------------------------------------------------------------ */
/// allReduce() overloads: forward to allReduceImpl(values, nb_values, op);
/// `op` defaults to SynchronizerOperation::_sum.
/// NOTE(review): in this copy of the file the closing braces, a continuation
/// line and the buffer argument of the Array overload are missing; the
/// comments describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline void
allReduce(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->allReduceImpl(, values.size() * values.getNbComponent(),
/// Eigen expressions: operates on values.derived().data() with values.size()
/// entries.
template <typename Derived>
inline void
allReduce(Eigen::MatrixBase<Derived> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->allReduceImpl(values.derived().data(), values.size(), op);
/// Arithmetic scalar: operates on the single value `values`, passed by
/// address.
template <typename T>
requires std::is_arithmetic_v<T>
inline void
allReduce(T & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->allReduceImpl(&values, 1, op);
/// scan() overloads: forward to scanImpl(values, results, nb_values, op).
/// NOTE(review): in this copy of the file the closing braces and the buffer
/// arguments of the Array and Tensor calls are missing; the comments describe
/// only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent();
/// `op` defaults to _sum.
template <typename T>
inline void
scan(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->scanImpl(, values.size() * values.getNbComponent(), op);
/// Tensor types: the count is values.size(). Unlike the sibling overloads,
/// `op` has no default here.
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void scan(Tensor & values, SynchronizerOperation op) const {
this->scanImpl(,, values.size(), op);
/// Arithmetic scalar: `values` is passed as both the input and the result
/// pointer, so the result overwrites it.
template <typename T>
requires std::is_arithmetic_v<T>
inline void
scan(T & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->scanImpl(&values, &values, 1, op);
/// exclusiveScan() overloads: forward to exclusiveScanImpl(values, results,
/// nb_values, op); `op` defaults to SynchronizerOperation::_sum.
/// NOTE(review): in this copy of the file the closing braces, the head of the
/// Array overload's call and the buffer argument of the Tensor call are
/// missing; the comments describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline void
exclusiveScan(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
values.size() * values.getNbComponent(), op);
/// Tensor types (aka::is_tensor_v): the count is values.size().
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void
exclusiveScan(Tensor & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->exclusiveScanImpl(, values.size(), op);
/// Arithmetic scalar, in place: `values` is passed as both the input and the
/// result pointer.
template <typename T>
requires std::is_arithmetic_v<T>
inline void
exclusiveScan(T & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->exclusiveScanImpl(&values, &values, 1, op);
/// Arithmetic scalar, out of place: reads from `values` and writes to
/// `result`.
template <typename T>
requires std::is_arithmetic_v<T>
inline void
exclusiveScan(T & values, T & result,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->exclusiveScanImpl(&values, &result, 1, op);
/* ------------------------------------------------------------------------ */
/// allGather() overloads: forward to allGatherImpl(values, nb_values).
/// NOTE(review): in this copy of the file the closing braces and the buffer
/// argument of the allGatherImpl calls are missing; the comments describe only
/// what is visible.

/// Array<T>: debug-asserts that the array has getNbProc() entries; the count
/// passed to allGatherImpl is values.getNbComponent().
template <typename T> inline void allGather(Array<T> & values) const {
AKANTU_DEBUG_ASSERT(getNbProc() == values.size(),
"The array size is not correct");
this->allGatherImpl(, values.getNbComponent());
/// Tensor types: debug-asserts that values.size() / getNbProc() is positive;
/// that quotient is the count passed to allGatherImpl.
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void allGather(Tensor & values) const {
AKANTU_DEBUG_ASSERT(values.size() / getNbProc() > 0,
"The vector size is not correct");
this->allGatherImpl(, values.size() / getNbProc());
/* ------------------------------------------------------------------------ */
/// Variable-size all-gather on an Array<T>, with sizes given as an Array of a
/// signed integral type.
/// NOTE(review): the body is not visible in this copy of the file; presumably
/// it forwards to allGatherVImpl with `sizes` -- confirm.
template <typename T, typename integer>
requires std::signed_integral<integer>
inline void allGatherV(Array<T> & values,
const Array<integer> & sizes) const {
/* ------------------------------------------------------------------------ */
/// Reduce an Array<T> with operation `op`; `root` defaults to 0. The element
/// count handed to reduceImpl is values.size() * values.getNbComponent().
/// NOTE(review): the buffer argument, the end of the call and the closing
/// brace are missing in this copy of the file.
template <typename T>
inline void reduce(Array<T> & values, SynchronizerOperation op,
int root = 0) const {
this->reduceImpl(, values.size() * values.getNbComponent(), op,
/* ------------------------------------------------------------------------ */
/// gather() overloads taking a root rank (default 0): forward to
/// gatherImpl(values, nb_values, root).
/// NOTE(review): in this copy of the file the closing braces and the buffer
/// argument of the Tensor call are missing; the comments describe only what
/// is visible.

/// Tensor types: the count passed is values.getNbComponent().
template <typename Tensor>
requires aka::is_tensor_v<Tensor>
inline void gather(Tensor & values, int root = 0) const {
this->gatherImpl(, values.getNbComponent(), root);
/// Arithmetic scalar: `values` is taken by value, so the pointer handed to
/// gatherImpl refers to the local copy.
/// NOTE(review): anything gatherImpl writes through that pointer would not
/// reach the caller -- confirm this is intended.
template <typename T>
requires std::is_arithmetic_v<T>
inline void gather(T values, int root = 0) const {
this->gatherImpl(&values, 1, root);
/* ------------------------------------------------------------------------ */
/// gather() overloads with an explicit `gathered` output array: forward to
/// gatherImpl(values, nb_values, gathered, nb_gathered).
/// NOTE(review): in this copy of the file the closing braces, a continuation
/// line and several call arguments are missing; the comments describe only
/// what is visible.

/// Tensor types: debug-asserts that values.size() equals
/// gathered.getNbComponent(); the count passed is values.size().
template <typename Tensor, typename T>
requires aka::is_tensor_v<Tensor>
inline void gather(Tensor & values, Array<T> & gathered) const {
AKANTU_DEBUG_ASSERT(values.size() == gathered.getNbComponent(),
"The array size is not correct");
this->gatherImpl(, values.size(),,
/// Arithmetic scalar: contributes one value (the local copy `values`) and
/// passes 1 as the last argument.
template <typename T>
requires std::is_arithmetic_v<T>
inline void gather(T values, Array<T> & gathered) const {
this->gatherImpl(&values, 1,, 1);
/* ------------------------------------------------------------------------ */
/// Variable-size gather of an Array<T>, with sizes given in `sizes`; `root`
/// defaults to 0. Forwards to gatherVImpl.
/// NOTE(review): the first two arguments of the gatherVImpl call and the
/// closing brace are missing in this copy of the file.
template <typename T>
inline void gatherV(Array<T> & values, const Array<Int> & sizes,
int root = 0) const {
this->gatherVImpl(,, root);
/* ------------------------------------------------------------------------ */
/// broadcast() overloads: forward to broadcastImpl(values, nb_values, root);
/// `root` defaults to 0.
/// NOTE(review): in this copy of the file the closing braces, some statements
/// and the buffer argument of most broadcastImpl calls are missing; the
/// comments describe only what is visible.

/// Array<T>: the element count is values.size() * values.getNbComponent().
template <typename T>
inline void broadcast(Array<T> & values, int root = 0) const {
this->broadcastImpl(, values.size() * values.getNbComponent(),
/// std::vector<T>: the size is converted to the matching signed type because
/// broadcastImpl is constrained to signed integral counts.
template <typename T>
inline void broadcast(std::vector<T> & values, int root = 0) const {
using signed_int = std::make_signed_t<decltype(values.size())>;
this->broadcastImpl(, signed_int(values.size()), root);
/// CommunicationBufferTemplated<true>: the count is buffer.size().
inline void broadcast(CommunicationBufferTemplated<true> & buffer,
int root = 0) const {
this->broadcastImpl(, buffer.size(), root);
/// CommunicationBufferTemplated<false>: the buffer size is broadcast first,
/// then the buffer content.
/// NOTE(review): the bodies of the two `if` statements are not visible;
/// presumably non-root ranks resize the buffer and an empty buffer returns
/// early -- confirm.
inline void broadcast(CommunicationBufferTemplated<false> & buffer,
int root = 0) const {
auto buffer_size = buffer.size();
this->broadcastImpl(&buffer_size, 1, root);
if (whoAmI() != root) {
if (buffer_size == 0) {
this->broadcastImpl(, buffer.size(), root);
/// Generic single value: broadcasts one T by address. Unlike the scalar
/// overloads of the other operations, this template is not constrained to
/// arithmetic types.
template <typename T> inline void broadcast(T & values, int root = 0) const {
this->broadcastImpl(&values, 1, root);
/* ------------------------------------------------------------------------ */
/// Barrier on this communicator; declared here, defined elsewhere.
/// NOTE(review): presumably blocks until every process has reached it --
/// confirm against the implementation.
void barrier() const;
/// Barrier variant returning a CommunicationRequest, the type accepted by the
/// static test/wait helpers of this class.
CommunicationRequest asyncBarrier() const;
/* ------------------------------------------------------------------------ */
/* Request handling */
/* ------------------------------------------------------------------------ */
/// Static helpers operating on CommunicationRequest objects; all are declared
/// here and defined elsewhere.
/// NOTE(review): the semantics noted below are inferred from the names and
/// return types -- confirm against the implementation.

/// Returns a bool for a single request (presumably: has it completed?).
static bool test(CommunicationRequest & request);
/// Returns a bool for a set of requests (presumably: have all completed?).
static bool testAll(std::vector<CommunicationRequest> & request);
/// Waits on a single request.
static void wait(CommunicationRequest & request);
/// Waits on a set of requests.
static void waitAll(std::vector<CommunicationRequest> & requests);
/// Waits on a set of requests and returns an int (presumably the index of the
/// request that completed).
static int waitAny(std::vector<CommunicationRequest> & requests);
/// Frees a single request.
static inline void freeCommunicationRequest(CommunicationRequest & request);
/// Frees every request of a vector.
static inline void
freeCommunicationRequest(std::vector<CommunicationRequest> & requests);
/// Declared here, defined elsewhere. Takes the outstanding send requests, a
/// callable `processor` (forwarding reference) and a message `tag`.
/// NOTE(review): presumably receives an unknown number of messages of element
/// type T and passes each to `processor` -- confirm against the definition.
template <typename T, typename MsgProcessor>
inline void
receiveAnyNumber(std::vector<CommunicationRequest> & send_requests,
MsgProcessor && processor, int tag) const;
/// Point-to-point implementation hooks, declared here and defined elsewhere.
/// Each takes a raw buffer pointer and an element count whose type must be a
/// signed integral; this is why the std::vector wrappers convert their size
/// with std::make_signed_t.

/// Send `size` elements from `buffer` to `receiver` with `tag`.
/// NOTE(review): the return-type line of this declaration is missing in this
/// copy of the file; the send() wrappers return its result from void
/// functions.
template <typename T, typename integer>
requires std::signed_integral<integer>
sendImpl(const T * buffer, integer size, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const;
/// Receive `size` elements into `buffer` from `sender` with `tag`.
template <typename T, typename integer>
requires std::signed_integral<integer>
void receiveImpl(T * buffer, integer size, int sender, int tag) const;
/// Request-returning counterpart of sendImpl.
template <typename T, typename integer>
requires std::signed_integral<integer>
CommunicationRequest asyncSendImpl(
const T * buffer, integer size, int receiver, int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const;
/// Request-returning counterpart of receiveImpl.
template <typename T, typename integer>
requires std::signed_integral<integer>
CommunicationRequest asyncReceiveImpl(T * buffer, integer size, int sender,
int tag) const;
/// Reduction and scan implementation hooks, declared here and defined
/// elsewhere. Counts must be of a signed integral type.

/// Apply `op` across `nb_values` entries of `values`; a single pointer serves
/// as both input and output parameter.
template <typename T, typename integer>
requires std::signed_integral<integer>
void allReduceImpl(T * values, integer nb_values,
SynchronizerOperation op) const;
/// Scan with `op`: reads `values`, writes `results`. The scalar scan() wrapper
/// passes the same pointer for both.
template <typename T, typename integer>
requires std::signed_integral<integer>
void scanImpl(T * values, T * results, integer nb_values,
SynchronizerOperation op) const;
/// Exclusive scan with `op`: reads `values`, writes `results`. The in-place
/// scalar exclusiveScan() wrapper passes the same pointer for both.
template <typename T, typename integer>
requires std::signed_integral<integer>
void exclusiveScanImpl(T * values, T * results, integer nb_values,
SynchronizerOperation op) const;
/// Gather, reduce and broadcast implementation hooks, declared here and
/// defined elsewhere. Counts must be of a signed integral type.

/// All-gather with a single count `nb_values`.
template <typename T, typename integer>
requires std::signed_integral<integer>
void allGatherImpl(T * values, integer nb_values) const;
/// All-gather with an array of counts `nb_values`.
template <typename T, typename integer>
requires std::signed_integral<integer>
void allGatherVImpl(T * values, const integer * nb_values) const;
/// Reduce `nb_values` entries with `op`; `root` defaults to 0.
template <typename T, typename integer>
requires std::signed_integral<integer>
void reduceImpl(T * values, integer nb_values, SynchronizerOperation op,
int root = 0) const;
/// Gather using a single buffer `values`; `root` defaults to 0.
template <typename T, typename integer>
requires std::signed_integral<integer>
void gatherImpl(T * values, integer nb_values, int root = 0) const;
/// Gather from `values` into the separate buffer `gathered`; `nb_gathered`
/// defaults to 0.
/// NOTE(review): this overload takes no root argument; with T = int a call
/// gatherImpl(ptr, n, 0) could select either overload -- confirm callers.
template <typename T, typename integer>
requires std::signed_integral<integer>
void gatherImpl(T * values, integer nb_values, T * gathered,
integer nb_gathered = 0) const;
/// Gather with an array of counts `nb_values` (non-const pointer); `root`
/// defaults to 0.
template <typename T, typename integer>
requires std::signed_integral<integer>
void gatherVImpl(T * values, integer * nb_values, int root = 0) const;
/// Broadcast `nb_values` entries of `values`; `root` defaults to 0.
template <typename T, typename integer>
requires std::signed_integral<integer>
void broadcastImpl(T * values, integer nb_values, int root = 0) const;
/* ------------------------------------------------------------------------ */
/* Accessors */
/* ------------------------------------------------------------------------ */
/// Number of processes; the allGather wrappers compare it against array
/// sizes.
int getNbProc() const;
/// Identifier of the calling process; the dynamic-buffer broadcast compares
/// it against `root`.
int whoAmI() const;
/// Access to a Communicator by reference.
/// NOTE(review): presumably backed by the `static_communicator` member and
/// created on first use -- confirm against the definition.
static Communicator & getStaticCommunicator();
/// Same as above, additionally taking the program arguments by reference.
static Communicator & getStaticCommunicator(int & argc, char **& argv);
/// Tag bounds, declared here and defined elsewhere.
/// NOTE(review): presumably the largest / smallest usable message tag --
/// confirm.
int getMaxTag() const;
int getMinTag() const;
/// NOTE(review): AKANTU_GET_MACRO is defined elsewhere; presumably this
/// generates getCommunicatorData() returning *communicator_data with a
/// decltype(auto) return type -- confirm against the macro.
AKANTU_GET_MACRO(CommunicatorData, (*communicator_data), decltype(auto));
/* ------------------------------------------------------------------------ */
/* Class Members */
/* ------------------------------------------------------------------------ */
/// Owning pointer to a class-wide Communicator instance.
/// NOTE(review): presumably the object returned by getStaticCommunicator() --
/// confirm.
static std::unique_ptr<Communicator> static_communicator;
/// Owning pointer to the polymorphic CommunicatorInternalData of this
/// communicator; dereferenced by the AKANTU_GET_MACRO accessor.
std::unique_ptr<CommunicatorInternalData> communicator_data;
/// Stream insertion for CommunicationRequest; returns `stream` to allow
/// chaining.
/// NOTE(review): only the return statement is visible in this copy of the
/// file; any statement writing `_this` to the stream, and the closing brace,
/// are missing.
inline std::ostream & operator<<(std::ostream & stream,
const CommunicationRequest & _this) {
return stream;
} // namespace akantu
#include "communicator_inline_impl.hh"

Event Timeline