Как я должен многопоточность некоторый код php-cli, который требует тайм-аут?
Я использую PHP 5.6 на Centos 6.6 из командной строки.
Я не очень знаком с многопоточностью терминологии или кода. Я упрощу код здесь, но он на 100% представляет то, что я хочу сделать.
Непоточный код в настоящее время выглядит примерно так:
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity
Мне нужно предварительно выбрать все мои наборы данных, и каждый processRawDataAndStoreResultInDB () не может получить свой собственный набор данных. Иногда processRawDataAndStoreResultInDB () занимает слишком много времени для обработки набора данных, поэтому я хочу ограничить время, необходимое для его обработки.
Таким образом, вы можете видеть, что создание многопоточного
Обратите внимание, что мне не нужно возвращаться к моей основной программе. Поскольку это упрощение, вы можете верить, что я не хочу собирать все обработанные наборы данных и делать одно сохранение в БД после того, как все они будут выполнены.
Я хотел бы сделать что-то вроде:
class MyWorkerThread extends SomeThreadType {
public function __construct($timeout, $dataset) {
$this->timeout = $timeout;
$this->dataset = $dataset;
}
public function run() {
set_time_limit($this->timeout);
MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
}
}
$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
$thread = new MyWorkerThread($timeoutForEachThread, $dataset);
$thread->addCallbackOnTerminated(function() {
if ($this->isTimeout()) {
MyLibrary::saveBadDatasetToDb($dataset);
}
}
$pool->addToQueue($thread);
}
$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity
Из моих онлайн-исследований я нашел pthreads расширения PHP, которые я могу использовать с моим потоково-безопасным php CLI, или я мог бы использовать расширение PCNTL или библиотеку-оболочку вокруг него (скажем, Arara / Process)
Когда я смотрю на них и их примеры (особенно на пример пула pthreads), меня быстро смущает терминология и то, какие классы мне следует использовать для достижения нужного мне вида многопоточности.
Я бы даже не возражал против создания класса пула сам, если бы у меня в классе потока были функции isRunning (), isTermination (), getTerminationStatus () и execute (), поскольку это была бы простая очередь.
Может кто-то с большим опытом, пожалуйста, укажите мне, какую библиотеку, классы и функции я должен использовать, чтобы сопоставить с моим примером выше? Я принимаю неправильный подход полностью?
Заранее спасибо.
Вот пример использования рабочих процессов. Я использую расширение pcntl.
/**
* Spawns a worker process and returns it pid or -1
* if something goes wrong.
*
* @param callback function, closure or method to call
* @return integer
*/
function worker($callback) {
$pid = pcntl_fork();
if($pid === 0) {
// Child process
exit($callback());
} else {
// Main process or an error
return $pid;
}
}$datasets = array(
array('test', '123'),
array('foo', 'bar')
);
$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
$pid = worker(function () use ($dataset) {
// Do DB stuff here
var_dump($dataset);
return 0;
});
if($pid !== -1) {
$numWorkers++;
} else {
// Handle fork errors here
echo 'Failed to spawn worker';
}
// If $maxWorkers is reached we need to wait
// for at least one child to return
if($numWorkers === $maxWorkers) {
// $status is passed by reference
$pid = pcntl_wait($status);
echo "child process $pid returned $status\n";
$numWorkers--;
}
}
// (Non blocking) wait for the remaining childs
while(true) {
// $status is passed by reference
$pid = pcntl_wait($status, WNOHANG);
if(is_null($pid) || $pid === -1) {
break;
}
if($pid === 0) {
// Be patient ...
usleep(50000);
continue;
}
echo "child process $pid returned $status\n";
}
Других решений пока нет …