Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F76363811
MPI_Wrappers.cpp
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, Aug 7, 13:54
Size
11 KB
Mime Type
text/x-c
Expires
Fri, Aug 9, 13:54 (1 d, 21 h)
Engine
blob
Format
Raw Data
Handle
19700491
Attached To
rLAMMPS lammps
MPI_Wrappers.cpp
View Options
#include "MPI_Wrappers.h"
#include "Utility.h"
using
ATC_Utility
::
to_string
;
#include "ATC_Error.h"
using
ATC
::
ATC_Error
;
using
std
::
cout
;
using
std
::
string
;
#ifdef ISOLATE_FE
#include "Matrix.h"
using
ATC_Matrix
::
SparseMatrix
;
#endif
namespace
MPI_Wrappers
{
int
rank
(
MPI_Comm
comm
)
{
int
rank
;
MPI_Comm_rank
(
comm
,
&
rank
);
return
rank
;
}
bool
rank_zero
(
MPI_Comm
comm
)
{
return
rank
(
comm
)
==
0
;}
int
size
(
MPI_Comm
comm
)
{
int
size
;
MPI_Comm_size
(
comm
,
&
size
);
return
size
;
}
bool
serial
(
MPI_Comm
comm
)
{
return
size
(
comm
)
==
0
;
}
void
broadcast
(
MPI_Comm
comm
,
double
*
buf
,
int
count
)
{
int
error
=
MPI_Bcast
(
buf
,
count
,
MPI_DOUBLE
,
0
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in broadcast "
+
to_string
(
error
));
}
void
int_broadcast
(
MPI_Comm
comm
,
int
*
buf
,
int
count
)
{
int
error
=
MPI_Bcast
(
buf
,
count
,
MPI_INT
,
0
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in broadcast "
+
to_string
(
error
));
}
void
allsum
(
MPI_Comm
comm
,
void
*
send_buf
,
double
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_DOUBLE
,
MPI_SUM
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allsum "
+
to_string
(
error
));
}
void
int_allsum
(
MPI_Comm
comm
,
void
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_INT
,
MPI_SUM
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_allsum "
+
to_string
(
error
));
}
void
int_scansum
(
MPI_Comm
comm
,
int
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Scan
(
send_buf
,
rec_buf
,
count
,
MPI_INT
,
MPI_SUM
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_scansum "
+
to_string
(
error
));
}
void
allmax
(
MPI_Comm
comm
,
double
*
send_buf
,
double
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_DOUBLE
,
MPI_MAX
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allmax "
+
to_string
(
error
));
}
void
int_allmax
(
MPI_Comm
comm
,
int
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_INT
,
MPI_MAX
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_allmax "
+
to_string
(
error
));
}
void
allmin
(
MPI_Comm
comm
,
double
*
send_buf
,
double
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_DOUBLE
,
MPI_MIN
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allmax "
+
to_string
(
error
));
}
void
int_allmin
(
MPI_Comm
comm
,
int
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_INT
,
MPI_MIN
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_allmax "
+
to_string
(
error
));
}
int
rank_min
(
MPI_Comm
comm
,
double
*
send_buf
,
double
*
rec_buf
,
int
count
)
{
int
myRank
;
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
myRank
);
DOUBLE_RANK
in
[
count
],
out
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
in
[
i
].
val
=
send_buf
[
i
];
in
[
i
].
rank
=
myRank
;
}
int
error
=
MPI_Allreduce
(
in
,
out
,
count
,
MPI_DOUBLE_INT
,
MPI_MINLOC
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in rank_min "
+
to_string
(
error
));
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
rec_buf
[
i
]
=
out
[
i
].
val
;
}
return
out
[
0
].
rank
;
}
void
int_recv
(
MPI_Comm
comm
,
int
*
recv_buf
,
int
max_size
,
int
iproc
)
{
MPI_Status
status
;
MPI_Request
request
;
int
tmp
,
error
,
recv_size
;
error
=
MPI_Irecv
(
recv_buf
,
max_size
,
MPI_INT
,
iproc
,
0
,
comm
,
&
request
);
error
=
error
&&
MPI_Send
(
&
tmp
,
0
,
MPI_INT
,
iproc
,
0
,
comm
);
error
=
error
&&
MPI_Wait
(
&
request
,
&
status
);
error
=
error
&&
MPI_Get_count
(
&
status
,
MPI_DOUBLE
,
&
recv_size
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_recv "
+
to_string
(
error
));
}
void
recv
(
MPI_Comm
comm
,
double
*
recv_buf
,
int
max_size
,
int
iproc
)
{
MPI_Status
status
;
MPI_Request
request
;
int
tmp
,
error
,
recv_size
;
error
=
MPI_Irecv
(
recv_buf
,
max_size
,
MPI_DOUBLE
,
iproc
,
0
,
comm
,
&
request
);
error
=
error
&&
MPI_Send
(
&
tmp
,
0
,
MPI_INT
,
iproc
,
0
,
comm
);
error
=
error
&&
MPI_Wait
(
&
request
,
&
status
);
error
=
error
&&
MPI_Get_count
(
&
status
,
MPI_DOUBLE
,
&
recv_size
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in recv "
+
to_string
(
error
));
}
void
int_send
(
MPI_Comm
comm
,
int
*
send_buf
,
int
send_size
)
{
MPI_Status
status
;
int
tmp
,
error
;
error
=
MPI_Recv
(
&
tmp
,
0
,
MPI_INT
,
0
,
0
,
comm
,
&
status
);
error
=
error
&&
MPI_Rsend
(
send_buf
,
send_size
,
MPI_INT
,
0
,
0
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_send "
+
to_string
(
error
));
}
void
send
(
MPI_Comm
comm
,
double
*
send_buf
,
int
send_size
)
{
MPI_Status
status
;
int
tmp
,
error
;
error
=
MPI_Recv
(
&
tmp
,
0
,
MPI_INT
,
0
,
0
,
comm
,
&
status
);
error
=
error
&&
MPI_Rsend
(
send_buf
,
send_size
,
MPI_DOUBLE
,
0
,
0
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_send "
+
to_string
(
error
));
}
void
int_scatter
(
MPI_Comm
comm
,
int
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
;
int
numprocs
=
size
(
comm
);
int
sizes
[
numprocs
];
int
displacements
[
numprocs
];
for
(
int
i
=
0
;
i
<
numprocs
;
++
i
)
{
sizes
[
i
]
=
1
;
displacements
[
i
]
=
i
;
}
error
=
MPI_Scatterv
(
send_buf
,
sizes
,
displacements
,
MPI_INT
,
rec_buf
,
count
,
MPI_INT
,
0
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in int_scatter "
+
to_string
(
error
));
}
void
allgatherv
(
MPI_Comm
comm
,
double
*
send_buf
,
int
send_count
,
double
*
rec_buf
,
int
*
rec_counts
,
int
*
displacements
)
{
int
error
=
MPI_Allgatherv
(
send_buf
,
send_count
,
MPI_DOUBLE
,
rec_buf
,
rec_counts
,
displacements
,
MPI_DOUBLE
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allgatherv "
+
to_string
(
error
));
}
void
gather
(
MPI_Comm
comm
,
double
send
,
double
*
recv
)
{
int
send_count
=
1
;
int
recv_count
=
1
;
int
root
=
0
;
int
error
=
MPI_Gather
(
&
send
,
send_count
,
MPI_DOUBLE
,
recv
,
recv_count
,
MPI_DOUBLE
,
root
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allgatherv "
+
to_string
(
error
));
}
void
int_allgather
(
MPI_Comm
comm
,
int
send
,
int
*
recv
)
{
int
send_count
=
1
;
int
recv_count
=
1
;
int
error
=
MPI_Allgather
(
&
send
,
send_count
,
MPI_INT
,
recv
,
recv_count
,
MPI_INT
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in allgatherv "
+
to_string
(
error
));
}
void
logical_or
(
MPI_Comm
comm
,
void
*
send_buf
,
int
*
rec_buf
,
int
count
)
{
int
error
=
MPI_Allreduce
(
send_buf
,
rec_buf
,
count
,
MPI_INT
,
MPI_LOR
,
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in logical_or "
+
to_string
(
error
));
}
void
barrier
(
MPI_Comm
comm
)
{
int
error
=
MPI_Barrier
(
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in barrier "
+
to_string
(
error
));
}
void
stop
(
MPI_Comm
comm
,
string
msg
)
{
int
error
=
MPI_Barrier
(
comm
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in barrier "
+
to_string
(
error
));
throw
ATC_Error
(
"...stopping "
+
msg
);
}
#ifdef ISOLATE_FE
void
sparse_allsum
(
MPI_Comm
comm
,
SparseMatrix
<
double
>
&
toShare
)
const
{
toShare
.
compress
();
// initialize MPI information
int
nProcs
=
size
(
comm
);
int
myRank
=
rank
(
comm
);;
int
error
;
// get numbers of rows, columns, rowsCRS, and
// sizes (number of nonzero elements in matrix)
SparseMatInfo
*
recInfo
=
new
SparseMatInfo
[
nProcs
];
SparseMatInfo
myInfo
;
myInfo
.
rows
=
toShare
.
nRows
();
myInfo
.
cols
=
toShare
.
nCols
();
myInfo
.
rowsCRS
=
toShare
.
nRowsCRS
();
myInfo
.
size
=
toShare
.
size
();
error
=
MPI_Allgather
(
&
myInfo
,
4
,
MPI_INT
,
recInfo
,
4
,
MPI_INT
,
lammps_
->
world
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in sparse_allsum_numrows "
+
to_string
(
error
));
// adjust row sendcounts because recRowsCRS is off by one
int
rowCounts
[
nProcs
];
int
sizeCounts
[
nProcs
];
// set up total size of receive buffers for Allgatherv calls
int
totalRowsCRS
=
0
;
int
totalSize
=
0
;
// set up array of displacements for Allgatherv calls
int
rowOffsets
[
nProcs
];
rowOffsets
[
0
]
=
0
;
int
sizeOffsets
[
nProcs
];
sizeOffsets
[
0
]
=
0
;
for
(
int
i
=
0
;
i
<
nProcs
;
i
++
)
{
// find the total number of entries to share in the mpi calls below
rowCounts
[
i
]
=
recInfo
[
i
].
rowsCRS
+
1
;
sizeCounts
[
i
]
=
recInfo
[
i
].
size
;
totalRowsCRS
+=
rowCounts
[
i
];
totalSize
+=
recInfo
[
i
].
size
;
// these already have their 0th slot filled in
if
(
i
==
0
)
continue
;
rowOffsets
[
i
]
=
rowOffsets
[
i
-
1
]
+
rowCounts
[
i
-
1
];
sizeOffsets
[
i
]
=
sizeOffsets
[
i
-
1
]
+
sizeCounts
[
i
-
1
];
}
// get actual rows
INDEX
*
rec_ia
=
new
INDEX
[
totalRowsCRS
];
if
(
toShare
.
size
()
==
0
)
{
double
dummy
[
0
];
error
=
MPI_Allgatherv
(
dummy
,
0
,
MPI_INT
,
rec_ia
,
rowCounts
,
rowOffsets
,
MPI_INT
,
lammps_
->
world
);
}
else
error
=
MPI_Allgatherv
(
toShare
.
rows
(),
rowCounts
[
myRank
],
MPI_INT
,
rec_ia
,
rowCounts
,
rowOffsets
,
MPI_INT
,
lammps_
->
world
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in sparse_allsum_rowarray "
+
to_string
(
error
));
// get actual cols
INDEX
*
rec_ja
=
new
INDEX
[
totalSize
];
error
=
MPI_Allgatherv
(
toShare
.
cols
(),
sizeCounts
[
myRank
],
MPI_INT
,
rec_ja
,
sizeCounts
,
sizeOffsets
,
MPI_INT
,
lammps_
->
world
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in sparse_allsum_colarray "
+
to_string
(
error
));
// get the array of values
double
*
rec_vals
=
new
double
[
totalSize
];
error
=
MPI_Allgatherv
(
toShare
.
ptr
(),
sizeCounts
[
myRank
],
MPI_DOUBLE
,
rec_vals
,
sizeCounts
,
sizeOffsets
,
MPI_DOUBLE
,
lammps_
->
world
);
if
(
error
!=
MPI_SUCCESS
)
throw
ATC_Error
(
"error in sparse_allsum_valarray "
+
to_string
(
error
));
INDEX
*
rec_ia_proc
;
INDEX
*
rec_ja_proc
;
double
*
rec_vals_proc
;
for
(
int
i
=
0
;
i
<
nProcs
;
i
++
)
{
if
(
myRank
!=
i
)
{
// deallocated when tempMat is deleted since it wraps them
rec_ia_proc
=
new
INDEX
[
rowCounts
[
i
]];
rec_ja_proc
=
new
INDEX
[
sizeCounts
[
i
]];
rec_vals_proc
=
new
double
[
sizeCounts
[
i
]];
// copy the data passed with MPI into the new spots
copy
(
rec_ia
+
rowOffsets
[
i
],
rec_ia
+
rowOffsets
[
i
]
+
rowCounts
[
i
],
rec_ia_proc
);
copy
(
rec_ja
+
sizeOffsets
[
i
],
rec_ja
+
sizeOffsets
[
i
]
+
sizeCounts
[
i
],
rec_ja_proc
);
copy
(
rec_vals
+
sizeOffsets
[
i
],
rec_vals
+
sizeOffsets
[
i
]
+
sizeCounts
[
i
],
rec_vals_proc
);
// Does anyone know why we have to declare tempMat here (as well as set it equal to
// something) to avoid segfaults? there are still segfaults, but they happen at a much
// later stage of the game now (and for less benchmarks overall).
SparseMatrix
<
double
>
tempMat
=
SparseMatrix
<
double
>
(
rec_ia_proc
,
rec_ja_proc
,
rec_vals_proc
,
recInfo
[
i
].
size
,
recInfo
[
i
].
rows
,
recInfo
[
i
].
cols
,
recInfo
[
i
].
rowsCRS
);
toShare
+=
tempMat
;
}
}
delete
[]
recInfo
;
delete
[]
rec_ia
;
delete
[]
rec_ja
;
delete
[]
rec_vals
;
}
#endif
void
print_msg
(
MPI_Comm
comm
,
string
msg
)
{
if
(
serial
(
comm
))
{
cout
<<
" ATC: "
<<
msg
<<
"
\n
"
;
}
else
{
cout
<<
" ATC: P"
<<
rank
(
comm
)
<<
", "
<<
msg
<<
"
\n
"
;
}
}
void
print_msg_once
(
MPI_Comm
comm
,
string
msg
,
bool
prefix
,
bool
endline
)
{
if
(
rank_zero
(
comm
))
{
if
(
prefix
)
cout
<<
" ATC: "
;
cout
<<
msg
;
if
(
endline
)
cout
<<
"
\n
"
;
}
}
}
Event Timeline
Log In to Comment