многопоточность — PHP pthreads — общие объекты

Я ищу безопасный и быстрый способ использовать общий объект.

Я задал вопрос уже здесь: https://github.com/krakjoe/pthreads/issues/470
но очевидно, что это было не то место.

Попытка поделиться объектом (с резьбой) со многими другими контекстами (поток).
Все потоки обновляют этот осколочный объект — они могут устанавливать собственные запросы и также должны отвечать на запросы других.

Теперь, когда Кракджо ответил, что блокировка / разблокировка не будет доступна в 7, у меня возникла проблема.

Я знаю о: .synchronized, но не знаю, как его использовать, чтобы он работал для моих нужд.

Как я могу использовать :: synchronized для написания таких методов, как

  • замок()
  • разблокировать ()
  • is_locked () — проверьте, не заблокирован ли, и если нет, просто попробуйте позже

РЕДАКТИРОВАТЬ:

Я написал (IMO) очень простой тестовый скрипт.

этот скрипт включает в себя нет syc / lock / … методы атм.

это должно просто показать, что я пытаюсь сделать.

Я все еще ищу способ использовать ::, чтобы сделать этот общий доступ безопасным.

код:

<?php
/*
TEST:
create n threads
each will
- Shared::set() its own ref
- check if Shared::exists() its own ref
- Shared::get() its ref back
- call method ::isRunning() at returned val to easily check if is ref or got overwritten by another context

TODO:
using ::synchronized to handle multi-context-access

NOTES:
every method as public to prevent pthreads v2 "Method Modifiers - Special Behaviour"see: "Method Modifiers - Special Behaviour"at http://blog.krakjoe.ninja/2015/08/a-letter-from-future.html
*/
class Shared extends Threaded
{
public $data;
public function exists($ident)
{
return isset($this->data[$ident]);
}
public function set($ident, $ref)
{
$return = false;
if(!isset($this->data[$ident])){
$data = $this->data;
$data[$ident] = $ref;
$this->data = $data;
$return = $this->data[$ident];
}
#echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
return $return;
}
public function get($ident)
{
$return = false;
if($this->exists($ident) === true){
$data = $this->data;
$return = $data[$ident];
unset($data[$ident]);
$this->data = $data;
}
#echo __METHOD__ . '(' . $ident . ') => ' . gettype($return) . PHP_EOL;
return $return;
}
}

class T extends Thread
{
public $count;
public function __construct(Shared $Shared, $ident)
{
$this->Shared = $Shared;
$this->ident = $ident;
}
public function run()
{
$slowdown = true;
$this->count = 0;
while(true){
if($slowdown){
// "don't allow usleep or sleep" : https://github.com/krakjoe/pthreads/commit/a157b34057b0f584b4db326f30961b5c760dead8
//  loop a bit to simulate work:
$start = microtime(true);
$until = rand(1, 100000)/1000000;
while(microtime(true)-$start < $until){
// ...
}
}

if($this->Shared->exists($this->ident) === true){
$ref = $this->Shared->get($this->ident);
}
else{
$ref = $this->Shared->set($this->ident, $this);
}
// calling a method on $ref -- if not a ref we crash
$ref->isRunning();
unset($ref);
$this->count++;
}
}
}


echo 'start ...' . PHP_EOL;

$n = 8;
$Shared = new Shared();
for($i = 0, $refs = array(); $i < $n; $i++){
$refs[$i] = new T($Shared, $i);
$refs[$i]->start();
}

while(!empty($refs)){
// print status:
if(!isset($t)or microtime(true)-$t > 1){
$t = microtime(true);
echo 'status: ' . count($refs) . ' running atm ...' . PHP_EOL;
}

// join crashed threads:
foreach($refs as $i => $thread){
if($thread->isRunning() === false){
echo 'T-' . $i . ' stopped after ' . $thread->count . PHP_EOL;
if($thread->isJoined() === false){
$thread->join();
}
unset($refs[$i]);
}
}
}

echo 'no thread running anymore.' . PHP_EOL;

/* output
start ...
status: 8 running atm ...

Notice: Undefined offset: 6 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-6 stopped after 10
status: 7 running atm ...

Notice: Undefined offset: 4 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-4 stopped after 35
status: 6 running atm ...

Notice: Undefined offset: 7 in ...\shared_test.php on line 33

Fatal error: Call to a member function isRunning() on null in ...\shared_test.php on line 82
T-7 stopped after 43
status: 5 running atm ...
status: 5 running atm ...
status: 5 running atm ...

[...]
*/
?>

5

Решение

Threaded объекты уже поточнобезопасны, то есть каждый раз, когда вы читаете, пишете, проверяете существование или удаляете (отменяете) элемент, операция является атомарной — никакой другой контекст не может выполнить ни одну из вышеупомянутых операций, пока Первая операция имеет место. То же самое верно для обработчиков движка, о которых пользователь не знает, все до самого низкого уровня неявно безопасно.

Впрочем, это довольно убедительно … Это имеет очевидные ограничения, когда логика становится более сложной, такой как проверка существования члена перед установкой или выполнение чего-то еще с ним, как вы делаете: в то время как операции над объектом являются атомарными нет ничего, что могло бы остановить другой контекст unsetУчастник между вызовом isset и вызов, чтобы прочитать свойство / измерение.

Это относится к PHP7 (pthreads v3 +)

Безопасность и целостность здесь две разные вещи. Когда целостность важна, вы можете использовать Threaded::synchronized в PHP7, чтобы сохранить его правильно. В PHP5 вы также можете сохранить его, но код будет более сложным, как и объяснение.

Ваш второй пример должен работать бесконечно, если я понимаю, что это логика. Поэтому я использую это предположение для построения правильного кода, я собираюсь сделать дополнительные предположения о том, что вы, возможно, захотите сделать в этом бесконечном цикле, и дать некоторое представление о том, где это кажется необходимым.

<?php
class Referee extends Threaded {

public function find(string $ident, Threaded $reference) {
return $this->synchronized(function () use($ident, $reference) {
if (isset($this[$ident])) {
return $this[$ident];
} else return ($this[$ident] = $reference);
});
}

public function foreach(Closure $closure) {
$this->synchronized(function() use($closure) {
foreach ($this as $ident => $reference) {
$closure($ident, $reference);
}
});
}
}

class Test extends Thread {

public function __construct(Referee $referee, string $ident, bool $delay) {
$this->referee = $referee;
$this->ident   = $ident;
$this->delay   = $delay;
}

public function run() {
while (1) {
if ($this->delay) {
$this->synchronized(function(){
$this->wait(1000000);
});
}

$reference =
$this->referee->find($this->ident, $this);

/* do something with reference here, I guess */

/* do something with all references here */
$this->referee->foreach(function($ident, $reference){
var_dump(Thread::getCurrentThreadId(),
$reference->getIdent(),
$reference->isRunning());
});
}
}

public function getIdent() {
return $this->ident;
}

private $referee;
private $ident;
private $delay;
}

$referee = new Referee();
$threads = [];
$thread = 0;
$idents = [
"smelly",
"dopey",
"bashful",
"grumpy",
"sneezy",
"sleepy",
"happy",
"naughty"];

while ($thread < 8) {
$threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
$threads[$thread]->start();
$thread++;
}

foreach ($threads as $thread)
$thread->join();
?>

Итак, мы посмотрим на различия, я расскажу вам, почему они такие, какие они есть, и как еще вы могли бы их написать, вы уже знаете, что мы сейчас говорим не о безопасности, а о честности, вам предоставляется (довольно замечательно) предположение, что все, что вы пишете, является «безопасным», как объяснено.

Первое основное отличие заключается в следующем:

if ($this->delay) {
$this->synchronized(function(){
$this->wait(1000000);
});
}

Это просто подходящий способ сделать Thread подождите, вам не придется использовать Thread Сам для синхронизации, вы можете использовать любой Threaded объект. Преимущество правильного выполнения действий в том случае, если неясно, что режим сна и сон не оставляют потоки в восприимчивом состоянии, используя ::wait делает.

В реальном мире, где вы действительно должны только ждать за что-то, что было бы более сложным блоком, это могло бы (и должно) быть больше похоже на:

if ($this->delay) {
$this->synchronized(function(){
while ($this->condition) {
$this->wait(1000000);
}
});
}

Примечание: ожидание тайм-аута — это технически ожидание чего-то, однако вас может разбудить нечто иное, чем истекший тайм-аут, и для этого должен быть подготовлен код.

Таким образом, другой контекст может уведомить Thread что он должен прекратить ожидание и корректно завершить работу или немедленно выполнить какое-либо другое важное действие, просто синхронизировав, изменив условие и уведомив Thread,

Для предсказуемого кода крайне важно научиться работать с синхронизацией, ждать и уведомлять.

Далее у нас есть логика для установки и / или получения ссылки:

$reference =
$this->referee->find($this->ident, $this);

Что вызывает это:

public function find(string $ident, Threaded $reference) {
return $this->synchronized(function () use($ident, $reference) {
if (isset($this[$ident])) {
return $this[$ident];
} else return ($this[$ident] = $reference);
});
}

Это плохо названо, называть вещи сложно, но вы можете видеть, что целостность сохраняется синхронизацией во время выполнения этих сгруппированных операций. Тот же метод может также использоваться для получения ссылки на другой объект с небольшим изменением.

Я думаю, что вы делаете что-то с этой конкретной ссылкой (которая всегда будет $this В настоящее время). Я не могу догадаться что. Двигаясь дальше …

Я сделал предположение, что вы захотите что-то сделать с каждым из этих Threadsи вы хотите сохранить целостность данных во время всей итерации:

$this->referee->foreach(function($ident, $reference){
var_dump(Thread::getCurrentThreadId(),
$reference->getIdent(),
$reference->isRunning());
});

Что вызывает:

public function foreach(Closure $closure) {
$this->synchronized(function() use($closure) {
foreach ($this as $ident => $reference) {
$closure($ident, $reference);
}
});
}

Вот как бы вы поступили так.

Стоит отметить, что здесь не обязательно синхронизироваться; так же, как ничего плохого не произойдет, если вы удалите элемент из массива, по которому вы выполняете итерацию, ничего плохого не случится, если вы сбросите или установите или сделаете что-то еще с объектом во время итерации.

8

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]