Я хочу привнести многопоточность в мой проект, который написан на PHP. Я узнал, что pthreads — это хорошая библиотека для удовлетворения моих требований. Я постараюсь описать рабочий процесс моего приложения абстрактно, поэтому решение может быть применено для любого другого проекта с такой же архитектурой.
Прежде всего, у меня есть очередь сообщений, которую предоставляет RabbitMQ. У меня есть процесс проверки этих сообщений. После того, как я получаю и проверяю сообщение из очереди, мне нужно отправить его в одно из моих внешних приложений.
У меня есть класс (назовем его EXT_APP), который отвечает за взаимодействие
с внешним приложением через сокет. Также у меня есть класс (назовем его EXT_APP_DISPATCHER), который имеет пул EXT_APP, и когда я заканчиваю проверку сообщения, я отправляю его через EXT_APP_DISPATCHER в EXT_APP с наименьшим количеством занятых каналов (скажем, не беспокойтесь о каналах, скажем так у каждого EXT_APP есть функция, которая дает значение, которое описывает, насколько тяжело сейчас загружается это внешнее приложение)
Поэтому я хочу, чтобы все эти EXP_APP были в отдельных потоках, потому что есть функции, которые читают / записывают события из / в сокет, и это может быть хорошим моментом для улучшения производительности приложения. Но как я могу решить, в какой поток я отправляю проверенное сообщение в зависимости от его «загрузки». Я четко понимаю, что я не могу просто вызвать функцию объекта, который находится в другом потоке. Есть способ реализовать или изменить пул потоков, поэтому я отправлю сообщение в EXT_APP в зависимости от их загрузки.
Я написал уроки, чтобы вы поняли, о чем я говорю:
class EXT_APP_DISPATCHER
{
private $connection_options = array(
array(
'host' => '127.0.0.1',
'scheme' => 'tcp://',
'port' => 5039,
'username' => 'asterisk_2',
'secret' => 'ajhu8dyvUWzV5JKa',
'connect_timeout' => 100,
'read_timeout' => 100
),
array(
'host' => '127.0.0.1',
'scheme' => 'tcp://',
'port' => 5038,
'username' => 'asterisk_1',
'secret' => 'DWSg33tCqwr7rXBk',
'connect_timeout' => 100,
'read_timeout' => 100
)
);
private $pool;
public function __construct()
{
$this->pool = new \Pool(2, EXT_APP::class, [$this->connection_options]);
}
public function processMessage($message): void
{
//todo Here I need to implement load balancing
//todo I need to get load from every worker and decide which one should receive task
$this->pool->submit(new TASK());
}
}
class EXT_APP extends \Worker
{
public static $counter = 0;
private $client;
public function __construct($options)
{
$this->client = new ClientImpl($options[self::$counter]);
self::$counter++;
$this->client->registerEventListener(function ($event)
{
echo 'Hey I received an event!';
});
$this->client->open();
}
public function run()
{
}
public function sendEvent($event)
{
$this->client->send($event);
}
public function getAmountOfBusyChannels()
{
$action = new CoreShowChannelsAction();
$actionResult = $this->client->send($action);
foreach ($actionResult->getEvents() as $eventMessage) {
if ($eventMessage instanceof CoreShowChannelsCompleteEvent) {
return $eventMessage->getKey('listitems');
}
}
return null;
}
}
class TASK extends \Threaded
{
public function run()
{
$this->worker->sendEvent(null);
}
}
/*
* EXT_APP_DISPATCHER initialization which is going to be located in main application class,
* so actually here before calling function processMessage
* I do receive message from RabbitMQ and validate it
*/
$ext_app_dispatcher = new EXT_APP_DISPATCHER();
$ext_app_dispatcher->processMessage(null);
$ext_app_dispatcher->processMessage(null);
Я ожидаю, что класс EXT_APP_DISPATCHER получит сообщение и передаст его работнику, который имеет наименьшую нагрузку. Все рабочие должны быть в разных темах. У вас есть идеи, как это можно сделать наилучшим образом?
Задача ещё не решена.
Других решений пока нет …