Моя компания рассматривает возможность использования ZeroMQ в качестве транспортного механизма. Сначала я оценил производительность, чтобы понять, с чем я играю.
Поэтому я создал приложение, сравнивающее настройку zmq от дилера к winsock. Я измерил циклическую отправку синхронных сообщений от клиента на сервер и затем вычисление среднего значения.
Вот сервер, на котором работает winsock:
DWORD RunServerWINSOCKTest(DWORD dwPort)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
printf("WSAStartup failed with error: %d\n", iRet);
return iRet;
}
struct addrinfo hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
struct addrinfo *result = NULL;
iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
if (iRet != 0)
{
WSACleanup();
return iRet;
}
SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (ListenSocket == INVALID_SOCKET)
{
freeaddrinfo(result);
WSACleanup();
return WSAGetLastError();
}
iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
if (iRet == SOCKET_ERROR)
{
freeaddrinfo(result);
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
freeaddrinfo(result);
iRet = listen(ListenSocket, SOMAXCONN);
if (iRet == SOCKET_ERROR)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
while (true)
{
SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
char value = 0;
setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
char recvbuf[DEFAULT_BUFLEN];
int recvbuflen = DEFAULT_BUFLEN;
do {
iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
if (iRet > 0) {
// Echo the buffer back to the sender
int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
if (iSendResult == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
}
else if (iRet == 0)
printf("Connection closing...\n");
else {
closesocket(ClientSocket);
WSACleanup();
return 1;
}
} while (iRet > 0);
iRet = shutdown(ClientSocket, SD_SEND);
if (iRet == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
closesocket(ClientSocket);
}
closesocket(ListenSocket);
return WSACleanup();
}
Вот клиент под управлением winsock:
DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
return iRet;
}
SOCKET ConnectSocket = INVALID_SOCKET;
struct addrinfo *result = NULL, *ptr = NULL, hints;ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
if (iResult != 0) {
WSACleanup();
return 1;
}
for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}
iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
ConnectSocket = INVALID_SOCKET;
continue;
}
break;
}
freeaddrinfo(result);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 1000000;
CHAR cRecvMsg[DEFAULT_BUFLEN];
SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);
std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
if (iRet == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
if (iRet < 1)
{
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
std::string strMessage(cRecvMsg);
if (strMessage.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
iResult = shutdown(ConnectSocket, SD_SEND);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
closesocket(ConnectSocket);
return WSACleanup();
}
Вот сервер под управлением ZMQ (дилер)
DWORD RunServerZMQTest(DWORD dwPort)
{
try
{
zmq::context_t context(1);
zmq::socket_t server(context, ZMQ_DEALER);
// Set options here
std::string strIdentity = s_set_id(server);
printf("Created server connection with ID: %s\n", strIdentity.c_str());
std::string strConnect = "tcp://*:" + std::to_string(dwPort);
server.bind(strConnect.c_str());
bool bRunning = true;
while (bRunning)
{
std::string strMessage = s_recv(server);
if (!s_send(server, strMessage))
{
return NO_ERROR;
}
}
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
}
Вот клиент под управлением ZMQ (дилер)
DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
try
{
zmq::context_t ctx(1);
zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ
// Set options here
std::string strIdentity = s_set_id(client);
std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
client.connect(strConnect.c_str());
if(s_send(client, "INIT"))
{
std::string strMessage = s_recv(client);
if (strMessage.compare("INIT") == 0)
{
printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 10000000;std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
if (s_send(client, strSendMsg))
{
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
std::string strRecvMsg = s_recv(client);
if (strRecvMsg.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
}
Я запускаю тест локально с размером сообщения 5 байтов, и я получаю следующий результат:
WINSOCK
Messages sent: 1 000 000
Time elapsed (us): 48 019 415
Time elapsed (s): 48.019 415
Message size (bytes): 5
Msg/s: 20 825
Bytes/s: 104 125
Mb/s: 0.099
Total response time (us): 24 537 376
Average repsonse time (us): 24.0
а также
ZeroMQ
Messages sent: 1 000 000
Time elapsed (us): 158 290 708
Time elapsed (s): 158.290 708
Message size (bytes): 5
Msg/s: 6 317
Bytes/s: 31 587
Mb/s: 0.030
Total response time (us): 125 524 178
Average response time (us): 125.0
Может кто-нибудь объяснить, почему среднее время отклика намного выше при использовании ZMQ?
Цель состоит в том, чтобы найти установку, где я могу отправлять и получать сообщения асинхронно без необходимости отвечать. Если это может быть достигнуто с другой настройкой, чем у дилера-дилера, пожалуйста, дайте мне знать!
Это всего лишь ответ на небольшую часть вашего вопроса, но здесь идет —
Зачем вам нужен дилер / дилер? Я предполагаю, потому что общение может начаться с любой точки? Вы не связанный дилеру / дилеру, в частности, он ограничивает вас только двумя конечными точками, если вы когда-либо добавите другую конечную точку на любой стороне сообщения, скажем, второго клиента, то каждый клиент будет получать только половину сообщений, потому что дилер строго циклически перебирает ,
Что вам нужно для асинхронной связи, так это некоторая комбинация разъемов дилера и / или маршрутизатора. Ни один из них не требует ответа, основные различия заключаются в том, как они выбирают, какой подключенный узел должен отправить сообщение:
Эти два типа сокетов работают вместе, потому что сокеты дилера (и сокеты запроса, дилер является сокетом типа запроса) отправляют свое «имя» как часть сообщения, которое сокет маршрутизатора может использовать для отправки данных обратно. Это парадигма запроса / ответа, и вы увидите, что такая парадигма применяется во всех примерах в гид, но вы можете склонить эту парадигму к тому, что вы ищете, в частности ни дилер, ни маршрутизатор не требуют ответа.
Не зная ваших полных требований, я не могу сказать вам, какую архитектуру ZMQ я бы выбрал, но в целом я предпочитаю расширяемость сокетов маршрутизатора, легче обрабатывать соответствующую адресацию, чем просто объединять все в один узел … вы увидите предупреждения против использования роутера / роутера, и я согласен с ними в той степени, в которой вы должны понимать, что вы делаете, прежде чем пытаться это делать, но, понимая, что вы делаете, реализация не так сложна.
У вас также есть возможность, если это соответствует вашим требованиям, настроить каждый конец с сокетом паба и каждый с дополнительным сокетом, если буквально нет ответов Когда-либо. Если это строго поток данных от источника к цели, и ни одному из партнеров не нужна обратная связь о том, что он отправляет, то это, вероятно, лучший выбор, хотя это означает, что вы имеете дело с двумя сокетами на конец, а не с одним.
Ничто из этого напрямую не связано с производительностью, но важно понимать, что сокеты zmq оптимизированы для конкретных случаев использования, и, как указано в ответе Джона Джеффериса, вы нарушаете этот вариант использования для сокета своего дилера, отправляя сообщения в ваш тест строго синхронный. Первое, что нужно начать, это завершить архитектуру ZMQ, а затем смоделировать фактический поток сообщений, в частности не добавление в произвольные ожидания и синхронность, которая будет обязательно измените способ, которым пропускная способность выглядит, поскольку вы проверяете это, в значительной степени по определению.
Вы говорите, что хотите отправлять и получать сообщения асинхронно без необходимости отвечать. Тем не менее, все проведенные тесты являются полностью синхронными, по сути, запрос-ответ, но на розетке дилер-дилер. Что-то там не вычисляется. Почему бы не запустить тесты, которые более точно имитируют дизайн, к которому вы стремитесь?
ZeroMQ получает достаточное количество производительности «быстрее, чем TCP», объединяя сообщения в очереди в одно сообщение. Очевидно, что этот механизм не может быть активирован в чисто синхронном режиме с одним сообщением в полете за раз.
Что касается того, почему этот конкретный тест очень маленьких сообщений, отправляемых и принимаемых исключительно синхронно, является относительно медленным, я не могу сказать. Вы сделали профилирование? Я снова скажу, что проведение этого теста и принятие решений на его основе не имеет смысла, если он не похож на ваш окончательный дизайн.
Одна вещь, которая выглядит странно, это блок try / catch в коде ZeroMQ. Это не выглядит справедливым, потому что тест winsock не был написан таким образом. Известно, что в try / catch есть / было много накладных расходов.
Проблема с OP связана с пропускной способностью, а не с задержкой, и, скорее всего, связана с шаблоном, который используется в приведенных примерах. Тем не менее, вы, вероятно, всегда найдете это ZeroMQ
имеет большую задержку, что я объясню, хотя это может быть бесполезно для OP в этой ситуации.
ZeroMQ
работает путем буферизации сообщений. Представьте себе (просто в качестве базовой иллюстрации) создание std::string
и добавление к нему множества маленьких строк (многие тысячи, каждая из которых включает небольшой заголовок, чтобы узнать размер этих маленьких сегментов), а затем отправка этой большей строки с интервалами 100us
, 1000us
, 10ms
или что угодно. На принимающей стороне большая строка принимается, и каждое меньшее сообщение удаляется по одному на основе размера заголовка, который отправляется вместе с ним. Это позволяет вам потенциально отправлять миллионы сообщений партиями (хотя std::string
очевидно, это плохой выбор) без дополнительных затрат на отправку этих миллионов очень маленьких измерений по одному. В результате вы в полной мере используете свои сетевые ресурсы и увеличиваете пропускную способность, а также создаете базовые FIFO
поведение. Однако вы также создаете задержку для заполнения буфера, что означает увеличение задержки.
Представьте себе (опять же, просто в качестве базовой иллюстрации): если вы потратите полсекунды (включая строковые операции и т. Д.) На буферизацию миллиона сообщений, это приведет к увеличению строки в несколько мегабайт. Современные сети могут легко отправить эту большую строку в оставшиеся полсекунды. 1000000us
(1 секунда) / 1000000 сообщений будет 1us
за сообщение, верно? Неверно — все сообщения имели половину секундной задержки для заполнения очереди, что приводит к увеличению задержки до половины секунды для всех сообщений. ZeroMQ
отправляет партии намного быстрее, чем каждый 500ms
, но увеличение задержки, которое это иллюстрирует, все еще происходит в ZeroMQ
хотя это обычно по линии нескольких ms
,