Самый быстрый способ вычисления минимального евклидова расстояния между двумя матрицами, содержащими векторы высокой размерности

Я начал похожий вопрос другая нить, но потом я сосредоточился на том, как использовать OpenCV. Не сумев добиться того, чего я изначально хотел, я спрошу здесь именно то, что я хочу.

У меня есть две матрицы. Матрица a имеет размер 2782×128, а матрица b имеет размер 4000×128, оба значения без знака. Значения хранятся в одном массиве. Для каждого вектора в a мне нужен индекс вектора в b с ближайшим евклидовым расстоянием.

Хорошо, теперь мой код для достижения этой цели:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"
using namespace std;

void main(int argc, char* argv[])
{
int a_size;
unsigned char* a = NULL;
read_matrix(&a, a_size,"matrixa");
int b_size;
unsigned char* b = NULL;
read_matrix(&b, b_size,"matrixb");

LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );

int* indexes = NULL;
min_distance_loop(&indexes, b, b_size, a, a_size);

QueryPerformanceCounter( &liEnd );

cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

if (a)
delete[]a;
if (b)
delete[]b;
if (indexes)
delete[]indexes;
return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
ofstream myfile;
float f;
FILE * pFile;
pFile = fopen (matrixPath,"r");
fscanf (pFile, "%d", &matrix_size);
*matrix = new unsigned char[matrix_size*128];

for (int i=0; i<matrix_size*128; ++i)
{
unsigned int matPtr;
fscanf (pFile, "%u", &matPtr);
matrix[i]=(unsigned char)matPtr;
}
fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;

*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;

unsigned char* dataPtr;
unsigned char* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a[dataIndex];
vocPtr = &b[vocIndex];

for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}

// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}

Прилагаются файлы с образцами матриц.

матрица А
matrixb

Я использую windows.h просто для вычисления потребляющего времени, поэтому, если вы хотите протестировать код на другой платформе, отличной от windows, просто измените заголовок windows.h и измените способ вычисления потребляющего времени.

Этот код в моем компьютере составляет около 0,5 секунд. Проблема в том, что у меня есть другой код в Matlab, который делает то же самое за 0,05 секунды. В моих экспериментах я получаю несколько матриц, таких как матрица а каждую секунду, поэтому 0,5 секунды — это слишком много.

Теперь код Matlab для расчета этого:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b';
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Хорошо. Код Matlab использует это (x-a) ^ 2 = x ^ 2 + a ^ 2 — 2ab.

Поэтому моей следующей попыткой было сделать то же самое. Я удалил свой собственный код, чтобы выполнить те же вычисления, но это было примерно за 1,2 секунды.

Затем я попытался использовать разные внешние библиотеки. Первая попытка была Эйген:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
for (int j=0; j<descrSize; ++j)
{
a(i,j)=(int)*dataPtr++;
}
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
for (int j=0; j<descrSize; ++j)
{
b(i,j)=(int)*vocPtr ++;
}
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
d.row(i).minCoeff(&index[i]);
}

Этот Eigen-код стоит примерно 1.2 для одной строки, которая говорит: ab = a * b.transpose ();

Аналогичный код с использованием opencv также использовался, и стоимость ab = a * b.transpose (); было 0,65 секунды.

Итак, это действительно раздражает, что matlab может делать то же самое так быстро, а я не умею в C ++! Конечно, было бы здорово провести мой эксперимент, но я думаю, что недостаток знаний — это то, что действительно раздражает меня. Как я могу достичь, по крайней мере, такой же производительности, как в Matlab? Любой вид растворения приветствуется. Я имею в виду любую внешнюю библиотеку (бесплатную, если это возможно), циклическое развертывание, шаблоны, SSE-вторжения (я знаю, что они существуют), кэширование. Как я уже сказал, моя главная цель — расширить свои знания, чтобы код мог мыслить так с более высокой производительностью.

заранее спасибо

РЕДАКТИРОВАТЬ: больше кода, предложенного Дэвидом Хамменом. Я привел массивы к int, прежде чем делать какие-либо вычисления. Вот код:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;

int* a_int;
int* b_int;

LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );

a_int = (int*)malloc(a_size*descrSize*sizeof(int));
b_int = (int*)malloc(b_size*descrSize*sizeof(int));

for(int i=0; i<descrSize*a_size; ++i)
a_int[i]=(int)a[i];
for(int i=0; i<descrSize*b_size; ++i)
b_int[i]=(int)b[i];

QueryPerformanceCounter( &liEnd );

cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;

/*unsigned char* dataPtr;
unsigned char* vocPtr;*/
int* dataPtr;
int* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a_int[dataIndex];
vocPtr = &b_int[vocIndex];

for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}

// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}

Весь процесс теперь составляет 0,6, а начальные циклы — 0,001 секунды. Может я что то не так сделал?

EDIT2: что-нибудь об Эйгене? Когда я ищу внешних библиотек, они всегда говорят об Эйгене и его скорости. Я сделал что-то не так? Вот простой код с использованием Eigen, который показывает, что это не так быстро. Может быть, мне не хватает какой-либо конфигурации или флаг, или …

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

Этот код составляет около 0,9 секунд.

6

Решение

Как вы заметили, в вашем коде преобладает матричный продукт, представляющий около 2.8e9 арифметических операций. Йопу говорит, что Matlab (точнее, высоко оптимизированный MKL) вычисляет его примерно за 0,05 с. Это соответствует скорости 57 GFLOPS, показывающей, что она использует не только векторизацию, но и многопоточность. С Eigen вы можете включить многопоточность путем компиляции с включенным OpenMP (-fopenmp с gcc). На моем 5-летнем компьютере (2,66 ГГц Core2), используя float и 4 потока, ваш продукт занимает около 0,053 с и 0,16 с без OpenMP, поэтому с флагами компиляции должно быть что-то не так. Подводя итог, чтобы получить лучшее из Eigen:

  • компилировать в режиме 64бит
  • использовать числа с плавающей запятой (удваивается вдвое медленнее из-за векторизации)
  • включить OpenMP
  • если ваш процессор имеет гиперпоточность, либо отключите его, либо определите OMP_NUM_THREADS переменная окружения в зависимости от количества физических ядер (это очень важно, иначе производительность будет очень плохой!)
  • если у вас запущена другая задача, было бы неплохо уменьшить OMP_NUM_THREADS в nb_cores-1
  • используйте самый последний компилятор, который вы можете, GCC, clang и ICC лучше, MSVC обычно медленнее.
3

Другие решения

Одна вещь, которая определенно ранит вас в вашем коде на C ++, это то, что он содержит множество преобразований char в int. Под лодкой я подразумеваю до 2 * 2782 * 4000 * 128 символов в int. Те char в int преобразования медленные, очень медленные.

Вы можете уменьшить это до (2782 + 4000) * 128 таких преобразований, выделив пару int массивы, один 2782 * 128 и другой 4000 * 128, для хранения содержимого приведения к целому числу вашего char* a а также char* b массивы. Работать с этими int* массивы, а не ваш char* массивы.

Еще одной проблемой может быть использование вами int против long, Я не работаю над окнами, так что это может быть неприменимо. На машинах, на которых я работаю, int 32 бита и long сейчас 64 бита. 32 бита более чем достаточно, потому что 255 * 255 * 128 < 256 * 256 * 128 = 223.

Это, очевидно, не проблема.

Поразительно, что рассматриваемый код не вычисляет тот огромный массив 2728 на 4000, который создает код Matlab. Еще более поразительно то, что Matlab, скорее всего, делает это с двойными числами, а не с целыми числами — и он все еще бьет штаны из кода C / C ++.

Одна большая проблема — кеш. Этот массив 4000 * 128 слишком велик для кеша уровня 1, и вы перебираете этот большой массив 2782 раза. Ваш код делает слишком много ожидания в памяти. Чтобы преодолеть эту проблему, работайте с меньшими кусками b массив, чтобы ваш код работал с кешем 1-го уровня как можно дольше.

Еще одна проблема — оптимизация if (distance>min_distance) break;, Я подозреваю, что это на самом деле дез-оптимизация. имеющий if тесты внутри вашего внутреннего цикла часто плохая идея. Взрыв через этот внутренний продукт как можно быстрее. Помимо ненужных вычислений, нет никакого вреда в избавлении от этого теста. Иногда лучше сделать явно ненужные вычисления, если это может удалить ветку в самом внутреннем цикле. Это один из тех случаев. Вы можете решить свою проблему, просто исключив этот тест. Попробуйте сделать это.

Возвращаясь к проблеме с кешем, вам нужно избавиться от этой ветки, чтобы можно было разделить операции над a а также b Матрица на более мелкие куски, куски не более 256 строк одновременно. Именно столько строк из 128 неподписанных символов помещается в один из двух современных кэшей L1 современного чипа Intel. Поскольку 250 делит 4000, рассмотрим логическое разбиение, которое b матрица в 16 кусков. Возможно, вы захотите сформировать этот большой массив 2872 на 4000 внутренних продуктов, но делайте это небольшими кусками. Вы можете добавить это if (distance>min_distance) break; обратно, но делайте это на уровне фрагмента, а не на уровне байтов.

Вы должны быть в состоянии победить Matlab, потому что он почти наверняка работает с двойными числами, но вы можете работать с неподписанными символами и целыми числами.

2

Матрица умножения обычно использует наихудший из возможных шаблонов доступа к кэшу для одной из двух матриц, и решение состоит в том, чтобы транспонировать одну из матриц и использовать специализированный алгоритм умножения, который работает с данными, хранящимися таким образом.

Ваша матрица уже хранится транспонированной. Переставив его в нормальный порядок, а затем используя нормальное умножение матриц, вы абсолютно убиваете производительность.

Напишите свой собственный цикл умножения матриц, который инвертирует порядок индексов во вторую матрицу (что приводит к ее транспонированию, фактически ничего не перемещая и не нарушая поведение кэша). И передайте вашему компилятору любые опции, которые он имеет для включения автоматической векторизации.

1
По вопросам рекламы [email protected]