CG-Bench/src/comm.c

/* Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
* All rights reserved. This file is part of nusif-solver.
* Use of this source code is governed by a MIT style
* license that can be found in the LICENSE file.
*/
#include "util.h"
#include <stdio.h>
#include <stdlib.h>
#ifdef _MPI
#include <mpi.h>
#endif
#include "allocate.h"
#include "comm.h"
// subroutines local to this module
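// Number of rows owned by 'rank' when N rows are block-distributed over
// 'size' ranks; the first N % size ranks receive one extra row.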
int sizeOfRank(int rank, int size, int N)
{
return N / size + ((N % size > rank) ? 1 : 0);
}
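// In-place global reduction of the scalar *v across all ranks.
// op selects MAX or SUM; without MPI this is a no-op.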
void commReduction(double* v, int op)
{
#ifdef _MPI
if (op == MAX) {
MPI_Allreduce(MPI_IN_PLACE, v, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
} else if (op == SUM) {
MPI_Allreduce(MPI_IN_PLACE, v, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
}
#endif
}
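// Convert the global column indices of this rank's matrix block into a local
// index space (owned rows first, followed by external/halo points) and set up
// the neighbor lists, message lengths and send index list used by commExchange.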
void commPartition(Comm* c, Matrix* A)
{
#ifdef _MPI
int rank = c->rank;
int size = c->size;
MPI_Comm comm = c->comm;
// Extract Matrix pieces
int start_row = A->startRow;
int stop_row = A->stopRow;
int total_nrow = A->totalNr;
long long total_nnz = A->totalNnz;
int local_nrow = A->nr;
int local_nnz = A->nnz;
int* row_ptr = (int*)A->rowPtr;
int* col_ind = (int*)A->colInd;
// int* nnz_in_row = A->nnz_in_row;
// double** ptr_to_vals_in_row = A->ptr_to_vals_in_row;
// int** ptr_to_inds_in_row = A->ptr_to_inds_in_row;
// We need to convert the index values for the rows on this processor
// to a local index space. We need to:
// - Determine if each index refers to a local value or an external value
// - If local, subtract start_row from index value to get local index
// - If external, find out if it is already accounted for.
// - If so, then do nothing,
// - otherwise
// - add it to the list of external indices,
// - find out which processor owns the value.
// - Set up communication for sparse MV operation.
// Scan the indices and transform to local
int* externals = (int*)allocate(ARRAY_ALIGNMENT, A->totalNr * sizeof(int));
int num_external = 1;
int* external_index = (int*)allocate(ARRAY_ALIGNMENT, MAX_EXTERNAL * sizeof(int));
c->external_index = external_index;
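// externals[] maps a global column index to its external id once the index
// has been registered as an external point (-1 = not yet seen)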
for (int i = 0; i < A->totalNr; i++) {
externals[i] = -1;
}
for (int i = 0; i < local_nrow; i++) {
for (int j = row_ptr[i]; j < row_ptr[i + 1]; j++) {
int cur_ind = A->colInd[j];
#ifdef VERBOSE
printf("Process %d of %d getting index %d in local row %d\n",
rank,
size,
cur_ind,
i);
#endif
// shift local rows to the start
if (start_row <= cur_ind && cur_ind <= stop_row) {
col_ind[j] -= start_row;
} else {
// Must find out if we have already set up this point
if (externals[cur_ind] == -1) {
externals[cur_ind] = num_external++;
if (num_external <= MAX_EXTERNAL) {
external_index[num_external - 1] = cur_ind;
// Mark index as external by negating it
col_ind[j] = -col_ind[j];
} else {
printf("Must increase MAX_EXTERNAL\n");
exit(EXIT_FAILURE);
}
} else {
// Mark index as external by negating it
col_ind[j] = -col_ind[j];
}
}
}
}
/**************************************************************************
Go through list of externals to find out which processors must be accessed.
**************************************************************************/
c->num_external = num_external;
int tmp_buffer[size];
int global_index_offsets[size];
for (int i = 0; i < size; i++) {
tmp_buffer[i] = 0;
}
tmp_buffer[rank] = start_row;
MPI_Allreduce(tmp_buffer,
global_index_offsets,
size,
MPI_INT,
MPI_SUM,
MPI_COMM_WORLD);
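// Only this rank's entry of tmp_buffer was set, so after the sum
// global_index_offsets[j] holds the first global row owned by rank j.
// The owner of a global index is therefore the largest j with
// global_index_offsets[j] <= index, which the loop below searches for.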
// Go through list of externals and find the processor that owns each
int external_processor[num_external];
for (int i = 0; i < num_external; i++) {
int cur_ind = external_index[i];
for (int j = size - 1; j >= 0; j--) {
if (global_index_offsets[j] <= cur_ind) {
external_processor[i] = j;
break;
}
}
}
/* Go through the external elements. For each newly encountered external
point, assign it the next index in the local sequence. Then look for other
external elements that are updated by the same rank and assign them the next
set of indices in the local sequence (i.e. elements updated by the same rank
have consecutive local indices). */
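/* Example: with local_nrow = 100 and external owners {2, 5, 2, 5, 2} (in
external_index order), the points owned by rank 2 get local indices
100, 101, 102 and the points owned by rank 5 get 103, 104. */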
int* external_local_index = (int*)allocate(ARRAY_ALIGNMENT,
MAX_EXTERNAL * sizeof(int));
c->external_local_index = external_local_index;
int count = local_nrow;
for (int i = 0; i < num_external; i++) {
external_local_index[i] = -1;
}
for (int i = 0; i < num_external; i++) {
if (external_local_index[i] == -1) {
external_local_index[i] = count++;
for (int j = i + 1; j < num_external; j++) {
if (external_processor[j] == external_processor[i]) {
external_local_index[j] = count++;
}
}
}
}
// map all external ids to the new local index
CG_UINT* rowPtr = A->rowPtr;
for (int i = 0; i < local_nrow; i++) {
for (int j = rowPtr[i]; j < rowPtr[i + 1]; j++) {
if (col_ind[j] < 0) {
int cur_ind = -col_ind[j] - 1; // FIXME: Offset by 1??
col_ind[j] = external_local_index[externals[cur_ind]];
}
}
}
int new_external_processor[num_external];
for (int i = 0; i < num_external; i++) {
new_external_processor[i] = 0;
}
// setup map from external id to partition
for (int i = 0; i < num_external; i++) {
new_external_processor[external_local_index[i] - local_nrow] =
external_processor[i];
}
#ifdef VERBOSE
for (int i = 0; i < num_external; i++) {
printf("Process %d of %d: external process[%d] = %d\n",
rank,
size,
i,
external_processor[i]);
}
#endif
/* Count the number of neighbors from which we receive information to update
our external elements. Additionally, fill the array tmp_neighbors in the
following way:
tmp_neighbors[i] = 0 ==> No external elements are updated by
processor i.
tmp_neighbors[i] = x ==> (x-1)/size elements are updated from
processor i.*/
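/* Example (size = 4): three externals owned by rank 2 and one owned by
rank 0 give tmp_neighbors = {5, 0, 13, 0} and num_recv_neighbors = 2. */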
int tmp_neighbors[size];
for (int i = 0; i < size; i++) {
tmp_neighbors[i] = 0;
}
int num_recv_neighbors = 0;
int length = 1;
for (int i = 0; i < num_external; i++) {
if (tmp_neighbors[new_external_processor[i]] == 0) {
num_recv_neighbors++;
tmp_neighbors[new_external_processor[i]] = 1;
}
tmp_neighbors[new_external_processor[i]] += size;
}
// sum over all processors all the tmp_neighbors arrays
MPI_Allreduce(tmp_neighbors, tmp_buffer, size, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
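/* After the sum, tmp_buffer[rank] = (number of ranks that need values from
this rank) + size * (total number of values this rank must send). Since a
rank never has more than size - 1 send neighbors, the two parts can be
separated with the modulo and division below. */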
/* decode the combined 'tmp_neighbors' (stored in tmp_buffer) array from all the
* processors */
int num_send_neighbors = tmp_buffer[rank] % size;
/* decode 'tmp_buffer[rank]' to deduce the total number of elements we must send */
int total_to_be_sent = (tmp_buffer[rank] - num_send_neighbors) / size;
/* Check to see if we have enough workspace allocated. This could be
dynamically modified, but let's keep it simple for now...*/
if (num_send_neighbors > MAX_NUM_MESSAGES) {
printf("Must increase MAX_NUM_MESSAGES. Must be at least %d\n",
num_send_neighbors);
exit(EXIT_FAILURE);
}
if (total_to_be_sent > MAX_EXTERNAL) {
printf("Must increase MAX_EXTERNAL. Must be at least %d\n", total_to_be_sent);
exit(EXIT_FAILURE);
}
#ifdef VERBOSE
printf("Processor %d of %d: Number of send neighbors = %d\n",
rank,
size,
num_send_neighbors);
printf("Processor %d of %d: Number of receive neighbors = %d\n",
rank,
size,
num_recv_neighbors);
printf("Processor %d of %d: Total number of elements to send = %d\n",
rank,
size,
total_to_be_sent);
MPI_Barrier(MPI_COMM_WORLD);
#endif
/* Make a list of the neighbors that will send information to update our
external elements (in the order that we will receive this information).*/
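/* The externals were renumbered so that all points owned by the same rank
occupy consecutive local indices, hence new_external_processor changes its
value exactly where the owning rank changes and the scan below records each
receive neighbor once. */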
int* recv_list = (int*)allocate(ARRAY_ALIGNMENT, MAX_EXTERNAL * sizeof(int));
// FIXME: Create local scope
int j = 0;
recv_list[j++] = new_external_processor[0];
for (int i = 1; i < num_external; i++) {
if (new_external_processor[i - 1] != new_external_processor[i]) {
recv_list[j++] = new_external_processor[i];
}
}
// Ensure that all the neighbors we expect to receive from also send to us:
// send a short dummy message (one int) to each of our recv neighbors
int send_list[num_send_neighbors];
for (int i = 0; i < num_send_neighbors; i++) {
send_list[i] = 0;
}
// First post the receives. These are nonblocking (immediate) receives;
// completion is handled by the MPI_Wait calls below.
int MPI_MY_TAG = 99;
MPI_Request request[MAX_NUM_MESSAGES];
for (int i = 0; i < num_send_neighbors; i++) {
MPI_Irecv(tmp_buffer + i,
1,
MPI_INT,
MPI_ANY_SOURCE,
MPI_MY_TAG,
MPI_COMM_WORLD,
request + i);
}
// send messages
for (int i = 0; i < num_recv_neighbors; i++) {
MPI_Send(tmp_buffer + i, 1, MPI_INT, recv_list[i], MPI_MY_TAG, MPI_COMM_WORLD);
}
// Receive message from each send neighbor to construct 'send_list'.
MPI_Status status;
for (int i = 0; i < num_send_neighbors; i++) {
if (MPI_Wait(request + i, &status)) {
printf("MPI_Wait error\n");
exit(EXIT_FAILURE);
}
send_list[i] = status.MPI_SOURCE;
}
/* Compare the two lists. In most cases they should be the same.
However, if they are not, add the entries that appear in the send list
but not yet in the recv list to the recv list. This makes the send and
receive neighbor lists identical, so a single neighbor list (see
commExchange) can be used for both directions of the exchange. */
for (int j = 0; j < num_send_neighbors; j++) {
int found = 0;
for (int i = 0; i < num_recv_neighbors; i++) {
if (recv_list[i] == send_list[j]) found = 1;
}
if (found == 0) {
#ifdef VERBOSE
printf("Process %d of %d: recv_list[%d] = %d\n",
rank,
size,
num_recv_neighbors,
send_list[i]);
#endif
recv_list[num_recv_neighbors] = send_list[j];
(num_recv_neighbors)++;
}
}
num_send_neighbors = num_recv_neighbors;
if (num_send_neighbors > MAX_NUM_MESSAGES) {
printf("Must increase MAX_EXTERNAL\n");
exit(EXIT_FAILURE);
}
// Start filling communication setup
// Store the send count and allocate/zero the list of owned elements to be sent
c->total_to_be_sent = total_to_be_sent;
int* elements_to_send = (int*)allocate(ARRAY_ALIGNMENT,
total_to_be_sent * sizeof(int));
c->elements_to_send = elements_to_send;
for (int i = 0; i < total_to_be_sent; i++) {
elements_to_send[i] = 0;
}
// Create 'new_external', which lists the external global indices in the
// order given by 'external_local_index'
int* new_external = (int*)allocate(ARRAY_ALIGNMENT, num_external * sizeof(int));
for (int i = 0; i < num_external; i++) {
new_external[external_local_index[i] - local_nrow] = external_index[i];
}
// Send each processor the global index list of the external elements in the
// order that I will want to receive them when updating my external elements
int lengths[num_recv_neighbors];
MPI_MY_TAG++;
// First post receives
for (int i = 0; i < num_recv_neighbors; i++) {
int partner = recv_list[i];
MPI_Irecv(lengths + i,
1,
MPI_INT,
partner,
MPI_MY_TAG,
MPI_COMM_WORLD,
request + i);
}
int* neighbors = c->neighbors;
int* recv_length = c->recv_length;
int* send_length = c->send_length;
j = 0;
for (int i = 0; i < num_recv_neighbors; i++) {
int start = j;
int newlength = 0;
// go through list of external elements until updating
// processor changes
while ((j < num_external) && (new_external_processor[j] == recv_list[i])) {
newlength++;
j++;
if (j == num_external) break;
}
recv_length[i] = newlength;
neighbors[i] = recv_list[i];
length = j - start;
MPI_Send(&length, 1, MPI_INT, recv_list[i], MPI_MY_TAG, MPI_COMM_WORLD);
}
// Complete the receives of the number of externals
for (int i = 0; i < num_recv_neighbors; i++) {
if (MPI_Wait(request + i, &status)) {
printf("MPI_Wait error\n");
exit(EXIT_FAILURE);
}
send_length[i] = lengths[i];
}
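// send_length[i] is the number of owned values neighbor i expects from this rank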
// Build "elements_to_send" list. These are the x elements I own
// that need to be sent to other processors.
MPI_MY_TAG++;
j = 0;
for (int i = 0; i < num_recv_neighbors; i++) {
MPI_Irecv(elements_to_send + j,
send_length[i],
MPI_INT,
neighbors[i],
MPI_MY_TAG,
MPI_COMM_WORLD,
request + i);
j += send_length[i];
}
j = 0;
for (int i = 0; i < num_recv_neighbors; i++) {
int start = j;
int newlength = 0;
// Go through list of external elements
// until updating processor changes. This is redundant, but
// saves us from recording this information.
while ((j < num_external) && (new_external_processor[j] == recv_list[i])) {
newlength++;
j++;
if (j == num_external) break;
}
MPI_Send(new_external + start,
j - start,
MPI_INT,
recv_list[i],
MPI_MY_TAG,
MPI_COMM_WORLD);
}
// receive from each neighbor the global index list of external elements
for (int i = 0; i < num_recv_neighbors; i++) {
if (MPI_Wait(request + i, &status)) {
printf("MPI_Wait error\n");
exit(EXIT_FAILURE);
}
}
// Replace the received global indices by local indices of our owned rows
for (int i = 0; i < total_to_be_sent; i++) {
elements_to_send[i] -= start_row;
}
// Finish up !!
c->num_send_neighbors = num_send_neighbors;
A->nc = A->nc + num_external;
// Used in exchange
CG_FLOAT* send_buffer = (CG_FLOAT*)allocate(ARRAY_ALIGNMENT,
total_to_be_sent * sizeof(CG_FLOAT));
c->send_buffer = send_buffer;
free(externals);
free(recv_list);
free(new_external);
#endif
}
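// Halo exchange for the sparse matrix-vector multiply: gather the owned values
// listed in elements_to_send into send_buffer, send one message per neighbor,
// and receive the remote values into the external part of x, which is stored
// directly after the local_nrow owned entries.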
void commExchange(Comm* c, Matrix* A, double* x)
{
#ifdef _MPI
int num_external = 0;
// Extract Matrix pieces
int local_nrow = A->nr;
int num_neighbors = c->num_send_neighbors;
int* recv_length = c->recv_length;
int* send_length = c->send_length;
int* neighbors = c->neighbors;
double* send_buffer = c->send_buffer;
int total_to_be_sent = c->total_to_be_sent;
int* elements_to_send = c->elements_to_send;
int rank = c->rank;
int size = c->size;
MPI_Comm comm = c->comm;
// First post the receives. These are nonblocking (immediate) receives;
// completion is handled by the MPI_Wait calls below.
int MPI_MY_TAG = 99;
MPI_Request request[num_neighbors];
// Externals are at end of locals
double* x_external = (double*)x + local_nrow;
// Post receives first
for (int i = 0; i < num_neighbors; i++) {
int n_recv = recv_length[i];
MPI_Irecv(x_external,
n_recv,
MPI_DOUBLE,
neighbors[i],
MPI_MY_TAG,
MPI_COMM_WORLD,
request + i);
x_external += n_recv;
}
// Fill up send buffer
for (int i = 0; i < total_to_be_sent; i++) {
send_buffer[i] = x[elements_to_send[i]];
}
// Send to each neighbor
for (int i = 0; i < num_neighbors; i++) {
int n_send = send_length[i];
MPI_Send(send_buffer,
n_send,
MPI_DOUBLE,
neighbors[i],
MPI_MY_TAG,
MPI_COMM_WORLD);
send_buffer += n_send;
}
// Complete the receives issued above
MPI_Status status;
for (int i = 0; i < num_neighbors; i++) {
if (MPI_Wait(request + i, &status)) {
printf("MPI_Wait error\n");
exit(EXIT_FAILURE);
}
}
#endif
}
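// Print the communication setup, one rank at a time (serialized with barriers)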
void commPrintConfig(Comm* c)
{
#ifdef _MPI
fflush(stdout);
MPI_Barrier(MPI_COMM_WORLD);
if (commIsMaster(c)) {
printf("Communication setup:\n");
}
for (int i = 0; i < c->size; i++) {
if (i == c->rank) {
fflush(stdout);
}
MPI_Barrier(MPI_COMM_WORLD);
}
#endif
}
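// Initialize MPI (when enabled) and store this process' rank and the
// communicator size in the Comm struct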
void commInit(Comm* c, int argc, char** argv)
{
#ifdef _MPI
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &(c->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(c->size));
#else
c->rank = 0;
c->size = 1;
#endif
}
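// Shut down the MPI environment; a no-op without MPI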
void commFinalize(Comm* c)
{
#ifdef _MPI
MPI_Finalize();
#endif
}