Вопрос:
3 цикла while содержат код, который был закомментирован. Я ищу («TAG1», «TAG2» и «TAG3») для легкой идентификации. Я просто хочу, чтобы циклы while дожидались, пока проверенное условие станет истинным, прежде чем продолжить работу, максимально минимизируя ресурсы ЦП. Сначала я попытался использовать переменные условия Boost, но есть условие гонки. Перевести поток в спящий режим на «х» микросекунд неэффективно, потому что нет способа точно определить время пробуждения. Наконец, boost :: this_thread :: yield (), похоже, ничего не делает. Вероятно, потому что у меня только 2 активных потока в двухъядерной системе. В частности, как я могу сделать так, чтобы три помеченных области ниже работали более эффективно, вводя как можно меньше ненужных блокировок.
ФОН
Задача:
У меня есть приложение, которое регистрирует много данных. После профилирования я обнаружил, что много времени тратится на операции регистрации (запись текста или двоичного файла в файл на локальном жестком диске). Моя цель состоит в том, чтобы уменьшить задержку при вызовах logData, заменив не поточные вызовы прямой записи вызовами в потоковый буферизованный потоковый регистратор.
Изученные варианты:
Дизайн:
Результаты:
Компилируется с текущей версией Boost (1.55)
заголовок
#ifndef BufferedLogStream_h
#define BufferedLogStream_h
#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"#include "boost\thread\thread.hpp"#include "boost\thread\locks.hpp"#include "boost\thread\mutex.hpp"#include "boost\thread\condition_variable.hpp"#include <time.h>
using namespace std;
#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10
#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif
class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }
private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);
FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;
#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};
public:
void test();
private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif
#if(BENCHMARK)
public:
void runBenchmark();
private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();
char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};
#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif
#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark
#endif
Реализация
#include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}
void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}
void BufferedLogStream::flush() {
fflush(fp);
}
void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}
void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}
bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}
void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}
unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}
void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength = length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}
void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}
#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}
void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif
#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}
void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif
Обновление: 25 ноября 2013 г.
Я обновил код ниже, используя boost :: condition_variables, а именно метод wait_for (), как рекомендовал Евгений Панасюк. Это позволяет избежать ненужной проверки одного и того же состояния снова и снова. В настоящее время я вижу, что буферизованная версия запускается примерно в 1/6 раз как небуферизованная / прямая запись. Это не идеальный случай, потому что оба случая ограничены жестким диском (в моем случае SSD эпохи 2010 года). Я планирую использовать приведенный ниже код в среде, где жесткий диск не будет узким местом, и в большинстве случаев, если не всегда, в буфере должно быть достаточно свободного места для размещения запросов writeMessage. Это подводит меня к следующему вопросу. Насколько большой я должен сделать буфер? Я не против выделить 32 или 64 МБ, чтобы они никогда не заполнялись. Код будет работать в системах, которые могут сэкономить это. Интуитивно я чувствую, что статически выделять массив из 32 МБ — плохая идея. Это? В любом случае, я ожидаю, что при запуске приведенного ниже кода для моего предполагаемого приложения задержка вызовов logData () будет значительно уменьшена, что приведет к значительному сокращению общего времени обработки.
Если кто-нибудь найдет способ сделать код ниже лучше (быстрее, надежнее, экономичнее и т. Д.), Пожалуйста, дайте мне знать. Я ценю обратную связь. Лазин, как ваш подход будет быстрее или эффективнее, чем то, что я опубликовал ниже? Мне нравится идея иметь один буфер и сделать его достаточно большим, чтобы он практически никогда не заполнялся. Тогда мне не нужно беспокоиться о чтении из разных буферов. Евгений Панасюк, мне нравится подход по возможности использовать существующий код, особенно если это существующая библиотека Boost. Тем не менее, я также не вижу, насколько spcs_queue более эффективен, чем у меня ниже. Я предпочел бы иметь дело с одним большим буфером, чем со многими меньшими, и мне нужно беспокоиться о разделении, разделении моего входного потока на входе и объединении его вместе на выходе. Ваш подход позволил бы мне переложить форматирование из основного потока в рабочий поток. Это умный подход. Но я пока не уверен, сэкономит ли это много времени, и чтобы реализовать все преимущества, мне придется изменить код, которым я не владею.
// Конец обновления
Общее решение.
Я думаю, вы должны посмотреть на Алгоритм наглости. Для одного производителя и одного потребителя это будет выглядеть так:
Эта схема поможет достичь минимальных требований к задержке, одно сообщение будет записано на диск мгновенно, но большое количество событий будет записано большими пакетами для большей пропускной способности.
Если ваши сообщения журнала имеют уровни — вы можете немного улучшить эту схему. Все сообщения об ошибках имеют высокий приоритет (уровень) и должны быть немедленно сохранены на диске (потому что они редки, но очень ценны), но сообщения отладки и трассировки имеют низкий приоритет и могут быть помещены в буфер для экономии пропускной способности (потому что они очень часты, но не так ценны как сообщения об ошибках и информация). Поэтому, когда вы пишете error
сообщение, вы должны подождать, пока рабочий поток завершит запись вашего сообщения (и всех сообщений, находящихся в одном буфере), а затем продолжить, но сообщения отладки и трассировки могут быть просто записаны в буфер.
Threading.
Создание рабочего потока для каждого потока приложения является дорогостоящим. Вы должны использовать один поток записи для каждого файла журнала. Буферы записи должны быть разделены между потоками. Каждый буфер должен иметь два указателя — commit_pointer
а также prepare_pointer
, Все буферное пространство между началом буфера и commit_pointer
доступны для рабочего потока. Буферное пространство между commit_pointer
а также prepare_pointer
В настоящее время обновляются темы приложений. Инвариантная: commit_pointer
<знак равно prepare_pointer
,
Операции записи могут быть выполнены в два этапа.
prepare_pointer
;prepare_pointer
стоимость и длина сохраняются потребителем;commit_pointer
равно старому prepare_pointer
Значение, что его сохранить в локальной переменной.commit_pointer
знак равно commit_pointer
+ лен атомно.Чтобы предотвратить ложное совместное использование, len (сообщение) может быть округлено до размера строки кэша, а все дополнительное пространство может быть заполнено пробелами.
// pseudocode
void write(const char* message) {
int len = strlen(message); // TODO: round to cache line size
const char* old_prepare_ptr;
// Prepare step
while(1)
{
old_prepare_ptr = prepare_ptr;
if (
CAS(&prepare_ptr,
old_prepare_ptr,
prepare_ptr + len) == old_prepare_ptr
)
break;
// retry if another thread perform prepare op.
}
// Write message
memcpy((void*)old_prepare_ptr, (void*)message, len);
// Commit step
while(1)
{
const char* old_commit_ptr = commit_ptr;
if (
CAS(&commit_ptr,
old_commit_ptr,
old_commit_ptr + len) == old_commit_ptr
)
break;
// retry if another thread commits
}
notify_worker_thread();
}
Вопрос, который у меня возникает, состоит в том, как заставить рабочий поток работать, как только есть работа, и спать, когда нет работы.
Есть boost::lockfree::spsc_queue
— очередь без единого производителя без ожидания. Он может быть настроен на время компиляции (размер внутреннего кольцевого буфера).
Из того, что я понимаю, вы хотите что-то похожее на следующую конфигурацию:
template<typename T, size_t N>
class concurrent_queue
{
// T can be wrapped into struct with padding in order to avoid false sharing
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;
void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
// Timeout is required, because modification happens not under mutex
// and notification can be lost.
// Another option is just to use sleep/yield, without notifications.
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};
Когда есть элементы в очереди — pop
не блокирует И когда есть достаточно места во внутреннем буфере — push
не блокирует
Я хочу максимально сократить время форматирования и записи, поэтому планирую сократить и то, и другое.
Проверять, выписываться Херб Саттер говорить в C ++ и после 2012 года: C ++ параллелизм. В страница 14 он показывает пример concurrent<T>
, В основном это обертка вокруг объекта типа T
который запускает отдельный поток для выполнения всех операций с этим объектом. Использование это:
concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// It's function call operator accepts action
// which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action
Вы можете использовать подобный шаблон для разгрузки всей работы по форматированию в потребительский поток.
В противном случае новые буферы выделяются, и я хочу избежать выделения памяти после того, как поток буфера создан.
Выше concurrent_queue<T, Size>
В примере используется буфер фиксированного размера, который полностью содержится в очереди и не предполагает дополнительных выделений.
Тем не менее, Херб concurrent<T>
пример использования std::function
передать действие в рабочий поток. Это может повлечь за собой дорогостоящее распределение.
std::function
Реализации могут использовать Оптимизацию Малых Объектов (как и большинство реализаций) — небольшие функциональные объекты создаются на месте во внутреннем буфере, но это не гарантируется, а для функциональных объектов, превышающих пороговое значение, может произойти выделение кучи.
Есть несколько вариантов, чтобы избежать этого распределения:
Воплощать в жизнь std::function
аналог с внутренним буфером, достаточно большим для хранения целевых объектов функций (например, вы можете попытаться изменить boost::function
или же этот версия).
Используйте свой собственный объект функции, который будет представлять все типы сообщений журнала. В основном он будет содержать только значения, необходимые для форматирования сообщения. Поскольку потенциально существуют различные типы сообщений, рассмотрите возможность использования boost::variant
(который является литературным объединение в сочетании с тегом типа) представлять их.
Собирая все вместе, вот подтверждение концепции (используя второй вариант):
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>
using namespace std;
/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;
void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1));
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};
/*********************************************/
template<typename T, typename F>
class concurrent
{
typedef boost::optional<F> Job;
mutable concurrent_queue<Job, 16> q; // use custom size
mutable T x;
thread worker;
public:
concurrent(T x)
: x{x}, worker{[this]
{
Job j;
while(true)
{
q.pop(j);
if(!j) break;
(*j)(this->x); // you may need to handle exceptions in some way
}
}}
{}
void operator()(const F &f)
{
q.push(Job{f});
}
~concurrent()
{
q.push(Job{});
worker.join();
}
};
/*********************************************/
struct LogEntry
{
struct Formatter
{
typedef void result_type;
ostream *out;
void operator()(double x) const
{
*out << "floating point: " << x << endl;
}
void operator()(int x) const
{
*out << "integer: " << x << endl;
}
};
boost::variant<int, double> data;
void operator()(ostream *out)
{
boost::apply_visitor(Formatter{out}, data);
}
};
/*********************************************/
int main()
{
concurrent<ostream*, LogEntry> log{&cout};
for(int i=0; i!=1024; ++i)
{
log({i});
log({i/10.});
}
}