Я работаю над парой приложений клиент / сервер UDP, и мое приложение, которое отправляет команды, работает превосходно — я могу контролировать то, что отправляется на порт через nc и hexdump, и они отлично декодируются.
В моем приложении, которое должно получать команды, я использую recvfrom с флагом MSG_DONTWAIT. Я делаю это, потому что мне нужно проверить очередь на отправляемые вещи, так что просто оставить блокировку недоступной. ЕСЛИ я удаляю флаг MSG_DONTWAIT, сообщения принимаются и обрабатываются правильно, но это блокирует ожидание, которое не будет работать для моего приложения. При использовании MSG_DONTWAIT он всегда возвращает -1 и устанавливает для errno значение EAGAIN. Хотя этого можно ожидать, когда ничего не отправляется, он НИКОГДА не получает ничего вообще. Я бы подумал, что он вернет EAGAIN, пока что-нибудь не станет доступно, но это не так. Соответствующий код выложен ниже — что мне не хватает?
uint8_t Receiver::Setup(uint16_t rx_port, uint16_t tx_port)
{
std::stringstream ss;
ss << "UDP session manager, setup ports.";
Logger::Info(ss.str());
tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (rx_socket_fd < 0)
{
Logger::Error("Could not open an rx UDP socket!");
}
else
{
std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
}
if (tx_socket_fd < 0)
{
Logger::Error("Could not open an tx UDP socket!");
}
else
{
std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
}int reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("Could not set socket reuse!");
#ifdef SO_REUSEPORT
reuse = 1;
if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
Logger::Warn("setsockopt(SO_REUSEPORT) failed");
#endif
memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));
tx_sockaddr.sin_family = AF_INET;
tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
tx_sockaddr.sin_port = htons(tx_port);
rx_sockaddr.sin_family = AF_INET;
rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
rx_sockaddr.sin_port = htons(rx_port);
int rva = 0;
rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );
if (rva < 0)
{
std::stringstream ss;
ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
Logger::Error(ss.str());
}
return NO_ERROR;
}uint8_t Receiver::SendTelemetry(const TelemetryBase * telemetry)
{
const uint8_t * bytes = EncodeTelemetryToSend(telemetry);
if (bytes == NULL)
{
Logger::Error("Receiver: Something went wrong trying to encode the telemetry.");
return 1;
}
const UDPHeader * header = (const UDPHeader * ) bytes;
uint16_t numBytes = header->length;
std::stringstream ss;
ss << "Receiver::SendTelemetry - bytesToWrite is " << numBytes << "\n";
Logger::Info(ss.str());
int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (rva == -1 && errno == EINVAL)
{
ss.clear();
ss << "invalid argument!";
Logger::Warn(ss.str());
}
else if (rva < 0)
{
ss.clear();
ss << "Failed to write to the UDP port, errno is " << errno;
Logger::Warn(ss.str());
return 1;
}
delete bytes;
return 0;
}uint8_t Receiver::SendCommand(const CommandBase * command)
{
const uint8_t * bytes = EncodeCommandToSend(command);
if (bytes == NULL)
{
Logger::Error("Receiver: Something went wrong trying to encode the message.");
return 1;
}
const UDPHeader * header = (const UDPHeader * ) bytes;
uint16_t numBytes = header->length;
std::stringstream ss;
ss << "Receiver::SendCommand - bytesToWrite is " << numBytes << "\n";
Logger::Info(ss.str());
int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (rva < 0)
{
ss.clear();
ss << "Failed to write to the UDP port, errno is " << errno;
Logger::Warn(ss.str());
return 1;
}
delete bytes;
return 0;
}
uint8_t Receiver::Receive()
{
uint8_t inputBuffer[UDP_BUFFER_BYTES];
memset(inputBuffer, '\0', UDP_BUFFER_BYTES);
int totalBytesRead = 0;
//socklen_t addressLength = sizeof(rx_sockaddr);
struct sockaddr_in sender;
socklen_t len;
totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
MSG_DONTWAIT, (struct sockaddr *) &sender, &len );
if ( totalBytesRead >= 0 )
{
std::stringstream ss;
ss << "UDP port read " << totalBytesRead << " bytes";
Logger::Info(ss.str() );
const CommandBase * command = DecodeReceivedCommand(inputBuffer);
if (command == NULL)
{
Logger::Warn("Failed to decode received command from commanding app.");
return UDP_ERROR_DECODE_FAILED;
}
EnqueCommand(command);
}
else
{
std::stringstream ss;
ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
Logger::Debug(ss.str());
}
return UDP_ERROR_NO_ERROR;
}void Receiver::ProcessingLoopThread()
{
while ( GetState() == STATE_RUN )
{
const TelemetryBase * telemetry = DequeTelemetry();
while (telemetry != NULL)
{
std::stringstream ss;
ss << "Receiver sending telemetry with ID: " << telemetry->GetTelemetryID();
Logger::Debug(ss.str());
SendTelemetry(telemetry);
delete telemetry;
telemetry = DequeTelemetry();
}
Receive();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
Несколько вещей:
Это может не быть проблемой, но я бы посоветовал вам зафиксировать ошибку до того, как любой другой код сможет запустить и очистить ошибку. Вместо этого:
std::stringstream ss;
ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
Лучше:
totalBytesRead = recvfrom(rx_socket_fd,...
int lasterror = errno; // catch errno before anything else can change it. . .
ss << "UDP port rva = " << totalBytesRead << ", errno is " << lasterror;
Вернуться к исходной проблеме.
Я предполагаю, что вам нужно опрашивать сокет более одного раза при использовании неблокирующего флага MSG_DONTWAIT.
Похоже, ваш основной цикл спит в течение 10 миллисекунд между каждым опросом сокета. Если это ваш дизайн, то просто сделайте это:
Когда вы создаете сокет, установите для него время ожидания 10 миллисекунд:
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10 * 1000; // 10 milliseconds
setsockopt(rx_socket_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
Затем просто удалите MSG_DONTWAIT
флаг из вызова recvfrom.
Также удалите оператор сна в вашем основном цикле:
std::this_thread::sleep_for(std::chrono::milliseconds(10));
А затем изящно обрабатывать ошибку тайм-аута как доброкачественную вещь, которая может произойти
totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
0, (struct sockaddr *) &sender, &len );if (totalBytesRead >= 0 )
{
// data available - handle it
}
else
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
// socket timed out after waiting 10 milliseconds
}
else
{
// actual socket error
}
}
struct sockaddr_in sender;
socklen_t len;
totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
MSG_DONTWAIT, (struct sockaddr *) &sender, &len );
Вы не назначали len
разумная ценность. Если вы не инициализируете len
до размера адреса сокета, вызов может завершиться неудачно.
Кроме того, вы захватываете errno
поздно. Вы должны захватить это немедленно после вызова, который получает ошибку. В противном случае другие посреднические операции могут изменить его стоимость. Таким образом, вы не можете рассчитывать на получение разумного значения.
Использование отдельных сокетов для отправки и получения очень странно. Если вы отправляете и получаете в ту же другую конечную точку, вы должны использовать только один сокет.