Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F93610402
communicator.hh
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Nov 30, 03:40
Size
20 KB
Mime Type
text/x-c++
Expires
Mon, Dec 2, 03:40 (2 d)
Engine
blob
Format
Raw Data
Handle
22673632
Attached To
rAKA akantu
communicator.hh
View Options
/**
* Copyright (©) 2010-2023 EPFL (Ecole Polytechnique Fédérale de Lausanne)
* Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
*
* This file is part of Akantu
*
* Akantu is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* Akantu is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Akantu. If not, see <http://www.gnu.org/licenses/>.
*/
/* -------------------------------------------------------------------------- */
#include "aka_array.hh"
#include "aka_common.hh"
#include "aka_event_handler_manager.hh"
#include "communication_buffer.hh"
#include "communication_request.hh"
#include "communicator_event_handler.hh"
/* -------------------------------------------------------------------------- */
#ifndef AKANTU_STATIC_COMMUNICATOR_HH_
#define AKANTU_STATIC_COMMUNICATOR_HH_
namespace akantu {
namespace debug {
class CommunicationException : public Exception {
public:
CommunicationException()
: Exception("An exception happen during a communication process.") {}
};
} // namespace debug
/// @enum SynchronizerOperation reduce operation that the synchronizer can
/// perform
enum class SynchronizerOperation {
_sum,
_min,
_max,
_prod,
_land,
_band,
_lor,
_bor,
_lxor,
_bxor,
_min_loc,
_max_loc,
_null
};
enum class CommunicationMode { _auto, _synchronous, _ready };
namespace {
int _any_source = -1;
}
} // namespace akantu
namespace akantu {
struct CommunicatorInternalData {
virtual ~CommunicatorInternalData() = default;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
class Communicator : public EventHandlerManager<CommunicatorEventHandler> {
struct private_member {};
/* ------------------------------------------------------------------------ */
/* Constructors/Destructors */
/* ------------------------------------------------------------------------ */
public:
Communicator(int & argc, char **& argv, const private_member & /*m*/);
Communicator(const private_member & /*unused*/ = private_member{});
~Communicator() override;
/* ------------------------------------------------------------------------ */
/* Methods */
/* ------------------------------------------------------------------------ */
public:
/* ------------------------------------------------------------------------ */
/* Point to Point */
/* ------------------------------------------------------------------------ */
template <typename T>
void probe(Int sender, Int tag, CommunicationStatus & status) const;
template <typename T>
bool asyncProbe(Int sender, Int tag, CommunicationStatus & status) const;
/* ------------------------------------------------------------------------ */
template <typename T>
inline void receive(Array<T> & values, Int sender, Int tag) const {
return this->receiveImpl(
values.data(), values.size() * values.getNbComponent(), sender, tag);
}
template <typename T>
inline void receive(std::vector<T> & values, Int sender, Int tag) const {
return this->receiveImpl(values.data(), values.size(), sender, tag);
}
template <typename Tensor>
inline void
receive(Tensor & values, Int sender, Int tag,
std::enable_if_t<aka::is_tensor<Tensor>::value> * /*unused*/ =
nullptr) const {
return this->receiveImpl(values.data(), values.size(), sender, tag);
}
inline void receive(CommunicationBufferTemplated<true> & values, Int sender,
Int tag) const {
return this->receiveImpl(values.data(), values.size(), sender, tag);
}
inline void receive(CommunicationBufferTemplated<false> & values, Int sender,
Int tag) const {
CommunicationStatus status;
this->probe<char>(sender, tag, status);
values.reserve(status.size());
return this->receiveImpl(values.data(), values.size(), sender, tag);
}
template <typename T>
inline void
receive(T & values, Int sender, Int tag,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
return this->receiveImpl(&values, 1, sender, tag);
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline void
send(const Array<T> & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(values.data(),
values.size() * values.getNbComponent(), receiver,
tag, mode);
}
template <typename T>
inline void
send(const std::vector<T> & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(values.data(), values.size(), receiver, tag, mode);
}
template <typename Tensor,
std::enable_if_t<aka::is_tensor_v<Tensor>> * = nullptr>
inline void
send(const Tensor & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(values.data(), values.size(), receiver, tag, mode);
}
template <bool is_static>
inline void
send(const CommunicationBufferTemplated<is_static> & values, Int receiver,
Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->sendImpl(values.data(), values.size(), receiver, tag, mode);
}
template <typename T>
inline void send(const T & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
return this->sendImpl(&values, 1, receiver, tag, mode);
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline CommunicationRequest
asyncSend(const Array<T> & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(values.data(),
values.size() * values.getNbComponent(),
receiver, tag, mode);
}
template <typename T>
inline CommunicationRequest
asyncSend(const std::vector<T> & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(values.data(), values.size(), receiver, tag,
mode);
}
template <typename Tensor>
inline CommunicationRequest asyncSend(
const Tensor & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto,
std::enable_if_t<aka::is_tensor_v<Tensor>> * /*unused*/ = nullptr) const {
return this->asyncSendImpl(values.data(), values.size(), receiver, tag,
mode);
}
template <bool is_static>
inline CommunicationRequest
asyncSend(const CommunicationBufferTemplated<is_static> & values,
Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const {
return this->asyncSendImpl(values.data(), values.size(), receiver, tag,
mode);
}
template <typename T>
inline CommunicationRequest
asyncSend(const T & values, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
return this->asyncSendImpl(&values, 1, receiver, tag, mode);
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline CommunicationRequest asyncReceive(Array<T> & values, Int sender,
Int tag) const {
return this->asyncReceiveImpl(
values.data(), values.size() * values.getNbComponent(), sender, tag);
}
template <typename T>
inline CommunicationRequest asyncReceive(std::vector<T> & values, Int sender,
Int tag) const {
return this->asyncReceiveImpl(values.data(), values.size(), sender, tag);
}
template <typename Tensor,
typename = std::enable_if_t<aka::is_tensor_v<Tensor>>>
inline CommunicationRequest asyncReceive(Tensor & values, Int sender,
Int tag) const {
return this->asyncReceiveImpl(values.data(), values.size(), sender, tag);
}
template <bool is_static>
inline CommunicationRequest
asyncReceive(CommunicationBufferTemplated<is_static> & values, Int sender,
Int tag) const {
return this->asyncReceiveImpl(values.data(), values.size(), sender, tag);
}
/* ------------------------------------------------------------------------ */
/* Collectives */
/* ------------------------------------------------------------------------ */
template <typename T>
inline void
allReduce(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->allReduceImpl(values.data(), values.size() * values.getNbComponent(),
op);
}
template <typename Derived>
inline void
allReduce(Eigen::MatrixBase<Derived> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->allReduceImpl(values.derived().data(), values.size(), op);
}
template <typename T>
inline void
allReduce(T & values, SynchronizerOperation op = SynchronizerOperation::_sum,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->allReduceImpl(&values, 1, op);
}
template <typename T>
inline void
scan(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->scanImpl(values.data(), values.size() * values.getNbComponent(), op);
}
template <typename Tensor>
inline void scan(
Tensor & values, SynchronizerOperation op,
std::enable_if_t<aka::is_tensor_v<Tensor>> * /*unused*/ = nullptr) const {
this->scanImpl(values.data(), values.data(), values.size(), op);
}
template <typename T>
inline void scan(T & values,
SynchronizerOperation op = SynchronizerOperation::_sum,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->scanImpl(&values, &values, 1, op);
}
template <typename T>
inline void
exclusiveScan(Array<T> & values,
SynchronizerOperation op = SynchronizerOperation::_sum) const {
this->exclusiveScanImpl(values.data(), values.data(),
values.size() * values.getNbComponent(), op);
}
template <typename Tensor>
inline void
exclusiveScan(Tensor & values,
SynchronizerOperation op = SynchronizerOperation::_sum,
std::enable_if_t<aka::is_tensor_v<Tensor>> * = nullptr) const {
this->exclusiveScanImpl(values.data(), values.size(), op);
}
template <typename T>
inline void
exclusiveScan(T & values,
SynchronizerOperation op = SynchronizerOperation::_sum,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->exclusiveScanImpl(&values, &values, 1, op);
}
template <typename T>
inline void
exclusiveScan(T & values, T & result,
SynchronizerOperation op = SynchronizerOperation::_sum,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->exclusiveScanImpl(&values, &result, 1, op);
}
/* ------------------------------------------------------------------------ */
template <typename T> inline void allGather(Array<T> & values) const {
AKANTU_DEBUG_ASSERT(getNbProc() == values.size(),
"The array size is not correct");
this->allGatherImpl(values.data(), values.getNbComponent());
}
template <typename Tensor,
typename = std::enable_if_t<aka::is_tensor_v<Tensor>>>
inline void allGather(Tensor & values) const {
AKANTU_DEBUG_ASSERT(values.size() / getNbProc() > 0,
"The vector size is not correct");
this->allGatherImpl(values.data(), values.size() / getNbProc());
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline void allGatherV(Array<T> & values, const Array<Int> & sizes) const {
this->allGatherVImpl(values.data(), sizes.data());
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline void reduce(Array<T> & values, SynchronizerOperation op,
int root = 0) const {
this->reduceImpl(values.data(), values.size() * values.getNbComponent(), op,
root);
}
/* ------------------------------------------------------------------------ */
template <typename Tensor>
inline void gather(
Tensor & values, int root = 0,
std::enable_if_t<aka::is_tensor_v<Tensor>> * /*unused*/ = nullptr) const {
this->gatherImpl(values.data(), values.getNbComponent(), root);
}
template <typename T>
inline void
gather(T values, int root = 0,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->gatherImpl(&values, 1, root);
}
/* ------------------------------------------------------------------------ */
template <typename Tensor, typename T>
inline void gather(
Tensor & values, Array<T> & gathered,
std::enable_if_t<aka::is_tensor_v<Tensor>> * /*unused*/ = nullptr) const {
AKANTU_DEBUG_ASSERT(values.size() == gathered.getNbComponent(),
"The array size is not correct");
gathered.resize(getNbProc());
this->gatherImpl(values.data(), values.size(), gathered.data(),
gathered.getNbComponent());
}
template <typename T>
inline void
gather(T values, Array<T> & gathered,
std::enable_if_t<std::is_arithmetic<T>::value> * /*unused*/ =
nullptr) const {
this->gatherImpl(&values, 1, gathered.data(), 1);
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline void gatherV(Array<T> & values, const Array<Int> & sizes,
int root = 0) const {
this->gatherVImpl(values.data(), sizes.data(), root);
}
/* ------------------------------------------------------------------------ */
template <typename T>
inline void broadcast(Array<T> & values, int root = 0) const {
this->broadcastImpl(values.data(), values.size() * values.getNbComponent(),
root);
}
template <typename T>
inline void broadcast(std::vector<T> & values, int root = 0) const {
this->broadcastImpl(values.data(), values.size(), root);
}
inline void broadcast(CommunicationBufferTemplated<true> & buffer,
int root = 0) const {
this->broadcastImpl(buffer.data(), buffer.size(), root);
}
inline void broadcast(CommunicationBufferTemplated<false> & buffer,
int root = 0) const {
auto buffer_size = buffer.size();
this->broadcastImpl(&buffer_size, 1, root);
if (whoAmI() != root) {
buffer.reserve(buffer_size);
}
if (buffer_size == 0) {
return;
}
this->broadcastImpl(buffer.data(), buffer.size(), root);
}
template <typename T> inline void broadcast(T & values, int root = 0) const {
this->broadcastImpl(&values, 1, root);
}
/* ------------------------------------------------------------------------ */
void barrier() const;
CommunicationRequest asyncBarrier() const;
/* ------------------------------------------------------------------------ */
/* Request handling */
/* ------------------------------------------------------------------------ */
static bool test(CommunicationRequest & request);
static bool testAll(std::vector<CommunicationRequest> & request);
static void wait(CommunicationRequest & request);
static void waitAll(std::vector<CommunicationRequest> & requests);
static Int waitAny(std::vector<CommunicationRequest> & requests);
static inline void freeCommunicationRequest(CommunicationRequest & request);
static inline void
freeCommunicationRequest(std::vector<CommunicationRequest> & requests);
template <typename T, typename MsgProcessor>
inline void
receiveAnyNumber(std::vector<CommunicationRequest> & send_requests,
MsgProcessor && processor, Int tag) const;
protected:
template <typename T>
void
sendImpl(const T * buffer, Int size, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const;
template <typename T>
void receiveImpl(T * buffer, Int size, Int sender, Int tag) const;
template <typename T>
CommunicationRequest asyncSendImpl(
const T * buffer, Int size, Int receiver, Int tag,
const CommunicationMode & mode = CommunicationMode::_auto) const;
template <typename T>
CommunicationRequest asyncReceiveImpl(T * buffer, Int size, Int sender,
Int tag) const;
template <typename T>
void allReduceImpl(T * values, int nb_values, SynchronizerOperation op) const;
template <typename T>
void scanImpl(T * values, T * results, int nb_values,
SynchronizerOperation op) const;
template <typename T>
void exclusiveScanImpl(T * values, T * results, int nb_values,
SynchronizerOperation op) const;
template <typename T> void allGatherImpl(T * values, int nb_values) const;
template <typename T>
void allGatherVImpl(T * values, const int * nb_values) const;
template <typename T>
void reduceImpl(T * values, int nb_values, SynchronizerOperation op,
int root = 0) const;
template <typename T>
void gatherImpl(T * values, int nb_values, int root = 0) const;
template <typename T>
void gatherImpl(T * values, int nb_values, T * gathered,
int nb_gathered = 0) const;
template <typename T>
void gatherVImpl(T * values, int * nb_values, int root = 0) const;
template <typename T>
void broadcastImpl(T * values, int nb_values, int root = 0) const;
/* ------------------------------------------------------------------------ */
/* Accessors */
/* ------------------------------------------------------------------------ */
public:
Int getNbProc() const;
Int whoAmI() const;
static Communicator & getStaticCommunicator();
static Communicator & getStaticCommunicator(int & argc, char **& argv);
int getMaxTag() const;
int getMinTag() const;
AKANTU_GET_MACRO(CommunicatorData, (*communicator_data), decltype(auto));
/* ------------------------------------------------------------------------ */
/* Class Members */
/* ------------------------------------------------------------------------ */
private:
static std::unique_ptr<Communicator> static_communicator;
protected:
std::unique_ptr<CommunicatorInternalData> communicator_data;
};
inline std::ostream & operator<<(std::ostream & stream,
const CommunicationRequest & _this) {
_this.printself(stream);
return stream;
}
} // namespace akantu
#include "communicator_inline_impl.hh"
#endif /* AKANTU_STATIC_COMMUNICATOR_HH_ */
Event Timeline
Log In to Comment