Я хочу создать пул потоков для экспериментальных целей (и для забавного фактора). Он должен быть в состоянии обрабатывать самые разные задачи (поэтому я могу использовать его в более поздних проектах).
В моем классе пула потоков мне понадобится какая-то очередь задач. Поскольку стандартная библиотека обеспечивает std::packaged_task
начиная со стандарта C ++ 11, моя очередь будет выглядеть std::deque<std::packaged_task<?()> > task_queue
так что клиент может нажать std::packaged_task
s в очередь через какую-то функцию открытого интерфейса (а затем один из потоков в пуле будет уведомлен с помощью условной переменной для его выполнения и т. д.).
Мой вопрос связан с аргументом шаблона std::packaged_task<?()>
в дэке
Подпись функции ?()
должен иметь возможность работать с любым типом / числом параметров, потому что клиент может сделать что-то вроде:
std::packaged_task<int()> t(std::bind(factorial, 342));
thread_pool.add_task(t);
Поэтому мне не нужно иметь дело с типом / количеством параметров.
Но каким должно быть возвращаемое значение? (отсюда вопросительный знак)
Если я сделаю весь класс пула потоков классом шаблона, один экземпляр
из него можно будет справиться только с задачами с определенной подписью
(лайк std::packaged_task<int()>
).
Я хочу, чтобы один объект пула потоков мог справиться с любой задачей.
Если я пойду с std::packaged_task<void()>
и вызванная функция
возвращает целое число или что-то вообще, тогда это неопределенное поведение.
Так что сложная часть заключается в том, что packaged_task<R()>
только для перемещения, в противном случае вы могли бы просто бросить его в std::function<void()>
и запустите те в своих темах.
Есть несколько способов обойти это.
Во-первых, смешно, использовать packaged_task<void()>
хранить packaged_task<R()>
, Я бы посоветовал против этого, но это работает. 😉 (что является подписью operator()
на packaged_task<R()>
? Какова обязательная подпись для объектов, которые вы передаете packaged_task<void()>
?)
Во-вторых, заверните packaged_task<R()>
в shared_ptr
Захватите что в лямбду с подписью void()
Храните это в std::function<void()>
и сделано. Это накладные расходы, но, вероятно, меньше, чем первое решение.
Наконец, напишите свою собственную функцию-оболочку только для перемещения. Для подписи void()
это коротко:
struct task {
template<class F,
class dF=std::decay_t<F>,
class=decltype( std::declval<dF&>()() )
>
task( F&& f ):
ptr(
new dF(std::forward<F>(f)),
[](void* ptr){ delete static_cast<dF*>(ptr); }
),
invoke([](void*ptr){
(*static_cast<dF*>(ptr))();
})
{}
void operator()()const{
invoke( ptr.get() );
}
task(task&&)=default;
task&operator=(task&&)=default;
task()=default;
~task()=default;
explicit operator bool()const{return static_cast<bool>(ptr);}
private:
std::unique_ptr<void, void(*)(void*)> ptr;
void(*invoke)(void*) = nullptr;
};
и просто. Выше может хранить packaged_task<R()>
для любого типа R
и вызвать их позже.
Это имеет относительно минимальные накладные расходы — это должно быть дешевле, чем std::function
По крайней мере, реализации, которые я видел, за исключением того, что он не выполняет SBO (оптимизацию небольших буферов), где он хранит небольшие функциональные объекты внутри, а не в куче.
Вы можете улучшить unique_ptr<> ptr
Контейнер с небольшой оптимизацией буфера, если хотите.
У меня есть реализация, которая делает именно это. Мой способ делать вещи, чтобы обернуть std::packaged_task
объекты в структуре, которая абстрагирует возвращаемый тип. Метод, который отправляет задачу в пул потоков, возвращает будущее для результата.
Этот вид работает, но из-за выделения памяти, требуемого для каждой задачи, он не подходит для задач, которые очень короткие и очень частые (я пытался использовать его для распараллеливания фрагментов моделирования жидкости, и объем служебных данных был слишком высоким, в порядка нескольких миллисекунд для 324 задач).
Ключевая часть этой структуры:
struct abstract_packaged_task
{
template <typename R>
abstract_packaged_task(std::packaged_task<R> &&task):
m_task((void*)(new std::packaged_task<R>(std::move(task)))),
m_call_exec([](abstract_packaged_task *instance)mutable{
(*(std::packaged_task<R>*)instance->m_task)();
}),
m_call_delete([](abstract_packaged_task *instance)mutable{
delete (std::packaged_task<R>*)(instance->m_task);
})
{
}
abstract_packaged_task(abstract_packaged_task &&other);
~abstract_packaged_task();
void operator()();
void *m_task;
std::function<void(abstract_packaged_task*)> m_call_exec;
std::function<void(abstract_packaged_task*)> m_call_delete;
};
Как видите, он скрывает зависимости типов с помощью лямбда-выражений с std::function
и void*
, Если вы знаете максимальный размер всех возможных std::packaged_task
объекты (я не проверял, зависит ли размер от R
вообще), вы можете попытаться оптимизировать это, удалив выделение памяти.
Метод отправки в пул потоков затем делает это:
template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
assert(m_workers.size() > 0);
std::future<R> result = task.get_future();
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_task_queue.emplace_back(std::move(task));
}
m_queue_wakeup.notify_one();
return result;
}
где m_task_queue
является std::deque
из abstract_packaged_task
Структуры. m_queue_wakeup
это std::condition_variable
разбудить рабочую нить, чтобы забрать задачу. Реализация рабочих потоков так же проста, как:
void ThreadPool::worker_impl()
{
std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
while (!m_terminated) {
lock.lock();
while (m_task_queue.empty()) {
m_queue_wakeup.wait(lock);
if (m_terminated) {
return;
}
}
abstract_packaged_task task(std::move(m_task_queue.front()));
m_task_queue.pop_front();
lock.unlock();
task();
}
}
Вы можете взглянуть на полный исходный код и соответствующий заголовок на моем github.