Использование параллелизма в конвейерном исполнении

Я пытаюсь разработать конвейер, в котором данные сначала считываются и обрабатываются, обрабатываются один раз, манипулируются другим способом, а затем отображаются. Я имею в виду конструкцию, в которой ввод-вывод данных подается в буфер, который читается первым манипулятором. Впоследствии этот первый манипулятор записывает данные в другой буфер, который по возможности читается вторым манипулятором. Наконец, выходные данные второго манипулятора записываются в буфер отображения, который считывается визуализатором и отображается с использованием OpenGL.

На мой взгляд, это довольно простая параллельная проблема, в которой каждая задача имеет свой собственный поток, и они взаимодействуют через буферы данных. Тем не менее, все учебники, которые я встречал для многопоточных программ, предполагают, что многопоточность должна быть оставлена ​​на усмотрение некоторого промежуточного программного обеспечения (такого как OpenMP), которое решает, как разделить рабочую нагрузку.

Я новичок в разработке многопоточных приложений, так что это может быть глупый вопрос, но возможно ли то, что я описал, и это можно сделать с помощью промежуточного программного обеспечения, такого как OpenMP? Я понимаю, что очевидный ответ — «попробуй», и я хочу, но учебники не проливают свет на * как * попробовать.

1

Решение

OpenMP лучше подходит для алгоритмов, которые легко распространяются на несколько ядер (SIMD). Возможны и другие сценарии, но в вашем случае я думаю, что прямое использование потоков будет работать лучше, и его будет легче кодировать и поддерживать.

Я делю свой ответ на две части: общее решение без OpenMP и некоторые конкретные изменения для использования OpenMP.

Как уже упоминалось в комментарии, вы сталкиваетесь с проблемой производителя / потребителя, но дважды: один поток заполняет буфер (производит элемент), который затем должен быть прочитан (и изменен) вторым (потребляемым). Особенность вашей проблемы в том, что этот второй поток также является продюсером (изображение, которое нужно нарисовать), а третий поток отвечает за его потребление (визуализатор).

Как вы уже знаете, проблема P / C решается с помощью буфера (возможно, циклического буфера или очереди произведенных элементов), где каждый элемент буфера помечен как произведенный или потребленный, и где потоки имеют монопольный доступ при добавлении или брать предметы из него.


Давайте использовать подход очереди с вашей проблемой в следующем примере программы.

  • Произведенные предметы будут храниться в очереди. Передняя часть очереди содержит самые старые элементы, те, которые должны быть использованы в первую очередь.
  • Существует две очереди: одна для данных, созданных первым манипулятором (и предназначенных для использования вторым манипулятором), и другая для данных, созданных вторым манипулятором (и которые будут визуализироваться другим потоком).
  • Этап производства прост: получите эксклюзивный доступ к соответствующей очереди и вставьте элемент в конце.
  • Расход аналогичен, но он должен ждать, пока в очереди будет хотя бы один элемент (не быть пустым).
  • Я добавил немного спит моделировать другие операции.
  • Условие остановки приведено для иллюстрации.

Примечание: я предполагаю, что у вас есть доступ к компилятору C ++ 11 для простоты. Реализации, использующие другие API, относительно похожи.

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <list>

using namespace std::chrono_literals;

std::mutex g_data_produced_by_m1_mutex;
std::list<int> g_data_produced_by_m1;

std::mutex g_data_produced_by_m2_mutex;
std::list<int> g_data_produced_by_m2;

std::atomic<bool> stop = false;

void manipulator1_kernel()
{
while (!stop) {
// Producer 1: generate data
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
g_data_produced_by_m1.push_back(rand());
}
std::this_thread::sleep_for(100ms);
}
}

void manipulator2_kernel()
{
int data;

while (!stop) {
// Consumer 1
while (!stop) { // wait until there is an item to be consumed
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex);
if (!g_data_produced_by_m1.empty()) { // is there data to be consumed?
data = g_data_produced_by_m1.front(); // consume
g_data_produced_by_m1.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}

// Producer 2: modify and send to the visualizer
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
g_data_produced_by_m2.push_back(5 * data);
}

std::this_thread::sleep_for(100ms);
}
}

void visualizer_kernel()
{
int data;

while (!stop) {
// Consumer 2
while (!stop) { // wait until there is an item to be visualized
{
std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex);
if (!g_data_produced_by_m2.empty()) {
data = g_data_produced_by_m2.front();
g_data_produced_by_m2.pop_front();
break;
}
}
std::this_thread::sleep_for(100ms);
}

std::cout << data << std::endl; // render to display
std::this_thread::sleep_for(100ms);

if (data % 8 == 0) stop = true; // some stop condition for the example
}
}

int main()
{
std::thread manipulator1(manipulator1_kernel);
std::thread manipulator2(manipulator2_kernel);
std::thread visualizer(visualizer_kernel);

visualizer.join();
manipulator2.join();
manipulator1.join();

return 0;
}

Если вы все еще хотите использовать OpenMP, возможно, вы найдете задачи (начиная с OpenMP 3.0 я думаю). Я не очень много их использовал, но вышеприведенную программу можно переписать так:

int main()
{
#pragma omp parallel
{
#pragma omp task
manipulator1_kernel();
#pragma omp task
manipulator2_kernel();
#pragma omp task
visualizer_kernel();

#pragma omp taskwait
}

return 0;
}

Остальную часть кода можно изменить, чтобы использовать функции OpenMP, но я думаю, что это отвечает на ваш вопрос.

Основная проблема с этим подходом заключается в том, что вам нужно создать блок кода для задач, чтобы жить в OpenMP parallel, легко усложняя всю логику и структуру вашего приложения.

1

Другие решения

Для решения этой конкретной проблемы библиотека Intel® Threading Building Blocks включает специальные конструкции. Intel® TBB это кроссплатформенная библиотека, которая помогает в многопоточном программировании.
Мы могли бы рассматривать сущности, участвующие в вашем приложении, как четырех разных поставщиков задач. Один тип задач — это задачи ввода — те, которые предоставляют входные данные, другой тип задач обеспечивается первой процедурой манипуляции и так далее.

Таким образом, единственное, что нужно сделать пользователю, это предоставить тело для этих задач. В библиотеке есть несколько API для указания, какие тела должны обрабатываться и как это делать параллельно. Все остальное (здесь я имею в виду создание потоков, синхронизацию между выполнением задач, балансировку работы и т. Д.) Выполняется библиотекой.

Самым простым вариантом решения, которое пришло мне в голову, является использование parallel_pipeline функция. Вот прототип:

#include "tbb/pipeline.h"using namespace tbb;

int main() {
parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16,
make_filter<void, input_data_type>(
filter::serial_in_order, // read data sequentially
[](flow_control& fc) -> input_data_type {
if ( /*check some stop condition: EOF, etc.*/ ) {
fc.stop();
return input_data_type(); // return dummy value
}
auto input_data = read_data();
return input_data;
}
) &
make_filter<input_data_type, manipulator1_output_type>(
filter::parallel, // process data in parallel by the first manipulator
[](input_data_type elem) -> manipulator1_output_type {
auto processed_elem = manipulator1::process(elem);
return processed_elem;
}
) &
make_filter<manipulator1_output_type, manipulator2_output_type>(
filter::parallel, // process data in parallel by the second manipulator
[](manipulator1_output_type elem) -> manipulator2_output_type {
auto processed_elem = manipulator2::process(elem);
return processed_elem;
}
) &
make_filter<manipulator2_output_type, void>(
filter::serial_in_order, // visualize frame by frame
[](manipulator2_output_type elem) {
visualize(elem);
}
)
);
return 0;
}

при условии, что необходимые функции (read_data, visualize) реализованы. Вот input_data_type, manipulator1_output_typeи т. д. — это типы, которые передаются между этапами конвейера и манипуляторами. process функции делают необходимые вычисления по переданным аргументам.

Кстати, чтобы избежать работы с блокировками и другими примитивами синхронизации, вы можете использовать concurrent_bounded_queue из библиотеки и помещать свои входные данные в эту очередь, возможно, в другом потоке (например, выделенном для операций ввода-вывода), так просто, как concurrent_bounded_queue_instance.push(elem), а затем прочитать его через input_data_type elem; concurrent_bounded_queue_instance.pop(elem), Обратите внимание, что выталкивание элемента здесь является блокирующей операцией. concurrent_queue обеспечивает неблокирование try_pop альтернатива.

Другая возможность заключается в использовании tbb::flow_graph и его узлы для организации одной и той же схемы конвейеризации. Взгляните на два примера, которые описывают зависимость а также данные потоковые графики. Возможно, вам придется использовать sequencer_node для правильного оформления заказа (при необходимости).

Стоит прочитать вопросы SO, отмеченные тег, чтобы увидеть, как другие люди используют эту библиотеку.

1

Вы реализовали однопоточную версию? профнастил?

они являются важными шагами, и без них вы можете получить оптимальную реализацию вашего высокопараллельного проекта, просто чтобы понять, что узким местом является ввод-вывод ваших буферов и / или синхронизация потоков и / или ложное совместное использование и / или пропадание кэша, или похожие проблемы.

Сначала я бы попробовал простой пул потоков с задачами, которые выполняют все шаги последовательно. Затем, проанализировав, как это работает, каково потребление процессора и т. Д., Я бы поэкспериментировал с более сложными инструментами. всегда сравнивая их производительность с первым простым изданием

0
По вопросам рекламы [email protected]