Я пытаюсь реализовать модель вычисления Actor над потоками на C ++, используя boost :: thread.
Но программа выдает странное исключение во время выполнения. Исключение не стабильно, и иногда программа работает правильно.
Там мой код:
actor.hpp
class Actor {
public:
typedef boost::function<int()> Job;
private:
std::queue<Job> d_jobQueue;
boost::mutex d_jobQueueMutex;
boost::condition_variable d_hasJob;
boost::atomic<bool> d_keepWorkerRunning;
boost::thread d_worker;
void workerThread();
public:
Actor();
virtual ~Actor();
void execJobAsync(const Job& job);
int execJobSync(const Job& job);
};
actor.cpp
namespace {
int executeJobSync(std::string *error,
boost::promise<int> *promise,
const Actor::Job *job)
{
int rc = (*job)();
promise->set_value(rc);
return 0;
}
}
void Actor::workerThread()
{
while (d_keepWorkerRunning) try {
Job job;
{
boost::unique_lock<boost::mutex> g(d_jobQueueMutex);
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
job = d_jobQueue.front();
d_jobQueue.pop();
}
job();
}
catch (...) {
// Log error
}
}
void Actor::execJobAsync(const Job& job)
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(job);
d_hasJob.notify_one();
}
int Actor::execJobSync(const Job& job)
{
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
d_hasJob.notify_one();
}
int rc = future.get();
if (rc) {
ErrorUtil::setLastError(rc, error.c_str());
}
return rc;
}
Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}
Actor::~Actor()
{
d_keepWorkerRunning = false;
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_hasJob.notify_one();
}
d_worker.join();
}
На самом деле выдается исключение boost :: thread_interrupted в int rc = future.get();
линия. Но я не могу объяснить это исключение. Документы говорят
Выдает: — boost :: thread_interrupted, если результат, связанный с *, не готов в момент вызова, и текущий поток прерывается.
Но мой рабочий поток не может быть прерван.
Когда я использовал gdb и установил «catch throw», я вижу, что обратная трассировка выглядит как
бросить thread_interrupted
повышение :: подробнее :: interruption_checker :: check_for_interruption
повышение :: подробнее :: interruption_checker :: interruption_checker
повышение :: condition_variable :: ждать
повышение :: подробнее :: future_object_base :: wait_internal
повышение :: детали :: future_object_base :: ждать
повышение :: детали :: future_object :: получить
повышение :: unique_future :: получить
Я изучил источники повышения, но не могу понять, почему interrupt_checker решил, что рабочий поток прерывается.
Так что кто-то гуру C ++, пожалуйста, помогите мне. Что мне нужно сделать, чтобы получить правильный код?
Я использую:
повысить 1_53
Версия Linux 2.6.18-194.32.1.el5 Red Hat 4.1.2-48
gcc 4.7
РЕДАКТИРОВАТЬ
Починил это! Спасибо Евгению Панасюку и Лазину. Проблема была в TLS
управление. boost :: thread и boost :: thread_specific_ptr используют
такое же хранилище TLS для своих целей. В моем случае была проблема, когда
они оба пытались изменить это хранилище при создании (К сожалению, я
не понимаю, почему в деталях это происходит). Так что TLS испортился.Я заменил boost :: thread_specific_ptr из моего кода на __thread
указанная переменная.Оффтоп: во время отладки я обнаружил повреждение памяти во внешней библиотеке
и исправил это =)
.
РЕДАКТИРОВАТЬ 2
У меня есть точная проблема … Это ошибка в GCC =)
Флаг компиляции _GLIBCXX_DEBUG нарушает ABI.
Вы можете увидеть обсуждение багтрекера Boost:
https://svn.boost.org/trac/boost/ticket/7666
Я нашел несколько ошибок:
Actor::workerThread
функция выполняет двойную разблокировку d_jobQueueMutex
, Первая разблокировка производится вручную d_jobQueueMutex.unlock();
Второе находится в деструкторе boost::unique_lock<boost::mutex>
,
Вы должны предотвратить одно из разблокировки, например релиз связь между unique_lock
а также mutex
:
g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();
Или добавьте дополнительный блок кода + построенный по умолчанию Job
,
Возможно, что workerThread
никогда не покинет следующий цикл:
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
Представьте себе следующий случай: d_jobQueue
пустой, Actor::~Actor()
вызывается, устанавливает флаг и уведомляет рабочий поток:
d_keepWorkerRunning = false;
d_hasJob.notify_one();
workerThread
просыпается в цикле while, видит, что очередь пуста и снова спит.
Обычной практикой является отправка специального финального задания для остановки рабочего потока:
~Actor()
{
execJobSync([this]()->int
{
d_keepWorkerRunning = false;
return 0;
});
d_worker.join();
}
В этом случае, d_keepWorkerRunning
не обязательно должен быть атомным.
РЕДАКТИРОВАТЬ:
Я добавил код очереди событий в ваш пример.
У вас есть одновременная очередь в обоих EventQueueImpl
а также Actor
, но для разных типов. Можно выделить общую часть в отдельную сущность concurrent_queue<T>
который работает для любого типа. Было бы намного проще отлаживать и тестировать очередь в одном месте, чем обнаруживать ошибки, разбросанные по разным классам.
Итак, вы можете попробовать использовать это concurrent_queue<T>
(на Колиру)
Это всего лишь предположение. Я думаю, что какой-то код может вызвать повышение :: протекторного :: прерывания (). Вы можете установить точку останова для этой функции и посмотреть, какой код отвечает за это. Вы можете проверить на прерывание в execJobSync
:
int Actor::execJobSync(const Job& job)
{
if (boost::this_thread::interruption_requested())
std::cout << "Interruption requested!" << std::endl;
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
Наиболее подозрительным кодом в этом случае является код, имеющий ссылку на объект потока.
Хорошей практикой является информирование вашего прерывания кода boost :: thread в любом случае. Также возможно отключить прерывание для какой-то сферы.
Если это не так — вам нужно проверить код, который работает с локальным хранилищем потока, потому что флаг прерывания потока хранится в TLS. Может быть, какой-то ваш код переписывает это. Вы можете проверить прерывание до и после такого фрагмента кода.
Другая возможность заключается в том, что ваша память повреждена. Если никакой код не вызывает boost :: thread :: interrupt () и вы не работаете с TLS. Это наиболее сложный случай, попробуйте использовать какой-нибудь динамический анализатор — valgrind или clang memory sanitizer.
Не по теме:
Вам, вероятно, нужно использовать некоторую параллельную очередь. std :: queue будет очень медленным из-за большой нехватки памяти, и вы получите низкую производительность кеша. Хорошая параллельная очередь позволяет вашему коду помещать в очередь и удалять элементы параллельно.
Кроме того, актер не должен выполнять произвольный код. Очередь актера должна получать простые сообщения, а не функции! Вы пишете очередь на работу 🙂 Вам нужно взглянуть на какую-то систему актеров, как Akka или же libcpa.