Что вызывает случайный сбой в boost :: coroutine?

У меня есть многопоточное приложение, которое использует повышение :: ASIO а также повышение :: сопрограмму через его интеграцию в повышение :: ASIO. Каждый поток имеет свой io_service объект. Единственное общее состояние между потоками — это пулы соединений, которые заблокированы мьютекс когда соединение получено или возвращено из / в пул соединений. Когда в пуле не хватает подключений, я нажимаю бесконечно ASIO :: steady_tiemer во внутренней структуре пула и асинхронно жду на нем и я получая из функции куртины. Когда другой поток возвращает соединение с пулом, он проверяет, есть ли таймеры ожидания, он получает таймер ожидания из внутренней структуры, он получает его io_service объект и отправляет лямбду, которая пробуждает таймер, чтобы возобновить приостановленную сопрограмму. У меня случайные сбои в приложении. Я пытаюсь исследовать проблему с Valgrind. Это вызывает некоторые проблемы, но я не могу понять их, потому что они происходят в повышение :: сопрограмму а также повышение :: ASIO Внутренности. Вот фрагменты из моего кода и из Valgrind выход. Может кто-то увидеть и объяснить проблему?

Вот код вызова:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
AvlRequestDataList allRequests;
for(auto& requestContext : avlRequestContexts)
{
if(!requestContext.pullProvider || !requestContext.toAskGDS())
continue;

auto& requests = requestContext.pullProvider->getRequestsData();
copy(requests.begin(), requests.end(), back_inserter(allRequests));
}

if(allRequests.size() == 0)
return;

boost::asio::io_service ioService;
curl::AsioMultiplexer multiplexer(ioService);

for(auto& request : allRequests)
{
using namespace boost::asio;

spawn(ioService, [&multiplexer, &request](yield_context yield)
{
request->prepare(multiplexer, yield);
});
}

while(true)
{
try
{
VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
ioService.run();
VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
break;
}
catch(const std::exception& e)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
}
catch(...)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
}
}
}

Здесь prepare Реализация функции, которая вызывается в порожденной лямбде:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
auto& ioService = multiplexer.getIoService();
_connection = _pool.getConnection(ioService, yield);
_connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

multiplexer.addEasyHandle(_connection->getHandle(),
[this](const curl::EasyHandleResult& result)
{
if(0 == result.responseCode)
returnQuota();
VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
_pool.addConnection(std::move(_connection));
});
}void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
try
{
prepareImpl(multiplexer, yield);
}
catch(const std::exception& e)
{
VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
returnQuota();
}
catch(...)
{
VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
returnQuota();
}
}

returnQuota функция является чисто виртуальным методом AvlRequestData класс и его реализация для TravelportRequestData класс, который используется во всех моих тестах, следующий:

void returnQuota() const override
{
auto& avlQuotaManager = AvlQuotaManager::getInstance();
avlQuotaManager.consumeQuotaTravelport(-1);
}

Вот От себя а также поп методы пула соединений.

auto AvlConnectionPool::getConnection(
TimerPtr timer,
asio::yield_context yield) -> ConnectionPtr
{
lock_guard<mutex> lock(_mutex);

while(_connections.empty())
{
_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());

_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();
}

ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();

VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));

++_connectionsGiven;

return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
Side side /* = Back */)
{
lock_guard<mutex> lock(_mutex);

if(Front == side)
_connections.emplace_front(std::move(connection));
else
_connections.emplace_back(std::move(connection));

VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));

if(_timers.empty())
return;

auto timer = _timers.back();
_timers.pop_back();

auto& ioService = timer->get_io_service();
ioService.post([timer](){ timer->cancel(); });

VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
% _connectionPoolName));
}

Это реализация coroutineAsyncWait.

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}

И наконец первая часть Valgrind выход:

== 8189 == Тема 41:
== 8189 == Недопустимое чтение размера 8
== 8189 == at 0x995F84: void boost :: coroutines :: detail :: trampoline_push_void, void, boost :: asio :: detail :: coro_entry_point, void (анонимное пространство имен) :: executeRequests>> (std :: vector<(анонимное пространство имен) :: AvlRequestContext, std :: allocator<(анонимное пространство имен) :: AvlRequestContext>>&) :: {lambda (boost :: asio :: basic_yield_context>) # 1}>&, boost :: coroutines :: basic_standard_stack_allocator>> (long) (trampoline_push.hpp: 65)
== 8189 == Адрес 0x2e3b5528 не является стековым, malloc или (недавно) свободным

Когда я использую Valgrind с отладчиком, он останавливается в следующей функции в trampoline_push.hpp в повышение :: сопрограмму библиотека.

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }

1

Решение

В конечном итоге я обнаружил, что, когда объекты должны быть удалены, boost :: asio не справляется с этим изящно без правильного использования shared_ptr и weak_ptr. Когда происходят сбои, их очень трудно отладить, потому что трудно понять, что делает очередь io_service во время сбоя.

После того, как я недавно выполнил полную асинхронную клиентскую архитектуру и столкнулся со случайными сбоями, у меня есть несколько советов. К сожалению, я не знаю, решат ли они ваши проблемы, но, надеюсь, это обеспечит хороший старт в правильном направлении.

Советы по использованию Asio Coroutine

  1. Используйте boost :: asio :: asio_handler_invoke вместо io_service.post ():

    авто& ioService = timer-> get_io_service ();

    ioService.post (timer {timer-> cancel ();});

    Использование post / dispatch в сопрограмме обычно плохая идея. Всегда используйте asio_handler_invoke при вызове из сопрограммы. В этом случае, однако, вы можете безопасно позвонить timer->cancel() без отправки его в цикл сообщений в любом случае.

  2. Похоже, ваши таймеры не используют объекты shared_ptr. Независимо от того, что происходит в остальной части вашего приложения, невозможно точно знать, когда эти объекты должны быть уничтожены. Я настоятельно рекомендую использовать объекты shared_ptr для всех ваших объектов таймера. Кроме того, любой указатель на методы класса должен использовать shared_from_this() также. Используя равнину this может быть довольно опасным, если this уничтожается (в стеке) или выходит из области видимости где-то еще в shared_ptr. Что бы вы ни делали, не используйте shared_from_this() в конструкторе объекта!

    Если вы получаете сбой, когда выполняется обработчик в io_service, но часть обработчика больше не действительна, это очень сложно отладить. Объект обработчика, который закачивается в io_service, включает в себя любые указатели на таймеры или указатели на объекты, которые могут понадобиться для выполнения обработчика.

    Я настоятельно рекомендую переборщить с объектами shared_ptr, обернутыми вокруг любых классов asio. Если проблема исчезнет, ​​то вероятен порядок ее уничтожения.

  3. Находится ли где-то адрес ошибки в куче или он указывает на стек? Это поможет вам определить, является ли объект выходящим за рамки метода в неподходящее время, или это что-то еще. Например, это доказало мне, что все мои таймеры должны стать объектами shared_ptr даже в рамках одного многопоточного приложения.
2

Другие решения


По вопросам рекламы [email protected]