Отправка данных через ZeroMQ (zmqpp) с использованием MsgPack приводит к ошибке «msgpack :: v1 :: достаточный_байт»

Я сделал соединение PUB / SUB, используя zmqpp и теперь я хочу отправить данные от издателя подписчикам, используя только заголовок, версию C ++ 11 msgpack-с.

Издатель должен отправить 2 int64_t цифры — header_1 а также header_2 — с последующим std::vector<T>data —, где T определяется (header_1, header_2) сочетание.

Так как примеров того, как объединить msgpack и zmqpp, не так много, идея, которую я придумала, состоит в том, чтобы отправить сообщение из 3 частей, используя zmqpp::message::add/add_raw. Каждая часть будет уплотненный/распакованный используя msgpack.

Издатель упаковывает одну часть данных следующим образом:

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

И получатель распаковывает это так:

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);

Когда я запускаю код, я получаю следующую ошибку на стороне подписчика:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what():  insufficient bytes
Aborted

Кроме того, кажется, что zmqpp сгенерировал сообщение из 5 частей, хотя я звонил add() только 3 раза.

Q1: правильно ли я упаковываю / распаковываю данные?

Q2: это правильный метод для отправки буферов msgpack с использованием zmqpp?

Вот важные части кода:

издатель

zmqpp::socket publisherSock;
/* connection setup stuff ...*/

// forever send data to the subscribers
while(true)
{
zmqpp::message msg;

// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl;  // header_1:1234567
}

{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl;  // header_2:89
}

{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
}

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
publisherSock.send(msg);

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

подписчик

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
int64_t header_2;
std::vector<double> data;

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}

}

РЕДАКТИРОВАТЬ: Проблема решена: как отмечает @Jens, правильный способ упаковки / отправки данных заключается в использовании zmqpp::message::add_raw()

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());

1

Решение

Я думаю, что звонки msg.add(buffer.data(), buffer.size()не добавлять массив buffer.size() байт, но вызов message::add(Type const& part, Args &&...args), который

  1. msg << buffer.data()что, вероятно, вызывает message::operator<<(bool) так как указатель преобразуется в bool
  2. add(buffer.size()) который затем вызывает msg << buffer.size(), который добавляет size_t значение в качестве следующей части.

Глядя на класс zmqpp :: message, использование message :: add_raw должно помочь.

PS: это все без какой-либо гарантии, потому что я никогда не использовал zmqpp или msgpack.

2

Другие решения


По вопросам рекламы [email protected]