Многопоточное вычисление множества Мандельброта

Я создал программу, которая создает Mandelbrot задавать. Сейчас я пытаюсь сделать его многопоточным.

// mandelbrot.cpp
// compile with: g++ -std=c++11 mandelbrot.cpp -o mandelbrot
// view output with: eog mandelbrot.ppm

#include <fstream>
#include <complex> // if you make use of complex number facilities in C++
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <vector>using namespace std;

template <class T> struct RGB { T r, g, b; };

template <class T>
class Matrix {
public:
Matrix(const size_t rows, const size_t cols) : _rows(rows), _cols(cols) {
_matrix = new T*[rows];
for (size_t i = 0; i < rows; ++i) {
_matrix[i] = new T[cols];
}
}
Matrix(const Matrix &m) : _rows(m._rows), _cols(m._cols) {
_matrix = new T*[m._rows];
for (size_t i = 0; i < m._rows; ++i) {
_matrix[i] = new T[m._cols];
for (size_t j = 0; j < m._cols; ++j) {
_matrix[i][j] = m._matrix[i][j];
}
}
}
~Matrix() {
for (size_t i = 0; i < _rows; ++i) {
delete [] _matrix[i];
}
delete [] _matrix;
}
T *operator[] (const size_t nIndex)
{
return _matrix[nIndex];
}
size_t width() const { return _cols; }
size_t height() const { return _rows; }
protected:
size_t _rows, _cols;
T **_matrix;
};

// Portable PixMap image
class PPMImage : public Matrix<RGB<unsigned char> >
{
public:
unsigned int size;

PPMImage(const size_t height, const size_t width) : Matrix(height, width) { }
void save(const std::string &filename)
{
std::ofstream out(filename, std::ios_base::binary);
out <<"P6" << std::endl << _cols << " " << _rows << std::endl << 255 << std::endl;
for (size_t y=0; y<_rows; y++)
for (size_t x=0; x<_cols; x++)
out << _matrix[y][x].r << _matrix[y][x].g << _matrix[y][x].b;
}
};

/*Draw mandelbrot according to the provided parameters*/
void draw_Mandelbrot(PPMImage & image, const unsigned width, const unsigned height, double cxmin, double cxmax, double cymin, double cymax,unsigned int max_iterations)
{

for (std::size_t ix = 0; ix < width; ++ix)
for (std::size_t iy = 0; iy < height; ++iy)
{
std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
std::complex<double> z = 0;
unsigned int iterations;

for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
z = z*z + c;

image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

}
}

int main()
{
const unsigned width = 1600;
const unsigned height = 1600;

PPMImage image(height, width);int parts = 8;

std::vector<int>bnd (parts, image.size);

std::thread *tt = new std::thread[parts - 1];

time_t start, end;
time(&start);
//Lauch parts-1 threads
for (int i = 0; i < parts - 1; ++i) {
tt[i] = std::thread(draw_Mandelbrot,ref(image), width, height, -2.0, 0.5, -1.0, 1.0, 10);
}

//Use the main thread to do part of the work !!!
for (int i = parts - 1; i < parts; ++i) {
draw_Mandelbrot(ref(image), width, height, -2.0, 0.5, -1.0, 1.0, 10);
}

//Join parts-1 threads
for (int i = 0; i < parts - 1; ++i)
tt[i].join();

time(&end);
std::cout << difftime(end, start) << " seconds" << std::endl;image.save("mandelbrot.ppm");

delete[] tt;

return 0;
}

Теперь каждый thread рисует полный фрактал (смотрите в main()). Как я могу позволить нитям нарисовать разные части фрактала?

2

Решение

Вы делаете это (довольно много) сложнее, чем нужно. Это та задача, для которой OpenMP почти идеально подходит. Для этой задачи это дает почти идеальное масштабирование с голый минимум усилий.

Я изменил ваш draw_mandelbrot вставив прагму перед внешним for цикл:

#pragma omp parallel for
for (int ix = 0; ix < width; ++ix)
for (int iy = 0; iy < height; ++iy)

Тогда я упростил ваш main до:

int main() {
const unsigned width = 1600;
const unsigned height = 1600;

PPMImage image(height, width);

clock_t start = clock();
draw_Mandelbrot(image, width, height, -2.0, 0.5, -1.0, 1.0, 10);
clock_t stop = clock();

std::cout << (double(stop - start) / CLOCKS_PER_SEC) << " seconds\n";

image.save("mandelbrot.ppm");

return 0;
}

На моей (довольно медленной) машине ваш оригинальный код работал за 4,73 секунды. Мой модифицированный код запустился за 1,38 секунды. Это улучшение в 3,4 раза по сравнению с кодом, которое почти неотличимо от тривиальной однопоточной версии.

Просто для того, чтобы это стоило, я сделал немного больше переписывания, чтобы получить это:

// mandelbrot.cpp
// compile with: g++ -std=c++11 mandelbrot.cpp -o mandelbrot
// view output with: eog mandelbrot.ppm

#include <fstream>
#include <complex> // if you make use of complex number facilities in C++
#include <iostream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <vector>

using namespace std;

template <class T> struct RGB { T r, g, b; };

template <class T>
struct Matrix
{
std::vector<T> data;
size_t rows;
size_t cols;

class proxy {
Matrix &m;
size_t index_1;
public:
proxy(Matrix &m, size_t index_1) : m(m), index_1(index_1) { }

T &operator[](size_t index) { return m.data[index * m.rows + index_1]; }
};

class const_proxy {
Matrix const &m;
size_t index_1;
public:
const_proxy(Matrix const &m, size_t index_1) : m(m), index_1(index_1) { }

T const &operator[](size_t index) const { return m.data[index * m.rows + index_1]; }
};public:
Matrix(size_t rows, size_t cols) : data(rows * cols), rows(rows), cols(cols) { }

proxy operator[](size_t index) { return proxy(*this, index); }
const_proxy operator[](size_t index) const { return const_proxy(*this, index); }

};

template <class T>
std::ostream &operator<<(std::ostream &out, Matrix<T> const &m) {
out << "P6" << std::endl << m.cols << " " << m.rows << std::endl << 255 << std::endl;
for (size_t y = 0; y < m.rows; y++)
for (size_t x = 0; x < m.cols; x++) {
T pixel = m[y][x];
out << pixel.r << pixel.g << pixel.b;
}
return out;
}

/*Draw Mandelbrot according to the provided parameters*/
template <class T>
void draw_Mandelbrot(T & image, const unsigned width, const unsigned height, double cxmin, double cxmax, double cymin, double cymax, unsigned int max_iterations) {

#pragma omp parallel for
for (int ix = 0; ix < width; ++ix)
for (int iy = 0; iy < height; ++iy)
{
std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
std::complex<double> z = 0;
unsigned int iterations;

for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
z = z*z + c;

image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

}
}

int main() {
const unsigned width = 1600;
const unsigned height = 1600;

Matrix<RGB<unsigned char>> image(height, width);

clock_t start = clock();
draw_Mandelbrot(image, width, height, -2.0, 0.5, -1.0, 1.0, 255);
clock_t stop = clock();

std::cout << (double(stop - start) / CLOCKS_PER_SEC) << " seconds\n";

std::ofstream out("mandelbrot.ppm", std::ios::binary);
out << image;

return 0;
}

Когда моя машина работает в «быстром» (ну, менее медленном) режиме, этот код выполняется за 0,5–0,6 секунды.

Что касается того, почему я сделал эти изменения: в основном, чтобы сделать это быстрее, чище и проще. Ваш класс Matrix выделил отдельный блок памяти для каждой строки (или, возможно, столбца — не уделял пристального внимания). Это выделяет один непрерывный блок всей матрицы вместо этого. Это устраняет уровень косвенности для доступа к данным и увеличивает локальность ссылок, тем самым улучшая использование кэша. Это также уменьшает общий объем используемых данных.

Переход от использования time использовать clock чтобы определить время, нужно было измерить время процессора, а не время стены (и, как правило, существенно повысить точность).

Избавление от класса PPMImage было сделано просто потому, что (IMO) иметь класс PPImage, который является производным от класса Matrix, просто не имеет большого (если вообще есть) смысла. Я предполагаю, что это работает (для достаточно свободного определения «работы»), но это не кажется мне хорошим дизайном. Если вы настаиваете на том, чтобы делать это вообще, это должно быть, по крайней мере, частным производным, потому что вы просто используете Матрицу как способ реализации вашего класса PPMImage, а не (по крайней мере, я, конечно, надеюсь, что нет) пытаться сделать утверждения о свойствах PPM изображения.

Если по какой-либо причине вы решите обрабатывать потоки вручную, очевидным способом разделения работы между потоками будет все равно смотреть на циклы внутри draw_mandelbrot, Очевидным было бы оставить ваш внешний цикл в покое, но отправить вычисления для каждой итерации в пул потоков:
для (int ix = 0; ix < ширина; ++ IX)
compute_thread (IX);

где тело compute_thread в основном этот кусок кода:

        for (int iy = 0; iy < height; ++iy)
{
std::complex<double> c(cxmin + ix / (width - 1.0)*(cxmax - cxmin), cymin + iy / (height - 1.0)*(cymax - cymin));
std::complex<double> z = 0;
unsigned int iterations;

for (iterations = 0; iterations < max_iterations && std::abs(z) < 2.0; ++iterations)
z = z*z + c;

image[iy][ix].r = image[iy][ix].g = image[iy][ix].b = iterations;

}

Очевидно, что было бы немного работы по передаче правильных данных в вычислительный поток (каждый поток должен передавать ссылку на фрагмент результирующего изображения), но это было бы очевидным и довольно чистым местом для разделения вещей. В частности, он разделяет работу на достаточное количество задач, и вы полуавтоматически получаете довольно хорошую балансировку нагрузки (т. Е. Вы можете держать все ядра занятыми), но достаточно большими, чтобы не тратить огромное количество времени на связь и синхронизацию между потоки.

Что касается результата с числом итераций, равным 255, я получаю следующее (масштабируется до 25%):

введите описание изображения здесь

…что в значительной степени, как я ожидал.

4

Другие решения

Одна из главных проблем этого подхода заключается в том, что разные регионы требуют разного количества времени для расчета.

Более общий подход.

  • Запустите 1 исходную ветку.
  • Запустить N рабочих потоков.
  • Начинаю 1 сливную нить.
  • Создайте 2 потокобезопасных очереди (назовите их исходная очередь и очередь приемника).
  • Разделите изображение на М (много больше чем N) кусочков.
  • Исходный поток помещает части в исходную очередь
  • Рабочие извлекают кусок из исходной очереди, преобразуют фрагменты в результирующие фрагменты и помещают эти фрагменты в очередь приемников.
  • Поток-приемник берет фрагменты из очереди-приемника и объединяет их в окончательное изображение.

При таком распределении работы все рабочие потоки будут все время заняты.

1

Вы можете разделить фрактал на части, разделив начало и конец фрактала с размером экрана:

   $this->stepsRe = (double)((($this->startRe * -1) + ($this->endeRe)) / ($this->size_x-1));
$this->stepsIm = (double)((($this->startIm * -1) + ($this->endeIm)) / ($this->size_y-1));
0
По вопросам рекламы [email protected]