Использование сообщения RabbitMQ в контроллере

В настоящее время я изучаю сервисы, API и систему обмена сообщениями AMQP, точнее RabbitMQ, и я следую за этим руководство о сообщениях RabbitMQ. У меня все работает, но я хотел изменить некоторые вещи в своем проекте. Я хочу вызывать сценарии издателя и потребителя из моих маршрутов и контроллеров вместо того, чтобы вводить их в моем терминале (php src / publisher.php или же php src / consumer.php)

Для начала я создал два маршрута:

 Route::get('/send-message', 'ServiceAController@index');
Route::get('/receive-message', 'ServiceBController@index');

Первый маршрут (send-message) используется для отправки HTTP-запросов в виде сообщения в RabbitMQ, и это делается с помощью POST-запроса Postman, где я вставляю нужные параметры. Контроллер для этого маршрута отлично работает и выглядит так:

public function index(Request $request){
//Returning status 200 and sending message if amount is in range
if( (-100000000  <= $request->amount )  &&  ($request->amount <= 100000000 )){
//Sending message to RabbitMQ
$amount = $request->amount;
$currency = $request->currency;

//Saving request data to variable to publish it
$messageContent = json_encode([
'amount' => $amount * 100,
'currency' => $currency,
]);

//Sending broker message
$host = 'secret';
$port = 5672;
$user = 'secret';
$pass = 'secret';
$vhost = 'secret';
$exchange = 'balance';
$queue = 'local_balance';

$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
$channel = $connection->channel();
/*
The following code is the same both in the consumer and the producer.
In this way we are sure we always have a queue to consume from and an
exchange where to publish messages.
*/
/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, true, false, false);
/*
name: $exchange
type: direct
passive: false
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange);
$messageBody = $messageContent;
$message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($message, $exchange);
$channel->close();
$connection->close();

//Returning json response of HTTP payload
$response = json_encode([
'amount' => +number_format($amount, 2, '.', ''),
'currency' => $currency,
]);
return $response;
}else{
//Returning status 400 if amount is not in acceptable range
abort(400, 'Amount is not in acceptable range'); //Returning code 400 if condition isn't met
}
}

Но мой проблема начинается, когда я помещаю свой потребительский код в ServiceBController, так же, как я делал это для предыдущего. Мой ServiceBController выглядит так:

public function index(){

$host = 'secret';
$port = 5672;
$user = 'secret';
$pass = 'secret';
$vhost = 'secret';
$exchange = 'balance';
$queue = 'local_balance';

$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
$channel = $connection->channel();
/*
The following code is the same both in the consumer and the producer.
In this way we are sure we always have a queue to consume from and an
exchange where to publish messages.
*/
/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/
$channel->queue_declare($queue, false, true, false, false);
/*
name: $exchange
type: direct
passive: false
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange);
/**
* @param AMQPMessage $message
*/
function process_message(AMQPMessage $message){
$messageBody = json_decode($message->body);
$amount = $messageBody->amount;
$currency = $messageBody->currency;

file_put_contents('C:/xampp/htdocs/nsoft/data' . '.json', $message->body);

$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$consumerTag = 'local.consumer';
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
/**
* @param \PhpAmqpLib\Channel\AMQPChannel $channel
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
*/
function shutdown($channel, $connection){
$channel->close();
$connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);

while (count($channel->callbacks)) {
$channel->wait();
}
}

После вызова в Postman с запросом get я получаю следующую ошибку:

Symfony \ Component \ Debug \ Exception \ FatalErrorException: максимальное время выполнения в 30 секунд превышено в файле C: \ xampp \ htdocs \ nsoft \ vendor \ php-amqplib \ php-amqplib \ PhpAmqpLib \ Wire \ IO \ StreamIO.php в строке 227 ,

Я застрял с этой ошибкой в ​​течение нескольких дней, и, похоже, я не могу найти решение, поэтому мне нужна помощь кого-то. Что я тут не так делаю? Для справки, тот же потребительский скрипт работает, когда я помещаю его в отдельный файл (SRC / consumer.php) и когда я звоню через мой терминал. Любая помощь приветствуется.

0

Решение

изменить значение для параметра max_execution_time в php.ini

max_execution_time = 360   ;Execution time in seconds
0

Другие решения

Я могу посоветовать вам использовать драйвер очереди RabbitMQ для Laravel:

https://github.com/vyuldashev/laravel-queue-rabbitmq

С сервером RabbitMQ вы можете управлять всеми сообщениями через собственную систему очередей Laravel. Единственное, что вам нужно (после настройки), это запустить: php artisan queue:work в консоли.

Есть несколько способов использовать мгновенную обработку очереди. Например, вы можете использовать Supervisor:

http://supervisord.org/

0

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector