Пример C ++ и Python ZeroMQ 4.x PUB / SUB не работает

Я могу найти только старые исходные коды C ++. В любом случае, я сделал мой, основываясь на них. Вот мой издатель в Python:

import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")

while True:
msg = "hello"socket.send_string(msg)
print("sent "+ msg)
sleep(5)

А вот подписчик на C ++:

void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));

while (true) {
zmq_msg_t msg;
int rc;

rc = zmq_msg_init( & msg);
assert(rc == 0);
std::cout << "waiting for message..." << std::endl;
rc = zmq_msg_recv( & msg, subscriber, 0);
assert(rc == 1);

std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
zmq_msg_close( & msg);
}

Изначально я пытался zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") ); но я думаю, я должен получить все, если я не установлю это, верно? Я прокомментировал это.

Когда я запускаю код, я вижу «ожидание сообщения …» навсегда.

Я пытался прослушивать TCP-трафик, используя tcpdump. Оказывается, что когда издатель включен, я вижу много мусора на 5563 порт, и когда я выключаю издателя, они останавливаются. Когда я попробовал PUSH/PULL Схема, я мог видеть текстовое сообщение в tcpdump, (Я пытался нажать на nodejs и тянуть на c ++, и это сработало).

Что я могу делать не так?

Я пробовал разные комбинации .bind(), .connect(), localhost, 127.0.0.1, но они тоже не будут работать.

ОБНОВИТЬЯ только что прочитал, что я должен подписаться на что-то, поэтому я сделал zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 ); подписаться на все, но я все еще ничего не получаю

PyZMQ находится в версии 17.0.0.b3 и имеет ZeroMQ 4.2.3

C ++ имеет ZeroMQ 4.2.2

ОБНОВЛЕНИЕ 2:

Обновления до 4.2.3 тоже не будут работать.

10

Решение

«Я думаю, я должен получить все, если я не установлю это, верно?«

Нет, это неверное предположение. Вам может понравиться коллекция из моих других сообщений ZeroMQ здесь, о {обычная строка | Юникод | сериализация} -популярные и {производительность- | Traffic-} — влияет на текущую политику ( SUB-обработка стороннего тематического фильтра в ранних версиях ZeroMQ и / или PUB-боковая обработка для более свежих) можно встретиться при проектировании гетерогенных распределенных систем с использованием ZeroMQ.

(Любая другая масштабируемая модель архетипа формального общения, подобная наблюдаемой PUSH/PULL, ничего не делает с политикой подписки, поэтому будет работать независимо от обработки сопоставления подписки с заданным списком фильтра тем. )


Шаг 0: сначала проверьте RTO отправляющей части, если он .send()Что-нибудь вообще

Давайте смоделируем быстрый питонический приемник, чтобы увидеть, действительно ли отправитель отправляет что-либо по линии:

import zmq

aContext = zmq.Context()                                          # .new Context
aSUB     = aContext.socket( zmq.SUB )                             # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" )                            # .connect
aSUB.setsockopt( zmq.LINGER, 0 )                                  # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, "" )                              # .set T-filter

MASK = "INF: .recv()-ed this:[{0:}]\n:     waited {1: > 7d} [us]"aClk = zmq.Stopwatch();

while True:
try:
aClk.start(); print MASK.format( aSUB.recv(),
aClk.stop()
)
except ( KeyboardInterrupt, SystemExit ):
pass
break
pass
aSUB.close()                                                     # .close ALWAYS!
aContext.term()                                                  # .term  ALWAYS!

Это должно сообщать обо всем PUB-отправитель на самом деле .send()— по телеграфу, а также по фактическому времени прибытия сообщений (в [us]Я рад, что ZeroMQ включил этот инструмент для отладки и настройки производительности / задержки.

Если ACK-ред, как вы видите в прямом эфире INF:-сообщения на самом деле тикают на экране, продолжайте работать, и теперь имеет смысл перейти к следующему шагу.


Шаг 1: Проверьте код получающей части следующим образом:

#include <zmq.h>
void *aContext = zmq_ctx_new();
void *aSUB     = zmq_socket(     aContext, ZMQ_SUB );               std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
zmq_connect(    aSUB,    "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
zmq_setsockopt( aSUB,     ZMQ_SUBSCRIBE, "", 0 );  std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
zmq_setsockopt( aSUB,     ZMQ_LINGER,        0 );  std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ...    ) done" << std::endl;
int rc;
while (true) {
zmq_msg_t       msg;                            /*       Create an empty ØMQ message */
rc = zmq_msg_init  (&msg);          assert (rc ==  0 && "EXC: in zmq_msg_init() call" );
std::cout << "INF: .. zmq_msg_init() done" << std::endl;
rc = zmq_msg_recv  (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
zmq_msg_close (&msg);                           /*       Release message */
std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
}
zmq_close(    aSUB );                          std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
zmq_ctx_term( aContext );                      std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
4

Другие решения

Что является возвращаемым значением для zmq_setsockopt()?

Тогда вы должны использовать "" вместо NULL, они разные.

zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );

Как API определяет:

Возвращаемое значение

zmq_setsockopt() функция должна вернуть ноль в случае успеха. В противном случае он должен вернуть -1 и установить errno к одному из значений, определенных ниже.

2

Правильный рецепт для запуска паттерна PUB / SUB (независимо от языка):

  1. socket(zmq.PUB)
  2. bind("tcp://127.0.0.1:5555")
  3. encoding (обычно просто encode () для строк, но вы также можете сжимать () или dumps () для объектов или даже для обоих) encoded_topic = topic.encode() encoded_msg = msg.encode()
  4. send_multipart([encoded_topic, encoded_msg])
  1. socket(zmq.SUB)
  2. setsockopt(zmq.SUBSCRIBE, topic.encode())
  3. connect("tcp://127.0.0.1:5555")
  4. answer = recv_multipart()
  5. расшифровать ответ enc_topic, enc_msg = answer topic = enc_topic.decode() msg = enc_msg.decode()

В общем, шаги Pub — 2 / Sub — 3 (т. Е. Bind / connect) и Pub — 3 / Sub —
5 (т. Е. Кодирование / декодирование или дамп / загрузка) должны дополнять друг друга, чтобы все работало.

2

это я, тот, кто задал вопрос.

Мне удается работать обмена socket.bind("tcp://*:5563") в socket.connect("tcp://dns_address_of_my_dcker_container:5564") в питоне,

и обмен zmq_connect(subscriber, "tcp://localhost:5563") в zmq_bind(subscriber, "tcp://*:5563") в C ++

Примеры, которые я нашел в Интернете, говорят, что я должен использовать bind для издателя и connect для подписчика, но для меня это никак не сработает. У кого-нибудь есть идеи почему?

Документация ZeroMQ гласит следующее:

Функция zmq_bind () связывает сокет с локальной конечной точкой, а затем
принимает входящие соединения на этой конечной точке.

Функция zmq_connect () соединяет сокет с конечной точкой, а затем
принимает входящие соединения на этой конечной точке.

Я не имею четкого представления о том, что изменилось, но это сработало.

0
По вопросам рекламы [email protected]