Порядок атомарной памяти в C ++ 11 — это правильное использование упорядоченного порядка (релиз-потребление)?

Недавно я сделал порт для C ++ 11 с использованием std :: atomic тройного буфера, который будет использоваться в качестве механизма синхронизации параллелизма. Идея этого подхода синхронизации потоков заключается в том, что для ситуации производитель-потребитель, где у вас работает поставщик, который работает Быстрее что тройная буферизация потребителя может дать некоторые преимущества, поскольку поток производителя не будет «замедлен» из-за необходимости ждать потребителя. В моем случае у меня есть физический поток, который обновляется со скоростью ~ 120 кадров в секунду, и поток рендеринга, который работает со скоростью ~ 60 кадров в секунду. Очевидно, я хочу, чтобы поток рендеринга всегда получал самое последнее возможное состояние, но я также знаю, что я буду пропускать много кадров из потока физики из-за разницы в скоростях. С другой стороны, я хочу, чтобы физический поток поддерживал постоянную частоту обновления и не был ограничен медленным потоком рендеринга, блокирующим мои данные.

Оригинальный код на C был написан ремиссионами, а полное объяснение в его блог. Я призываю всех, кто заинтересован в чтении, для дальнейшего понимания оригинальной реализации.

Моя реализация может быть найдена Вот.

Основная идея состоит в том, чтобы иметь массив с 3 позициями (буферами) и атомарный флаг, который сравнивается и заменяется, чтобы определить, какие элементы массива соответствуют какому состоянию в любой момент времени. Таким образом, только одна атомарная переменная используется для моделирования всех 3 индексов массива и логики тройной буферизации. 3 позиции буфера называются «грязный», «чистый» и «мгновенный». режиссер всегда записывает в индекс «грязный» и может перевернуть записывающее устройство, чтобы заменить «грязный» текущим индексом «чистый». потребитель может запросить новую Snap, которая меняет текущий индекс Snap на индекс Clean, чтобы получить самый последний буфер. потребитель всегда читает буфер в положении Snap.

Флаг состоит из 8-битного целого без знака, и биты соответствуют:

(не используется) (новая запись) (2x грязный) (2x чистый) (2x snap)

Флаг дополнительных битов newWrite устанавливается писателем и сбрасывается читателем. Читатель может использовать это, чтобы проверить, были ли какие-либо записи с момента последней привязки, и если нет, это не займет еще одну привязку. Флаг и индексы могут быть получены с помощью простых побитовых операций.

Хорошо, теперь для кода:

template <typename T>
class TripleBuffer
{

public:

TripleBuffer<T>();
TripleBuffer<T>(const T& init);

// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;

T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean

T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)

private:

bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite   = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex  = (flags & 0x3)
mutable atomic_uint_fast8_t flags;

T buffer[3];
};

реализация:

template <typename T>
TripleBuffer<T>::TripleBuffer(){

T dummy = T();

buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;

flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){

buffer[0] = init;
buffer[1] = init;
buffer[2] = init;

flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
T TripleBuffer<T>::snap() const{

return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT){

buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap(){

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter(){

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}

template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}

Как видите, я решил использовать Выпуск-Потребляйте шаблон для упорядочения памяти.
Релиз (memory_order_release) для магазина гарантирует, что никакие записи в текущем потоке не могут быть переупорядочены после магазин. С другой стороны, потреблять гарантирует, что никакие чтения в текущем потоке в зависимости от загруженного значения могут быть переупорядочены до этот груз. Это гарантирует, что записи в зависимые переменные в других потоках, которые выпускают ту же атомарную переменную, видны в текущем потоке.

Если мое понимание верно, так как мне нужно, чтобы флаги устанавливались только атомарно, компилятор может свободно переупорядочивать операции над другими переменными, которые не влияют непосредственно на флаги, что позволяет проводить больше оптимизаций. Прочитав некоторые документы о новой модели памяти, я также осознаю, что эти смягченные атомные эффекты будут иметь заметное влияние только на такие платформы, как ARM и POWER (они были представлены в основном из-за них). Поскольку я нацеливаюсь на ARM, я считаю, что смогу извлечь выгоду из этих операций и смогу немного повысить производительность.

Теперь к вопросу:

Правильно ли я использую расслабленный порядок выпуска-потребления для этой конкретной проблемы?

Спасибо,

Андре

PS: извините за длинный пост, но я полагал, что для лучшего понимания проблемы нужен какой-то приличный контекст.

РЕДАКТИРОВАТЬ :
Реализованы предложения @ Yakk:

  • Исправлена flags читать дальше newSnap() а также flipWriter() которые использовали прямое назначение, следовательно, используя по умолчанию load(std::memory_order_seq_cst),
  • Для ясности перемещены операции с битами в специальные функции.
  • добавленной bool тип возврата newSnap(), теперь возвращает ложь, когда нет ничего нового, и правда в противном случае.
  • Определенный класс как не подлежащий копированию с помощью = delete идиома, так как конструкторы копирования и присваивания были небезопасны, если TripleBuffer использовался.

РЕДАКТИРОВАТЬ 2:
Исправлено описание, которое было неверным (спасибо @Useless). Это потребитель который запрашивает новую Snap и читает из индекса Snap (не «писатель»). Извините за отвлечение и спасибо Бесполезно за то, что указал на это.

РЕДАКТИРОВАТЬ 3:
Оптимизировано newSnap() а также flipriter() работает в соответствии с предложениями @Display Name, эффективно удаляя 2 лишних load()за цикл цикла.

38

Решение

Почему вы дважды загружаете старые значения флагов в свои циклы CAS? Первый раз flags.load()и второй по compare_exchange_weak(), который стандарт указывает на сбой CAS, загрузит предыдущее значение в первый аргумент, который в этом случае является flagsNow.

В соответствии с http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange, «В противном случае загружает фактическое значение, сохраненное в * this, в ожидаемое (выполняет операцию загрузки).«Итак, что делает ваш цикл, так это при неудаче, compare_exchange_weak() перезагружается flagsNow, затем цикл повторяется, и первый оператор загружает его еще раз, сразу после загрузки compare_exchange_weak(), Мне кажется, что ваша петля должна вместо этого вытягивать нагрузку за пределы петли. Например, newSnap() было бы:

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));

а также flipWriter():

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
2

Другие решения

Да, это разница между memory_order_acquire и memory_order_consume, но вы не заметите этого, когда будете использовать 180 или около того в секунду. Вы можете запустить мой тест с m2 = memory_order_consume, если хотите узнать ответ в цифрах. Просто замените provider_or_consumer_Thread на что-то вроде этого:

TripleBuffer <int> tb;

void producer_or_consumer_Thread(void *arg)
{
struct Arg * a = (struct Arg *) arg;
bool succeeded = false;
int i = 0, k, kold = -1, kcur;

while (a->run)
{
while (a->wait) a->is_waiting = true; // busy wait
if (a->producer)
{
i++;
tb.update(i);
a->counter[0]++;
}
else
{
kcur = tb.snap();
if (kold != -1 && kcur != kold) a->counter[1]++;
succeeded = tb0.newSnap();
if (succeeded)
{
k = tb.readLast();
if (kold == -1)
kold = k;
else if (kold = k + 1)
kold = k;
else
succeeded = false;
}
if (succeeded) a->counter[0]++;
}
}
a->is_waiting =  true;
}

Результат испытаний:

_#_  __Produced __Consumed _____Total
1    39258150   19509292   58767442
2    24598892   14730385   39329277
3    10615129   10016276   20631405
4    10617349   10026637   20643986
5    10600334    9976625   20576959
6    10624009   10069984   20693993
7    10609040   10016174   20625214
8    25864915   15136263   41001178
9    39847163   19809974   59657137
10    29981232   16139823   46121055
11    10555174    9870567   20425741
12    25975381   15171559   41146940
13    24311523   14490089   38801612
14    10512252    9686540   20198792
15    10520211    9693305   20213516
16    10523458    9720930   20244388
17    10576840    9917756   20494596
18    11048180    9528808   20576988
19    11500654    9530853   21031507
20    11264789    9746040   21010829
1

По вопросам рекламы [email protected]