Kafka Protobuf: сериализация C ++ в Java

Я разработал пару приложений на C ++, которые создают и используют сообщения Kafka (используя cppkafka), встраивающие сообщения Protobuf3. Оба работают нормально. Соответствующий код производителя:

std::string kafkaString;
cppkafka::MessageBuilder *builder;
...
solidList->SerializeToString(&kafkaString);
builder->payload(kafkaString);

Объекты Protobuf сериализуются в строку и вставляются как полезные данные Kafka. До этого момента все работало нормально. Сейчас я пытаюсь разработать потребитель для этого на Java. Соответствующий код должен быть:

KafkaConsumer<Long, String> consumer=new KafkaConsumer<Long, String>(properties);
....
ConsumerRecords<Long, String> records = consumer.poll(100);
for (ConsumerRecord<Long, String> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...

но это не удается во время компиляции: parseFrom жалуется: Метод parseFrom (ByteBuffer) в типе Solidlist.SolidList не применим для аргументов (String). Итак, я пытаюсь использовать ByteBuffer:

KafkaConsumer<Long, ByteBuffer> consumer=new KafkaConsumer<Long, ByteBuffer>(properties);
....
ConsumerRecords<Long, ByteBuffer> records = consumer.poll(100);
for (ConsumerRecord<Long, ByteBuffer> record : records) {
SolidList solidList = SolidList.parseFrom(record.value());
...

Теперь ошибка находится во время выполнения, все еще в parseFrom (): Исключение в потоке «main» java.lang.ClassCastException: java.lang.String не может быть приведено к java.nio.ByteBuffer.. Я знаю, что это java.lang.String !!! Итак, я возвращаюсь к оригиналу и пытаюсь использовать его как байтовый массив:

    SolidList solidList = SolidList.parseFrom(record.value().getBytes());

Теперь ошибка на время выполнения: Исключение в потоке «main» com.google.protobuf.InvalidProtocolBufferException $ InvalidWireTypeException: тег сообщения протокола имел недопустимый тип проводника..

Документация protobuf заявляет для сериализации C ++: bool SerializeToString (строка output) const ;: сериализует сообщение и сохраняет байты в заданной строке. Обратите внимание, что байты являются двоичными, а не текстовыми; мы используем только строковый класс в качестве удобного контейнера. *

TL; DR: В результате, как я должен интерпретировать протобуф C ++ «двоичные байты» в Java?

Это кажется связанным (это наоборот), но не помогает: Protobuf Java To C ++ Сериализация [Бинарный]

Заранее спасибо.

3

Решение

Попробуйте реализовать десериализатор и передать его KafkaConsumer Конструктор как значение десериализатора. Это может выглядеть так:

class SolidListDeserializer implements Deserializer<SolidList> {
public SolidList deserialize(final String topic, byte[] data) {
return SolidList.parseFrom(data);
}
...
}

...

KafkaConsumer<Long, SolidList> consumer = new KafkaConsumer<>(props, new LongDeserializer(), new SolidListDeserializer())
1

Другие решения

Вы можете прочитать кафку как ConsumerRecords<Long, String>, А потом SolidList.parseFrom(ByteBuffer.wrap(record.value().getBytes("UTF-8")));

1

По вопросам рекламы [email protected]