Учитывая контейнер фьючерсов, как выполнить все получает неблокирующим способом?

Итак, я пытаюсь создать общий способ создания контейнера фьючерсов, а также выполнить все future.get () ‘неблокирующим способом.

Я предполагаю, что время выполнения задач должно составлять от нескольких сотен миллисекунд, обычно до 2 минут. Некоторые, однако, могут не завершиться вообще. При обычном запуске будет выполнено не менее 10 000 задач.

Я хочу, чтобы результаты задач с наиболее быстрым возвратом вернулись, не задерживаясь другими, более длительными задачами в контейнере фьючерсов.

Вот что я пока использовал, просто используя фиктивное время сна для имитации задержек завершения задачи (дизайн во многом благодаря хорошим постам, сделанным здесь, таким как этот, а также этот):

#include <future>
#include <vector>
#include <iostream>
#include <random>
#include <chrono>
#include <ratio>
#include <thread>
#include <algorithm>

size_t rand_from_range(const size_t, const size_t);
int rand_sleep_range(const size_t, const size_t);
template<class CT> size_t get_async_all( CT& );

// Given a function and a collection,
//  return a vector of futures.
template<class Function, class CT>
auto async_all( Function f, CT coll )
-> std::vector<decltype(std::async(f, *std::begin(coll)))>
{
std::vector<decltype(std::async(f, *std::begin(coll)))> futures;
futures.reserve(coll.size());
for (auto& element : coll)
futures.push_back(std::async(f, element));
return futures;
}

// Given the beginning and end of a number
//  range, return a random number therein.
size_t rand_from_range( const size_t range_begin,
const size_t range_end )
{
std::uniform_int_distribution<size_t>
distr(range_begin, range_end);
std::random_device dev;
return distr(dev);
}

// Given a shortest and longest duration, put the calling
//  thread to sleep for a random duration therein.
// (in milliseconds)
int rand_sleep_range( const size_t shortest_time,
const size_t longest_time )
{
std::chrono::milliseconds
sleep_time(rand_from_range(shortest_time, longest_time));
std::this_thread::sleep_for(sleep_time);
return (int)sleep_time.count();
}

// Given a container of futures, perform all
//  get()'s.
template<class CT>
size_t get_async_all( CT& async_coll )
{
size_t get_ctr(0);
const size_t future_cnt = async_coll.size();
std::vector<size_t> completed;
completed.reserve(future_cnt);

while (true) {
for (size_t ndx = 0; ndx < future_cnt; ++ndx) {
// Check to see if this ndx' future has completed already.
if (std::none_of(std::begin(completed), std::end(completed),
[=](size_t x) {
return (x == ndx);
}))
{ // No, this one hasn't completed
//  yet, attempt to process it.
auto& f = async_coll[ndx];
if (f.wait_for(std::chrono::milliseconds(10))
== std::future_status::ready)
{
f.get(); // The future's work gets done here.
++get_ctr;
completed.push_back(ndx);
if (completed.size() == future_cnt)
break; // for()
}
}
}
if (completed.size() == future_cnt)
break; // while()
}
return get_ctr;
}

int main()
{
// A dummy container of ints.
std::vector<int> my_vec(100);
for (auto& elem : my_vec)
elem = rand_from_range(1, 100);

// A dummy function lambda.
auto my_func = [](int x) {
int x_ = x;
int sleep_time = rand_sleep_range(100, 20000); // in ms.
x *= 2;
std::cout << " after sleeping " << sleep_time << "ms \t"<< "f(" << x_ << ") = " << x << std::endl;
};

// Create and execute the container of futures.
auto async_coll = async_all(my_func, my_vec);
size_t count = get_async_all(async_coll);

std::cout << std::endl << count << " items completed. \n";
}

Итак, мои вопросы:

  • Есть ли какие-то ошибки в подходе, который я использую?
  • Есть ли лучший / более элегантный подход для get_async_all (), чем тот, который я использую? Или что-нибудь еще, что я делаю, в этом отношении.

Спасибо всем, кто нашел время, чтобы просмотреть код и дать мне любую конструктивную критику или обратную связь.

3

Решение

Там по крайней мере один гоча. Вы вызываете std::async без указания политики запуска, а это означает, что некоторые или все задачи могут выполняться отложено. Но в вашем тесте, чтобы увидеть, если задача выполнена, вы тестируете только для std::future_status_ready, Если задача отложена, вы всегда вернетесь std::future_status_deferredЭто означает, что ваш тест никогда не вернет истину.

Самое простое решение этой проблемы — указать политику запуска std::launch::async, но тогда вы рискуете переподписать вашу систему. Альтернативой является изменение вашего теста для проверки отложенных задач, но тогда возникает вопрос, что с ними делать. Если вы позвоните get или же wait на них вы блокируете на произвольное количество времени.

Что касается вашего общего подхода, вместо того, чтобы блокировать 10 мс для ожидания завершения каждой задачи по мере их опроса, вы можете рассмотреть вопрос об ожидании 0 мс, то есть сделать чистый опрос, чтобы увидеть, завершена ли задача. Это может уменьшить задержку между завершением задачи и ее обработкой, но может увеличить время ожидания опроса до точки, в которой ваша система работает медленнее.

Совершенно другой подход может состоять в том, чтобы отказаться от опроса каждой задачи и вместо этого сделать так, чтобы каждая задача записывала флаг «Я закончил» в общую структуру данных (например, std::deque), затем периодически опрашивайте эту структуру данных, чтобы увидеть, есть ли в ней что-нибудь. Если это так, обработайте выполненные задачи, удалите их из структуры данных, а затем вернитесь в спящий режим, пока не придет время для повторного опроса. Если ваши задачи делают push_back что касается структуры данных, вы можете обработать их в том порядке, в котором они завершены. Недостатком этого дизайна является то, что общая структура данных может стать узким местом в производительности.

3

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]