Я пытаюсь создать общую систему задач, где я могу публиковать задачи, которые выполняются в любом потоке, который свободен. С предыдущей попыткой у меня часто заканчивались потоки, потому что в какой-то момент они блокировались. Поэтому я пытаюсь увеличить волокна; когда одно волокно блокирует нить, она может свободно работать с другим волокном, звучит идеально.
Алгоритм кражи работ кажется идеальным для моих целей, но мне очень трудно его использовать. В этом примере создаются волокна кода, и только тогда создаются потоки и планировщики, поэтому все волокна фактически выполняются во всех потоках. Но я хочу запустить волокна позже, и к тому времени все остальные потоки будут приостановлены на неопределенный срок, потому что у них не было никакой работы. Я не нашел способа разбудить их снова, все мои волокна выполняются только в главном потоке. «notify», кажется, является методом для вызова, но я не вижу способа фактически добраться до экземпляра алгоритма.
Я пытался сохранить указатели на все экземпляры алгоритма, чтобы я мог вызвать notify (), но это не очень помогает; в большинстве случаев алгоритмы в рабочих потоках не могут украсть что-либо из основного, потому что следующим является dispatcher_context.
Я мог бы отключить «приостановить», но потоки заняты ожидания, а не вариант.
Я также попробовал алгоритм shared_work. Та же проблема, когда поток не может найти волокно, он никогда не проснется снова. Я попробовал тот же хак вручную, вызвав notify (), тот же результат, очень ненадежный.
Я попытался использовать каналы, но AFAICT, если волокно ожидает его, текущий контекст просто «перепрыгивает» и запускает ожидающее волокно, приостанавливая текущий.
Короче говоря: мне очень трудно надежно запустить волокно на другом потоке. При профилировании большинство потоков просто ждут переменную condition_variable, хотя я и создал тонны волокон.
В качестве небольшого тестового примера я пытаюсь:
std::vector<boost::fibers::future<int>> v;
for (auto i = 0; i < 16; ++i)
v.emplace_back(boost::fibers::async([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return i;
}));
int s = 0;
for (auto &f : v)
s += f.get();
Я намеренно использую this_thread :: sleep_for для имитации загрузки процессора.
С 16 потоками я ожидал бы, что этот код будет выполняться в 1 с, но в основном это 16 с. Я смог получить этот конкретный пример для запуска в 1 с, просто взламывая вещи; но никоим образом не чувствовал себя «правильным» и никоим образом не работал для других сценариев, его всегда нужно было обрабатывать вручную в соответствии с одним конкретным сценарием.
Я думаю, что этот пример должен просто работать как ожидалось с алгоритмом work_stealing; что мне не хватает? Это просто неправильное использование волокон? Как я мог реализовать это надежно?
Спасибо,
Dix
boost.fiber содержит пример использования алгоритма work_stealing (examples / work_stealing.cpp).
Вы должны установить алгоритм на каждом рабочем потоке, который должен обрабатывать / красть волокна.
boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); // 4 worker-threads
Прежде чем обрабатывать задачи / волокна, вы должны подождать, пока все рабочие потоки не будут зарегистрированы по алгоритму. Пример использует барьер для этой цели.
Вам нужно указание, что вся работа / задача была выполнена, для возможности использования условной переменной.
Взгляни на Запуск с рабочими потоками (форсированная документация).
Других решений пока нет …