Я пытаюсь сделать общение не по порядку. В основном у меня есть несколько массивов с плавающей запятой одинакового размера, идентифицируемых целочисленным идентификатором.
Каждое сообщение должно выглядеть так:
<int id><float array data>
На стороне получателя он точно знает, сколько существует массивов, и, таким образом, устанавливает точное число recvs. Получив сообщение, он анализирует идентификатор и помещает данные в нужное место. Проблема в том, что сообщение может быть отправлено из любого другого процесса в процесс получения. (Например, производители имеют структуру рабочей очереди и обрабатывают любой идентификатор, доступный в очереди.)
Поскольку MPI гарантирует только P2P при доставке заказа, я не могу тривиально поместить данные целочисленного идентификатора и данные FP в два сообщения, в противном случае получатель не сможет сопоставить идентификатор с данными. MPI также не допускает двух типов данных за одну отправку.
Я могу думать только о двух подходах.
1) Получатель имеет массив размером m (source [m]), где m — количество отправляющих узлов. Отправитель сначала отправляет идентификатор, затем данные. Получатель сохраняет идентификатор в source [i] после получения целочисленного сообщения от отправителя i. Получив массив FP от отправителя i, он проверяет источник [i], получает идентификатор и перемещает данные в нужное место. Это работает, потому что MPI гарантирует порядок связи P2P. Требуется, чтобы получатель сохранял информацию о состоянии для каждого отправителя. Хуже того, если один процесс отправки может иметь два идентификатора, отправленных перед данными (например, многопоточными), этот механизм не будет работать.
2) Обрабатывать id и FP как байты и копировать их в буфер отправки. Отправьте их как MPI_CHAR, и получатель преобразует их обратно в целое число и массив FP. Затем мне нужно оплатить дополнительную стоимость копирования вещей в байтовый буфер на стороне отправителя. Общий временной буфер также растет по мере увеличения количества потоков в процессе MPI.
Ни один из них не является идеальным решением. Я не хочу ничего блокировать внутри процесса. Интересно, есть ли у кого-нибудь из вас лучшие предложения.
Изменить: код будет запущен на общем кластере с Infiniband. Машины будут назначены случайным образом. Так что я не думаю, что сокеты TCP смогут помочь мне здесь. Кроме того, IPoIB выглядит дорого. Мне нужна полная скорость 40 Гбит / с для связи, и процессор продолжает выполнять вычисления.
Как кто-то уже писал, вы можете использовать MPI_ANY_SOURCE
получать из любого источника. Чтобы отправить два разных типа данных за один раз, вы можете использовать производный тип данных:
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#define asize 10
typedef struct data_ {
int id;
float array[asize];
} data;
int main() {
MPI_Init(NULL,NULL);
int rank = -1;
int size = -1;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
data buffer;
// Define and commit a new datatype
int blocklength [2];
MPI_Aint displacement[2];
MPI_Datatype datatypes [2];
MPI_Datatype mpi_tdata;
MPI_Aint startid,startarray;
MPI_Get_address(&(buffer.id),&startid);
MPI_Get_address(&(buffer.array[0]),&startarray);
blocklength [0] = 1;
blocklength [1] = asize;
displacement[0] = 0;
displacement[1] = startarray - startid;
datatypes [0] = MPI_INT;
datatypes [1] = MPI_FLOAT;
MPI_Type_create_struct(2,blocklength,displacement,datatypes,&mpi_tdata);
MPI_Type_commit(&mpi_tdata);
if (rank == 0) {
int count = 0;
MPI_Status status;
while (count < size-1 ) {
// Non-blocking receive
printf("Receiving message %d\n",count);
MPI_Recv(&buffer,1,mpi_tdata,MPI_ANY_SOURCE,0,MPI_COMM_WORLD,&status);
printf("Message tag %d, first entry %g\n",buffer.id,buffer.array[0]);
// Counting the received messages
count++;
}
} else {
// Initialize buffer to be sent
buffer.id = rank;
for (int ii = 0; ii < size; ii++) {
buffer.array[ii] = 10*rank + ii;
}
// Send buffer
MPI_Send(&buffer,1,mpi_tdata,0,0,MPI_COMM_WORLD);
}
MPI_Type_free(&mpi_tdata);
MPI_Finalize();
return 0;
}
Вы можете указать MPI_ANY_SOURCE
в качестве источника ранга в функции приема, затем сортируйте сообщения, используя их теги, что проще, чем создание пользовательских сообщений. Вот упрощенный пример:
#include <stdio.h>
#include "mpi.h"
int main() {
MPI_Init(NULL,NULL);
int rank=0;
int size=1;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
// Receiver is the last node for simplicity in the arrays
if (rank == size-1) {
// Receiver has size-1 slots
float data[size-1];
MPI_Request request[size-1];
// Use tags to sort receives
for (int tag=0;tag<size-1;++tag){
printf("Receiver for id %d\n",tag);
// Non-blocking receive
MPI_Irecv(data+tag,1,MPI_FLOAT,
MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&request[tag]);
}
// Wait for all requests to complete
printf("Waiting...\n");
MPI_Waitall(size-1,request,MPI_STATUSES_IGNORE);
for (size_t i=0;i<size-1;++i){
printf("%f\n",data[i]);
}
} else {
// Producer
int id = rank;
float data = rank;
printf("Sending {%d}{%f}\n",id,data);
MPI_Send(&data,1,MPI_FLOAT,size-1,id,MPI_COMM_WORLD);
}
return MPI_Finalize();
}