Как использовать API производителя KAFKA в librdkafka с кодом C ++ на Windows

Я пытаюсь написать клиента в качестве продюсера.
Я следовал примерам, чтобы создать новый консольный проект win32.
Я обнаружил, что API не работает для меня, если я не добавлю функцию getline () в конце моей программы.

Если я удаляю getline (), метод yield (..) по-прежнему возвращает результат успеха.
Однако я не вижу никакого ответа в командном окне kafka-console-consumer

Я немного запутался. Это правильно?
Как я могу отправить сообщение без использования getline ()?
Кто-нибудь знает?

Я нашел, почему это не работает.
Кажется, слишком быстро удалить объект производителя
в результате производитель не может отправлять сообщения брокеру.

Когда я добавляю сон 1000 между методом получения и удалением объекта производителя,
производитель может отправить сообщение правильно.

Итак, вопрос в том, как отправить сообщение немедленно.
Как я могу убедиться, что эти сообщения были отправлены полностью, прежде чем я уничтожу объект-производитель?

Как решить эту проблему, на самом деле я не люблю добавлять sleep () в мой исходный код.

win10 + vs2015 + kafka_2.10-0.9.0.1 + 3.4.6-Zookeeper + librdkafka
Пожалуйста, проверьте следующий код

    // kafka_test_win32_nomfc.cpp
//

#include "stdafx.h"#include <iostream>
#include "librdkafka/rdkafkacpp.h"

int static producer_1()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;

/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

conf->set("metadata.broker.list", brokers, errstr);

RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}

std::cout << "% Created producer " << producer->name() << std::endl;

/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if (!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}

RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>("hello worlf"), 11,
NULL, NULL);

delete topic;
delete producer;
return 0;
}int static producer_2()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;

RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

conf->set("metadata.broker.list", brokers, errstr);

RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}

std::cout << "% Created producer " << producer->name() << std::endl;

RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(void *)"hi", 2,
NULL, 0, 0, NULL);std::string errs(RdKafka::err2str(resp));
std::cout << errs << std::endl;
//producer->poll(0);delete producer;

return 0;
}int main()
{

producer_2();

return 0;
}

2

Решение

API librdkafka yield () (как C, так и C ++) является асинхронным, ваше сообщение изначально будет помещено в очередь только во внутренней очереди производителя и только позже (см. queue.buffering.max.ms Свойство конфигурации — по умолчанию 1 секунда) объединяется с другими сообщениями в пакет сообщений (MessageSet) и отправляется посреднику из фонового потока.

Ваша программа звонит produce() и затем быстро завершает работу, задолго до того, как поток фонового производителя получил возможность отправить сообщение посреднику, а тем более получить подтверждение от посредника.

Чтобы убедиться, что все оставшиеся сообщения были отправлены, позвоните flush()до прекращения подачи заявления.

Если ваше заявление является долгосрочным, вы должны позвонить poll() через регулярные промежутки времени, чтобы обслуживать любые зарегистрированные обратные вызовы отчета о доставке

4

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]