Я переписываю свой код C ++ для использования KafkaConsumer а также Очередь.
Это фрагмент (в псевдокоде) из моего исходного кода:
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
//set basic conf
setGlobalConf("bootstrap.servers", brokers);
setGlobalConf("metadata.broker.list", brokers);
setGlobalConf("client.id", "app-meteo");
setGlobalConf("compression.codec", "snappy");
//set group id
setGlobalConf("group.id", params.at("group.id"));
//set default_topic_conf
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
setConf(tconf, "auto.offset.reset", "earliest");
conf->set("default_topic_conf", tconf, errstr);
//create consuner
auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);
consumer->subscribe("my_topic");
//create the queue for that consumer
auto queue = RdKafka::Queue::create(consumer);
//then ...
while (true)
{
RdKafka::Message* msg = consumer->consume(1000);
do_smthg(*msg, nullptr);
}
проблема здесь в том, что msg содержит ошибку, которая всегда: RdKafka :: ERR__TIMED_OUT
Я не знаю, почему это происходит, даже если внутри много сообщений моя тема.
Примечание: я следовал за FAQ и документацией, но, возможно, что-то упустил.
Задача ещё не решена.
Других решений пока нет …