Я пытаюсь сделать простой пример, который принимает клиентов с помощью сокета tcp и обрабатывает их с помощью функции выбора winsock.
Проблема в том, что когда я запускаю функцию выбора, она возвращает мне значение -1 (ошибка), а WSAGetLastError возвращает значение 10022.
Я не могу выяснить, что я делаю не так, потому что fd_set настроен с сокетами правильно, а Maximum_sd установлен на правильное значение.
#include "stdafx.h"#include <stdio.h>
#include <thread>
#include <queue>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>
using namespace std;
void SlaveThread(queue<char*>* tasks);
void MasterThread(queue<char*>* tasks);
fd_set readfds;
int max_sd = 0;
deque<SOCKET> socketsQueue;
int nSocketsAmount = 0;
int _tmain(int argc, _TCHAR* argv[])
{
queue<char*>* tasksQueue = new queue<char*>();
FD_ZERO(&readfds);
thread SecondThread(MasterThread,tasksQueue);
thread FirstThread(SlaveThread,tasksQueue);
int nReady;
struct timeval timeout={0, 0};
timeout.tv_sec=10;
timeout.tv_usec=0;
while (true)
{
int i;
nReady = select(max_sd + 1, &readfds, NULL, NULL, &timeout);
for (i=0; i < nSocketsAmount && nReady > 0; i++)
{
SOCKET temp = socketsQueue[i];
if (FD_ISSET(temp, &readfds)) {
char buffer[200];
memset(buffer, 0, 200);
recv(temp, buffer, 200, 0);
tasksQueue->push(buffer);
nReady--;
}
}
}
FirstThread.join();
SecondThread.join();
return 0;
};
void SlaveThread(queue<char*>* tasks)
{
while (true)
{
if (!tasks->empty())
{
cout << tasks->front() << " Queue size : " << tasks->size() << endl;
tasks->pop();
}
Sleep(1000);
}
};
void MasterThread(queue<char*>* tasks)
{
WSAData WinSockData;
WORD Version = MAKEWORD(2, 1);
WSAStartup(Version, &WinSockData);
/* Create socket structure */
SOCKADDR_IN Server;
Server.sin_addr.s_addr = inet_addr("10.0.0.7");
Server.sin_family = AF_INET;
Server.sin_port = htons(27015);
SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, NULL);
SOCKET Connect;
::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server));
int errno0 = WSAGetLastError();
listen(ListenSock, 1);
int errno1 = WSAGetLastError();
cout << "Listening on port 27015" << endl;
char buffer[200];
int size = sizeof(Server);
while (true)
{
if (Connect = accept(ListenSock, (SOCKADDR*)&Server, &size))
{
cout << "Connection established..." << endl;
FD_SET(Connect, &readfds);
socketsQueue.push_front(Connect);
nSocketsAmount++;
if (Connect > max_sd)
{
max_sd = Connect;
}
}
}
WSACleanup();
};
главный поток, добавляющий сокеты в очередь fd_set и сокетов. и основное использование выберите, чтобы получить сокет необходимо прочитать.
Любые предложения, почему это происходит?
Спасибо.
Кто (на какой странице документации) дал вам разрешение на изменение readfds
в то время как select
использует это?
Когда вы передаете структуру данных в функцию API, эта функция владеет ею до тех пор, пока она не вернется (или дольше, в случае буферов для перекрывающегося ввода-вывода). Вы не можете переписать его из другого потока.
Вам нужно объединить свои основные select
петля и ваша «главная» нить, к счастью select
вполне способен ожидать входящие соединения на прослушивающем сокете.
Фактическая ошибка, которую вы получаете, заключается в нарушении этого требования, найденного в документации:
Любые два параметра, readfds, writefds или кромеfds, могут быть заданы как null. По крайней мере один должен быть ненулевым, и любой набор ненулевых дескрипторов должен содержать по крайней мере один дескриптор сокета.
После применения редизайна, который я описал выше, набор всегда будет содержать гнездо для прослушивания, что также решает эту проблему.
10022 это WSAEINVAL
, Вы передаете неверный аргумент select()
, Одна вещь, которую вы не принимаете во внимание, это то, что select()
модифицирует предоставленный fd_set
структурирует на выходе, поэтому вы должны сбрасывать их каждый раз, когда вы звоните select()
, Вы также не принимаете во внимание, что вы изменяете fd_set
в то время как select()
может все еще использовать это. И вы также не защищаете свои очереди задач от одновременного доступа к потокам.
Попробуйте что-то более похожее на это:
#include "stdafx.h"#include <stdio.h>
#include <thread>
#include <queue>
#include <mutex>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>
using namespace std;
struct taskQueue
{
queue<char*> items;
mutex itemsLock;
};
void SlaveThread(taskQueue *tasks);
void MasterThread(taskQueue *tasks);
deque<SOCKET> socketsQueue;
mutex socketsQueueLock;
int _tmain(int argc, _TCHAR* argv[])
{
WSAData WinSockData;
WORD Version = MAKEWORD(2, 1);
WSAStartup(Version, &WinSockData);
taskQueue tasks;
thread SecondThread(MasterThread, &tasks);
thread FirstThread(SlaveThread, &tasks);
char *buffer = NULL;
while (true)
{
fd_set readfds;
FD_ZERO(&readfds);
struct timeval timeout = {0};
timeout.tv_sec = 10;
timeout.tv_usec = 0;
int nReady = 0;
{
lock_guard<mutex> lock(socketsQueueLock);
for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
{
FD_SET(*iter, &readfds);
++nReady;
}
}
if (nReady == 0)
{
Sleep(100);
continue;
}
nReady = select(0, &readfds, NULL, NULL, &timeout);
if (nReady > 0)
{
lock_guard<mutex> lock(socketsQueueLock);
for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
{
SOCKET temp = *iter;
if (FD_ISSET(temp, &readfds))
{
if (!buffer)
buffer = new char[201];
memset(buffer, 0, 200);
nReady = recv(temp, buffer, 200, 0);
if (nReady > 0)
{
buffer[nReady] = 0;
lock_guard<mutex> lock2(tasks.itemsLock);
tasks.items.push(buffer);
buffer = NULL;
}
}
}
}
}
FirstThread.join();
SecondThread.join();
delete[] buffer;
WSACleanup();
return 0;
};
void SlaveThread(taskQueue *tasks)
{
while (true)
{
{
lock_guard<mutex> lock(tasks->itemsLock);
if (!tasks->items.empty())
{
char *buffer = tasks->items.front();
cout << buffer << " Queue size : " << tasks->items.size() << endl;
tasks->items.pop();
delete[] buffer;
}
}
Sleep(1000);
}
};
void MasterThread(taskQueue *tasks)
{
/* Create socket structure */
SOCKADDR_IN Server = {0};
Server.sin_addr.s_addr = inet_addr("10.0.0.7");
Server.sin_family = AF_INET;
Server.sin_port = htons(27015);
SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ListenSock == INVALID_SOCKET)
{
cout << "Cannot create listening socket. Error: " << WSAGetLastError() << endl;
return;
}
if (::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server)) == SOCKET_ERROR)
{
cout << "Cannot bind listening socket. Error: " << WSAGetLastError() << endl;
closesocket(ListenSock);
return;
}
if (listen(ListenSock, 1) == SOCKET_ERROR)
{
cout << "Cannot listen on port 27015. Error: " << WSAGetLastError() << endl;
closesocket(ListenSock);
return;
}
cout << "Listening on port 27015" << endl;
char buffer[200];
int size;
SOCKET Connect;
while (true)
{
size = sizeof(Server);
Connect = accept(ListenSock, (SOCKADDR*)&Server, &size);
if (Connect != INVALID_SOCKET)
{
cout << "Connection established..." << endl;
lock_guard<mutex> lock(socketsQueueLock);
socketsQueue.push_front(Connect);
}
}
};