Я пытаюсь отправить некоторые данные из приложения php в браузер пользователя с помощью веб-сокетов. Поэтому я решил использовать Swoole в сочетании с RabbitMQ.
Я впервые работаю с веб-сокетами, и после прочтения некоторых постов о Socket.IO, Ratchet и т. Д. Я решил остановиться на Swoole, потому что он написан на C и удобен для использования с php.
Вот как я понял идею включения передачи данных с помощью веб-сокетов:
1) Запустите рабочий RabbitMQ и сервер Swoole в CLI
2) приложение php отправляет данные в RabbitMQ
3) RabbitMQ отправляет сообщение с данными работнику
4) Рабочий получает сообщение с данными + устанавливает сокет-соединение с сервером сокетов Swoole.
5) Сервер Swoole транслирует данные на все соединения
Вопрос в том, как связать сервер сокетов Swoole с RabbitMQ? Или как заставить RabbitMQ установить соединение с Swoole и отправить на него данные?
Вот код:
Swoole сервер (swoole_sever.php)
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);
$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
echo "connection open: {$req->fd}\n";
});
$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
echo "received message: {$frame->data}\n";
$server->push($frame->fd, json_encode(["hello", "world"]));
});
$server->on('close', function($server, $fd)
{
echo "connection close: {$fd}\n";
});
$server->start();
Рабочий, который получает сообщение от RabbitMQ, затем устанавливает соединение с Swoole и транслирует сообщение через сокетное соединение (worker.php).
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// Here I'm trying to make connection to Swoole server and sernd data
$cli = new \swoole_http_client('0.0.0.0', 2345);
$cli->on('message', function ($_cli, $frame) {
var_dump($frame);
});
$cli->upgrade('/', function($cli)
{
$cli->push('This is the message to send to Swoole server');
$cli->close();
});
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Новая задача, где сообщение будет отправлено в RabbitMQ (new_task.php):
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
После запуска сервера и сервера swoole я запускаю new_task.php из командной строки:
php new_task.php
В командной строке, где работает RabbitMQ Worker (worker.php), я вижу, что сообщение доставлено работнику (появляется сообщение «[x] Received Hello World!»).
Однако в командной строке, где работает сервер Swoole, ничего не происходит.
Итак, вопросы:
1) Правильна ли идея такого подхода?
2) Что я делаю не так?
В обратном вызове (в worker.php
) который срабатывает при получении сообщения, которое вы используете swoole_http_client
только асинхронный Кажется, это приводит к тому, что код никогда не выполняется полностью, поскольку функция обратного вызова возвращается до того, как будет запущен асинхронный код.
Синхронный способ сделать то же самое решит проблему. Вот простой пример:
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
Проверьте WebSocketClient
класс и пример использования в GitHub.
Вы также можете обернуть его в сопрограммная, как это:
go(function () {
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();
});
Других решений пока нет …