Недавно я сделал порт для C ++ 11 с использованием std :: atomic тройного буфера, который будет использоваться в качестве механизма синхронизации параллелизма. Идея этого подхода синхронизации потоков заключается в том, что для ситуации производитель-потребитель, где у вас работает поставщик, который работает Быстрее что тройная буферизация потребителя может дать некоторые преимущества, поскольку поток производителя не будет «замедлен» из-за необходимости ждать потребителя. В моем случае у меня есть физический поток, который обновляется со скоростью ~ 120 кадров в секунду, и поток рендеринга, который работает со скоростью ~ 60 кадров в секунду. Очевидно, я хочу, чтобы поток рендеринга всегда получал самое последнее возможное состояние, но я также знаю, что я буду пропускать много кадров из потока физики из-за разницы в скоростях. С другой стороны, я хочу, чтобы физический поток поддерживал постоянную частоту обновления и не был ограничен медленным потоком рендеринга, блокирующим мои данные.
Оригинальный код на C был написан ремиссионами, а полное объяснение в его блог. Я призываю всех, кто заинтересован в чтении, для дальнейшего понимания оригинальной реализации.
Моя реализация может быть найдена Вот.
Основная идея состоит в том, чтобы иметь массив с 3 позициями (буферами) и атомарный флаг, который сравнивается и заменяется, чтобы определить, какие элементы массива соответствуют какому состоянию в любой момент времени. Таким образом, только одна атомарная переменная используется для моделирования всех 3 индексов массива и логики тройной буферизации. 3 позиции буфера называются «грязный», «чистый» и «мгновенный». режиссер всегда записывает в индекс «грязный» и может перевернуть записывающее устройство, чтобы заменить «грязный» текущим индексом «чистый». потребитель может запросить новую Snap, которая меняет текущий индекс Snap на индекс Clean, чтобы получить самый последний буфер. потребитель всегда читает буфер в положении Snap.
Флаг состоит из 8-битного целого без знака, и биты соответствуют:
(не используется) (новая запись) (2x грязный) (2x чистый) (2x snap)
Флаг дополнительных битов newWrite устанавливается писателем и сбрасывается читателем. Читатель может использовать это, чтобы проверить, были ли какие-либо записи с момента последней привязки, и если нет, это не займет еще одну привязку. Флаг и индексы могут быть получены с помощью простых побитовых операций.
Хорошо, теперь для кода:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
реализация:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
Как видите, я решил использовать Выпуск-Потребляйте шаблон для упорядочения памяти.
Релиз (memory_order_release) для магазина гарантирует, что никакие записи в текущем потоке не могут быть переупорядочены после магазин. С другой стороны, потреблять гарантирует, что никакие чтения в текущем потоке в зависимости от загруженного значения могут быть переупорядочены до этот груз. Это гарантирует, что записи в зависимые переменные в других потоках, которые выпускают ту же атомарную переменную, видны в текущем потоке.
Если мое понимание верно, так как мне нужно, чтобы флаги устанавливались только атомарно, компилятор может свободно переупорядочивать операции над другими переменными, которые не влияют непосредственно на флаги, что позволяет проводить больше оптимизаций. Прочитав некоторые документы о новой модели памяти, я также осознаю, что эти смягченные атомные эффекты будут иметь заметное влияние только на такие платформы, как ARM и POWER (они были представлены в основном из-за них). Поскольку я нацеливаюсь на ARM, я считаю, что смогу извлечь выгоду из этих операций и смогу немного повысить производительность.
Теперь к вопросу:
Правильно ли я использую расслабленный порядок выпуска-потребления для этой конкретной проблемы?
Спасибо,
Андре
PS: извините за длинный пост, но я полагал, что для лучшего понимания проблемы нужен какой-то приличный контекст.
РЕДАКТИРОВАТЬ :
Реализованы предложения @ Yakk:
flags
читать дальше newSnap()
а также flipWriter()
которые использовали прямое назначение, следовательно, используя по умолчанию load(std::memory_order_seq_cst)
,bool
тип возврата newSnap()
, теперь возвращает ложь, когда нет ничего нового, и правда в противном случае.= delete
идиома, так как конструкторы копирования и присваивания были небезопасны, если TripleBuffer
использовался.РЕДАКТИРОВАТЬ 2:
Исправлено описание, которое было неверным (спасибо @Useless). Это потребитель который запрашивает новую Snap и читает из индекса Snap (не «писатель»). Извините за отвлечение и спасибо Бесполезно за то, что указал на это.
РЕДАКТИРОВАТЬ 3:
Оптимизировано newSnap()
а также flipriter()
работает в соответствии с предложениями @Display Name, эффективно удаляя 2 лишних load()
за цикл цикла.
Почему вы дважды загружаете старые значения флагов в свои циклы CAS? Первый раз flags.load()
и второй по compare_exchange_weak()
, который стандарт указывает на сбой CAS, загрузит предыдущее значение в первый аргумент, который в этом случае является flagsNow.
В соответствии с http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange, «В противном случае загружает фактическое значение, сохраненное в * this, в ожидаемое (выполняет операцию загрузки).«Итак, что делает ваш цикл, так это при неудаче, compare_exchange_weak()
перезагружается flagsNow
, затем цикл повторяется, и первый оператор загружает его еще раз, сразу после загрузки compare_exchange_weak()
, Мне кажется, что ваша петля должна вместо этого вытягивать нагрузку за пределы петли. Например, newSnap()
было бы:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));
а также flipWriter()
:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
Да, это разница между memory_order_acquire и memory_order_consume, но вы не заметите этого, когда будете использовать 180 или около того в секунду. Вы можете запустить мой тест с m2 = memory_order_consume, если хотите узнать ответ в цифрах. Просто замените provider_or_consumer_Thread на что-то вроде этого:
TripleBuffer <int> tb;
void producer_or_consumer_Thread(void *arg)
{
struct Arg * a = (struct Arg *) arg;
bool succeeded = false;
int i = 0, k, kold = -1, kcur;
while (a->run)
{
while (a->wait) a->is_waiting = true; // busy wait
if (a->producer)
{
i++;
tb.update(i);
a->counter[0]++;
}
else
{
kcur = tb.snap();
if (kold != -1 && kcur != kold) a->counter[1]++;
succeeded = tb0.newSnap();
if (succeeded)
{
k = tb.readLast();
if (kold == -1)
kold = k;
else if (kold = k + 1)
kold = k;
else
succeeded = false;
}
if (succeeded) a->counter[0]++;
}
}
a->is_waiting = true;
}
Результат испытаний:
_#_ __Produced __Consumed _____Total
1 39258150 19509292 58767442
2 24598892 14730385 39329277
3 10615129 10016276 20631405
4 10617349 10026637 20643986
5 10600334 9976625 20576959
6 10624009 10069984 20693993
7 10609040 10016174 20625214
8 25864915 15136263 41001178
9 39847163 19809974 59657137
10 29981232 16139823 46121055
11 10555174 9870567 20425741
12 25975381 15171559 41146940
13 24311523 14490089 38801612
14 10512252 9686540 20198792
15 10520211 9693305 20213516
16 10523458 9720930 20244388
17 10576840 9917756 20494596
18 11048180 9528808 20576988
19 11500654 9530853 21031507
20 11264789 9746040 21010829