rabbitmq — лучший способ создать PHP кролик работника

Предположим, у меня есть очередь кроликов, которая очень часто заполняется некоторыми данными (например, пользователь предоставляет некоторые действия, которые мы должны проанализировать позже). От 30 до 50 новых предметов добавляются каждую секунду.
Мне нужно создать работника, который будет просматривать эту очередь и выполнять некоторые задачи над этими данными. Я могу сделать так:

class Worker
{

public function run()
{
$queue = new Queue('exchange', 'queue');
while (true)
{
$queue->processQueue();
}
}
}

А потом просто беги worker.php на сервере, и это, кажется, работает.

Но мне интересно, добавит ли этот бесконечный цикл дополнительную нагрузку на мой экземпляр кролика, если нет данных для продолжения? Может быть, лучший способ сделать что-то вроде:

class Worker
{
CONST IDLE = 5;

private $start = 0;

public function run()
{
$this->start = time();

$queue = new Queue('exchange', 'queue');
while (true)
{
$queue->processQueue();

//don't allow this worker to be working a lot
if (time() - $this->start >= 60 * 60 - self::IDLE)
{
break;
}

sleep(self::IDLE);
}

$queue->close();
}
}

Так что моя работа не будет постоянно извлекать данные у кролика, а будет спать некоторое время. И после одного часа работы он просто перестанет работать, и другой экземпляр рабочего будет вызван заданием crontab или чем-то еще?

1

Решение

Для управления моими работниками с rabbitmq я использую следующую библиотеку:

https://github.com/php-amqplib/php-amqplib

Затем я создаю класс, который определяет, как должны работать мои работники (содержит всю логику rabbitmq), он дает мне что-то вроде этого:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

abstract class QueueAMQPConsumer
{
protected $connection;

protected $debug;

protected $queueName;

protected $exchange;

public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null)
{
$this->connection = $AMQPConnection;
$this->queueName = $queueName;
$this->exchange = $exchange;
}

public function run($debug = false)
{
$this->debug = $debug;
$channel = $this->connection->channel();
if ($this->exchange !== null) {
$channel->exchange_declare($this->exchange, "topic", false, true, false);
}

$channel->queue_declare($this->queueName, false, true, false, false);
if ($this->exchange !== null) {
$channel->queue_bind($this->queueName, $this->exchange);
}

$channel->basic_qos(null, 1, null);
$channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$this->connection->close();
}final function callback(AMQPMessage $message)
{
$result = $this->process($message);

if (false === $result) {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);
} else {
$message->delivery_info['channel']->basic_ack($message->   delivery_info['delivery_tag']);
}
}

/**
* @param AMQPMessage $message
*
* @return bool
*/
abstract protected function process(AMQPMessage $message);
}

Этот класс позволяет настроить очередь, обмен (в данном случае это тема), QoS (вы можете настроить все эти параметры, это только пример) и т. Д.

Затем он зациклится на обратном вызове. Здесь обратный вызов — это процесс абстрактного метода (…), который будет реализован на ваших разных рабочих, которым необходимо обработать очередь. Так что ответственность за «петлю / прослушивание» лежит на канале: $channel->wait();

Затем я создам дочерний класс, который должен обрабатывать сообщения в очереди:

class MyWorker extends QueueAMQPConsumer
{
protected function process(AMQPMessage $message)
{
// .... process your message here
}
}

Таким образом, работник будет постоянно прослушивать вашу очередь и обрабатывать сообщения в тот момент, когда они поступили в очередь.
Если твой process(...) вернуть что-то кроме false, сообщение будет подтверждено.

Вы просто должны запустить свой класс так:

$consumer = new MyWorker(....);
$consumer->run();
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]