boost :: asio :: async_read_some запускается в родительском потоке

Я пишу эффективный сокет-сервер. Намерение — хорошая общая пропускная способность. Я использую основной поток в качестве слушателя. Это async_accept клиент и добавить сокет в очередь. Там потоки диспетчера выбирают сокет, который готов к чтению, из очереди и добавляются в очередь одного из рабочих потоков. Я держу пул рабочих потоков. Рабочий поток будет выполнять реальное чтение / запись.

я использую async_accept в моем слушателе. Чтобы узнать, какой сокет готов для чтения, я использую async_read_some в моем диспетчере. Эта идея работает, но с проблемой. мой io_service.run() вызывается в слушателе, поэтому обработчик async_read_some в диспетчере, фактически выполняется в потоке слушателя.

Вот мой код:

using boost::asio::ip::tcp;
using namespace std;

std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;

enum { max_length1 = 1024 };
char data_1[max_length1];

void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
size_t bytes_transferred)
{
printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());

boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}void sock_dispatch() {
int v_size = 0;
std::shared_ptr<tcp::socket> curr_sock;

printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

while(1) {

while(1) {
sem_wait(&_sem_sock);
v_size = q_sock.size();
sem_post(&_sem_sock);

if(v_size <= 0)
m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
else
break;
}

sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);

curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
boost::bind(handle_read1, curr_sock,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

}

class session
{
public:
session(boost::asio::io_service& io_service)
: sockptr(new tcp::socket(io_service)) {}

void start()
{
printf("START NEW SESSION   The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

sem_wait(&_sem_sock);

q_sock.push(sockptr);

sem_post(&_sem_sock);

m_cond1.notify_all();
}

std::shared_ptr<tcp::socket> sockptr;
};

class server
{
public:
server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
session* new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));

printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

}

void handle_accept(session* new_session,
const boost::system::error_code& error)
{
new_session->start();
new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}

private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
sem_init(&_sem_sock, 0, 1);

boost::asio::io_service io_service;

using namespace std;
server s(io_service, atoi(argv[1]));

boost::thread t(boost::bind(sock_dispatch));

io_service.run();

return 0;
}

Этот код изменен из примера boost :: asio, http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cpp. И код клиента http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cpp.

Когда клиент подключается, вывод сервера:

WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843
START NEW SESSION   The ID of this of this thread is: 3843, pid : 3843
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843

В этом случае идентификатор потока диспетчера равен 3944, но handle_read1 выполняется в потоке 3843.
В идеале handle_read1 должен работать в диспетчере, поэтому он не будет блокировать прием в слушателе.

Любая идея, что я должен сделать, чтобы достичь этого? Или вообще есть дизайн лучше :)?

0

Решение

Если вам нужны конкретные обработчики, вызываемые в определенных потоках, используйте разные io_service объекты. Например, acceptor может быть построен с io_service1и розетки могут быть построены с io_service2, Основной поток может затем выполнить io_service1.run()в то время как потоки в пуле потоков выполняют io_service2.run(),

При этом смешивание асинхронных и синхронных функций может быть довольно сложным. В большинстве асинхронных программ, над которыми я работал, редко возникает необходимость выделять поток для определенных асинхронных цепочек.


В целом, я думаю, что концептуальный дизайн в порядке, но у меня есть несколько предложений по реализации:

  • q_sock Код потребителя и производителя представляет собой смесь конструкций более высокого уровня и более низкого уровня. Использование условной переменной немного неидиоматично, и возникает вопрос, почему sem_t используется вместо boost::mutexи замки. Например, следующий код потребителя и производителя:

    // Consumer
    while(1)
    {
    sem_wait(&_sem_sock);
    v_size = q_sock.size();
    sem_post(&_sem_sock);
    
    if (v_size <= 0)
    m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
    else
    break;
    }
    sem_wait(&_sem_sock);
    curr_sock = q_sock.front();
    q_sock.pop();
    sem_post(&_sem_sock);
    
    // Producer
    sem_wait(&_sem_sock);
    q_sock.push(sockptr);
    sem_post(&_sem_sock);
    m_cond1.notify_all();
    

    Может быть переписан без использования sem_tи быть немного более идиоматичным на основе Boost.Thread’s condition_variable документация. Рассмотрим альтернативу:

    // Consumer
    boost::unique_lock<boost::mutex> lock(m_log1);
    while (q_sock.empty())
    {
    m_cond1.wait(lock);
    }
    curr_sock = q_sock.front();
    q_sock.pop();
    lock.unlock();
    
    // Producer
    {
    boost::lock_guard<boost::mutex> lock(m_log1);
    q_sock.push(sockptr);
    }
    m_cond1.notify_all();
    
  • Неясно, какой функционал session обеспечивает.

    • Это только кажется, что он служит средством выделения сокета и помещения его в очередь. Почему бы просто не выделить сокет напрямую и не сделать так, чтобы вызывающий абонент поместил его в очередь?
    • session::sockptr управляется с помощью умного указателя, но session не является. С session не управляется с помощью интеллектуального указателя, утечка памяти происходит в server::handle_acceptкак ручку session теряется в переназначении.

    Определите, какие функции session это обеспечить и разработать интерфейс вокруг этого.

    • Если он предназначен для обеспечения инкапсуляции, то функции, не являющиеся членами, такие как handle_read1, возможно, потребуется стать членом функции.
    • Если session имеет свою собственную асинхронную цепочку и предоставляет себя обработчикам, затем рассмотрите возможность использования enable_shared_from_this. Boost.Asio руководство предоставляет пример использования, как и некоторые из Примеры.
  • В данный момент, async_read_some не указывает, какой сокет готов для чтения. К тому времени ReadHandler был вызван, данные были прочитаны.

    Это принципиальное различие между Proactor и Reactor. Если вам нужны операции в стиле Reactor, используйте boost::asio::null_buffers. Увидеть этот документация для более подробной информации. Тем не менее, есть последствия для каждого подхода. Таким образом, очень важно понимать эти последствия, чтобы можно было принять наилучшее решение.

  • С Boost.Asio, обеспечивающим демультиплексирование событий с помощью высокоуровневых конструкций, sock_dispatch нить может показаться непрактичной. session::start функция-член может инициировать асинхронное чтение в сокете. Это незначительное изменение устранит необходимость q_sockи все конструкции синхронизации в примере кода.

  • Изучите, почему должна использоваться синхронная запись. В случае эхо-клиентов, как показано в примере, часто бывает так, что асинхронные записи могут использоваться путем управления потоком самой асинхронной цепочки для устранения конфликта ресурсов. Это позволяет каждому соединению иметь свой собственный буфер, который можно использовать как для чтения, так и для записи.

  • Не предварительно оптимизируйте. Асинхронное программирование сложнее отладить в результате инвертированного потока управления. Попытка предварительной оптимизации пропускной способности только усугубит проблему сложности. Как только программа заработает, выполните проверку пропускной способности. Если результаты не соответствуют требованиям, то профиль для определения узкого места. По моему опыту, большинство серверов с высокой пропускной способностью будут связаны с вводом / выводом задолго до того, как будут связаны с процессором.
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]