Я хотел бы читать параллельно строка из выходного файла. Каждый поток читает одну строку, а затем работает с данными. В то же время следующая тема должна прочитать следующую строку.
std::ifstream infile("test.txt");
std::mutex mtx;
void read(int id_thread){
while(infile.good()){
mtx.lock();
std::string sLine;
getline(infile, sLine);
std::cout << "Read by thread: " << id_thread;
std::cout << sLine << std::endl;
mtx.unlock();
}
}
void main(){
std::vector<std::thread> threads;
for(int i = 0; i < num; i++){
threads.push_back(std::thread(parallelFun, i));
}
for(auto& thread : threads){
thread.join();
}
return 0;
}
Когда я запускаю этот код, я получаю это:
Первый поток прочитал все строки. Как я могу сделать так, чтобы каждый поток читал одну строку?
РЕДАКТИРОВАТЬ
Как уже упоминалось в комментариях, все, что мне нужно было сделать, это больший тестовый файл.
Спасибо, парни!
Я бы изменил цикл в
while(infile.good()){
mtx.lock();
std::string sLine;
getline(infile, sLine);
mtx.unlock();
std::cout << "Read by thread: " << id_thread;
std::cout << sLine << std::endl;
}
Ваш std :: cout — это занятая часть цикла тестирования, которую вы хотите обменять на реальный код позже. Это дает другому потоку время для запуска. Кроме того, сделайте свой тестовый файл большой. Нередко инициализация потока занимает некоторое время, в течение которого первый поток съедает все данные.
Если вы хотите, чтобы ваши 5 потоков читали ровно каждую 5-ю строку, вы должны синхронизировать чтения, поэтому каждый поток должен знать, что предыдущий завершил чтение своей части. Это требование потенциально навязывает огромный неэффективность, поскольку некоторые потоки могут долго ждать предыдущего, чтобы запустить.
Понятие кода, непроверенное использование на свой страх и риск.
Давайте сначала создадим класс по умолчанию для обработки атомарных блокировок. Мы выравниваем его, чтобы избежать ложного обмена и связанного с ним кэша пинг-понга.
constexpr size_t CACHELINESIZE = 64; // could differ on your architecture
template<class dType>
class alignas(CACHELINESIZE) lockstep {
std::atomic<dType> lock = dType(0);
public:
// spinlock spins until the previous value is prev and then tries to set lock to value
// until success, restart the spin if prev changes.
dType Spinlock(dType prev = dType(0), dType next = dType(1)) {
dType expected = prev;
while (!lock.compare_exchange_weak(expected, next)) { // request for locked-exclusiv ~100 cycles?
expected = prev; // we wish to continue to wait for expected
do {
pause(); // on intel waits roughly one L2 latency time.
} while(lock.load(std::memory_order_relaxed) != prev); // only one cache miss per change
}
return expected;
}
void store(dType value) {
lock.store(value);
}
};
lockstep<int> lock { 0 };
constexpr int NoThreads = 5;
std::ifstream infile("test.txt");
void read(int id_thread) {
locks[id_thread].lock = id_thread;
bool izNoGood = false;
int next = id_thread;
while(!izNoGood){
// get lock for next iteration
lock.spinlock(next, next); // wait on our number
// moved file check into locked region
izNoGood = !infile.good();
if (izNoGood) {
lock.store(next+1); // release next thread to end run.
return;
}
std::string sLine;
getline(infile, sLine);
// release next thread
lock.store(next+1);
// do work asynchronous
// ...
// debug log, hopefully the whole line gets written in one go (atomic)
// but can be in "random" order relative to other lines.
std::cout << "Read by thread: " << id_thread << " line no. " << next
<< " text:" << sLine << std::endl; // endl flushes cout, implicit sync?
next += NoThreads; // our next expected line to process
}
}
void main() {
std::vector<std::thread> threads;
for(int i = 0; i < NoThreads; i++) {
threads.push_back(std::thread(parallelFun, i));
}
for(auto& thread : threads){
thread.join();
}
return 0;
}
На всякий случай, если вы хотите, чтобы каждый поток читал одну строку (что очевидно из вашего описания), удалите цикл while, а затем вам нужно убедиться, что у вас такое же количество потоков, как и количество строк в файле.
Чтобы избавиться от вышеуказанного ограничения, вы можете использовать Boost Threadpool.