Я работаю над программой на C ++, которая должна иметь возможность отправлять / получать JSON
-полезные нагрузки от произвольного числа других клиентов.
Сначала я попытался внедрить службу PubNub, но решил, что не могу получать и публиковать сообщения одновременно (даже используя два разных контекста в разных потоках). Я должен быть в состоянии сделать это. Я также обнаружил, что PubNub имеет слишком много задержек на мой вкус.
Я наткнулся на ZeroMQ библиотека, которая имеет PUB/SUB
модель, которая будет соответствовать моим потребностям. Но все Примеры Я натолкнулся на объяснение, как реализовать это таким образом, чтобы один процесс был издателем ИЛИ подписчиком, и не оба одновременно.
В идеале я хотел бы запрограммировать сервер, который будет передавать все сообщения, поступающие от кого-либо, кому-либо, подписанному на определенный канал, указанный в сообщении. Любой должен иметь возможность получать и публиковать сообщения кому-либо еще в сети, при условии, что они подписаны на правильный канал.
UPDATE 1:
Замечания : Мне не нужна страховка чека потому что полезная нагрузка N + 1 будет иметь приоритет над полезной нагрузкой N. Я хочу отправить и забыть о средстве связи (по типу UDP).
По запросу: PubNub предел 32 kB
в JSON
-полезная нагрузка была для меня идеальной, мне больше не нужно. На самом деле, мои полезные нагрузки 4 kB
в среднем. Все экземпляры клиентов будут работать в одной локальной сети, поэтому задержка должна быть меньше 5 ms
в идеале. Что касается количества клиентов, не будет более 4 клиентов, подписанных на один и тот же канал / тему одновременно.
UPDATE 2 :
Я не могу предсказать, сколько каналов / тем будет существовать раньше времени, но это будет порядка десятков (большую часть времени), сотен (в пике). Не тысячи
Q1:
— Могу ли я реализовать такое поведение, используя ZeroMQ
?
Q2:
— Есть ли рабочий образец, демонстрирующий это (предпочтительно в C++
)
Q3:
— Если нет, то любые предложения для библиотеки в C++
?
ZeroMQ :
способен хорошо справиться с этой задачей в масштабах, указанных выше
nanomsg :
способен также выполнять эту задачу, необходимость перепроверить порты / привязки для клиентовОбзор дизайна:
PUB
-Lish как сообщение полезной нагрузкиSUB
-подписаться как актуальный входящий поток сообщений TOPIC
-фильтрJSON
-форматированные сообщения, которые он подготовил / произвелJSON
-форматированная форма и для которой попытка получить их локально обрабатывается будет иметь место после завершения приемаJSON
-formatted полезная нагрузка меньше чем 32 kB
, Об 4 kB
в среднем5,000 [usec]
URL
цель для всех просроченных столяры.connect()
-sПредложение:
сервер может использовать несколько вариантов поведения для достижения поставленных целей, используя как
PUB
а такжеSUB
поведения, и обеспечивает код, управляемый, быстрый,SUB
-блокированная, неблокирующая петля событий.poll()
с выровненной повторной передачей любого из егоSUB
-боковая сторона.recv()
полезные нагрузки к немуPUB
-сторона, в настоящее время.connect()
аудитория (в прямом эфире клиент экземпляры):задавать
s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
а такжеs_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
по причинам производительности, которые здесь не так сложны, можно также разделить обработку потоков рабочих нагрузок, сопоставив каждый из них с несвязанными подмножествами нескольких созданных потоков ввода-вывода:
карта
s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
а такжеs_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
задавать
s_PUB_send.bind( "tcp://localhost:8899" );
+
задаватьs_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
задаватьs_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
задаватьs_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
задаватьs_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
задаватьs_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
а также
s_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()
Так же,
клиент Экземпляр может развернуть обратный тандем какPUB
конечная точка иSUB
-конечный, готов к.connect()
известной транспортной цели-URL
,клиент конкретная подписка локально решает, что фильтровать из входящего потока сообщений (до
ZeroMQ v.3.1 API
множество сообщений будет доставлено каждому клиент экземпляр над классом транспорта, однако, так какAPI v.3.1+
, тема-фильтр работает наPUB
сторона, которая в желаемом modus-operandi устраняет потерянные объемы данных по сети, но в то же время это увеличиваетPUB
накладные расходы на обработку ( исх .: Замечания по принципу увеличенного отображения / увеличения потоков мульти-ввода-вывода выше)задавать
c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
а такжеc_PUB_send = aZmqCONTEXT.socket( zmq.PUB );
до тех пор, пока накладные расходы на сборку / обработку полезной нагрузки не приблизятся к допустимому пороговому значению сквозной задержки, не будет необходимости разделять / отделять
ZeroMQ
потоки ввода / вывода низкого уровня здесь:
картаc_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
а такжеc_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );
задавать
c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
задаватьc_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
задаватьc_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
задаватьc_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last
задаватьc_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
задаватьc_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
а такжеc_SUB_recv.connect( "tcp://server:8899" );
Для хобби-проектов больше нет необходимости в инфраструктуре обмена сообщениями, тем не менее для более серьезных доменов существуют дополнительные сервер а также клиент экземпляры должны иметь некоторые дополнительные формы поведения формальной коммуникации.
— r/KBD
для удаленной клавиатуры, с CLI
подобные специальные инспекционные утилиты
— KEEP_ALIVE
транспондеры для общесистемного мониторинга состояния / перфоманса
— SIG_EXIT
обработчики для разрешения всей системы / конкретного экземпляра SIG_EXIT
s
— distributed syslog
сервис, позволяющий безопасно собирать / хранить неблокирующую реплику записей журнала (будь то на этапе отладки или на этапе настройки производительности или сбора доказательств производственного уровня)
— Identity Management
инструменты для аудита-следы и др
— WhiteList/BlackList
для повышения надежности инфраструктуры, чтобы сделать ее более защищенной от DoS-атак / отравления ошибочных пакетов трафика NIC и др.
— Adaptive Node re-Discovery
для более разумного / специального проектирования инфраструктуры и мониторинга состояния или когда на сцену выходят сценарии с активным горячим резервированием и передачей ролей с активным горячим резервированием с много ролевой / (N + M) и т. д.
A1
: Да, полностью в рамках возможностей ZeroMQ
A2
: Да, примеры кода C ++ в книге ZeroMQ / Руководства доступны
A3
: Ref .: A1, плюс может понравиться в посте Мартина СУСТРИКА на тему «Различия между nanomsg
а также ZeroMQ
«
Надеюсь, вам понравятся возможности распределенной обработки, будь то при поддержке ZeroMQ
или же nanomsg
или оба.
Только собственное воображение — это предел.
Если интересно Более подробная информация, кому-то может понравиться книга, упомянутая в Лучший следующий шаг раздел этого поста
Q1:
— Могу ли я реализовать такое поведение с помощью ZeroMQ?
Определенно да; но, вероятно, не используя PUB/SUB
Розетки.
Способ сделать это с помощью PUB/SUB
это: для каждого узла в системе вы делаете один PUB
сокет и один SUB
розетка, и подключите единый SUB
розетка к PUB
розетки все другие узлы и установите свой фильтр подписки соответственно. Это ограничено в своей полезности, потому что (я думаю) вам нужно установить один и тот же фильтр для всех ваших соединений. Обратите внимание, что вам определенно следует НЕ создать более одного контекста в каждом узле.
Если ваше общее количество узлов мало (например, 10-20 или меньше), вы можете сделать один PUB
розетка и N-1 SUB
сокеты на узел (все еще в одном контексте) и подключить каждый SUB
розетка к каждому из PUB
розетки других узлов.
Если у вас есть четкие представления о клиентских и серверных узлах, вы можете использовать более новые CLIENT/SERVER
розетки (доступны в 4.2
или же 4.1
Я верю.) Это будет более элегантно и, вероятно, проще в управлении, но вам придется реализовать фильтрацию контента («каналы») самостоятельно; что может быть довольно простым или немного сложным, в зависимости от того, что именно вы хотите сделать.
Q2:
— Есть ли рабочий образец, демонстрирующий это (желательно на C ++)?
Не то, что я знаю из.
Q3:
— Если нет, какие-либо предложения для библиотеки в C ++?
Я бы еще посоветовал ZeroMQ
из-за его относительно легкого веса, простого и элегантного интерфейса, всесторонней функциональности и способности работать на многих языках. Есть много комбинаций сокетов на выбор. Если хуже становится хуже, вы всегда можете использовать PAIR
розетки везде.
nanomsg
с BUS
протокол см. http://nanomsg.org/documentation-zeromq.html