RabbitMq Неподтвержденные сообщения после Consumer basic_cancel

Хорошо, не вдаваясь в подробности всей системы, которую я настроил,

У меня проблема в том, что когда потребители отменяют (AMQPChannel-> basic_cancel) прослушивание очереди, это оставляет одно дополнительное сообщение без подтверждения этим работником. Это также не вызывает нормальный обратный вызов для обработки этого сообщения.

Несколько деталей

  • Очереди блокируются (используется ожидание) while(count($channel->callbacks)) { $channel->wait( ... ) ... }
  • Предварительная выборка равна 1, наименьшее, что вы можете иметь
  • Потребители могут динамически слушать (AMQPChannel->basic_consume)
  • Потребители могут динамически забыть (AMQPChannel->basic_cancel)

Я не буду вдаваться в точный способ, которым я говорю данному потребителю, чтобы он потреблял или отменял очередь на передачу. Но все работает отлично, начало потребления или отмену просто отлично. Но когда я отменяю очередь, в которой все еще есть сообщения, они просто забывают об этом последнем сообщении, и я полагаю, что отмена удаляет обратный вызов для этой очереди, и поэтому нет способа вернуть это сообщение, за исключением убийства потребителя, что нежелательно.

Я сделал некоторые отладки (просто пример, а не то, что я на самом деле)

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
$tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
$this->getAmqpChannel()->basic_cancel( $tag );
Debug::dump($this->getAmqpChannel()->getMethodQueue());

Выход это примерно

  array()
RunCommand: basic_cancel //this works fine consumer forgets queue except ->
array(1){
[0] => array(3){
[0] => string(5) "60,60",
[1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
[2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
["DELIVERY_MODE_PERSISTENT":constant] => int(2),
["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"["body_size":public] => int(135864),
["is_truncated":public] => bool(false),
["content_encoding":public] => null,
["propertyDefinitions":protected static] => array(14){ ... }
["delivery_info":public] => array(0){},
["prop_types":protected] => array(14){ ... }
}
}

Как только рабочий умирает (или я его скорее убиваю), сообщение помещается обратно в очередь, и я могу вытащить его в компоненте управления RabbitMq (плагин) в разделе сообщений get. И вот оно,

  Properties
correlation_id: 32:38
delivery_mode:  2
content_encoding:   text/plain
content_type:   application/json

"correlation_id":32,"max_correlation_id":38 соответствует correlation_id: 32:38 потому что мне нужно отслеживать части сообщения. Так что я знаю, что это то же самое сообщение.

Так почему после отмены я получаю последнее сообщение, которое застряло в стране зомби, и есть ли возможность отправить его обратно в очередь, за исключением убийства потребителя.

Кроме того, это не разовое, это происходит каждый раз, когда я отменяю очередь, в которой все еще есть сообщения. Так что это не имеет ничего общего с данным сообщением. Похоже, что он получает одно последнее предпочтительное сообщение, а затем, потому что оно отменено, нет обратного вызова для последнего запуска, и он просто застрял в подвешенном состоянии. Помните, что 0 предварительная выборка — выборка всех сообщений, 1 — самая низкая, которую вы можете установить.

Любая может помочь была бы великолепна.

ОБНОВИТЬ

Я могу найти решение, позвонив

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

Либо до, либо после basic_cancel

Это отклоняет сообщение, и я могу даже проверить $this->getAmqpChannel()->getMethodQueue() как показано выше, чтобы увидеть, если $queue то, что я отменяю, содержит сообщение (пока не реализовано).

Я пытался избежать использования recover но я думаю, что все должно быть в порядке, потому что потребители используют один канал и блокируют, и в худшем случае они просто отклонят правильное сообщение 1 раз, что, хотя и не идеально, должно быть приемлемым.

Тем не менее, в некоторых случаях я получаю дополнительное исключение от Кролика,

  PRECONDITION_FAILED - unknown delivery tag {n}

Если у кого-то есть какие-либо подробности об этой дополнительной ошибке, это было бы здорово. Также все очереди требуют Ack, ни одна из них не является автоматической.

Update1

Я заметил в стеке трассировку queue_unbind поэтому то, что я сделал, — это внутреннее отслеживание привязок таким образом, чтобы я мог гарантировать, что удаление выполняется только один раз. Я опубликую некоторый код после того, как завтра проведу еще какое-то тестирование, но мое первоначальное тестирование после реализации, которое больше не приводило к ошибке.

Все это может показаться странным, и я могу объяснить, почему и что я делаю со всем этим, но это, вероятно, выходит за рамки вопроса. Я скажу, что я использовал эту систему в производстве более 2 лет (я разработал ее), и мы можем выполнять 180 тыс. Запросов в минуту (около 100, если учитывать все части системы). Мы также сделали более 280 миллиардов поисковых запросов, и я это сделал. Мы также являемся лидирующей компанией в нашей отрасли, которая либо устранила наших конкурентов, либо они прислали нам свои вещи и больше не занимаются этим самостоятельно. Это во многом из-за нашего быстрого поворота, а также качества наших данных. Так что эта система работает и работает очень хорошо.

Но в недавних аудитах я заметил, что потребителям Daily приходится иметь дело только с примерно 10 миллионами строк (примерно 100 минут работы), тогда как ночные потребители имеют дело примерно с 100 миллионами строк (или около 20 часов работы). Ежедневные потребители могут выполнять ночные задания, но только в нерабочее время (поскольку это сокращает время отклика в течение дня), поэтому существует примерно 10-часовое окно, в котором ночные задания выполняются только на гораздо меньшем и менее работоспособном сервере. Это решение дает нам то, что если нет ежедневных заданий (заданий, отправленных клиентами), они могут динамически переключаться на ночные данные (хранилища данных) на лету. Это должно сохранить большую часть оперативности, при этом не тратя ресурсы, когда никакие задания не отправляются. Мы можем по горизонтали масштабировать столько, сколько мы хотим в поисках, но мы платим много за наш главный сервер и тратим около 8 часов работы, которую мы могли бы сделать.

Я мог бы, вероятно, заполнить небольшую книгу тем, как все это работает, но, надеюсь, это даст некоторое представление о том, что я делаю. Я также связан некоторыми пунктами о неразглашении и неконкурентоспособности моего контракта, так что я действительно могу вникнуть в конкретные детали.

0

Решение

Consumer в фразеологии RabbitMQ означает подписчика в очереди. (Увидеть этот ответ для получения подробной информации о различиях между каналом, потребителем и подключением).

Когда вы включаете подтверждения, они включаются для channel, Любые сообщения, доставленные по этому каналу, будут иметь тег доставки ассоциируется с ними. Когда вы закончите обработку сообщения, вам нужно сообщить серверу по тому же каналу, что сообщение было обработано. Отмена потребителя не влияет на подтверждение уже доставленных сообщений. Фактически, это был бы совершенно допустимый вариант использования для получения сообщения, отмены потребителя, обработки сообщения и отправки подтверждения.

Таким образом, у вас есть два варианта. Вы можете оставить сообщение неподтвержденным, и в этом случае все, что вам нужно сделать, это закрыть канал, и оно будет помещено в очередь в начале очереди. Или вы можете подтвердить это (либо nack или же ack), в этом случае сообщение будет помещено в очередь, если nack или упал, если ack,

Если я правильно помню, НЕ указав количество предварительной выборки (через basic.qos) приведет к предварительной выборке, равной нулю, что означает, что вы должны подтвердить предыдущее сообщение, прежде чем получите следующее сообщение. Я могу ошибаться в этом. Конечно, если вы используете basic.get, Вы избегаете этой проблемы в целом с очень небольшим влиянием на производительность.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]