Реализация простого общего пула потоков в C ++ 11

Я хочу создать пул потоков для экспериментальных целей (и для забавного фактора). Он должен быть в состоянии обрабатывать самые разные задачи (поэтому я могу использовать его в более поздних проектах).


В моем классе пула потоков мне понадобится какая-то очередь задач. Поскольку стандартная библиотека обеспечивает std::packaged_task начиная со стандарта C ++ 11, моя очередь будет выглядеть std::deque<std::packaged_task<?()> > task_queueтак что клиент может нажать std::packaged_tasks в очередь через какую-то функцию открытого интерфейса (а затем один из потоков в пуле будет уведомлен с помощью условной переменной для его выполнения и т. д.).


Мой вопрос связан с аргументом шаблона std::packaged_task<?()>в дэке

Подпись функции ?() должен иметь возможность работать с любым типом / числом параметров, потому что клиент может сделать что-то вроде:

std::packaged_task<int()> t(std::bind(factorial, 342));
thread_pool.add_task(t);

Поэтому мне не нужно иметь дело с типом / количеством параметров.

Но каким должно быть возвращаемое значение? (отсюда вопросительный знак)

  • Если я сделаю весь класс пула потоков классом шаблона, один экземпляр
    из него можно будет справиться только с задачами с определенной подписью
    (лайк std::packaged_task<int()>).

    Я хочу, чтобы один объект пула потоков мог справиться с любой задачей.

  • Если я пойду с std::packaged_task<void()> и вызванная функция
    возвращает целое число или что-то вообще, тогда это неопределенное поведение.

4

Решение

Так что сложная часть заключается в том, что packaged_task<R()> только для перемещения, в противном случае вы могли бы просто бросить его в std::function<void()>и запустите те в своих темах.

Есть несколько способов обойти это.

Во-первых, смешно, использовать packaged_task<void()> хранить packaged_task<R()>, Я бы посоветовал против этого, но это работает. 😉 (что является подписью operator() на packaged_task<R()>? Какова обязательная подпись для объектов, которые вы передаете packaged_task<void()>?)

Во-вторых, заверните packaged_task<R()> в shared_ptrЗахватите что в лямбду с подписью void()Храните это в std::function<void()>и сделано. Это накладные расходы, но, вероятно, меньше, чем первое решение.

Наконец, напишите свою собственную функцию-оболочку только для перемещения. Для подписи void() это коротко:

struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};

и просто. Выше может хранить packaged_task<R()> для любого типа Rи вызвать их позже.

Это имеет относительно минимальные накладные расходы — это должно быть дешевле, чем std::functionПо крайней мере, реализации, которые я видел, за исключением того, что он не выполняет SBO (оптимизацию небольших буферов), где он хранит небольшие функциональные объекты внутри, а не в куче.

Вы можете улучшить unique_ptr<> ptr Контейнер с небольшой оптимизацией буфера, если хотите.

4

Другие решения

У меня есть реализация, которая делает именно это. Мой способ делать вещи, чтобы обернуть std::packaged_task объекты в структуре, которая абстрагирует возвращаемый тип. Метод, который отправляет задачу в пул потоков, возвращает будущее для результата.

Этот вид работает, но из-за выделения памяти, требуемого для каждой задачи, он не подходит для задач, которые очень короткие и очень частые (я пытался использовать его для распараллеливания фрагментов моделирования жидкости, и объем служебных данных был слишком высоким, в порядка нескольких миллисекунд для 324 задач).

Ключевая часть этой структуры:

struct abstract_packaged_task
{
template <typename R>
abstract_packaged_task(std::packaged_task<R> &&task):
m_task((void*)(new std::packaged_task<R>(std::move(task)))),
m_call_exec([](abstract_packaged_task *instance)mutable{
(*(std::packaged_task<R>*)instance->m_task)();
}),
m_call_delete([](abstract_packaged_task *instance)mutable{
delete (std::packaged_task<R>*)(instance->m_task);
})
{

}

abstract_packaged_task(abstract_packaged_task &&other);

~abstract_packaged_task();

void operator()();

void *m_task;
std::function<void(abstract_packaged_task*)> m_call_exec;
std::function<void(abstract_packaged_task*)> m_call_delete;
};

Как видите, он скрывает зависимости типов с помощью лямбда-выражений с std::function и void*, Если вы знаете максимальный размер всех возможных std::packaged_task объекты (я не проверял, зависит ли размер от R вообще), вы можете попытаться оптимизировать это, удалив выделение памяти.

Метод отправки в пул потоков затем делает это:

template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
assert(m_workers.size() > 0);
std::future<R> result = task.get_future();
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_task_queue.emplace_back(std::move(task));
}
m_queue_wakeup.notify_one();
return result;
}

где m_task_queue является std::deque из abstract_packaged_task Структуры. m_queue_wakeup это std::condition_variable разбудить рабочую нить, чтобы забрать задачу. Реализация рабочих потоков так же проста, как:

void ThreadPool::worker_impl()
{
std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
while (!m_terminated) {
lock.lock();
while (m_task_queue.empty()) {
m_queue_wakeup.wait(lock);
if (m_terminated) {
return;
}
}
abstract_packaged_task task(std::move(m_task_queue.front()));
m_task_queue.pop_front();
lock.unlock();

task();
}
}

Вы можете взглянуть на полный исходный код и соответствующий заголовок на моем github.

4

По вопросам рекламы [email protected]