Блокировать потребителя в пустой очереди

конкретика

У меня есть потребитель, написанный на PHP, который пытается потреблять сообщения. Моя цель проста: если в очереди нет сообщений, отмените выполнение и продолжите, учитывая, что «данные не получены».

Текущие мысли

Я старался AMQP_NOWAIT флаг, как:

$flag = AMQP_NOWAIT;
$this->queue->consume($callbackFunction, $flag, $this->consumerTag);

И это не сработало. Пока у меня есть обходной путь, вроде — я объявляю время ожидания соединения для \ AMQPConnection, как, скажем, 5 секунд, а затем поймать его таким образом:

try {
$this->consumer->consume($this->consumer->getReadMessageCallback($notifications, $requeue));
} catch (\AMQPConnectionException $connectionException) {
//based on timeouts. Are there other ways to interrupt empty queue consuming? AMQP_NOWAIT fails, does nothing:
return [];
}

Но это очень «хакерский» способ сделать это. Это работает для меня, но:

  • Все еще блокирует код для timeout секунд
  • Очевидно, что произойдет сбой, если у меня будет слишком много сообщений (то есть он не сможет завершить до истечения времени ожидания).
  • Более того, это даже не задокументировано, поэтому полагаться на это следует в крайнем случае.

Дальше — попробовал AMQP_IFEMPTY | AMQP_PASSIVE на создание очереди. Дело в том, что он удалит очередь, если там нет сообщений, и вызовет исключение (которое я могу поймать) при попытке получить оттуда сообщения. Но тут возникает проблема вроде — очередь сразу удаляется и я даже не могу туда добавлять сообщения.

Вопрос

Чтение сообщений из пустой очереди — действительно распространенная проблема, поэтому я уверен, что это должен быть способ решить ее в нужном направлении. Таким образом, как бы я это сделал?

Да, ручные ссылки /pl/ так как нет «en» ссылок. Но в любом случае он более-менее читабелен, как и на английском.

1

Решение

Если вам нужно выяснить, пуста очередь или нет, вы можете позвонить AMQPQueue::declare() который идемпотентен и в результате возвращает количество сообщений в очереди. Обратите внимание, что это число не очень точное (см. Зачем).

Кроме того, вы можете просто позвонить AMQPQueue::get() (ведет себя так, как будто это сделано в админке).

И после этого все, как вы тоже пытались, можете установить AMQPConnection::setReadTimeout() к какому-то низкому значению (в локальной сети может хватить 1 секунды), а затем вызвать AMQPQueue::consume() и поймать исключение тайм-аута, если потребитель ждет слишком долго.

Что касается плохой документации, см. Ответ на этот вопрос: Где я могу найти документацию по php-amqp.

2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]