Я действительно не понимаю, что здесь происходит, поэтому я надеюсь, что кто-то может заметить то, что я пропустил.
Я пишу пользовательский демон, который принимает клиента, который я разрабатываю в Java. На данный момент этот клиент только подключается и отправляет пароль пользователя. Я разработал код под Cygwin и там он работает. Демон отправляет свое введение, затем клиент отправляет имя пользователя и пароль, и демон отвечает либо отключением клиента, либо отправкой OK (еще не сделано).
Когда я проверяю это с помощью Cygwin, он работает на локальном хосте. Я перенес код в HPUX, и клиент может подключиться, а также получить представление от демона. Теперь, когда клиент отправляет свое имя пользователя и пароль, он больше не работает. Демон получает только один байт, и когда он пытается прочитать снова, я получаю -1 как результат с EAGAIN и ничего больше. Клиент не показывает никакой ошибки, а также на стороне демона ее нет. Когда я перебираю код с помощью gdb, сообщения полностью возвращаются. 🙁
Вот код, который я использую: если нужна дополнительная информация, я могу добавить ее:
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
int max = getSendBufferSize();
if(nBytes != -1 && nBytes < max)
max = nBytes;
SocketMessage sb(max);
int rcv_bytes = 0;
int total = 0;
int error = 0;
while(1)
{
rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
error = errno;
FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;
if(rcv_bytes == -1)
{
if(error == EAGAIN || error == EWOULDBLOCK)
return total;
throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
}
//if(rcv_bytes == 0)
// throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection has been closed!");
total += rcv_bytes;
oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
}
return total;
}
Вывод журнала таков:
16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1 expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1 expected:32768 total: 1 errno: 11
Так где же остальные 30 байтов и почему они не возвращаются?
ОБНОВИТЬ
Это только та часть кода, которая на самом деле получает данные. Сам класс сокетов имеет дело только с необработанным сокетом без каких-либо протоколлов. Протокол реализован в отдельном классе. Эта функция приема должна захватывать столько байтов, сколько доступно в сети, и помещать ее в буфер (SocketMessage). Не имеет значения, является ли количество байтов несколькими сообщениями или только частью одного сообщения, потому что управляющий класс создаст фактический блок сообщений из (возможно) частичного потока сообщений. То, что первый вызов получает только один байт, не является проблемой, потому что, если сообщение не завершено, вызывающая сторона ожидает в цикле, пока не поступит больше данных и сообщение не будет завершено. поскольку может быть более одного клиента, я использую неблокирующие сокеты. Я не хотел иметь дело с отдельными потоками, поэтому мой сервер мультиплексирует соединения.
Проблема в том, что прием получает только один байт, а я знаю, что должно быть больше. Код ошибки EAGAIN обрабатывается, и когда следующий ввод получен, он должен получить больше байтов. Даже если сеть передала только один байт, остальное сообщение все равно должно прибыть следующим, но это не так. Выбор, который ожидает на сокете, чтобы получить блоки данных, как будто там ничего нет. Когда я запускаю один и тот же код в dbg и пошагово, он работает.
Когда я снова соединяюсь с тем же клиентом, то неожиданно получаю больше байтов.
Когда я использую тот же код с Cygwin, используя localhost, он работает нормально.
ОБНОВИТЬ
Вот полный код.
Main.cpp
mServerSocket = new TCPSocket(getListeningPort());
mServerSocket->bindSocket();
mServerSocket->setSocketBlocking(false);
mServerSocket->listenToClient(0);
setupSignals();
while(isSIGTERM() == false)
{
try
{
max = prepareListening();
//memset(&ts, 0, sizeof(struct timespec));
pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
error_code = errno;
if(pret == 0)
{
// Timeout occured, but we are not interested in that for now.
// Currently this shouldn't happen anyway.
continue;
}
processRequest(pret, error_code);
}
catch (SocketException &excp)
{
removeClientConnection(findClientConnection(excp.getTCPSocket()));
}
} // while sigTERMBaseSocket.cpp:
#ifdef UNIX
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#endif
#include "support/exceptions/socket_exception.h"#include "support/logging/simple_log.h"#include "support/network/base_socket.h"
using namespace std;
BaseSocket::BaseSocket(void)
{
mSocketId = -1;
mSendBufferSize = MAX_SEND_LEN;
}
BaseSocket::BaseSocket(int pNumber)
{
mSocketId = -1;
mPortNumber = pNumber;
mBlocking = 1;
mSendBufferSize = MAX_SEND_LEN;
try
{
if ((mSocketId = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
/*
set the initial address of client that shall be communicated with to
any address as long as they are using the same port number.
The clientAddr structure is used in the future for storing the actual
address of client applications with which communication is going
to start
*/
mClientAddr.sin_family = AF_INET;
mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
mClientAddr.sin_port = htons(mPortNumber);
updateSendBufferSize(MAX_SEND_LEN);
}
void BaseSocket::updateSendBufferSize(int nNewSize)
{
mSendBufferSize = getSendBufferSize();
if(mSendBufferSize > nNewSize)
mSendBufferSize = nNewSize;
}
BaseSocket::~BaseSocket(void)
{
close();
}
void BaseSocket::setSocketId(int socketFd)
{
mSocketId = socketFd;
}
int BaseSocket::getSocketId()
{
return mSocketId;
}
// returns the port number
int BaseSocket::getPortNumber()
{
return mPortNumber;
}
void BaseSocket::setDebug(int debugToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setReuseAddr(int reuseToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle,
sizeof(reuseToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setKeepAlive(int aliveToggle)
{
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle,
sizeof(aliveToggle)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setLingerSeconds(int seconds)
{
struct linger lingerOption;
if (seconds > 0)
{
lingerOption.l_linger = seconds;
lingerOption.l_onoff = 1;
}
else
lingerOption.l_onoff = 0;
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
sizeof(struct linger)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setLingerOnOff(bool lingerOn)
{
struct linger lingerOption;
if (lingerOn)
lingerOption.l_onoff = 1;
else
lingerOption.l_onoff = 0;
try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
sizeof(struct linger)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void BaseSocket::setSendBufferSize(int sendBufSize)
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
}
updateSendBufferSize(sendBufSize);
}
void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
}
}
int BaseSocket::isSocketBlocking()
{
return mBlocking;
}
void BaseSocket::setSocketBlocking(int blockingToggle)
{
if (blockingToggle)
{
if (isSocketBlocking())
return;
else
mBlocking = 1;
}
else
{
if (!isSocketBlocking())
return;
else
mBlocking = 0;
}
try
{
#ifdef UNIX
int flags;
if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
flags = 0;
if(mBlocking)
fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
else
fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);
/*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
{
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socke blocking");
}*/
#endif
}
catch (SocketException& excp)
{
excp.response();
}
}
int BaseSocket::getDebug()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getReuseAddr()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getKeepAlive()
{
int myOption;
int myOptionLen = sizeof(myOption);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &myOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return myOption;
}
int BaseSocket::getLingerSeconds()
{
struct linger lingerOption;
int myOptionLen = sizeof(struct linger);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return lingerOption.l_linger;
}
bool BaseSocket::getLingerOnOff()
{
struct linger lingerOption;
int myOptionLen = sizeof(struct linger);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
if (lingerOption.l_onoff == 1)
return true;
else
return false;
}
int BaseSocket::getSendBufferSize()
{
int sendBuf;
int myOptionLen = sizeof(sendBuf);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&sendBuf, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return sendBuf;
}
int BaseSocket::getReceiveBufferSize()
{
int rcvBuf;
int myOptionLen = sizeof(rcvBuf);
try
{
if (getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &rcvBuf, &myOptionLen) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return -1;
}
return rcvBuf;
}
ostream &operator<<(ostream& io, BaseSocket& s)
{
string flagStr = "";
io << endl;
io << "Summary of socket settings:" << endl;
io << " Socket Id: " << s.getSocketId() << endl;
io << " port #: " << s.getPortNumber() << endl;
io << " debug: " << (flagStr = s.getDebug() ? "true" : "false")
<< endl;
io << " reuse addr: " << (flagStr = s.getReuseAddr() ? "true" : "false")
<< endl;
io << " keep alive: " << (flagStr = s.getKeepAlive() ? "true" : "false")
<< endl;
io << " send buf size: " << s.getSendBufferSize() << endl;
io << " recv bug size: " << s.getReceiveBufferSize() << endl;
io << " blocking: "<< (flagStr = s.isSocketBlocking() ? "true" : "false") << endl;
io << " linger on: "<< (flagStr = s.getLingerOnOff() ? "true" : "false") << endl;
io << " linger seconds: " << s.getLingerSeconds() << endl;
io << endl;
return io;
}
void BaseSocket::close(void)
{
::close(mSocketId);
}
TCPSocket.cpp:#ifdef UNIX
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#endif
#include <sstream>
#include "support/logging/log.h"#include "support/exceptions/socket_exception.h"#include "support/logging/simple_log.h"#include "support/network/tcp_socket.h"
using namespace std;
const int MSG_HEADER_LEN = 6;
TCPSocket::TCPSocket()
: BaseSocket()
{
}
TCPSocket::TCPSocket(int portId)
: BaseSocket(portId)
{
}
TCPSocket::~TCPSocket()
{
}
void TCPSocket::initialize()
{
}
void TCPSocket::bindSocket()
{
try
{
if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::BIND, 0, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
/*
when this method is called, a client socket has been built already,
so we have the socketId and portNumber ready.
a HostInfo instance is created, no matter how the server's name is
given (such as www.yuchen.net) or the server's address is given (such
as 169.56.32.35), we can use this HostInfo instance to get the
IP address of the server
*/
HostInfo serverInfo(serverNameOrAddr, hType);
// Store the IP address and socket port number
struct sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_addr.s_addr = inet_addr(
serverInfo.getHostIPAddress().c_str());
serverAddress.sin_port = htons(mPortNumber);
// Connect to the given address
try
{
if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::CONNECT, 0, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
int newSocket; // the new socket file descriptor returned by the accept system call
// the length of the client's address
int clientAddressLen = sizeof(struct sockaddr_in);
struct sockaddr_in clientAddress; // Address of the client that sent data
// Accepts a new client connection and stores its socket file descriptor
try
{
if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::ACCEPT, 0, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
return NULL;
}
// Get the host name given the address
char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
HostInfo clientInfo(sAddress, ADDRESS);
clientHost += clientInfo.getHostName();
// Create and return the new TCPSocket object
TCPSocket* retSocket = new TCPSocket();
retSocket->setSocketId(newSocket);
return retSocket;
}
void TCPSocket::listenToClient(int totalNumPorts)
{
try
{
if (listen(mSocketId, totalNumPorts) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::LISTEN, 0, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}
ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
oStream << oSocket.mSocketId;
return oStream;
}
int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
int numBytes; // the number of bytes sent
int error = errno;
if(nSize == -1)
nSize = oBuffer.size();
if((unsigned int)nSize > oBuffer.size())
{
std::stringstream ss;
ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
std::string s;
ss >> s;
FILE_LOG(logERROR) << s;
throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
}
// Sends the message to the connected host
try
{
FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
error = errno;
FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << nSize << " errno: " << error;
if(numBytes == -1)
{
#ifdef UNIX
if(error == EAGAIN || error == EWOULDBLOCK)
{
return -1;
}
else
throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
return numBytes;
}
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
int max = getSendBufferSize();
if(nBytes != -1 && nBytes < max)
max = nBytes;
SocketMessage sb(max);
int rcv_bytes = 0;
int total = 0;
int error = 0;
while(1)
{
rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
error = errno;
FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;
if(rcv_bytes == -1)
{
if(error == EAGAIN || error == EWOULDBLOCK)
return total;
throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
}
// Socket has been closed.
if(rcv_bytes == 0)
return total;
total += rcv_bytes;
oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
}
return total;
}
void TCPSocket::close(void)
{
BaseSocket::close();
}
Вы уверены, что Алгоритм Nagle здесь не пинается? Если вы не отключили его, установив TCP_NODELAY опция сокета ваши данные не могут быть отправлены до определенного объема данных (MSS) доступен.
Сначала пара вопросов:
— Почему вы используете неблокирующий ввод / вывод?
— Вы, очевидно, знаете, что сообщение должно быть длиной 30 байт, так почему вы запрашиваете 32768 байт?
Сокеты гораздо больше, чем просто вызов recv
если вы используете неблокирующий ввод / вывод. При блокировке ввода / вывода каждая ошибка является истинной ошибкой. С неблокирующим вводом / выводом вам приходится иметь дело с этой надоедливой ошибкой EAGAIN / EWOULDBLOCK. Восстановление возможно, но вы отвечаете за это восстановление, когда настраиваете устройство на использование неблокирующего ввода-вывода.
Как следует из первого имени (EAGAIN), получение этого результата ошибки означает, что вам нужно повторить попытку, желательно после небольшого ожидания. Простой, но не очень хороший способ подождать sleep
(или же usleep
или же nanosleep
) на некоторое время. Проблема в том, что вы могли ждать слишком долго или недостаточно долго. Подождите слишком долго, и ваша система может перестать отвечать на запросы или отправитель может уйти. Подождите слишком немного, и вы заставите компьютер переключаться между привилегированным и непривилегированным режимом.
Лучший способ дождаться такого события — использовать схему, основанную на событиях, но, к сожалению, эти схемы не переносимы. Портативная схема заключается в использовании select
или же poll
, Ты можешь сделать select
или же poll
ждать бесконечно, или вы можете указать время ожидания. я нахожу poll
намного проще в использовании, особенно когда задействован только один дескриптор файла. Это личное предпочтение однако. Другие находят select
проще в использовании.
Количество байтов, возвращаемых любым вызовом recv, непредсказуемо. Многие сообщения принимаются в нескольких частях, поэтому необходимо повторно вызвать recv, если у вас еще нет всего сообщения. Но ваш код, похоже, не может определить, было ли получено все сообщение. И ваш код возвращается в EWOULDBLOCK, но EWOULDBLOCK является нормальной частью операций с сокетами. Это не означает ошибку или завершение сообщения.