Я пытаюсь разработать конвейер, в котором данные сначала считываются и обрабатываются, обрабатываются один раз, манипулируются другим способом, а затем отображаются. Я имею в виду конструкцию, в которой ввод-вывод данных подается в буфер, который читается первым манипулятором. Впоследствии этот первый манипулятор записывает данные в другой буфер, который по возможности читается вторым манипулятором. Наконец, выходные данные второго манипулятора записываются в буфер отображения, который считывается визуализатором и отображается с использованием OpenGL.
На мой взгляд, это довольно простая параллельная проблема, в которой каждая задача имеет свой собственный поток, и они взаимодействуют через буферы данных. Тем не менее, все учебники, которые я встречал для многопоточных программ, предполагают, что многопоточность должна быть оставлена на усмотрение некоторого промежуточного программного обеспечения (такого как OpenMP), которое решает, как разделить рабочую нагрузку.
Я новичок в разработке многопоточных приложений, так что это может быть глупый вопрос, но возможно ли то, что я описал, и это можно сделать с помощью промежуточного программного обеспечения, такого как OpenMP? Я понимаю, что очевидный ответ — «попробуй», и я хочу, но учебники не проливают свет на * как * попробовать.
OpenMP лучше подходит для алгоритмов, которые легко распространяются на несколько ядер (SIMD). Возможны и другие сценарии, но в вашем случае я думаю, что прямое использование потоков будет работать лучше, и его будет легче кодировать и поддерживать.
Я делю свой ответ на две части: общее решение без OpenMP и некоторые конкретные изменения для использования OpenMP.
Как уже упоминалось в комментарии, вы сталкиваетесь с проблемой производителя / потребителя, но дважды: один поток заполняет буфер (производит элемент), который затем должен быть прочитан (и изменен) вторым (потребляемым). Особенность вашей проблемы в том, что этот второй поток также является продюсером (изображение, которое нужно нарисовать), а третий поток отвечает за его потребление (визуализатор).
Как вы уже знаете, проблема P / C решается с помощью буфера (возможно, циклического буфера или очереди произведенных элементов), где каждый элемент буфера помечен как произведенный или потребленный, и где потоки имеют монопольный доступ при добавлении или брать предметы из него.
Давайте использовать подход очереди с вашей проблемой в следующем примере программы.
Примечание: я предполагаю, что у вас есть доступ к компилятору C ++ 11 для простоты. Реализации, использующие другие API, относительно похожи.
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>
using namespace std::chrono_literals;
std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;
std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;
std::atomic<bool> stop = false;
void manipulator1_kernel()
{
while (!stop) {
// Producer 1: generate data
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
g_data_produced_by_m1.push_back(rand());
}
std::this_thread::sleep_for(100ms);
}
}
void manipulator2_kernel()
{
int data;
while (!stop) {
// Consumer 1
while (!stop) { // wait until there is an item to be consumed
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
data = g_data_produced_by_m1.front(); // consume
g_data_produced_by_m1.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
// Producer 2: modify and send to the visualizer
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
g_data_produced_by_m2.push_back(5 * data);
}
std::this_thread::sleep_for(100ms);
}
}
void visualizer_kernel()
{
int data;
while (!stop) {
// Consumer 2
while (!stop) { // wait until there is an item to be visualized
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
if (!g_data_produced_by_m2.empty()) {
data = g_data_produced_by_m2.front();
g_data_produced_by_m2.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}
std::cout << data << std::endl; // render to display
std::this_thread::sleep_for(100ms);
if (data % 8 == 0) stop = true; // some stop condition for the example
}
}
int main()
{
std::thread manipulator1(manipulator1_kernel);
std::thread manipulator2(manipulator2_kernel);
std::thread visualizer(visualizer_kernel);
visualizer.join();
manipulator2.join();
manipulator1.join();
return 0;
}
Если вы все еще хотите использовать OpenMP, возможно, вы найдете задачи (начиная с OpenMP 3.0 я думаю). Я не очень много их использовал, но вышеприведенную программу можно переписать так:
int main()
{
#pragma omp parallel
{
#pragma omp task
manipulator1_kernel();
#pragma omp task
manipulator2_kernel();
#pragma omp task
visualizer_kernel();
#pragma omp taskwait
}
return 0;
}
Остальную часть кода можно изменить, чтобы использовать функции OpenMP, но я думаю, что это отвечает на ваш вопрос.
Основная проблема с этим подходом заключается в том, что вам нужно создать блок кода для задач, чтобы жить в OpenMP parallel
, легко усложняя всю логику и структуру вашего приложения.
Для решения этой конкретной проблемы библиотека Intel® Threading Building Blocks включает специальные конструкции. Intel® TBB это кроссплатформенная библиотека, которая помогает в многопоточном программировании.
Мы могли бы рассматривать сущности, участвующие в вашем приложении, как четырех разных поставщиков задач. Один тип задач — это задачи ввода — те, которые предоставляют входные данные, другой тип задач обеспечивается первой процедурой манипуляции и так далее.
Таким образом, единственное, что нужно сделать пользователю, это предоставить тело для этих задач. В библиотеке есть несколько API для указания, какие тела должны обрабатываться и как это делать параллельно. Все остальное (здесь я имею в виду создание потоков, синхронизацию между выполнением задач, балансировку работы и т. Д.) Выполняется библиотекой.
Самым простым вариантом решения, которое пришло мне в голову, является использование parallel_pipeline функция. Вот прототип:
#include "tbb/pipeline.h"using namespace tbb;
int main() {
parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
make_filter<void, input_data_type>(
filter::serial_in_order, // read data sequentially
[](flow_control& fc) -> input_data_type {
if ( /*check some stop condition: EOF, etc.*/ ) {
fc.stop();
return input_data_type(); // return dummy value
}
auto input_data = read_data();
return input_data;
}
) &
make_filter<input_data_type, manipulator1_output_type>(
filter::parallel, // process data in parallel by the first manipulator
[](input_data_type elem) -> manipulator1_output_type {
auto processed_elem = manipulator1::process(elem);
return processed_elem;
}
) &
make_filter<manipulator1_output_type, manipulator2_output_type>(
filter::parallel, // process data in parallel by the second manipulator
[](manipulator1_output_type elem) -> manipulator2_output_type {
auto processed_elem = manipulator2::process(elem);
return processed_elem;
}
) &
make_filter<manipulator2_output_type, void>(
filter::serial_in_order, // visualize frame by frame
[](manipulator2_output_type elem) {
visualize(elem);
}
)
);
return 0;
}
при условии, что необходимые функции (read_data, visualize) реализованы. Вот input_data_type
, manipulator1_output_type
и т. д. — это типы, которые передаются между этапами конвейера и манипуляторами. process
функции делают необходимые вычисления по переданным аргументам.
Кстати, чтобы избежать работы с блокировками и другими примитивами синхронизации, вы можете использовать concurrent_bounded_queue из библиотеки и помещать свои входные данные в эту очередь, возможно, в другом потоке (например, выделенном для операций ввода-вывода), так просто, как concurrent_bounded_queue_instance.push(elem)
, а затем прочитать его через input_data_type elem; concurrent_bounded_queue_instance.pop(elem)
, Обратите внимание, что выталкивание элемента здесь является блокирующей операцией. concurrent_queue
обеспечивает неблокирование try_pop
альтернатива.
Другая возможность заключается в использовании tbb::flow_graph
и его узлы для организации одной и той же схемы конвейеризации. Взгляните на два примера, которые описывают зависимость а также данные потоковые графики. Возможно, вам придется использовать sequencer_node для правильного оформления заказа (при необходимости).
Стоит прочитать вопросы SO, отмеченные ТВВ тег, чтобы увидеть, как другие люди используют эту библиотеку.
Вы реализовали однопоточную версию? профнастил?
они являются важными шагами, и без них вы можете получить оптимальную реализацию вашего высокопараллельного проекта, просто чтобы понять, что узким местом является ввод-вывод ваших буферов и / или синхронизация потоков и / или ложное совместное использование и / или пропадание кэша, или похожие проблемы.
Сначала я бы попробовал простой пул потоков с задачами, которые выполняют все шаги последовательно. Затем, проанализировав, как это работает, каково потребление процессора и т. Д., Я бы поэкспериментировал с более сложными инструментами. всегда сравнивая их производительность с первым простым изданием