Кольцевой буфер для бесперебойного производителя и блокирующего потребителя

У меня есть поток производителя, который создает объекты со скоростью, которая может (временно) быть слишком быстрой для потока потребителя. Поэтому я хотел бы FIFO, который буферизирует оставшуюся работу. Как только FIFO заполнен, производитель должен просто уйти в отставку или повторить попытку позже. Кроме того, я хотел бы иметь возможность уведомить потребителя о том, что больше не нужно выполнять работу (не помещая специальный объект в FIFO). Производитель не должен замедляться, а потребитель не должен тратить циклы процессора, поэтому у меня есть следующие требования:

  • Кольцевой кольцевой буфер фиксированного размера.
  • Производитель никогда не должен ждать: он должен иметь возможность ставить элементы в очередь как можно быстрее.
  • Потребитель должен быть в состоянии заблокировать: нет ожидания в ожидании потенциально медленного производителя. Гибридный замок был бы хорош.

Я предполагаю следующий класс C ++:

template <typename T, std::size_t N>
class spsc_circular_half_blocking {
std::array<T, N> buffer;
// bookkeeping, atomics, mutexes etc. go here
public:
bool try_push(const T&); // returns whether object could be enqueued
void notify(); // notifies consumer
bool wait_pop(T&); // returns whether object was dequeued or notification was received
};

Быть способным изменить элементы на месте было бы неплохо. Он также может использовать динамическое выделение буфера (например, размер передается в конструктор, буфер является unique_ptr).

Теперь о моих вопросах. Это вообще возможно (на x86, по крайней мере)?

  • Если да, как это будет работать? Я действительно хотел бы некоторую реализацию, желательно, но не обязательно C ++.
  • Если нет, то почему нет?

Указатели на связанный материал, даже если он не совсем соответствует моим потребностям, также будут высоко оценены.

2

Решение

Одним из решений будет использование увеличить очередь для одного производителя с вашей собственной сигнализацией и блокировкой.

Потребитель будет продолжать выталкивать элементы из очереди. Когда очередь пуста, потребитель блокируется условной переменной. Когда этот блокирующий вызов возвращается, это означает, что больше элементов теперь доступно в очереди.

Производитель помещает элементы в очередь. Если потребитель заблокирован, производитель сигнализирует потребителю о том, чтобы он проснулся и начал опустошать очередь.

Пример:

#include <boost/lockfree/spsc_queue.hpp>
#include <condition_variable>
#include <atomic>
#include <mutex>

template<class T, size_t Size>
class MyQueue {
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Size>> queue_;
enum State : uint8_t {READY, BLOCKED, UNBLOCK};
std::atomic<State> state_{READY};
std::mutex m_;
std::condition_variable c_;

static constexpr auto R = std::memory_order_relaxed;

public:
template<class U>
bool push(U&& element) {
if(!queue_.push(std::forward<U>(element)))
return false; // The queue is full.

// Notify the consumer if it is blocked.
// Or there is 1 element in the queue but the consumer just missed it
// and is about to block.
if(BLOCKED == state_.load(R) || queue_.write_available() == Size - 1) {
// The consumer is blocked or about to block. Unblock it.
// Lock to make sure the consumer has blocked on waiting on the condition variable.
// Otherwise the condition variable notification can be lost.
m_.lock();
state_.store(UNBLOCK, R);
m_.unlock();
c_.notify_one();
}

return true;
}

T pop() {
T result;
while(!queue_.pop(result)) {
auto expected = READY;
if(state_.compare_exchange_strong(expected, BLOCKED, R, R)) {
std::unique_lock<decltype(m_)> l(m_);
while(UNBLOCK != state_.load(R))
c_.wait(l);
} // Otherwise state_ was UNBLOCK already.
state_.store(READY, R);
}
return result;
}
};

Использование:

int main () {
MyQueue<int, 10> q;
std::thread consumer([&q]() { std::cout << q.pop() << '\n'; });
q.push(1);
consumer.join();
}
3

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]