Использование RDKAFKA Назначение нескольких идентификаторов групп нескольким потребителям

Я использовал RDKAFKA в php для параллельной работы потребителя.

Но первый потребитель потреблял все сообщения из темы, поэтому второй потребитель не получил сообщения темы.

Таким образом, я использовал разные идентификаторы группы для разных потребителей, но эта проблема не решена.

Пожалуйста, помогите мне.
$conf-> set (‘group.id’, ‘myConsumerGroup’);
$conf-> set (‘metadata.broker.list’, ‘127.0.0.2’);
$topicConf = new RdKafka \ TopicConf ();
$topicConf-> set (‘auto.offset.reset’, ‘smalllest’);
$conf-> setDefaultTopicConf ($topicConf);
$потребитель1 = новый RdKafka \ KafkaConsumer ($конф);
$consumer1-> подписаться ([ ‘DBTEST’]);
while (true) {
$j = 0;
$сообщение = $consumer1-> потребляют (120 * 1000);

переключатель ($message-> err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode ($Сообщение-> Полезная нагрузка, правда);
var_dump ($данные);
$sql = «INSERT INTO emp (имя, адрес электронной почты) VALUES (‘».$dataEx [ ‘имя’]. ​​» »».$dataEx [ ‘электронная почта’] «»)».

    `$`servername = "localhost";
`$`username = "A";
`$`password = "ASD";
`$`dbname = "test";
`$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
if (`$`conn->connect_error) {
die("Connection failed: " . `$`conn->connect_error);
}

if (`$`conn->query(`$`sql) === TRUE) {
echo "New record created successfully to datbase ".`$`dbname."/n";
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn->error;
}
`$`conn->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
$j++;

}
эхо «здесь»;
$conf-> set (‘group.id’, ‘myConsumerGroup1’);
$conf-> set (‘metadata.broker.list’, ‘127.0.0.2’);
$topicConf = new RdKafka \ TopicConf ();
$topicConf-> set (‘auto.offset.reset’, ‘smalllest’);
$conf-> setDefaultTopicConf ($topicConf);

$потребитель2 = новый RdKafka \ KafkaConsumer ($конф);
эхо «сонали»;
$consumer2-> подписаться ([ ‘DBTEST’]);
while (true) {
$message2 = $consumer2-> потребляют (120 * 1000);
переключатель ($message2-> err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$dataEx = json_decode ($message2-> Полезная нагрузка, правда);
« $ sql = «INSERT INTO emp (name, email) ЗНАЧЕНИЯ (‘». $ dataEx [‘ name ‘]. «‘, ‘».$dataEx [ ‘электронная почта’] «»)».
$ servername1 = «localhost»;
$ username1 = «A»;
$ password1 = «ASD»;
$ dbname1 = «test1»;
$conn1 = новый mysqli ($servername1, $username1, $password1, $dbname1);
если ($conn1-> connect_error) {
die («Ошибка подключения:». $conn1-> connect_error);
}

    if (`$`conn1->query(`$`sql) === TRUE) {
echo "New record created successfully ".`$`dbname1;
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
}
`$`conn1->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
`$`j++;

}

0

Решение

Задача ещё не решена.

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector