Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F103665765
communicator.hh
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Mar 3, 20:47
Size
20 KB
Mime Type
text/x-c++
Expires
Wed, Mar 5, 20:47 (2 d)
Engine
blob
Format
Raw Data
Handle
24633723
Attached To
rAKA akantu
communicator.hh
View Options
/**
* Copyright (©) 2010-2023 EPFL (Ecole Polytechnique Fédérale de Lausanne)
* Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
*
* This file is part of Akantu
*
* Akantu is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* Akantu is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Akantu. If not, see <http://www.gnu.org/licenses/>.
*/
/* -------------------------------------------------------------------------- */
#include "aka_array.hh"
#include "aka_common.hh"
#include "aka_event_handler_manager.hh"
#include "communication_buffer.hh"
#include "communication_request.hh"
#include "communicator_event_handler.hh"
/* -------------------------------------------------------------------------- */
#ifndef AKANTU_STATIC_COMMUNICATOR_HH_
#define AKANTU_STATIC_COMMUNICATOR_HH_
namespace akantu {
namespace debug {
/// Exception type deriving from Exception whose only constructor sets a fixed
/// message stating that a failure occurred during a communication process.
class CommunicationException : public Exception {
public:
/// Default constructor: forwards the fixed message to the Exception base.
CommunicationException()
: Exception("An exception happen during a communication process.") {}
};
} // namespace debug
/// @enum SynchronizerOperation reduce operation that the synchronizer can
/// perform
///
/// Values of this type are taken as the `op` argument of the allReduce, scan,
/// exclusiveScan and reduce members of Communicator.
/// NOTE(review): the enumerator names presumably stand for sum, min, max,
/// product, logical/bitwise and/or/xor, min/max with location, and a "no
/// operation" marker — confirm against the implementation that maps them.
enum class SynchronizerOperation {
_sum,
_min,
_max,
_prod,
_land,
_band,
_lor,
_bor,
_lxor,
_bxor,
_min_loc,
_max_loc,
_null
};
/// Send mode selector taken by the send()/asyncSend() overloads of
/// Communicator; each of those overloads defaults it to _auto.
/// NOTE(review): _synchronous and _ready presumably select the matching
/// send flavours of the underlying backend — confirm in the implementation.
enum class CommunicationMode { _auto, _synchronous, _ready };
/// Integer sentinel with value -1.
/// NOTE(review): presumably passed as the `sender` rank to accept a message
/// from any process — confirm against the callers/implementation.
inline constexpr int _any_source = -1;
} // namespace akantu
namespace akantu {
/// Empty polymorphic base (virtual destructor only). Communicator owns one
/// instance through a std::unique_ptr<CommunicatorInternalData>, so concrete
/// data types derived from it are destroyed correctly through the base.
struct CommunicatorInternalData {
virtual ~CommunicatorInternalData() = default;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/// Communication front-end.
///
/// The public member templates are thin, typed convenience overloads (Array,
/// std::vector, tensors, communication buffers, arithmetic scalars): each one
/// extracts a raw pointer and an element count from its argument and forwards
/// them to the matching protected *Impl member template. The *Impl templates
/// are only declared here; their definitions are not visible in this header
/// (communicator_inline_impl.hh is included at the end of the file).
class Communicator : public EventHandlerManager<CommunicatorEventHandler> {
  /// Tag type declared in the private part of the class; it appears in the
  /// signature of both public constructors.
  struct private_member {};

  /* ------------------------------------------------------------------------ */
  /* Constructors/Destructors                                                 */
  /* ------------------------------------------------------------------------ */
public:
  /// Construct from the program arguments (taken by reference) and the tag.
  Communicator(int & argc, char **& argv, const private_member & /*m*/);
  /// Construct from the tag only (defaulted to a fresh private_member).
  Communicator(const private_member & /*unused*/ = private_member{});
  ~Communicator() override;

  /* ------------------------------------------------------------------------ */
  /* Methods                                                                  */
  /* ------------------------------------------------------------------------ */
public:
  /* ------------------------------------------------------------------------ */
  /* Point to Point                                                           */
  /* ------------------------------------------------------------------------ */
  /// Probe for a message from @p sender with @p tag and fill @p status.
  /// NOTE(review): T is presumably the element type used to express the
  /// message size stored in status — confirm in the implementation.
  template <typename T>
  void probe(int sender, int tag, CommunicationStatus & status) const;

  /// Same arguments as probe(), but returns a bool.
  /// NOTE(review): presumably non-blocking, returning whether a matching
  /// message is available — confirm in the implementation.
  template <typename T>
  bool asyncProbe(int sender, int tag, CommunicationStatus & status) const;

  /* ------------------------------------------------------------------------ */
  /// Receive into an Array: size() * getNbComponent() elements.
  template <typename T>
  inline void receive(Array<T> & values, int sender, int tag) const {
    return this->receiveImpl(
        values.data(), values.size() * values.getNbComponent(), sender, tag);
  }

  /// Receive into a std::vector: values.size() elements; the unsigned size is
  /// converted to its signed counterpart because the *Impl templates require
  /// a signed integral count.
  template <typename T>
  inline void receive(std::vector<T> & values, int sender, int tag) const {
    using signed_int = std::make_signed_t<decltype(values.size())>;
    return this->receiveImpl(values.data(), signed_int(values.size()), sender,
                             tag);
  }

  /// Receive into a tensor: values.size() elements.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void receive(Tensor & values, int sender, int tag) const {
    return this->receiveImpl(values.data(), values.size(), sender, tag);
  }

  /// Receive into a CommunicationBufferTemplated<true>: values.size()
  /// elements, the buffer is used with its current size.
  inline void receive(CommunicationBufferTemplated<true> & values, int sender,
                      int tag) const {
    return this->receiveImpl(values.data(), values.size(), sender, tag);
  }

  /// Receive into a CommunicationBufferTemplated<false>: the message is
  /// probed first, status.size() is passed to the buffer's reserve(), then
  /// values.size() elements are received.
  /// NOTE(review): relies on reserve() making size() match the probed size —
  /// confirm in CommunicationBufferTemplated.
  inline void receive(CommunicationBufferTemplated<false> & values, int sender,
                      int tag) const {
    CommunicationStatus status;
    this->probe<char>(sender, tag, status);
    values.reserve(status.size());
    return this->receiveImpl(values.data(), values.size(), sender, tag);
  }

  /// Receive a single arithmetic value.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void receive(T & values, int sender, int tag) const {
    return this->receiveImpl(&values, 1, sender, tag);
  }

  /* ------------------------------------------------------------------------ */
  /// Send an Array: size() * getNbComponent() elements.
  template <typename T>
  inline void
  send(const Array<T> & values, int receiver, int tag,
       const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->sendImpl(values.data(),
                          values.size() * values.getNbComponent(), receiver,
                          tag, mode);
  }

  /// Send a std::vector: values.size() elements (count converted to signed).
  template <typename T>
  inline void
  send(const std::vector<T> & values, int receiver, int tag,
       const CommunicationMode & mode = CommunicationMode::_auto) const {
    using signed_int = std::make_signed_t<decltype(values.size())>;
    return this->sendImpl(values.data(), signed_int(values.size()), receiver,
                          tag, mode);
  }

  /// Send a tensor: values.size() elements.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void
  send(const Tensor & values, int receiver, int tag,
       const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->sendImpl(values.data(), values.size(), receiver, tag, mode);
  }

  /// Send a communication buffer (either flavour): values.size() elements.
  template <bool is_static>
  inline void
  send(const CommunicationBufferTemplated<is_static> & values, int receiver,
       int tag,
       const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->sendImpl(values.data(), values.size(), receiver, tag, mode);
  }

  /// Send a single arithmetic value.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void
  send(const T & values, int receiver, int tag,
       const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->sendImpl(&values, 1, receiver, tag, mode);
  }

  /* ------------------------------------------------------------------------ */
  /// Asynchronous send of an Array (size() * getNbComponent() elements);
  /// returns the CommunicationRequest produced by asyncSendImpl.
  template <typename T>
  inline CommunicationRequest
  asyncSend(const Array<T> & values, int receiver, int tag,
            const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->asyncSendImpl(values.data(),
                               values.size() * values.getNbComponent(),
                               receiver, tag, mode);
  }

  /// Asynchronous send of a std::vector (count converted to signed).
  template <typename T>
  inline CommunicationRequest
  asyncSend(const std::vector<T> & values, int receiver, int tag,
            const CommunicationMode & mode = CommunicationMode::_auto) const {
    using signed_int = std::make_signed_t<decltype(values.size())>;
    return this->asyncSendImpl(values.data(), signed_int(values.size()),
                               receiver, tag, mode);
  }

  /// Asynchronous send of a tensor: values.size() elements.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline CommunicationRequest
  asyncSend(const Tensor & values, int receiver, int tag,
            const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->asyncSendImpl(values.data(), values.size(), receiver, tag,
                               mode);
  }

  /// Asynchronous send of a communication buffer: values.size() elements.
  template <bool is_static>
  inline CommunicationRequest
  asyncSend(const CommunicationBufferTemplated<is_static> & values,
            int receiver, int tag,
            const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->asyncSendImpl(values.data(), values.size(), receiver, tag,
                               mode);
  }

  /// Asynchronous send of a single arithmetic value. The address of
  /// @p values is handed to asyncSendImpl.
  /// NOTE(review): the referenced object presumably has to stay alive until
  /// the returned request completes — confirm against the backend.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline CommunicationRequest
  asyncSend(const T & values, int receiver, int tag,
            const CommunicationMode & mode = CommunicationMode::_auto) const {
    return this->asyncSendImpl(&values, 1, receiver, tag, mode);
  }

  /* ------------------------------------------------------------------------ */
  /// Asynchronous receive into an Array (size() * getNbComponent() elements);
  /// returns the CommunicationRequest produced by asyncReceiveImpl.
  template <typename T>
  inline CommunicationRequest asyncReceive(Array<T> & values, int sender,
                                           int tag) const {
    return this->asyncReceiveImpl(
        values.data(), values.size() * values.getNbComponent(), sender, tag);
  }

  /// Asynchronous receive into a std::vector (count converted to signed).
  template <typename T>
  inline CommunicationRequest asyncReceive(std::vector<T> & values, int sender,
                                           int tag) const {
    using signed_int = std::make_signed_t<decltype(values.size())>;
    return this->asyncReceiveImpl(values.data(), signed_int(values.size()),
                                  sender, tag);
  }

  /// Asynchronous receive into a tensor: values.size() elements.
  /// Constrained with a requires-clause like every other tensor overload of
  /// this class.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline CommunicationRequest asyncReceive(Tensor & values, int sender,
                                           int tag) const {
    return this->asyncReceiveImpl(values.data(), values.size(), sender, tag);
  }

  /// Asynchronous receive into a communication buffer: values.size()
  /// elements, the buffer is used with its current size.
  template <bool is_static>
  inline CommunicationRequest
  asyncReceive(CommunicationBufferTemplated<is_static> & values, int sender,
               int tag) const {
    return this->asyncReceiveImpl(values.data(), values.size(), sender, tag);
  }

  /* ------------------------------------------------------------------------ */
  /* Collectives                                                              */
  /* ------------------------------------------------------------------------ */
  /// All-reduce over an Array (size() * getNbComponent() elements) with
  /// operation @p op; a single buffer is passed to allReduceImpl.
  template <typename T>
  inline void
  allReduce(Array<T> & values,
            SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->allReduceImpl(values.data(), values.size() * values.getNbComponent(),
                        op);
  }

  /// All-reduce over an Eigen expression: values.derived().data() and
  /// values.size() are forwarded.
  template <typename Derived>
  inline void
  allReduce(Eigen::MatrixBase<Derived> & values,
            SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->allReduceImpl(values.derived().data(), values.size(), op);
  }

  /// All-reduce of a single arithmetic value.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void
  allReduce(T & values,
            SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->allReduceImpl(&values, 1, op);
  }

  /// Scan over an Array. scanImpl takes separate input and result pointers;
  /// the same buffer is passed for both, as the other scan/exclusiveScan
  /// overloads taking a single argument do.
  template <typename T>
  inline void
  scan(Array<T> & values,
       SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->scanImpl(values.data(), values.data(),
                   values.size() * values.getNbComponent(), op);
  }

  /// Scan over a tensor (values.size() elements), same buffer used as input
  /// and result. @p op defaults to _sum like the sibling overloads.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void
  scan(Tensor & values,
       SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->scanImpl(values.data(), values.data(), values.size(), op);
  }

  /// Scan of a single arithmetic value, same object used as input and result.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void
  scan(T & values,
       SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->scanImpl(&values, &values, 1, op);
  }

  /// Exclusive scan over an Array (size() * getNbComponent() elements), same
  /// buffer used as input and result.
  template <typename T>
  inline void
  exclusiveScan(Array<T> & values,
                SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->exclusiveScanImpl(values.data(), values.data(),
                            values.size() * values.getNbComponent(), op);
  }

  /// Exclusive scan over a tensor (values.size() elements).
  /// exclusiveScanImpl takes separate input and result pointers; the same
  /// buffer is passed for both.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void
  exclusiveScan(Tensor & values,
                SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->exclusiveScanImpl(values.data(), values.data(), values.size(), op);
  }

  /// Exclusive scan of a single arithmetic value, same object used as input
  /// and result.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void
  exclusiveScan(T & values,
                SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->exclusiveScanImpl(&values, &values, 1, op);
  }

  /// Exclusive scan of a single arithmetic value with the result written to a
  /// distinct object @p result.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void
  exclusiveScan(T & values, T & result,
                SynchronizerOperation op = SynchronizerOperation::_sum) const {
    this->exclusiveScanImpl(&values, &result, 1, op);
  }

  /* ------------------------------------------------------------------------ */
  /// All-gather on an Array. Asserts (debug) that the array has getNbProc()
  /// entries and forwards getNbComponent() as the count.
  template <typename T> inline void allGather(Array<T> & values) const {
    AKANTU_DEBUG_ASSERT(getNbProc() == values.size(),
                        "The array size is not correct");
    this->allGatherImpl(values.data(), values.getNbComponent());
  }

  /// All-gather on a tensor. Asserts (debug) that values.size() / getNbProc()
  /// is positive and forwards that quotient as the count.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void allGather(Tensor & values) const {
    AKANTU_DEBUG_ASSERT(values.size() / getNbProc() > 0,
                        "The vector size is not correct");
    this->allGatherImpl(values.data(), values.size() / getNbProc());
  }

  /* ------------------------------------------------------------------------ */
  /// Variable-size all-gather: forwards the data pointer of @p values and the
  /// data pointer of @p sizes (signed integral element type).
  template <typename T, typename integer>
    requires std::signed_integral<integer>
  inline void allGatherV(Array<T> & values,
                         const Array<integer> & sizes) const {
    this->allGatherVImpl(values.data(), sizes.data());
  }

  /* ------------------------------------------------------------------------ */
  /// Reduce an Array (size() * getNbComponent() elements) with @p op towards
  /// @p root.
  template <typename T>
  inline void reduce(Array<T> & values, SynchronizerOperation op,
                     int root = 0) const {
    this->reduceImpl(values.data(), values.size() * values.getNbComponent(), op,
                     root);
  }

  /* ------------------------------------------------------------------------ */
  /// Gather towards @p root, forwarding values.getNbComponent() as the count.
  /// NOTE(review): this overload is constrained on aka::is_tensor_v but calls
  /// getNbComponent(), whereas the tensor overload of allGather uses
  /// values.size() / getNbProc() — confirm that tensor types provide
  /// getNbComponent() or whether this was meant for Array<T>.
  template <typename Tensor>
    requires aka::is_tensor_v<Tensor>
  inline void gather(Tensor & values, int root = 0) const {
    this->gatherImpl(values.data(), values.getNbComponent(), root);
  }

  /// Gather a single arithmetic value towards @p root. @p values is taken by
  /// value, so the pointer handed to gatherImpl refers to the local copy.
  /// NOTE(review): nothing gathered can be observed by the caller through
  /// this overload — confirm the intended use.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void gather(T values, int root = 0) const {
    this->gatherImpl(&values, 1, root);
  }

  /* ------------------------------------------------------------------------ */
  /// Gather a tensor into @p gathered. Asserts (debug) that values.size()
  /// equals gathered.getNbComponent(), then resizes @p gathered to
  /// getNbProc() entries before forwarding both buffers.
  template <typename Tensor, typename T>
    requires aka::is_tensor_v<Tensor>
  inline void gather(Tensor & values, Array<T> & gathered) const {
    AKANTU_DEBUG_ASSERT(values.size() == gathered.getNbComponent(),
                        "The array size is not correct");
    gathered.resize(getNbProc());
    this->gatherImpl(values.data(), values.size(), gathered.data(),
                     gathered.getNbComponent());
  }

  /// Gather a single arithmetic value into @p gathered.
  /// NOTE(review): unlike the tensor overload, @p gathered is not resized
  /// here; the caller presumably has to size it beforehand — confirm.
  template <typename T>
    requires std::is_arithmetic_v<T>
  inline void gather(T values, Array<T> & gathered) const {
    this->gatherImpl(&values, 1, gathered.data(), 1);
  }

  /* ------------------------------------------------------------------------ */
  /// Variable-size gather towards @p root: forwards the data pointers of
  /// @p values and @p sizes.
  template <typename T>
  inline void gatherV(Array<T> & values, const Array<Int> & sizes,
                      int root = 0) const {
    this->gatherVImpl(values.data(), sizes.data(), root);
  }

  /* ------------------------------------------------------------------------ */
  /// Broadcast an Array (size() * getNbComponent() elements) from @p root.
  template <typename T>
  inline void broadcast(Array<T> & values, int root = 0) const {
    this->broadcastImpl(values.data(), values.size() * values.getNbComponent(),
                        root);
  }

  /// Broadcast a std::vector (count converted to signed) from @p root.
  template <typename T>
  inline void broadcast(std::vector<T> & values, int root = 0) const {
    using signed_int = std::make_signed_t<decltype(values.size())>;
    this->broadcastImpl(values.data(), signed_int(values.size()), root);
  }

  /// Broadcast a CommunicationBufferTemplated<true> with its current size.
  inline void broadcast(CommunicationBufferTemplated<true> & buffer,
                        int root = 0) const {
    this->broadcastImpl(buffer.data(), buffer.size(), root);
  }

  /// Broadcast a CommunicationBufferTemplated<false>: the size is broadcast
  /// first, every rank other than @p root passes it to reserve(), and the
  /// payload is broadcast only when that size is non-zero.
  /// NOTE(review): relies on reserve() making buffer.size() equal to the
  /// broadcast size on the non-root ranks — confirm in
  /// CommunicationBufferTemplated.
  inline void broadcast(CommunicationBufferTemplated<false> & buffer,
                        int root = 0) const {
    auto buffer_size = buffer.size();
    this->broadcastImpl(&buffer_size, 1, root);
    if (whoAmI() != root) {
      buffer.reserve(buffer_size);
    }
    if (buffer_size == 0) {
      return;
    }
    this->broadcastImpl(buffer.data(), buffer.size(), root);
  }

  /// Broadcast a single object of type T from @p root (unconstrained
  /// overload: the address of @p values and a count of 1 are forwarded).
  template <typename T> inline void broadcast(T & values, int root = 0) const {
    this->broadcastImpl(&values, 1, root);
  }

  /* ------------------------------------------------------------------------ */
  void barrier() const;
  CommunicationRequest asyncBarrier() const;

  /* ------------------------------------------------------------------------ */
  /* Request handling                                                         */
  /* ------------------------------------------------------------------------ */
  static bool test(CommunicationRequest & request);
  static bool testAll(std::vector<CommunicationRequest> & request);
  static void wait(CommunicationRequest & request);
  static void waitAll(std::vector<CommunicationRequest> & requests);
  static int waitAny(std::vector<CommunicationRequest> & requests);
  static inline void freeCommunicationRequest(CommunicationRequest & request);
  static inline void
  freeCommunicationRequest(std::vector<CommunicationRequest> & requests);

  /// Declared here, defined elsewhere. Takes the pending send requests, a
  /// message processor callable and a tag.
  template <typename T, typename MsgProcessor>
  inline void
  receiveAnyNumber(std::vector<CommunicationRequest> & send_requests,
                   MsgProcessor && processor, int tag) const;

  /* ------------------------------------------------------------------------ */
  /* Low-level implementations: raw pointer + signed integral count           */
  /* ------------------------------------------------------------------------ */
protected:
  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void
  sendImpl(const T * buffer, integer size, int receiver, int tag,
           const CommunicationMode & mode = CommunicationMode::_auto) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void receiveImpl(T * buffer, integer size, int sender, int tag) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  CommunicationRequest asyncSendImpl(
      const T * buffer, integer size, int receiver, int tag,
      const CommunicationMode & mode = CommunicationMode::_auto) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  CommunicationRequest asyncReceiveImpl(T * buffer, integer size, int sender,
                                        int tag) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void allReduceImpl(T * values, integer nb_values,
                     SynchronizerOperation op) const;

  /// Input and result buffers are separate parameters; the public overloads
  /// taking a single argument pass the same pointer for both.
  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void scanImpl(T * values, T * results, integer nb_values,
                SynchronizerOperation op) const;

  /// Input and result buffers are separate parameters; the public overloads
  /// taking a single argument pass the same pointer for both.
  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void exclusiveScanImpl(T * values, T * results, integer nb_values,
                         SynchronizerOperation op) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void allGatherImpl(T * values, integer nb_values) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void allGatherVImpl(T * values, const integer * nb_values) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void reduceImpl(T * values, integer nb_values, SynchronizerOperation op,
                  int root = 0) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void gatherImpl(T * values, integer nb_values, int root = 0) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void gatherImpl(T * values, integer nb_values, T * gathered,
                  integer nb_gathered = 0) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void gatherVImpl(T * values, integer * nb_values, int root = 0) const;

  template <typename T, typename integer>
    requires std::signed_integral<integer>
  void broadcastImpl(T * values, integer nb_values, int root = 0) const;

  /* ------------------------------------------------------------------------ */
  /* Accessors                                                                */
  /* ------------------------------------------------------------------------ */
public:
  int getNbProc() const;
  int whoAmI() const;
  static Communicator & getStaticCommunicator();
  static Communicator & getStaticCommunicator(int & argc, char **& argv);
  int getMaxTag() const;
  int getMinTag() const;
  /// Accessor generated by AKANTU_GET_MACRO over the dereferenced
  /// communicator_data pointer.
  AKANTU_GET_MACRO(CommunicatorData, (*communicator_data), decltype(auto));

  /* ------------------------------------------------------------------------ */
  /* Class Members                                                            */
  /* ------------------------------------------------------------------------ */
private:
  /// Owning pointer to the shared instance (see getStaticCommunicator()).
  static std::unique_ptr<Communicator> static_communicator;

protected:
  /// Owning pointer to the internal data, held through its polymorphic base.
  std::unique_ptr<CommunicatorInternalData> communicator_data;
};
/// Stream insertion for a CommunicationRequest: the request prints itself
/// into @p stream through printself(), and the same stream is handed back so
/// that insertions can be chained.
inline auto operator<<(std::ostream & stream,
                       const CommunicationRequest & _this) -> std::ostream & {
  _this.printself(stream);
  return stream;
}
} // namespace akantu
#include "communicator_inline_impl.hh"
#endif /* AKANTU_STATIC_COMMUNICATOR_HH_ */
Event Timeline
Log In to Comment