Использование вложенного std :: async с std :: condition_variable в Visual Studio 2012

у меня есть Worker классы и Handler Класс для создания слоя абстракции для рабочих мест уже. Я хотел использовать std::async добавить немного асинхронности в микс, но я получил странное поведение от моей Visual Studio 2012 (обновление 1).

Моя классовая иерархия выглядит следующим образом:

  • Worker абстрактный класс с Init а также Work как чисто виртуальные методы.
  • BasicWorker : Worker просто использует printf для некоторого вывода.
  • GroupWorker : Worker это совокупность других работников.
  • Handler держится за Worker сделать какую-то работу.

Тогда я звоню нескольким std::async методы, в которых я создаю рабочих и обработчик, я вызываю обработчик во вложенном std::async позвони и жду инициализации (std::condition_variable здесь) работника, а затем я останавливаю обработчик.

В конце я жду все std::futureс, чтобы закончить.

Код выглядит следующим образом:

#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>

struct Worker
{
virtual ~Worker() { }
virtual void Init() = 0;
virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
virtual ~BasicWorker() { }
virtual void Init()
{
printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
}

virtual void Work()
{
printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
}
};

struct GroupWorker : public Worker
{
GroupWorker()
{
workers.push_back(std::make_shared<BasicWorker>());
}

virtual ~GroupWorker() { }

virtual void Init()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Init();
}
initEvent.notify_all();
}

virtual void Work()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Work();
}
}

void WaitForInit()
{
//std::unique_lock<std::mutex> initLock(initMutex);
//initEvent.wait(initLock);
}
private:
std::mutex initMutex;
std::condition_variable initEvent;
std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
static const int Stopped = -1;
static const int Ready = 0;
static const int Running = 1;

Handler(const std::shared_ptr<Worker>& worker) :
worker(worker)
{ }

void Start(int count)
{
int readyValue = Ready;
if(working.compare_exchange_strong(readyValue, Running))
{
worker->Init();

for(int i = 0; i < count && working == Running; ++i)
{
worker->Work();
}
}
}

void Stop()
{
working = Stopped;
}
private:
std::atomic<int> working;
std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
//printf("Start: %d\n", jobIndex);
return std::async(std::launch::async, [=]()
{
printf("Async: %d\n", jobIndex);
auto worker = std::make_shared<GroupWorker>();
auto handler = std::make_shared<Handler>(worker);

auto result = std::async(std::launch:async, [=]()
{
printf("Nested async: %d\n", jobIndex);
handler->Start(runCount);
});

worker->WaitForInit();
handler->Stop();

result.get();
});
}

int main()
{
const int JobCount = 300;
const int RunCount =  5;
std::array<std::future<void>, JobCount> jobs;

for(int i = 0; i < JobCount; ++i)
{
jobs[i] = Start(i, RunCount);
}

for(int i = 0; i < JobCount; ++i)
{
jobs[i].get();
}
}

Моя проблема:

  • Если я раскомментирую строки в WaitForInit@GroupWorker функции, то мои вложенные вызовы асинхронных функций не выполняются, пока не будут выполнены все вызовы асинхронных функций первого уровня
  • Пока жду std::condition_variable если я увеличу количество рабочих мест, создание новых потоков будет выглядеть как экспоненциально медленное. Для моей пробной версии ниже 100 рабочих мест существует некоторая асинхронность, но выше 300 это совершенно последовательно для создания рабочих мест.
  • Тогда, если я раскомментирую printf линия в Start метод, вся вложенная асинхронность работает как шарм

Так,

  • Что я делаю не так в использовании std::condition_variable?
  • Почему создание рабочих мест происходит медленнее для сотен потоков? (этот вопрос не является обязательным, кажется проблемой ОС и может быть исправлен с помощью концепции интеллектуального пула потоков)
  • У чего имеется printf делать с этим? (Я пытался удалить все printf звонки в случае состояния гонки, и я поставил точку останова в коде, но без помощи. То же самое с std::cout тоже)

Редактировать:
Я добавил политику запуска (как предложено Джонатаном Уэйкли) для обеспечения создания потока. Но это тоже не помогло. Я в настоящее время создаю std::thread и звонит thread::join функция ожидания внутри первого уровня асинхронна.

2

Решение

Нотабене нормально звонить printf, но не предполагать std::thread::id конвертируется в int, Вы можете сделать это немного более портативным, вот так:

inline long tol(std::thread::id id)
{
std::ostringstream ss;
ss << id;
return stol(ss.str());
}

(Это все еще предполагает строковое значение std::thread::id может быть преобразован в long, который не требуется, но более вероятен, чем предполагаемое неявное преобразование в int)

Что я делаю неправильно при использовании std :: condition_variable?

У вас нет «условия», которого вы ждете, и нет синхронизации, чтобы обеспечить вызов notify_all происходит до звонков wait, У вас должна быть переменная-член, которая говорит «этот работник был init’d», который устанавливается Initи ожидать переменную условия только в том случае, если она неверна (этот флаг должен быть атомарным или защищаться мьютексом, чтобы предотвратить скачки данных).

Почему создание рабочих мест происходит медленнее для сотен потоков? (этот вопрос не является обязательным, кажется проблемой ОС и может быть исправлен с помощью концепции интеллектуального пула потоков)

Поскольку с сотнями потоков существует много разногласий по поводу общих ресурсов и большое давление на планировщик ОС, поэтому реализация, вероятно, решит начать возвращать отложенные функции (т. Е. Как будто std::async был вызван с std::launch::deferred) вместо асинхронных. Ваш код предполагает async не будет возвращать отложенные функции, потому что, если асинхронный рабочий и его вложенный асинхронный рабочий оба выполняются как отложенные функции, программа может заблокироваться, потому что внешние функциональные блоки ожидают вызова вложенного. Init но вложенная функция никогда не запускается, пока не вызовет out result.get(), Ваша программа не переносима и работает только на Windows, потому что (если я правильно понимаю) MSVC async использует пул потоков для кражи работы, который будет запускать отложенную функцию, если для нее станет доступен поток. Это не требуется стандартом. Если вы хотите, чтобы у каждого работника был новый поток, используйте std::launch::async политика.

Какое отношение имеет printf к этому? (Я попытался удалить все вызовы printf в случае состояния гонки, и я поставил точку останова в коде, но без помощи. Это также относится и к std :: cout)

Это налагает небольшую задержку и, возможно, некоторую форму ненадежного упорядочения между потоками, так как они сейчас конкурируют и, возможно, борются за единый глобальный ресурс. Задержка, налагаемая printf может быть достаточно для завершения одного потока, который высвобождает свои ресурсы в пул потоков и позволяет запустить другой асинхронный рабочий.

1

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]