Кафка не сохраняет смещение, если потребляет короткое время

проблема

Потребитель с определенным идентификатором группы подключается к брокеру, прослушивает тему менее 1 минуты и отключается (согласно бизнес-логике). Пока он слушает тему, он может потреблять некоторые сообщения.
Когда тот же потребитель повторяет это действие, он потребляет те же сообщения!

Я обнаружил, что Кафка сохраняет смещение с интервалом в 1 минуту. Это означает, что потребитель должен слушать тему более 1 минуты.
Как я могу уменьшить этот интервал?

Я нашел такие свойства:

  • log.flush.offset.checkpoint.interval.ms
  • log.flush.start.offset.checkpoint.interval.ms
  • offset.flush.interval.ms — выглядит наиболее подходящим

Я пытаюсь установить их в server.properties файл:

log.flush.offset.checkpoint.interval.ms=6000
log.flush.start.offset.checkpoint.interval.ms=6000
offset.flush.interval.ms=6000

Перезапустите Кафку и Zookeeper. Но это не помогает. Потребитель все еще должен слушать тему более 1 минуты. Что я делаю не так?

Моя среда

  • Кафка и Zookeeper через Confluent.
  • php-rdkafka как клиентская библиотека
  • enable.auto.commit установлен в true

Я использую низкий уровень потребителя. auto.offset.reset установлен в smallest,
Пример кода

<?php
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$conf = new \RdKafka\Conf();
$conf->set('group.id', 'foo');

$kafkaConsumer = new \RdKafka\Consumer($conf);
$kafkaConsumer->addBrokers('queue.a:9092');
$kafkaConsumer->setLogLevel(LOG_DEBUG);

$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'smallest');

$queue = $kafkaConsumer->newQueue();
$topic = $kafkaConsumer->newTopic('topic_name', $topicConf);
$topic->consumeQueueStart(0, \RD_KAFKA_OFFSET_STORED, $queue);

while (true) {
$msg = $queue->consume(2000);
if ($msg !== null) {
var_dump($msg);
}
}

1

Решение

Вы должны попытаться явно зафиксировать смещение в вашем потребителе:

Явная фиксация смещений у потребителей
Если вы используете автоматическую фиксацию смещения, вам не нужно беспокоиться о явной фиксации смещения. Но вам нужно подумать о том, как вы будете фиксировать смещения, если решите, что вам нужен больший контроль над временем принятия смещений — либо для минимизации дубликатов, либо потому, что вы выполняете обработку событий вне основного цикла опроса потребителей.

Извлечь из Кафка полное руководство, страница 127. (Это бесплатная электронная книга, которую вы можете скачать)

Рекомендуется, чтобы вы Всегда фиксировать смещения после обработки событий Если вы выполняете всю обработку в цикле опроса и не поддерживаете состояние между циклами опроса (например, для агрегации), это должно быть легко. Вы можете использовать конфигурацию автоматической фиксации или зафиксировать события в конце цикла опроса.

Я не использовал php-клиент сам, но выглядит так это может быть то, что вам нужно.

Добавление к вашему примеру кода выше:

while (true) {
$msg = $queue->consume(2000);
if ($msg !== null) {
var_dump($msg);
$kafkaConsumer->commit($msg);
}
}
1

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector