Я пишу эффективный сокет-сервер. Намерение — хорошая общая пропускная способность. Я использую основной поток в качестве слушателя. Это async_accept
клиент и добавить сокет в очередь. Там потоки диспетчера выбирают сокет, который готов к чтению, из очереди и добавляются в очередь одного из рабочих потоков. Я держу пул рабочих потоков. Рабочий поток будет выполнять реальное чтение / запись.
я использую async_accept
в моем слушателе. Чтобы узнать, какой сокет готов для чтения, я использую async_read_some в моем диспетчере. Эта идея работает, но с проблемой. мой io_service.run()
вызывается в слушателе, поэтому обработчик async_read_some
в диспетчере, фактически выполняется в потоке слушателя.
Вот мой код:
using boost::asio::ip::tcp;
using namespace std;
std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;
enum { max_length1 = 1024 };
char data_1[max_length1];
void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
size_t bytes_transferred)
{
printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());
boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}void sock_dispatch() {
int v_size = 0;
std::shared_ptr<tcp::socket> curr_sock;
printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
while(1) {
while(1) {
sem_wait(&_sem_sock);
v_size = q_sock.size();
sem_post(&_sem_sock);
if(v_size <= 0)
m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
else
break;
}
sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);
curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
boost::bind(handle_read1, curr_sock,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
class session
{
public:
session(boost::asio::io_service& io_service)
: sockptr(new tcp::socket(io_service)) {}
void start()
{
printf("START NEW SESSION The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
sem_wait(&_sem_sock);
q_sock.push(sockptr);
sem_post(&_sem_sock);
m_cond1.notify_all();
}
std::shared_ptr<tcp::socket> sockptr;
};
class server
{
public:
server(boost::asio::io_service& io_service, short port)
: io_service_(io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
{
session* new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());
}
void handle_accept(session* new_session,
const boost::system::error_code& error)
{
new_session->start();
new_session = new session(io_service_);
acceptor_.async_accept(*(new_session->sockptr.get()),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
private:
boost::asio::io_service& io_service_;
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
sem_init(&_sem_sock, 0, 1);
boost::asio::io_service io_service;
using namespace std;
server s(io_service, atoi(argv[1]));
boost::thread t(boost::bind(sock_dispatch));
io_service.run();
return 0;
}
Этот код изменен из примера boost :: asio, http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cpp. И код клиента http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cpp.
Когда клиент подключается, вывод сервера:
WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843
START NEW SESSION The ID of this of this thread is: 3843, pid : 3843
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843
В этом случае идентификатор потока диспетчера равен 3944, но handle_read1 выполняется в потоке 3843.
В идеале handle_read1 должен работать в диспетчере, поэтому он не будет блокировать прием в слушателе.
Любая идея, что я должен сделать, чтобы достичь этого? Или вообще есть дизайн лучше :)?
Если вам нужны конкретные обработчики, вызываемые в определенных потоках, используйте разные io_service
объекты. Например, acceptor
может быть построен с io_service1
и розетки могут быть построены с io_service2
, Основной поток может затем выполнить io_service1.run()
в то время как потоки в пуле потоков выполняют io_service2.run()
,
При этом смешивание асинхронных и синхронных функций может быть довольно сложным. В большинстве асинхронных программ, над которыми я работал, редко возникает необходимость выделять поток для определенных асинхронных цепочек.
В целом, я думаю, что концептуальный дизайн в порядке, но у меня есть несколько предложений по реализации:
q_sock
Код потребителя и производителя представляет собой смесь конструкций более высокого уровня и более низкого уровня. Использование условной переменной немного неидиоматично, и возникает вопрос, почему sem_t
используется вместо boost::mutex
и замки. Например, следующий код потребителя и производителя:
// Consumer
while(1)
{
sem_wait(&_sem_sock);
v_size = q_sock.size();
sem_post(&_sem_sock);
if (v_size <= 0)
m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
else
break;
}
sem_wait(&_sem_sock);
curr_sock = q_sock.front();
q_sock.pop();
sem_post(&_sem_sock);
// Producer
sem_wait(&_sem_sock);
q_sock.push(sockptr);
sem_post(&_sem_sock);
m_cond1.notify_all();
Может быть переписан без использования sem_t
и быть немного более идиоматичным на основе Boost.Thread’s condition_variable
документация. Рассмотрим альтернативу:
// Consumer
boost::unique_lock<boost::mutex> lock(m_log1);
while (q_sock.empty())
{
m_cond1.wait(lock);
}
curr_sock = q_sock.front();
q_sock.pop();
lock.unlock();
// Producer
{
boost::lock_guard<boost::mutex> lock(m_log1);
q_sock.push(sockptr);
}
m_cond1.notify_all();
Неясно, какой функционал session
обеспечивает.
session::sockptr
управляется с помощью умного указателя, но session
не является. С session
не управляется с помощью интеллектуального указателя, утечка памяти происходит в server::handle_accept
как ручку session
теряется в переназначении.Определите, какие функции session
это обеспечить и разработать интерфейс вокруг этого.
handle_read1
, возможно, потребуется стать членом функции.session
имеет свою собственную асинхронную цепочку и предоставляет себя обработчикам, затем рассмотрите возможность использования enable_shared_from_this
. Boost.Asio руководство предоставляет пример использования, как и некоторые из Примеры.В данный момент, async_read_some
не указывает, какой сокет готов для чтения. К тому времени ReadHandler
был вызван, данные были прочитаны.
Это принципиальное различие между Proactor и Reactor. Если вам нужны операции в стиле Reactor, используйте boost::asio::null_buffers
. Увидеть этот документация для более подробной информации. Тем не менее, есть последствия для каждого подхода. Таким образом, очень важно понимать эти последствия, чтобы можно было принять наилучшее решение.
С Boost.Asio, обеспечивающим демультиплексирование событий с помощью высокоуровневых конструкций, sock_dispatch
нить может показаться непрактичной. session::start
функция-член может инициировать асинхронное чтение в сокете. Это незначительное изменение устранит необходимость q_sock
и все конструкции синхронизации в примере кода.
Изучите, почему должна использоваться синхронная запись. В случае эхо-клиентов, как показано в примере, часто бывает так, что асинхронные записи могут использоваться путем управления потоком самой асинхронной цепочки для устранения конфликта ресурсов. Это позволяет каждому соединению иметь свой собственный буфер, который можно использовать как для чтения, так и для записи.
Других решений пока нет …