Я могу видеть с помощью профилировщика процессора, что compute_variances()
является узким местом моего проекта.
% cumulative self self total
time seconds seconds calls ms/call ms/call name
75.63 5.43 5.43 40 135.75 135.75 compute_variances(unsigned int, std::vector<Point, std::allocator<Point> > const&, float*, float*, unsigned int*)
19.08 6.80 1.37 readDivisionSpace(Division_Euclidean_space&, char*)
...
Вот тело функции:
void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
float* var, size_t* split_dims) {
for (size_t d = 0; d < points[0].dim(); d++) {
avg[d] = 0.0;
var[d] = 0.0;
}
float delta, n;
for (size_t i = 0; i < points.size(); ++i) {
n = 1.0 + i;
for (size_t d = 0; d < points[0].dim(); ++d) {
delta = (points[i][d]) - avg[d];
avg[d] += delta / n;
var[d] += delta * ((points[i][d]) - avg[d]);
}
}
/* Find t dimensions with largest scaled variance. */
kthLargest(var, points[0].dim(), t, split_dims);
}
где kthLargest()
не кажется проблемой, так как я вижу это:
0.00 7.18 0.00 40 0.00 0.00 kthLargest(float*, int, int, unsigned int*)
compute_variances()
принимает вектор векторов с плавающей точкой (то есть вектор Points
, где Points
это класс, который я реализовал) и вычисляет их дисперсию в каждом измерении (с учетом алгоритма Кнута).
Вот как я вызываю функцию:
float avg[(*points)[0].dim()];
float var[(*points)[0].dim()];
size_t split_dims[t];
compute_variances(t, *points, avg, var, split_dims);
Вопрос в том, могу ли я сделать лучше? Я был бы очень рад заплатить компромисс между скоростью и приблизительным вычислением отклонений. Или, может быть, я мог бы сделать код более дружественным к кешу или что-то?
Я скомпилировал так:
g++ main_noTime.cpp -std=c++0x -p -pg -O3 -o eg
Обратите внимание, что перед редактированием я использовал -o3
не с большой буквы «о». Благодаря ypnos, Я скомпилировал сейчас с флагом оптимизации -O3
, Я уверен, что между ними была разница, так как я выполнил измерения времени с одним из эти методы в моем псевдо-сайте.
Обратите внимание, что сейчас, compute_variances
доминирует в общем времени проекта!
copute_variances()
называется 40 раз.
На 10 вызовов выполняются следующие условия:
points.size() = 1000 and points[0].dim = 10000
points.size() = 10000 and points[0].dim = 100
points.size() = 10000 and points[0].dim = 10000
points.size() = 100000 and points[0].dim = 100
Каждый вызов обрабатывает разные данные.
Q: Насколько быстрый доступ к points[i][d]
?
A: point[i]
это просто i-й элемент std :: vector, где второй []
, как это реализовано в Point
учебный класс.
const FT& operator [](const int i) const {
if (i < (int) coords.size() && i >= 0)
return coords.at(i);
else {
std::cout << "Error at Point::[]" << std::endl;
exit(1);
}
return coords[0]; // Clear -Wall warning
}
где coords
это std::vector
из float
ценности. Это кажется немного тяжелым, но не должен ли компилятор быть достаточно умным, чтобы правильно предсказать, что ветвь всегда верна? (Я имею ввиду после холодного старта). Кроме того, std::vector.at()
должно быть постоянным временем (как сказано в ссылка). Я изменил это, чтобы иметь только .at()
в теле функции и измерения времени остались, в значительной степени, то же самое.
деление в compute_variances()
это наверняка что-то тяжелое! Однако алгоритм Кнута был численно устойчивым, и я не смог найти другой алгоритм, который был бы как численно устойчивым, так и без деления.
Обратите внимание, что я не заинтересован в параллелизм прямо сейчас.
[EDIT.2]Минимальный пример Point
класс (думаю, я не забыл показать что-то):
class Point {
public:
typedef float FT;
...
/**
* Get dimension of point.
*/
size_t dim() const {
return coords.size();
}
/**
* Operator that returns the coordinate at the given index.
* @param i - index of the coordinate
* @return the coordinate at index i
*/
FT& operator [](const int i) {
return coords.at(i);
//it's the same if I have the commented code below
/*if (i < (int) coords.size() && i >= 0)
return coords.at(i);
else {
std::cout << "Error at Point::[]" << std::endl;
exit(1);
}
return coords[0]; // Clear -Wall warning*/
}
/**
* Operator that returns the coordinate at the given index. (constant)
* @param i - index of the coordinate
* @return the coordinate at index i
*/
const FT& operator [](const int i) const {
return coords.at(i);
/*if (i < (int) coords.size() && i >= 0)
return coords.at(i);
else {
std::cout << "Error at Point::[]" << std::endl;
exit(1);
}
return coords[0]; // Clear -Wall warning*/
}
private:
std::vector<FT> coords;
};
Вот что я бы сделал в предполагаемом порядке важности:
Point::operator[]
по значению, а не по ссылке.coords[i]
вместо coords.at(i)
, так как ты уже утверждают, что это в пределах границ. at
член проверяет границы. Вам нужно только проверить это один раз.Point::operator[]
с утверждением. Это то, что утверждает. Они номинально не работают в режиме релиза — я сомневаюсь, что вам нужно проверить это в коде релиза.avg
а также var
, Это может фактически удалить все ошибки кэша на avg
а также var
если prefetch работает в обратном порядке итерации, как и должно быть.std::fill
а также std::copy
может использовать выравнивание типов и иметь шанс быть быстрее, чем библиотека C memset
а также memcpy
, Point::operator[]
будет иметь шанс попасть в сборку релиза и может сократить до двух машинных инструкций (эффективное вычисление адреса и нагрузка с плавающей запятой). Это то, что ты хочешь. Конечно должно быть определенный в заголовочном файле, в противном случае вставка будет выполняться только в том случае, если вы включите генерацию кода времени соединения (например, LTO).
Обратите внимание, что Point::operator[]
Тело эквивалентно только одной строке
return coords.at(i)
в отладочной сборке. В релизе постройте все Тело эквивалентно return coords[i]
, не return coords.at(i)
,
FT Point::operator[](int i) const {
assert(i >= 0 && i < (int)coords.size());
return coords[i];
}
const FT * Point::constData() const {
return &coords[0];
}
void compute_variances(size_t t, const std::vector<Point>& points, float* avg,
float* var, size_t* split_dims)
{
assert(points.size() > 0);
const int D = points[0].dim();
// i = 0, i_n = 1
assert(D > 0);
#if __cplusplus >= 201103L
std::copy_n(points[0].constData(), D, avg);
#else
std::copy(points[0].constData(), points[0].constData() + D, avg);
#endif
// i = 1, i_n = 0.5
if (points.size() >= 2) {
assert(points[1].dim() == D);
for (int d = D - 1; d >= 0; --d) {
float const delta = points[1][d] - avg[d];
avg[d] += delta * 0.5f;
var[d] = delta * (points[1][d] - avg[d]);
}
} else {
std::fill_n(var, D, 0.0f);
}
// i = 2, ...
for (size_t i = 2; i < points.size(); ) {
{
const float i_n = 1.0f / (1.0f + i);
assert(points[i].dim() == D);
for (int d = 0; d < D; ++d) {
float const delta = points[i][d] - avg[d];
avg[d] += delta * i_n;
var[d] += delta * (points[i][d] - avg[d]);
}
}
++ i;
if (i >= points.size()) break;
{
const float i_n = 1.0f / (1.0f + i);
assert(points[i].dim() == D);
for (int d = D - 1; d >= 0; --d) {
float const delta = points[i][d] - avg[d];
avg[d] += delta * i_n;
var[d] += delta * (points[i][d] - avg[d]);
}
}
++ i;
}
/* Find t dimensions with largest scaled variance. */
kthLargest(var, D, t, split_dims);
}
1. SIMD
Одним из простых способов для этого является использование векторных инструкций (SIMD) для вычислений. На x86 это означает SSE, инструкции AVX. В зависимости от длины слова и процессора вы можете получить ускорение примерно в 4 раза или даже больше. Этот код здесь:
for (size_t d = 0; d < points[0].dim(); ++d) {
delta = (points[i][d]) - avg[d];
avg[d] += delta / n;
var[d] += delta * ((points[i][d]) - avg[d]);
}
можно ускорить, выполнив вычисления для четырех элементов одновременно с помощью SSE. Поскольку ваш код действительно обрабатывает только один элемент в каждой итерации цикла, узкого места нет. Если вместо 32-битного числа с плавающей запятой перейти к 16-битному короткому (приблизительное значение), вы можете поместить в одну инструкцию восемь элементов. С AVX это было бы еще больше, но для этого нужен новейший процессор.
Это не решение вашей проблемы с производительностью, но только один из них, который также может быть объединен с другими.
2. Микропараллелизм
Второе простое ускорение, когда у вас столько циклов, — это использование параллельной обработки. Я обычно использую Intel TBB, другие могут предложить вместо OpenMP. Для этого вам, вероятно, придется изменить порядок циклов. Так что распараллелить над d во внешнем цикле, а не над i.
Вы можете комбинировать обе техники, и если вы все сделаете правильно, на четырехъядерных процессорах с HT вы можете получить ускорение на 25-30 для комбинации без потери точности.
3. Оптимизация компилятора
Прежде всего, может быть, это просто опечатка здесь на SO, но это должно быть -O3
не -o3
!
Как общее примечание, компилятору может быть проще оптимизировать ваш код, если вы объявите переменные delta, n в той области, где вы их фактически используете. Вы также должны попробовать -funroll-loops
опция компилятора, а также -march
, Вариант с последним зависит от вашего процессора, но в настоящее время обычно -march core2
Это хорошо (также для недавних AMD) и включает в себя оптимизацию SSE (но я бы пока не доверял компилятору делать это для вашего цикла).
Большая проблема с вашей структурой данных в том, что это по сути vector<vector<float> >
, Это указатель на массив указателей на массивы float
с некоторыми наворотами. В частности, доступ к последовательному Point
в vector
не соответствует доступу к последовательным ячейкам памяти. Могу поспорить, вы видите тонны и тонны промахов кэша, когда вы профилируете этот код.
Исправьте это, прежде чем дурачиться с чем-либо еще.
Проблемы более низкого порядка включают деление с плавающей точкой во внутреннем цикле (вычисление 1/n
во внешнем цикле) и большой цепочке хранения нагрузки, которая является вашим внутренним циклом. Вы можете вычислить средние значения и дисперсии срезов вашего массива, используя SIMD, и, например, объединить их в конце.
Проверка границ для каждого доступа, вероятно, тоже не поможет. Избавьтесь от этого тоже, или хотя бы выньте его из внутренней петли; не думайте, что компилятор знает, как это исправить самостоятельно.
for (size_t d = 0; d < points[0].dim(); d++) {
avg[d] = 0.0;
var[d] = 0.0;
}
Этот код можно оптимизировать, просто используя memset. Представление IEEE754 0.0 в 32 битах — 0x00000000. Если измерение большое, оно того стоит.
Что-то вроде:
memset((void*)avg, 0, points[0].dim() * sizeof(float));
В вашем коде у вас много обращений к points [0] .dim (). Было бы лучше вызвать один раз в начале функции и сохранить в переменной. Вероятно, компилятор уже делает это (так как вы используете -O3).
Операции деления намного дороже (из POV тактового цикла), чем другие операции (сложение, вычитание).
avg[d] += delta / n;
Возможно, имеет смысл попытаться уменьшить количество делений: использовать частичное некумулятивное среднее вычисление, которое приведет к тусклый операция деления для N элементов (вместо Н х Дим); N < points.size ()
Огромное ускорение может быть достигнуто с использованием Cuda или OpenCL, поскольку вычисление avg и var может выполняться одновременно для каждого измерения (рассмотрим использование графического процессора).
Еще одна оптимизация оптимизация кеша включая оба кеш данных а также кеш инструкций.
Методы оптимизации высокого уровня
Оптимизация кэша данных
Пример оптимизации кеша данных & разворачивание
for (size_t d = 0; d < points[0].dim(); d += 4)
{
// Perform loading all at once.
register const float p1 = points[i][d + 0];
register const float p2 = points[i][d + 1];
register const float p3 = points[i][d + 2];
register const float p4 = points[i][d + 3];
register const float delta1 = p1 - avg[d+0];
register const float delta2 = p2 - avg[d+1];
register const float delta3 = p3 - avg[d+2];
register const float delta4 = p4 - avg[d+3];
// Perform calculations
avg[d + 0] += delta1 / n;
var[d + 0] += delta1 * ((p1) - avg[d + 0]);
avg[d + 1] += delta2 / n;
var[d + 1] += delta2 * ((p2) - avg[d + 1]);
avg[d + 2] += delta3 / n;
var[d + 2] += delta3 * ((p3) - avg[d + 2]);
avg[d + 3] += delta4 / n;
var[d + 3] += delta4 * ((p4) - avg[d + 3]);
}
Это отличается от классического развертывание петли при этом загрузка из матрицы выполняется как группа вверху цикла.
Изменить 1:
Тонкая оптимизация данных заключается в размещении avg
а также var
в структуру. Это гарантирует, что два массива находятся рядом друг с другом в памяти, без заполнения. Механизм извлечения данных в процессорах, таких как датумы, которые очень близки друг к другу. Меньше шансов на отсутствие кеша данных и больше шансов загрузить все данные в кеш.
Вы можете использовать математику с фиксированной точкой вместо математики с плавающей точкой в качестве оптимизации.
Оптимизация с помощью фиксированной точки
Процессоры любят манипулировать целыми числами (со знаком или без знака). С плавающей точкой может потребоваться дополнительная вычислительная мощность из-за извлечения частей, выполнения математических операций, а затем повторной сборки частей. Одним из смягчающих мер является использование Фиксированная точка математика
Простой пример: метры
Учитывая единицу измерения метров, можно выразить длины меньше метра, используя число с плавающей запятой, например 3,14159 м. Однако такую же длину можно выразить в единицах более мелких деталей, таких как миллиметры, например 3141,59 мм. Для более высокого разрешения выбирается меньшая единица, и значение умножается, например, 3141 590 мкм (микрометры). Суть заключается в выборе достаточно маленькой единицы, чтобы представить точность с плавающей запятой в виде целого числа.
Значение с плавающей точкой преобразуется при вводе в фиксированную точку. Вся обработка данных происходит в фиксированной точке. Значение с фиксированной точкой перед выводом преобразуется в число с плавающей точкой.
Сила 2 Фиксированная точка базы
Как и при преобразовании метров с плавающей запятой в миллиметры с фиксированной запятой, используя 1000, можно использовать степень 2 вместо 1000. Выбор степени 2 позволяет процессору использовать сдвиг бит вместо умножения или деления. Сдвиг битов на степень 2 обычно быстрее, чем умножение или деление.
В соответствии с темой и точностью до миллиметров, мы могли бы использовать 1024 в качестве базы вместо 1000. Аналогично, для более высокой точности используйте 65536 или 131072.
Резюме
Изменение конструкции или реализации на используемую математику с фиксированной точкой позволяет процессору использовать более комплексные инструкции по обработке данных, чем с плавающей запятой. Операции с плавающей запятой потребляют больше вычислительной мощности, чем интегральные операции во всех процессах, кроме специализированных. Использование степеней 2 в качестве основания (или знаменателя) позволяет коду использовать сдвиг битов вместо умножения или деления. Деление и умножение требуют больше операций, чем сдвиг, и, следовательно, сдвиг происходит быстрее. Таким образом, вместо оптимизации кода для выполнения (такого как развертывание цикла), можно попытаться использовать нотацию с фиксированной точкой, а не с плавающей точкой.
Пункт 1
Вы вычисляете среднее значение и дисперсию одновременно.
Это правильно?
Разве вам не нужно сначала вычислять среднее значение, а затем, когда вы его узнаете, рассчитать сумму квадратов разностей от среднего значения?
Помимо того, что это правильно, это скорее поможет производительности, чем повредит.
Попытка сделать две вещи в одном цикле не обязательно быстрее, чем два последовательных простых цикла.
Пункт 2
Знаете ли вы, что есть способ рассчитать среднее значение и дисперсию одновременно, например так:
double sumsq = 0, sum = 0;
for (i = 0; i < n; i++){
double xi = x[i];
sum += xi;
sumsq += xi * xi;
}
double avg = sum / n;
double avgsq = sumsq / n
double variance = avgsq - avg*avg;
Пункт 3
Внутренние циклы делают повторную индексацию.
Компилятор может быть быть в состоянии оптимизировать это до минимума, но я бы не стал ставить на это свои носки.
Пункт 4
Вы используете дргоЕ или что-то вроде этого.
Единственное достаточно надежное число, которое можно получить из этого, — это время по функциям.
Это не очень хорошо скажет вам, как время тратится внутри функции.
Я и многие другие полагаемся на Этот метод, который берет вас прямо в сердце того, что требует времени.