Я использовал Symfony2 с RabbitMqBundle создать работника, который отправляет документы в ElasticSearch. Индексирование документов с частотой один на один намного медленнее, чем при использовании массового API ElasticSearch. Поэтому я создал буфер, который сбрасывает документы в ES группами по тысяче. Код выглядит (немного упрощенно) следующим образом:
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
}
}
Все это работает довольно хорошо, но есть небольшая проблема. Очередь заполняется сообщениями с непредсказуемой скоростью. Иногда 100000 за 5 минут, иногда не по часам. Например, когда в очереди 82671 документ, последние 671 документ не индексируются до получения еще 329 документов, что может занять несколько часов. Я хотел бы иметь возможность сделать следующее:
Внимание: код Sci-Fi! Это явно не сработает:
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
protected $flushTimer;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
// Highly Sci-fi code
$this->flushTimer = new Timer();
// Flush buffer after 5 minutes of inactivity.
$this->flushTimer->setTimeout(5 * 60);
$this->flushTimer->setCallback([$this, 'flush']);
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
} else {
// Start a timer that will flush the buffer after a timeout.
$this->initTimer();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
// There are no longer messages to be send, stop the timer.
$this->flushTimer->stop();
}
protected function initTimer()
{
// Start or restart timer
$this->flushTimer->isRunning()
? $this->flushTimer->reset()
: $this->flushTimer->start();
}
}
Теперь я знаю об ограничениях PHP, не связанных с событиями. Но это 2015 год, и есть такие решения, как ReactPHP, так что это должно быть возможно, верно? Для ØMQ есть эта функция. Какое решение будет работать для RabbitMQ или независимо от какого-либо расширения очереди сообщений?
Решения, к которым я скептически отношусь:
declare(ticks = 1);
, Я не уверен, что это эффективный и надежный подход. Есть идеи?Как я уже упоминал в моем комментарии, вы можете использовать сигналы. PHP позволяет вам регистрировать обработчики сигналов для сигналов ваших скриптов (то есть SIGINT, SIGKILL и т. Д.)
В вашем случае вы можете использовать сигнал SIGALRM. Этот сигнал предупредит ваш скрипт по истечении определенного времени (которое вы можете установить). Положительной стороной этих сигналов является то, что они неблокирующие. Другими словами, нормальная работа вашего скрипта не будет нарушена.
Настроенное решение (галочки устарели с PHP 5.3):
function signal_handler($signal) {
// You would flush here
print "Caught SIGALRM\n";
// Set the SIGALRM timer again or it won't trigger again
pcntl_alarm(300);
}
// register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);
// start loop and check for pending signals
while(pcntl_signal_dispatch() && your_loop_condition) {
//Execute your code here
}
Примечание: вы можете использовать только 1 сигнал SIGALRM в вашем скрипте, если вы установите время сигнала с помощью pcntl_alarm
таймер для вашего будильника будет сброшен (без срабатывания сигнала) на новое значение.
Других решений пока нет …