PHP: потоковые агенты, совместно использующие общий объект

Я запускаю CGI-программу на непотоковом PHP, небольшое упражнение в искусственной жизни и эволюции. У организмов есть геном и интерпретатор, который заставляет их выполнять определенные операции, в том числе перемещаться и взаимодействовать друг с другом на общей карте мира. В настоящее время я поддерживаю плохое подобие многопоточности, используя несколько процессов PHP, взаимодействующих через базу данных MySQL, но я хочу переписать код так, чтобы он использовал pthreads для непрерывной работы в одном потоке, не обязательно используя базу данных (хотя я вероятно, я хотел бы сохранить это для отчетности).

Я просматривал задаваемые и отвеченные вопросы и примеры кода на github, но не смог найти ничего, что, насколько я могу судить, адресовано тому, что я хочу. Поскольку я не совсем гениальный программист ООП и совершенно не знаком с написанием многопоточного кода, особенно в PHP, мои вопросы будут довольно широкими.

Я попытался немного сузить область своих вопросов, написав код, показывающий, что я пытаюсь сделать, но он все еще может быть слишком широким. Буду признателен за любые советы о том, как сузить его дальше.

Мои вопросы по поводу кода ниже:

  • Как заставить Организм воздействовать на общий объект World, чтобы изменения в объекте World передавались всем потокам, избегая конфликтов и поддерживая согласованность?
  • Учитывая, что численность населения в конечном итоге варьируется, есть ли способ сделать ссылки на организмы частью объекта мира (т. Е. $ World-> организмов), а также Может ли Мир создавать новые организмы, как показано ниже (неисправный) код?
  • Учитывая, что я в конечном итоге хочу создать популяцию из сотен Организмов, есть ли у вас какие-либо указания на ограничение количества активных потоков (т. Е. Ограничение использования памяти / процессора) при сохранении согласованности?

Нижеприведенный код (конечно, не запускается, но) иллюстрирует то, чего я хочу достичь:

/*
* Shared object containing a map of the world and
* methods for getting/setting coordinates
*/
class World extends Thread
{
public $map;
public $sx;
public $sy;
public $organisms;

// set all map coords to 0
public function __construct($sx, $sy, $p)
{
$map = array();
for( $i = 0; $i < $sx; $i++ )
{
$map[$i] = array();
for( $j = 0; $j < $sy; $j++ )
{
$map[$i][$j] = 0;
}
}
$this->map = $map;
$this->sx = $sx;
$this->sy = $sy;

// create and start organism threads
$this->organisms = array();
for( $i = 0; $i < $p; $i++ )
{
// this won't work because threaded objects created
// within a thread scope that have no external references
// are destroyed in the constructor
$this->makeOrganism($i+1, $i+1, $i+1);
}
}

// produces a new organism, adds to world population
public function makeOrganism($x, $y, $value)
{
if( $x < 1 || $x > $this->sx ) return false;
if( $y < 1 || $y > $this->sy ) return false;
if( $this->getMap($x, $y) != 0 ) return false;

echo "creating new organism $value\n";
$organisms = $this->organisms;
// doesn't work because the world data is decoupled in the new thread
$organisms[] = new Organism($this, $x, $y, $value);
$this->organisms = $organisms;

return true;
}

// assumes valid coords
public function setMap($x, $y, $value)
{
return $this->map[$x-1][$y-1] = $value;
}

// assumes valid coords
public function getMap($x, $y)
{
return $this->map[$x-1][$y-1];
}

public function getSX()
{
return $this->sx;
}

public function getSY()
{
return $this->sy;
}

public function run()
{
for( $i = 0; $i < count($this->organisms); $i++ )
{
echo "starting organism ", $this->value, "\n";
$this->organisms[$i]->start();
}
}
}

/*
* Autonomously running agent accessing shared World map
*/
class Organism extends Thread
{
public $value;
public $world;
public $x;
public $y;

public function __construct(World $world, $x, $y, $value)
{
$this->world = $world;
$this->value = $value;
$this->x = $x;
$this->y = $y;
// assume coordinates are empty
$this->world->setMap($x, $y, $value);
}

// try to move organism by $dx, $dy
public function move($dx, $dy)
{
$x = $this->x + $dx;
$y = $this->y + $dy;
if( $x < 1 || $x > $this->world->getSX() ) return false;
if( $y < 1 || $y > $this->world->getSY() ) return false;
if( $this->world->getMap($x, $y) != 0 ) return false;

$this->world->setMap($x, $y, $this->value);
$this->world->setMap($this->x, $this->y, 0);
$this->x = $x;
$this->y = $y;
return true;
}

public function getValue()
{
return $this->value;
}

public function run()
{
// infinite loop; organisms move randomly about until they die
while( true )
{
echo "running organism ", $this->getValue(), "\n";
// this should operate on the shared object World,
// maintaining consistency and avoiding conflicts between threads
$dx = rand(-1, 1);
$dy = rand(-1, 1);
$this->move($dx, $dy);

// occasionally add an organism to the population by cloning this one
if( rand(0, 100) > 95 )
{
$this->world->makeOrganism($this->x+1, $this->y+1, $this->value+100);
}

// wait random interval, organisms are
// not expected to move all at the same time
$this->wait(1000 + rand(500, 1500));
}
}
}

// initialize shared object
$world = new World(50, 50, 50);
$world->start();
$world->join();

4

Решение

Я собираюсь ответить за pthreads v3, PHP7.

Пожалуйста, не используйте pthreads v2 для новых проектов, v3 намного лучше.

Как заставить Организм воздействовать на общий объект World, чтобы изменения в объекте World передавались всем потокам, избегая конфликтов и поддерживая согласованность?

Следующий код создает группу потоков, которые управляют общим объектом:

<?php
class Test extends Thread {

public function __construct(Threaded $shared) {
$this->shared = $shared;
}

public function run() {
$this->shared[] = $this->getThreadId();
}
}

$shared = new Threaded();
$tests = [];
for ($i = 0; $i<20; $i++) {
$tests[$i] =
new Test($shared);
$tests[$i]->start();
}

foreach ($tests as $test)
$test->join();

var_dump($shared);
?>

Будет давать что-то похожее на:

object(Threaded)#1 (20) {
[0]=>
int(140322714146560)
[1]=>
int(140322703144704)
[2]=>
int(140322621355776)
[3]=>
int(140322612963072)
[4]=>
int(140322604570368)
[5]=>
int(140322596177664)
[6]=>
int(140322587784960)
[7]=>
int(140322579392256)
[8]=>
int(140322570999552)
[9]=>
int(140322487138048)
[10]=>
int(140322478745344)
[11]=>
int(140322470352640)
[12]=>
int(140322461959936)
[13]=>
int(140322453567232)
[14]=>
int(140322445174528)
[15]=>
int(140322436781824)
[16]=>
int(140322428389120)
[17]=>
int(140322419996416)
[18]=>
int(140322411603712)
[19]=>
int(140322403211008)
}

Не случайно, что количество элементов постоянно:

$this->shared[] = $this->getThreadId();

Когда это выполнено, безопасность предоставляется для вас pthreads.

любой Thread со ссылкой на Threaded $shared объект может манипулировать им.

Согласованность и безопасность — это две разные вещи, рассмотрим следующий код:

<?php
class Test extends Thread {

public function __construct(Threaded $shared) {
$this->shared = $shared;
}

public function run() {
if (!isset($this->shared[0])) {
$this->shared[0] = $this->getThreadId();
}
}

private $shared;
}

$shared = new Threaded();
$tests  = [];

for ($i = 0; $i < 16; $i++) {
$tests[$i] =
new Test($shared);
$tests[$i]->start();
}

foreach ($tests as $test)
$test->join();
?>

Вы можете ожидать только один Thread путешествовать по этому пути:

$this->shared[0] = $this->getThreadId();

Но нет никаких гарантий этого. Поскольку между вызовом нет блокировки isset и утверждение выше, многие потоки могут свободно проходить путь одновременно.

Замена метода запуска Test с помощью следующего кода обеспечит согласованность:

    public function run() {
$this->shared->synchronized(function(){
if (!isset($this->shared[0])) {
$this->shared[0] = $this->getThreadId();
}
});
}

Учитывая, что численность населения в конечном итоге является переменной, существует ли способ сделать ссылки на Организмы частью объекта Мира (т. Е. $ World-> организмов), и позволить Миру создавать новые Организмы, как показано ниже ( неисправный) код?

Это звучит так, как будто нарушает основные принципы SOLID. В любом случае, за этим не стоит гоняться.

Похоже, что организмы должны принадлежать к основному процессу, и поэтому должны быть построены там и переданы в Thread или же Worker или что угодно.

Они должны принадлежать к основному процессу, потому что они могут оказаться в нескольких Thread,

Учитывая, что я в конечном итоге хочу создать популяцию из сотен Организмов, есть ли у вас какие-либо указания на ограничение количества активных потоков (т. Е. Ограничение использования памяти / процессора) при сохранении согласованности?

С использованием Pool реализация обеспечивается pthreads.

Ниже приведен код:

<?php
class Organisms extends Volatile {}

class World extends Threaded {

public function __construct(Organisms $organisms, Volatile $grid) {
$this->organisms = $organisms;
$this->grid = $grid;
}

public function populate($organism, int $x, int $y) : bool {
$reference = $this->getGridReference($x, $y);

return $this->grid->synchronized(function() use($organism, $reference) {
if (isset($this->grid[$reference]))
return false;
return (bool) $this->grid[$reference] = $organism;
});
}

private function getGridReference(int $x, int $y) {
return sprintf("%dx%d", $x, $y);
}

public function getOrganisms() { return $this->organisms; }

private $organisms;
}

class Organism extends Threaded {

public function __construct(World $world) {
$this->world = $world;
}

public function setPosition(int $x, int $y) {
$this->x = $x;
$this->y = $y;
}

public function getWorld() { return $this->world; }

private $world;
private $x = -1;
private $y = -1;
}

class OrganismPopulateTask extends Threaded {

public function __construct(World $world, Organism $organism, int $x, int $y) {
$this->world = $world;
$this->organism = $organism;
$this->x = $x;
$this->y = $y;
}

public function run() {
if ($this->world->populate(
(object) $this->organism, $this->x, $this->y)) {
$this->organism->setPosition($this->x, $this->y);
}
}

private $world;
private $organism;
private $x;
private $y;
}

$organisms = new Organisms();
$grid = new Volatile();
$world = new World($organisms, $grid);
$pool = new Pool(16);

$organisms[] = new Organism($world);
$organisms[] = new Organism($world);

$pool
->submit(new OrganismPopulateTask($world, $organisms[0], 10, 10));
$pool
->submit(new OrganismPopulateTask($world, $organisms[1], 10, 10));

$pool->shutdown();

var_dump($world);
?>

Даст:

object(World)#3 (2) {
["organisms"]=>
object(Organisms)#1 (2) {
[0]=>
object(Organism)#5 (3) {
["world"]=>
*RECURSION*
["x"]=>
int(10)
["y"]=>
int(10)
}
[1]=>
object(Organism)#6 (3) {
["world"]=>
*RECURSION*
["x"]=>
int(-1)
["y"]=>
int(-1)
}
}
["grid"]=>
object(Volatile)#2 (1) {
["10x10"]=>
object(Organism)#5 (3) {
["world"]=>
*RECURSION*
["x"]=>
int(10)
["y"]=>
int(10)
}
}
}

Примечание: это использует функции, включенные в v3.1.2

Большинство из них должно быть самоочевидным, явные приведения должны избегать исключений, вызванных попыткой подключения к объектам, которые исчезли.

Главное отметить, что каждое «действие» рассматривается как «задача» и передается Pool,

2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]