многопоточность — C ++ поток с низкой задержкой, асинхронный, буферизованный поток (предназначен для ведения журнала) — Boost

Вопрос:

3 цикла while содержат код, который был закомментирован. Я ищу («TAG1», «TAG2» и «TAG3») для легкой идентификации. Я просто хочу, чтобы циклы while дожидались, пока проверенное условие станет истинным, прежде чем продолжить работу, максимально минимизируя ресурсы ЦП. Сначала я попытался использовать переменные условия Boost, но есть условие гонки. Перевести поток в спящий режим на «х» микросекунд неэффективно, потому что нет способа точно определить время пробуждения. Наконец, boost :: this_thread :: yield (), похоже, ничего не делает. Вероятно, потому что у меня только 2 активных потока в двухъядерной системе. В частности, как я могу сделать так, чтобы три помеченных области ниже работали более эффективно, вводя как можно меньше ненужных блокировок.

ФОН

Задача:

У меня есть приложение, которое регистрирует много данных. После профилирования я обнаружил, что много времени тратится на операции регистрации (запись текста или двоичного файла в файл на локальном жестком диске). Моя цель состоит в том, чтобы уменьшить задержку при вызовах logData, заменив не поточные вызовы прямой записи вызовами в потоковый буферизованный потоковый регистратор.

Изученные варианты:

  • Обновление медленного жесткого диска 2005 года до SSD … возможно. Стоимость не является чрезмерной … но требует много работы … более 200 компьютеров должны быть обновлены …
  • Ускорьте ASIO … Мне не нужны все затраты на проактор / сетевые возможности, я ищу что-то более простое и более легкое.

Дизайн:

  • Шаблон потока производителя и потребителя, приложение записывает данные в буфер, а фоновый поток затем записывает их на диск через некоторое время. Таким образом, конечная цель состоит в том, чтобы функция writeMessage, вызываемая прикладным уровнем, возвращала как можно быстрее, пока данные правильно / полностью регистрируются в файле журнала в порядке FIFO через некоторое время.
  • Только один поток приложения, только один поток записи.
  • На основе кольцевого буфера. Причина этого решения состоит в том, чтобы использовать как можно меньше блокировок и в идеале … и, пожалуйста, исправьте меня, если я ошибаюсь … Я не думаю, что мне нужен какой-либо.
  • Буфер — это статически размещенный символьный массив, но он может перемещать его в кучу, если это необходимо / желательно по соображениям производительности.
  • Буфер имеет начальный указатель, который указывает на следующий символ, который должен быть записан в файл. Буфер имеет указатель конца, который указывает на индекс массива после последнего символа, который будет записан в файл. Указатель конца НИКОГДА не пропускает указатель начала. Если приходит сообщение, которое больше, чем буфер, то средство записи ожидает, пока буфер не будет очищен, и записывает новое сообщение в файл напрямую, не помещая сообщение слишком большого размера в буфер (как только буфер очищается, рабочий поток не буду ничего писать так что без разногласий).
  • Автор (рабочий поток) обновляет только указатель начала кольцевого буфера.
  • Основной (поток приложения) только обновляет указатель конца кольцевого буфера, и опять же, он только вставляет новые данные в буфер, когда есть доступное пространство … в противном случае он либо ждет, пока пространство в буфере станет доступным, либо записывает напрямую, как описано выше.
  • Рабочий поток постоянно проверяет, есть ли данные для записи (на что указывает случай, когда указатель начала буфера! = Указатель конца буфера). Если нет данных для записи, рабочий поток в идеале должен перейти в спящий режим и проснуться после того, как поток приложения вставил что-то в буфер (и изменил указатель конца буфера так, чтобы он больше не указывал на тот же индекс, что и начало указатель). То, что я имею ниже, включает циклы while, постоянно проверяющие это состояние. Это очень плохой / неэффективный способ ожидания в буфере.

Результаты:

  • На моем двухъядерном ноутбуке 2009 года с твердотельным накопителем я вижу, что общее время записи в тесте с поточной / буферизованной и прямой записью составляет примерно 1: 6 (0,609 с против 0,095 с), но сильно варьируется. Часто тест буферизованной записи на самом деле медленнее, чем прямая запись. Я полагаю, что изменчивость вызвана плохой реализацией ожидания освобождения места в буфере, ожидания его освобождения и наличия рабочего потока в ожидании доступности работы. Я измерил, что некоторые из циклов while потребляют более 10000 циклов, и я подозреваю, что эти циклы фактически конкурируют за аппаратные ресурсы, которые требуются другому потоку (рабочему или приложению) для завершения ожидаемого вычисления.
  • Выход, кажется, проверить. С включенным режимом TEST и небольшим размером буфера, равным 10, в качестве стресс-теста я вывел сотни мегабайт на выходе и обнаружил, что он равен входу.

Компилируется с текущей версией Boost (1.55)

заголовок

    #ifndef BufferedLogStream_h
#define BufferedLogStream_h

#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"#include "boost\thread\thread.hpp"#include "boost\thread\locks.hpp"#include "boost\thread\mutex.hpp"#include "boost\thread\condition_variable.hpp"#include <time.h>

using namespace std;

#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10

#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif

class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }

private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);

FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;

#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};

public:
void test();

private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif

#if(BENCHMARK)
public:
void runBenchmark();

private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();

char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};

#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif

#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark

#endif

Реализация

    #include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}

void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}

void BufferedLogStream::flush() {
fflush(fp);
}

void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}

void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}

bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}

void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}

unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}

void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength =  length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}

void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}

#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}

void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif

#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}

void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}

void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}

void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif

Обновление: 25 ноября 2013 г.

Я обновил код ниже, используя boost :: condition_variables, а именно метод wait_for (), как рекомендовал Евгений Панасюк. Это позволяет избежать ненужной проверки одного и того же состояния снова и снова. В настоящее время я вижу, что буферизованная версия запускается примерно в 1/6 раз как небуферизованная / прямая запись. Это не идеальный случай, потому что оба случая ограничены жестким диском (в моем случае SSD эпохи 2010 года). Я планирую использовать приведенный ниже код в среде, где жесткий диск не будет узким местом, и в большинстве случаев, если не всегда, в буфере должно быть достаточно свободного места для размещения запросов writeMessage. Это подводит меня к следующему вопросу. Насколько большой я должен сделать буфер? Я не против выделить 32 или 64 МБ, чтобы они никогда не заполнялись. Код будет работать в системах, которые могут сэкономить это. Интуитивно я чувствую, что статически выделять массив из 32 МБ — плохая идея. Это? В любом случае, я ожидаю, что при запуске приведенного ниже кода для моего предполагаемого приложения задержка вызовов logData () будет значительно уменьшена, что приведет к значительному сокращению общего времени обработки.

Если кто-нибудь найдет способ сделать код ниже лучше (быстрее, надежнее, экономичнее и т. Д.), Пожалуйста, дайте мне знать. Я ценю обратную связь. Лазин, как ваш подход будет быстрее или эффективнее, чем то, что я опубликовал ниже? Мне нравится идея иметь один буфер и сделать его достаточно большим, чтобы он практически никогда не заполнялся. Тогда мне не нужно беспокоиться о чтении из разных буферов. Евгений Панасюк, мне нравится подход по возможности использовать существующий код, особенно если это существующая библиотека Boost. Тем не менее, я также не вижу, насколько spcs_queue более эффективен, чем у меня ниже. Я предпочел бы иметь дело с одним большим буфером, чем со многими меньшими, и мне нужно беспокоиться о разделении, разделении моего входного потока на входе и объединении его вместе на выходе. Ваш подход позволил бы мне переложить форматирование из основного потока в рабочий поток. Это умный подход. Но я пока не уверен, сэкономит ли это много времени, и чтобы реализовать все преимущества, мне придется изменить код, которым я не владею.

// Конец обновления

5

Решение

Общее решение.

Я думаю, вы должны посмотреть на Алгоритм наглости. Для одного производителя и одного потребителя это будет выглядеть так:

  • В начале буфер пуст, рабочий поток простаивает и ждет событий.
  • Производитель записывает данные в буфер и уведомляет рабочий поток.
  • Рабочий поток проснулся и запустил операцию записи.
  • Producer пытается написать другое сообщение, но работник использует буфер, поэтому производитель выделяет другой буфер и записывает в него сообщение.
  • Производитель пытается записать другое сообщение, ввод-вывод все еще выполняется, поэтому производитель записывает сообщение в ранее выделенный буфер.
  • Рабочий поток завершил запись буфера в файл и видит, что есть еще один буфер с данными, поэтому он захватывает его и начинает запись.
  • Самый первый буфер используется производителем для записи всех последовательных сообщений до выполнения второй операции записи.

Эта схема поможет достичь минимальных требований к задержке, одно сообщение будет записано на диск мгновенно, но большое количество событий будет записано большими пакетами для большей пропускной способности.

Если ваши сообщения журнала имеют уровни — вы можете немного улучшить эту схему. Все сообщения об ошибках имеют высокий приоритет (уровень) и должны быть немедленно сохранены на диске (потому что они редки, но очень ценны), но сообщения отладки и трассировки имеют низкий приоритет и могут быть помещены в буфер для экономии пропускной способности (потому что они очень часты, но не так ценны как сообщения об ошибках и информация). Поэтому, когда вы пишете error сообщение, вы должны подождать, пока рабочий поток завершит запись вашего сообщения (и всех сообщений, находящихся в одном буфере), а затем продолжить, но сообщения отладки и трассировки могут быть просто записаны в буфер.

Threading.

Создание рабочего потока для каждого потока приложения является дорогостоящим. Вы должны использовать один поток записи для каждого файла журнала. Буферы записи должны быть разделены между потоками. Каждый буфер должен иметь два указателя — commit_pointer а также prepare_pointer, Все буферное пространство между началом буфера и commit_pointer доступны для рабочего потока. Буферное пространство между commit_pointer а также prepare_pointer В настоящее время обновляются темы приложений. Инвариантная: commit_pointer <знак равно prepare_pointer,

Операции записи могут быть выполнены в два этапа.

  1. Готовься писать. Эта операция резервирует место в буфере.
    • Производитель рассчитывает len (сообщение) и атомарно обновляет prepare_pointer;
    • старый prepare_pointer стоимость и длина сохраняются потребителем;
  2. Коммит пишите.
    • Производитель пишет сообщение в начале зарезервированного буферного пространства (старое значение prepare_pointer).
    • Продюсер занят-ждет пока commit_pointer равно старому prepare_pointer Значение, что его сохранить в локальной переменной.
    • Производитель фиксирует операцию записи, выполняя commit_pointer знак равно commit_pointer + лен атомно.

Чтобы предотвратить ложное совместное использование, len (сообщение) может быть округлено до размера строки кэша, а все дополнительное пространство может быть заполнено пробелами.

// pseudocode
void write(const char* message) {
int len = strlen(message);  // TODO: round to cache line size
const char* old_prepare_ptr;
// Prepare step
while(1)
{
old_prepare_ptr = prepare_ptr;
if (
CAS(&prepare_ptr,
old_prepare_ptr,
prepare_ptr + len) == old_prepare_ptr
)
break;
// retry if another thread perform prepare op.
}
// Write message
memcpy((void*)old_prepare_ptr, (void*)message, len);
// Commit step
while(1)
{
const char* old_commit_ptr = commit_ptr;
if (
CAS(&commit_ptr,
old_commit_ptr,
old_commit_ptr + len) == old_commit_ptr
)
break;
// retry if another thread commits
}
notify_worker_thread();
}
6

Другие решения

Вопрос, который у меня возникает, состоит в том, как заставить рабочий поток работать, как только есть работа, и спать, когда нет работы.

Есть boost::lockfree::spsc_queue — очередь без единого производителя без ожидания. Он может быть настроен на время компиляции (размер внутреннего кольцевого буфера).

Из того, что я понимаю, вы хотите что-то похожее на следующую конфигурацию:

template<typename T, size_t N>
class concurrent_queue
{
// T can be wrapped into struct with padding in order to avoid false sharing
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;

void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
// Timeout is required, because modification happens not under mutex
//     and notification can be lost.
// Another option is just to use sleep/yield, without notifications.
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};

Когда есть элементы в очереди — pop не блокирует И когда есть достаточно места во внутреннем буфере — push не блокирует


Я хочу максимально сократить время форматирования и записи, поэтому планирую сократить и то, и другое.

Проверять, выписываться Херб Саттер говорить в C ++ и после 2012 года: C ++ параллелизм. В страница 14 он показывает пример concurrent<T>, В основном это обертка вокруг объекта типа T который запускает отдельный поток для выполнения всех операций с этим объектом. Использование это:

concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// It's function call operator accepts action
//   which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action

Вы можете использовать подобный шаблон для разгрузки всей работы по форматированию в потребительский поток.


В противном случае новые буферы выделяются, и я хочу избежать выделения памяти после того, как поток буфера создан.

Выше concurrent_queue<T, Size> В примере используется буфер фиксированного размера, который полностью содержится в очереди и не предполагает дополнительных выделений.

Тем не менее, Херб concurrent<T> пример использования std::function передать действие в рабочий поток. Это может повлечь за собой дорогостоящее распределение.

std::function Реализации могут использовать Оптимизацию Малых Объектов (как и большинство реализаций) — небольшие функциональные объекты создаются на месте во внутреннем буфере, но это не гарантируется, а для функциональных объектов, превышающих пороговое значение, может произойти выделение кучи.

Есть несколько вариантов, чтобы избежать этого распределения:

  1. Воплощать в жизнь std::function аналог с внутренним буфером, достаточно большим для хранения целевых объектов функций (например, вы можете попытаться изменить boost::function или же этот версия).

  2. Используйте свой собственный объект функции, который будет представлять все типы сообщений журнала. В основном он будет содержать только значения, необходимые для форматирования сообщения. Поскольку потенциально существуют различные типы сообщений, рассмотрите возможность использования boost::variant (который является литературным объединение в сочетании с тегом типа) представлять их.

Собирая все вместе, вот подтверждение концепции (используя второй вариант):

LIVE DEMO

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>

using namespace std;

/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;

void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1));
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};

/*********************************************/
template<typename T, typename F>
class concurrent
{
typedef boost::optional<F> Job;

mutable concurrent_queue<Job, 16> q; // use custom size
mutable T x;
thread worker;

public:
concurrent(T x)
: x{x}, worker{[this]
{
Job j;
while(true)
{
q.pop(j);
if(!j) break;
(*j)(this->x); // you may need to handle exceptions in some way
}
}}
{}
void operator()(const F &f)
{
q.push(Job{f});
}
~concurrent()
{
q.push(Job{});
worker.join();
}
};

/*********************************************/
struct LogEntry
{
struct Formatter
{
typedef void result_type;
ostream *out;

void operator()(double x) const
{
*out << "floating point: " << x << endl;
}
void operator()(int x) const
{
*out << "integer: " << x << endl;
}
};
boost::variant<int, double> data;

void operator()(ostream *out)
{
boost::apply_visitor(Formatter{out}, data);
}
};

/*********************************************/
int main()
{
concurrent<ostream*, LogEntry> log{&cout};

for(int i=0; i!=1024; ++i)
{
log({i});
log({i/10.});
}
}
3

По вопросам рекламы [email protected]