Мне нужно реализовать многоадресный приемник, способный присоединиться к списку групп многоадресной рассылки и обрабатывать полученные данные в определенном потоке с помощью boost. Я попробовал следующий код …..
boost::asio::io_service m_io_service;
boost::asio::ip::udp::socket m_multicast_socket(m_io_service);
// listen address
boost::asio::ip::address listen_address
= boost::asio::ip::address::from_string("0.0.0.0");
// listen port
unsigned short multicast_port = m_configuration->m_multicast_interface_port;
boost::asio::ip::udp::endpoint listen_endpoint( listen_address, multicast_port );
// open socket
m_multicast_socket.open( listen_endpoint.protocol() );
// set socket buffer size
m_multicast_socket.set_option(
boost::asio::ip::udp::socket::receive_buffer_size
( m_configuration->m_receiving_socket_buffer_size ) );
// other sockets could bind to listen_address
m_multicast_socket.set_option( boost::asio::ip::udp::socket::reuse_address(true) );
boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);
m_multicast_socket.io_control(num_of_bytes_readable);
m_multicast_socket.bind(listen_endpoint);// joining a list of multicast group
for ( size_t i=0; i < multicast_groups.size(); ++i )
{
boost::asio::ip::address multicast_address
= boost::asio::ip::address::from_string( multicast_groups[i] );
m_multicast_socket.set_option(
boost::asio::ip::multicast::join_group(
multicast_address ) );
std::cout << multicast_groups[i] << " multicast group joined!" << std::endl;
}
А потом читать данные бесконечным циклом …….
while ( !m_exit )
{
while ( !num_of_bytes_readable.get() )
{
boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
}
boost::asio::ip::udp::endpoint sender_endpoint;
size_t bytes_received = m_multicast_socket.receive_from(
boost::asio::buffer( m_reading_buffer.get(), m_configuration->m_reading_buffer_size )
, sender_endpoint );
if ( bytes_received > 0 )
{
// process
}
boost::this_thread::yield();
}
Но данные не получены и цикл …..
while ( !num_of_bytes_readable.get() )
{
boost::this_thread::sleep( boost::posix_time::milliseconds( 1 ) );
}
никогда не выходит.
Я также попробовал код примера получателя многоадресной рассылки из документации boost asio
но async_recv_from
никогда не вернется.
Есть несколько изменений, которые должны произойти:
ip::multicast::enable_loopback
вариант верный.socket_base::bytes_readable
является управляющей командой ввода-вывода socket.io_control
выполняет команду на сокете, и bytes_readable.get()
возвращает значение команды. Таким образом, команду необходимо выполнять каждый раз, чтобы запросить, сколько байтов доступно для чтения. Альтернативное и спорно немного более читаемое решение, заключается в использовании socket.available()
функция.Вот простой пример того, что демонстрирует использование socket_base::bytes_readable
,
// Standard includes.
#include <iostream>
// 3rd party includes.
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
void read(boost::asio::ip::udp::socket& socket)
{
boost::asio::ip::udp::endpoint sender;
std::vector<char> buffer;
std::size_t bytes_readable = 0;
for (int i=0; i < 3; ++i)
{
// Poll until data is available.
while (!bytes_readable)
{
// Issue command to socket to get number of bytes readable.
boost::asio::socket_base::bytes_readable num_of_bytes_readable(true);
socket.io_control(num_of_bytes_readable);
// Get the value from the command.
bytes_readable = num_of_bytes_readable.get();
// If there is no data available, then sleep.
if (!bytes_readable)
{
boost::this_thread::sleep(boost::posix_time::seconds(1));
}
}
// Resize the buffer to store all available data.
buffer.resize(bytes_readable);
// Read available data.
socket.receive_from(
boost::asio::buffer(buffer, bytes_readable),
sender);
// Extract data from the buffer.
std::string message(buffer.begin(), buffer.end());
// Output data.
std::cout << "Received message: ";
std::cout << message << std::endl;
}
}
void write(boost::asio::ip::udp::socket& socket,
boost::asio::ip::udp::endpoint& destination)
{
std::string message;
for (unsigned int i=0; i < 3; ++i)
{
std::ostringstream stream;
stream << i;
message = stream.str();
socket.send_to(boost::asio::buffer(message), destination);
std::cout << "Sent message: " << message << std::endl;
}
}
int main(int argc, char* argv[])
{
// Extract command-line arguments.
bool receiver = std::string(argv[1]) == "receive";
boost::asio::ip::address address =
boost::asio::ip::address::from_string(argv[2]);
unsigned short port = boost::lexical_cast<unsigned short>(argv[3]);
// Create socket.
using boost::asio::ip::udp;
boost::asio::io_service service;
udp::socket socket(service);
socket.open(boost::asio::ip::udp::v4());
// Allow other processes to reuse the address, permitting other processes on
// the same machine to use the multicast address.
socket.set_option(udp::socket::reuse_address(true));
// Guarantee the loopback is enabled so that multiple processes on the same
// machine can receive data that originates from the same socket.
socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
socket.bind(
udp::endpoint(boost::asio::ip::address_v4::any(),
receiver ? port /* same as multicast port */
: 0 /* any */));
udp::endpoint destination(address, port);
// Join group.
namespace ip = boost::asio::ip;
socket.set_option(ip::multicast::join_group(address));
// Start read or write loops based on command line options.
if (receiver) read(socket);
else write(socket, destination);
}
Использование:
$ ./a.out receive 235.55.55.55 55555 &
$ sleep 1
$ ./a.out send 235.55.55.55 55555
Sent message: 0
Sent message: 1
Sent message: 2
Received message: 0
Received message: 1
Received message: 2
Других решений пока нет …