php rabbitmq потребитель переподключен

У меня есть приложение PHP, которое использует RabbitMQ. Для обеспечения избыточности я создал пару серверов RabbitMQ и соединил их в кластере. У меня также есть отказоустойчивый кластер VyOS, использующий HAProxy для балансировки нагрузки на соединения и обеспечения перенаправления в случае сбоя.

Вчера наш кластер VyOS решил, что необходимо переключение при сбое (возможно, кратковременное прерывание работы сети). HAproxy был остановлен на одном VyOS, виртуальный IP перемещен, и перезапустил HAproxy на другом узле.

После этого я просмотрел очереди в Rabbit и увидел, что в каждой очереди не было потребителей. Я проверил, на машинах, на которых работают потребители, все еще работает PHP Я оставил их на некоторое время, чтобы посмотреть, будут ли они снова подключаться, а они нет. Мне пришлось убить сценарий PHP и перезапустить их, и они снова подключились и сразу начали потреблять.

Я думаю, что RabbitMQ и HAproxy работают должным образом … Теперь мне нужно, чтобы потребитель PHP поддерживал событие переключения при отказе … другими словами, вместо того, чтобы просто зависать, он должен обнаруживать разъединение и автоматически переподключаться.

Вот мой класс RabbitMQ. Спасибо за любую помощь заранее!

<?php
while(true)
{
try{getMessages("transcode2");}
catch(Exception $e){echo($e->getMessage()."\n");}
sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
global $channel;
$msg=json_encode($msg);
$queue="transcode2";
$channel->queue_declare($queue,true,false,false,false);
$channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
global $connection,$channel;
$connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
$channel=$connection->channel();
$channel->queue_declare($queue,true,false,false,false);
$callback=function($msg)
{
if(handleMessage(json_decode($msg->body,true)))
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
else
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue,'',false,false,false,false,$callback);
while(count($channel->callbacks))
{
try{$channel->wait();}
catch(Exception $e)
{
break;
}
}
$channel->close();
$connection->close();
}
?>

2

Решение

Это может работать, если вы используете параметр timeout для $ channel-> wait ();

0

Другие решения

Нулевой таймаут не работает должным образом, как вы сказали, что брокер может закрыть соединение, а потребитель PHP не обнаружит его.

Решение состоит в том, чтобы использовать нулевой тайм-аут при получении. Убедитесь, что время ожидания соединения больше, чем время получения.

Вот пример, основанный на AMQP Interop:

Установите совместимый с AMQP Interop транспорт, например:

composer require enqueue/amqp-bunny

Код выполняет те же действия, что и вы, с явно установленными таймаутами:

<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;

$context = (new AmqpConnectionFactory(sprintf(
'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
)))->createContext();

$context->setQos(null,1,null);

//sendMessage

$queue = $context->createQueue("transcode2");
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($queue);

$message = $context->createMessage(json_encode($msg));
$message->setPriority($prio);

$producer = $context->createProducer();
$producer->send($queue, $message);

// getMessages

$consumer = $context->createConsumer($queue);
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
if(handleMessage(json_decode($message->getBody(), true))) {
$consumer->acknowledge($message);
} else {
$consumer->reject($message);
}

return true;
});

$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.

$context->consume($receiveTimeout);
0

По вопросам рекламы [email protected]