Как использовать очередь отложенных сообщений RabbitMQ из PHP?

Я пытаюсь использовать Задержка очереди сообщений для RabbitMQ из PHP, но мои сообщения просто исчезают.

Я объявляю обмен со следующим кодом:

$this->channel->exchange_declare(
'delay',
'x-delayed-message',
false,  /* passive, create if exchange doesn't exist */
true,   /* durable, persist through server reboots */
false,  /* autodelete */
false,  /* internal */
false,  /* nowait */
['x-delayed-type' => ['S', 'direct']]);

Я связываю очередь с этим кодом:

$this->channel->queue_declare(
$queueName,
false,  /* Passive */
true,   /* Durable */
false,  /* Exclusive */
false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

И я публикую сообщение с этим кодом:

$msg = new AMQPMessage(json_encode($msgData), [
'delivery_mode' => 2,
'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

Но сообщение не задерживается; это все еще немедленно доставлено. Что мне не хватает?

3

Решение

От Вот,

Создание сообщения должно быть

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new AMQPMessage($data,
array(
'delivery_mode' => 2, # make message persistent
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
)
);
3

Другие решения

Ответ для тех, кто нуждается в задержке сообщений, но не хочет копаться в деталях. Вам нужно всего несколько вещей, чтобы это заработало:

устанавливать amqp interop например, совместимый транспорт enqueue/amqp-bunny а также enqueue/amqp-tools,

composer require enqueue/amqp-bunny enqueue/amqp-tools

Создайте контекст amqp, добавьте стратегию задержки и отправьте отложенные сообщения:

<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;

Кстати, это не единственная доступная стратегия. есть одна, основанная на очередях мертвых писем RabbitMQ + ttl. Это может быть использовано так же.

3

вам нужен ключ маршрутизации для публикации из обмена в очередь, о которой идет речь.

причина публикации во встроенном прямом обмене работает, потому что этот обмен является особым случаем, который использует ключ маршрутизации в качестве имени очереди назначения.

для всех создаваемых вами обменов и очередей необходимо создать привязку между обменом и очередью с помощью ключа маршрутизации. затем вы публикуете сообщение с этим ключом маршрутизации вместо имени очереди назначения.

я не знаю код PHP для создания привязки … но обычно он выглядит примерно так:

channel.bind(exhange_name, queue_name, routing_key)

тогда в вашей публикации сообщения:

$this->channel->basic_publish($msg, 'delay', $routing_key);

1
По вопросам рекламы [email protected]