Параллельная программа без увеличения скорости по сравнению с линейной программой

Я создал модельную программу из более сложной программы, которая будет использовать многопоточность и несколько жестких дисков для повышения производительности. Размер данных настолько велик, что считывание всех данных в память будет невозможно, поэтому данные будут считываться, обрабатываться и записываться обратно порциями. Эта тестовая программа использует конструкцию конвейера для одновременного чтения, обработки и записи в 3 разных потоках. Поскольку чтение и запись относятся к разным жестким дискам, нет проблем с чтением и записью одновременно. Однако программа, использующая многопоточность, кажется, работает в 2 раза медленнее, чем ее линейная версия (также в коде). Я пытался, чтобы поток чтения и записи не был уничтожен после запуска чанка, но синхронизация, кажется, замедлила его даже больше, чем в текущей версии. Мне было интересно, если я делаю что-то не так или как я могу улучшить это. Благодарю вас.

Протестировано с использованием i3-2100 @ 3.1ghz и 16 ГБ оперативной памяти.

#include <iostream>
#include <fstream>
#include <ctime>
#include <thread>

#define CHUNKSIZE 8192    //size of each chunk to process
#define DATASIZE 2097152  //total size of data

using namespace std;

int data[3][CHUNKSIZE];
int run = 0;
int totalRun = DATASIZE/CHUNKSIZE;

bool finishRead = false, finishWrite = false;

ifstream infile;
ofstream outfile;

clock_t starttime, endtime;

/*
Process a chunk of data(simulate only, does not require to sort all data)
*/
void quickSort(int arr[], int left, int right) {

int i = left, j = right;
int tmp;
int pivot = arr[(left + right) / 2];

while (i <= j) {
while (arr[i] < pivot) i++;
while (arr[j] > pivot) j--;
if (i <= j) {
tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
i++;
j--;
}
};

if (left < j) quickSort(arr, left, j);
if (i < right) quickSort(arr, i, right);
}

/*
Find runtime
*/
void diffclock(){
double diff = (endtime - starttime)/(CLOCKS_PER_SEC/1000);
cout<<"Total run time: "<<diff<<"ms"<<endl;
}

/*
Read a chunk of data
*/
void readData(){

for(int i = 0; i < CHUNKSIZE; i++){
infile>>data[run%3][i];
}
finishRead = true;

}

/*
Write a chunk of data
*/
void writeData(){

for(int i = 0; i < CHUNKSIZE; i++){
outfile<<data[(run-2)%3][i]<<endl;
}
finishWrite = true;
}

/*
Pipelines Read, Process, Write using multithread
*/
void threadtransfer(){

starttime = clock();

infile.open("/home/pcg/test/iothread/source.txt");
outfile.open("/media/pcg/Data/test/iothread/ThreadDuplicate.txt");

thread read, write;

run = 0;
readData();

run = 1;
readData();
quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);

run = 2;
while(run < totalRun){
//cout<<run<<endl;
finishRead = finishWrite = false;
read = thread(readData);
write = thread(writeData);
read.detach();
write.detach();
quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
while(!finishRead||!finishWrite){}  //check if next cycle is ready.
run++;
}quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
writeData();

run++;
writeData();

infile.close();
outfile.close();

endtime = clock();
diffclock();
}

/*
Linearly read, sort, and write a chunk and repeat.
*/
void lineartransfer(){

int totalRun = DATASIZE/CHUNKSIZE;
int holder[CHUNKSIZE];
starttime = clock();

infile.open("/home/pcg/test/iothread/source.txt");
outfile.open("/media/pcg/Data/test/iothread/Linearduplicate.txt");

run = 0;

while(run < totalRun){

for(int i = 0; i < CHUNKSIZE; i++) infile>>holder[i];
quickSort(holder, 0, CHUNKSIZE - 1);
for(int i = 0; i < CHUNKSIZE; i++) outfile<<holder[i]<<endl;
run++;
}

endtime = clock();
diffclock();
}

/*
Create large amount of data for testing
*/
void createData(){
outfile.open("/home/pcg/test/iothread/source.txt");

for(int i = 0; i < DATASIZE; i++){
outfile<<rand()<<endl;

}
outfile.close();
}int main(){

int mode=0;
cout<<"Number of threads: "<<thread::hardware_concurrency()<<endl;
cout<<"Enter mode\n1.Create Data\n2.thread copy\n3.linear copy\ninput mode:";
cin>>mode;

if(mode == 1) createData();
else if(mode == 2) threadtransfer();
else if(mode == 3) lineartransfer();

return 0;
}

1

Решение

Поскольку вы измеряете время, используя clock на машине с Linux я ожидаю, что общее время процессора (примерно) одинаково, независимо от того, используете ли вы один поток или несколько потоков.

Может быть, вы хотите использовать time myprog вместо? Или использовать gettimeofday получить время (которое даст вам время в секундах + наносекунды [хотя наносекунды могут быть не «точными» вплоть до последней цифры]).

Редактировать:
Далее не используйте endl при записи в файл. Это сильно тормозит, потому что среда выполнения C ++ запускается и сбрасывается в файл, который является вызовом операционной системы. Он почти наверняка как-то защищен от нескольких потоков, поэтому у вас есть три потока, выполняющих запись данных по одной строке синхронно за раз. Скорее всего, это займет почти 3 раза больше, чем запуск одного потока. Кроме того, не пишите в один и тот же файл из трех разных потоков — это будет так или иначе плохо.

1

Другие решения

Не заняты — подождите. Это тратит драгоценное время процессора и может значительно замедлить остальное (не говоря уже о том, что компилятор может оптимизировать его в бесконечный цикл, потому что он не может угадать, будут ли эти флаги изменяться или нет, поэтому он даже не корректен с самого начала). И не detach() или. Заменить оба detach() и занят ожидания join():

while (run < totalRun) {
read = thread(readData);
write = thread(writeData);
quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
read.join();
write.join();
run++;
}

Что касается глобального дизайна, ну, игнорируя глобальные переменные, я думаю, что в противном случае это приемлемо, если вы не ожидаете обработки (quickSort) часть, когда-либо превышающая время чтения / записи. Я бы, например, использовал очереди сообщений для передачи буферов между различными потоками (что позволяет добавлять больше потоков обработки, если вам это нужно, либо выполнять одни и те же задачи параллельно, либо разные задачи последовательно), но, возможно, это потому, что я привык делай так

2

Пожалуйста, исправьте меня, если я ошибаюсь, но кажется, что ваша многопоточная функция — это в основном линейная функция, выполняющая в 3 раза больше работы вашей линейной функции?

В многопоточной программе вы создаете три потока и запускаете функции readData / quicksort по одному разу в каждом потоке (распределяя рабочую нагрузку), но в вашей программе кажется, что симуляция потока фактически просто читает три раза, быстро сортирует три раза и записывает три раз и общее время, необходимое для выполнения всех трех из каждого.

0
По вопросам рекламы [email protected]