Могу ли я выполнить мой get `std :: future` и тоже подождать?

Таким образом, вы можете создать std::future это не работает, пока .get() называется:

auto f_deferred = std::async( std::launch::deferred, []{ std::cout << "I ran\n"; } );

Вы также можете написать std::future это ожидаемо и может быть готово в любой момент с помощью кода в любом потоке:

std::packaged_task<void()> p( []( std::cout << "I also ran\n"; } );
auto f_waitable = p.get_future();

Если вы позвоните f_deferred.wait_for(1ms), это не будет беспокоить ожидание. Если вы позвоните f_deferred.get()лямбда на ваш выбор (в данном случае, тот, который печатает "I ran\n" выполняет.

Если вы позвоните f_waitable.get()код, управляющий задачами, не может знать, что кто-то ждет его в будущем. Но если вы позвоните f_deferred.wait(1ms);Вы просто получаете future_status::deferred немедленно.

Есть ли способ объединить эти два?

Конкретный вариант использования — пул потоков, возвращающий фьючерсы, когда люди ставят задачи в очередь. Если нетронутое будущее .get()Я хочу использовать поток, который заблокирован, для выполнения задачи, а не для простоя. С другой стороны, я хочу, чтобы люди с возвращенными фьючерсами могли определять, завершена ли задача, и даже ждать ограниченное количество времени, пока задача будет завершена. (в случае, когда вы ждете, я в порядке, когда ваш поток простаивает во время ожидания)

Если это не удастся, есть ли в будущих предложениях решения, которые могли бы решить эту проблему лучше, чем если бы мой пул потоков возвращал будущее со всеми его ограничениями? Я слышал, что у фьючерсов нет будущего, и существуют лучшие решения для решения фьючерсных проблем.

3

Решение

Я не уверен, что это именно то, что вам нужно, но это служит для иллюстрации того, что я предложил в комментарии. По крайней мере, я надеюсь, что это даст вам некоторые идеи для реализации того, что вам нужно, если это не покрывает все ваши потребности.

Отказ от ответственности: это очень грубо. Многое можно сделать более элегантно и эффективно.

#include <iostream>
#include <thread>
#include <future>
#include <memory>
#include <functional>
#include <queue>
#include <random>
#include <chrono>
#include <mutex>

typedef std::packaged_task<void()> task;
typedef std::shared_ptr<task> task_ptr;
typedef std::lock_guard<std::mutex> glock;
typedef std::unique_lock<std::mutex> ulock;
typedef unsigned int uint;
typedef unsigned long ulong;

// For sync'd std::cout
std::mutex cout_mtx;

// For task scheduling
std::mutex task_mtx;
std::condition_variable task_cv;

// Prevents main() from exiting
// before the last worker exits
std::condition_variable kill_switch;

// RNG engine
std::mt19937_64 engine;

// Random sleep (in ms)
std::uniform_int_distribution<int> sleep(100, 10000);

// Task queue
std::queue<task_ptr> task_queue;

static uint tasks = 0;
static std::thread::id main_thread_id;
static uint workers = 0;

template<typename T>
class Task
{
// Not sure if this needs
// to be std::atomic.
// A simple bool might suffice.
std::atomic<bool> working;
task_ptr tp;

public:

Task(task_ptr _tp)
:
working(false),
tp(_tp)
{}

inline T get()
{
working.store(true);
(*tp)();
return tp->get_future().get();
}

inline bool is_working()
{
return working.load();
}
};

auto task_factory()
{
return std::make_shared<task>([&]
{
uint task_id(0);
{
glock lk(cout_mtx);
task_id = ++tasks;
if (std::this_thread::get_id() == main_thread_id)
{
std::cout << "Executing task " << task_id << " in main thread.\n";
}
else
{
std::cout << "Executing task " << task_id << " in worker " << std::this_thread::get_id() << ".\n";
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep(engine)));
{
glock lk(cout_mtx);
std::cout << "\tTask " << task_id << " completed.\n";
}
});
}

auto func_factory()
{
return [&]
{

while(true)
{
ulock lk(task_mtx);
task_cv.wait(lk, [&]{ return !task_queue.empty(); });
Task<void> task(task_queue.front());
task_queue.pop();

// Check if the task has been assigned
if (!task.is_working())
{
// Sleep for a while and check again.
// If it is still not assigned after 1 s,
// start working on it.
// You can also place these checks
// directly in Task::get()
{
glock lk(cout_mtx);
std::cout << "\tTask not started, waiting 1 s...\n";
}
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
lk.lock();
if (!task.is_working())
{
{
glock lk(cout_mtx);
std::cout << "\tTask not started after 1 s, commencing work...\n";
}
lk.unlock();
task.get();
lk.lock();
}

if (task_queue.empty())
{
break;
}
}
}
};
}

int main()
{
engine.seed(std::chrono::high_resolution_clock::now().time_since_epoch().count());

std::cout << "Main thread: " << std::this_thread::get_id() << "\n";
main_thread_id = std::this_thread::get_id();

for (int i = 0; i < 50; ++i)
{
task_queue.push(task_factory());
}

std::cout << "Tasks enqueued: " << task_queue.size() << "\n";

// Spawn 5 workers
for (int i = 0; i < 5; ++i)
{
std::thread([&]
{
{
ulock lk(task_mtx);
++workers;
task_cv.wait(lk);
{
glock lk(cout_mtx);
std::cout << "\tWorker started\n";
}
}

auto fn(func_factory());
fn();

ulock lk(task_mtx);
--workers;
if (workers == 0)
{
kill_switch.notify_all();
}

}).detach();
}

// Notify all workers to start processing the queue
task_cv.notify_all();

// This is the important bit:
// Tasks can be executed by the main thread
// as well as by the workers.
// In fact, any thread can grab a task from the queue,
// check if it is running and start working
// on it if it is not.
auto fn(func_factory());
fn();

ulock lk(task_mtx);
if (workers > 0)
{
kill_switch.wait(lk);
}

return 0;
}

Это мой CMakeLists.txt

cmake_minimum_required(VERSION 3.2)

project(tp_wait)

set(CMAKE_CXX_COMPILER "clang++")
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_BUILD_TYPE "Debug" CACHE STRING "Build type" FORCE)

find_package(Threads REQUIRED)

add_executable(${PROJECT_NAME} "main.cpp")
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})
0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]