у меня есть Worker
классы и Handler
Класс для создания слоя абстракции для рабочих мест уже. Я хотел использовать std::async
добавить немного асинхронности в микс, но я получил странное поведение от моей Visual Studio 2012 (обновление 1).
Моя классовая иерархия выглядит следующим образом:
Worker
абстрактный класс с Init
а также Work
как чисто виртуальные методы.BasicWorker : Worker
просто использует printf
для некоторого вывода.GroupWorker : Worker
это совокупность других работников. Handler
держится за Worker
сделать какую-то работу.Тогда я звоню нескольким std::async
методы, в которых я создаю рабочих и обработчик, я вызываю обработчик во вложенном std::async
позвони и жду инициализации (std::condition_variable
здесь) работника, а затем я останавливаю обработчик.
В конце я жду все std::future
с, чтобы закончить.
Код выглядит следующим образом:
#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>
struct Worker
{
virtual ~Worker() { }
virtual void Init() = 0;
virtual void Work() = 0;
};
struct BasicWorker : public Worker
{
virtual ~BasicWorker() { }
virtual void Init()
{
printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
}
virtual void Work()
{
printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
}
};
struct GroupWorker : public Worker
{
GroupWorker()
{
workers.push_back(std::make_shared<BasicWorker>());
}
virtual ~GroupWorker() { }
virtual void Init()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Init();
}
initEvent.notify_all();
}
virtual void Work()
{
for(int i = 0; i < workers.size(); ++i)
{
workers[i]->Work();
}
}
void WaitForInit()
{
//std::unique_lock<std::mutex> initLock(initMutex);
//initEvent.wait(initLock);
}
private:
std::mutex initMutex;
std::condition_variable initEvent;
std::vector<std::shared_ptr<Worker>> workers;
};
struct Handler
{
static const int Stopped = -1;
static const int Ready = 0;
static const int Running = 1;
Handler(const std::shared_ptr<Worker>& worker) :
worker(worker)
{ }
void Start(int count)
{
int readyValue = Ready;
if(working.compare_exchange_strong(readyValue, Running))
{
worker->Init();
for(int i = 0; i < count && working == Running; ++i)
{
worker->Work();
}
}
}
void Stop()
{
working = Stopped;
}
private:
std::atomic<int> working;
std::shared_ptr<Worker> worker;
};
std::future<void> Start(int jobIndex, int runCount)
{
//printf("Start: %d\n", jobIndex);
return std::async(std::launch::async, [=]()
{
printf("Async: %d\n", jobIndex);
auto worker = std::make_shared<GroupWorker>();
auto handler = std::make_shared<Handler>(worker);
auto result = std::async(std::launch:async, [=]()
{
printf("Nested async: %d\n", jobIndex);
handler->Start(runCount);
});
worker->WaitForInit();
handler->Stop();
result.get();
});
}
int main()
{
const int JobCount = 300;
const int RunCount = 5;
std::array<std::future<void>, JobCount> jobs;
for(int i = 0; i < JobCount; ++i)
{
jobs[i] = Start(i, RunCount);
}
for(int i = 0; i < JobCount; ++i)
{
jobs[i].get();
}
}
Моя проблема:
WaitForInit@GroupWorker
функции, то мои вложенные вызовы асинхронных функций не выполняются, пока не будут выполнены все вызовы асинхронных функций первого уровняstd::condition_variable
если я увеличу количество рабочих мест, создание новых потоков будет выглядеть как экспоненциально медленное. Для моей пробной версии ниже 100 рабочих мест существует некоторая асинхронность, но выше 300 это совершенно последовательно для создания рабочих мест.printf
линия в Start
метод, вся вложенная асинхронность работает как шармТак,
std::condition_variable
?printf
делать с этим? (Я пытался удалить все printf
звонки в случае состояния гонки, и я поставил точку останова в коде, но без помощи. То же самое с std::cout
тоже)Редактировать:
Я добавил политику запуска (как предложено Джонатаном Уэйкли) для обеспечения создания потока. Но это тоже не помогло. Я в настоящее время создаю std::thread
и звонит thread::join
функция ожидания внутри первого уровня асинхронна.
Нотабене нормально звонить printf
, но не предполагать std::thread::id
конвертируется в int
, Вы можете сделать это немного более портативным, вот так:
inline long tol(std::thread::id id)
{
std::ostringstream ss;
ss << id;
return stol(ss.str());
}
(Это все еще предполагает строковое значение std::thread::id
может быть преобразован в long
, который не требуется, но более вероятен, чем предполагаемое неявное преобразование в int
)
Что я делаю неправильно при использовании std :: condition_variable?
У вас нет «условия», которого вы ждете, и нет синхронизации, чтобы обеспечить вызов notify_all
происходит до звонков wait
, У вас должна быть переменная-член, которая говорит «этот работник был init’d», который устанавливается Init
и ожидать переменную условия только в том случае, если она неверна (этот флаг должен быть атомарным или защищаться мьютексом, чтобы предотвратить скачки данных).
Почему создание рабочих мест происходит медленнее для сотен потоков? (этот вопрос не является обязательным, кажется проблемой ОС и может быть исправлен с помощью концепции интеллектуального пула потоков)
Поскольку с сотнями потоков существует много разногласий по поводу общих ресурсов и большое давление на планировщик ОС, поэтому реализация, вероятно, решит начать возвращать отложенные функции (т. Е. Как будто std::async
был вызван с std::launch::deferred
) вместо асинхронных. Ваш код предполагает async
не будет возвращать отложенные функции, потому что, если асинхронный рабочий и его вложенный асинхронный рабочий оба выполняются как отложенные функции, программа может заблокироваться, потому что внешние функциональные блоки ожидают вызова вложенного. Init
но вложенная функция никогда не запускается, пока не вызовет out result.get()
, Ваша программа не переносима и работает только на Windows, потому что (если я правильно понимаю) MSVC async
использует пул потоков для кражи работы, который будет запускать отложенную функцию, если для нее станет доступен поток. Это не требуется стандартом. Если вы хотите, чтобы у каждого работника был новый поток, используйте std::launch::async
политика.
Какое отношение имеет printf к этому? (Я попытался удалить все вызовы printf в случае состояния гонки, и я поставил точку останова в коде, но без помощи. Это также относится и к std :: cout)
Это налагает небольшую задержку и, возможно, некоторую форму ненадежного упорядочения между потоками, так как они сейчас конкурируют и, возможно, борются за единый глобальный ресурс. Задержка, налагаемая printf
может быть достаточно для завершения одного потока, который высвобождает свои ресурсы в пул потоков и позволяет запустить другой асинхронный рабочий.
Других решений пока нет …