Кафка — как следить за новыми сообщениями

У меня есть приложение PHP, которое потребляет сообщения Кафки.
Проблема в том, как узнать, что в Кафке есть новые сообщения?
Первое решение — создать потребителя в PHP, а затем запустить его в цикле для проверки новых сообщений. Что-то вроде этого

<?php

namespace MyAppBundle\Command;

use MyAppBundle\EventSourcing\EventSerializer\JSONEventSerializer;
use MyAppBundle\Service\EventProjectorService;
use MyAppBundle\Service\KafkaService;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Exception\RuntimeException;

class EventCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('events:fetch');
}

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var KafkaService $kafkaService */
$kafkaService = $this->getContainer()->get('store_locator.kafka_service');
/** @var EventProjectorService $eventProjector */
$eventProjector = $this->getContainer()->get('store_locator.event_projector');

while(1){
$messages = $kafkaService->fetchEvents();

foreach ($messages as $message) {
$eventProjector->aggregate($message);
}
}
$output->writeln("Finish");
}
}

Но мне это не нравится … Есть ли другой способ?

Если нет лучшего способа, как его сохранить? Например, когда что-то не получается.

2

Решение

Насколько я знаю, нет лучшего способа, чем бесконечный цикл и проверка новых сообщений. Обычный подход состоит в том, чтобы задача убивала себя после заданного количества времени или количества итераций, а затем использовала что-то вроде supervisord обнаружить смерть и воскресить потребителя, чтобы он не съел все ваши ресурсы.

1

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]