Проблема параллелизма PHP, множественные одновременные запросы; мьютексы?

Итак, я только что понял, что PHP потенциально выполняет несколько запросов одновременно. Журналы прошлой ночи, кажется, показывают, что два запроса поступили, были обработаны параллельно; каждый запускает импорт данных с другого сервера; каждый пытался вставить запись в базу данных. Один запрос не удался, когда он попытался вставить запись, которую только что вставил другой поток (импортированные данные поставляются с PK; я не использую увеличивающиеся идентификаторы): SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '865020' for key 'PRIMARY' ...,

  1. Правильно ли я диагностировал эту проблему?
  2. Как мне это решить?

Ниже приведен фрагмент кода. Я удалил большую часть этого (регистрация, создание других сущностей за пределами Пациента из данных), но следующее должно включать соответствующие фрагменты. Запросы попадают в метод import (), который вызывает importOne () для каждой импортируемой записи. Обратите внимание на метод сохранения в importOne (); это метод Eloquent (использующий Laravel и Eloquent), который генерирует SQL для вставки / обновления записи в зависимости от ситуации.

public function import()
{
$now = Carbon::now();
// Get data from the other server in the time range from last import to current import
$calls = $this->getCalls($this->getLastImport(), $now);
// For each call to import, insert it into the DB (or update if it already exists)
foreach ($calls as $call) {
$this->importOne($call);
}
// Update the last import time to now so that the next import uses the correct range
$this->setLastImport($now);
}

private function importOne($call)
{
// Get the existing patient for the call, or create a new one
$patient = Patient::where('id', '=', $call['PatientID'])->first();
$isNewPatient = $patient === null;
if ($isNewPatient) {
$patient = new Patient(array('id' => $call['PatientID']));
}
// Set the fields
$patient->given_name = $call['PatientGivenName'];
$patient->family_name = $call['PatientFamilyName'];
// Save; will insert/update appropriately
$patient->save();
}

Я думаю, что решение потребует мьютекс вокруг всего блока импорта? И если запрос не может получить мьютекс, он просто переходит к остальной части запроса. Мысли?

РЕДАКТИРОВАТЬ: просто чтобы заметить, это не критический сбой. Исключение перехватывается и регистрируется, а затем на запрос реагируется, как обычно. И импорт успешно выполняется по другому запросу, и затем на этот запрос поступают ответы, как обычно. Пользователи не мудры; они даже не знают об импорте, и это не является главной целью поступающего запроса. Так что на самом деле, я мог бы просто оставить все как есть, и кроме случайного исключения, ничего плохого не происходит. Но если есть исправление для предотвращения дополнительной работы / ненужных отправок нескольких запросов на этот другой сервер, это может быть оправдано.

РЕДАКТИРОВАТЬ 2: Хорошо, я принял решение о реализации механизма блокировки с помощью flock (). Мысли? Будет ли следующая работа? И как бы я протестировал это дополнение?

public function import()
{
try {
$fp = fopen('/tmp/lock.txt', 'w+');
if (flock($fp, LOCK_EX)) {
$now = Carbon::now();
$calls = $this->getCalls($this->getLastImport(), $now);
foreach ($calls as $call) {
$this->importOne($call);
}
$this->setLastImport($now);
flock($fp, LOCK_UN);
// Log success.
} else {
// Could not acquire file lock. Log this.
}
fclose($fp);
} catch (Exception $ex) {
// Log failure.
}
}

EDIT3: Мысли о следующей альтернативной реализации блокировки:

public function import()
{
try {
if ($this->lock()) {
$now = Carbon::now();
$calls = $this->getCalls($this->getLastImport(), $now);
foreach ($calls as $call) {
$this->importOne($call);
}
$this->setLastImport($now);
$this->unlock();
// Log success
} else {
// Could not acquire DB lock. Log this.
}
} catch (Exception $ex) {
// Log failure
}
}

/**
* Get a DB lock, returns true if successful.
*
* @return boolean
*/
public function lock()
{
return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}

/**
* Release a DB lock, returns true if successful.
*
* @return boolean
*/
public function unlock()
{
return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}

28

Решение

Не похоже, что у вас есть состояние гонки, потому что идентификатор исходит из файла импорта, и если ваш алгоритм импорта работает правильно, то у каждого потока будет свой осколок выполняемой работы, и он никогда не должен конфликтовать с другие. Теперь кажется, что 2 потока получают запрос на создание одного и того же пациента и конфликтуют друг с другом из-за плохого алгоритма.

conflictfree

Убедитесь, что каждый порожденный поток получает новую строку из файла импорта и повторяется только при ошибке.

Если вы не можете этого сделать и хотите придерживаться мьютекса, использование блокировки файлов не кажется очень хорошим решением, поскольку теперь вы решили конфликт внутри приложения, в то время как он фактически возникает в вашей базе данных. Блокировка БД также должна быть намного быстрее, и в целом более приемлемое решение.

Запросите блокировку базы данных, например так:

$ db -> exec (‘LOCK TABLES table1 ЗАПИСЫВАТЬ, table2 ЗАПИСЫВАТЬ’);

И вы можете ожидать ошибку SQL, когда будете писать в заблокированную таблицу, поэтому окружите свой Patient-> save () уловом try.

Еще лучшим решением было бы использование условного атомарного запроса. Запрос БД, который также имеет условие внутри него. Вы можете использовать запрос как этот:

INSERT INTO targetTable(field1)
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))
5

Другие решения

Ваш пример кода будет блокировать второй запрос, пока первый не будет завершен. Вам нужно будет использовать LOCK_NB вариант для flock() немедленно вернуть ошибку и не ждать.

Да, вы можете использовать блокировку или семафоры либо на уровне файловой системы, либо непосредственно в базе данных.

В вашем случае, когда вам нужно, чтобы каждый файл импорта обрабатывался только один раз, лучшим решением будет иметь таблицу SQL со строкой для каждого файла импорта. В начале импорта вы вставляете информацию о том, что импорт выполняется, поэтому другие потоки будут знать, что больше не нужно его обрабатывать. После завершения импорта вы отмечаете его как таковой. (Затем несколько часов спустя вы можете проверить таблицу, чтобы убедиться, что импорт действительно завершен.)

Также лучше делать такие одноразовые длительные вещи, как импорт по отдельным сценариям, а не при обслуживании обычных веб-страниц для посетителей. Например, вы можете запланировать ночное задание cron, которое будет брать файл импорта и обрабатывать его.

5

Я вижу три варианта:

— использовать мьютекс / семафор / какой-то другой флаг — нелегко кодировать и поддерживать

— использовать встроенный механизм транзакций БД

— использовать очередь (например, RabbitMQ или 0MQ) для записи сообщений в БД подряд

0
По вопросам рекламы [email protected]