Приемник C ++ Boost UDP не работает, когда помещается в поток

У меня есть приемник UDP, который работает. Код здесь:

#include <array>
#include <iostream>
#include <string>
#include <boost/asio.hpp>

std::string getMyIp()
{
std::string result;
try
{
boost::asio::io_service netService;
boost::asio::ip::udp::resolver   resolver(netService);
boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
boost::asio::ip::udp::udp::endpoint ep = *endpoints;
boost::asio::ip::udp::udp::socket socket(netService);
socket.connect(ep);
boost::asio::ip::address addr = socket.local_endpoint().address();
result = addr.to_string();
//std::cout << "My IP according to google is: " << results << std::endl;

}
catch (std::exception& e)
{
std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
}
return result;
}

class receiver
{
private:
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::endpoint sender_endpoint_;
std::array<char, 1024> data_;

public:
receiver(boost::asio::io_service& io_service,
const boost::asio::ip::address& listen_address,
const boost::asio::ip::address& multicast_address,
unsigned short multicast_port = 13000)
: socket_(io_service)
{
// Create the socket so that multiple may be bound to the same address.
boost::asio::ip::udp::endpoint listen_endpoint(listen_address, multicast_port);
socket_.open(listen_endpoint.protocol());
socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
socket_.bind(listen_endpoint);

// Join the multicast group.
socket_.set_option(boost::asio::ip::multicast::join_group(multicast_address));
do_receive();
}

private:
void do_receive()
{
socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
std::cout.write(data_.data(), length);
std::cout << std::endl;
do_receive();
}
});
}
};

int main(int argc, char* argv[])
{
try
{
boost::asio::io_service io_service;
receiver r(io_service, boost::asio::ip::make_address(getMyIp()), boost::asio::ip::make_address("224.0.0.0"), 13000);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}

return 0;
}

Я хочу поместить код получателя в поток внутри класса, чтобы я мог делать другие вещи помимо него:

#define _CRT_SECURE_NO_WARNINGS
#include <ctime>
#include <iostream>
#include <string>
#include <queue>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <boost/thread/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

using boost::asio::ip::udp;
using std::cout;
using std::cin;
using std::endl;
using std::string;
using namespace std;

std::string getMyIp()
{
std::string result;
try
{
boost::asio::io_service netService;
boost::asio::ip::udp::resolver   resolver(netService);
boost::asio::ip::udp::udp::resolver::query query(boost::asio::ip::udp::v4(), "google.com", "");
boost::asio::ip::udp::udp::resolver::iterator endpoints = resolver.resolve(query);
boost::asio::ip::udp::udp::endpoint ep = *endpoints;
boost::asio::ip::udp::udp::socket socket(netService);
socket.connect(ep);
boost::asio::ip::address addr = socket.local_endpoint().address();
result = addr.to_string();
//std::cout << "My IP according to google is: " << results << std::endl;
}
catch (std::exception& e)
{
std::cerr << "Could not deal with socket. Exception: " << e.what() << std::endl;
}
return result;
}

class UdpReceiver
{
private:
boost::asio::ip::udp::socket socket_;
boost::asio::ip::udp::endpoint sender_endpoint_;
std::array<char, 1024> data_;

string address_send, address_recv;
unsigned short port_send, port_recv;
boost::thread_group threads;            // thread group
boost::thread* thread_main;             // main thread
boost::thread* thread_receive;          // receive thread
boost::thread* thread_send;             // get/send thread
boost::mutex stopMutex;
bool initialize = false;
bool stop, showBroadcast;
int i_send, i_recv, i_operator,
interval_send, interval_recv, interval_operator,
mode;
string message_send, message_recv;
string message_STOP = "STOP";

public:
// constructor
UdpReceiver(boost::asio::io_service& io_service, std::string address, unsigned short port, int interval, int mode, bool show = false)
: socket_(io_service),
showBroadcast(show)
{
initialize = false;
Initialize(io_service, show);
}

UdpReceiver(boost::asio::io_service& io_service, bool show = false)
: socket_(io_service),
showBroadcast(show)
{
Initialize(io_service, show);
}

// destructor
~UdpReceiver()
{
// show exit message
cout << "Exiting UDP Core." << endl;
}

// initialize
void Initialize(boost::asio::io_service& io_service, bool show = false)
{
if (initialize == false)
{
GetMode(true);
GetInfo(true);
}

CreateEndpoint(io_service);
CreateThreads();
stop = false;
showBroadcast = show;
i_send = 0;
i_recv = 0;
i_operator = 0;
message_send.clear();
message_recv.clear();
initialize = true;          // clear flag
}

void GetMode(bool default_value = false)
{
std::string input;
if (default_value)
{
mode = 0;
}
else
{
string prompt = "Set mode:\n0/other - Listen\n1 - Send\nEnter your choice: ";
cout << prompt;
getline(cin, input);

try
{
mode = stoi(input);

// set default mode to Listen
if (mode > 1)
mode = 0;
}
catch (exception ec)
{
cout << "Error converting mode: " << ec.what() << endl;
Stop();
}
}
}

void GetInfo(bool default_value = false)
{
// always called after GetMode()
string address;
unsigned short port;
int interval;

if (default_value)
{
address = getMyIp();
port = 13000;
interval = 500;
}

switch (mode)
{
case 0:
address_recv = address;
port_recv = port;
interval_recv = interval;
break;

case 1:
address_send = address;
port_send = port;
interval_send = interval;
break;

default:
// already set to 0 in GetMode()
break;
}
}

void CreateEndpoint(boost::asio::io_service& io_service)
{
// Create the socket so that multiple may be bound to the same address.
boost::asio::ip::udp::endpoint listen_endpoint(boost::asio::ip::address::from_string(address_recv), port_recv);
socket_.open(listen_endpoint.protocol());
socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true));
socket_.bind(listen_endpoint);

// Join the multicast group.
socket_.set_option(boost::asio::ip::multicast::join_group(boost::asio::ip::address::from_string("224.0.0.0")));
}

void CreateThreads()
{
thread_main = new boost::thread(boost::ref(*this));
interval_operator = 500;    // default value

switch (mode)
{
case 0:
thread_receive = new boost::thread(&UdpReceiver::Callable_Receive, this);
threads.add_thread(thread_receive);
break;

default:
// already set to 0 in GetMode()
break;
}
}

// start the threads
void Start()
{
// Wait till they are finished
threads.join_all();
}

// stop the threads
void Stop()
{
// warning message
cout << "Stopping all threads." << endl;

// signal the threads to stop (thread-safe)
stopMutex.lock();
stop = true;
stopMutex.unlock();

// wait for the threads to finish
thread_main->interrupt();   // in case not interrupted by operator()
threads.interrupt_all();
threads.join_all();

// close socket after everything closes
//socketPtr->close();
socket_.close();
}

void Callable_Receive()
{
while (!stop)
{
stopMutex.lock();
socket_.async_receive_from(boost::asio::buffer(data_), sender_endpoint_, [this](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
//cout << message_recv << endl;
std::cout.write(data_.data(), length);
std::cout << std::endl;
Callable_Receive();
}
});
stopMutex.unlock();
//cout << i_recv << endl;
++i_recv;
}
}

// Thread function
void operator () ()
{
while (!stop)
{
if (message_send == message_STOP)
{
try
{
this->Stop();
}
catch (exception e)
{
cout << e.what() << endl;
}
}

boost::this_thread::sleep(boost::posix_time::millisec(interval_operator));
boost::this_thread::interruption_point();
}
}
};

int main()
{
try
{
boost::asio::io_service io_service;
UdpReceiver mt(io_service, false);
mt.Start();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}

Асинхронный прием находится внутри Callable_Receive (), который находится внутри thread_receive. Я вижу, что поток работает, когда счетчик напечатан на экране (который я комментирую). Однако async_receive_from () никогда ничего не получает. Может кто-нибудь сказать мне, почему это происходит?

0

Решение

Вы, вероятно, тупик в Callable_Receive. В теме с Callable_Receive как тело темы вы звоните stopMutex.lock перед вызовом async_receive_from функция. async_receive_from немедленно возвращается, но мы не знаем, когда лямбда-объект был передан в качестве третьего параметра async_receive_from будет называться. Когда выполняется тело лямбда-объекта, вы звоните Callable_Receive функция, если stopMutex был заблокирован (нить с Callable_Receive все еще выполняется, и следующая итерация выполняется, пока выполняется цикл), и если вы попытаетесь снова его заблокировать, вы получите тупик boost::mutex Вы не можете вызвать метод блокировки, когда мьютекс уже заблокирован тем же потоком.

Вы должны прочитать о boost::recursive_mutex если вы хотите решить эту проблему.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]