Я что-то играю с pthreads 3.1.6-dev и PHP 7.1. Моя цель — создать небольшой веб-сканер.
Запланированный рабочий процесс: вы помещаете один URL в пул (возможно, домашнюю страницу), и сканер (расширяет потоки) ловит все ссылки с этого URL. После небольшой фильтрации искатель должен добавить все новые ссылки в пул (внешние ссылки не должны добавляться в пул). В качестве альтернативы сканер передает новые URL-адреса «кому-то другому», который добавляет их в пул.
Процесс должен продолжаться, пока не будут найдены новые URL.
Моя проблема в том, что я не нашел рабочего решения для этого. Мой текущий розыгрыш выглядит следующим образом: сканер извлекает URL-адреса и помещает их в пул. Для этого каждый Worker хранит ссылку на пул, чтобы программа-обходчик могла получить доступ к рабочему объекту пула.
Проблема с этим решением: если «поздняя» ветвь добавит новую ветвь в пул, эта новая задача не будет выполнена.
Некоторый демонстрационный код:
class WebWorker extends Worker
{
public function __construct(WebPool $pool)
{
$this->pool = $pool;
print_r("Create a new worker\n");
}
public function run()
{
print_r("Worker {$this->getThreadId()}: " . __METHOD__ . "\n");
}
public function getPool()
{
return $this->pool;
}
private $pool;
}
class WebWork extends Threaded
{
public function run()
{
print_r("Webwork from Worker {$this->worker->getThreadId()}\n");
if (rand(0, 10) > 5) {
print_r("Webwork {$this->worker->getThreadId()} add new Webwork\n");
$this->worker->getPool()->submit(new WebWork());
}
}
}
class WebPool extends Pool
{
public function __construct($size, $class)
{
parent::__construct($size, $class, [$this]);
}
}$pool = new WebPool(2, 'WebWorker');
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
while ($pool->collect(function ($work) {
return $work->isGarbage();
})) continue;
$pool->shutdown();
Один пример результата:
Create a new worker
Worker 139878744053504: WebWorker::run
Webwork from Worker 139878744053504
Create a new worker
Worker 139878731872000: WebWorker::run
Webwork from Worker 139878731872000
Webwork from Worker 139878731872000
Webwork 139878731872000 add new Webwork
Webwork from Worker 139878744053504
Create a new worker
Worker 139878719289088: WebWorker::run
Webwork from Worker 139878719289088
Webwork 139878719289088 add new Webwork
Может кто-нибудь сказать мне лучшую практику для этой проблемы?
Проблема в том, что вы полагаетесь на сборщик мусора, чтобы заблокировать основной поток, тогда как на самом деле вы должны использовать свое собственное условие для блокировки. Переопределение сборщика мусора по умолчанию в pthreads v3 на самом деле не требуется, за исключением особых случаев, когда вы не хотите, чтобы задача собиралась сразу после ее завершения.
Одним из возможных решений вашей проблемы является наличие переменной счетчика ссылок, которая увеличивается для каждой новой найденной ссылки (которая требует сканирования) и уменьшается для каждой просканированной ссылки. Когда эта переменная достигает 0, вы можете предположить, что веб-сайт был полностью просканирован, и, следовательно, вы можете безопасно закрыть свой пул потоков.
Вот это решение в коде:
<?php
class WebsiteCrawler extends Worker
{
public $pool; // expose the pool to our LinkCrawler tasks
public function __construct(Pool $pool)
{
$this->pool = $pool;
}
}
class LinkCrawler extends Threaded
{
private $link;
public static $domain;
public function __construct(string $link)
{
$this->link = $link;
WebCrawlerPool::$links[] = $link;
++WebCrawlerPool::$linksRemaining->count;
var_dump($link); // for debugging, just to show that it is collecting links
}
public function run()
{
$content = file_get_contents($this->link);
$domain = preg_quote(self::$domain);
preg_match_all("~href=\"(.+?{$domain}.+?)\"~", $content, $matches); // naive regex to fetch links
$links = $matches[1];
foreach ($links as $link) {
if (count(WebCrawlerPool::$links) > 9) { // stop at 10 links (for example purposes...)
break;
}
if (!in_array($link, get_object_vars(WebCrawlerPool::$links), true)) {
$this->worker->pool->submit(new LinkCrawler($link));
}
}
--WebCrawlerPool::$linksRemaining->count;
}
}
class WebCrawlerPool extends Pool
{
public static $linksRemaining;
public static $links;
public function __construct(int $size, string $class, array $ctor = [])
{
parent::__construct($size, $class, [$this]);
self::$links = new Threaded();
self::$linksRemaining = new Threaded();
self::$linksRemaining->count = 0;
}
}
LinkCrawler::$domain = 'php.net';
$pool = new WebCrawlerPool(8, 'WebsiteCrawler');
$pool->submit(new LinkCrawler('http://php.net/', $pool)); // kick things off
while (WebCrawlerPool::$linksRemaining->count !== 0);
$pool->shutdown();
print_r(WebCrawlerPool::$links);
Выше, конечно, просто пример кода — вы можете сделать что-то по-другому. Но вот пара примечательных моментов по этому поводу:
$linksRemaining
это Threaded
объект, а не простое целое число. Это потому что Threaded
объекты имеют синхронизацию встроенный. Это означает, что многие контексты могут манипулировать одним и тем же Threaded
объект безопасно. Используя это свойство Threaded
объекты, вы можете упростить код, который вы пишете в pthreads.WebCrawlerPool::$links
) проводятся в Threaded
объект, который создается в основном контексте. Это было сделано по двум причинам. Во-первых, массивы PHP нельзя безопасно передавать и манипулировать ими из разных контекстов (в отличие от Threaded
obejcts). И во-вторых, Threaded
объекты привязаны к тому контексту, в котором они созданы. Это означает, что мы должны создавать их в самом внешнем контексте, в котором мы хотим их использовать — в противном случае они станут недоступными при присоединении к потоку, который их создал.Других решений пока нет …