У меня есть приложение PHP, которое потребляет сообщения Кафки.
Проблема в том, как узнать, что в Кафке есть новые сообщения?
Первое решение — создать потребителя в PHP, а затем запустить его в цикле для проверки новых сообщений. Что-то вроде этого
<?php
namespace MyAppBundle\Command;
use MyAppBundle\EventSourcing\EventSerializer\JSONEventSerializer;
use MyAppBundle\Service\EventProjectorService;
use MyAppBundle\Service\KafkaService;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Exception\RuntimeException;
class EventCommand extends ContainerAwareCommand
{
protected function configure()
{
$this
->setName('events:fetch');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var KafkaService $kafkaService */
$kafkaService = $this->getContainer()->get('store_locator.kafka_service');
/** @var EventProjectorService $eventProjector */
$eventProjector = $this->getContainer()->get('store_locator.event_projector');
while(1){
$messages = $kafkaService->fetchEvents();
foreach ($messages as $message) {
$eventProjector->aggregate($message);
}
}
$output->writeln("Finish");
}
}
Но мне это не нравится … Есть ли другой способ?
Если нет лучшего способа, как его сохранить? Например, когда что-то не получается.
Насколько я знаю, нет лучшего способа, чем бесконечный цикл и проверка новых сообщений. Обычный подход состоит в том, чтобы задача убивала себя после заданного количества времени или количества итераций, а затем использовала что-то вроде supervisord
обнаружить смерть и воскресить потребителя, чтобы он не съел все ваши ресурсы.
Других решений пока нет …