Ниже приведен очень простой пример проблемы производителя / потребителя с использованием потоковой неограниченной очереди. Может кто-нибудь пролить немного света на то, почему этот код ведет себя правильно при компиляции с GNU C ++, и все же поток Consumer случайно сдается при компиляции с LLVM C ++?
#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"#include "boost/thread.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push(data);
lock.unlock();
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}
bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
int len() {
boost::mutex::scoped_lock lock(the_mutex);
return (int)the_queue.size();
}
};
//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> *buff;
int next;
public:
Producer(Concurrent_Queue<int> *q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
sleep(wait_time);
printf("wait_time: %d\n", wait_time);
buff->push(wait_time);
printf("buffer_len: %d\n", buff->len());
}
}
};
//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> * buff;
public:
Consumer(Concurrent_Queue<int> *q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
sleep(wait_time);
buff->wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %d\n", buff->len());
}
}
};
//
// MAIN
//
int main(int argc, const char * argv[])
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(&Con_Q);
// sleep(3);
Producer giver(&Con_Q);
boost::thread* prod_thread = new boost::thread(boost::bind(&Producer::run, &giver));
boost::thread* cons_thread = new boost::thread(boost::bind(&Consumer::run, &taker));
prod_thread->join();
cons_thread->join();
}
Вы должны переместить уведомления уведомлений под мьютексом.
Это задокументировано где-то на страницах руководства pthreads (7). Я постараюсь найти это.
Обновить наиболее актуальная цитата, которую я могу найти в это время:
pthread_cond_broadcast()
или жеpthread_cond_signal()
функции могут вызываться потоком независимо от того, владеет ли он в настоящее время мьютексом, вызываемым потокомpthread_cond_wait()
или жеpthread_cond_timedwait()
связать с условной переменной во время их ожидания; однако, если требуется предсказуемое поведение планирования, этот мьютекс должен быть заблокирован потоком, вызывающимpthread_cond_broadcast()
или жеpthread_cond_signal()
,
pthread_cond_broadcast()
а такжеpthread_cond_signal()
функции не будут иметь эффекта, если в cond нет потоков, заблокированных в данный момент
Я знаю, что инструменты проверки потока, такие как Helgrind, жалуются, если условие блокируется за пределами блокировки.
Примечания стороны:
Я случайно написал thread_pool с очередью задач, которая также поддерживает выключение. Вы можете попробовать, страдает ли это от симптомов на вашем Mac:
bool empty() const
на самом деле не полезно, потому что это расовый вызов. Это было бы только потокобезопасным, если бы это передало блокировку вызывающей стороне
int len() const
имеет ту же проблемуВы можете использовать предикатную версию cv::wait()
чтобы получить более чистый код:
void wait_and_pop(Data& popped_value)
{
namespace phx = boost::phoenix;
boost::unique_lock<boost::mutex> lock(the_mutex);
//if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
popped_value = the_queue.front();
the_queue.pop();
}
Я бы предпочел использовать с ++ 11-подобные интерфейсы (unique_lock<>
над mutex::scoped_lock
) так проще переключаться.
next
— Я снял этоВот моя версия с небольшими изменениями, так что вы можете скопировать / вставить для проверки на MacOS (у меня нет Mac):
#include <iostream>
#include <queue>
#include "boost/thread.hpp"#include "boost/phoenix.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
typedef std::queue<Data> queue_t;
queue_t the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
the_queue.push(data);
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}
#ifdef UNUSED_CODE
bool empty() const
{
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
#endif
void wait_and_pop(Data& popped_value)
{
namespace phx = boost::phoenix;
boost::unique_lock<boost::mutex> lock(the_mutex);
//if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
popped_value = the_queue.front();
the_queue.pop();
}
std::size_t len() {
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.size();
}
};
//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> &buff;
public:
Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
printf("wait_time: %d\n", wait_time);
buff.push(wait_time);
printf("buffer_len: %lu\n", buff.len());
}
}
};
//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> & buff;
public:
Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
buff.wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %lu\n", buff.len());
}
}
};
//
// MAIN
//
int main()
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(Con_Q);
//boost::this_thread::sleep_for(boost::chrono::seconds(3));
Producer giver(Con_Q);
boost::thread_group group;
group.create_thread(boost::bind(&Producer::run, &giver));
group.create_thread(boost::bind(&Consumer::run, &taker));
group.join_all();
}
Других решений пока нет …