Я не понимаю, как можно реализовать оптимистичный параллелизм в C ++ 11

Я пытаюсь реализовать защищенную переменную, которая не использует блокировки в C ++ 11. Я немного читал об оптимистическом параллелизме, но не могу понять, как его можно реализовать ни в C ++, ни на каком-либо языке.

Я пытаюсь реализовать оптимистичный параллелизм с помощью «идентификатора последней модификации». Процесс, который я делаю:

  • Возьмите копию последней модификации id.
  • Изменить защищенное значение.
  • Сравните локальную копию идентификатора модификации с текущей.
  • Если приведенное выше сравнение верно, передайте изменения.

Проблема, которую я вижу в том, что после сравнения «идентификаторы последней модификации» (локальная копия и текущая копия) и перед совершением перемены, нет никакого способа гарантировать, что никакие другие потоки не изменили значение защищенной переменной.

Ниже приведен пример кода. Предположим, что многие потоки выполняют этот код и совместно используют переменную var,

/**
* This struct is pretended to implement a protected variable,
* but using optimistic concurrency instead of locks.
*/
struct ProtectedVariable final {

ProtectedVariable() : var(0), lastModificationId(0){ }

int getValue() const {
return var.load();
}

void setValue(int val) {
// This method is not atomic, other thread could change the value
// of val before being able to increment the 'last modification id'.
var.store(val);
lastModificationId.store(lastModificationId.load() + 1);
}

size_t getLastModificationId() const {
return lastModificationId.load();
}

private:
std::atomic<int> var;
std::atomic<size_t> lastModificationId;
};ProtectedVariable var;/**
* Suppose this method writes a value in some sort of database.
*/
int commitChanges(int val){
// Now, if nobody has changed the value of 'var', commit its value,
// retry the transaction otherwise.
if(var.getLastModificationId() == currModifId) {

// Here is one of the problems. After comparing the value of both Ids, other
// thread could modify the value of 'var', hence I would be
// performing the commit with a corrupted value.
var.setValue(val);

// Again, the same problem as above.
writeToDatabase(val);

// Return 'ok' in case of everything has gone ok.
return 0;
} else {
// If someone has changed the value of var while trying to
// calculating and commiting it, return error;
return -1;
}
}

/**
* This method is pretended to be atomic, but without using locks.
*/
void modifyVar(){
// Get the modification id for checking whether or not some
// thread has modified the value of 'var' after commiting it.
size_t currModifId = lastModificationId.load();

// Get a local copy of 'var'.
int currVal = var.getValue();

// Perform some operations basing on the current value of
// 'var'.
int newVal = currVal + 1 * 2 / 3;

if(commitChanges(newVal) != 0){
// If someone has changed the value of var while trying to
// calculating and commiting it, retry the transaction.
modifyVar();
}
}

Я знаю, что вышеприведенный код содержит ошибки, но я не понимаю, как правильно реализовать что-то подобное выше, без ошибок.

2

Решение

Оптимистичный параллелизм не означает, что вы не используете блокировки, это просто означает, что вы не сохраняете блокировки в течение большей части операции.

Идея состоит в том, что вы разбили свою модификацию на три части:

  1. Инициализация, как получение lastModificationId. Эта часть может нуждаться в замках, но не обязательно.
  2. Актуальные вычисления. Весь дорогой или блокирующий код идет сюда (включая любые записи на диск или сетевой код). Результаты написаны таким образом, что они не затеняют предыдущую версию. Вероятнее всего, это работает путем сохранения новых значений рядом со старыми, проиндексированных еще не переданной версией.
  3. Атомный коммит. Эта часть заблокирована и должна быть короткой, простой и не блокируемой. Скорее всего, он работает так, что он просто увеличивает номер версии — после подтверждения того, что за это время не было зафиксировано никакой другой версии. На этом этапе нет базы данных.

Основное предположение здесь заключается в том, что часть вычислений намного дороже части фиксации. Если ваша модификация тривиальна, а вычисления дешевы, то вы можете просто использовать блокировку, которая намного проще.

Некоторый пример кода, структурированный в эти 3 части, может выглядеть следующим образом:

struct Data {
...
}

...

std::mutex lock;
volatile const Data* value;  // The protected data
volatile int current_value_version = 0;

...

bool modifyProtectedValue() {
// Initialize.
int version_on_entry = value_version;

// Compute the new value, using the current value.
// We don't have any lock here, so it's fine to make heavy
// computations or block on I/O.
Data* new_value = new Data;
compute_new_value(value, new_value);

// Commit or fail.
bool success;
lock.lock();
if (current_value_version == version_on_entry) {
value = new_value;
current_value_version++;
success = true;
} else {
success = false;
}
lock.unlock();

// Roll back in case of failure.
if (!success) {
delete new_value;
}

// Inform caller about success or failure.
return success;
}

// It's cleaner to keep retry logic separately.
bool retryModification(int retries = 5) {
for (int i = 0; i < retries; ++i) {
if (modifyProtectedValue()) {
return true;
}
}
return false;
}

Это очень простой подход, и особенно откат является тривиальным. В реальном примере воссоздание всего объекта Data (или его аналога), вероятно, было бы невозможным, поэтому управление версиями должно было бы быть выполнено где-то внутри, а откат мог бы быть гораздо более сложным. Но я надеюсь, что это показывает общую идею.

1

Другие решения

Ключевым моментом здесь является семантика получения-выпуска и тестирование-и-приращение. Семантика Acquire-Release — это способ обеспечения порядка операций. Тест-и-приращение — это то, как вы выбираете, какой поток выигрывает в случае гонки.

Поэтому ваша проблема .store(lastModificationId+1), Тебе понадобиться .fetch_add(1), Возвращает старое значение. Если это не ожидаемое значение (от до ваше чтение), то вы проиграли гонку и повторите попытку.

1

Если я понимаю ваш вопрос, вы хотите убедиться, var а также lastModificationId либо оба изменены, либо нет.

Почему бы не использовать std::atomic<T> где T будет структура, которая содержит как int и size_t?

struct VarWithModificationId {
int var;
size_t lastModificationId;
};

class ProtectedVariable {
private std::atomic<VarWithModificationId> protectedVar;

// Add your public setter/getter methods here
// You should be guaranteed that if two threads access protectedVar, they'll each get a 'consistent' view of that variable, but the setter will need to use a lock
};
0

Оптимистичный параллелизм используется в ядрах баз данных, когда ожидается, что разные пользователи будут редко получать доступ к одним и тем же данным. Это может пойти так:

Первый пользователь читает данные и метку времени. Пользователь обрабатывает данные в течение некоторого времени, пользователь проверяет, не изменилась ли временная метка в БД с тех пор, как он прочитал данные, а если нет, то пользователь обновляет данные и временную метку.

Но внутренне DB-движок использует блокировки для обновления в любом случае, во время этой блокировки он проверяет, была ли изменена временная метка, и если она не была, движок обновляет данные. Просто время, за которое данные блокируются меньше, чем при пессимистическом параллелизме. И вам также нужно использовать какую-то блокировку.

0
По вопросам рекламы [email protected]