ZeroMQ Публикация и подписка одновременно

Я работаю над программой на C ++, которая должна иметь возможность отправлять / получать JSON-полезные нагрузки от произвольного числа других клиентов.

Сначала я попытался внедрить службу PubNub, но решил, что не могу получать и публиковать сообщения одновременно (даже используя два разных контекста в разных потоках). Я должен быть в состоянии сделать это. Я также обнаружил, что PubNub имеет слишком много задержек на мой вкус.

Я наткнулся на ZeroMQ библиотека, которая имеет PUB/SUB модель, которая будет соответствовать моим потребностям. Но все Примеры Я натолкнулся на объяснение, как реализовать это таким образом, чтобы один процесс был издателем ИЛИ подписчиком, и не оба одновременно.

В идеале я хотел бы запрограммировать сервер, который будет передавать все сообщения, поступающие от кого-либо, кому-либо, подписанному на определенный канал, указанный в сообщении. Любой должен иметь возможность получать и публиковать сообщения кому-либо еще в сети, при условии, что они подписаны на правильный канал.


UPDATE 1:

Замечания : Мне не нужна страховка чека потому что полезная нагрузка N + 1 будет иметь приоритет над полезной нагрузкой N. Я хочу отправить и забыть о средстве связи (по типу UDP).

По запросу: PubNub предел 32 kB в JSON-полезная нагрузка была для меня идеальной, мне больше не нужно. На самом деле, мои полезные нагрузки 4 kB в среднем. Все экземпляры клиентов будут работать в одной локальной сети, поэтому задержка должна быть меньше 5 ms в идеале. Что касается количества клиентов, не будет более 4 клиентов, подписанных на один и тот же канал / тему одновременно.


UPDATE 2 :

Я не могу предсказать, сколько каналов / тем будет существовать раньше времени, но это будет порядка десятков (большую часть времени), сотен (в пике). Не тысячи


Вопросы:

Q1: — Могу ли я реализовать такое поведение, используя ZeroMQ ?
Q2: — Есть ли рабочий образец, демонстрирующий это (предпочтительно в C++)
Q3: — Если нет, то любые предложения для библиотеки в C++ ?


пабсаб архитектура

0

Решение

ZeroMQ : способен хорошо справиться с этой задачей в масштабах, указанных выше
nanomsg : способен также выполнять эту задачу, необходимость перепроверить порты / привязки для клиентов

Обзор дизайна:

  • клиент случаи не являются постоянными, могут свободно появляться самостоятельно, могут свободно исчезать сами по себе или по ошибке
  • клиент Экземпляр решает сам, что он собирается PUB-Lish как сообщение полезной нагрузки
  • клиент Экземпляр решает сам, что он собирается SUB-подписаться как актуальный входящий поток сообщений TOPIC-фильтр
  • клиент экземпляр обменивается (отправляет), сам по себе, простой, не состоящий из нескольких частей, JSON-форматированные сообщения, которые он подготовил / произвел
  • клиент Экземпляр собирает (получает) сообщения, для которых предполагается, что они находятся в том же, не многочастном, JSON-форматированная форма и для которой попытка получить их локально обрабатывается будет иметь место после завершения приема
  • максимальное количество клиент-экземпляры не превышают небольшое количество сотен
  • максимальный размер любого JSON-formatted полезная нагрузка меньше чем 32 kB, Об 4 kB в среднем
  • максимальная задержка, приемлемая для межпроцессной доставки E2E через общую область коллизий LAN, ниже 5,000 [usec]
  • сервер Экземпляр является центральной ролью и постоянной сущностью
  • сервер экземпляр обеспечивает известный транспортный класс URLцель для всех просроченных столяры.connect()-s

Предложение:

сервер может использовать несколько вариантов поведения для достижения поставленных целей, используя как PUB а также SUB поведения, и обеспечивает код, управляемый, быстрый, SUB-блокированная, неблокирующая петля событий .poll() с выровненной повторной передачей любого из его SUB-боковая сторона .recv()полезные нагрузки к нему PUB-сторона, в настоящее время .connect()аудитория (в прямом эфире клиент экземпляры):

задавать s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
а также s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

по причинам производительности, которые здесь не так сложны, можно также разделить обработку потоков рабочих нагрузок, сопоставив каждый из них с несвязанными подмножествами нескольких созданных потоков ввода-вывода:

карта s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
а также s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

задавать s_PUB_send.bind( "tcp://localhost:8899" );
+
задавать s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
задавать s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
задавать s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
задавать s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
задавать s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );

а также s_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()

Так же,
клиент Экземпляр может развернуть обратный тандем как PUBконечная точка и SUB-конечный, готов к .connect() известной транспортной цели-URL,

клиент конкретная подписка локально решает, что фильтровать из входящего потока сообщений (до ZeroMQ v.3.1 API множество сообщений будет доставлено каждому клиент экземпляр над классом транспорта, однако, так как API v.3.1+, тема-фильтр работает на PUBсторона, которая в желаемом modus-operandi устраняет потерянные объемы данных по сети, но в то же время это увеличивает PUBнакладные расходы на обработку ( исх .: Замечания по принципу увеличенного отображения / увеличения потоков мульти-ввода-вывода выше)

задавать c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
а также c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

до тех пор, пока накладные расходы на сборку / обработку полезной нагрузки не приблизятся к допустимому пороговому значению сквозной задержки, не будет необходимости разделять / отделять ZeroMQ потоки ввода / вывода низкого уровня здесь:
карта c_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
а также c_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

задавать c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
задавать c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
задавать c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
задавать c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
задавать c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
задавать c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
а также c_SUB_recv.connect( "tcp://server:8899" );

Обсуждение:

Для хобби-проектов больше нет необходимости в инфраструктуре обмена сообщениями, тем не менее для более серьезных доменов существуют дополнительные сервер а также клиент экземпляры должны иметь некоторые дополнительные формы поведения формальной коммуникации.
r/KBD для удаленной клавиатуры, с CLIподобные специальные инспекционные утилиты
KEEP_ALIVE транспондеры для общесистемного мониторинга состояния / перфоманса
SIG_EXIT обработчики для разрешения всей системы / конкретного экземпляра SIG_EXITs
distributed syslog сервис, позволяющий безопасно собирать / хранить неблокирующую реплику записей журнала (будь то на этапе отладки или на этапе настройки производительности или сбора доказательств производственного уровня)

Identity Management инструменты для аудита-следы и др

WhiteList/BlackList для повышения надежности инфраструктуры, чтобы сделать ее более защищенной от DoS-атак / отравления ошибочных пакетов трафика NIC и др.

Adaptive Node re-Discovery для более разумного / специального проектирования инфраструктуры и мониторинга состояния или когда на сцену выходят сценарии с активным горячим резервированием и передачей ролей с активным горячим резервированием с много ролевой / (N + M) и т. д.

Резюме

A1: Да, полностью в рамках возможностей ZeroMQ
A2: Да, примеры кода C ++ в книге ZeroMQ / Руководства доступны
A3: Ref .: A1, плюс может понравиться в посте Мартина СУСТРИКА на тему «Различия между nanomsg а также ZeroMQ«

Надеюсь, вам понравятся возможности распределенной обработки, будь то при поддержке ZeroMQ или же nanomsg или оба.

Только собственное воображение — это предел.

Если интересно Более подробная информация, кому-то может понравиться книга, упомянутая в Лучший следующий шаг раздел этого поста

1

Другие решения

Q1: — Могу ли я реализовать такое поведение с помощью ZeroMQ?

Определенно да; но, вероятно, не используя PUB/SUB Розетки.

Способ сделать это с помощью PUB/SUB это: для каждого узла в системе вы делаете один PUB сокет и один SUB розетка, и подключите единый SUB розетка к PUB розетки все другие узлы и установите свой фильтр подписки соответственно. Это ограничено в своей полезности, потому что (я думаю) вам нужно установить один и тот же фильтр для всех ваших соединений. Обратите внимание, что вам определенно следует НЕ создать более одного контекста в каждом узле.

Если ваше общее количество узлов мало (например, 10-20 или меньше), вы можете сделать один PUB розетка и N-1 SUB сокеты на узел (все еще в одном контексте) и подключить каждый SUB розетка к каждому из PUB розетки других узлов.

Если у вас есть четкие представления о клиентских и серверных узлах, вы можете использовать более новые CLIENT/SERVER розетки (доступны в 4.2 или же 4.1Я верю.) Это будет более элегантно и, вероятно, проще в управлении, но вам придется реализовать фильтрацию контента («каналы») самостоятельно; что может быть довольно простым или немного сложным, в зависимости от того, что именно вы хотите сделать.

Q2: — Есть ли рабочий образец, демонстрирующий это (желательно на C ++)?

Не то, что я знаю из.

Q3: — Если нет, какие-либо предложения для библиотеки в C ++?

Я бы еще посоветовал ZeroMQиз-за его относительно легкого веса, простого и элегантного интерфейса, всесторонней функциональности и способности работать на многих языках. Есть много комбинаций сокетов на выбор. Если хуже становится хуже, вы всегда можете использовать PAIR розетки везде.

0

nanomsg с BUS протокол см. http://nanomsg.org/documentation-zeromq.html

0
По вопросам рекламы ammmcru@yandex.ru
Adblock
detector