symfony2 FOSElasticaBundle: как отложить обновление документа, когда служба эластичного поиска не работает?

Я использую symfony2 и FOSElasticaBundle.

Моя служба поиска по поиску часто бывает убита или не работает по неизвестной причине. Я поместил systemctl с restart always как временное исправление.

Тем не менее, если он отключен, слушательasticsearch, который выполняет обновление индекса, когда доктрина обновляет сущность, выдает мне ошибку 52:

Не удалось подключиться к хосту, Elasticsearch отключен?

Таким образом, это происходит при регистрации, если также используется FOSUserBundle, который обновляет дату последнего подключения пользователя. Это очень раздражает иметь такую ​​зависимость от эластичного поиска. Я добавил прослушиватель исключений для этой ошибки, но я бы предпочел, чтобы пакет сохранил обновление в течение более позднего времени, когда служба снова станет доступной.

Просматривая файлы комплекта, я нашел:

продавец / friendsofsymfony / эластик-расслоение / стойкая бактерия / ObjectPersister.php

public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$document = $this->transformToElasticaDocument($object);
$document->setDocAsUpsert(true);
$documents[] = $document;
}

try {
$this->type->updateDocuments($documents);
} catch (BulkException $e) {
$this->log($e);
}
}

Это сервис, который я подключил, может быть перезаписан следующим образом, но это класс, который наследует другой, и дочерние экземпляры создаются вместо вызова как сервиса, поэтому я не вижу, как его перезаписать. Как я мог ?

    try {
$this->type->updateDocuments($documents);
} catch (\Exception $e) {
if ($e instanceof BulkException)
{
$this->log($e);
}
elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
{
throw $e;
}
}

Затем, как я могу убедиться, что документ будет обновлен в следующий раз, когда услуга будет доступна?

РЕДАКТИРОВАТЬ:

Мой след, когда я получаю ошибку:

Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153   -
}
if ($errorNumber > 0) {
throw new HttpException($errorNumber, $request, $response);
}
return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true)))
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167   +
at Request ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587   +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"[email protected]","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47   +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"[email protected]","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342   +
at Bulk ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270   +
at Client ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131   +
at Index ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174   +
at Type ->updateDocuments (array(object(Document)))
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144   +
at ObjectPersister ->replaceMany (array(object(User)))
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151   +
at Listener ->persistScheduled ()
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182   +
at Listener ->postFlush (object(PostFlushEventArgs))
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63   +
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs))
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318   +
at UnitOfWork ->dispatchPostFlushEvent ()
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428   +
at UnitOfWork ->commit (null)
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357   +
at EntityManager ->flush (null)
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61   +
at CustomBaseController ->flush ()
in src/AppBundle/Controller/Core/VoteController.php at line 68   +
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes'))
in app/bootstrap.php.cache at line 3029   +
at HttpKernel ->handleRaw (object(Request), '1')
in app/bootstrap.php.cache at line 2991   +
at HttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 3140   +
at ContainerAwareHttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 2384   +
at Kernel ->handle (object(Request))
in web/app_dev.php at line 36   +

0

Решение

Очередь сообщений идеально соответствует вашим требованиям. Вы отправляете сообщение в MQ всякий раз, когда ваша модель обновляется. Вот и все для веб-процесса. Затем у вас есть пул работников, которые используют сообщения из MQ и пытаются обновить индекс ES. Если ES не работает прямо сейчас, возникнет исключение, рабочий умрет, и сообщение вернется в очередь. Таким образом, сообщение все еще в MQ, как только ES работает онлайн, работники делают свою работу.

Тот же шаблон можно использовать не только с ES, но и с любыми другими сторонними сервисами. Например, вы хотите отправить очень важное электронное письмо, но почтовый сервер не работает, вы не можете ждать, пока вам придется отправить ответ клиенту. Так что положите его в MQ и позвольте брокеру и работникам делать свою работу.

Вот код того, как это можно сделать с помощью Ставить Библиотека MQ. Установка и настройка довольно легко сделать так что я пропущу это.

Стандартный слушатель должен быть заменен тем, который отправляет сообщения:

<?php
use Enqueue\Client\ProducerInterface;

class ElasticaUpdateIndexListener
{
private $producer;

public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}

public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'insert'
]);
}

public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'update'
]);
}

public function preRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();

$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'delete'
]);
}
}

Процессор для этих сообщений выглядит так:

<?php

class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
private $doctrine;

protected $objectPersister;

protected $propertyAccessor;

private $indexable;

public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
{
$this->indexable = $indexable;
$this->objectPersister = $objectPersister;
$this->propertyAccessor = PropertyAccess::createPropertyAccessor();
$this->doctrine = $doctrine;
}

public function process(PsrMessage $message, PsrContext $context)
{
$data = JSON::encode($message->getBody());

if ($data['type'] == 'delete') {
$this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);

return self::ACK;
}

if (false == $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityId'])) {
return self::REJECT;
}

if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
return self::ACK;
}

if ($data['type'] == 'insert') {
$this->objectPersister->insertMany([$this->scheduledForInsertion]);

return self::ACK;
}

if ($data['type'] == 'update') {
$this->objectPersister->replaceMany([$this->scheduledForInsertion]);

return self::ACK;
}

return self::REJECT;
}

private function isObjectIndexable($object)
{
return $this->indexable->isObjectIndexable(
$this->config['indexName'],
$this->config['typeName'],
$object
);
}

public static function getSubscribedCommand()
{
return 'elastica_index_entity';
}
}

И запустить несколько рабочих:

./bin/console enqueue:consume --setup-broker -vvv
1

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]