У меня есть поток производителя, который создает объекты со скоростью, которая может (временно) быть слишком быстрой для потока потребителя. Поэтому я хотел бы FIFO, который буферизирует оставшуюся работу. Как только FIFO заполнен, производитель должен просто уйти в отставку или повторить попытку позже. Кроме того, я хотел бы иметь возможность уведомить потребителя о том, что больше не нужно выполнять работу (не помещая специальный объект в FIFO). Производитель не должен замедляться, а потребитель не должен тратить циклы процессора, поэтому у меня есть следующие требования:
Я предполагаю следующий класс C ++:
template <typename T, std::size_t N>
class spsc_circular_half_blocking {
std::array<T, N> buffer;
// bookkeeping, atomics, mutexes etc. go here
public:
bool try_push(const T&); // returns whether object could be enqueued
void notify(); // notifies consumer
bool wait_pop(T&); // returns whether object was dequeued or notification was received
};
Быть способным изменить элементы на месте было бы неплохо. Он также может использовать динамическое выделение буфера (например, размер передается в конструктор, буфер является unique_ptr
).
Теперь о моих вопросах. Это вообще возможно (на x86, по крайней мере)?
Указатели на связанный материал, даже если он не совсем соответствует моим потребностям, также будут высоко оценены.
Одним из решений будет использование увеличить очередь для одного производителя с вашей собственной сигнализацией и блокировкой.
Потребитель будет продолжать выталкивать элементы из очереди. Когда очередь пуста, потребитель блокируется условной переменной. Когда этот блокирующий вызов возвращается, это означает, что больше элементов теперь доступно в очереди.
Производитель помещает элементы в очередь. Если потребитель заблокирован, производитель сигнализирует потребителю о том, чтобы он проснулся и начал опустошать очередь.
Пример:
#include <boost/lockfree/spsc_queue.hpp>
#include <condition_variable>
#include <atomic>
#include <mutex>
template<class T, size_t Size>
class MyQueue {
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Size>> queue_;
enum State : uint8_t {READY, BLOCKED, UNBLOCK};
std::atomic<State> state_{READY};
std::mutex m_;
std::condition_variable c_;
static constexpr auto R = std::memory_order_relaxed;
public:
template<class U>
bool push(U&& element) {
if(!queue_.push(std::forward<U>(element)))
return false; // The queue is full.
// Notify the consumer if it is blocked.
// Or there is 1 element in the queue but the consumer just missed it
// and is about to block.
if(BLOCKED == state_.load(R) || queue_.write_available() == Size - 1) {
// The consumer is blocked or about to block. Unblock it.
// Lock to make sure the consumer has blocked on waiting on the condition variable.
// Otherwise the condition variable notification can be lost.
m_.lock();
state_.store(UNBLOCK, R);
m_.unlock();
c_.notify_one();
}
return true;
}
T pop() {
T result;
while(!queue_.pop(result)) {
auto expected = READY;
if(state_.compare_exchange_strong(expected, BLOCKED, R, R)) {
std::unique_lock<decltype(m_)> l(m_);
while(UNBLOCK != state_.load(R))
c_.wait(l);
} // Otherwise state_ was UNBLOCK already.
state_.store(READY, R);
}
return result;
}
};
Использование:
int main () {
MyQueue<int, 10> q;
std::thread consumer([&q]() { std::cout << q.pop() << '\n'; });
q.push(1);
consumer.join();
}
Других решений пока нет …