Page MenuHomec4science

duo_distributed_vector.cc
No OneTemporary

File Metadata

Created
Fri, May 24, 17:07

duo_distributed_vector.cc

/**
* @file duo_distributed_vector.cc
*
* @author Guillaume Anciaux <guillaume.anciaux@epfl.ch>
*
* @date Fri Nov 15 21:14:57 2013
*
* @brief Migration safe representation of array of references
*
* @section LICENSE
*
* Copyright (©) 2010-2011 EPFL (Ecole Polytechnique Fédérale de Lausanne)
* Laboratory (LSMS - Laboratoire de Simulation en Mécanique des Solides)
*
* LibMultiScale is free software: you can redistribute it and/or modify it
* under the
* terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* LibMultiScale is distributed in the hope that it will be useful, but
* WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with LibMultiScale. If not, see <http://www.gnu.org/licenses/>.
*
*/
//#define TIMER
/* -------------------------------------------------------------------------- */
#include "duo_distributed_vector.hh"
#include "comm_buffer.hh"
#include "lm_common.hh"
#include "lm_communicator.hh"
#include "reference_manager.hh"
/* -------------------------------------------------------------------------- */
#include <fstream>
#include <mpi.h>
/* -------------------------------------------------------------------------- */
__BEGIN_LIBMULTISCALE__
DuoDistributedVector::DuoDistributedVector(UInt lsize, UInt tsize,
const std::string &my_name) {
DUMP("Creating duodistributed vector " << my_name << " with size " << lsize
<< "," << tsize,
DBG_INFO_STARTUP);
this->setSize(lsize, tsize);
name = my_name;
something_changed = true;
}
/* -------------------------------------------------------------------------- */
DuoDistributedVector::~DuoDistributedVector() {}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::setSize(UInt lsize, UInt tsize) {
totalsize = tsize;
localsize = lsize;
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::clear() {
com_list.clear();
duo_index_map.clear();
duo_proc_map.clear();
sent.clear();
sent_procs.clear();
received.clear();
moved.clear();
totalsize = 0;
localsize = 0;
something_changed = 0;
}
/* -------------------------------------------------------------------------- */
UInt DuoDistributedVector::synchronizeMigration(CommGroup &groupAtomic,
CommGroup &groupFE) {
DUMP("SynchronizeMigration start (" << this << ")", DBG_INFO);
// printSummary();
something_changed = false;
CommBuffer<UInt> buffer;
Communicator &com = Communicator::getCommunicator();
// FE side
if (groupFE.amIinGroup()) {
// I receive the indexes that have changed processor
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = com_list.begin(), end = com_list.end(); it != end; ++it) {
UInt proc = (*it).first;
if (proc == UINT_MAX)
continue;
buffer.clear();
DUMP("receive migrated indexes from " << proc << " duo is " << name,
DBG_INFO);
groupAtomic.receive(buffer, proc, "receive migrated");
com.waitForPendingComs();
UInt nb_migrated = buffer.size() / 2;
DUMP("received " << nb_migrated << " migrated indexes from " << proc
<< " duo is " << name,
DBG_INFO);
if (nb_migrated) {
something_changed = true;
DUMP("i am warned from proc " << proc << " of migration of "
<< nb_migrated << " atoms"
<< " duo is " << name,
DBG_DETAIL);
UInt com_index;
UInt new_neigh;
UInt real_index;
for (UInt i = 0; i < nb_migrated; ++i) {
buffer >> com_index;
buffer >> new_neigh;
real_index = findRealIndex(com_index, proc);
DUMP("proc " << proc << " warned me that index " << real_index
<< " com index " << com_index << " was sent to proc "
<< new_neigh << " duo is " << name,
DBG_DETAIL);
sent[proc].push_back(real_index);
received[new_neigh].push_back(real_index);
}
}
}
}
// atoms side
if (groupAtomic.amIinGroup()) {
// I send the indexes that have changed processor
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = com_list.begin(), end = com_list.end(); it != end; ++it) {
UInt proc = (*it).first;
if (proc == UINT_MAX)
continue;
UInt nb_migrated = sent[proc].size();
if (nb_migrated)
something_changed = true;
DUMP("send the migrated indexes to proc "
<< proc << "(" << proc << ") : " << nb_migrated << " duo is "
<< name,
DBG_INFO);
buffer.clear();
for (UInt i = 0; i < nb_migrated; ++i) {
UInt index = sent[proc][i];
UInt com_index = duo_index_map[index];
UInt new_neigh = sent_procs[proc][i];
DUMP("I warn proc " << proc << " that index " << index << " com index "
<< com_index << " was sent to proc " << new_neigh
<< " duo is " << name,
DBG_DETAIL);
buffer << com_index;
buffer << new_neigh;
}
groupFE.send(buffer, 2 * nb_migrated, proc, "send migration information");
com.waitForPendingComs();
}
}
STARTTIMER("syncMigration updateMaps");
updateSent();
updateRecv();
updateMoved();
STOPTIMER("syncMigration updateMaps");
DUMP("done sync duo is " << name, DBG_INFO);
return something_changed;
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::updateSent() {
// now do the actual remove of indexes on both sides
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = sent.begin(), end = sent.end(); it != end; ++it) {
UInt proc = (*it).first;
std::vector<UInt> &indexes = (*it).second;
while (!indexes.empty()) {
UInt i = indexes.back();
indexes.pop_back();
UInt duo_proc = removeIndex(i);
if (duo_proc != proc)
LM_FATAL("inconstistency");
}
}
sent.clear();
sent_procs.clear();
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::updateRecv() {
// now do the actual remove of indexes on both sides
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = received.begin(), end = received.end(); it != end; ++it) {
UInt proc = (*it).first;
std::vector<UInt> &indexes = (*it).second;
UInt nb_recv = indexes.size();
for (UInt i = 0; i < nb_recv; ++i) {
addIndex(indexes[i], proc);
}
}
received.clear();
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::updateMoved() {
for (UInt i = 0; i < moved.size(); ++i) {
UInt i_src = moved[i].first;
UInt i_dest = moved[i].second;
LM_ASSERT(duo_index_map.count(i_src),
"inconstistency: index " << i_src << " is not in duo_index_map");
LM_ASSERT(duo_proc_map.count(i_src),
"inconstistency: index " << i_src << " is not in duo_proc_map");
UInt duo_proc = duo_proc_map[i_src];
UInt com_index = duo_index_map[i_src];
duo_index_map.erase(i_src);
duo_proc_map.erase(i_src);
LM_ASSERT(!duo_proc_map.count(i_dest), "inconstistency");
LM_ASSERT(!duo_index_map.count(i_dest), "inconstistency");
DUMP("moving index " << i_src << " to " << i_dest << " for proc "
<< duo_proc << " com_index " << com_index << " duo is "
<< name,
DBG_DETAIL);
std::vector<UInt> &coms = com_list[duo_proc];
LM_ASSERT(coms[com_index] == i_src, "inconstistency");
coms[com_index] = i_dest;
duo_index_map[i_dest] = com_index;
duo_proc_map[i_dest] = duo_proc;
}
moved.clear();
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::distributeVector(const std::string &name_vec,
ContainerArray<Real> &vec,
CommGroup &group1,
CommGroup &group2, UInt stride) {
Communicator &com = Communicator::getCommunicator();
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = com_list.begin(), end = com_list.end(); it != end; ++it) {
UInt proc = (*it).first;
if (proc == UINT_MAX)
continue;
std::vector<UInt> &indexes = (*it).second;
CommBuffer<Real> buffer;
// group 1 sends to group2
if (group1.amIinGroup()) {
for (UInt i = 0; i < indexes.size(); ++i) {
UInt index = indexes[i];
DUMP("packing index " << index << " to proc " << proc << " duo is "
<< name,
DBG_DETAIL);
for (UInt k = 0; k < stride; ++k) {
buffer << vec[index * stride + k];
DUMP("packing value[" << k << "] = " << vec[index * stride + k],
DBG_DETAIL);
}
}
DUMP("sending buffer of size " << buffer.size() << " to proc " << proc
<< " duo is " << name,
DBG_INFO);
group2.send(buffer, proc, name_vec);
DUMP("sent buffer of size " << buffer.size() << " to proc " << proc
<< " duo is " << name,
DBG_INFO);
}
// group 2 recv from group1
if (group2.amIinGroup()) {
DUMP("recepting from proc " << proc << " duo is " << name, DBG_INFO);
group1.receive(buffer, indexes.size() * stride, proc, name_vec);
LM_ASSERT(buffer.size() == indexes.size() * stride,
"did not receive the expected amount "
<< buffer.size() << " != " << indexes.size() * stride
<< " from proc " << proc << " duo is " << name
<< " => distribution problem");
DUMP("received " << buffer.size() << " from proc " << proc << " duo is "
<< name,
DBG_INFO);
for (UInt i = 0; i < indexes.size(); ++i) {
UInt index = indexes[i];
DUMP("unpacking index " << index << " com index " << i << " from proc "
<< proc << " duo is " << name,
DBG_DETAIL);
for (UInt k = 0; k < stride; ++k) {
LM_ASSERT(index * stride + k < vec.size(),
"overflow: vec " << name_vec << " not large enough "
<< index * stride + k << " "
<< vec.size());
buffer >> vec[index * stride + k];
DUMP("unpacking value[" << k << "] = " << vec[index * stride + k],
DBG_DETAIL);
}
}
}
}
com.waitForPendingComs();
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::synchronizeVectorBySum(const std::string &name_vec,
ContainerArray<Real> &vec,
CommGroup &group1,
CommGroup &group2,
UInt stride) {
std::string message = name_vec;
message += " (sync by sum process)";
if (group1.amIinGroup()) {
DUMP("distribute vector from " << group1 << " to " << group2, DBG_INFO);
distributeVector(message, vec, group1, group2, stride);
} else if (group2.amIinGroup()) {
DUMP("receive partial sum from " << group1 << " to " << group2, DBG_INFO);
ContainerArray<Real> tmp(vec.size());
distributeVector(message, tmp, group1, group2, stride);
// compute sum
for (UInt i = 0; i < vec.size(); ++i)
vec[i] += tmp[i];
}
// renvoie le resultat
DUMP("retour du resultat", DBG_INFO);
distributeVector(message, vec, group2, group1, stride);
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::print(const std::string &prefix) {
std::stringstream name;
name << prefix << "-redistrib-scheme" << current_step << "-proc"
<< lm_my_proc_id << ".mat";
std::ofstream file(name.str().c_str());
file << "proc index x y z" << std::endl;
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = com_list.begin(), end = com_list.end(); it != end; ++it) {
UInt proc = (*it).first;
std::vector<UInt> &coms = (*it).second;
for (UInt i = 0; i < coms.size(); ++i) {
UInt index = coms[i];
file << proc << " " << index << std::endl;
}
}
}
/* -------------------------------------------------------------------------- */
void DuoDistributedVector::printSummary() {
MapUIntToUIntList::iterator it;
MapUIntToUIntList::iterator end;
for (it = com_list.begin(), end = com_list.end(); it != end; ++it) {
#ifndef LM_OPTIMIZED
UInt proc = (*it).first;
std::vector<UInt> &coms = (*it).second;
#endif // LM_OPTIMIZED
DUMP("Com with " << proc << " of " << coms.size() << " indexes"
<< " duo is " << name,
DBG_INFO);
}
}
/* -------------------------------------------------------------------------- */
__END_LIBMULTISCALE__

Event Timeline