Я экспериментирую с clang-5 и его реализацией TS сопрограммы.
Я пытаюсь использовать его с boost asio, но сталкиваюсь с проблемами, похоже, мой фрейм стека сопрограмм разрушается дважды, но мне не удается понять, что я делаю неправильно.
Это мой упрощенный сценарий воспроизведения:
#include <experimental/coroutine>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
inline auto async_connect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
struct Awaiter
{
ip::tcp::socket& s;
ip::tcp::endpoint ep;
boost::system::error_code ec;
bool ready = false;
Awaiter(ip::tcp::socket& s, const ip::tcp::endpoint& ep) : s(s) , ep(ep) {}
bool await_ready() { return ready; }
void await_resume()
{
if (ec)
{
throw boost::system::system_error(ec);
}
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
s.async_connect(this->ep, [this, coro] (auto ec) mutable {
this->ready = true;
this->ec = ec;
std::cout << "Connect ready: resume " << coro.address() << "\n";
coro.resume();
});
std::cout << "Connect initiated\n";
}
};
return Awaiter(s, ep);
}
struct Future
{
struct promise_type
{
bool _ready = false;
std::experimental::coroutine_handle<> _waiter = nullptr;
Future get_return_object()
{
auto coro = std::experimental::coroutine_handle<promise_type>::from_promise(*this);
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << coro.address() << "]\n";
return Future(coro);
}
auto initial_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_never();
}
auto final_suspend()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
return std::experimental::suspend_always();
}
void return_void()
{
_ready = true;
if (_waiter)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "] resume waiter[" << _waiter.address() << "]\n ";
_waiter.resume();
}
}
void unhandled_exception()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
std::terminate();
}
};
Future() = default;
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
Future& operator=(Future&& other)
{
_coro = other._coro;
other._coro = nullptr;
return *this;
}
explicit Future(std::experimental::coroutine_handle<promise_type> coro)
: _coro(coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
Future(Future&& f)
: _coro(f._coro)
{
f._coro = nullptr;
}
~Future()
{
if (_coro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
_coro.destroy();
//_coro = nullptr;
}
}
bool await_ready()
{
assert(_coro);
std::cout << __FUNCTION__ << ":" << __LINE__ << ": ready " << _coro.promise()._ready << std::endl;
return _coro.promise()._ready;
}
void await_suspend(std::experimental::coroutine_handle<> callerCoro)
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "] waiter [" << callerCoro.address() << "]\n";
_coro.promise()._waiter = callerCoro;
}
void await_resume()
{
std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
}
void* address()
{
return _coro.address();
}
std::experimental::coroutine_handle<promise_type> _coro = nullptr;
};
struct RaiiCheck
{
~RaiiCheck()
{
std::cout << "******** ~RaiiCheck " << this << " ********\n";
}
};
Future asioConnect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
RaiiCheck rc;
s.open(ep.protocol());
co_await async_connect(s, ep);
}
template <typename Awaitable>
Future waitForIt(io_service& io, Awaitable&& awaitable)
{
co_await awaitable;
io.stop();
}
static Future performRequest(io_service& io)
{
try
{
auto addr = ip::tcp::endpoint(ip::address::from_string("8.8.8.8"), 53);
ip::tcp::socket socket(io);
// Works
//socket.open(addr.protocol());
//co_await async_connect(socket, addr);
// Crashes
std::cout << "asioConnect\n";
co_await asioConnect(socket, addr);
assert(socket.is_open());
}
catch (const boost::system::system_error& e)
{
std::cerr << "Failed to parse http response: " << e.what() << "\n";
}
}
int main(int, char**)
{
io_service io;
auto task = performRequest(io);
Future fut;
io.post([&] () {
fut = waitForIt(io, task);
});
io.run();
}
Он генерирует следующий вывод:
get_return_object:51 [0x7fbbeed03c00]
Future:99 [0x7fbbeed03c00]
initial_suspend:57 [0x7fbbeed03c00]
asioConnect
get_return_object:51 [0x7fbbeed03d50]
Future:99 [0x7fbbeed03d50]
initial_suspend:57 [0x7fbbeed03d50]
Connect initiated
await_suspend:129 [0x7fbbeed03d50] waiter [0x7fbbeed03c00]
get_return_object:51 [0x7fbbeec028a0]
Future:99 [0x7fbbeec028a0]
initial_suspend:57 [0x7fbbeec028a0]
await_suspend:129 [0x7fbbeed03c00] waiter [0x7fbbeec028a0]
Connect ready: resume 0x7fbbeed03d50
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03d50] resume waiter[0x7fbbeed03c00]
await_resume:135 [0x7fbbeed03d50]
~Future:113 [0x7fbbeed03d50]
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03c00] resume waiter[0x7fbbeec028a0]
await_resume:135 [0x7fbbeed03c00]
return_void:69 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeed03c00]
final_suspend:63 [0x7fbbeed03d50]
~Future:113 [0x7fbbeec028a0]
~Future:113 [0x7fbbeed03c00]
corotest(74949,0x7fffc163c3c0) malloc: *** error for object 0x7fbbeed03d50: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
Abort trap: 6
Как вы можете видеть, объект RaiiCheck в функции asioConnect уничтожается дважды.
Объект Future внутри функции asioConnect имеет дескриптор coroutine_handle с адресом 0x7fbbeed03d50. Когда эта сопрограмма возобновляется в обратном вызове завершения s.async_connect, объект RaiiCheck уничтожается впервые. Затем я вижу, как вызывается функция return_void, которая возобновляет родительскую сопрограмму, которая, похоже, снова возобновляет внутреннюю сопрограмму, вызывая очередное уничтожение объекта RaiiCheck.
Похоже, это связано с моим вложением сопрограмм, потому что это работает, если я co_await непосредственно на вызов async_connect вместо того, чтобы помещать вызов async_connect в другую сопрограмму.
Любая подсказка для решения этой проблемы высоко ценится.
Вы должны возобновить работу официанта только после final_suspend()
, не в return_void()
потому что кадр сопрограммы не разматывается раньше final_suspend()
, у вас будут проблемы с вызовом destroy()
прежде чем это наконец приостановлено.
Исправленный код для final_suspend()
выглядит так:
auto final_suspend()
{
struct awaiter
{
std::experimental::coroutine_handle<> _waiter;
bool await_ready() { return false; }
void await_suspend(std::experimental::coroutine_handle<> coro)
{
if (_waiter)
{
_waiter();
}
}
void await_resume() {}
};
return awaiter{_waiter};
}
Других решений пока нет …