Я не нашел эту конкретную тему нигде …
Я вызываю алгоритм nth_element () около 400 000 раз в секунду для разных данных в std :: vector из 23 целых чисел, более точных значений типа unsigned short.
Я хочу повысить скорость вычислений, и этот конкретный вызов требует значительной части процессорного времени.
Теперь, как и в случае с std :: sort (), я заметил, что функция nth_element видна в профилировщике даже при самом высоком уровне оптимизации и режиме NDEBUG (компилятор Linux Clang), поэтому сравнение является встроенным, но не сам вызов функции. Что ж, более точный пример: видна не nth_element (), а std :: __ introselect ().
Поскольку размер данных невелик, я экспериментировал с использованием функции квадратичной сортировки PIKSORT, которая часто выполняется быстрее, чем вызов std :: sort, когда размер данных меньше 20 элементов, возможно потому, что функция будет встроенной.
template <class CONTAINER>
inline void piksort(CONTAINER& arr) // indeed this is "insertion sort"{
typename CONTAINER::value_type a;
const int n = (int)arr.size();
for (int j = 1; j<n; ++j) {
a = arr[j];
int i = j;
while (i > 0 && a < arr[i - 1]) {
arr[i] = arr[i - 1];
i--;
}
arr[i] = a;
}
}
Однако это было медленнее, чем использование nth_element в этом случае.
Кроме того, использование статистического метода не подходит, Что-то быстрее, чем std :: nth_element
Наконец, поскольку значения находятся в диапазоне от 0 до около 20000, метод гистограммы не выглядит подходящим.
Мой вопрос: кто-нибудь знает простое решение этого? Я думаю, что я, вероятно, не единственный, кому приходится часто вызывать std :: sort или nth_element.
Вы упомянули, что размер массива всегда был известен 23
, Кроме того, используемый тип unsigned short
, В этом случае вы можете попытаться использовать сортировочную сеть размера 23
; так как ваш тип unsigned short
сортировка всего массива с помощью сети сортировки может быть даже быстрее, чем частичная сортировка с помощью std::nth_element
, Вот очень простая реализация C ++ 14 сортировочной сети размера 23
с 118
сравнить-обменять единицы, как описано Использование симметрии и эволюционного поиска для минимизации сортировки сетей:
template<typename RandomIt, typename Compare = std::less<>>
void network_sort23(RandomIt first, Compare compare={})
{
swap_if(first[1u], first[20u], compare);
swap_if(first[2u], first[21u], compare);
swap_if(first[5u], first[13u], compare);
swap_if(first[9u], first[17u], compare);
swap_if(first[0u], first[7u], compare);
swap_if(first[15u], first[22u], compare);
swap_if(first[4u], first[11u], compare);
swap_if(first[6u], first[12u], compare);
swap_if(first[10u], first[16u], compare);
swap_if(first[8u], first[18u], compare);
swap_if(first[14u], first[19u], compare);
swap_if(first[3u], first[8u], compare);
swap_if(first[4u], first[14u], compare);
swap_if(first[11u], first[18u], compare);
swap_if(first[2u], first[6u], compare);
swap_if(first[16u], first[20u], compare);
swap_if(first[0u], first[9u], compare);
swap_if(first[13u], first[22u], compare);
swap_if(first[5u], first[15u], compare);
swap_if(first[7u], first[17u], compare);
swap_if(first[1u], first[10u], compare);
swap_if(first[12u], first[21u], compare);
swap_if(first[8u], first[19u], compare);
swap_if(first[17u], first[22u], compare);
swap_if(first[0u], first[5u], compare);
swap_if(first[20u], first[21u], compare);
swap_if(first[1u], first[2u], compare);
swap_if(first[18u], first[19u], compare);
swap_if(first[3u], first[4u], compare);
swap_if(first[21u], first[22u], compare);
swap_if(first[0u], first[1u], compare);
swap_if(first[19u], first[22u], compare);
swap_if(first[0u], first[3u], compare);
swap_if(first[12u], first[13u], compare);
swap_if(first[9u], first[10u], compare);
swap_if(first[6u], first[15u], compare);
swap_if(first[7u], first[16u], compare);
swap_if(first[8u], first[11u], compare);
swap_if(first[11u], first[14u], compare);
swap_if(first[4u], first[11u], compare);
swap_if(first[6u], first[8u], compare);
swap_if(first[14u], first[16u], compare);
swap_if(first[17u], first[20u], compare);
swap_if(first[2u], first[5u], compare);
swap_if(first[9u], first[12u], compare);
swap_if(first[10u], first[13u], compare);
swap_if(first[15u], first[18u], compare);
swap_if(first[10u], first[11u], compare);
swap_if(first[4u], first[7u], compare);
swap_if(first[20u], first[21u], compare);
swap_if(first[1u], first[2u], compare);
swap_if(first[7u], first[15u], compare);
swap_if(first[3u], first[9u], compare);
swap_if(first[13u], first[19u], compare);
swap_if(first[16u], first[18u], compare);
swap_if(first[8u], first[14u], compare);
swap_if(first[4u], first[6u], compare);
swap_if(first[18u], first[21u], compare);
swap_if(first[1u], first[4u], compare);
swap_if(first[19u], first[21u], compare);
swap_if(first[1u], first[3u], compare);
swap_if(first[9u], first[10u], compare);
swap_if(first[11u], first[13u], compare);
swap_if(first[2u], first[6u], compare);
swap_if(first[16u], first[20u], compare);
swap_if(first[4u], first[9u], compare);
swap_if(first[13u], first[18u], compare);
swap_if(first[19u], first[20u], compare);
swap_if(first[2u], first[3u], compare);
swap_if(first[18u], first[20u], compare);
swap_if(first[2u], first[4u], compare);
swap_if(first[5u], first[17u], compare);
swap_if(first[12u], first[14u], compare);
swap_if(first[8u], first[12u], compare);
swap_if(first[5u], first[7u], compare);
swap_if(first[15u], first[17u], compare);
swap_if(first[5u], first[8u], compare);
swap_if(first[14u], first[17u], compare);
swap_if(first[3u], first[5u], compare);
swap_if(first[17u], first[19u], compare);
swap_if(first[3u], first[4u], compare);
swap_if(first[18u], first[19u], compare);
swap_if(first[6u], first[10u], compare);
swap_if(first[11u], first[16u], compare);
swap_if(first[13u], first[16u], compare);
swap_if(first[6u], first[9u], compare);
swap_if(first[16u], first[17u], compare);
swap_if(first[5u], first[6u], compare);
swap_if(first[4u], first[5u], compare);
swap_if(first[7u], first[9u], compare);
swap_if(first[17u], first[18u], compare);
swap_if(first[12u], first[15u], compare);
swap_if(first[14u], first[15u], compare);
swap_if(first[8u], first[12u], compare);
swap_if(first[7u], first[8u], compare);
swap_if(first[13u], first[15u], compare);
swap_if(first[15u], first[17u], compare);
swap_if(first[5u], first[7u], compare);
swap_if(first[9u], first[10u], compare);
swap_if(first[10u], first[14u], compare);
swap_if(first[6u], first[11u], compare);
swap_if(first[14u], first[16u], compare);
swap_if(first[15u], first[16u], compare);
swap_if(first[6u], first[7u], compare);
swap_if(first[10u], first[11u], compare);
swap_if(first[9u], first[12u], compare);
swap_if(first[11u], first[13u], compare);
swap_if(first[13u], first[14u], compare);
swap_if(first[8u], first[9u], compare);
swap_if(first[7u], first[8u], compare);
swap_if(first[14u], first[15u], compare);
swap_if(first[9u], first[10u], compare);
swap_if(first[8u], first[9u], compare);
swap_if(first[12u], first[14u], compare);
swap_if(first[11u], first[12u], compare);
swap_if(first[12u], first[13u], compare);
swap_if(first[10u], first[11u], compare);
swap_if(first[11u], first[12u], compare);
}
swap_if
функция полезности сравнивает два параметра x
а также y
с предикатом compare
и меняет их, если compare(y, x)
, Мой пример использует общий swap_if
функция, но вы можете использовать оптимизированную версию, если вы знаете, что вы будете сравнивать unsigned short
значения с operator<
в любом случае (вам может не понадобиться такая функция, если ваш компилятор распознает и оптимизирует сравнение-обмен, но, к сожалению, не все компиляторы делают это — я использую g ++ 5.2 с -O3
а мне еще нужна следующая функция для производительности):
void swap_if(unsigned short& x, unsigned short& y)
{
unsigned short dx = x;
unsigned short dy = y;
unsigned short tmp = x = std::min(dx, dy);
y ^= dx ^ tmp;
}
Теперь, чтобы убедиться, что это действительно быстрее, я решил время std::nth_element
когда требуется частичная сортировка только первые 10 элементов против сортировки целых 23 элементов с помощью сортировочной сети (1000000 раз с различными перемешанными массивами). Вот что я получаю:
std::nth_element 1158ms
network_sort23 487ms
Тем не менее, мой компьютер работает некоторое время и работает немного медленно, но разница в производительности невелика. Я считаю, что эта разница останется такой же, когда я перезагружу свой компьютер. Я могу попробовать это позже и сообщить вам.
Что касается того, как эти времена были сгенерированы, я использовал модифицированную версию этот ориентир от моего библиотека cpp-sort. Оригинальная сортировочная сеть и swap_if
оттуда также поступают функции, так что вы можете быть уверены, что они были проверены более одного раза 🙂
РЕДАКТИРОВАТЬ: вот результаты теперь, когда я перезагрузил свой компьютер. network_sort23
версия еще в два раза быстрее чем std::nth_element
:
std::nth_element 369ms
network_sort23 154ms
EDIT²: если все, что вам нужно в медиане, вы можете тривиально удалить единицы сравнения-обмена, которые не нужны для вычисления окончательного значения, которое будет на 11-й позиции. Результирующая сеть для определения медианы размера 23, которая следует ниже, использует сеть сортировки размера 23, отличную от предыдущей, и дает несколько лучшие результаты:
swap_if(first[0u], first[1u], compare);
swap_if(first[2u], first[3u], compare);
swap_if(first[4u], first[5u], compare);
swap_if(first[6u], first[7u], compare);
swap_if(first[8u], first[9u], compare);
swap_if(first[10u], first[11u], compare);
swap_if(first[1u], first[3u], compare);
swap_if(first[5u], first[7u], compare);
swap_if(first[9u], first[11u], compare);
swap_if(first[0u], first[2u], compare);
swap_if(first[4u], first[6u], compare);
swap_if(first[8u], first[10u], compare);
swap_if(first[1u], first[2u], compare);
swap_if(first[5u], first[6u], compare);
swap_if(first[9u], first[10u], compare);
swap_if(first[1u], first[5u], compare);
swap_if(first[6u], first[10u], compare);
swap_if(first[5u], first[9u], compare);
swap_if(first[2u], first[6u], compare);
swap_if(first[1u], first[5u], compare);
swap_if(first[6u], first[10u], compare);
swap_if(first[0u], first[4u], compare);
swap_if(first[7u], first[11u], compare);
swap_if(first[3u], first[7u], compare);
swap_if(first[4u], first[8u], compare);
swap_if(first[0u], first[4u], compare);
swap_if(first[7u], first[11u], compare);
swap_if(first[1u], first[4u], compare);
swap_if(first[7u], first[10u], compare);
swap_if(first[3u], first[8u], compare);
swap_if(first[2u], first[3u], compare);
swap_if(first[8u], first[9u], compare);
swap_if(first[2u], first[4u], compare);
swap_if(first[7u], first[9u], compare);
swap_if(first[3u], first[5u], compare);
swap_if(first[6u], first[8u], compare);
swap_if(first[3u], first[4u], compare);
swap_if(first[5u], first[6u], compare);
swap_if(first[7u], first[8u], compare);
swap_if(first[12u], first[13u], compare);
swap_if(first[14u], first[15u], compare);
swap_if(first[16u], first[17u], compare);
swap_if(first[18u], first[19u], compare);
swap_if(first[20u], first[21u], compare);
swap_if(first[13u], first[15u], compare);
swap_if(first[17u], first[19u], compare);
swap_if(first[12u], first[14u], compare);
swap_if(first[16u], first[18u], compare);
swap_if(first[20u], first[22u], compare);
swap_if(first[13u], first[14u], compare);
swap_if(first[17u], first[18u], compare);
swap_if(first[21u], first[22u], compare);
swap_if(first[13u], first[17u], compare);
swap_if(first[18u], first[22u], compare);
swap_if(first[17u], first[21u], compare);
swap_if(first[14u], first[18u], compare);
swap_if(first[13u], first[17u], compare);
swap_if(first[18u], first[22u], compare);
swap_if(first[12u], first[16u], compare);
swap_if(first[15u], first[19u], compare);
swap_if(first[16u], first[20u], compare);
swap_if(first[12u], first[16u], compare);
swap_if(first[13u], first[16u], compare);
swap_if(first[19u], first[22u], compare);
swap_if(first[15u], first[20u], compare);
swap_if(first[14u], first[15u], compare);
swap_if(first[20u], first[21u], compare);
swap_if(first[14u], first[16u], compare);
swap_if(first[19u], first[21u], compare);
swap_if(first[15u], first[17u], compare);
swap_if(first[18u], first[20u], compare);
swap_if(first[15u], first[16u], compare);
swap_if(first[17u], first[18u], compare);
swap_if(first[19u], first[20u], compare);
swap_if(first[0u], first[12u], compare);
swap_if(first[2u], first[14u], compare);
swap_if(first[4u], first[16u], compare);
swap_if(first[6u], first[18u], compare);
swap_if(first[8u], first[20u], compare);
swap_if(first[10u], first[22u], compare);
swap_if(first[2u], first[12u], compare);
swap_if(first[10u], first[20u], compare);
swap_if(first[4u], first[12u], compare);
swap_if(first[6u], first[14u], compare);
swap_if(first[8u], first[16u], compare);
swap_if(first[10u], first[18u], compare);
swap_if(first[8u], first[12u], compare);
swap_if(first[10u], first[14u], compare);
swap_if(first[10u], first[12u], compare);
swap_if(first[1u], first[13u], compare);
swap_if(first[3u], first[15u], compare);
swap_if(first[5u], first[17u], compare);
swap_if(first[7u], first[19u], compare);
swap_if(first[9u], first[21u], compare);
swap_if(first[3u], first[13u], compare);
swap_if(first[11u], first[21u], compare);
swap_if(first[5u], first[13u], compare);
swap_if(first[7u], first[15u], compare);
swap_if(first[9u], first[17u], compare);
swap_if(first[11u], first[19u], compare);
swap_if(first[9u], first[13u], compare);
swap_if(first[11u], first[15u], compare);
swap_if(first[11u], first[13u], compare);
swap_if(first[11u], first[12u], compare);
Вероятно, существуют более разумные способы создания сетей для поиска медианы, но я не думаю, что по этому вопросу были проведены обширные исследования. Поэтому это, вероятно, лучший метод, который вы можете использовать на данный момент. Результат не впечатляет, но он все равно использует 104 единицы сравнения-сравнения вместо 118.
Глядя на исходный код std::nth_element
в MSVC2013, кажется, что случаи N <= 32 решаются путем вставки сортировки. Это означает, что разработчики STL поняли, что выполнение рандомизированных разделов будет медленнее, несмотря на лучшую асимптотику для этих размеров.
Одним из способов повышения производительности является оптимизация алгоритма сортировки. @ Morwenn’s answer показывает, как сортировать 23 элемента с помощью сортировочной сети, которая, как известно, является одним из самых быстрых способов сортировки небольших массивов постоянного размера.
Я исследую другой способ — вычисление медианы без алгоритма сортировки. На самом деле, я не буду переставлять входной массив вообще.
Поскольку речь идет о небольших массивах, нам нужно реализовать некоторые O (N ^ 2) Алгоритм самым простым способом. В идеале у него не должно быть ветвей вообще или только хорошо предсказуемых ветвей. Кроме того, простая структура алгоритма может позволить нам векторизовать его, еще больше улучшая его производительность.
Я решил следовать методу подсчета, который был использован Вот ускорить небольшой линейный поиск. Прежде всего, предположим, что все элементы разные. Выберите любой элемент массива: количество элементов меньше, чем он определяет свою позицию в отсортированном массиве. Мы можем перебрать все элементы, и для каждого из них рассчитать количество элементов меньше его. Если отсортированный индекс имеет желаемое значение, мы можем остановить алгоритм.
К сожалению, в общем случае могут быть равные элементы. Нам придется сделать наш алгоритм значительно медленнее и сложнее для их обработки. Вместо того, чтобы вычислять уникальный отсортированный индекс элемента, мы можем вычислить интервал возможных отсортированных индексов для него. Для любого элемента достаточно посчитать количество элементов меньше его (L) и количество равных ему элементов (Е), затем отсортированный индекс соответствует диапазону [L, L + R). Если этот интервал содержит желаемый отсортированный индекс (т.е. N / 2), тогда мы можем остановить алгоритм и вернуть рассмотренный элемент.
for (size_t i = 0; i < n; i++) {
auto x = arr[i];
//count number of "less" and "equal" elements
int cntLess = 0, cntEq = 0;
for (size_t j = 0; j < n; j++) {
cntLess += arr[j] < x;
cntEq += arr[j] == x;
}
//fast range checking from here: https://stackoverflow.com/a/17095534/556899
if ((unsigned int)(idx - cntLess) < cntEq)
return x;
}
Построенный алгоритм имеет только одну ветвь, что довольно предсказуемо: он терпит неудачу во всех случаях, кроме единственного случая, когда мы останавливаем алгоритм. Алгоритм легко векторизовать, используя 8 элементов на регистр SSE. Поскольку нам нужно получить доступ к некоторым элементам после последнего, я предполагаю, что входной массив дополняется макс = 2 ^ 15-1 значения до 24 или 32 элементов.
Первый способ заключается в векторизации внутреннего цикла путем j
, В этом случае внутренний цикл будет выполняться только 3 раза, но после его завершения необходимо выполнить два сокращения на 8. Они едят больше времени, чем сама внутренняя петля. В результате такая векторизация не очень эффективна.
Второй способ заключается в векторизации внешнего цикла путем i
, В этом случае мы обрабатываем 8 элементов x = arr[i]
однажды. Для каждого пакета мы сравниваем его с каждым элементом arr[j]
во внутренней петле. После внутреннего цикла мы выполняем векторизованную проверку диапазона для всего пакета из 8 элементов. Если какой-либо из них удастся, мы определяем точное число с помощью простого скалярного кода (в любом случае, он мало ест).
__m128i idxV = _mm_set1_epi16(idx);
for (size_t i = 0; i < n; i += 8) {
//load pack of 8 elements
auto xx = _mm_loadu_si128((__m128i*)&arr[i]);
//count number of less/equal elements for each element in the pack
__m128i cntLess = _mm_setzero_si128();
__m128i cntEq = _mm_setzero_si128();
for (size_t j = 0; j < n; j++) {
__m128i vAll = _mm_set1_epi16(arr[j]);
cntLess = _mm_sub_epi16(cntLess, _mm_cmplt_epi16(vAll, xx));
cntEq = _mm_sub_epi16(cntEq, _mm_cmpeq_epi16(vAll, xx));
}
//perform range check for 8 elements at once
__m128i mask = _mm_andnot_si128(_mm_cmplt_epi16(idxV, cntLess), _mm_cmplt_epi16(idxV, _mm_add_epi16(cntLess, cntEq)));
if (int bm = _mm_movemask_epi8(mask)) {
//range check succeeds for one of the elements, find and return it
for (int t = 0; t < 8; t++)
if (bm & (1 << (2*t)))
return arr[i + t];
}
}
Здесь мы видим _mm_set1_epi16
свойственный внутреннему циклу. GCC, похоже, имеет некоторые проблемы с производительностью. В любом случае, это время, потребляемое на каждой внутренней итерации, которое может быть уменьшено, если мы обработаем одновременно 8 элементов в самом внутреннем цикле. В таком случае мы можем сделать одну векторизованную загрузку и 14 распаковать инструкции для получения vAll
для восьми элементов. Кроме того, нам придется написать код сравнения и подсчета для восьми элементов в теле цикла, поэтому он также действует как 8-кратное развертывание. Полученный код является самым быстрым, ссылку на него можно найти ниже.
Я протестировал различные решения на процессоре Ivy Bridge 3,4 ГГц. Ниже вы можете увидеть общее время вычислений для 2 ^ 23 ~ = 8 млн. звонки в секундах (первый номер). Второе число — контрольная сумма результатов.
Результаты на MSVC 2013 x64 (/ O2):
memcpy only: 0.020
std::nth_element: 2.110 (1186136064)
network sort: 0.630 (1186136064) //solution by @Morwenn (I had to change swap_if)
trivial count: 2.266 (1186136064) //scalar algorithm (presented above)
vectorized count: 0.692 (1186136064) //vectorization by j
vectorized count (T): 0.602 (1186136064) //vectorization by i (presented above)
vectorized count (both): 0.450 (1186136064) //vectorization by i and j
Результаты на MinGW GCC 4.8.3 x64 (-O3 -msse4):
memcpy only: 0.016
std::nth_element: 1.981 (1095237632)
network sort: 0.531 (1095237632) //original swap_if used
trivial count: 1.482 (1095237632)
vectorized count: 0.655 (1095237632)
vectorized count (T): 2.668 (1095237632) //GCC generates some crap
vectorized count (both): 0.374 (1095237632)
Как вы видите, предложенный векторизованный алгоритм для 23 16-битных элементов немного быстрее, чем подход, основанный на сортировке (кстати, на старом ЦП я вижу только 5% разницу во времени).
Если вы можете гарантировать, что все элементы разные, вы можете упростить алгоритм, сделав его еще быстрее.
Полный код всех алгоритмов доступен Вот, включая весь код тестирования.
Мне эта проблема показалась интересной, поэтому я попробовал все алгоритмы, которые только мог придумать.
Вот результаты:
testing 100000 repetitions
variant 0, no-op (for overhead measure)
5 ms
variant 1, vector + nth_element
205 ms
variant 2, multiset + advance
745 ms
variant 2b, set (not fully conformant)
787 ms
variant 3, list + lower_bound
589 ms
variant 3b, list + block-allocator
269 ms
variant 4, avl-tree + insert_sorted
645 ms
variant 4b, avl-tree + prune
682 ms
variant 5, histogram
1429 ms
Я думаю, что мы можем сделать вывод, что вы уже используете самый быстрый Мальчик был я не прав. Однако, если вы можете принять приблизительный ответ,
алгоритм.
Есть, вероятно, более быстрые способы, такие как медиана медиан.
Если вы заинтересованы, источник Вот.