C ++ 11 потокобезопасная очередь

Проект, над которым я работаю, использует несколько потоков для работы с коллекцией файлов. Каждый поток может добавлять файлы в список файлов для обработки, поэтому я собрал (как я думал, очередь) потокобезопасную очередь. Соответствующие части следуют:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));

// Notify anyone waiting for additional files that more have arrived
populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if (q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
std::string ret = q.front();
q.pop();
return ret;
}
else {
return std::string();
}
}
else {
std::string ret = q.front();
q.pop();
return ret;
}
}

Тем не менее, я иногда segfaulting внутри if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } block, а проверка в gdb указывает на то, что ошибки происходят, потому что очередь пуста. Как это возможно? Это было мое понимание того, что wait_for только возвращается cv_status::no_timeout когда он был уведомлен, и это должно произойти только после FileQueue::enqueue только что поместил новый элемент в очередь.

51

Решение

В соответствии со стандартом condition_variables разрешено пробуждаться, даже если событие не произошло. В случае ложного пробуждения он вернется cv_status::no_timeout (так как он проснулся вместо того, чтобы отключить время), даже если он не был уведомлен. Правильное решение для этого, конечно, чтобы проверить, было ли пробуждение на самом деле законным, прежде чем продолжить.

Подробности указаны в стандарте §30.5.1 [thread.condition.condvar]:

—Функция будет разблокирована, когда сигнализируется вызовом notify_one (), вызовом notify_all (), истечением абсолютного тайм-аута (30.2.4), указанного в abs_time, или ложно.

Возвращает: cv_status :: timeout, если истекло абсолютное время ожидания (30.2.4), указанное в параметре abs_time, в противном случае cv_status :: no_timeout.

26

Другие решения

Просто глядя на это, когда вы проверяете переменную условия, лучше всего использовать цикл while (так что, если он просыпается и все еще не является недействительным, вы проверяете снова). Я только что написал шаблон для асинхронной очереди, надеюсь, это поможет.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void)
: q()
, m()
, c()
{}

~SafeQueue(void)
{}

// Add an element to the queue.
void enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}

// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}

private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif
40

Это, вероятно, как вы должны это сделать:

void push(std::string&& filename)
{
{
std::lock_guard<std::mutex> lock(qMutex);

q.push(std::move(filename));
}

populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(qMutex);

if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
return false;

filename = std::move(q.front());
q.pop();

return true;
}
11

Добавляя к принятому ответу, я бы сказал, что реализация правильной очереди для нескольких производителей / нескольких потребителей затруднительна (хотя с C ++ 11 проще)

Я бы предложил вам попробовать (очень хорошо) заблокировать бесплатный буст библиотека, структура «очередь» будет делать то, что вы хотите, с гарантиями без ожидания / без блокировки и без необходимости компилятора C ++ 11.

Я добавляю этот ответ сейчас, потому что библиотека без блокировки является довольно новой для повышения (с версии 1.53, я считаю)

9

Я бы переписал вашу функцию dequeue как:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
while(q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout )
return std::string();
}
std::string ret = q.front();
q.pop();
return ret;
}

Он короче и не имеет дублирующего кода, как у вас. Только выпуск может подождать, что тайм-аут. Чтобы предотвратить это, вам нужно будет запомнить время начала до цикла, проверьте время ожидания и скорректируйте время ожидания соответственно. Или укажите абсолютное время при условии ожидания.

5

Для этого случая также есть решение GLib, я еще не пробовал, но считаю, что это хорошее решение.
https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

1

BlockingCollection является потокобезопасным классом коллекции C ++ 11, который обеспечивает поддержку очереди, стека и приоритетных контейнеров. Он обрабатывает сценарий «пустой» очереди, который вы описали. А также «полная» очередь.

1

Вам может понравиться lfqueue, https://github.com/Taymindis/lfqueue.
Это параллельная очередь без блокировки. В настоящее время я использую его для использования очереди из нескольких входящих вызовов и работает как чудо.

0
По вопросам рекламы [email protected]