Модель расчета актера с использованием boost :: thread

Я пытаюсь реализовать модель вычисления Actor над потоками на C ++, используя boost :: thread.
Но программа выдает странное исключение во время выполнения. Исключение не стабильно, и иногда программа работает правильно.

Там мой код:

actor.hpp

class Actor {

public:
typedef boost::function<int()> Job;

private:
std::queue<Job>             d_jobQueue;
boost::mutex                d_jobQueueMutex;
boost::condition_variable   d_hasJob;
boost::atomic<bool>         d_keepWorkerRunning;
boost::thread               d_worker;

void workerThread();

public:
Actor();
virtual ~Actor();

void execJobAsync(const Job& job);

int execJobSync(const Job& job);
};

actor.cpp

namespace {

int executeJobSync(std::string          *error,
boost::promise<int> *promise,
const Actor::Job     *job)
{
int rc = (*job)();

promise->set_value(rc);
return 0;
}

}

void Actor::workerThread()
{
while (d_keepWorkerRunning) try {
Job job;
{
boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}

job = d_jobQueue.front();
d_jobQueue.pop();
}

job();
}
catch (...) {
// Log error
}
}

void Actor::execJobAsync(const Job& job)
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(job);
d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();

{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
d_hasJob.notify_one();
}

int rc = future.get();

if (rc) {
ErrorUtil::setLastError(rc, error.c_str());
}

return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
d_keepWorkerRunning = false;
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_hasJob.notify_one();
}
d_worker.join();
}

На самом деле выдается исключение boost :: thread_interrupted в int rc = future.get(); линия. Но я не могу объяснить это исключение. Документы говорят

Выдает: — boost :: thread_interrupted, если результат, связанный с *, не готов в момент вызова, и текущий поток прерывается.

Но мой рабочий поток не может быть прерван.

Когда я использовал gdb и установил «catch throw», я вижу, что обратная трассировка выглядит как

бросить thread_interrupted

повышение :: подробнее :: interruption_checker :: check_for_interruption

повышение :: подробнее :: interruption_checker :: interruption_checker

повышение :: condition_variable :: ждать

повышение :: подробнее :: future_object_base :: wait_internal

повышение :: детали :: future_object_base :: ждать

повышение :: детали :: future_object :: получить

повышение :: unique_future :: получить

Я изучил источники повышения, но не могу понять, почему interrupt_checker решил, что рабочий поток прерывается.

Так что кто-то гуру C ++, пожалуйста, помогите мне. Что мне нужно сделать, чтобы получить правильный код?
Я использую:

повысить 1_53

Версия Linux 2.6.18-194.32.1.el5 Red Hat 4.1.2-48

gcc 4.7

РЕДАКТИРОВАТЬ

Починил это! Спасибо Евгению Панасюку и Лазину. Проблема была в TLS
управление. boost :: thread и boost :: thread_specific_ptr используют
такое же хранилище TLS для своих целей. В моем случае была проблема, когда
они оба пытались изменить это хранилище при создании (К сожалению, я
не понимаю, почему в деталях это происходит). Так что TLS испортился.

Я заменил boost :: thread_specific_ptr из моего кода на __thread
указанная переменная.

Оффтоп: во время отладки я обнаружил повреждение памяти во внешней библиотеке
и исправил это =)

.

РЕДАКТИРОВАТЬ 2
У меня есть точная проблема … Это ошибка в GCC =)
Флаг компиляции _GLIBCXX_DEBUG нарушает ABI.
Вы можете увидеть обсуждение багтрекера Boost:
https://svn.boost.org/trac/boost/ticket/7666

8

Решение

Я нашел несколько ошибок:


Actor::workerThread функция выполняет двойную разблокировку d_jobQueueMutex, Первая разблокировка производится вручную d_jobQueueMutex.unlock();Второе находится в деструкторе boost::unique_lock<boost::mutex>,

Вы должны предотвратить одно из разблокировки, например релиз связь между unique_lock а также mutex:

g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();

Или добавьте дополнительный блок кода + построенный по умолчанию Job,


Возможно, что workerThread никогда не покинет следующий цикл:

while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}

Представьте себе следующий случай: d_jobQueue пустой, Actor::~Actor() вызывается, устанавливает флаг и уведомляет рабочий поток:

d_keepWorkerRunning = false;
d_hasJob.notify_one();

workerThread просыпается в цикле while, видит, что очередь пуста и снова спит.

Обычной практикой является отправка специального финального задания для остановки рабочего потока:

~Actor()
{
execJobSync([this]()->int
{
d_keepWorkerRunning = false;
return 0;
});
d_worker.join();
}

В этом случае, d_keepWorkerRunning не обязательно должен быть атомным.


LIVE DEMO на Колиру


РЕДАКТИРОВАТЬ:

Я добавил код очереди событий в ваш пример.

У вас есть одновременная очередь в обоих EventQueueImpl а также Actor, но для разных типов. Можно выделить общую часть в отдельную сущность concurrent_queue<T> который работает для любого типа. Было бы намного проще отлаживать и тестировать очередь в одном месте, чем обнаруживать ошибки, разбросанные по разным классам.

Итак, вы можете попробовать использовать это concurrent_queue<T>(на Колиру)

5

Другие решения

Это всего лишь предположение. Я думаю, что какой-то код может вызвать повышение :: протекторного :: прерывания (). Вы можете установить точку останова для этой функции и посмотреть, какой код отвечает за это. Вы можете проверить на прерывание в execJobSync:

int Actor::execJobSync(const Job& job)
{
if (boost::this_thread::interruption_requested())
std::cout << "Interruption requested!" << std::endl;
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();

Наиболее подозрительным кодом в этом случае является код, имеющий ссылку на объект потока.

Хорошей практикой является информирование вашего прерывания кода boost :: thread в любом случае. Также возможно отключить прерывание для какой-то сферы.

Если это не так — вам нужно проверить код, который работает с локальным хранилищем потока, потому что флаг прерывания потока хранится в TLS. Может быть, какой-то ваш код переписывает это. Вы можете проверить прерывание до и после такого фрагмента кода.

Другая возможность заключается в том, что ваша память повреждена. Если никакой код не вызывает boost :: thread :: interrupt () и вы не работаете с TLS. Это наиболее сложный случай, попробуйте использовать какой-нибудь динамический анализатор — valgrind или clang memory sanitizer.

Не по теме:
Вам, вероятно, нужно использовать некоторую параллельную очередь. std :: queue будет очень медленным из-за большой нехватки памяти, и вы получите низкую производительность кеша. Хорошая параллельная очередь позволяет вашему коду помещать в очередь и удалять элементы параллельно.

Кроме того, актер не должен выполнять произвольный код. Очередь актера должна получать простые сообщения, а не функции! Вы пишете очередь на работу 🙂 Вам нужно взглянуть на какую-то систему актеров, как Akka или же libcpa.

2

По вопросам рекламы [email protected]