параллелизм — один производитель, структура данных с одним потребителем и двойной буфер в переполнении стека

У меня есть приложение на $ work, где мне нужно перемещаться между двумя потоками в реальном времени, которые запланированы на разных частотах. (Фактическое планирование вне моего контроля.) Приложение работает в режиме реального времени (один из потоков должен управлять аппаратным интерфейсом), поэтому передача данных между потоками должна быть без блокировок и без ожидания для насколько это возможно

Важно отметить, что должен быть передан только один блок данных: поскольку два потока работают с разной скоростью, будут случаи, когда две итерации более быстрого потока завершаются между двумя пробуждениями более медленного потока; в этом случае можно перезаписать данные в буфере записи, чтобы более медленный поток получал только самые последние данные.

Другими словами, вместо очереди достаточно решения с двойной буферизацией. Два буфера выделяются во время инициализации, и потоки чтения и записи могут вызывать методы класса, чтобы получить указатели на один из этих буферов.

Код C ++:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() {
m_write_busy = false;
m_read_idx = m_write_idx = 0;
}

~ProducerConsumerDoubleBuffer() { }

// The writer thread using this class must call
// start_writing() at the start of its iteration
// before doing anything else to get the pointer
// to the current write buffer.
T * start_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_write_busy = true;
m_write_idx = 1 - m_read_idx;

return &m_buf[m_write_idx];
}
// The writer thread must call end_writing()
// as the last thing it does
// to release the write busy flag.
void end_writing(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_write_busy = false;
}

// The reader thread must call start_reading()
// at the start of its iteration to get the pointer
// to the current read buffer.
// If the write thread is not active at this time,
// the read buffer pointer will be set to the
// (previous) write buffer - so the reader gets the latest data.
// If the write buffer is busy, the read pointer is not changed.
// In this case the read buffer may contain stale data,
// it is up to the user to deal with this case.
T * start_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);

if (!m_write_busy) {
m_read_idx = m_write_idx;
}

return &m_buf[m_read_idx];
}
// The reader thread must call end_reading()
// at the end of its iteration.
void end_reading(void) {
std::lock_guard<std::mutex> lock(m_mutex);

m_read_idx = m_write_idx;
}

private:
T m_buf[2];
bool m_write_busy;
unsigned int m_read_idx, m_write_idx;
std::mutex m_mutex;
};

Чтобы избежать устаревших данных в потоке читателя, структура полезных данных является версионной.
Для облегчения двунаправленной передачи данных между потоками используются два экземпляра вышеуказанного чудовища в противоположных направлениях.

Вопросы:

  • Эта схема безопасна? Если он сломан, где?
  • Можно ли это сделать без мьютекса? Возможно только с барьерами памяти или инструкциями CAS?
  • Можно ли сделать это лучше?

25

Решение

Очень интересная проблема! Намного хитрее, чем я думал сначала 🙂
Мне нравятся решения без блокировок, поэтому я постарался найти решение ниже.

Есть много способов думать об этой системе. Вы можете моделировать
это как кольцевой буфер / очередь фиксированного размера (с двумя записями), но затем
вы теряете возможность обновлять следующее доступное значение для потребления,
так как вы не знаете, начал ли потребитель читать последние опубликованные
значение или все еще (потенциально) читает предыдущий. Итак, дополнительное состояние
необходимо за пределами стандартного кольцевого буфера, чтобы достичь более оптимального
решение.

Сначала обратите внимание, что всегда есть ячейка, в которую производитель может безопасно написать
в любой данный момент времени; если одна ячейка читается потребителем,
другие могут быть написаны. Давайте назовем ячейку, которую можно безопасно записать в
«активная» ячейка (ячейка, из которой потенциально можно прочитать, является любой ячейкой не
активный). Активная ячейка может быть переключена, только если другая ячейка не
в настоящее время читается из.

В отличие от активной ячейки, в которую всегда можно записать, неактивная ячейка может
быть прочитанным, только если оно содержит значение; как только это значение будет использовано, оно исчезнет.
(Это означает, что в случае агрессивного продюсера избегают livelock;
точка, потребитель опустошит ячейку и перестанет прикасаться к ячейкам. однажды
что происходит, производитель может определенно опубликовать значение, тогда как до этого момента,
он может только опубликовать значение (изменить активную ячейку), если потребитель не находится в
середина чтения.)

Если там является значение, которое готово к употреблению, только потребитель может изменить это
факт (для неактивной ячейки, во всяком случае); последующие производства могут изменить, какая ячейка
является активным и опубликованным значением, но значение всегда будет готово для чтения до
это потребляется.

Как только продюсер завершит запись в активную ячейку, он может «опубликовать» это значение
изменение какой ячейки является активной (замена индекса), при условии, что потребитель
не в середине чтения другой клетки. Если потребитель является во время
читая другую ячейку, обмен не может произойти, но в этом случае потребитель может поменяться
после его закончил чтение значения, при условии, что производитель не находится в середине
напишите (и если это так, продюсер поменяется, как только это будет сделано).
На самом деле, в общем, потребитель всегда может поменяться местами после того, как прочитает (если это единственный
доступ к системе), потому что ложные обмены потребителем являются доброкачественными: если есть
что-то в другой ячейке, то перестановка приведет к тому, что будет прочитано следующее, и если
нет, обмен не влияет ни на что.

Итак, нам нужна общая переменная для отслеживания активной ячейки, а также нам нужна
как для производителя, так и для потребителя, чтобы указать, если они находятся в середине
операция. Мы можем хранить эти три части состояния в одной атомарной переменной в порядке
иметь возможность воздействовать на них все сразу (атомарно).
Нам также нужен способ проверить, есть ли в
неактивная ячейка в первую очередь, и для обоих потоков, чтобы изменить это состояние
по мере необходимости. Я попробовал несколько других подходов, но в итоге самый простой был
включить эту информацию в другую атомную переменную тоже. Это многое делает
проще рассуждать, так как все изменения состояния в системе, таким образом, являются атомарными.

Я придумал реализацию без ожидания (без блокировки, и все операции завершены
в ограниченном количестве инструкций).

Кодовое время!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }

// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}

void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)

auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}

// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}

// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it's on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there's still a value to read, just in a
// different cell.

m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}

void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}

// Alright, at this point the active cell cannot change on
// us, but the active cell's flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.

auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times).  Note that we don't bother swapping if there's nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}

private:
T m_buf[2];

// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there's at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there's a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;

std::uint32_t m_readState;
};

Обратите внимание, что семантика такова, что потребитель никогда не сможет прочитать данное значение дважды,
и значение, которое он читает, всегда новее, чем последнее прочитанное значение. Это также довольно
Эффективное использование памяти (два буфера, как ваше оригинальное решение). Я избегал петель CAS
потому что они, как правило, менее эффективны, чем отдельная атомная операция в споре.

Если вы решите использовать приведенный выше код, я предлагаю вам сначала написать для него несколько комплексных (многопоточных) модульных тестов.
И правильные ориентиры. Я проверял это, но только с трудом. Дайте мне знать, если вы найдете какие-либо ошибки 🙂

Мой юнит тест:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) {      // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();

Что касается вашей первоначальной реализации, я только посмотрел на нее (гораздо интереснее
разрабатывать новые вещи, хе), но ответ david.pfx, кажется, касается этой части вашего вопроса.

10

Другие решения

Да, я думаю, что это сломано.

Если считыватель последовательно запускает / завершает / запускает, он обновляет свой индекс чтения до индекса записи и потенциально читает данные из индекса записи, даже если запись занята.

По сути, проблема заключается в том, что записывающий не знает, какой буфер будет использовать читатель, поэтому писатель должен убедиться, что оба буфера действительны в любое время. Это не может быть сделано, если для записи данных в буфер потребуется какое-то время (если только я неправильно понял некоторую логику, которая здесь не показана).

Да, я думаю, что это можно сделать без блокировок, используя CAS или эквивалентную логику. Я не собираюсь пытаться выразить алгоритм в этом пространстве. Я уверен, что это существует, но не то, чтобы я мог правильно написать это с первого раза. И небольшой поиск в сети выявил некоторых вероятных кандидатов. IPC без ожидания с использованием CAS представляется довольно интересной темой и предметом некоторых исследований.


После некоторых дальнейших размышлений алгоритм выглядит следующим образом. Тебе нужно:

  • 3 буфера: один для писателя, один для читателя и один дополнительный.
    Буферы упорядочены: они образуют кольцо (но см. Примечание).
  • Состояние для каждого буфера: свободный, полный, запись, чтение.
  • Функция, которая может проверять состояние буфера и условно изменять состояние на другое значение в одной атомарной операции. Я буду использовать CSET для этого.

Автор:

Find the first buffer that is FREE or FULL
Fail: assert (should never fail, reader can only use one buffer)
CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

Читатель:

Find first buffer that is FULL
Fail: wait (writer may be slow)
CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

Примечание. Этот алгоритм не гарантирует, что буферы обрабатываются строго в порядке поступления, и никакие простые изменения не позволят это сделать. Если это важно, алгоритм должен быть дополнен порядковым номером в буфере, установленным писателем, чтобы читатель мог выбрать самый последний буфер.

Я оставляю код как деталь реализации.


Функция CSET нетривиальна. Он должен атомарно проверить, что определенное расположение разделяемой памяти равно ожидаемому значению, и, если это так, изменить его на новое значение. Он возвращает true, если он успешно сделал изменение, и false в противном случае. Реализация должна избегать условий гонки, если два потока обращаются к одному и тому же местоположению в одно и то же время (и, возможно, на разных процессорах).

Стандартная библиотека атомарных операций C ++ содержит набор функций atomic_compare_exchange, которые должны служить цели, если она доступна.

4

Здесь версия с использованием InterlockedExchangePointer() и СЛИСТЫ.

Это решение не поддерживает повторное чтение последнего буфера. Но если это необходимо, это можно сделать на стороне читателя с помощью копии и if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... },
Это сделано не потому, что его сложно добавить, а потому, что оно не очень реалистично. Представьте, что ваше последнее известное значение становится старше и старше — секунды, дни, недели. Вряд ли приложение все равно захочет его использовать. Таким образом, добавление функциональности перечитывания в код с двойным буфером лишает приложение гибкости.

Двойной буфер имеет 1 член указателя чтения. Всякий раз, когда beginRead () вызывается, это значение возвращается и атомарно заменяется на NULL. Думайте об этом как «Читатель берет буфер».
С endRead(), считыватель возвращает буфер, и он добавляется в SLIST, содержащий доступные буферы для операций записи.

Первоначально оба буфера добавляются в SLIST, указатель чтения равен NULL.

beginWrite() выскакивает следующий доступный буфер из SLIST. И это значение не может быть NULL, из-за способа endWrite() реализовано.

Не в последнюю очередь, endWrite() атомарно меняет указатель чтения на возвращенный, только что записанный буфер, и, если указатель чтения не был НЕДЕЙСТВИТЕЛЕН, он помещает его в SLIST.

Таким образом, даже если сторона чтения никогда не читает, сторона записи никогда не исчерпывает буферы. Когда читатель читает, он получает последнее известное значение (один раз!).

От чего эта реализация не безопасна, так это если есть несколько одновременно работающих читателей или писателей. Но это не было целью в первую очередь.

С другой стороны, буферы должны быть структурами с некоторым элементом SLIST_HEADER сверху.

Вот код, но имейте в виду, что я не виноват, если ваш марсоход приземлится на Венере!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
SLIST_ENTRY listNode;
uint8_t data[MAX_DATA_SIZE];
size_t length;
} DataItem_t;

class CDoubleBuffer
{
SLIST_HEADER m_writePointers;
DataItem_t m_buffers[2];
volatile DataItem_t *m_readPointer;

public:
CDoubleBuffer()
: m_writePointers()
, m_buffers()
, m_readPointer(NULL)
{
InitializeSListHead(&m_writePointers);
InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
}
DataItem_t *beginRead()
{
DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
return result;
}
void endRead(DataItem_t *dataItem)
{
if (NULL != dataItem)
{
InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
}
}
DataItem_t *beginWrite()
{
DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
return result;
}
void endWrite(DataItem_t *dataItem)
{
DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
if (NULL != oldReadPointer)
{
InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
}
}
};

И вот тестовый код для него. (Как для вышеуказанного, так и для тестового кода вам нужно <windows.h> а также <assert.h>.)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);
0
По вопросам рекламы [email protected]