В настоящее время я работаю над проектом, который требует некоторой связи по сети с другими типами данных от некоторых объектов распределенной системы, и я использую ZMQ.
Основная цель проекта — создать центральный узел, обслуживающий клиентов, который может подключаться в любое время. Для каждого подключенного клиента центральный узел должен управлять передачей сообщений между ними.
В настоящее время и на данный момент все общение происходит по TCP.
Клиенты должны отправлять и получать сообщения в любое время, чтобы они ZMQ_DEALER
типа сокетов и центральный узел ZMQ_ROUTER
Первоначально цель состоит в том, чтобы одно сообщение от какого-то клиента, это сообщение поступало другим клиентам. Это означает, что другие клиенты могут видеть все те же данные.
Я использую Шаблон асинхронного клиента / сервера потому что мне интересно, чтобы несколько клиентов общались друг с другом, имея, возможно, серверного брокера или промежуточное ПО.
у меня есть ZMQ_DEALER
сокет-клиент, который подключается к ZMQ_ROUTER
сервер сокетов
#include <zmq.hpp>
#include "zhelpers.hpp"using namespace std;
int main(int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t client(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
client.setsockopt(ZMQ_IDENTITY, "PEER1", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
client.connect(endpoint);
for (int request = 0; request < 10; request++)
{
s_sendmore(client, "");
s_send(client, "Testing sending some data");
std::string string = s_recv(client);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
На моем серверном коде у меня есть ZMQ_ROUTER, который принимает и управляет сообщениями, что делает его привязанным к скважинному порту. Этот сервер сделан на Python
import zmq
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Initialize a poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
print("Creating Server Network Manager Router")
while True:
socks = dict(poller.poll())
if socks.get(frontend) == zmq.POLLIN:
message = frontend.recv_multipart()
print(message)
frontend.send_multipart(message)
На моем другом партнере / клиенте у меня есть следующее:
#include <zmq.hpp>
#include "zhelpers.hpp"using namespace std;
int main (int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t peer2(context, ZMQ_DEALER);
const string endpoint = "tcp://localhost:5559";
peer2.setsockopt(ZMQ_IDENTITY, "PEER2", 5);
cout << "Connecting to ZMQ Network Manager " << endpoint << "..." << endl;
peer2.connect(endpoint);
//s_sendmore(peer2, "");
//s_send(peer2, "Probando");
//std::string string = s_recv(peer2);
//std::cout << "Received reply " << " [" << string << "]" << std::endl;
for (int request = 0; request < 10; request++)
{
s_sendmore(peer2, "");
s_send(peer2, "Probando");
std::string string = s_recv(peer2);
std::cout << "Received reply " << request
<< " [" << string << "]" << std::endl;
}
}
ОБНОВИТЬ
Но каждый, когда я выполняю некоторый клиент, их соответствующие сообщения не приходят к другому одноранговому клиенту.
Сообщения приходят на ZMQ_ROUTER
и возвращаются в ZMQ_DEALER
отправитель происхождения.
Это связано с тем, что во время приема кадру идентификации предшествовал ROUTER, и сообщение отправляется обратно через ROUTER; где он удаляет идентификатор и использует значение для направления сообщения обратно к соответствующему ДИЛЕРУ, в соответствии с разделом ZMQ_ROUTER до конечной страницы здесь.
И это логика, я посылаю личность моего DEALER
к ROUTER
, ROUTER
возьми эту личность и вернись к моему DEALER
сообщение
Прежде всего, чтобы начать свою реализацию, мне нужно, чтобы какое-то ДИЛЕР отправляло какое-то сообщение, оно будет визуализировано любым другим ДИЛЕРОМ, независимо от того, сколько ДИЛЕРОВ (один или несколько) подключено к ZMQ_ROUTER.
В этом смысле … Нужно ли встречаться о структуре личности другого ДИЛЕРА или других ДИЛЕРОВ?
Если у меня есть DEALER A
, DEALER B
, а также DEALER C
, а также ROUTER
затем:
DEALER A
Отправить сообщение …
И я хочу, чтобы это сообщение от ДИЛЕРА А пришло на DEALER B
а также DEALER C
и так другое DEALERS
это может быть присоединено к моей беседе сеанса …
В этом порядке идей, необходимо соответствовать структуре личности DEALER B
а также DEALER C
ранее на DEALER A
сторона, чтобы это сообщение прибыло ему?
Как узнать идентификационные рамки каждого ДИЛЕРА, существующие в моей реализации?
Это сделано на стороне ROUTER?
Я не ясно это
Вы могли бы, чтобы все клиенты отправляли сообщение «Я здесь» при запуске. Центральный сервер может хранить все идентификаторы, c.f. начальная связь между работником и маршрутизатором здесь: http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker. Сервер будет отправлять любое полученное сообщение всем известным в настоящее время клиентам. Вы должны добавить сердцебиение, чтобы обнаружить отключенных клиентов, ср. http://zguide.zeromq.org/page:all#Heartbeating.
Тем не менее, ZeroMQ уже поставляется с такой схемой связи: PUB
—SUB
, По сути, каждый клиент будет иметь DEALER
и SUB
сокет подключен к серверам ROUTER
а также PUB
Розетки. Сервер просто рассылает любые полученные
сообщение через PUB
гнездо для все клиентов. Если это будет проблемой для исходящего клиента, вы можете включить в сообщение идентификатор клиента, чтобы каждый клиент мог отфильтровать сообщения со своим собственным идентификатором. Смотрите также этот пример из руководства
http://zguide.zeromq.org/page:all#Getting-an-Out-of-Band-Snapshot
Еще одна интересная картина будет Переиздание обновлений от клиентов:
Вот PUSH
—PULL
используется для отправки обновлений на сервер. Это имеет смысл, если нет необходимости в ответном сообщении с сервера. Если вам не нужен запрос состояния из этого примера, вы можете пропустить ROUTER
—DEALER
часть. Вот пример реализации с использованием Python для краткости. Сервер слушает PULL
сокет и отправляет все через PUB
разъем:
import zmq
def main():
# context and sockets
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")
while True:
message = collector.recv()
print "I: publishing update %s" % message
publisher.send(message)
if __name__ == '__main__':
main()
Клиент слушает PUB
розетка на некоторое время. Если сообщение получено, оно регистрируется. Если время ожидания истекло, генерируется сообщение с вероятностью 1 к 10:
import random
import time
import zmq
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.connect("tcp://localhost:5558")
random.seed(time.time())
while True:
if subscriber.poll(100) & zmq.POLLIN:
message = subscriber.recv()
print "I: received message %s" % message
else:
rand = random.randint(1, 100)
if rand < 10:
publisher.send("%d" % rand)
print "I: sending message %d" % rand
if __name__ == '__main__':
main()
Распределенные системы должны работать как умный а также продуктивно, поскольку агенты распределены, а анализ ошибок и развернутые производственные проблемы чрезвычайно дороги для анализа / тестирования / отладки.
Таким образом, повторное использование копирования / вставки несовместимой с проблемой идеи не является способом достижения первого, тем более второго.
client-[A].send()
-сообщение о том, что O / P хотел стать серверным[S].recv()
-Эд и повторно транслировать всем Другой clients-[B,C,...]
кроме [A]
-сам.
Наиболее ресурсоэффективный подход к этому состоит в правильной настройке инструментов инфраструктуры для выполнения именно этого, без повторного изобретения колеса и / или использования хрупкого и разрушающего производительность кода (ов) строительных лесов.
Так:
на клиенте[*]
Сторону лучше всего использовать нижеприведенную эскизную примитивную концепцию агента. Более сложные настройки, такие как использование столь умных средств обработки событий, как Tkinter, превратились в .mainloop()
мягкая система реального времени, тем лучше, но не так-то легко начать дизайн-сражения более чем на одном фронте, так что давайте сейчас все упростим:
zmq_VERSION = zmq.zmq_version_info()
anAgentsIDENTITY = whateverHashOrHumanReadableSTRING
notMINE = anAgentsIDENTITY
if zmq_VERSION[0] < 4:
print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )
aCTX = zmq.Context( 2 ) # if performance boosting is needed
#SUB ---------------------------------------------------------------------------
aSUB = aCTX.socket( zmq.SUB )
aSUB.setsockopt( zmq.LINGER, 0 ) # protect your agent
aSUB.setsockopt( zmq.MAXMSGSIZE, m ) # protect your agent from DoS
aSUB.setsockopt( zmq.AFFINITY, 1 ) # protect your server resources
aSUB.setsockopt( zmq.HEARTBEAT_IVL, ivl ) # set server helping Heartbeats
aSUB.setsockopt( zmq.HEARTBEAT_TTL, ttl ) # set server helping Heartbeats
aSUB.setsockopt( zmq.INVERT_MATCHING, 1 ) # avoid server sending data back
aSUB.setsockopt( zmq.SUBSCRIBE, notMINE ) # NEVER .recv()-s data back
...
#SUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aSUB.connect( "tcp://localhost:5557" )
#PUSH --------------------------------------------------------------------------
aPUSH = aCTX.socket( zmq.PUSH )
...
#PUSH PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True; anAgentSignsWithIdentityPREFIX = anAgentsIDENTITY
while notSoftFLAG:
if aReasonToSendSomethingToServer:
aPUSH.send( anAgentSignsWithIdentityPREFIX
+ ":::"+ aMsgPAYLOAD,
zmq.DONTWAIT
) # inspect ZMQError
...
pass
if aSUB.poll( 100 ):
message = aSUB.recv( zmq.DONTWAIT ) # NEVER .recv()-s own data back
...
passif aReasonToFlagLoopEXIT:
notSoftFLAG = False
...
pass
if ...:
...
pass
#main loop ---------------------------------------------------------------------
pass
#########
# ALWAYS:
# better using context-aware try:/except:/finally:
aRetCODE = [ aSOCK.close() for aSOCK in ( aSUB, aPUSH, ) ]
...
aCTX.term()
# .term()
#########
все хорошо настроено внутри инфраструктуры ZeroMQ:
pass; zmq_VERSION = zmq.zmq_version_info()
if zmq_VERSION[0] < 4:
print "ZMQ{0:} ver < than expected, will exit".format( zmq_VERSION )
aCTX = zmq.Context( 2 ) # if performance boosting is needed
#PUB ---------------------------------------------------------------------------
aPUB = aCTX.socket( zmq.PUB )
aPUB.setsockopt( zmq.LINGER, 0 ) # protect your server
aPUB.setsockopt( zmq.MAXMSGSIZE, m ) # protect your server from DoS
aPUB.setsockopt( zmq.AFFINITY, 3 ) # protect your server resources
aPUB.setsockopt( zmq.HEARTBEAT_IVL, ivl ) # server L3-helper Heartbeats
aPUB.setsockopt( zmq.HEARTBEAT_TTL, ttl ) # server L3-helper Heartbeats
aPUB.setsockopt( zmq.INVERT_MATCHING, 1 ) # avoid server sending data back
aPUB.setsockopt( zmq.IMMEDIATE, 1 ) # avoid Queueing for dead-ends
aPUB.setsockopt( zmq.TOS, tos ) # allow for L3-router TOS-policies
...
#PUB PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPUB.bind( "tcp://*:5557" ) # expose AccessPoint on tcp://
#PULL --------------------------------------------------------------------------
aPULL = aCTX.socket( zmq.PULL )
aPULL.setsockopt( zmq.LINGER, 0 ) # protect your server
aPULL.setsockopt( zmq.MAXMSGSIZE, m ) # protect your server from DoS
aPULL.setsockopt( zmq.AFFINITY, 3 ) # protect your server resources
aPULL.setsockopt( zmq.HEARTBEAT_IVL, ivl )# server L3-helper Heartbeats
aPULL.setsockopt( zmq.HEARTBEAT_TTL, ttl )# server L3-helper Heartbeats
...
#PULL PERFORMANCE & RESOURCES TWEAKING DETAILS GO WAY BEYOND THE SCOPE OF THIS POST
aPULL.bind( "tcp://*:5558" ) # expose AccessPoint on tcp://
...
#main loop ---------------------------------------------------------------------
pass; notSoftFLAG = True
while notSoftFLAG:
NOP_SLEEP = 10 # set a 10 [ms] sleep in case NOP
if aPULL.poll( 0 ): # NEVER block/wait
aMSG = aPULL.recv( zmq.DONTWAIT ) # NEVER .recv()-s own data back
#CPY = zmq_msg_copy( &aMSG ); // WARNING ABOUT NATIVE C-API
# // HANDLING, NEED .COPY()
# // NEED .CLOSE()
aPUB.send( aMSG, zmq.DONTWAIT ) # re-PUB-lish to all others but sender
...< process aMSG payload on server-side, if needed >...
NOP_SLEEP = 0 # !NOP, avoid 10[ms] NOP-loop sleep
pass
if aReasonToFlagLoopEXIT:
notSoftFLAG = False
...
NOP_SLEEP = 0
pass
if ...:
...
pass
sleep( NOP_SLEEP ) # a soft-real-time controlled sleep on NOP
#main loop ---------------------------------------------------------------------
pass
#########
# ALWAYS:
# better using context-aware try:/except:/finally:
aRetCODE = [ aSOCK.close() for aSOCK in ( aPUB, aPULL, ) ]
...
aCTX.term()
# .term()
#########