Мне нужно распараллелить некоторые задачи в программе на C ++, и я совершенно новичок в параллельном программировании. До сих пор я добился определенного прогресса в поиске в Интернете, но сейчас немного застрял. Я хотел бы повторно использовать некоторые темы в цикле, но явно не знаю, как сделать то, что я пытаюсь.
Я получаю данные с двух карт АЦП на компьютере (полученные параллельно), затем мне нужно выполнить некоторые операции над собранными данными (обработанными параллельно) при сборе следующей партии данных. Вот некоторый псевдокод для иллюстрации
//Acquire some data, wait for all the data to be acquired before proceeding
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a);
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a);
acq1.join();
acq2.join();
while(user doesn't interrupt)
{
//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1(AcquireData, boardHandle1, memoryAddress1b);
acq2(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address
is written to and being processed until the user interrupts the program.*/
}
Это главная суть этого. Следующее выполнение цикла записывает адреса памяти «a» во время обработки данных «b» и продолжает чередоваться (я могу получить код для этого, просто извлек его, чтобы не загромождать проблему).
В любом случае, проблема (как я уверен, некоторые люди уже могут сказать) заключается в том, что во второй раз, когда я пытаюсь использовать acq1 и acq2, компилятор (VS2012) говорит: «IntelliSense: вызов объекта типа класса без соответствующего оператора ( ) или преобразование функций в указатель на тип функции «. Аналогично, если я помещаю std :: thread перед acq1 и acq2 снова, он говорит: «ошибка C2374:« acq1 »: переопределение; множественная инициализация».
Итак, вопрос в том, могу ли я переназначить потоки на новую задачу, когда они завершат свою предыдущую задачу? Я всегда жду окончания предыдущего использования потока, прежде чем вызывать его снова, но я не знаю, как переназначить поток, и, поскольку он находится в цикле, я не могу каждый раз создавать новый поток (или если я может, это кажется расточительным и ненужным, но я могу ошибаться).
заранее спасибо
Самый простой способ — использовать ожидаемую очередь std::function
объекты. Как это:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>class ThreadPool
{
public:
ThreadPool (int threads) : shutdown_ (false)
{
// Create the specified number of threads
threads_.reserve (threads);
for (int i = 0; i < threads; ++i)
threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i));
}
~ThreadPool ()
{
{
// Unblock any threads and tell them to stop
std::unique_lock <std::mutex> l (lock_);
shutdown_ = true;
condVar_.notify_all();
}
// Wait for all threads to stop
std::cerr << "Joining threads" << std::endl;
for (auto& thread : threads_)
thread.join();
}
void doJob (std::function <void (void)> func)
{
// Place a job on the queu and unblock a thread
std::unique_lock <std::mutex> l (lock_);
jobs_.emplace (std::move (func));
condVar_.notify_one();
}
protected:
void threadEntry (int i)
{
std::function <void (void)> job;
while (1)
{
{
std::unique_lock <std::mutex> l (lock_);
while (! shutdown_ && jobs_.empty())
condVar_.wait (l);
if (jobs_.empty ())
{
// No jobs to do and we are shutting down
std::cerr << "Thread " << i << " terminates" << std::endl;
return;
}
std::cerr << "Thread " << i << " does a job" << std::endl;
job = std::move (jobs_.front ());
jobs_.pop();
}
// Do the job without holding any locks
job ();
}
}
std::mutex lock_;
std::condition_variable condVar_;
bool shutdown_;
std::queue <std::function <void (void)>> jobs_;
std::vector <std::thread> threads_;
};
void silly (int n)
{
// A silly job for demonstration purposes
std::cerr << "Sleeping for " << n << " seconds" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds (n));
}
int main()
{
// Create two threads
ThreadPool p (2);
// Assign them 4 jobs
p.doJob (std::bind (silly, 1));
p.doJob (std::bind (silly, 2));
p.doJob (std::bind (silly, 3));
p.doJob (std::bind (silly, 4));
}
std::thread
Класс предназначен для выполнения ровно одной задачи (той, которую вы даете в конструкторе), а затем ее завершения. Если вы хотите сделать больше работы, вам понадобится новая тема. Начиная с C ++ 11 это все, что у нас есть. Пулы потоков не попали в стандарт. (Я не уверен, что C ++ 14 должен сказать о них.)
К счастью, вы можете легко реализовать необходимую логику самостоятельно. Вот масштабная картина:
Самая сложная часть здесь (которая все еще довольно проста) — правильно спроектировать рабочую очередь. Обычно для этого подойдет синхронизированный связанный список (из STL). Синхронизированный означает, что любой поток, который хочет манипулировать очередью, должен делать это только после того, как он получил std::mutex
так, чтобы избежать гоночных условий. Если рабочий поток находит список пустым, он должен ждать, пока снова не будет выполнено какое-либо задание. Вы можете использовать std::condition_variable
за это. Каждый раз, когда новая задача вставляется в очередь, поток вставки оповещает поток, который ожидает переменную условия и поэтому прекратит блокировку и в конечном итоге начнет обработку новой задачи.
Вторая не очень тривиальная часть — это то, как сигнализировать рабочим потокам, что больше нет работы. Ясно, что вы можете установить некоторый глобальный флаг, но если работник заблокирован, ожидая в очереди, это не будет реализовано в ближайшее время. Одним из решений может быть notify_all()
темы и пусть они проверяют флаг каждый раз, когда они получают уведомление. Другой вариант — вставить какой-то особый «токсичный» элемент в очередь. Если рабочий сталкивается с этим предметом, он уходит сам.
Представление очереди задач является простым, используя ваш собственный task
предметы или просто лямбды.
Все вышеперечисленное — это особенности C ++ 11. Если вы застряли с более ранней версией, вам придется прибегнуть к сторонним библиотекам, которые обеспечивают многопоточность для вашей конкретной платформы.
Хотя все это не является ракетостроением, в первый раз легко ошибиться. И, к сожалению, ошибки, связанные с параллелизмом, являются одними из самых трудных для устранения. Если вы потратите несколько часов на чтение соответствующих разделов хорошей книги или на учебник, то можете быстро окупиться.
это
std::thread acq1(...)
это вызов конструктора. создание нового объекта с именем acq1
это
acq1(...)
является приложением оператора () к существующему объекту aqc1. Если такой оператор не определен для std :: thread, компилятор жалуется.
Насколько я знаю, вы не можете повторно использовать std :: threads. Вы строите и запускаете их. Присоединяйся к ним и выбрасывай их,
Ну, это зависит от того, планируете ли вы переназначение или нет. Вы можете перемещать поток, но не делать его копию.
Код ниже будет создавать новую пару потоков на каждой итерации и перемещать их вместо старых потоков. Я думаю, это должно работать, потому что новый thread
объекты будут временными.
while(user doesn't interrupt)
{
//Process first batch of data while acquiring new data
std::thread proc1(ProcessData,memoryAddress1a);
std::thread proc2(ProcessData,memoryAddress2a);
acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b);
acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b);
acq1.join();
acq2.join();
proc1.join();
proc2.join();
/*Proceed in this manner, alternating which memory address
is written to and being processed until the user interrupts the program.*/
}
Что происходит, так это то, что объект фактически не заканчивает свое время жизни в конце итерации, потому что он объявлен во внешней области видимости цикла. Но каждый раз создается новый объект и move
происходит. Я не понимаю, что можно пощадить (я мог бы быть глупым), поэтому я представляю, что это точно так же, как объявление acq
s внутри цикла и просто повторно использовать символ. В общем … да, это о том, как вы классифицируете создание временных и move
,
Кроме того, это явно запускает новый поток в каждом цикле (конечно, заканчивая ранее назначенный поток), он не заставляет поток ждать новые данные и магически передавать их в канал обработки. Вам нужно будет реализовать это по-другому, как. Например: пул рабочих потоков и связь по очередям.
Рекомендации: operator=
, (ctor)
.
Я думаю, что ошибки, которые вы получаете, говорят сами за себя, поэтому я пропущу их объяснение.
Я думаю, вам нужен гораздо более простой ответ для запуска набора потоков более одного раза, это лучшее решение:
do{
std::vector<std::thread> thread_vector;
for (int i=0;i<nworkers;i++)
{
thread_vector.push_back(std::thread(yourFunction,Parameter1,Parameter2, ...));
}
for(std::thread& it: thread_vector)
{
it.join();
}
q++;
} while(q<NTIMES);
Вы также можете создать свой собственный класс Thread и вызвать его метод run, например:
class MyThread
{
public:
void run(std::function<void()> func) {
thread_ = std::thread(func);
}
void join() {
if(thread_.joinable())
thread_.join();
}
private:
std::thread thread_;
};
// Application code...
MyThread myThread;
myThread.run(AcquireData);