Я реализовал рабочую многопоточную сортировку слиянием в C ++, но столкнулся со стеной.
В моей реализации я рекурсивно разделяю входной вектор на две части, а затем разделяю эти две части:
void MergeSort(vector<int> *in)
{
if(in->size() < 2)
return;
vector<int>::iterator ite = in->begin();
vector<int> left = vector<int> (ite, ite + in->size()/2);
vector<int> right = vector<int> (ite + in->size()/2, in->end() );
//current thread spawns 2 threads HERE
thread t1 = thread(MergeSort, &left);
thread t2 = thread(MergeSort, &right);
t1.join();
t2.join();
vector<int> ret;
ret.reserve(in->size() );
ret = MergeSortMerge(left, right);
in->clear();
in->insert(in->begin(), ret.begin(), ret.end() );
return;
}
Код кажется красивым, но это один из самых порочных кодов, которые я когда-либо писал. Попытка сортировки массива из более чем 1000 значений int приводит к появлению такого количества потоков, что у меня заканчивается свободное пространство в стеке, а также BSOD моего компьютера 🙁 Последовательно.
Я хорошо знаю причину, почему этот код порождает так много потоков, что не так хорошо, но технически (если не теоретически), разве это не правильная реализация?
Основываясь на небольшом поиске в Google, я, кажется, нашел необходимость в пуле потоков. Решит ли использование пула потоков фундаментальную проблему, с которой я сталкиваюсь — тот факт, что я пытаюсь порождать слишком много потоков? Если да, есть ли у вас рекомендации по библиотекам?
Спасибо за совет и помощь!
Как пояснил Здан, вы должны ограничить количество потоков. Есть две вещи, чтобы рассмотреть, чтобы определить, каков предел,
Количество ядер процессора. В C ++ 11 вы можете использовать std::thread::hardware_concurrency()
определить аппаратные ядра. Однако эта функция может возвращать 0, что означает, что программа не знает, сколько ядер, в этом случае вы можете принять это значение равным 2 или 4.
Ограничено количеством данных для обработки. Вы можете разделить данные, которые будут обрабатываться потоками, до 1 данных на поток, но это будет слишком дорого только для 1 данных и не будет экономически эффективным. Например, вы, вероятно, можете сказать, что когда количество данных меньше 50, вы больше не хотите делить. Таким образом, вы можете определить максимальное количество необходимых потоков на основе чего-то вроде total_data_number / 50 + 1
,
Затем вы выбираете минимальное число между делом 1 & Случай 2 для определения предела.
В вашем случае, поскольку вы генерируете поток по рекурсии, вы можете попытаться определить глубину рекурсии аналогичным образом.
Я не думаю, что пул потоков поможет вам. Поскольку ваш алгоритм является рекурсивным, вы попадете в точку, где все потоки в вашем пуле потребляются, и пул не захочет создавать больше потоков, и ваш алгоритм заблокируется.
Вероятно, вы могли бы просто ограничить глубину рекурсии создания потока до 2 или 3 (если у вас нет МНОГО ЦП, это не повлияет на производительность).
Вы можете установить свои ограничения на пространство стека, но это бесполезно. Слишком много потоков, даже с пулом, будут поглощать его по цене log2 (N) * на поток. Перейти на итеративный подход и сократить ваши накладные расходы. Накладные расходы — убийца.
Что касается производительности, вы обнаружите, что использование некоторого уровня избыточной фиксации N потоков, где аппаратный параллелизм, вероятно, даст лучшие результаты. Будет хороший баланс между накладными расходами и работой на ядро. Если N get очень велико, как на GPU, то существуют другие варианты (Bitonic), которые делают различные компромиссы для уменьшения накладных расходов на связь (ожидание / соединение).
Предполагая, что у вас есть диспетчер задач и семафор, созданный для N, уведомляет, прежде чем пропустить ожидающую задачу,
`
#include <algorithm>
#include <array>
#include <cstdint>
#include <vector>
#include <sometaskmanager.h>
void parallel_merge( size_t N ) {
std::array<int, 1000> ary {0};
// fill array...
intmax_t stride_size = ary.size( )/N; //TODO: Put a MIN size here
auto semaphore = make_semaphore( N );
using iterator = typename std::array<int, 1000>::iterator;
std::vector<std::pair<iterator, iterator>> ranges;
auto last_it = ary.begin( );
for( intmax_t n=stride_size; n<N; n +=stride_size ) {
ranges.emplace_back( last_it, std::next(last_it, std::min(std::distance(last_it, ary.end()), stride_size)));
semaphore.notify( );
}
for( auto const & rng: ranges ) {
add_task( [&semaphore,rng]( ) {
std::sort( rng.first, rng.second );
});
}
semaphore.wait( );
std::vector<std::pair<iterator, iterator>> new_rng;
while( ranges.size( ) > 1 ) {
semaphore = make_semaphore( ranges.size( )/2 );
for( size_t n=0; n<ranges.size( ); n+=2 ) {
auto first=ranges[n].first;
auto last=ranges[n+1].second;
add_task( [&semaphore, first, mid=ranges[n].second, last]( ) {
std::inplace_merge( first, mid, last );
semaphore.notify( );
});
new_rng.emplace_back( first, last );
}
if( ranges.size( ) % 2 != 0 ) {
new_rng.push_back( ranges.back( ) );
}
ranges = new_rng;
semaphore.wait( );
}
}
Как вы можете видеть, узкое место находится в фазе слияния, так как необходимо сделать много кординаций. Шон Родитель делает хорошую презентацию построения диспетчера задач, если у вас его нет, и рассказывает о его сравнении с анализом относительной производительности в своей презентации «Лучший код: параллелизм», http://sean-parent.stlab.cc/presentations/2016-11-16-concurrency/2016-11-16-concurrency.pdf . TBB и PPL имеют менеджеров задач.