Привет следующий код для моего потребителя,
$conf->set('group.id','commonqueue2');
$conf->set('offset.store.method', 'broker');
$rk = new \RdKafka\Consumer($conf);
$rk->addBrokers($kafka_servers);
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms',1000);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'smallest');
$topic = $rk->newTopic("registration17", $topicConf);
$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);
while (true) {
$message = $topic->consume(0,1000);
//store offset to broker
$topic->offsetStore($message->partition,$message->offset);
$message_offset = $message->offset;
echo $message_data = $message->payload;
}//end of while loop
В вышеупомянутой программе, как я могу найти последнее сохраненное смещение перед выполнением следующего кода
$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);
Есть 2 способа получить последние смещения:
1. От зоопарка Shell
Используя coomand get / consumer // offsets // num_of_partitions
Для помощи используйте эту ссылку: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
2. Чтобы получить последнее смещение, мы даже можем использовать команду: bin / kafka-run-class.sh kafka.tools.ConsumerOffsetChecker —group —zookeeper: —topic
Других решений пока нет …