Я сделал соединение PUB / SUB, используя zmqpp и теперь я хочу отправить данные от издателя подписчикам, используя только заголовок, версию C ++ 11 msgpack-с.
Издатель должен отправить 2 int64_t
цифры — header_1
а также header_2
— с последующим std::vector<T>
— data
—, где T
определяется (header_1, header_2)
сочетание.
Так как примеров того, как объединить msgpack и zmqpp, не так много, идея, которую я придумала, состоит в том, чтобы отправить сообщение из 3 частей, используя zmqpp::message::add/add_raw
. Каждая часть будет уплотненный/распакованный используя msgpack.
Издатель упаковывает одну часть данных следующим образом:
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
И получатель распаковывает это так:
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
Когда я запускаю код, я получаю следующую ошибку на стороне подписчика:
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
Кроме того, кажется, что zmqpp сгенерировал сообщение из 5 частей, хотя я звонил add()
только 3 раза.
Q1: правильно ли я упаковываю / распаковываю данные?
Q2: это правильный метод для отправки буферов msgpack с использованием zmqpp?
Вот важные части кода:
издатель
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
zmqpp::message msg;
// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl; // header_1:1234567
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl; // header_2:89
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
}
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
publisherSock.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
подписчик
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}
}
РЕДАКТИРОВАТЬ: Проблема решена: как отмечает @Jens, правильный способ упаковки / отправки данных заключается в использовании zmqpp::message::add_raw()
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
Я думаю, что звонки msg.add(buffer.data(), buffer.size()
не добавлять массив buffer.size()
байт, но вызов message::add(Type const& part, Args &&...args)
, который
msg << buffer.data()
что, вероятно, вызывает message::operator<<(bool)
так как указатель преобразуется в booladd(buffer.size())
который затем вызывает msg << buffer.size()
, который добавляет size_t
значение в качестве следующей части.Глядя на класс zmqpp :: message, использование message :: add_raw должно помочь.
PS: это все без какой-либо гарантии, потому что я никогда не использовал zmqpp или msgpack.