Как подтвердить прием сообщения в кафке с помощью php-rdkafka?

я использую PHP-rdkafka как клиент php kafka. Я успешно создал тестовое сообщение, используя test group.and использовать сообщение, используя приведенный ниже код,

$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
$msg = $topic->consume(0, 1000);
if($msg){
if ($msg->err) {
echo $msg->errstr(), "\n";
break;
} else {
echo $msg->payload, "\n";
}
}
}

Но когда я пытаюсь снова установить сообщение в test группа и пытается использовать сообщение для test группа, то я получаю старое сообщение, а также новое сообщение. Поэтому я просто хочу узнать, как я могу подтвердить старое сообщение, чтобы я мог получить только новое сообщение, а не старое? Может кто-нибудь придать этому блеска?

Моя версия Кафки 0.11.0.1

1

Решение

Метод подтверждения потребляемых сообщений в Kafka заключается в фиксации его смещения. Таким образом, при перезапуске вашего потребителя он может извлечь последнее зафиксированное смещение и перезапустить с того места, где он остановился.

Как предлагается в комментариях, вы должны использовать RD_KAFKA_OFFSET_STORED поручить потребителю извлечь сохраненное смещение.

Но вам также нужно указать имя группы, установив group.id конфигурации:

<?php

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1:9292");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
$message = $topic->consume(0, 120*10000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
2

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector