Я написал непрямой алгоритм радикальной сортировки в C ++ (косвенно, я имею в виду, что он возвращает индексы элементов):
#include <algorithm>
#include <iterator>
#include <vector>
template<class It1, class It2>
void radix_ipass(
It1 begin, It1 const end,
It2 const a, size_t const i,
std::vector<std::vector<size_t> > &buckets)
{
size_t ncleared = 0;
for (It1 j = begin; j != end; ++j)
{
size_t const k = a[*j][i];
while (k >= ncleared && ncleared < buckets.size())
{ buckets[ncleared++].clear(); }
if (k >= buckets.size())
{
buckets.resize(k + 1);
ncleared = buckets.size();
}
buckets[k].push_back(size_t());
using std::swap; swap(buckets[k].back(), *j);
}
for (std::vector<std::vector<size_t> >::iterator
j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
{
begin = std::swap_ranges(j->begin(), j->end(), begin);
}
}
template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
size_t smax = 0;
for (It i = begin; i != end; ++i)
{
size_t const n = i->size();
smax = n > smax ? n : smax;
}
std::vector<std::vector<size_t> > buckets;
for (size_t i = 0; i != smax; ++i)
{
radix_ipass(
items, items + (end - begin),
begin, smax - i - 1, buckets);
}
}
Кажется, он работает примерно на 40% быстрее, чем std::sort
когда я проверяю его с помощью следующего кода (3920 мс по сравнению с 6530 мс):
#include <functional>
template<class Key>
struct key_comp : public Key
{
explicit key_comp(Key const &key = Key()) : Key(key) { }
template<class T>
bool operator()(T const &a, T const &b) const
{ return this->Key::operator()(a) < this->Key::operator()(b); }
};
template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }
template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };
template<class F>
struct deref : public F
{
deref(F const &f) : F(f) { }
typename std::iterator_traits<
typename F::result_type
>::value_type const
&operator()(typename F::argument_type const &a) const
{ return *this->F::operator()(a); }
};
template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }
size_t xorshf96(void) // random number generator
{
static size_t x = 123456789, y = 362436069, z = 521288629;
x ^= x << 16;
x ^= x >> 5;
x ^= x << 1;
size_t t = x;
x = y;
y = z;
z = t ^ x ^ y;
return z;
}
#include <stdio.h>
#include <time.h>
#include <array>
int main(void)
{
typedef std::vector<std::array<size_t, 3> > Items;
Items items(1 << 24);
std::vector<size_t> ranks(items.size() * 2);
for (size_t i = 0; i != items.size(); i++)
{
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
{ items[i][j] = xorshf96() & 0xFFF; }
}
clock_t const start = clock();
if (1) { radix_isort(items.begin(), items.end(), ranks.begin()); }
else // STL sorting
{
std::sort(
ranks.begin(),
ranks.begin() + items.size(),
make_key_comp(make_deref(std::bind1st(
add<Items::const_iterator, ptrdiff_t>(),
items.begin()))));
}
printf("%u ms\n",
(unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
std::min(ranks.begin(), ranks.end()));
return 0;
}
Хм, думаю, это лучшее, что я могу сделать, подумал я.
Но после большого количества ударов головой о стену, я понял, что предварительная выборка в начале radix_ipass
может помочь сократить результат до 1440 мс (!):
#include <xmmintrin.h>
...
for (It1 j = begin; j != end; ++j)
{
#if defined(_MM_TRANSPOSE4_PS) // should be defined if xmmintrin.h is included
enum { N = 8 };
if (end - j > N)
{ _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
#endif
...
}
Ясно, что узким местом является пропускная способность памяти — схема доступа непредсказуема.
Или не так много места для улучшения?
(Я надеюсь избежать возможного нарушения читабельности кода, если это возможно, поэтому, если читаемость вредит, улучшение должно быть значительным.)
Использование более компактной структуры данных, которая объединяет ранги и значения, может повысить производительность std::sort
в 2-3 раза По сути, сортировка теперь выполняется на vector<pair<Value,Rank>>
, Value
тип данных, std::array<integer_type, 3>
был заменен на это более компактным pair<uint32_t, uint8_t>
структура данных. Только половина байта этого не используется, и <
сравнение может быть сделано в два этапа, сначала с использованием предположительно эффективного сравнения uint32_t
s (не ясно, если цикл используется std::array<..>::operator<
может быть оптимизирован для такого же быстрого кода, но замена std::array<integer_type,3>
эта структура данных дала еще один прирост производительности).
Тем не менее, он не так эффективен, как радикальная сортировка. (Может быть, вы можете настроить пользовательский QuickSort с предварительными выборками?)
Помимо этого дополнительного метода сортировки, я заменил xorshf96
по mt19937
, потому что я знаю, как обеспечить семя для последнего;)
Начальное число и количество значений можно изменить с помощью двух аргументов командной строки: сначала начальное число, а затем число.
Скомпилировано с g ++ 4.9.0 20131022, используя -std=c++11 -march=native -O3
, для 64-битной Linux
Пробные прогоны; важная заметка работает на процессоре Core2Duo U9400 (старый & медленный!)
количество элементов: 16000000 используя std :: sort длительность: 12260 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 используя std :: sort длительность: 12230 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 используя std :: sort длительность: 12230 мс результат отсортирован: trueseed: 5648 количество элементов: 16000000 использование std :: sort с упакованной структурой данных длительность: 4290 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 использование std :: sort с упакованной структурой данных длительность: 4270 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 использование std :: sort с упакованной структурой данных длительность: 4280 мс результат отсортирован по количеству истинных: 16000000 используя радикальную сортировку длительность: 3790 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 используя радикальную сортировку длительность: 3820 мс результат отсортирован: верно семя: 5648 количество элементов: 16000000 используя радикальную сортировку длительность: 3780 мс результат отсортирован: верно
Новый или измененный код:
template<class It>
struct fun_obj
{
It beg;
bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
{
return beg[lhs] < beg[rhs];
}
};
template<class It>
fun_obj<It> make_fun_obj(It beg)
{
return fun_obj<It>{beg};
}
struct uint32p8_t
{
uint32_t m32;
uint8_t m8;
uint32p8_t(std::array<uint16_t, 3> const& a)
: m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
, m8( a[2]&0xFF )
{
}
operator std::array<size_t, 3>() const
{
return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
(m32&0xF)<<8 | m8}};
}
friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
{
if(lhs.m32 < rhs.m32) return true;
if(lhs.m32 > rhs.m32) return false;
return lhs.m8 < rhs.m8;
}
};
#include <stdio.h>
#include <time.h>
#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>
int main(int argc, char* argv[])
{
std::cout.sync_with_stdio(false);
constexpr auto items_count_default = 2<<22;
constexpr auto seed_default = 42;
uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
std::cout << "seed: " << seed << "\n";
size_t const items_count = argc > 2 ? std::atoll(argv[2])
: items_count_default;
std::cout << "item count: " << items_count << "\n";
using Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;
typedef std::vector<std::array<Items_array_value_t, 3> > Items;
Items items(items_count);
auto const ranks_count =
#ifdef RADIX_SORT
items.size() * 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;
//auto prng = xorshf96;
std::mt19937 gen(seed);
std::uniform_int_distribution<> dist;
auto prng = [&dist, &gen]{return dist(gen);};
std::vector<size_t> ranks(ranks_count);
for (size_t i = 0; i != items.size(); i++)
{
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
{ items[i][j] = prng() & 0xFFF; }
}
std::cout << "using ";
clock_t const start = clock();
#ifdef RADIX_SORT
std::cout << "radix sort\n";
radix_isort(items.begin(), items.end(), ranks.begin());
#elif defined(STDSORT)
std::cout << "std::sort\n";
std::sort(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin())
//make_key_comp(make_deref(std::bind1st(
// add<Items::const_iterator, ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std::cout << "std::sort with a packed data structure\n";
using Items_ranks = std::vector< std::pair<uint32p8_t,
decltype(ranks)::value_type> >;
Items_ranks items_ranks;
size_t i = 0;
for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
{
items_ranks.emplace_back(*iI, i);
}
std::sort(begin(items_ranks), end(items_ranks),
[](Items_ranks::value_type const& lhs,
Items_ranks::value_type const& rhs)
{ return lhs.first < rhs.first; }
);
std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
[](Items_ranks::value_type const& e) { return e.second; }
);
#endif
auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);
bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin()));
std::cout << "duration: " << duration << " ms\n"<< "result sorted: " << std::boolalpha << sorted << "\n";
return 0;
}
Полный код:
#include <algorithm>
#include <iterator>
#include <vector>
#include <cstddef>
using std::size_t;
using std::ptrdiff_t;
#include <xmmintrin.h>
template<class It1, class It2>
void radix_ipass(
It1 begin, It1 const end,
It2 const a, size_t const i,
std::vector<std::vector<size_t> > &buckets)
{
size_t ncleared = 0;
for (It1 j = begin; j != end; ++j)
{
#if defined(_MM_TRANSPOSE4_PS)
constexpr auto N = 8;
if(end - j > N)
{ _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
#else
#error SS intrinsic not found
#endif
size_t const k = a[*j][i];
while (k >= ncleared && ncleared < buckets.size())
{ buckets[ncleared++].clear(); }
if (k >= buckets.size())
{
buckets.resize(k + 1);
ncleared = buckets.size();
}
buckets[k].push_back(size_t());
using std::swap; swap(buckets[k].back(), *j);
}
for (std::vector<std::vector<size_t> >::iterator
j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
{
begin = std::swap_ranges(j->begin(), j->end(), begin);
}
}
template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
size_t smax = 0;
for (It i = begin; i != end; ++i)
{
size_t const n = i->size();
smax = n > smax ? n : smax;
}
std::vector<std::vector<size_t> > buckets;
for (size_t i = 0; i != smax; ++i)
{
radix_ipass(
items, items + (end - begin),
begin, smax - i - 1, buckets);
}
}
#include <functional>
template<class Key>
struct key_comp : public Key
{
explicit key_comp(Key const &key = Key()) : Key(key) { }
template<class T>
bool operator()(T const &a, T const &b) const
{ return this->Key::operator()(a) < this->Key::operator()(b); }
};
template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }
template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };
template<class F>
struct deref : public F
{
deref(F const &f) : F(f) { }
typename std::iterator_traits<
typename F::result_type
>::value_type const
&operator()(typename F::argument_type const &a) const
{ return *this->F::operator()(a); }
};
template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }
size_t xorshf96(void) // random number generator
{
static size_t x = 123456789, y = 362436069, z = 521288629;
x ^= x << 16;
x ^= x >> 5;
x ^= x << 1;
size_t t = x;
x = y;
y = z;
z = t ^ x ^ y;
return z;
}
template<class It>
struct fun_obj
{
It beg;
bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
{
return beg[lhs] < beg[rhs];
}
};
template<class It>
fun_obj<It> make_fun_obj(It beg)
{
return fun_obj<It>{beg};
}
struct uint32p8_t
{
uint32_t m32;
uint8_t m8;
uint32p8_t(std::array<uint16_t, 3> const& a)
: m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
, m8( a[2]&0xFF )
{
}
operator std::array<size_t, 3>() const
{
return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
(m32&0xF)<<8 | m8}};
}
friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
{
if(lhs.m32 < rhs.m32) return true;
if(lhs.m32 > rhs.m32) return false;
return lhs.m8 < rhs.m8;
}
};
#include <stdio.h>
#include <time.h>
#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>
int main(int argc, char* argv[])
{
std::cout.sync_with_stdio(false);
constexpr auto items_count_default = 2<<22;
constexpr auto seed_default = 42;
uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
std::cout << "seed: " << seed << "\n";
size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
std::cout << "item count: " << items_count << "\n";
using Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;
typedef std::vector<std::array<Items_array_value_t, 3> > Items;
Items items(items_count);
auto const ranks_count =
#ifdef RADIX_SORT
items.size() * 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;
//auto prng = xorshf96;
std::mt19937 gen(seed);
std::uniform_int_distribution<> dist;
auto prng = [&dist, &gen]{return dist(gen);};
std::vector<size_t> ranks(ranks_count);
for (size_t i = 0; i != items.size(); i++)
{
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
{ items[i][j] = prng() & 0xFFF; }
}
std::cout << "using ";
clock_t const start = clock();
#ifdef RADIX_SORT
std::cout << "radix sort\n";
radix_isort(items.begin(), items.end(), ranks.begin());
#elif defined(STDSORT)
std::cout << "std::sort\n";
std::sort(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin())
//make_key_comp(make_deref(std::bind1st(
// add<Items::const_iterator, ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std::cout << "std::sort with a packed data structure\n";
using Items_ranks = std::vector< std::pair<uint32p8_t,
decltype(ranks)::value_type> >;
Items_ranks items_ranks;
size_t i = 0;
for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
{
items_ranks.emplace_back(*iI, i);
}
std::sort(begin(items_ranks), end(items_ranks),
[](Items_ranks::value_type const& lhs,
Items_ranks::value_type const& rhs)
{ return lhs.first < rhs.first; }
);
std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
[](Items_ranks::value_type const& e) { return e.second; }
);
#endif
auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);
bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin()));
std::cout << "duration: " << duration << " ms\n"<< "result sorted: " << std::boolalpha << sorted << "\n";
return 0;
}
Других решений пока нет …