Многопоточность — как запустить методы объекта PHP параллельно и собрать результаты в массив

Привет, пытаюсь найти способ запустить метод объекта PHP параллельно.

Я посмотрел несколько решений для многопоточности в PHP, но, похоже, не могу найти способ параллельного запуска методов объекта. Может ли кто-нибудь объяснить, что я делаю неправильно, и предложить исправление любого из решений или альтернативный пример с классом Countries, где метод get_data будет работать в нескольких параллельных процессах?

  1. pcntl_fork() — форк процессов в PHP
  2. pthreads — расширение PHP
  3. misterion/ko-process — пакет Composer
  4. duncan3dc/fork-helper — пакет Composer
  5. illuminate/queue — пакет Composer

Тестирование pcntl_fork ()

    <?php

/**
 * Demo data source whose get_data() call is meant to be run in parallel workers.
 */
class Countries {
    /**
     * Pauses briefly and returns 1000 copies of the given country code.
     *
     * @param string $country Country code to repeat.
     * @return array Elements 0..999, each equal to $country.
     */
    function get_data($country){
        // Stand-in for slow work: sleep for 1000 microseconds.
        usleep(1000);

        $data = array();
        // The original used `foreach($i=0; ...)`, which is a parse error;
        // a counted loop has to be written with `for`.
        for ($i = 0; $i < 1000; $i++) {
            $data[$i] = $country;
        }

        return $data;
    }
}

$os = new Countries;

$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory.
$log = isset($log) ? $log : sys_get_temp_dir().'/parallel_countries.log';

// Initialised in the parent so the variable always exists after the loop.
$d = array();

// how to add and control a limit of max processes running at the time?

$start_time = microtime(true);

foreach($countries as $country) {

    $pid = pcntl_fork();

    if ($pid === -1) {
        // pcntl_fork() returns -1 when no child could be created; the original
        // `!$pid` test silently treated that case like the parent branch.
        error_log( date('Y-m-d H:i:s').' - fork failed for '.$country." \n", 3, $log);
        continue;
    }

    if ($pid === 0) {
        // Child process.
        error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

        // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
        // NOTE: after fork() the child works on its own copy of $d, so this
        // assignment is never visible in the parent. Getting the value back
        // requires an explicit channel (socket pair, pipe, file, shared memory).
        $d[$country] = $os->get_data($country);

        error_log( date('Y-m-d H:i:s').' - !pid -> d['.$country.']  ='.var_export($d[$country],true)." \n", 3, $log);

        // exit() with a string argument prints the string instead of using it as
        // a status code, so exit($country) wrote the country code to stdout.
        exit(0);
    }
}

// Reap children until pcntl_waitpid() reports that none are left.
while (pcntl_waitpid(0, $status) != -1);
// do something with $d[$country] here after all child processes completed
// (with this script $d is still empty in the parent — see the NOTE above)


$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);





?>

Тестирование Pthreads

<?php


// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory. Set before the `if` because both branches log.
$log = isset($log) ? $log : sys_get_temp_dir().'/parallel_countries.log';

if (extension_loaded('pthreads')) {

    // Pool constructed with 4 — presumably the worker limit; confirm against the pthreads Pool docs.
    $pool = new Pool(4);

    /**
     * Demo data source whose get_data() call is run by the pool workers.
     */
    class Countries {
        /**
         * Pauses briefly and returns 1000 copies of the given country code.
         *
         * @param string $country Country code to repeat.
         * @return array Elements 0..999, each equal to $country.
         */
        function get_data($country){
            // Stand-in for slow work: sleep for 1000 microseconds.
            usleep(1000);

            $data = array();
            // The original used `foreach($i=0; ...)`, which is a parse error.
            for ($i = 0; $i < 1000; $i++) {
                $data[$i] = $country;
            }

            return $data;
        }
    }

    $os = new Countries;

    $countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

    // Initialised up front so the loops below never touch undefined variables.
    $threads = array();
    $d = array();

    $start_time = microtime(true);
    foreach($countries as $country) {
        // One Threaded container per country: carries the input to the task
        // and receives its result.
        $dataN = new Threaded();
        $dataN->country = $country;
        $dataN->os = $os;
        $dataN->result = "";

        $threads[] = $dataN;

        $pool->submit(
            new class($dataN) extends Threaded {
                public $data;

                public function __construct($data)
                {
                    $this->data = $data;
                }

                public function run()
                {
                    // Executed by the pool; stores the result on the shared container.
                    $this->data->result = $this->data->os->get_data($this->data->country);
                }
            }
        );
    }

    // Keep collecting until the pool reports nothing left to collect.
    while ($pool->collect());

    $pool->shutdown();

    foreach ($threads as $thread) {
        error_log( date('Y-m-d H:i:s').' - d['.$thread->country.'] = '.var_export($thread->result,true)."\n", 3, $log);
        $d[$thread->country] = $thread->result;
    }

    // do something with $d[$country] here after all child processes completed

    $end_time = microtime(true);
    $duration = $end_time - $start_time;
    $duration = number_format($duration,3);
    error_log( date('Y-m-d H:i:s').' - 2. PHP PThreads example duration='.$duration."\n", 3, $log);
}else{
    error_log( date('Y-m-d H:i:s').' - pthreads extension is not loaded!'."\n", 3, $log);

}

?>

Тестирование misterion/ko-process

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

/**
 * Demo data source whose get_data() call is meant to be run in forked workers.
 */
class Countries {
    /**
     * Pauses briefly and returns 1000 copies of the given country code.
     *
     * @param string $country Country code to repeat.
     * @return array Elements 0..999, each equal to $country.
     */
    function get_data($country){
        // Stand-in for slow work: sleep for 1000 microseconds.
        usleep(1000);

        $data = array();
        // The original used `foreach($i=0; ...)`, which is a parse error;
        // a counted loop has to be written with `for`.
        for ($i = 0; $i < 1000; $i++) {
            $data[$i] = $country;
        }

        return $data;
    }
}

$os = new Countries;

$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory.
$log = isset($log) ? $log : sys_get_temp_dir().'/parallel_countries.log';

// Initialised in the parent so the variable always exists after the workers finish.
$d = array();

// how to add and control a limit of max processes running at the time?

$start_time = microtime(true);



$manager = new Ko\ProcessManager();

foreach($countries as $country) {

    // PHP closures do not see variables of the enclosing scope unless they are
    // listed in `use`; without it $country, $os and $log were undefined inside
    // the callback.
    $manager->fork(function(Ko\Process $p) use ($country, $os, $log) {
        error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
        // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
        // NOTE: $d here is local to the callback, which runs in a forked child,
        // so this value never reaches the parent's $d. Returning it needs an
        // explicit channel between child and parent.
        $d[$country] = $os->get_data($country);
    });

}

error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish... \n", 3, $log);
$manager->wait();

error_log( date('Y-m-d H:i:s')." - threads finished. \n", 3, $log);

// do something with $d[$country] here after all child processes completed
// (with this script $d is still empty in the parent — see the NOTE above)


$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
error_log( date('Y-m-d H:i:s').' - 3. misterion/ko-process example duration='.$duration."\n", 3, $log);



?>

Тестирование duncan3dc / fork-helper

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';

/**
 * Demo data source whose get_data() call is meant to be run in forked workers.
 */
class Countries {
    /**
     * Pauses briefly and returns 1000 copies of the given country code.
     *
     * @param string $country Country code to repeat.
     * @return array Elements 0..999, each equal to $country.
     */
    function get_data($country){
        // Stand-in for slow work: sleep for 1000 microseconds.
        usleep(1000);

        $data = array();
        // The original used `foreach($i=0; ...)`, which is a parse error;
        // a counted loop has to be written with `for`.
        for ($i = 0; $i < 1000; $i++) {
            $data[$i] = $country;
        }

        return $data;
    }
}

$os = new Countries;

$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory.
$log = isset($log) ? $log : sys_get_temp_dir().'/parallel_countries.log';

// Initialised in the parent so the variable always exists after the workers finish.
$d = array();

// how to add and control a limit of max processes running at the time?

$start_time = microtime(true);


$fork = new \duncan3dc\Forker\Fork;

foreach($countries as $country) {

    // PHP closures do not see variables of the enclosing scope unless they are
    // listed in `use`; without it $country, $os and $log were undefined inside
    // the callback.
    $fork->call(function () use ($country, $os, $log) {
        error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);
        // How to execute $os->get_table_data($country) method in a child process and collect results into $d[$country]?
        // NOTE: $d here is local to the callback, which runs in a forked child,
        // so this value never reaches the parent's $d. Returning it needs an
        // explicit channel between child and parent.
        $d[$country] = $os->get_data($country);

    });


}

error_log( date('Y-m-d H:i:s')." - Waiting for the threads to finish... \n", 3, $log);

$fork->wait();
error_log( date('Y-m-d H:i:s')." - threads finished. \n", 3, $log);

// do something with $d[$country] here after all child processes completed
// (with this script $d is still empty in the parent — see the NOTE above)


$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
// Label corrected: this is the fourth example; "3." was copied from the ko-process script.
error_log( date('Y-m-d H:i:s').' - 4. duncan3dc/fork-helper example duration='.$duration."\n", 3, $log);





?>

Тестирование illuminate/queue

<?php

require_once $_SERVER["DOCUMENT_ROOT"].'/vendor/autoload.php';


/**
 * Demo data source that loads the data for one country on construction.
 */
class Countries {

    /** @var array Results keyed by country code, filled by the constructor. */
    public $data;

    /**
     * Fetches the data for $country and stores it in $this->data[$country].
     *
     * @param string $country Country code to load.
     */
    function __construct($country){
        $this->data[$country] = $this->get_data($country);
    }

    /**
     * Pauses briefly and returns 1000 copies of the given country code.
     *
     * @param string $country Country code to repeat.
     * @return array Elements 0..999, each equal to $country.
     */
    function get_data($country){
        // Stand-in for slow work: sleep for 1000 microseconds.
        usleep(1000);

        $data = array();
        // The original used `foreach($i=0; ...)`, which is a parse error;
        // a counted loop has to be written with `for`.
        for ($i = 0; $i < 1000; $i++) {
            $data[$i] = $country;
        }

        return $data;
    }
}

// The original `$os = new Countries;` was removed: the constructor of this
// variant requires a $country argument (so the bare `new` fails), and $os was
// never used afterwards.

$countries = array("GB","US","FR","DE","IT","ES","LT","BR","BE","JP","CN");

// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory.
$log = isset($log) ? $log : sys_get_temp_dir().'/parallel_countries.log';

use Illuminate\Queue\Capsule\Manager as Queue;

$queue = new Queue;

$queue->addConnection([
    'driver' => 'beanstalkd',
    'host' => 'localhost',
    'queue' => 'default',
]);

// Make this Capsule instance available globally via static methods... (optional)
//$queue->setAsGlobal();

// The duration calculation below read $start_time, which was never set here.
$start_time = microtime(true);

$d = array();

// how to add and control a limit of max processes running at the same time?
foreach($countries as $country) {
    // NOTE(review): push() only enqueues a job; what it returns is whatever the
    // queue driver reports for the queued job (presumably a job id — verify),
    // not the output of Countries::get_data().
    $d[$country] = $queue->push('Countries', array("country"=>$country));
}
// how to get results after all processes completed into $d[$country]?
// do something with results


$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration,3);
// Label corrected: the original message was copied from the pcntl_fork() example.
error_log( date('Y-m-d H:i:s').' - 5. illuminate/queue example duration='.$duration."\n", 3, $log);


?>

0

Решение

Я не могу помочь с pthreads, ko-process, fork-helper или queue (у меня просто нет опыта их использования), но вот один из способов заставить ваш код работать с pcntl_fork, используя сокеты для передачи сообщений между дочерними и родительским процессами:

<?php
/**
 * Demo data source whose get_data() call is run once per country in a child process.
 */
class Countries {
    /**
     * Pauses for 1000 microseconds, then builds an array holding 1000 copies of $country.
     *
     * @param mixed $country Value to repeat (the script passes country codes).
     * @return array Elements 0..999, each equal to $country.
     */
    function get_data($country){
        usleep(1000);

        $index = 0;
        while ($index < 1000) {
            $rows[$index] = $country;
            $index++;
        }

        return $rows;
    }
}

$os = new Countries;

$countries = ["GB", "US", "FR", "DE", "IT", "ES", "LT", "BR", "BE", "JP", "CN"];

// Destination file for error_log(..., 3, $log). The script never assigned $log,
// so keep a value defined earlier if there is one and otherwise fall back to a
// file in the system temp directory.
$log = isset($log) ? $log : sys_get_temp_dir() . '/parallel_countries.log';

// To answer your question about limiting the number of concurrent processes, you
// need to limit the number of times you call pcntl_fork(). You might do something
// like:
//    1. Chunk the $countries array: [["GB", "US"], ["FR", "DE"], ["IT", "ES"], ...
//    2. Call pcntl_fork() once for each inner array (half as many)
//    3. Child process calls $os->get_data() once for each country in the sub-array
//
// Another solution is to utilize what's known as a "Pool" -- where you give a
// collection of tasks to a class which spins up workers for you and hands tasks to
// workers as they become available. This method abstracts the complexity of
// multiprocessing, but will require you to find a third-party library you like or
// implement the Pool class on your own.
$start_time = microtime(true);

// Initialize $d in the parent process (before pcntl_fork())
$d = [];

// One entry per successfully forked child: pid => [country, parent's end of the socket].
$children = [];

foreach ($countries as $country) {
    // To answer your question about how to collect the result in the $d array,
    // you need to pass the results back to the parent via some message channel.
    // A socket pair is used here. It must be created before the fork so that
    // both processes hold an end of it.
    //
    // Each child gets its OWN pair: a stream socket is a raw byte stream, so
    // several children writing large payloads into one shared socket could
    // interleave their bytes. With one pair per child the country is known
    // from the socket itself and no framing protocol is needed.
    if (!socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair)) {
        error_log(date('Y-m-d H:i:s') . ' - socket_create_pair failed for ' . $country . ': ' . socket_strerror(socket_last_error()) . "\n", 3, $log);
        continue;
    }

    $pid = pcntl_fork();

    if ($pid === -1) {
        // No child was created; release the unused pair and move on.
        socket_close($pair[0]);
        socket_close($pair[1]);
        error_log(date('Y-m-d H:i:s') . ' - fork failed for ' . $country . " \n", 3, $log);
        continue;
    }

    if ($pid === 0) {
        // Child process: writes to $pair[0], never reads.
        socket_close($pair[1]);

        error_log( date('Y-m-d H:i:s').' - In child  '.$country." \n", 3, $log);

        $payload = serialize($os->get_data($country));

        // socket_write() may accept only part of the buffer, so keep writing
        // until every byte has been handed over.
        $length = strlen($payload);
        $sent = 0;
        while ($sent < $length) {
            $written = socket_write($pair[0], substr($payload, $sent));
            if ($written === false) {
                socket_close($pair[0]);
                exit(1);
            }
            $sent += $written;
        }

        // Closing the write end is what signals end-of-data to the parent.
        socket_close($pair[0]);
        exit(0);
    }

    // Parent process: it must close its copy of the write end, otherwise the
    // read loop below would never see end-of-stream.
    socket_close($pair[0]);
    $children[$pid] = ['country' => $country, 'socket' => $pair[1]];
}

// Drain each child's socket BEFORE waiting for it to exit. Waiting first can
// deadlock: a child blocks in socket_write() once the socket buffer is full and
// therefore never exits while nobody is reading.
foreach ($children as $pid => $child) {
    $buffer = '';
    while (true) {
        $chunk = socket_read($child['socket'], 8192);
        // '' means the child closed its end; false means a read error.
        // A short (non-empty) read is normal and does NOT mean the data is finished.
        if ($chunk === false || $chunk === '') {
            break;
        }
        $buffer .= $chunk;
    }
    socket_close($child['socket']);

    // Reap the child so it does not linger as a zombie.
    pcntl_waitpid($pid, $status);

    // The payload comes from our own child and only ever contains arrays of
    // strings, so object instantiation is disabled while decoding.
    $result = ($buffer === '') ? false : unserialize($buffer, ['allowed_classes' => false]);
    if ($result === false) {
        error_log(date('Y-m-d H:i:s') . ' - no usable result from child for ' . $child['country'] . " \n", 3, $log);
        continue;
    }

    $d[$child['country']] = $result;
}

$end_time = microtime(true);
$duration = $end_time - $start_time;
$duration = number_format($duration, 3);

error_log( date('Y-m-d H:i:s').' - 1. pcntl_fork() example duration='.$duration."\n", 3, $log);

?>

Хочу отметить одну вещь: во многих случаях многопроцессная обработка не ускоряет программы волшебным образом, как многие думают. Если задача ограничена процессором (то есть всё время уходит на сложные вычисления), то многопроцессная обработка не даст никакого эффекта или даже замедлит выполнение. Если же задача ограничена вводом-выводом (то есть всё время уходит на ожидание сетевых или дисковых операций), её можно значительно ускорить, позволяя процессору выполнять полезную работу, а не простаивать в ожидании.

1

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector