Я пишу простой пул потоков для своего приложения, который я тестирую на двухъядерном процессоре. Обычно это работает хорошо, но я заметил, что когда другие процессы используют более 50% процессора, мое приложение почти останавливается. Это сделало меня любопытным, поэтому я решил воспроизвести эту ситуацию и создал вспомогательное приложение, которое просто выполняет бесконечный цикл (без многопоточности), занимая 50% процессора. Во время работы вспомогательного приложения многопоточное приложение практически останавливается, как и раньше (скорость обработки падает с 300-400 задач в секунду до 5-10 задач в секунду). Но когда я изменил привязку процессов моей многопоточной программы к использованию только одного ядра (вспомогательное по-прежнему использует оба), оно начало работать, конечно, используя не более 50% оставшегося процессора. Когда я отключил многопоточность в своем приложении (все еще обрабатывая те же задачи, но без пула потоков), это работало как чудо, без какого-либо замедления от вспомогательного, который все еще работал (и так должны вести себя два приложения при работе на двух ядрах) , Но когда я включаю многопоточность, проблема возвращается.
Я сделал специальный код для тестирования этого конкретного ThreadPool:
заголовок
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
typedef double FloatingPoint;
#include <queue>
#include <vector>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <thread>
using namespace std;
struct ThreadTask
{
int size;
ThreadTask(int s)
{
size = s;
}
~ThreadTask()
{
}
};
class ThreadPool
{
protected:
queue<ThreadTask*> tasks;
vector<std::thread> threads;
std::condition_variable task_ready;
std::mutex variable_mutex;
std::mutex max_mutex;
std::atomic<FloatingPoint> max;
std::atomic<int> sleeping;
std::atomic<bool> running;
int threads_count;
ThreadTask * getTask();
void runWorker();
void processTask(ThreadTask*);
bool isQueueEmpty();
bool isTaskAvailable();
void threadMethod();
void createThreads();
void waitForThreadsToSleep();
public:
ThreadPool(int);
virtual ~ThreadPool();
void addTask(int);
void start();
FloatingPoint getValue();
void reset();
void clearTasks();
};
#endif /* THREADPOOL_H_ */
и .cpp
#include "stdafx.h"#include <climits>
#include <float.h>
#include "ThreadPool.h"
ThreadPool::ThreadPool(int t)
{
running = true;
threads_count = t;
max = FLT_MIN;
sleeping = 0;
if(threads_count < 2) //one worker thread has no sense
{
threads_count = (int)thread::hardware_concurrency(); //default value
if(threads_count == 0) //in case it fails ('If this value is not computable or well defined, the function returns 0')
threads_count = 2;
}
printf("%d worker threads\n", threads_count);
}
ThreadPool::~ThreadPool()
{
running = false;
reset(); //it will make sure that all worker threads are sleeping on condition variable
task_ready.notify_all(); //let them finish in natural way
for (auto& th : threads)
th.join();
}
void ThreadPool::start()
{
createThreads();
}
FloatingPoint ThreadPool::getValue()
{
waitForThreadsToSleep();
return max;
}
void ThreadPool::createThreads()
{
threads.clear();
for(int i = 0; i < threads_count; ++i)
threads.push_back(std::thread(&ThreadPool::threadMethod, this));
}
void ThreadPool::threadMethod()
{
while(running)
runWorker();
}
void ThreadPool::runWorker()
{
ThreadTask * task = getTask();
processTask(task);
}
void ThreadPool::processTask(ThreadTask * task)
{
if(task == NULL)
return;
//do something to simulate processing
vector<int> v;
for(int i = 0; i < task->size; ++i)
v.push_back(i);
delete task;
}
void ThreadPool::addTask(int s)
{
ThreadTask * task = new ThreadTask(s);
std::lock_guard<std::mutex> lock(variable_mutex);
tasks.push(task);
task_ready.notify_one();
}
ThreadTask * ThreadPool::getTask()
{
std::unique_lock<std::mutex> lck(variable_mutex);
if(tasks.empty())
{
++sleeping;
task_ready.wait(lck);
--sleeping;
if(tasks.empty()) //in case of ThreadPool being deleted (destructor calls notify_all), or spurious notifications
return NULL; //return to main loop and repeat it
}
ThreadTask * task = tasks.front();
tasks.pop();
return task;
}
bool ThreadPool::isQueueEmpty()
{
std::lock_guard<std::mutex> lock(variable_mutex);
return tasks.empty();
}
bool ThreadPool::isTaskAvailable()
{
return !isQueueEmpty();
}
void ThreadPool::waitForThreadsToSleep()
{
while(isTaskAvailable())
std::this_thread::yield(); //wait for all tasks to be taken
while(true) //wait for all threads to finish they last tasks
{
if(sleeping == threads_count)
break;
std::this_thread::yield();
}
}
void ThreadPool::clearTasks()
{
std::unique_lock<std::mutex> lock(variable_mutex);
while(!tasks.empty()) tasks.pop();
}
void ThreadPool::reset() //don't call this when var_mutex is already locked by this thread!
{
clearTasks();
waitForThreadsToSleep();
max = FLT_MIN;
}
как это проверено:
ThreadPool tp(2);
tp.start();
int iterations = 1000;
int task_size = 1000;
for(int j = 0; j < iterations; ++j)
{
printf("\r%d left", iterations - j);
tp.reset();
for(int i = 0; i < 1000; ++i)
tp.addTask(task_size);
tp.getValue();
}return 0;
Я построил этот код с MINGW с GCC 4.8.1 (из Вот) и Visual Studio 2012 (VC11) на Win7 64, оба в конфигурации отладки.
Две программы, созданные с использованием упомянутых компиляторов, ведут себя совершенно по-разному.
a) сборка программ с помощью mingw работает намного быстрее, чем сборка на VS, когда она может занимать весь процессор (система показывает почти 100% использования процессора этим процессом, поэтому я не думаю, что mingw тайно устанавливает привязку к одному ядру). Но когда я запускаю вспомогательную программу (используя 50% ЦП), она сильно замедляется (примерно в несколько десятков раз). Загрузка ЦП в этом случае составляет около 50% -50% для основной программы и вспомогательной.
б) сборка программы с VS 2012, при использовании всего процессора, даже медленнее, чем а) с замедлением (когда я установил task_size = 1, их скорости были похожи). Но когда вспомогательная программа работает, основная программа даже занимает большую часть ЦП (загрузка составляет около 66% основной — 33% вспомогательной), и в результате замедление едва заметно.
Когда установлено использование только одного ядра, обе программы заметно ускоряются (примерно в 1,5-2 раза), и mingw one перестает быть уязвимым для конкуренции.
Ну, теперь я не знаю, что делать. Моя программа ведет себя по-разному при сборке двумя разными наборами инструментов. Является ли это недостатком в моем коде (который, предположим, является правдой) или что-то делать с компиляторами, имеющими проблемы с c ++ 11?
Задача ещё не решена.
Других решений пока нет …