как сделать boost :: asio :: spawn с io_service-per-CPU?

Мой сервер основан на ускорить эхо-сервер.

Сервер работает нормально на одноядерном компьютере, даже не один сбой в течение нескольких месяцев. Даже если он занимает 100% процессора, он все равно работает нормально.

Но мне нужно обрабатывать больше клиентских запросов, теперь я использую многоядерный компьютер. Чтобы использовать все процессоры, которые я запускаю io_service на несколько потоков, как это:

#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/thread/thread.hpp>
#include <iostream>
#include <memory>
#include <thread>
using namespace std;

using boost::asio::ip::tcp;

class session : public std::enable_shared_from_this<session>{
public:
explicit session(tcp::socket socket)
: socket_(std::move(socket)),
timer_(socket_.get_io_service()),
strand_(socket_.get_io_service())
{}

void go()
{
auto self(shared_from_this());
boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
{
try {
char data[1024] = {'3'};
for( ; ;) {
timer_.expires_from_now(std::chrono::seconds(10));
std::size_t n = socket_.async_read_some(boost::asio::buffer(data, sizeof(data)), yield);
// do something with data
// write back something
boost::asio::async_write(socket_, boost::asio::buffer(data, sizeof(data)), yield);
}
} catch(...)   {
socket_.close();
timer_.cancel();
}
});

boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
{
while(socket_.is_open()) {
boost::system::error_code ignored_ec;
timer_.async_wait(yield[ignored_ec]);
if(timer_.expires_from_now() <= std::chrono::seconds(0))
socket_.close();
}
});
}

private:
tcp::socket socket_;
boost::asio::steady_timer timer_;
boost::asio::io_service::strand strand_;
};

int main(int argc, char* argv[]) {
try {

boost::asio::io_service io_service;

boost::asio::spawn(io_service, [&](boost::asio::yield_context yield)
{
tcp::acceptor acceptor(io_service,
#define PORT "7788"tcp::endpoint(tcp::v4(), std::atoi(PORT)));

for( ; ;) {
boost::system::error_code ec;
tcp::socket socket(io_service);
acceptor.async_accept(socket, yield[ec]);
if(!ec)
// std::make_shared<session>(std::move(socket))->go();
io_service.post(boost::bind(&session::go, std::make_shared<session>(std::move(socket))));
}
});

// ----------- this works fine on single-core machine ------------
{
// io_service.run();
}

// ----------- this crashes (with multi core) ----------
{
auto thread_count = std::thread::hardware_concurrency(); // for multi core
boost::thread_group threads;
for(auto i = 0; i < thread_count; ++i)
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));

threads.join_all();
}

} catch(std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}

Код отлично работает на одноядерных процессорах, но постоянно вылетает на 2-ядерном / 4-ядерном / 8-ядерном компьютере. Из аварийного дампа я не вижу ничего, связанного с моим кодом, просто что-то с boost :: spawn и каким-то произвольно названным lambda.

Поэтому я хочу попробовать это: Run io_service на процессор.

Я нашел некоторые демонстрация, но он использует асинхронную функцию:

void server::start_accept()
{
new_connection_.reset(new connection(
io_service_pool_.get_io_service(), request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error));
}

void server::handle_accept(const boost::system::error_code& e)
{
if (!e)
{
new_connection_->start();
}

start_accept();
}

io_service_pool_.get_io_service() случайно забрать io_service, но мой код использует spawn

boost::asio::spawn(io_service, ...

Как spawn со случайным io_service?

0

Решение

Кажется, я задавал неправильный вопрос, spawn не может работать с несколькими io_service, но socket Можно. Я изменил код для этого:

int main(int argc, char* argv[]) {
try {

boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);

auto core_count = std::thread::hardware_concurrency();
// io_service_pool.hpp and io_service_pool.cpp from boost's example
io_service_pool pool(core_count);

boost::asio::spawn(io_service, [&](boost::asio::yield_context yield)
{
#define PORT "7788"tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), std::atoi(PORT)));

for( ; ;) {
boost::system::error_code ec;
boost::asio::io_service& ios = pool.get_io_service();
tcp::socket socket(ios);
acceptor.async_accept(socket, yield[ec]);
if(!ec)
ios.post(boost::bind(&session::go, std::make_shared<session>(std::move(socket))));
}
});

{ // run all io_service
thread t([&] { pool.run(); });
t.detach();

io_service.run();
}

} catch(std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}

Теперь сервер больше не падает. Но я до сих пор не знаю, что может вызвать сбой, если я использую один io_service для всех потоков.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]