Activemq и Php Stomp: синхронный пример производителя

Я пытаюсь, чтобы этот принцип работал:

  • производитель, который отправляет один message (1) и ожидает подтверждения, содержащего некоторый результат (на самом деле результат операции json)
  • потребитель, который проверяет все ожидающие сообщения каждые 5 секунд, и обработать все из них в одном ряду, и подтверждает все из них в одном ряду, затем снова ждать 5 секунд (бесконечный цикл).

Вот 30 строк моего stompproducer.php:

<?php

function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}

$queue  = '/aaaa';
$msg    = 'bar';
if (count($argv)<3) {
echo $argv[0]." [msg] [nb to send]\n";
exit(1);
}
$msg     = (string)$argv[1];
$to_send = intval($argv[2]);

try {
$stomp = new Stomp('tcp://localhost:61613');
while (--$to_send) {
msg("Sending...");
$result = $stomp->send(
$queue,
$msg." ". date("Y-m-d H:i:s"),
array('receipt' => 'message-123')
);
echo 'result='.var_export($result,true)."\n";
msg("Done.");
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}

Вот 30 строк моего stompconsumer.php:

<?php

$queue  = '/aaaa';
$_waitTimer=5000000;
$_timeLastAsk = microtime(true);

function msg($txt)
{
echo date('H:i:s > ').$txt."\n";
}

try {
$stomp = new Stomp('tcp://localhost:61613');
$stomp->subscribe($queue, array('activemq.prefetchSize' => 40));
$stomp->setReadTimeout(0, 10000);
while (true) {
$frames_read=array();
while ($stomp->hasFrame()) {
$frame = $stomp->readFrame();
if ($frame != null) {
array_push($frames_read, $frame);
}
if (count($frames_read)==40) {
break;
}
}
msg("Nombre de frames lues : ".count($frames_read));
msg("Pause...");
$e=$_waitTimer-(microtime(true)-$_timeLastAsk);
if ($e>0) {
usleep($e);
}
if (count($frames_read)>0) {
msg("Ack now...");
foreach ($frames_read as $frame) {
$stomp->ack($frame);
}
}
$_timeLastAsk = microtime(true);
}
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}

Я не могу сделать синхронный продюсер, то есть продюсер, который ждет подтверждения потребителя. Если вы запустите образцы, которые я сделал здесь, вы увидите, что производитель мгновенно отправляет все сообщения, затем выходит со всеми «true», как «ok» результатами при вызове $stomp->send(),
Я до сих пор не нашел ни хороших примеров, ни хорошей документации с простым примером блокировки.

Что я должен сделать, чтобы мой производитель заблокировал, пока потребитель не отправит подтверждение?

NB: я прочитал всю документацию Вот и топать вопросы о php на stackoverflow Вот а также Вот.

0

Решение

Первое, что пришло мне в голову: взгляните на этот плагин Stomp:

http://activemq.apache.org/message-redelivery-and-dlq-handling.html

Еще один обходной путь:
На стороне производителя:
1. Измените вашего производителя для отправки постоянных сообщений

На вашей стороне потребителя:
Используйте таймер.
1. Прочитайте сообщение / фреймы, пока не достигнете пустого или максимального значения.
2. Создайте запрос CURL и пустой упакованный список сообщений.
3. Спи свой сервер на 5 секунд

Вы определенно должны проверить это дальше, но должно работать. Как только процесс проснется, вы сможете прочитать все сообщения в очереди.

Что нужно учитывать:
— постоянным сообщениям потребуется время истечения
— Вам потребуется ACK на стороне потребителя, чтобы удостовериться в обновлении статуса уже посещенных сообщений. Используйте ACK = client, чтобы вы могли подтверждать все подтвержденные сообщения
— Это проще, если вам не нужно ждать, пока ваш CURL ответит.
— Из коробки не поддерживается отправка ACK от потребителя (на стороне сервера).

Удачи

0

Другие решения

Из вопроса звучит так, будто вы ищете шаблон сообщения типа запрос / ответ. Это то, что вы должны реализовать сами, так как STOMP, на который вы ссылаетесь, только отправляет сообщение брокеру сообщений от имени потребителя, производитель не знает об этом. Запрос ответа включает в себя установку адреса для ответа на исходящее сообщение, а затем ожидание получения ответа на этот адрес перед отправкой следующего сообщения. Есть очень много статей, которые документируют такие вещи, как эта один.

Или, если вам нужно только узнать, получил ли брокер сообщение от клиента и сохранил его, вы можете использовать встроенный STOMP. чек механизм, чтобы брокер отправлял вам квитанцию ​​о том, что он обработал ваше отправленное сообщение. Это, однако, не гарантирует, что потребитель уже обработал сообщение.

0

Я только что вспомнил, вы можете попробовать реагировать библиотеки / Stomp.
Это управляемая событиями библиотека, которая может вам помочь. специально взглянуть на основные функции addPeriodicTimer

https://github.com/reactphp/stomp

ура

0
По вопросам рекламы [email protected]