У меня есть поток, который читает из сокета и генерирует данные. После каждой операции поток проверяет std::atomic_bool
флаг, чтобы увидеть, если он должен выйти рано.
Чтобы отменить операцию, я установил флаг отмены на true
затем позвоните join()
на объекте рабочего потока.
Код потока и функция отмены выглядит примерно так:
std::thread work_thread;
std::atomic_bool cancel_requested{false};
void thread_func()
{
while(! cancel_requested.load(std::memory_order_relaxed))
process_next_element();
}
void cancel()
{
cancel_requested.store(true, std::memory_order_relaxed);
work_thread.join();
}
Является std::memory_order_relaxed
правильный порядок памяти для этого использования атомарной переменной?
Пока нет никакой зависимости между cancel_requested
флаг и что-нибудь еще, ты должен быть в безопасности.
Код, как показано, выглядит хорошо, при условии, ты используешь cancel_requested
только для ускорения завершения работы, но также предусматривают упорядоченное завершение работы, например запись в очереди в очереди (и, конечно же, сама очередь синхронизируется).
Что означает, что ваш код на самом деле выглядит так:
std::thread work_thread;
std::atomic_bool cancel_requested{false};
std::mutex work_queue_mutex;
std::condition_variable work_queue_filled_cond;
std::queue work_queue;
void thread_func()
{
while(! cancel_requested.load(std::memory_order_relaxed))
{
std::unique_lock<std::mutex> lock(work_queue_mutex);
work_queue_filled_cond.wait(lock, []{ return !work_queue.empty(); });
auto element = work_queue.front();
work_queue.pop();
lock.unlock();
if (element == exit_sentinel)
break;
process_next_element(element);
}
}
void cancel()
{
std::unique_lock<std::mutex> lock(work_queue_mutex);
work_queue.push_back(exit_sentinel);
work_queue_filled_cond.notify_one();
lock.unlock();
cancel_requested.store(true, std::memory_order_relaxed);
work_thread.join();
}
И если мы так далеко, то cancel_requested
с таким же успехом может стать обычной переменной, код даже станет проще.
std::thread work_thread;
bool cancel_requested = false;
std::mutex work_queue_mutex;
std::condition_variable work_queue_filled_cond;
std::queue work_queue;
void thread_func()
{
while(true)
{
std::unique_lock<std::mutex> lock(work_queue_mutex);
work_queue_filled_cond.wait(lock, []{ return cancel_requested || !work_queue.empty(); });
if (cancel_requested)
break;
auto element = work_queue.front();
work_queue.pop();
lock.unlock();
process_next_element(element);
}
}
void cancel()
{
std::unique_lock<std::mutex> lock(work_queue_mutex);
cancel_requested = true;
work_queue_filled_cond.notify_one();
lock.unlock();
work_thread.join();
}
memory_order_relaxed
обычно трудно рассуждать, потому что это стирает общее представление о последовательном выполнении кода. Таким образом, полезность этого очень, очень ограничена, как объясняет Херб в своем разговор об атомном оружии.
Заметка std::thread::join()
сам по себе действует как барьер памяти между двумя потоками.
Правильность этого кода зависит от многих вещей. Больше всего это зависит от того, что именно вы подразумеваете под «правильным». Насколько я могу судить, фрагменты кода, которые вы показываете, не вызывают неопределенного поведения (при условии, что ваш work_thread
а также cancel_requested
фактически не инициализируются в порядке, предложенном вашим фрагментом выше, так как тогда у вас будет поток, потенциально считывающий неинициализированное значение атома). Если все, что вам нужно сделать, это изменить значение этого флага и заставить поток в конечном итоге увидеть новое значение в какой-то момент независимо от того, что еще может происходить, тогда std::memory_order_relaxed
достаточно.
Тем не менее, я вижу, что ваш рабочий поток вызывает process_next_element()
функция. Это говорит о том, что существует некоторый механизм, посредством которого рабочий поток получает элементы для обработки. Я не вижу выхода из потока, когда все элементы были обработаны. Что значит process_next_element()
делать, когда нет следующего доступного элемента сразу? Он просто сразу возвращается? В этом случае вы ждете больше ввода или отмены, что сработает, но, вероятно, не идеально. Или делает process_next_element()
внутренне вызвать некоторую функцию, которая блокирует, пока элемент не станет доступным !? Если это так, то для отмены потока потребуется сначала установить флаг отмены, а затем сделать все необходимое, чтобы убедиться, что следующий вызов элемента ваш поток потенциально блокирует при возврате. В этом случае потенциально важно, чтобы поток никогда не видел флаг отмены после возврата блокирующего вызова. В противном случае вы могли бы получить возврат вызова, вернуться в цикл, все еще прочитать старый флаг отмены и затем выполнить вызов process_next_element()
снова. Если process_next_element()
гарантированно просто вернется снова, тогда ты в порядке. Если это не так, у вас тупик. Поэтому я считаю, что технически это зависит от того, что именно process_next_element()
делает. Один мог представьте себе реализацию process_next_element()
где вам может понадобиться больше, чем просто порядок в памяти. Однако, если у вас уже есть механизм извлечения новых элементов для обработки, зачем вообще использовать отдельный флаг отмены? Вы можете просто обработать отмену с помощью того же механизма, например, заставить его вернуть следующий элемент со специальным значением или вообще не возвращать элемент, чтобы сигнализировать об отмене обработки и заставить поток возвращаться вместо использования отдельного флага…