Я хотел бы распараллелить линейную операцию (подгонка сложной математической функции к некоторому набору данных) с несколькими процессорами.
Предположим, у меня 8 ядер на моей машине, и я хочу разместить 1000 наборов данных. Что я ожидаю, так это какая-то система, которая принимает 1000 наборов данных в виде очереди и отправляет их 8 ядрам для обработки, поэтому она начинает с принятия первых 8 из 1000 в качестве FIFO. Время подбора каждого набора данных в целом отличается от другого, поэтому некоторые из 8 устанавливаемых наборов данных могут занять больше времени, чем другие. То, что я хочу от системы, — это сохранить результаты установленных наборов данных, а затем возобновить получение новых наборов данных из большой очереди (1000 наборов данных) для каждого выполняемого потока. Это должно возобновиться, пока не будут обработаны все 1000 наборов данных. И тогда я мог бы продолжить свою программу.
Как называется такая система? и есть ли модели для этого на C ++?
Я распараллеливаюсь с OpenMP и использую передовые методы C ++, такие как шаблоны и полиморфизм.
Спасибо за любые усилия.
Вы можете использовать OpenMP параллельно с динамическим расписанием или задачами OpenMP. Оба могут быть использованы для распараллеливания случаев, когда каждая итерация занимает разное время для завершения. С динамически запланировано на:
#pragma omp parallel
{
Fitter fitter;
fitter.init();
#pragma omp for schedule(dynamic,1)
for (int i = 0; i < numFits; i++)
fitter.fit(..., &results[i]);
}
schedule(dynamic,1)
заставляет каждый поток выполнять по одной итерации за раз, и потоки никогда не остаются бездействующими, если нет больше итераций для обработки.
С заданиями:
#pragma omp parallel
{
Fitter fitter;
fitter.init();
#pragma omp single
for (int i = 0; i < numFits; i++)
{
#pragma omp task
fitter.fit(..., &results[i]);
}
#pragma omp taskwait
// ^^^ only necessary if more code before the end of the parallel region
}
Здесь один из потоков запускает цикл for, который создает 1000 задач OpenMP. Задачи OMP хранятся в очереди и обрабатываются незанятыми потоками. Он работает в некоторой степени аналогично динамическим циклам for, но обеспечивает большую свободу в конструкциях кода (например, с задачами, которые вы можете распараллелить рекурсивными алгоритмами). taskwait
Конструкция ожидает выполнения всех ожидающих выполнения задач. Это подразумевается в конце параллельной области, поэтому это действительно необходимо, только если больше кода следует до конца параллельной области.
В обоих случаях каждый вызов fit()
будет сделано в другой теме. Вы должны убедиться, что подгонка одного набора параметров не влияет на подгонку других наборов, например, тот fit()
это потокобезопасный метод / функция. В обоих случаях также требуется время для выполнения fit()
намного выше, чем издержки конструкций OpenMP.
Для работы с OpenMP требуется OpenMP 3.0-совместимый компилятор. Это исключает все версии MS VC ++ (даже ту, что в VS2012), если вам случится разрабатывать на Windows.
Если вы хотите, чтобы для каждого потока когда-либо инициализировался только один экземпляр установщика, то вам следует использовать несколько иной подход, например, сделать объект-установщик глобальным и threadprivate
:
#include <omp.h>
Fitter fitter;
#pragma omp threadprivate(fitter)
...
int main()
{
// Disable dynamic teams
omp_set_dynamic(0);
// Initialise all fitters once per thread
#pragma omp parallel
{
fitter.init();
}
...
#pragma omp parallel
{
#pragma omp for schedule(dynamic,1)
for (int i = 0; i < numFits; i++)
fitter.fit(..., &results[i]);
}
...
return 0;
}
Вот fitter
является глобальным примером Fitter
учебный класс. omp threadprivate
директива указывает компилятору поместить его в локальное хранилище потоков, например, сделать его глобальной переменной для каждого потока. Это сохраняется между различными параллельными областями. Вы также можете использовать omp threadprivate
на static
локальные переменные. Они также сохраняются между различными параллельными областями (но только в одной и той же функции):
#include <omp.h>
int main()
{
// Disable dynamic teams
omp_set_dynamic(0);
static Fitter fitter; // must be static
#pragma omp threadprivate(fitter)
// Initialise all fitters once per thread
#pragma omp parallel
{
fitter.init();
}
...
#pragma omp parallel
{
#pragma omp for schedule(dynamic,1)
for (int i = 0; i < numFits; i++)
fitter.fit(..., &results[i]);
}
...
return 0;
}
omp_set_dynamic(0)
вызов отключает динамические команды, то есть каждая параллельная область всегда будет выполняться с таким количеством потоков, как указано в OMP_NUM_THREADS
переменная окружения.
То, что вы в основном хотите, это пул работников (или пул потоков), которые берут задание из очереди, обрабатывают его и затем переходят к другому заданию. OpenMP предоставляет различные подходы для решения таких задач, например, барьеры (все работники работают до определенной точки и действуют только при выполнении определенного требования) или сокращения для накопления значений в глобальную переменную после того, как работникам удалось вычислить их соответствующие части.
Ваш вопрос очень широкий, но еще один совет, который я могу вам дать, — взглянуть на парадигму MapReduce. В этой парадигме функция отображается в наборе данных, и результат упорядочивается в сегменты, которые уменьшаются с помощью другой функции (которая может снова быть той же самой функцией). В вашем случае это будет означать, что каждый из ваших процессоров / ядер / узлов сопоставляет данную функцию с назначенным ей набором данных и отправляет области результатов другому узлу, ответственному за ее объединение. Я предполагаю, что вам нужно заглянуть в MPI, если вы хотите использовать MapReduce с C ++ и без использования конкретной инфраструктуры MapReduce. Поскольку вы запускаете программу на одном узле, возможно, вы можете сделать что-то похожее с OpenMP, поэтому поиск в Интернете может помочь.
TL; DR ищи пул рабочих (пул потоков), барьеры а также Уменьшение карты.