Я пишу приложение, которое имеет очередь событий. Мое намерение состоит в том, чтобы создать это таким образом, чтобы несколько потоков могли писать, и один поток мог читать из очереди, и передать обработку вытолкнутого элемента другому потоку, чтобы последующее всплывающее окно снова не было заблокировано. Я использовал блокировку и переменную условия для выталкивания и извлечения элементов из очереди:
void Publisher::popEvent(boost::shared_ptr<Event>& event) {
boost::mutex::scoped_lock lock(queueMutex);
while(eventQueue.empty())
{
queueConditionVariable.wait(lock);
}
event = eventQueue.front();
eventQueue.pop();
lock.unlock();
}
void Publisher::pushEvent(boost::shared_ptr<Event> event) {
boost::mutex::scoped_lock lock(queueMutex);
eventQueue.push(event);
lock.unlock();
queueConditionVariable.notify_one();
}
В конструкторе класса Publisher (создается только один экземпляр) я запускаю один поток, который будет повторять цикл до тех пор, пока не будет захвачен notify_one (), а затем запускает другой поток для обработки события, извлеченного из очереди :
В конструкторе:
publishthreadGroup = boost::shared_ptr<boost::thread_group> (new boost::thread_group());
publishthreadGroup->create_thread(boost::bind(queueProcessor, this));
Метод queueProcessor:
void queueProcessor(Publisher* agent) {
while(true) {
boost::shared_ptr<Event> event;
agent->getEvent(event);
agent->publishthreadGroup->create_thread(boost::bind(dispatcher, agent, event));
}
}
и в способе диспетчера соответствующая обработка выполняется, и обработанная информация публикуется на сервере через экономию средств. В другом методе, вызываемом до того, как программа существует, которая находится в главном потоке, я вызываю join_all (), чтобы основной поток ожидал завершения потоков.
В этой реализации после создания потока для диспетчера в цикле while, описанном выше, я столкнулся с тупиком / зависанием. Работающий код, похоже, застрял. В чем проблема в этой реализации? И есть ли более чистый и лучший способ делать то, что я пытаюсь сделать? (Несколько производителей и один потребительский поток, повторяющийся в очереди и передающий обработку элемента другому потоку)
Спасибо!
Кажется, что queueProcessor
функция будет работать вечно, и поток, выполняющий ее, никогда не завершится. Любые потоки, созданные этой функцией, выполнят свою работу и завершатся, но этот поток — первый, созданный в publishthreadGroup
— имеет while(true)
цикл, который не имеет возможности остановить. Таким образом, призыв к join_all()
буду ждать вечно. Можете ли вы создать какую-нибудь другую переменную-флаг, которая запускает эту функцию для выхода из цикла и возврата? Это должно делать свое дело!
Других решений пока нет …