конкретика
У меня есть потребитель, написанный на PHP, который пытается потреблять сообщения. Моя цель проста: если в очереди нет сообщений, отмените выполнение и продолжите, учитывая, что «данные не получены».
Текущие мысли
Я старался AMQP_NOWAIT
флаг, как:
$flag = AMQP_NOWAIT;
$this->queue->consume($callbackFunction, $flag, $this->consumerTag);
И это не сработало. Пока у меня есть обходной путь, вроде — я объявляю время ожидания соединения для \ AMQPConnection, как, скажем, 5 секунд, а затем поймать его таким образом:
try {
$this->consumer->consume($this->consumer->getReadMessageCallback($notifications, $requeue));
} catch (\AMQPConnectionException $connectionException) {
//based on timeouts. Are there other ways to interrupt empty queue consuming? AMQP_NOWAIT fails, does nothing:
return [];
}
Но это очень «хакерский» способ сделать это. Это работает для меня, но:
timeout
секундДальше — попробовал AMQP_IFEMPTY | AMQP_PASSIVE
на создание очереди. Дело в том, что он удалит очередь, если там нет сообщений, и вызовет исключение (которое я могу поймать) при попытке получить оттуда сообщения. Но тут возникает проблема вроде — очередь сразу удаляется и я даже не могу туда добавлять сообщения.
Вопрос
Чтение сообщений из пустой очереди — действительно распространенная проблема, поэтому я уверен, что это должен быть способ решить ее в нужном направлении. Таким образом, как бы я это сделал?
Да, ручные ссылки /pl/
так как нет «en» ссылок. Но в любом случае он более-менее читабелен, как и на английском.
Если вам нужно выяснить, пуста очередь или нет, вы можете позвонить AMQPQueue::declare()
который идемпотентен и в результате возвращает количество сообщений в очереди. Обратите внимание, что это число не очень точное (см. Зачем).
Кроме того, вы можете просто позвонить AMQPQueue::get()
(ведет себя так, как будто это сделано в админке).
И после этого все, как вы тоже пытались, можете установить AMQPConnection::setReadTimeout()
к какому-то низкому значению (в локальной сети может хватить 1 секунды), а затем вызвать AMQPQueue::consume()
и поймать исключение тайм-аута, если потребитель ждет слишком долго.
Что касается плохой документации, см. Ответ на этот вопрос: Где я могу найти документацию по php-amqp.
Других решений пока нет …