xcode — повышает параллелизм библиотек, ведущих себя по-разному при компиляции с GNU C ++ / LLVM переполнение стека

Ниже приведен очень простой пример проблемы производителя / потребителя с использованием потоковой неограниченной очереди. Может кто-нибудь пролить немного света на то, почему этот код ведет себя правильно при компиляции с GNU C ++, и все же поток Consumer случайно сдается при компиляции с LLVM C ++?

#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"#include "boost/thread.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
std::queue<Data> the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::mutex::scoped_lock lock(the_mutex);
the_queue.push(data);
lock.unlock();
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}

bool empty() const
{
boost::mutex::scoped_lock lock(the_mutex);
return the_queue.empty();
}

bool try_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
if(the_queue.empty())
{
return false;
}

popped_value=the_queue.front();
the_queue.pop();
return true;
}

void wait_and_pop(Data& popped_value)
{
boost::mutex::scoped_lock lock(the_mutex);
while(the_queue.empty())
{
printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock);
}

popped_value=the_queue.front();
the_queue.pop();
}

int len() {
boost::mutex::scoped_lock lock(the_mutex);
return (int)the_queue.size();
}

};

//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> *buff;
int next;
public:
Producer(Concurrent_Queue<int> *q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
sleep(wait_time);
printf("wait_time: %d\n", wait_time);
buff->push(wait_time);
printf("buffer_len: %d\n", buff->len());
}
}
};

//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> * buff;
public:
Consumer(Concurrent_Queue<int> *q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
sleep(wait_time);
buff->wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %d\n", buff->len());
}
}
};

//
//  MAIN
//
int main(int argc, const char * argv[])
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(&Con_Q);
//  sleep(3);
Producer giver(&Con_Q);
boost::thread* prod_thread = new boost::thread(boost::bind(&Producer::run, &giver));
boost::thread* cons_thread = new boost::thread(boost::bind(&Consumer::run, &taker));

prod_thread->join();
cons_thread->join();
}

1

Решение

Вы должны переместить уведомления уведомлений под мьютексом.

Это задокументировано где-то на страницах руководства pthreads (7). Я постараюсь найти это.

Обновить наиболее актуальная цитата, которую я могу найти в это время:

pthread_cond_broadcast() или же pthread_cond_signal() функции могут вызываться потоком независимо от того, владеет ли он в настоящее время мьютексом, вызываемым потоком pthread_cond_wait() или же pthread_cond_timedwait() связать с условной переменной во время их ожидания; однако, если требуется предсказуемое поведение планирования, этот мьютекс должен быть заблокирован потоком, вызывающим pthread_cond_broadcast() или же pthread_cond_signal(),

pthread_cond_broadcast() а также pthread_cond_signal() функции не будут иметь эффекта, если в cond нет потоков, заблокированных в данный момент

Я знаю, что инструменты проверки потока, такие как Helgrind, жалуются, если условие блокируется за пределами блокировки.

Примечания стороны:

  • Я случайно написал thread_pool с очередью задач, которая также поддерживает выключение. Вы можете попробовать, страдает ли это от симптомов на вашем Mac:

  • bool empty() const на самом деле не полезно, потому что это расовый вызов. Это было бы только потокобезопасным, если бы это передало блокировку вызывающей стороне

  • int len() const имеет ту же проблему
  • Вы можете использовать предикатную версию cv::wait() чтобы получить более чистый код:

    void wait_and_pop(Data& popped_value)
    {
    namespace phx = boost::phoenix;
    
    boost::unique_lock<boost::mutex> lock(the_mutex);
    
    //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
    
    the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
    
    popped_value = the_queue.front();
    the_queue.pop();
    }
    
  • Я бы предпочел использовать с ++ 11-подобные интерфейсы (unique_lock<> над mutex::scoped_lock) так проще переключаться.

  • у производителя было неиспользованное поле next — Я снял это

Вот моя версия с небольшими изменениями, так что вы можете скопировать / вставить для проверки на MacOS (у меня нет Mac):

#include <iostream>
#include <queue>
#include "boost/thread.hpp"#include "boost/phoenix.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
typedef std::queue<Data> queue_t;
queue_t the_queue;

mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
the_queue.push(data);

printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}

#ifdef UNUSED_CODE
bool empty() const
{
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.empty();
}

bool try_pop(Data& popped_value)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}

popped_value=the_queue.front();
the_queue.pop();
return true;
}
#endif

void wait_and_pop(Data& popped_value)
{
namespace phx = boost::phoenix;

boost::unique_lock<boost::mutex> lock(the_mutex);

//if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");

the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));

popped_value = the_queue.front();
the_queue.pop();
}

std::size_t len() {
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.size();
}

};

//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> &buff;
public:
Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
printf("wait_time: %d\n", wait_time);
buff.push(wait_time);
printf("buffer_len: %lu\n", buff.len());
}
}
};

//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> & buff;
public:
Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
buff.wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %lu\n", buff.len());
}
}
};

//
//  MAIN
//
int main()
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;

Consumer taker(Con_Q);
//boost::this_thread::sleep_for(boost::chrono::seconds(3));
Producer giver(Con_Q);

boost::thread_group group;
group.create_thread(boost::bind(&Producer::run, &giver));
group.create_thread(boost::bind(&Consumer::run, &taker));

group.join_all();
}
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]