boost :: asio :: async_read не читает последующие пакеты

Я пытаюсь создать сервер чата TCP, который будет взаимодействовать с уже существующим клиентом чата. У меня нет доступа к коду существующего клиента чата, но я знаю, что он отправляет по сети и что я должен отправить обратно.

Я пытаюсь использовать boost::asio библиотека, но по какой-то причине она не хочет выполнять второй boost::asio::async_read и я не могу на всю жизнь понять это.

Я редактировал boost::asio::async_read функция, чтобы добавить printf( "async_read\n" ); просто чтобы увидеть, действительно ли это вызывали и действительно ли это так.

Вот что я получаю в качестве вывода:

[INFO] Adding Server 0.0.0.0:10013
[INFO] Starting Servers

[ROOM] Client Joined (#1)
async_read
[HEADER] \x70
[HEADER] 112
async_read
[ROOM] Client Left (#1)

Клиент отправляет два пакета сразу друг за другом:

Packet 1 (Header : Body Size):
00000000  70 00                                            p.

Packet 2 (Body : Auth Info):
00000002  00 0c 59 38 3d 00 38 34  62 63 34 35 62 65 35 66 ..Y8=.84 bc45be5f
00000012  36 33 34 34 35 66 63 33  38 32 34 32 31 61 39 39 63445fc3 82421a99
00000022  31 66 37 66 39 62 00 31  30 35 2e 32 33 36 2e 33 1f7f9b.1 05.236.3
00000032  35 2e 39 39 00 61 39 33  35 66 65 62 32 64 62 66 5.99.a93 5feb2dbf
00000042  36 65 38 66 32 39 30 62  32 61 36 64 36 37 61 35 6e8f290b 2a6d67a5
00000052  31 36 63 65 65 30 36 34  64 38 64 63 37 00 28 00 16cee064 d8dc7.(.
00000062  00 00 81 06 01 00 77 61  63 00 03 00 09 00 00 00 ......wa c.......

Как видите, серверный код получает только первый пакет, затем останавливается на втором boost::asio::async_read вызов

Вот модифицированный код:

typedef std::deque< ChatMessage > ChatMessageQueue;

class ChatParticipant
{
public:
virtual ~ChatParticipant( ) { }
virtual void Deliver( const ChatMessage &Message ) = 0;
};

typedef boost::shared_ptr< ChatParticipant > ChatParticipantPtr;

class ChatRoom
{
public:
void join( ChatParticipantPtr Participant )
{
m_Participants.insert( Participant );
printf( "[ROOM] Client Joined (#%i)\n", m_Participants.size( ) );
std::for_each( m_RecentMessages.begin( ), m_RecentMessages.end( ),
boost::bind( &ChatParticipant::Deliver, Participant, _1 ) );
}

void leave( ChatParticipantPtr Participant )
{
printf( "[ROOM] Client Left (#%i)\n", m_Participants.count( Participant ) );
m_Participants.erase( Participant );
}

void deliver( const ChatMessage &Message )
{
m_RecentMessages.push_back( Message );
while( m_RecentMessages.size( ) > MaxRecentMessages )
m_RecentMessages.pop_front( );

std::for_each( m_Participants.begin( ), m_Participants.end( ),
boost::bind( &ChatParticipant::Deliver, _1, boost::ref( Message ) ) );
}

private:
std::set< ChatParticipantPtr > m_Participants;
enum { MaxRecentMessages = 100 };
ChatMessageQueue m_RecentMessages;
};

class ChatSession : public ChatParticipant, public boost::enable_shared_from_this< ChatSession >
{
public:
ChatSession( boost::asio::io_service &IOService, ChatRoom &Room ) : m_Socket( IOService ), m_Room( Room )
{
}

tcp::socket &Socket( )
{
return m_Socket;
}

void Start( )
{
m_Room.join( shared_from_this( ) );
boost::asio::async_read( m_Socket,
boost::asio::buffer( m_ReadMessage.data( ), ChatMessage::HeaderLength ),
boost::bind( &ChatSession::Handle_ReadHeader, shared_from_this( ), boost::asio::placeholders::error ) );
}

void Deliver( const ChatMessage &Message )
{
bool write_in_progress = !m_WriteMessages.empty( );
m_WriteMessages.push_back( Message );
if( !write_in_progress )
{
boost::asio::async_write( m_Socket,
boost::asio::buffer( m_WriteMessages.front( ).data( ), m_WriteMessages.front( ).length( ) ),
boost::bind( &ChatSession::Handle_Write, shared_from_this( ), boost::asio::placeholders::error ) );
}
}

void Handle_ReadHeader( const boost::system::error_code &Error )
{
if( Error )
{
m_Room.leave( shared_from_this( ) );
return;
}

if( m_ReadMessage.DecodeHeader( ) )
{
printf( "[HEADER] %s\n", Strings::StringToHex( m_ReadMessage.data( ) ).c_str( ) );
printf( "[HEADER] %i\n", m_ReadMessage.length( ) );
boost::asio::async_read( m_Socket,
boost::asio::buffer( m_ReadMessage.body( ), m_ReadMessage.length( ) ),
boost::bind( &ChatSession::Handle_ReadBody, shared_from_this( ), boost::asio::placeholders::error ) );
}
}

void Handle_ReadBody( const boost::system::error_code &Error )
{
if( Error )
{
m_Room.leave( shared_from_this( ) );
return;
}

printf( "[BODY] %s\n", Strings::StringToHex( m_ReadMessage.body( ) ).c_str( ) );
//m_Room.deliver( m_ReadMessage );
boost::asio::async_read( m_Socket,
boost::asio::buffer( m_ReadMessage.data( ), ChatMessage::HeaderLength ),
boost::bind( &ChatSession::Handle_ReadHeader, shared_from_this( ), boost::asio::placeholders::error ) );
}

void Handle_Write( const boost::system::error_code &Error )
{
if( Error )
{
m_Room.leave( shared_from_this( ) );
return;
}

m_WriteMessages.pop_front( );
if( !m_WriteMessages.empty( ) )
{
boost::asio::async_write( m_Socket,
boost::asio::buffer( m_WriteMessages.front( ).data( ), m_WriteMessages.front( ).length( ) ),
boost::bind( &ChatSession::Handle_Write, shared_from_this( ), boost::asio::placeholders::error ) );
}
}

private:
tcp::socket m_Socket;
ChatRoom &m_Room;
ChatMessage m_ReadMessage;
ChatMessageQueue m_WriteMessages;
};

typedef boost::shared_ptr< ChatSession > ChatSessionPtr;

class ChatServer
{
public:
ChatServer( boost::asio::io_service &IOService, const tcp::endpoint &EndPoint ) : m_IOService( IOService ), m_Acceptor( IOService, EndPoint )
{
StartAccepting( );
}

void StartAccepting()
{
ChatSessionPtr NewSession( new ChatSession( m_IOService, m_Room ) );
m_Acceptor.async_accept( NewSession->Socket( ),
boost::bind( &ChatServer::Handle_Accept, this, NewSession, boost::asio::placeholders::error ) );
}

void Handle_Accept( ChatSessionPtr Session, const boost::system::error_code& Error )
{
if( !Error )
Session->Start( );

StartAccepting( );
}

private:
boost::asio::io_service &m_IOService;
tcp::acceptor m_Acceptor;
ChatRoom m_Room;
};

typedef boost::shared_ptr< ChatServer > ChatServerPtr;

int main( int argc, char **argv, char **envp )
{
boost::asio::io_service IOService;

tcp::endpoint EndPoint( tcp::v4( ), 10013 );
ChatServerPtr Server( new ChatServer( IOService, EndPoint ) );
printf( "[INFO] Adding Server %s:%i\n", EndPoint.address( ).to_string( ).c_str( ), EndPoint.port( ) );

IOService.run( );

return TRUE;
}

Вы видите, что я делаю не так?

1

Решение

TCP является байтово-ориентированным потоком и не имеет понятия «пакет» в API. Ваше первое чтение — чтение 112 байтов (в соответствии с выводом отладки), поэтому вы читаете заголовок и тело за один вызов.

Что вы должны сделать — это сначала прочитать двухбайтовую длину, вызвав async_read с двухбайтовым буфером. Когда это вернется, вы должны декодировать его и использовать эту длину, чтобы указать, сколько байтов вы хотите прочитать для тела. Это должно быть количество байтов, которые вы передаете в параметре буфера для вашего следующего вызова async_read.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]