/** * @file static_communicator_mpi_inline_impl.cc * @author Nicolas Richart * @date Thu Sep 2 15:10:51 2010 * * @brief implementation of the mpi communicator * * @section LICENSE * * Copyright (©) 2010-2011 EPFL (Ecole Polytechnique fédérale de Lausanne) * Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides) * * Akantu is free software: you can redistribute it and/or modify it under the * terms of the GNU Lesser General Public License as published by the Free * Software Foundation, either version 3 of the License, or (at your option) any * later version. * * Akantu is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR * A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with Akantu. If not, see . * */ /* -------------------------------------------------------------------------- */ inline CommunicationRequestMPI::CommunicationRequestMPI(UInt source, UInt dest) : CommunicationRequest(source, dest) { request = new MPI_Request; } /* -------------------------------------------------------------------------- */ inline CommunicationRequestMPI::~CommunicationRequestMPI() { delete request; } /* -------------------------------------------------------------------------- */ inline StaticCommunicatorMPI::StaticCommunicatorMPI(int * argc, char *** argv) { MPI_Init(argc, argv); setMPICommunicator(MPI_COMM_WORLD); } /* -------------------------------------------------------------------------- */ inline StaticCommunicatorMPI::~StaticCommunicatorMPI() { MPI_Finalize(); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::setMPICommunicator(MPI_Comm comm) { communicator = comm; MPI_Comm_rank(communicator, &prank); MPI_Comm_size(communicator, &psize); } /* -------------------------------------------------------------------------- */ inline MPI_Comm StaticCommunicatorMPI::getMPICommunicator() const { return communicator; } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::send(UInt * buffer, Int size, Int receiver, Int tag) { #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Send(buffer, size, MPI_UNSIGNED, receiver, tag, communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Send."); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::send(Real * buffer, Int size, Int receiver, Int tag) { #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Send(buffer, size, MPI_DOUBLE, receiver, tag, communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Send."); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::receive(UInt * buffer, Int size, Int sender, Int tag) { MPI_Status status; #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Recv(buffer, size, MPI_UNSIGNED, sender, tag, communicator, &status); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Recv."); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::receive(Real * buffer, Int size, Int sender, Int tag) { MPI_Status status; #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Recv(buffer, size, MPI_DOUBLE, sender, tag, communicator, &status); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Recv."); } /* -------------------------------------------------------------------------- */ inline CommunicationRequest * StaticCommunicatorMPI::asyncSend(UInt * buffer, Int size, Int receiver, Int tag) { CommunicationRequestMPI * request = new CommunicationRequestMPI(prank, receiver); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Isend(buffer, size, MPI_UNSIGNED, receiver, tag, communicator, request->getMPIRequest()); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Isend."); return request; } /* -------------------------------------------------------------------------- */ inline CommunicationRequest * StaticCommunicatorMPI::asyncSend(Real * buffer, Int size, Int receiver, Int tag) { CommunicationRequestMPI * request = new CommunicationRequestMPI(prank, receiver); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Isend(buffer, size, MPI_DOUBLE, receiver, tag, communicator, request->getMPIRequest()); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Isend."); return request; } /* -------------------------------------------------------------------------- */ inline CommunicationRequest * StaticCommunicatorMPI::asyncReceive(UInt * buffer, Int size, Int sender, Int tag) { CommunicationRequestMPI * request = new CommunicationRequestMPI(sender, prank); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Irecv(buffer, size, MPI_UNSIGNED, sender, tag, communicator, request->getMPIRequest()); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Irecv."); return request; } /* -------------------------------------------------------------------------- */ inline CommunicationRequest * StaticCommunicatorMPI::asyncReceive(Real * buffer, Int size, Int sender, Int tag) { CommunicationRequestMPI * request = new CommunicationRequestMPI(sender, prank); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Irecv(buffer, size, MPI_DOUBLE, sender, tag, communicator, request->getMPIRequest()); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Irecv."); return request; } /* -------------------------------------------------------------------------- */ inline bool StaticCommunicatorMPI::testRequest(CommunicationRequest * request) { MPI_Status status; int flag; CommunicationRequestMPI * req_mpi = static_cast(request); MPI_Request * req = req_mpi->getMPIRequest(); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Test(req, &flag, &status); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Test."); return (flag != 0); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::wait(CommunicationRequest * request) { MPI_Status status; CommunicationRequestMPI * req_mpi = static_cast(request); MPI_Request * req = req_mpi->getMPIRequest(); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Wait(req, &status); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Wait."); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::waitAll(std::vector & requests) { MPI_Status status; std::vector::iterator it; for(it = requests.begin(); it != requests.end(); ++it) { MPI_Request * req = static_cast(*it)->getMPIRequest(); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Wait(req, &status); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Wait."); } } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::barrier() { MPI_Barrier(communicator); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::allReduce(UInt * values, Int nb_values, const SynchronizerOperation & op) { #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Allreduce(MPI_IN_PLACE, values, nb_values, MPI_UNSIGNED, synchronizer_operation_to_mpi_op[op], communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Allreduce."); } /* -------------------------------------------------------------------------- */ inline void StaticCommunicatorMPI::allReduce(Real * values, Int nb_values, const SynchronizerOperation & op) { #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Allreduce(MPI_IN_PLACE, values, nb_values, MPI_DOUBLE, synchronizer_operation_to_mpi_op[op], communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Allreduce."); } /* -------------------------------------------------------------------------- */ template inline void StaticCommunicatorMPI::_gather(T * values, Int nb_values, Int root) { T * send_buf = NULL, * recv_buf = NULL; if(prank == root) { send_buf = (T *) MPI_IN_PLACE; recv_buf = values; } else { send_buf = values; } MPI_Datatype type = getMPIDatatype(); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Gather(send_buf, nb_values, type, recv_buf, nb_values, type, root, communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Gather."); } /* -------------------------------------------------------------------------- */ template inline void StaticCommunicatorMPI::_gatherv(T * values, Int * nb_values, Int root) { Int * displs = NULL; if(prank == root) { displs = new Int[psize]; displs[0] = 0; for (Int i = 1; i < psize; ++i) { displs[i] = displs[i-1] + nb_values[i-1]; } } T * send_buf = NULL, * recv_buf = NULL; if(prank == root) { send_buf = (T *) MPI_IN_PLACE; recv_buf = values; } else send_buf = values; MPI_Datatype type = getMPIDatatype(); #if !defined(AKANTU_NDEBUG) int ret = #endif MPI_Gatherv(send_buf, *nb_values, type, recv_buf, nb_values, displs, type, root, communicator); AKANTU_DEBUG_ASSERT(ret == MPI_SUCCESS, "Error in MPI_Gather."); if(prank == root) { delete [] displs; } } /* -------------------------------------------------------------------------- */ template inline MPI_Datatype StaticCommunicatorMPI::getMPIDatatype() { return MPI_DATATYPE_NULL; } template<> inline MPI_Datatype StaticCommunicatorMPI::getMPIDatatype() { return MPI_DOUBLE; } template<> inline MPI_Datatype StaticCommunicatorMPI::getMPIDatatype() { return MPI_UNSIGNED; } template<> inline MPI_Datatype StaticCommunicatorMPI::getMPIDatatype() { return MPI_INT; } /* -------------------------------------------------------------------------- */