Дилер ZMQ — МАРШРУТИЗАТОР Связь

В настоящее время я работаю над проектом, который требует некоторой связи по сети с другими типами данных от некоторых объектов распределенной системы, и я использую ZMQ.

Основная цель проекта — создать центральный узел, обслуживающий клиентов, который может подключаться в любое время. Для каждого подключенного клиента центральный узел должен управлять передачей сообщений между ними.

В настоящее время и на данный момент все общение происходит по TCP.

Клиенты должны отправлять и получать сообщения в любое время, чтобы они ZMQ_DEALER типа сокетов и центральный узел ZMQ_ROUTER

Первоначально цель состоит в том, чтобы одно сообщение от какого-то клиента, это сообщение поступало другим клиентам. Это означает, что другие клиенты могут видеть все те же данные.

Я использую Шаблон асинхронного клиента / сервера потому что мне интересно, чтобы несколько клиентов общались друг с другом, имея, возможно, серверного брокера или промежуточное ПО.

у меня есть ZMQ_DEALER сокет-клиент, который подключается к ZMQ_ROUTER сервер сокетов

#include <zmq.hpp>
#include "zhelpers.hpp"using namespace std;

int main(int argc, char *argv[])
{

zmq::context_t context(1);
zmq::socket_t client(context, ZMQ_DEALER);

const string endpoint = "tcp://localhost:5559";

client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
client.connect(endpoint);
for (int request = 0; request < 10; request++)
{

s_sendmore(client, "");
s_send(client, "Testing sending some data");

std::string string = s_recv(client);

std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}

На моем серверном коде у меня есть ZMQ_ROUTER, который принимает и управляет сообщениями, что делает его привязанным к скважинному порту. Этот сервер сделан на Python

import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")

# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)

print("Creating Server Network Manager Router")

while True:
socks = dict(poller.poll())

if socks.get(frontend) == zmq.POLLIN:
message = frontend.recv_multipart()
print(message)
frontend.send_multipart(message)

На моем другом партнере / клиенте у меня есть следующее:

#include <zmq.hpp>
#include "zhelpers.hpp"using namespace std;

int main (int argc, char *argv[])
{

zmq::context_t context(1);
zmq::socket_t peer2(context, ZMQ_DEALER);

const string endpoint = "tcp://localhost:5559";

peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
peer2.connect(endpoint);
//s_sendmore(peer2, "");
//s_send(peer2, "Probando");

//std::string string = s_recv(peer2);

//std::cout << "Received reply " << " [" << string << "]" << std::endl;

for (int request = 0; request < 10; request++)
{

s_sendmore(peer2, "");
s_send(peer2, "Probando");

std::string string = s_recv(peer2);

std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}

}

ОБНОВИТЬ

Но каждый, когда я выполняю некоторый клиент, их соответствующие сообщения не приходят к другому одноранговому клиенту.
Сообщения приходят на ZMQ_ROUTERи возвращаются в ZMQ_DEALER отправитель происхождения.

введите описание изображения здесь

Это связано с тем, что во время приема кадру идентификации предшествовал ROUTER, и сообщение отправляется обратно через ROUTER; где он удаляет идентификатор и использует значение для направления сообщения обратно к соответствующему ДИЛЕРУ, в соответствии с разделом ZMQ_ROUTER до конечной страницы здесь.

И это логика, я посылаю личность моего DEALER к ROUTER, ROUTER возьми эту личность и вернись к моему DEALER сообщение

Прежде всего, чтобы начать свою реализацию, мне нужно, чтобы какое-то ДИЛЕР отправляло какое-то сообщение, оно будет визуализировано любым другим ДИЛЕРОМ, независимо от того, сколько ДИЛЕРОВ (один или несколько) подключено к ZMQ_ROUTER.
В этом смысле … Нужно ли встречаться о структуре личности другого ДИЛЕРА или других ДИЛЕРОВ?

Если у меня есть DEALER A, DEALER B, а также DEALER C, а также ROUTER

затем:

DEALER A Отправить сообщение …
И я хочу, чтобы это сообщение от ДИЛЕРА А пришло на DEALER B а также DEALER C и так другое DEALERS это может быть присоединено к моей беседе сеанса …

В этом порядке идей, необходимо соответствовать структуре личности DEALER B а также DEALER C ранее на DEALER A сторона, чтобы это сообщение прибыло ему?

Как узнать идентификационные рамки каждого ДИЛЕРА, существующие в моей реализации?
Это сделано на стороне ROUTER?
Я не ясно это

1

Решение

Вы могли бы, чтобы все клиенты отправляли сообщение «Я здесь» при запуске. Центральный сервер может хранить все идентификаторы, c.f. начальная связь между работником и маршрутизатором здесь: http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker. Сервер будет отправлять любое полученное сообщение всем известным в настоящее время клиентам. Вы должны добавить сердцебиение, чтобы обнаружить отключенных клиентов, ср. http://zguide.zeromq.org/page:all#Heartbeating.

Тем не менее, ZeroMQ уже поставляется с такой схемой связи: PUBSUB, По сути, каждый клиент будет иметь DEALER и SUB сокет подключен к серверам ROUTER а также PUB Розетки. Сервер просто рассылает любые полученные
сообщение через PUB гнездо для все клиентов. Если это будет проблемой для исходящего клиента, вы можете включить в сообщение идентификатор клиента, чтобы каждый клиент мог отфильтровать сообщения со своим собственным идентификатором. Смотрите также этот пример из руководства
http://zguide.zeromq.org/page:all#Getting-an-Out-of-Band-Snapshot

Еще одна интересная картина будет Переиздание обновлений от клиентов:

Переиздание обновлений от клиентов

Вот PUSHPULL используется для отправки обновлений на сервер. Это имеет смысл, если нет необходимости в ответном сообщении с сервера. Если вам не нужен запрос состояния из этого примера, вы можете пропустить ROUTERDEALER часть. Вот пример реализации с использованием Python для краткости. Сервер слушает PULL сокет и отправляет все через PUB разъем:

import zmq

def main():
# context and sockets
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")

while True:
message = collector.recv()
print "I: publishing update %s" % message
publisher.send(message)

if __name__ == '__main__':
main()

Клиент слушает PUB розетка на некоторое время. Если сообщение получено, оно регистрируется. Если время ожидания истекло, генерируется сообщение с вероятностью 1 к 10:

import random
import time

import zmq

def main():

# Prepare our context and subscriber
ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.connect("tcp://localhost:5558")

random.seed(time.time())
while True:
if subscriber.poll(100) & zmq.POLLIN:
message = subscriber.recv()
print "I: received message %s" % message
else:
rand = random.randint(1, 100)
if rand < 10:
publisher.send("%d" % rand)
print "I: sending message %d" % rand

if __name__ == '__main__':
main()
1

Другие решения

(преждевременно) награжденный ответ не соответствует определенным свойствам.

Распределенные системы должны работать как умный а также продуктивно, поскольку агенты распределены, а анализ ошибок и развернутые производственные проблемы чрезвычайно дороги для анализа / тестирования / отладки.

Таким образом, повторное использование копирования / вставки несовместимой с проблемой идеи не является способом достижения первого, тем более второго.


Итак, давайте сначала рассмотрим эффективность:

client-[A].send()-сообщение о том, что O / P хотел стать серверным[S].recv()-Эд и повторно транслировать всем Другой clients-[B,C,...]кроме [A]-сам.

Наиболее ресурсоэффективный подход к этому состоит в правильной настройке инструментов инфраструктуры для выполнения именно этого, без повторного изобретения колеса и / или использования хрупкого и разрушающего производительность кода (ов) строительных лесов.

Так:

на клиенте[*] Сторону лучше всего использовать нижеприведенную эскизную примитивную концепцию агента. Более сложные настройки, такие как использование столь умных средств обработки событий, как Tkinter, превратились в .mainloop() мягкая система реального времени, тем лучше, но не так-то легко начать дизайн-сражения более чем на одном фронте, так что давайте сейчас все упростим:

zmq_VERSION      = zmq.zmq_version_info()
anAgentsIDENTITY = whateverHashOrHumanReadableSTRING
notMINE          = anAgentsIDENTITY

if     zmq_VERSION[0] < 4:
print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )
aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#SUB ---------------------------------------------------------------------------
aSUB = aCTX.socket( zmq.SUB )
aSUB.setsockopt(    zmq.LINGER,          0 )   # protect your agent
aSUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your agent from DoS
aSUB.setsockopt(    zmq.AFFINITY,        1 )   # protect your server resources
aSUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     set server helping Heartbeats
aSUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aSUB.setsockopt(    zmq.SUBSCRIBE,       notMINE )  #  NEVER .recv()-s  data back
...
#SUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

aSUB.connect(      "tcp://localhost:5557" )

#PUSH --------------------------------------------------------------------------
aPUSH = aCTX.socket( zmq.PUSH )
...
#PUSH PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True; anAgentSignsWithIdentityPREFIX = anAgentsIDENTITY
while notSoftFLAG:

if aReasonToSendSomethingToServer:
aPUSH.send( anAgentSignsWithIdentityPREFIX
+ ":::"+ aMsgPAYLOAD,
zmq.DONTWAIT
)                          # inspect ZMQError
...
pass

if aSUB.poll( 100 ):
message = aSUB.recv( zmq.DONTWAIT )    #  NEVER .recv()-s own data back
...
passif aReasonToFlagLoopEXIT:
notSoftFLAG = False
...
pass

if ...:
...
pass

#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aSUB, aPUSH, ) ]
...

aCTX.term()
#   .term()
#########

Сервер может избежать ВСЕ хлопоты с любой необходимостью для любой специальной обработки:

все хорошо настроено внутри инфраструктуры ZeroMQ:

pass;  zmq_VERSION = zmq.zmq_version_info()
if     zmq_VERSION[0] < 4:
print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )

aCTX = zmq.Context( 2 )                        # if performance boosting is needed

#PUB ---------------------------------------------------------------------------
aPUB = aCTX.socket( zmq.PUB )
aPUB.setsockopt(    zmq.LINGER,          0 )   # protect your server
aPUB.setsockopt(    zmq.MAXMSGSIZE,      m )   # protect your server from DoS
aPUB.setsockopt(    zmq.AFFINITY,        3 )   # protect your server resources
aPUB.setsockopt(    zmq.HEARTBEAT_IVL,   ivl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.HEARTBEAT_TTL,   ttl ) #     server L3-helper Heartbeats
aPUB.setsockopt(    zmq.INVERT_MATCHING, 1 )   #   avoid server sending data back
aPUB.setsockopt(    zmq.IMMEDIATE,       1 )   # avoid Queueing for dead-ends
aPUB.setsockopt(    zmq.TOS,             tos ) # allow for L3-router TOS-policies
...
#PUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPUB.bind(   "tcp://*:5557" )                  # expose AccessPoint on tcp://

#PULL --------------------------------------------------------------------------
aPULL = aCTX.socket( zmq.PULL )
aPULL.setsockopt(    zmq.LINGER,          0 )  # protect your server
aPULL.setsockopt(    zmq.MAXMSGSIZE,      m )  # protect your server from DoS
aPULL.setsockopt(    zmq.AFFINITY,        3 )  # protect your server resources
aPULL.setsockopt(    zmq.HEARTBEAT_IVL,   ivl )#     server L3-helper Heartbeats
aPULL.setsockopt(    zmq.HEARTBEAT_TTL,   ttl )#     server L3-helper Heartbeats
...
#PULL PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPULL.bind(   "tcp://*:5558" )                 # expose AccessPoint on tcp://
...

#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True
while notSoftFLAG:
NOP_SLEEP = 10                            #  set a 10 [ms] sleep in case NOP
if aPULL.poll( 0 ):                       #  NEVER block/wait
aMSG = aPULL.recv( zmq.DONTWAIT )      #  NEVER .recv()-s own data back
#CPY = zmq_msg_copy( &aMSG );          // WARNING ABOUT NATIVE C-API
#                                      // HANDLING, NEED .COPY()
#                                      //           NEED .CLOSE()
aPUB.send( aMSG,   zmq.DONTWAIT )      #  re-PUB-lish to all others but sender
...< process aMSG payload on server-side, if needed >...

NOP_SLEEP = 0                          # !NOP, avoid 10[ms] NOP-loop sleep
pass

if aReasonToFlagLoopEXIT:
notSoftFLAG = False
...
NOP_SLEEP = 0
pass

if ...:
...
pass

sleep( NOP_SLEEP )                        # a soft-real-time controlled sleep on NOP
#main loop ---------------------------------------------------------------------
pass

#########
# ALWAYS:
#          better using context-aware try:/except:/finally:

aRetCODE = [ aSOCK.close() for aSOCK in ( aPUB, aPULL, ) ]
...

aCTX.term()
#   .term()
#########
1

По вопросам рекламы [email protected]