Сохранение данных при выполнении многопоточной обработки

Существует приложение-загрузчик, которое выполняет различные виды обработки элементов загрузки в нескольких потоках. Некоторые потоки анализируют входные данные, другие выполняют загрузку, извлечение, сохранение состояния и т. Д. Таким образом, каждый тип потока работает с определенными элементами данных, и некоторые из этих потоков могут выполняться одновременно. Элемент загрузки может быть описан так:

class File;

class Download
{
public:
enum State
{
Parsing, Downloading, Extracting, Repairing, Finished
};

Download(const std::string &filePath): filePath(filePath) { }

void save()
{
// TODO: save data consistently

StateFile f;    // state file for this download

// save general download parameters
f << filePath << state << bytesWritten << totalFiles << processedFiles;

// Now we are to save the parameters of the files which belong to this download,
// (!) but assume the downloading thread kicks in, downloads some data and
// changes the state of a file. That causes "bytesWritten", "processedFiles"// and "state" to be different from what we have just saved.

// When we finally save the state of the files their parameters don't match
// the parameters of the download (state, bytesWritten, processedFiles).
for (File *f : files)
{
// save the file...
}
}

private:
std::string filePath;
std::atomic<State> state = Parsing;
std::atomic<int> bytesWritten = 0;
int totalFiles = 0;
std::atomic<int> processedFiles = 0;
std::mutex fileMutex;
std::vector<File*> files;
};

Интересно, как сохранить эти данные последовательно. Например, состояние и количество обработанных файлов, возможно, уже были сохранены, и мы собираемся сохранить список файлов. Между тем, некоторые другие потоки могут изменять состояние файла и, следовательно, количество обработанных файлов или состояние загрузки, делая сохраненные данные несогласованными.

Первая идея, которая приходит на ум, — это добавить один мьютекс для всех элементов данных и каждый раз блокировать его. любой из них доступен. Но это было бы, вероятно, неэффективно, так как большинство временных потоков обращаются к различным элементам данных, а сохранение происходит только один раз в несколько минут.

Мне кажется, что такая задача довольно распространена в многопоточном программировании, поэтому я надеюсь, что опытные люди могли бы предложить лучший способ.

0

Решение

Я бы посоветовал вам использовать шаблон производителя.

Загрузчик выдает анализатору и уведомляет его о потреблении, анализатор производит извлечению и уведомляет его о потреблении и экстрактор ремонтнику.
Затем у вас будет очередь для каждой задачи. Синхронизация может быть оптимизирована с использованием условных переменных, так что потребитель извлекает информацию только тогда, когда ему сообщают, что что-то произведено. В конечном итоге вы будете использовать гораздо меньше мьютексов и гораздо более удобочитаемый и эффективный дизайн.

Вот пример кода для очереди и как это сделать, если вам нужно одновременно загружать, анализировать, извлекать и сохранять:

#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
template<typename T>
class synchronized_queu
{
public:
T consume_one()
{
std::unique_lock<std::mutex> lock(lock_);
while (queue_.size() == 0)
condition_.wait(lock); //release and obtain again
T available_data = queue_.front();
queue_.pop();
return available_data;
}
void produce_one(const T& data)
{
std::unique_lock<std::mutex> lock(lock_);
queue_.push(data);
condition_.notify_one();//notify only one or all as per your design...
}
private:
std::queue<T> queue_;
std::mutex lock_;
std::condition_variable condition_;
};
struct data
{
//.....
};

void download(synchronized_queu<data>& q)
{
//...
data data_downloaded = ; //data downloaded;
q.produce_one(data_downloaded);
}

void parse(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
//...
data data_downloaded = q1.consume_one();
//parse
data data_parsed = ;//....
q2.produce_one(data_parsed);
}

void extract(synchronized_queu<data>& q1, synchronized_queu<data>& q2)
{
//...
data data_parsed = q1.consume_one();
//parse
data data_extracted = ;//....
q2.produce_one(data_extracted);
}
void save(synchronized_queu<data>& q)
{
data data_extracted = q.consume_one();
//save....
}

int main()
{
synchronized_queu<data> dowlowded_queue;
synchronized_queu<data> parsed_queue;
synchronized_queu<data> extracted_queue;

std::thread downloader(download, dowlowded_queue);
std::thread parser(parse, dowlowded_queue, parsed_queue);
std::thread extractor(extract, parsed_queue, extracted_queue);
std::thread saver(save, extracted_queue);
while (/*condition to stop program*/)
{

}
downloader.join();
parser.join();
extractor.join();
saver.join();
return 0;
}
0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]