diff --git a/src/synchronizer/communicator.hh b/src/synchronizer/communicator.hh index cefd90ace..e3ac8f462 100644 --- a/src/synchronizer/communicator.hh +++ b/src/synchronizer/communicator.hh @@ -1,437 +1,436 @@ /** * @file communicator.hh * * @author Nicolas Richart * * @date creation: Fri Jun 18 2010 * @date last modification: Wed Nov 15 2017 * * @brief Class handling the parallel communications * * @section LICENSE * * Copyright (©) 2010-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "aka_array.hh" #include "aka_common.hh" #include "aka_event_handler_manager.hh" #include "communication_buffer.hh" #include "communication_request.hh" #include "communicator_event_handler.hh" /* -------------------------------------------------------------------------- */ #ifndef __AKANTU_STATIC_COMMUNICATOR_HH__ #define __AKANTU_STATIC_COMMUNICATOR_HH__ namespace akantu { namespace debug { class CommunicationException : public Exception { public: CommunicationException() : Exception("An exception happen during a communication process.") {} }; } // namespace debug /// @enum SynchronizerOperation reduce operation that the synchronizer can /// perform enum class SynchronizerOperation { _sum, _min, _max, _prod, _land, _band, _lor, _bor, _lxor, _bxor, _min_loc, _max_loc, _null }; enum class CommunicationMode { _auto, _synchronous, _ready }; namespace { int _any_source = -1; } } // namespace akantu namespace akantu { struct CommunicatorInternalData { virtual ~CommunicatorInternalData() = default; }; /* -------------------------------------------------------------------------- */ /* -------------------------------------------------------------------------- */ class Communicator : public EventHandlerManager { struct private_member {}; /* ------------------------------------------------------------------------ */ /* Constructors/Destructors */ /* ------------------------------------------------------------------------ */ public: Communicator(int & argc, char **& argv, const private_member &); ~Communicator() override; /* ------------------------------------------------------------------------ */ /* Methods */ /* ------------------------------------------------------------------------ */ public: /* ------------------------------------------------------------------------ */ /* Point to Point */ /* ------------------------------------------------------------------------ */ template inline void receive(Array & values, Int sender, Int tag) const { return this->receiveImpl( values.storage(), values.size() * values.getNbComponent(), sender, tag); } template inline void receive(Tensor & values, Int sender, Int tag, std::enable_if_t::value> * = nullptr) const { return this->receiveImpl(values.storage(), values.size(), sender, tag); } template inline void receive(CommunicationBufferTemplated & values, Int sender, Int tag) const { return this->receiveImpl(values.storage(), values.size(), sender, tag); } template inline void receive(T & values, Int sender, Int tag, std::enable_if_t::value> * = nullptr) const { return this->receiveImpl(&values, 1, sender, tag); } /* ------------------------------------------------------------------------ */ template inline void send(const Array & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->sendImpl(values.storage(), values.size() * values.getNbComponent(), receiver, tag, mode); } template inline void send(const Tensor & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->sendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline void send(const CommunicationBufferTemplated & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->sendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline void send(const T & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->sendImpl(&values, 1, receiver, tag, mode); } /* ------------------------------------------------------------------------ */ template inline CommunicationRequest asyncSend(const Array & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.storage(), values.size() * values.getNbComponent(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const std::vector & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.data(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const Tensor & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->asyncSendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const CommunicationBufferTemplated & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const { return this->asyncSendImpl(values.storage(), values.size(), receiver, tag, mode); } template inline CommunicationRequest asyncSend(const T & values, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto, std::enable_if_t::value> * = nullptr) const { return this->asyncSendImpl(&values, 1, receiver, tag, mode); } /* ------------------------------------------------------------------------ */ template inline CommunicationRequest asyncReceive(Array & values, Int sender, Int tag) const { return this->asyncReceiveImpl( values.storage(), values.size() * values.getNbComponent(), sender, tag); } template inline CommunicationRequest asyncReceive(std::vector & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.data(), values.size(), sender, tag); } template ::value>> inline CommunicationRequest asyncReceive(Tensor & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.storage(), values.size(), sender, tag); } template inline CommunicationRequest asyncReceive(CommunicationBufferTemplated & values, Int sender, Int tag) const { return this->asyncReceiveImpl(values.storage(), values.size(), sender, tag); } /* ------------------------------------------------------------------------ */ template void probe(Int sender, Int tag, CommunicationStatus & status) const; template bool asyncProbe(Int sender, Int tag, CommunicationStatus & status) const; /* ------------------------------------------------------------------------ */ /* Collectives */ /* ------------------------------------------------------------------------ */ template inline void allReduce(Array & values, const SynchronizerOperation & op) const { this->allReduceImpl(values.storage(), values.size() * values.getNbComponent(), op); } template inline void allReduce(Tensor & values, const SynchronizerOperation & op, std::enable_if_t::value> * = nullptr) const { this->allReduceImpl(values.storage(), values.size(), op); } template inline void allReduce(T & values, const SynchronizerOperation & op, std::enable_if_t::value> * = nullptr) const { this->allReduceImpl(&values, 1, op); } /* ------------------------------------------------------------------------ */ template inline void allGather(Array & values) const { AKANTU_DEBUG_ASSERT(UInt(psize) == values.size(), "The array size is not correct"); this->allGatherImpl(values.storage(), values.getNbComponent()); } template ::value>> inline void allGather(Tensor & values) const { AKANTU_DEBUG_ASSERT(values.size() / UInt(psize) > 0, "The vector size is not correct"); this->allGatherImpl(values.storage(), values.size() / UInt(psize)); } /* ------------------------------------------------------------------------ */ template inline void allGatherV(Array & values, const Array & sizes) const { this->allGatherVImpl(values.storage(), sizes.storage()); } /* ------------------------------------------------------------------------ */ template inline void reduce(Array & values, const SynchronizerOperation & op, int root = 0) const { this->reduceImpl(values.storage(), values.size() * values.getNbComponent(), op, root); } /* ------------------------------------------------------------------------ */ template inline void gather(Tensor & values, int root = 0, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(values.storage(), values.getNbComponent(), root); } template inline void gather(T values, int root = 0, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(&values, 1, root); } /* ------------------------------------------------------------------------ */ template inline void gather(Tensor & values, Array & gathered, std::enable_if_t::value> * = nullptr) const { AKANTU_DEBUG_ASSERT(values.size() == gathered.getNbComponent(), "The array size is not correct"); gathered.resize(psize); this->gatherImpl(values.data(), values.size(), gathered.storage(), gathered.getNbComponent()); } template inline void gather(T values, Array & gathered, std::enable_if_t::value> * = nullptr) const { this->gatherImpl(&values, 1, gathered.storage(), 1); } /* ------------------------------------------------------------------------ */ template inline void gatherV(Array & values, const Array & sizes, int root = 0) const { this->gatherVImpl(values.storage(), sizes.storage(), root); } /* ------------------------------------------------------------------------ */ template inline void broadcast(Array & values, int root = 0) const { this->broadcastImpl(values.storage(), values.size() * values.getNbComponent(), root); } template inline void broadcast(CommunicationBufferTemplated & values, int root = 0) const { this->broadcastImpl(values.storage(), values.size(), root); } template inline void broadcast(T & values, int root = 0) const { this->broadcastImpl(&values, 1, root); } /* ------------------------------------------------------------------------ */ void barrier() const; CommunicationRequest asyncBarrier() const; /* ------------------------------------------------------------------------ */ /* Request handling */ /* ------------------------------------------------------------------------ */ bool test(CommunicationRequest & request) const; bool testAll(std::vector & request) const; void wait(CommunicationRequest & request) const; void waitAll(std::vector & requests) const; UInt waitAny(std::vector & requests) const; inline void freeCommunicationRequest(CommunicationRequest & request) const; inline void freeCommunicationRequest(std::vector & requests) const; template inline void receiveAnyNumber(std::vector & send_requests, - Array receive_buffer, MsgProcessor && processor, - Int tag) const; + MsgProcessor && processor, Int tag) const; protected: template void sendImpl(const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const; template void receiveImpl(T * buffer, Int size, Int sender, Int tag) const; template CommunicationRequest asyncSendImpl( const T * buffer, Int size, Int receiver, Int tag, const CommunicationMode & mode = CommunicationMode::_auto) const; template CommunicationRequest asyncReceiveImpl(T * buffer, Int size, Int sender, Int tag) const; template void allReduceImpl(T * values, int nb_values, const SynchronizerOperation & op) const; template void allGatherImpl(T * values, int nb_values) const; template void allGatherVImpl(T * values, int * nb_values) const; template void reduceImpl(T * values, int nb_values, const SynchronizerOperation & op, int root = 0) const; template void gatherImpl(T * values, int nb_values, int root = 0) const; template void gatherImpl(T * values, int nb_values, T * gathered, int nb_gathered = 0) const; template void gatherVImpl(T * values, int * nb_values, int root = 0) const; template void broadcastImpl(T * values, int nb_values, int root = 0) const; /* ------------------------------------------------------------------------ */ /* Accessors */ /* ------------------------------------------------------------------------ */ public: Int getNbProc() const { return psize; }; Int whoAmI() const { return prank; }; static Communicator & getStaticCommunicator(); static Communicator & getStaticCommunicator(int & argc, char **& argv); int getMaxTag() const; int getMinTag() const; AKANTU_GET_MACRO(CommunicatorData, (*communicator_data), decltype(auto)); /* ------------------------------------------------------------------------ */ /* Class Members */ /* ------------------------------------------------------------------------ */ private: static std::unique_ptr static_communicator; protected: Int prank{0}; Int psize{1}; std::unique_ptr communicator_data; }; inline std::ostream & operator<<(std::ostream & stream, const CommunicationRequest & _this) { _this.printself(stream); return stream; } } // namespace akantu #include "communicator_inline_impl.hh" #endif /* __AKANTU_STATIC_COMMUNICATOR_HH__ */ diff --git a/src/synchronizer/communicator_inline_impl.hh b/src/synchronizer/communicator_inline_impl.hh index 2cc7f5e9c..d2ced88f9 100644 --- a/src/synchronizer/communicator_inline_impl.hh +++ b/src/synchronizer/communicator_inline_impl.hh @@ -1,87 +1,87 @@ /** * @file communicator_inline_impl.hh * * @author Nicolas Richart * * @date creation: Tue Feb 02 2016 * @date last modification: Tue Nov 07 2017 * * @brief implementation of inline functions * * @section LICENSE * * Copyright (©) 2016-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "communicator.hh" /* -------------------------------------------------------------------------- */ #ifndef __AKANTU_STATIC_COMMUNICATOR_INLINE_IMPL_HH__ #define __AKANTU_STATIC_COMMUNICATOR_INLINE_IMPL_HH__ namespace akantu { /* -------------------------------------------------------------------------- */ inline void Communicator::freeCommunicationRequest(CommunicationRequest & request) const { request.free(); } /* -------------------------------------------------------------------------- */ inline void Communicator::freeCommunicationRequest( std::vector & requests) const { std::vector::iterator it; for (it = requests.begin(); it != requests.end(); ++it) { it->free(); } } /* -------------------------------------------------------------------------- */ template inline void Communicator::receiveAnyNumber( - std::vector & send_requests, Array receive_buffer, + std::vector & send_requests, MsgProcessor && processor, Int tag) const { CommunicationRequest barrier_request; bool got_all = false, are_send_finished = false; while (not got_all) { bool are_receives_ready = true; while (are_receives_ready) { CommunicationStatus status; - are_receives_ready = asyncProbe(_any_source, tag, status); + are_receives_ready = asyncProbe(_any_source, tag, status); if (are_receives_ready) { - receive_buffer.resize(status.size()); + Array receive_buffer(status.size(), 1); receive(receive_buffer, status.getSource(), tag); std::forward(processor)(status.getSource(), receive_buffer); } } if (not are_send_finished) { are_send_finished = testAll(send_requests); if (are_send_finished) barrier_request = asyncBarrier(); } if (are_send_finished) { got_all = test(barrier_request); } } } } // namespace akantu #endif /* __AKANTU_STATIC_COMMUNICATOR_INLINE_IMPL_HH__ */ diff --git a/src/synchronizer/facet_synchronizer.cc b/src/synchronizer/facet_synchronizer.cc index ae247444b..9de07e683 100644 --- a/src/synchronizer/facet_synchronizer.cc +++ b/src/synchronizer/facet_synchronizer.cc @@ -1,208 +1,206 @@ /** * @file facet_synchronizer.cc * * @author Nicolas Richart * @author Marco Vocialta * * @date creation: Wed Nov 05 2014 * @date last modification: Fri Jan 26 2018 * * @brief Facet synchronizer for parallel simulations with cohesive elments * * @section LICENSE * * Copyright (©) 2015-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "facet_synchronizer.hh" /* -------------------------------------------------------------------------- */ namespace akantu { /* -------------------------------------------------------------------------- */ FacetSynchronizer::FacetSynchronizer( Mesh & mesh, const ElementSynchronizer & element_synchronizer, const ID & id, MemoryID memory_id) : ElementSynchronizer(mesh, id, memory_id) { auto spatial_dimension = mesh.getSpatialDimension(); element_to_prank.initialize(mesh, _spatial_dimension = spatial_dimension - 1, _ghost_type = _ghost, _with_nb_element = true, _default_value = rank); // Build element to prank for (auto && scheme_pair : element_synchronizer.communications.iterateSchemes(_recv)) { auto proc = std::get<0>(scheme_pair); const auto & scheme = std::get<1>(scheme_pair); for (auto && elem : scheme) { const auto & facet_to_element = mesh.getSubelementToElement(elem.type, elem.ghost_type); Vector facets = facet_to_element.begin( facet_to_element.getNbComponent())[elem.element]; for (UInt f = 0; f < facets.size(); ++f) { const auto & facet = facets(f); if (facet == ElementNull) continue; if (facet.ghost_type == _not_ghost) continue; auto & facet_rank = element_to_prank(facet); if ((proc < UInt(facet_rank)) || (UInt(facet_rank) == rank)) facet_rank = proc; } } } ElementTypeMapArray facet_global_connectivities( "facet_global_connectivities", id, memory_id); facet_global_connectivities.initialize( mesh, _spatial_dimension = spatial_dimension - 1, _with_nb_element = true, _with_nb_nodes_per_element = true); mesh.getGlobalConnectivity(facet_global_connectivities); // \TODO perhaps a global element numbering might be useful here... for (auto type : facet_global_connectivities.elementTypes(_spatial_dimension = _all_dimensions, _element_kind = _ek_not_defined, _ghost_type = _not_ghost)) { auto & conn = facet_global_connectivities(type, _not_ghost); auto conn_view = make_view(conn, conn.getNbComponent()); std::for_each(conn_view.begin(), conn_view.end(), [&](auto & conn) { std::sort(conn.storage(), conn.storage() + conn.size()); }); } /// init facet check tracking ElementTypeMapArray facet_checked("facet_checked", id, memory_id); std::map> recv_connectivities; /// Generate the recv scheme and connnectivities to send to the other /// processors for (auto && scheme_pair : element_synchronizer.communications.iterateSchemes(_recv)) { facet_checked.initialize(mesh, _spatial_dimension = spatial_dimension - 1, _ghost_type = _ghost, _with_nb_element = true, _default_value = false); auto proc = scheme_pair.first; const auto & elements = scheme_pair.second; auto & facet_scheme = communications.createScheme(proc, _recv); // this creates empty arrays... auto & connectivities_for_proc = recv_connectivities[proc]; connectivities_for_proc.setID( id + ":connectivities_for_proc:" + std::to_string(proc)); connectivities_for_proc.initialize( mesh, _spatial_dimension = spatial_dimension - 1, _with_nb_nodes_per_element = true, _ghost_type = _ghost); // for every element in the element synchronizer communication scheme, // check the facets to see if they should be communicated and create a // connectivity array to match with the one other processors might send for (auto && element : elements) { const auto & facet_to_element = mesh.getSubelementToElement(element.type, element.ghost_type); Vector facets = facet_to_element.begin( facet_to_element.getNbComponent())[element.element]; for (UInt f = 0; f < facets.size(); ++f) { auto & facet = facets(f); // exclude no valid facets if (facet == ElementNull) continue; // exclude _ghost facet from send scheme and _not_ghost from receive if (facet.ghost_type != _ghost) continue; // exclude facet from other processors then the one of current // interest in case of receive scheme if (UInt(element_to_prank(facet)) != proc) continue; auto & checked = facet_checked(facet); // skip already checked facets if (checked) continue; checked = true; facet_scheme.push_back(facet); auto & global_conn = facet_global_connectivities(facet.type, facet.ghost_type); Vector conn = global_conn.begin(global_conn.getNbComponent())[facet.element]; std::sort(conn.storage(), conn.storage() + conn.size()); connectivities_for_proc(facet.type, facet.ghost_type).push_back(conn); } } } std::vector send_requests; /// do every communication by element type for (auto && type : mesh.elementTypes(spatial_dimension - 1)) { for (auto && pair : recv_connectivities) { auto proc = std::get<0>(pair); const auto & connectivities_for_proc = std::get<1>(pair); auto && tag = Tag::genTag(proc, type, 1337); send_requests.push_back( communicator.asyncSend(connectivities_for_proc(type, _ghost), proc, tag, CommunicationMode::_synchronous)); } auto nb_nodes_per_facet = Mesh::getNbNodesPerElement(type); - Array buffer; - - communicator.receiveAnyNumber( - send_requests, buffer, + communicator.receiveAnyNumber( + send_requests, [&](auto && proc, auto && message) { auto & local_connectivities = facet_global_connectivities(type, _not_ghost); auto & send_scheme = communications.createScheme(proc, _send); auto conn_view = make_view(local_connectivities, nb_nodes_per_facet); auto conn_begin = conn_view.begin(); auto conn_end = conn_view.end(); for (const auto & c_to_match : make_view(message, nb_nodes_per_facet)) { auto it = std::find(conn_begin, conn_end, c_to_match); if (it != conn_end) { auto facet = Element{type, UInt(it - conn_begin), _not_ghost}; send_scheme.push_back(facet); } else { AKANTU_EXCEPTION("No local facet found to send to proc " << proc << " corresponding to " << c_to_match); } } }, Tag::genTag(rank, type, 1337)); } } } // namespace akantu diff --git a/src/synchronizer/node_synchronizer.cc b/src/synchronizer/node_synchronizer.cc index f3ace1167..edb4c7a28 100644 --- a/src/synchronizer/node_synchronizer.cc +++ b/src/synchronizer/node_synchronizer.cc @@ -1,165 +1,177 @@ /** * @file node_synchronizer.cc * * @author Nicolas Richart * * @date creation: Fri Jun 18 2010 * @date last modification: Wed Nov 15 2017 * * @brief Implementation of the node synchronizer * * @section LICENSE * * Copyright (©) 2010-2018 EPFL (Ecole Polytechnique Fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ #include "node_synchronizer.hh" #include "mesh.hh" /* -------------------------------------------------------------------------- */ namespace akantu { /* -------------------------------------------------------------------------- */ NodeSynchronizer::NodeSynchronizer(Mesh & mesh, const ID & id, MemoryID memory_id, const bool register_to_event_manager, EventHandlerPriority event_priority) : SynchronizerImpl(mesh.getCommunicator(), id, memory_id), mesh(mesh) { AKANTU_DEBUG_IN(); if (register_to_event_manager) { this->mesh.registerEventHandler(*this, event_priority); } AKANTU_DEBUG_OUT(); } /* -------------------------------------------------------------------------- */ NodeSynchronizer::~NodeSynchronizer() = default; /* -------------------------------------------------------------------------- */ void NodeSynchronizer::onNodesAdded(const Array & /*nodes_list*/, const NewNodesEvent &) { std::map> nodes_per_proc; - static int count = 1; + // recreates fully the schemes due to changes of global ids // \TODO add an event to handle global id changes - for(auto && data : communications.iterateRecvSchemes()) { + for (auto && data : communications.iterateSchemes(_recv)) { auto & scheme = data.second; scheme.resize(0); } for (auto && local_id : arange(mesh.getNbNodes())) { auto type = mesh.getNodeType(local_id); if (type < 0) continue; // local, master or pure ghost auto global_id = mesh.getNodeGlobalId(local_id); auto proc = UInt(type); nodes_per_proc[proc].push_back(global_id); auto & scheme = communications.createScheme(proc, _recv); scheme.push_back(local_id); } std::vector send_requests; - for (auto & pair : nodes_per_proc) { + for (auto && pair : communications.iterateSchemes(_recv)) { auto proc = pair.first; - auto & nodes = pair.second; - send_requests.push_back( - communicator.asyncSend(nodes, proc, Tag::genTag(proc, count, 0xcafe))); + // if proc not in nodes_per_proc this should insert an empty array to send + send_requests.push_back(communicator.asyncSend( + nodes_per_proc[proc], proc, Tag::genTag(rank, proc, 0xcafe))); + } + + for (auto && data : communications.iterateSchemes(_send)) { + auto proc = data.first; + auto & scheme = data.second; + CommunicationStatus status; + + auto tag = Tag::genTag(proc, rank, 0xcafe); + communicator.probe(proc, tag, status); + + scheme.resize(status.size()); + communicator.receive(scheme, proc, tag); + std::transform(scheme.begin(), scheme.end(), scheme.begin(), + [&](auto & gnode) { return mesh.getNodeLocalId(gnode); }); } - Array buffer; - - communicator.receiveAnyNumber( - send_requests, buffer, - [&](auto && proc, auto && nodes) { - auto & scheme = communications.createScheme(proc, _send); - scheme.resize(nodes.size()); - for (auto && data : enumerate(nodes)) { - auto global_id = std::get<1>(data); - auto local_id = mesh.getNodeLocalId(global_id); - AKANTU_DEBUG_ASSERT(local_id != UInt(-1), - "The global node " << global_id - << "is not known on rank " - << rank); - scheme[std::get<0>(data)] = local_id; - } - }, - Tag::genTag(rank, count, 0xcafe)); + // communicator.receiveAnyNumber( + // send_requests, + // [&](auto && proc, auto && nodes) { + // auto & scheme = communications.createScheme(proc, _send); + // scheme.resize(nodes.size()); + // for (auto && data : enumerate(nodes)) { + // auto global_id = std::get<1>(data); + // auto local_id = mesh.getNodeLocalId(global_id); + // AKANTU_DEBUG_ASSERT(local_id != UInt(-1), + // "The global node " << global_id + // << "is not known on rank " + // << rank); + // scheme[std::get<0>(data)] = local_id; + // } + // }, + // Tag::genTag(rank, count, 0xcafe)); + // ++count; communicator.waitAll(send_requests); communicator.freeCommunicationRequest(send_requests); - ++count; } /* -------------------------------------------------------------------------- */ UInt NodeSynchronizer::sanityCheckDataSize(const Array & nodes, const SynchronizationTag & tag, bool from_comm_desc) const { UInt size = SynchronizerImpl::sanityCheckDataSize(nodes, tag, from_comm_desc); // positions size += mesh.getSpatialDimension() * sizeof(Real) * nodes.size(); return size; } /* -------------------------------------------------------------------------- */ void NodeSynchronizer::packSanityCheckData( CommunicationBuffer & buffer, const Array & nodes, const SynchronizationTag & /*tag*/) const { auto dim = mesh.getSpatialDimension(); for (auto && node : nodes) { buffer << Vector(mesh.getNodes().begin(dim)[node]); } } /* -------------------------------------------------------------------------- */ void NodeSynchronizer::unpackSanityCheckData(CommunicationBuffer & buffer, const Array & nodes, const SynchronizationTag & tag, UInt proc, UInt rank) const { auto dim = mesh.getSpatialDimension(); // std::set skip_conn_tags{_gst_smmc_facets_conn, // _gst_giu_global_conn}; // bool is_skip_tag_conn = skip_conn_tags.find(tag) != skip_conn_tags.end(); for (auto && node : nodes) { Vector pos_remote(dim); buffer >> pos_remote; Vector pos(mesh.getNodes().begin(dim)[node]); auto dist = pos_remote.distance(pos); if (not Math::are_float_equal(dist, 0.)) { AKANTU_EXCEPTION("Unpacking an unknown value for the node " << node << "(position " << pos << " != buffer " << pos_remote << ") [" << dist << "] - tag: " << tag << " comm from " << proc << " to " << rank); } } } /* -------------------------------------------------------------------------- */ } // namespace akantu