Помогите мне понять, как именно работает Pool :: collect.
Pool :: collect — собирать ссылки на выполненные задачи
public void Pool::collect ( Callable $collector )
То, что я предполагаю, было: Pool::collect
регистрирует функцию, которая будет вызываться после каждого \Threaded $task
выполнен. Так я и сделал:
<?php
$pool = new Pool(4);
$pool->collect($collector);
$pool->submit(new Task);
Не сработало Но следующее делает:
<?php
$pool = new Pool(4);
$pool->submit(new Task);
$pool->collect($collector);
Итак, я думаю, что Pool::collect
делает это: прикрепляет $collector
для каждого \Threaded $task
ранее представлен.
Теперь, когда именно $collector
называется? Я полагаю, был вызван после Threaded::run()
выполнен. Опять не так.
<?php
class Task extends Threaded {
public function run () { echo "Task complete\n"; }
}
$collector = function (\Task $task) {
echo "Collect task\n";
return true;
};
$pool = new Pool(4);
$pool->submit(new Task);
$pool->collect($collector);
$pool->shutdown();
Выходы:
Collect task
Task complete
$collector
называется раньше Threaded::run()
выполнен.
Документация не говорит много. Разве событие не говорит о том, что $collector
должен вернуть логическое значение. Я этого не знала.
Я пытался использовать Pool :: collect как своего рода обратный вызов после того, как каждое задание $ выполнено. Я думаю, что я на неправильном пути.
Редактировать 1. Как насчет этой попытки?
Pool::collect
Обходит список объектов, передавая каждый из $collector
,
$collector
функция должна вернуться true
когда двигатель может уничтожить ссылку на Threaded
объект в списке работ.
::collect
функциональность была перенесена в Worker
хотя это все еще выставлено Pool
за полезность.
Существует два списка, один список элементов, готовых к выполнению, и другой список элементов, которые были выполнены (или выполняются в настоящее время).
Pool::collect
Обходит второй список, элементы, которые были выполнены (или выполняются в настоящее время).
Pool::collect
возвращает количество элементов, оставшихся в списке мусора для всех Worker
объекты в Pool
чтобы помочь с планированием сбора мусора.
<?php
$pool = new Pool(4);
while (@$i++<10) {
$pool->submit(new class($i) extends Threaded {
public function __construct($id) {
$this->id = $id;
}
public function run() {
printf(
"Hello World from %d\n", $this->id);
}
public $id;
});
}
while ($pool->collect(function(Collectable $work){
printf(
"Collecting %d\n", $work->id);
return $work->isGarbage();
})) continue;
$pool->shutdown();
?>
Даст что-то вроде:
Hello World from 1
Hello World from 2
Hello World from 3
Hello World from 4
Hello World from 8
Collecting 1
Hello World from 7
Hello World from 5
Collecting 5
Hello World from 6
Collecting 9
Collecting 2
Collecting 6
Collecting 10
Hello World from 9
Collecting 3
Collecting 7
Collecting 4
Hello World from 10
Collecting 8
Collecting 5
Collecting 10
Вы пропустили цикл аудита.
Задание не может быть выполнено немедленно, поэтому сначала функция сбора не получает положительного результата. Несколько раз нужно проверить, для этого нужен цикл.
В примере, если бесконечно верно, всегда работают 4 задачи. В противном случае, если размер очереди равен размеру пула (4), он закончен, вы можете выйти из цикла.
<?php
class Task extends \Threaded
{
protected $result;
protected $completed = false;
public function run()
{
echo "Task complete\n";
$this->result = md5(rand(1, 2000));
sleep(rand(1, 5));
$this->completed = true;
}
public function isCompleted()
{
return $this->completed;
}
public function getResult()
{
return $this->result;
}
}
$infinite = false;
$poolSize = 4;
$queue = array();
$pool = new \Pool($poolSize);
$pool->submit(new Task);
$pool->submit(new Task);
$pool->submit(new Task);
$pool->submit(new Task);do {
if($infinite === true){
$queue = array();
}
$pool->collect(function (\Task $task) use (&$queue) {
if ($task->isCompleted()) {
echo "Collect task\n";
$queue[] = $task->getResult();
return true;
} else {
echo "task not complete\n";
return false;
}
});
$size = sizeof($queue);
if ($size > 0) {
echo $size . " result\n";
print_r($queue);
if($infinite === true) {
for ($m = 0; $m < $size; $m++) {
$pool->submit(new Task);
}
} else{
if($size == $poolSize){
break;
}
}
}
usleep(100000);
} while (true);
$pool->shutdown();