Pheanstalk резервирование одной и той же работы beanstalkd дважды или более

У меня есть 3 процесса очереди beanstalkd, работающих на одном и том же IP, но на разных портах. У меня есть отдельный сервер, на котором работает супервизор, порождающий php-работников параллельно (по 20 на порт beanstalkd) для обработки очереди. Моя проблема в том, что кажется, что два процесса могут зарезервировать один и тот же идентификатор задания на одном сервере в одно и то же время.

Вот пример вывода из моего журнала:

2017-02-23 09:59:56 --> START JOB (port: 11301 | u: 0.45138600 1487861996 | jid:1695074 | pid:30019 | j:leads_to_tags_add | tr:1)
2017-02-23 09:59:57 --> START JOB (port: 11301 | u: 0.55024800 1487861997 | jid:1695074 | pid:30157 | j:leads_to_tags_add | tr:2)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.54731000 1487861998 | jid:1695074 | pid:30019 | j:leads_to_tags_add)
2017-02-23 09:59:58 --> DEL   JOB (port: 11301 | u: 0.58927900 1487861998 | jid:1695074 | pid:30157 | j:leads_to_tags_add)

Кажется, что два резерва происходят сразу после друг друга, а второй резерв происходит до того, как первый процесс завершает работу и удаляет задание.

Я добавил счетчик в redis per jobid, и стало ясно, что к тому времени, когда он зарезервирует второй раз, счетчик повысился один раз (tr). TTRR установлен на 3600, поэтому он не может истечь до завершения первого процесса.

Вот как выглядит статус задания сразу после второго резерва процесса:

Pheanstalk\Response\ArrayResponse::__set_state(array(
'id' => '1695074',
'tube' => 'action-medium',
'state' => 'reserved',
'pri' => '0',
'age' => '1',
'delay' => '0',
'ttr' => '3600',
'time-left' => '3599',
'file' => '385',
'reserves' => '2',
'timeouts' => '0',
'releases' => '0',
'buries' => '0',
'kicks' => '0',
))

Такое поведение очень случайное, иногда только один процесс сможет зарезервировать, пока задание не заблокируется, иногда 2, иногда даже 4 или более (редко). Конечно, это создает непоследовательное количество выполняемых дубликатов.

Краткая версия кода:

$this->job = $this->pheanstalk->watch($tube)->reserve($timeout);set_error_handler(function($errno, $errstr, $errfile, $errline, array $errcontext) {
// error was suppressed with the @-operator
if (0 === error_reporting()) {
return false;
}

throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
});

$this->log_message('info', __METHOD__ . ": START JOB (" . $this->_logDetails() . " | tr:{$tries})");if ($this->_process_job()) {
$this->log_message('info', __METHOD__ . ": FINISHED JOB (" . $this->_logDetails() . ")");
$this->_delete_job();
} else {
$this->log_message('error', __METHOD__ . ": FAILED JOB (" . $this->_logDetails() . ")");
}

restore_error_handler();

а также

protected function _delete_job()
{
$this->pheanstalk->delete($this->job);
$this->log_message('info', __METHOD__ . ": DELETED JOB (" . $this->_logDetails() . ")");
}

0

Решение

Проблема заключалась в отключении TCP в бите process_job моего кода. Объект pheanstalk будет перезаписан где-то вниз по цепочке выполнения.

У меня есть библиотека pheanstalk, которая проверяет наличие самого пустого сервера beanstalkd, доступного перед отправкой команды put. Затем он создаст новый экземпляр Pheanstalk с лучшими деталями сервера, сохранит его как текущее соединение и затем отправит команду.

Иногда рабочий может отправить подзадачу в очередь. Библиотека будет загружена в рабочий сначала для получения задания, и снова в process_job для отправки в очередь. Из-за внедрения зависимостей в Codeigniter эта библиотека будет ссылаться на один и тот же объект, а внутри process_job она будет перезаписывать текущее соединение рабочего и вызывать разрыв TCP.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]