я использую https://github.com/videlalvaro/php-amqplib сделать некоторую работу rabbitmq:
Я пытаюсь создать блокирующую версию basic_get (или версию basic_consume, которую я могу вызывать несколько раз и получать только одну msg каждый раз), которая будет блокировать до готовности сообщения, а затем будет возвращать его вместо возврата null, если нет в очереди.
Когда я пытаюсь получить единственное сообщение с basic_consume, все смешивается, и я получаю кучу «не готовых», но неиспользованных сообщений. (Если я получаю только одно сообщение таким образом, оно работает каждый раз, если я пытаюсь получить 2 сообщения, оно иногда зависает и работает с другими)
class Foo {
...
private function blockingGet() {
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
$this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
});
while (count($this->ch->callbacks)) {
$this->ch->wait();
}
return $this->msgCache;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}
Я реализовал нечто похожее на то, что вам нужно, сохранив полученное сообщение в закрытии, переданном параметру обратного вызова $channel->basic_consume()
, а затем иметь дело с этим после $channel->wait()
позвони, как wait()
вернет управление, если получено сообщение (или если задан параметр тайм-аута и достигнут тайм-аут). Попробуйте что-то вроде ниже:
class Foo {
// ...
public function __construct() {
$this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
$this->msgCache = json_decode($msg->body);
$this->ch->basic_ack($msg->delivery_info['delivery_tag']);
});
}
// ...
private function blockingGet() {
$this->ch->wait();
if ($this->msgCache) {
$msgCache = $this->msgCache;
$this->msgCache = null;
return $msgCache;
}
return null;
}
}
$q = new Foo();
for ($i = 0; $i < 5; $i++) {
print $q->blockingGet();
}
Других решений пока нет …