многопоточность — многопоточность с использованием повышенной синхронизации

Я хотел бы сделать многопоточность, когда поток ONE передает данные в 4-5 рабочих потоков, которые обрабатывают данные, и те, которые ВСЕ рабочие потоки завершены, я хотел бы продолжить. Я использую надстройку, чтобы понять, что у меня проблема с синхронизацией. Это означает, что в какой-то момент программа останавливается и не продолжает работать.

Я использовал OpenMP раньше, и это прекрасно работает, но я бы хотел установить приоритеты потоков индивидуально, и я не мог понять, как это сделать с OpenMP, поэтому я работал над своим собственным решением:

Я был бы очень рад, если бы некоторые могли дать подсказки, чтобы найти ошибку в этом коде, или могли бы помочь мне найти другой подход к проблеме.

Спасибо,
KmgL

#include <QCoreApplication>

#include <boost/thread.hpp>

#define N_CORE 6
#define N_POINTS 10
#define N_RUNS 100000

class Sema{

public:
Sema(int _n =0): m_count(_n),m_mut(),m_cond(){}

void set(int _n)
{
boost::unique_lock<boost::mutex> w_lock(m_mut);
m_count = -_n;
}

void wait()
{
boost::unique_lock<boost::mutex> lock(m_mut);
while (m_count < 0)
{
m_cond.wait(lock);
}
--m_count;
}
void post()
{
boost::unique_lock<boost::mutex> lock(m_mut);
++m_count;
m_cond.notify_all();
}private:
boost::condition_variable m_cond;
boost::mutex m_mut;
int m_count;

};

class Pool
{
private:
boost::thread m_WorkerThread;
boost::condition_variable m_startWork;
bool m_WorkerRun;
bool m_InnerRun;
Sema * m_sem;

std::vector<int> *m_Ep;
std::vector<int>  m_ret;

void calc()
{
unsigned int    no_pt(m_Ep->size());
std::vector<int> c_ret;
for(unsigned int i=0;i<no_pt;i++)
c_ret.push_back(100 + m_Ep->at(i));

m_ret = c_ret;
}
void run()
{
boost::mutex WaitWorker_MUTEX;
while(m_WorkerRun)
{
boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_startWork.wait(u_lock);
calc();
m_sem->post();
}

}

public:
Pool():m_WorkerRun(false),m_InnerRun(false){}
~Pool(){}
void start(Sema * _sem){
m_WorkerRun = true;
m_sem = _sem;
m_ret.clear();
m_WorkerThread = boost::thread(&Pool::run, this);}
void stop(){m_WorkerRun = false;}
void join(){m_WorkerThread.join();}

void newWork(std::vector<int> &Ep)
{
m_Ep = &Ep;
m_startWork.notify_all();
}
std::vector<int> getWork(){return m_ret;}};

int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);

Pool TP[N_CORE];

Sema _sem(0);
for(int k=0;k<N_CORE;k++)
TP[k].start(&_sem);boost::this_thread::sleep(boost::posix_time::milliseconds(10));

std::vector<int> V[N_CORE];

for(int k=0;k<N_CORE;k++)
for(int i=0;i<N_POINTS;i++)
{
V[k].push_back((k+1)*1000+i);
}

for(int j=0;j<N_RUNS;j++)
{
_sem.set(N_CORE);
for(int k=0;k<N_CORE;k++)
{
TP[k].newWork(V[k]);
}

_sem.wait();

for(int k=0;k<N_CORE;k++)
{
V[k].clear();
V[k]=TP[k].getWork();
if(V[k].size()!=N_POINTS)
std::cout<<"ERROR: "<<"V["<<k<<"].size(): "<<V[k].size()<<std::endl;

}
if((j+1)%100==0)
std::cout<<"LOOP: "<<j+1<<std::endl;
}
std::cout<<"FINISHED: "<<std::endl;

return a.exec();
}

2

Решение

У вас есть гонка между звонками Pool::newWork() а также Pool::run(),

Вы должны помнить, что сигнализация / трансляция переменной условия не является залипшим событием. Если ваш поток не ожидает переменную условия во время передачи сигнала, сигнал будет потерян. Вот что может произойти в вашей программе: ничто не мешает вашему основному потоку вызвать Pool::newWork() на каждом из ваших объектов пула, прежде чем они успеют позвонить wait() на вашей переменной состояния.

Чтобы решить это, вам нужно переместить boost::mutex WaitWorker_MUTEX как член класса вместо того, чтобы быть локальной переменной. Pool::newWork() необходимо получить этот мьютекс перед обновлением:

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_Ep = &Ep;
m_startWork.notify(); // no need to use notify_all()

Поскольку вы используете условную переменную в Pool::run()Вам нужно справиться с ложным пробуждением. Я бы порекомендовал установить m_Ep в NULL, когда вы создаете объект и каждый раз, когда вы закончите с рабочим элементом:

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
while (1) {
while (m_Ep == NULL && m_workerRun) {
m_startWork.wait(u_lock);
}
if (!m_workerRun) {
return;
}
calc();
m_sem->post();
m_Ep = NULL;
}

stop () нужно будет захватить мьютекс и уведомить ():

boost::unique_lock<boost::mutex> u_lock(WaitWorker_MUTEX);
m_workRun = false;
m_startWork.notify();

Эти изменения должны привести к 10-минутному сну, который вам не нужен. Вы, кажется, не звоните Pool::stop() или же Pool::join(), Вы должны изменить свой код, чтобы позвонить им.

Вы также получите лучшую производительность, работая над m_ret в Pool::calc() чем копировать результат в конце. Вы также делаете копии, когда возвращаете работу. Вы могли бы хотеть Pool::getWork() вернуть констант в m_ret,

Я не запускал этот код, поэтому могут возникнуть другие проблемы. Это должно помочь вам двигаться

Из вашего кода вы, вероятно, задаетесь вопросом, почему условные переменные должны идти рука об руку с мьютексом (потому что вы объявляете один локальный мьютекс в Pool::run()). Надеюсь, мое исправление прояснит ситуацию.

0

Другие решения

Это может быть сделано с Повысить фьючерсы. Тогда начните темы ждать всех из них, чтобы закончить. Никакой другой синхронизации не требуется.

0

По вопросам рекламы [email protected]