Могу ли я использовать std :: async, не дожидаясь будущих ограничений?

Высокий уровень
Я хочу вызвать некоторые функции без возвращаемого значения в асинхронном режиме, не дожидаясь их завершения. Если я использую std :: async, будущий объект не будет разрушен, пока задача не будет завершена, в моем случае вызов не синхронизируется.

пример

void sendMail(const std::string& address, const std::string& message)
{
//sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...

//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);

//returning the response ASAP to the client
return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
// gaining no benefit from the async call.

Мои вопросы

  1. Есть ли способ преодолеть это ограничение?
  2. если (1) — нет, следует ли мне однажды реализовать поток, который возьмет эти «зомби» фьючерсы и будет ждать их?
  3. Есть (1) и (2) нет, есть ли другой вариант, кроме как просто построить свой собственный пул потоков?

нота:
Я скорее не использую опцию thread + detach (предложено @ galop1n), поскольку создание нового потока связано с издержками, которых я бы хотел избежать. При использовании std :: async (по крайней мере, в MSVC) используется внутренний пул потоков.

Благодарю.

34

Решение

Вы можете переместить будущее в глобальный объект, поэтому при запуске деструктора локального будущего ему не нужно ждать завершения асинхронного потока.

std::vector<std::future<void>> pending_futures;

myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...

//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);

// transfer the future's shared state to a longer-lived future
pending_futures.push_back(std::move(f));

//returning the response ASAP to the client
return myResponseType;

}

Нотабене Это небезопасно, если асинхронный поток ссылается на любые локальные переменные в processRequest функция.

При использовании std::async (по крайней мере на MSVC) использует внутренний пул потоков.

Это на самом деле не соответствует стандарту прямо говорится, что задачи выполняются с std::launch::async должен выполняться так, как если бы он находился в новом потоке, поэтому любые локальные переменные потока не должны сохраняться от одной задачи к другой. Это обычно не имеет значения, хотя.

13

Другие решения

почему вы не просто запускаете тему и не отсоединяетесь, если вам не важно присоединиться?

std::thread{ sendMail, address, message}.detach();

std :: async привязан к времени жизни возвращаемого std :: future, и их альтернативы нет.

Помещение std :: future в очередь ожидания, прочитанную другим потоком, потребует того же механизма безопасности, что и пул, получающий новую задачу, например мьютекс вокруг контейнера.

В таком случае лучше всего использовать пул потоков для использования задач, непосредственно помещенных в очередь, безопасную для потоков. И это не будет зависеть от конкретной реализации.

Ниже реализации пула потоков, принимающей любые вызываемые и аргументы, потоки опрашивают очередь, лучшая реализация должна использовать условные переменные (coliru):

#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
struct Task {
virtual void Run() const = 0;
virtual ~Task() {};
};

template < typename task_, typename... args_ >
struct RealTask : public Task {
RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
void Run() const override {
fun_();
}
private:
decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
};

template < typename task_, typename... args_ >
void AddTask( task_&& task, args_&&... args ) {
auto lock = std::unique_lock<std::mutex>{mtx_};
using FinalTask = RealTask<task_, args_... >;
q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
}

ThreadPool() {
for( auto & t : pool_ )
t = std::thread( [=] {
while ( true ) {
std::unique_ptr<Task> task;
{
auto lock = std::unique_lock<std::mutex>{mtx_};
if ( q_.empty() && stop_ )
break;
if ( q_.empty() )
continue;
task = std::move(q_.front());
q_.pop();
}
if (task)
task->Run();
}
} );
}
~ThreadPool() {
{
auto lock = std::unique_lock<std::mutex>{mtx_};
stop_ = true;
}
for( auto & t : pool_ )
t.join();
}
private:
std::queue<std::unique_ptr<Task>> q_;
std::thread pool_[8];
std::mutex mtx_;
volatile bool stop_ {};
};

void foo( int a, int b ) {
std::cout << a << "." << b;
}
void bar( std::string const & s) {
std::cout << s;
}

int main() {
ThreadPool pool;
for( int i{}; i!=42; ++i ) {
pool.AddTask( foo, 3, 14 );
pool.AddTask( bar, " - " );
}
}
4

Вместо того, чтобы перенести будущее в глобальный объект (и вручную управлять удалением неиспользованных фьючерсов), вы можете переместить его в локальная сфера асинхронно вызываемой функции.

«Пусть асинхронная функция берет свое будущее», так сказать.

Я придумал эту обертку шаблона, которая работает для меня (проверено на Windows):

#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
std::future<void>&& is_valid, std::promise<void>&& is_moved) {
is_valid.wait(); // Wait until the return value of std::async is written to "future"auto our_future = std::move(future); // Move "future" to a local variable
is_moved.set_value(); // Only now we can leave void_async in the main thread

// This is also used by std::async so that member function pointers work transparently
auto functor = std::bind(f, std::forward<Args>(args)...);
functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
std::future<void> future; // This is for std::async return value
// This is for our synchronization of moving "future" between threads
std::promise<void> valid;
std::promise<void> is_moved;
auto valid_future = valid.get_future();
auto moved_future = is_moved.get_future();

// Here we pass "future" as a reference, so that async_wrapper
// can later work with std::async's return value
future = std::async(
async_wrapper<Function, Args...>,
std::forward<Function>(f), std::forward<Args>(args)...,
std::ref(future), std::move(valid_future), std::move(is_moved)
);
valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
moved_future.wait(); // Wait for "future" to actually be moved
}

Я немного удивлен, что это работает, потому что я думал, что деструктор перенесенного будущего будет блокировать, пока мы не уйдем async_wrapper. Это должно ждать async_wrapper чтобы вернуться, но он ждет внутри этой самой функции. Логично, что это должен быть тупик, но это не так.

Я также попытался добавить строку в конце async_wrapper чтобы вручную очистить будущий объект:

our_future = std::future<void>();

Это тоже не блокирует.

2

я понятия не имею, что я делаю, но это, кажется, работает:

// :( http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3451.pdf
template<typename T>
void noget(T&& in)
{
static std::mutex vmut;
static std::vector<T> vec;
static std::thread getter;
static std::mutex single_getter;
if (single_getter.try_lock())
{
getter = std::thread([&]()->void
{
size_t size;
for(;;)
{
do
{
vmut.lock();
size=vec.size();
if(size>0)
{
T target=std::move(vec[size-1]);
vec.pop_back();
vmut.unlock();
// cerr << "getting!" << endl;
target.get();
}
else
{
vmut.unlock();
}
}while(size>0);
// ¯\_(ツ)_/¯
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
getter.detach();
}
vmut.lock();
vec.push_back(std::move(in));
vmut.unlock();
}

он создает выделенный поток получения для каждого типа будущего, которое вы выбрасываете (например, если вы даете будущее и будущее, у вас будет 2 потока. Если вы дадите ему 100x будущее, у вас останется только 2 потока), и когда есть будущее, с которым ты не хочешь иметь дело, просто сделай notget(fut); — вы также можете noget(std::async([]()->void{...})); работает просто отлично, нет блока, кажется. предупреждение, делать не попытайтесь получить значение из будущего после использования noget () для него. это вероятно UB и напрашивается на неприятности.

0
По вопросам рекламы [email protected]