ZMQ_CONFLATE не работает для ZMQ_SUB (без фильтров)

У меня есть библиотека zeromq-4.1.4 и cppzmq, установленные на быстром сервере в реальном времени и медленном клиенте.

И клиент, и сервер имеют 2 порта для публикации и подписки, связываясь по TCP-IP.

Сервер отправляет сообщения со своей скоростью. Клиент получает последнее сообщение, выполняет медленные вычисления и отправляет сообщение обратно на сервер. Сервер читает сообщение, если есть входящий, и обрабатывает его.

Проблема в том, что старые сообщения не перезаписываются новыми. Клиент всегда распечатывает старые сообщения, и даже если я выключаю сервер, сообщения продолжают помещаться в очередь из приемного буфера клиента.

Почему это происходит? ZMQ_CONFLATE установлен. Разве это не должно просто работать?

В качестве обходного пути я хотел бы поместить клиента в рабочий поток для работы на максимальной скорости, а затем сохранить последнее сообщение вручную. Но это накладные расходы, поскольку именно так и поступает zeromq при отправке или получении сообщений, насколько я понимаю.

Код клиент / сервер такой же:

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;

// Prepare our context
context_ = new zmq::context_t(1);

// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message

// Prepare ZMQ subscriber
subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB);
subscriber_->connect(subAddress);
subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message

if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}

if (flags_ & ZMQ_SYNC_SUB)
{
// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);

// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);

// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);
}
}

void ZeromqMessenger::sync()
{
if (connected_)
return;

if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);

subscribers_++;
}
}

if (subscribers_ == subscribers_expected_)
connected_ = true;
}
}

void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}

bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
zmq::message_t update;
bool received = subscriber_->recv(&update, flags);
if(received)
memcpy(data, update.data(), size);
return received;
}

0

Решение

Я реализовал версию с резьбой, и она работает просто отлично. Это очень грубая реализация с глобальными переменными, которая должна быть уточнена, но по крайней мере она работает.

#include <zmq_messenger.h>
#include <iostream>
#include <thread>
#include <mutex>

std::string gSubAddress;
std::mutex gMtx;
const int gSize = 20*sizeof(double);
char gData[gSize];

void *worker_routine (void *context)
{
// Prepare ZMQ subscriber
int confl = 1;
zmq::socket_t* subscriber = new zmq::socket_t(*(zmq::context_t*)context, ZMQ_SUB);
subscriber->connect(gSubAddress.c_str());
subscriber->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0);

while (1)
{
zmq::message_t update;
bool received = subscriber->recv(&update, ZMQ_DONTWAIT);
if(received)
{
gMtx.lock();
memcpy(gData, update.data(), gSize);
gMtx.unlock();
}
}
zmq_close(subscriber);
return NULL;
}

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;

// Prepare our context
context_ = new zmq::context_t(1);

// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message

gSubAddress = std::string(subAddress);
pthread_create (&subscriber_worker_, NULL, worker_routine, context_);

if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}

if (flags_ & ZMQ_SYNC_SUB)
{
//std::cout << "Trying to connect" << std::endl;

// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);

// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);

// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);

// Third, get our updates and report how many we got
//std::cout << "Ready to receive" << std::endl;
}
}

void ZeromqMessenger::sync()
{
//std::cout << "sync" << std::endl;
if (connected_)
return;

if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);

subscribers_++;
}
}

if (subscribers_ == subscribers_expected_)
connected_ = true;

//std::cout << subscribers_ << " subscriber(s) connected" << std::endl;
}
}

void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}

bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
assert(gSize == size);
gMtx.lock();
memcpy(data, gData, size);
gMtx.unlock();
return true;
}
0

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector