многопоточность — лучший способ разгрузить одноразовые рабочие потоки в PHP? Pthreads? FCNTL?

Как я должен многопоточность некоторый код php-cli, который требует тайм-аут?

Я использую PHP 5.6 на Centos 6.6 из командной строки.

Я не очень знаком с многопоточностью терминологии или кода. Я упрощу код здесь, но он на 100% представляет то, что я хочу сделать.

Непоточный код в настоящее время выглядит примерно так:

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity

Мне нужно предварительно выбрать все мои наборы данных, и каждый processRawDataAndStoreResultInDB () не может получить свой собственный набор данных. Иногда processRawDataAndStoreResultInDB () занимает слишком много времени для обработки набора данных, поэтому я хочу ограничить время, необходимое для его обработки.

Таким образом, вы можете видеть, что создание многопоточного

  1. Ускорьте его, разрешив одновременное выполнение нескольких processRawDataAndStoreResultInDB ()
  2. Используйте set_time_limit (), чтобы ограничить количество времени, которое каждый должен обрабатывать каждый набор данных

Обратите внимание, что мне не нужно возвращаться к моей основной программе. Поскольку это упрощение, вы можете верить, что я не хочу собирать все обработанные наборы данных и делать одно сохранение в БД после того, как все они будут выполнены.

Я хотел бы сделать что-то вроде:

class MyWorkerThread extends SomeThreadType {
public function __construct($timeout, $dataset) {
$this->timeout = $timeout;
$this->dataset = $dataset;
}

public function run() {
set_time_limit($this->timeout);
MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
}
}

$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
$thread = new MyWorkerThread($timeoutForEachThread, $dataset);

$thread->addCallbackOnTerminated(function() {
if ($this->isTimeout()) {
MyLibrary::saveBadDatasetToDb($dataset);
}
}

$pool->addToQueue($thread);
}

$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity

Из моих онлайн-исследований я нашел pthreads расширения PHP, которые я могу использовать с моим потоково-безопасным php CLI, или я мог бы использовать расширение PCNTL или библиотеку-оболочку вокруг него (скажем, Arara / Process)

Когда я смотрю на них и их примеры (особенно на пример пула pthreads), меня быстро смущает терминология и то, какие классы мне следует использовать для достижения нужного мне вида многопоточности.

Я бы даже не возражал против создания класса пула сам, если бы у меня в классе потока были функции isRunning (), isTermination (), getTerminationStatus () и execute (), поскольку это была бы простая очередь.

Может кто-то с большим опытом, пожалуйста, укажите мне, какую библиотеку, классы и функции я должен использовать, чтобы сопоставить с моим примером выше? Я принимаю неправильный подход полностью?

Заранее спасибо.

3

Решение

Вот пример использования рабочих процессов. Я использую расширение pcntl.

/**
* Spawns a worker process and returns it pid or -1
* if something goes wrong.
*
* @param callback function, closure or method to call
* @return integer
*/
function worker($callback) {
$pid = pcntl_fork();
if($pid === 0) {
// Child process
exit($callback());
} else {
// Main process or an error
return $pid;
}
}$datasets = array(
array('test', '123'),
array('foo', 'bar')
);

$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
$pid = worker(function () use ($dataset) {
// Do DB stuff here
var_dump($dataset);
return 0;
});

if($pid !== -1) {
$numWorkers++;
} else {
// Handle fork errors here
echo 'Failed to spawn worker';
}

// If $maxWorkers is reached we need to wait
// for at least one child to return
if($numWorkers === $maxWorkers) {
// $status is passed by reference
$pid = pcntl_wait($status);
echo "child process $pid returned $status\n";
$numWorkers--;
}
}

// (Non blocking) wait for the remaining childs
while(true) {
// $status is passed by reference
$pid = pcntl_wait($status, WNOHANG);

if(is_null($pid) || $pid === -1) {
break;
}

if($pid === 0) {
// Be patient ...
usleep(50000);
continue;
}

echo "child process $pid returned $status\n";
}
0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]