ASIO — Как остановить простой сервер на основе сопрограмм?

У меня есть следующий простой сервер на основе сопрограмм:

class Server
{
private:
boost::asio::io_service        Service;
boost::asio::ip::tcp::acceptor Acceptor;
boost::asio::ip::tcp::socket   Socket;

private:
void Accept(boost::asio::yield_context Yield);
void Write(boost::asio::yield_context Yield);

public:
Server(): Acceptor(Service), Socket(Service) {}
void Open(unsigned short PortNum);
void Run();
void Stop();
};

void Server::Accept(boost::asio::yield_context Yield)
{
boost::system::error_code ec;

for (;;)
{
Socket.close();
Acceptor.async_accept(Socket,Yield[ec]);
spawn(Yield,std::bind(&Server::Write,this,Yield[ec]));
}
}

void Server::Write(boost::asio::yield_context Yield)
{
char InBuffer[1024]= {};
std::size_t Size;
boost::system::error_code ec;
double Data= 6.66;

for (;;)
{
boost::asio::streambuf OutBuffer;
std::ostream os(&OutBuffer);

Size= Socket.async_read_some(boost::asio::buffer(InBuffer),Yield[ec]);
if (ec)
break;
os.write(reinterpret_cast<const char *>(&Data),sizeof(double));
Socket.async_write_some(OutBuffer.data(),Yield[ec]);
if (ec)
break;
}
}

void Server::Open(unsigned short PortNum)
{
Acceptor.open(boost::asio::ip::tcp::v4());
Acceptor.bind({{},PortNum});
Acceptor.listen();
}

void Server::Run()
{
spawn(Service,std::bind(&Server::Accept,this,std::placeholders::_1));
Service.run();
}

void Server::Stop()
{
Service.stop();
}

Я хочу запустить этот сервер в потоке и аккуратно остановить его, когда основная программа завершает работу:

int main()
{
Server s;

s.Open(1024);

std::thread Thread(&Server::Run,&s);

Sleep(10'000);
s.Stop();
Thread.join();
}

К сожалению, если есть подключенная розетка, когда я звоню Stop исключение boost::coroutines::detail::forced_unwind брошен

Я также пытался создать явный strand и отправка Socket.close() до остановки с тем же результатом.

Что-то не так с этим подходом?

1

Решение

У меня проблемы с попыткой изящно остановить аналогичный сервер (stackoverflow.com/questions/50833730/…). — metalfox 4 часа назад

Вот минимальное изменение, которое показывает, как справиться

  • команда выхода, закрывающая сеанс
  • команда выключения, которая закрывает сервер (поэтому он прекращает принимать соединения и завершает работу после завершения последнего сеанса)

Жить на Колиру

#include <boost/asio.hpp>
#include <iostream>

using boost::asio::ip::tcp;
using boost::system::error_code;
using boost::asio::streambuf;

int main() {
boost::asio::io_service svc;

tcp::acceptor a(svc);
a.open(tcp::v4());
a.set_option(tcp::acceptor::reuse_address(true));
a.bind({{}, 6767}); // bind to port 6767 on localhost
a.listen(5);

using session = std::shared_ptr<tcp::socket>;

std::function<void()>        do_accept;
std::function<void(session)> do_session;

do_session = [&](session s) {
// do a read
auto buf = std::make_shared<boost::asio::streambuf>();
async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
if (ec)
std::cerr << "read failed: " << ec.message() << "\n";
else {
std::istream is(buf.get());
std::string line;
while (getline(is, line)) // FIXME being sloppy with partially read lines
{
async_write(*s, boost::asio::buffer("Ack\n", 4), [&,s,buf](error_code ec, size_t) {
if (ec) std::cerr << "write failed: " << ec.message() << "\n";
});
if (line == "Exit") {
std::cout << "Exit received\n";
return;
}
if (line == "Shutdown") {
std::cout << "Server shutdown requested\n";
a.close();
return;
}
}

do_session(s); // full duplex, can read while writing, using a second buffer
}

});
};

do_accept = [&] {
auto s = std::make_shared<session::element_type>(svc);

a.async_accept(*s, [&,s](error_code ec) {
if (ec)
std::cerr << "accept failed: " << ec.message() << "\n";
else {
do_session(s);
do_accept(); // accept the next
}
});
};

do_accept(); // kick-off
svc.run();   // wait for shutdown (Ctrl-C or failure)
}

Обратите внимание на примеры сессий

echo -en "hello world\nExit\n"     | netcat 127.0.0.1 6767
echo -en "hello world\nShutdown\n" | netcat 127.0.0.1 6767

печать

Ack
Ack
Ack
Ack
Exit received
Server shutdown requested
accept failed: Operation canceled

Завершение команды

Если вы хотите команду «Завершить», активно закрывает все открытые сессии а также выключает сервер, вам придется

  • вести список сессий
  • или используйте сигнал

Вы можете увидеть код для обоих подходов здесь: Увеличить ASIO: отправить сообщение всем подключенным клиентам

Самый простой способ интеграции с текущим образцом:

Жить на Колиру

#include <boost/asio.hpp>
#include <iostream>
#include <list>

using boost::asio::ip::tcp;
using boost::system::error_code;
using boost::asio::streambuf;

int main() {
boost::asio::io_service svc;

tcp::acceptor a(svc);
a.open(tcp::v4());
a.set_option(tcp::acceptor::reuse_address(true));
a.bind({{}, 6767}); // bind to port 6767 on localhost
a.listen(5);

using session = std::shared_ptr<tcp::socket>;
using sessref = std::weak_ptr<tcp::socket>;

std::function<void()>        do_accept;
std::function<void(session)> do_session;
std::list<sessref> session_list;

auto garbage_collect_sessions = [&session_list] {
session_list.remove_if(std::mem_fn(&sessref::expired));
};

do_session = [&](session s) {
// do a read
auto buf = std::make_shared<boost::asio::streambuf>();
async_read_until(*s, *buf, "\n", [&,s,buf](error_code ec, size_t /*bytes*/) {
if (ec)
std::cerr << "read failed: " << ec.message() << "\n";
else {
std::istream is(buf.get());
std::string line;
while (getline(is, line)) // FIXME being sloppy with partially read lines
{
async_write(*s, boost::asio::buffer("Ack\n", 4), [&,s,buf](error_code ec, size_t) {
if (ec) std::cerr << "write failed: " << ec.message() << "\n";
});
if (line == "Exit") {
std::cout << "Exit received\n";
return;
}
if (line == "Shutdown") {
std::cout << "Server shutdown requested\n";
a.close();
return;
}
if (line == "Terminate") {
std::cout << "Server termination requested\n";
a.close();
for (auto wp : session_list) {
if (auto session = wp.lock())
session->close();
}
return;
}
}

do_session(s); // full duplex, can read while writing, using a second buffer
}

});
};

do_accept = [&] {
auto s = std::make_shared<session::element_type>(svc);

a.async_accept(*s, [&,s](error_code ec) {
if (ec)
std::cerr << "accept failed: " << ec.message() << "\n";
else {
garbage_collect_sessions();

session_list.push_back(s);
do_session(s);
do_accept(); // accept the next
}
});
};

do_accept(); // kick-off
svc.run();   // wait for shutdown (Ctrl-C or failure)
}

Который, очевидно, использует session_list реализовать "Terminate" команда:

if (line == "Terminate") {
std::cout << "Server termination requested\n";
a.close();
for (auto wp : session_list) {
if (auto session = wp.lock())
session->close();
}
return;
}
0

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector