Как тиражировать функцию MPI_Accumulate в MPI-2 +

Я изучаю MPI одностороннюю связь, представленную в MPI-2 / MPI-3, и наткнулся на это страница онлайн курса около MPI_Accumulate:

MPI_Accumulate позволяет вызывающей стороне объединять данные, перемещенные в
целевой процесс с уже имеющимися данными, такими как накопление
сумма в целевом процессе. Та же функциональность может быть достигнута
использование MPI_Get для извлечения данных (с последующей синхронизацией);
выполнение операции суммирования у вызывающей стороны; затем используя MPI_Put для отправки
обновленные данные возвращаются к целевому процессу. Накопить упрощает
эта грязь …

Но есть только ограниченное количество операций, которые разрешено использовать с MPI_Accumulate (max, min, sum, product и т. д.) и пользовательские операции запрещены. Мне было интересно, как реализовать вышеупомянутое беспорядочности, с помощью MPI_Get, синхронизация, операции и MPI_Put, Существуют ли учебники или примеры рабочего кода на C / C ++?

Спасибо


Для того, чтобы проверить, я адаптировал кусок кода из этого ТАК вопрос, в котором односторонняя связь используется для создания целочисленного счетчика, который синхронизируется между процессами MPI. Целевая проблемная строка с использованием MPI_Accumulate помечен.

Код компилируется как есть и возвращается примерно через 15 секунд. Но когда я попытался заменить MPI_Accumulate с эквивалентной последовательностью основных операций, как показано в блоке комментариев сразу после проблемной строки, скомпилированная программа зависает на неопределенное время.

Может кто-нибудь, пожалуйста, помогите объяснить, что пошло не так, и
какой правильный способ заменить MPI_Accumulate в данном контексте?

Постскриптум Я скомпилировал код с

g++ -std=c++11 -I..   mpistest.cpp -lmpi

и выполнил двоичный файл с

mpiexec -n 4 a.exe

Код:

//adpated from https://stackoverflow.com/questions/4948788/
#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <thread>
#include <chrono>

struct mpi_counter_t {
MPI_Win win;
int  hostrank;  //id of the process that host values to be exposed to all processes
int  rank;    //process id
int  size;     //number of processes
int  val;
int  *hostvals;
};

struct mpi_counter_t *create_counter(int hostrank) {
struct mpi_counter_t *count;

count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
count->hostrank = hostrank;
MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

if (count->rank == hostrank) {
MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->hostvals));
for (int i=0; i<count->size; i++) count->hostvals[i] = 0;
MPI_Win_create(count->hostvals, count->size * sizeof(int), sizeof(int),
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
else {
count->hostvals = NULL;
MPI_Win_create(count->hostvals, 0, 1,
MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
}
count -> val = 0;

return count;
}

int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;

MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

for (int i=0; i<count->size; i++) {

if (i == count->rank) {
MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
/* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? Currently, the following causes the program to hang.
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_fence(0,count->win);
vals[i] += increment;
MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_fence(0,count->win);
//*/
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}

MPI_Win_unlock(0, count->win);

//do op part of MPI_Accumulate's work on count->rank
count->val += increment;
vals[count->rank] = count->val;

//return the sum of vals
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];

free(vals);
return val;
}

void delete_counter(struct mpi_counter_t **count) {
if ((*count)->rank == (*count)->hostrank) {
MPI_Free_mem((*count)->hostvals);
}
MPI_Win_free(&((*count)->win));
free((*count));
*count = NULL;

return;
}

void print_counter(struct mpi_counter_t *count) {
if (count->rank == count->hostrank) {
for (int i=0; i<count->size; i++) {
printf("%2d ", count->hostvals[i]);
}
puts("");
}
}int main(int argc, char **argv) {

MPI_Init(&argc, &argv);

const int WORKITEMS=50;

struct mpi_counter_t *c;
int rank;
int result = 0;

c = create_counter(0);

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srand(rank);

while (result < WORKITEMS) {
result = increment_counter(c, 1);
if (result <= WORKITEMS) {
printf("%d working on item %d...\n", rank, result);
std::this_thread::sleep_for (std::chrono::seconds(rand()%2));
} else {
printf("%d done\n", rank);
}
}

MPI_Barrier(MPI_COMM_WORLD);
print_counter(c);
delete_counter(&c);MPI_Finalize();
return 0;
}

Еще один вопрос, я должен использовать MPI_Win_fence здесь вместо замков?

—РЕДАКТИРОВАТЬ—

Я использовал блокировку / разблокировку в increment_counter следующим образом, программа работает, но ведет себя странно. В окончательной распечатке главный узел выполняет всю работу. Все еще в замешательстве.

int increment_counter(struct mpi_counter_t *count, int increment) {
int *vals = (int *)malloc( count->size * sizeof(int) );
int val;

MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

for (int i=0; i<count->size; i++) {

if (i == count->rank) {
//MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,count->win); //Problem line: increment hostvals[i] on host
///* //Question: How to correctly replace the above MPI_Accumulate call with the following sequence? reports that 0 does all the work
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_unlock(0, count->win);
vals[i] += increment;
MPI_Put(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);
//*/
} else {
MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
}
}

MPI_Win_unlock(0, count->win);

//do op part of MPI_Accumulate's work on count->rank
count->val += increment;
vals[count->rank] = count->val;

//return the sum of vals
val = 0;
for (int i=0; i<count->size; i++)
val += vals[i];

free(vals);
return val;
}

3

Решение

Реализация Accumulate with Gets and Puts действительно будет очень грязной, особенно когда вам приходится иметь дело с производными типами данных и тому подобным. Но если предположить, что вы накапливаете одно целое число и хотите просто суммировать локальное значение в удаленном буфере, вы можете сделать следующее (только псевдокод):

MPI_Win_lock(EXCLUSIVE);  /* exclusive needed for accumulate atomicity constraints */
MPI_Get(&remote_data);
MPI_Win_flush(win);  /* make sure GET has completed */
new = local_data + remote_data;
MPI_Put(&new);
MPI_Win_unlock();

Ваш код неверен, потому что вы отказываетесь от монопольной блокировки после GET, что вызывает проблемы атомарности, когда два процесса пытаются суммировать данные одновременно.

1

Другие решения


По вопросам рекламы [email protected]