Я создавал высокопроизводительное серверное приложение для обмена мультимедийными сообщениями, язык реализации которого C ++. Каждый сервер может использоваться в автономном режиме или многие серверы могут быть объединены для создания оверлейной сети на основе DHT; серверы действуют как супер-пэры, как в случае Skype.
Работа в процессе. В настоящее время сервер может обрабатывать около 200 000 сообщений в секунду (256-байтовые сообщения) и имеет максимальную пропускную способность около 256 МБ / с на моей машине (Intel i3 Mobile 2 ГГц, Fedora Core 18 (64-разрядная), 4 ГБ ОЗУ) для сообщения длиной 4096 байт. На сервере есть два потока: один поток для обработки всех операций ввода-вывода (на основе epoll, инициируемый ребром), другой — для обработки входящих сообщений. Существует еще один поток для управления оверлеями, но он не имеет значения в текущем обсуждении.
Обсуждаемые два потока совместно используют данные, используя два кольцевых буфера. Поток № 1 ставит в очередь свежие сообщения для потока № 2, используя один циклический буфер, а поток № 2 возвращает обратно обработанные сообщения через другой циклический буфер. Сервер полностью заблокирован. Я не использую какие-либо примитивы синхронизации, даже атомарные операции.
Циклические буферы никогда не переполняются, потому что сообщения объединяются (предварительно распределяются при запуске). Фактически все жизненно важные / часто используемые структуры данных объединяются для уменьшения фрагментации памяти и повышения эффективности кэширования, поэтому мы знаем максимальное количество сообщений, которое мы когда-либо собираемся создать при запуске сервера, поэтому мы можем предварительно рассчитать максимальное размер буферов, а затем инициализировать циклические буферы соответственно.
Теперь мой вопрос: Поток № 1 ставит в очередь сериализованные сообщения по одному сообщению за раз (фактически это указатели на объекты сообщений), в то время как поток № 2 извлекает сообщения из очереди в виде фрагментов (фрагменты 32/64/128) и возвращает обработанные сообщения в куски через второй круговой буфер. Если новых сообщений нет, поток №2 продолжает ждать, следовательно, постоянно удерживая одно из ядер ЦП. Как я могу улучшить дизайн дальше? Каковы альтернативы стратегии ожидания ожидания? Я хочу сделать это элегантно и качественно. Я рассмотрел использование семафоры, но я боюсь, что это не лучшее решение по той простой причине, что мне приходится вызывать «sem_post» каждый раз, когда я помещаю сообщение в очередь в потоке # 1, что может значительно снизить пропускную способность, второй поток должен вызвать равное число «sem_post» раз, чтобы предотвратить переполнение семафора. Также я боюсь, что реализация семафора может использовать мьютекс внутри.
Вторым хорошим вариантом может быть использование сигнал если я смогу обнаружить алгоритм для поднятия сигнала только в том случае, если второй поток «опустошил очередь и находится в процессе вызова sigwait» или «уже ожидает sigwait», короче говоря, сигнал должен быть повышен минимальное число раз, хотя не повредит, если сигналы будут подняты в несколько раз больше, чем необходимо. Да, я использовал Поиск Google, но ни одно из найденных в Интернете решений не было удовлетворительным. Вот несколько соображений:
О. При выполнении системных вызовов сервер должен тратить минимум циклов ЦП, а системные вызовы должны использоваться минимальное количество раз.
B. Должны быть очень низкие накладные расходы, и алгоритм должен быть эффективным.
C. Никаких блокировок.
Я хочу, чтобы все варианты были представлены на столе.
Вот ссылка на сайт, где я поделился информацией о моем сервере, чтобы лучше понять функциональность и цель:
www.wanhive.com
Ожидание при занятости хорошо, если вам нужно как можно быстрее разбудить тему №2. Фактически это самый быстрый способ уведомить один процессор об изменениях, внесенных другим процессором. Вам нужно сгенерировать ограждения памяти на обоих концах (напишите забор с одной стороны и прочитайте забор с другой). Но это утверждение верно только в том случае, если оба ваших потока выполняются на выделенных процессорах. В этом случае переключение контекста не требуется, только кеширование трафика.
Есть некоторые улучшения, которые могут быть сделаны.
Распараллеливание шага обработки.
Есть два варианта.
Вам понадобится N циклических буферов и N потоков обработки и N выходных буферов и один потребительский поток в первом случае. Поток # 1 помещает в очередь сообщения в циклическом порядке в буферах этого цикла.
// Thread #1 pseudocode
auto message = recv()
auto buffer_index = atomic_increment(&message_counter);
buffer_index = buffer_index % N; // N is the number of threads
// buffers is an array of cyclic buffers - Buffer* buffers[N];
Buffer* current_buffer = buffers[buffer_index];
current_buffer->euqueue(message);
Каждый поток потребляет сообщения из одного из буферов и помещает результат в свой выделенный буфер вывода.
// Thread #i pseudocode
auto message = my_buffer->dequeue();
auto result = process(message);
my_output_buffer->enqueue(result);
Теперь вам нужно обработать все эти сообщения в порядке прибытия. Вы можете сделать это с выделенным потоком потребителя, удалив обработанные сообщения из выходных циклических буферов циклическим образом.
// Consumer thread pseudocode
// out_message_counter is equal to message_counter at start
auto out_buffer_index = atomic_increment(&out_message_counter);
out_buffer_index = out_buffer_index % N;
// out_buffers is array of output buffers that is used by processing
// threads
auto out_buffer = out_buffers[out_buffer_index];
auto result = out_buffer->dequeue();
send(result); // or whatever you need to do with result
Во втором случае, когда вам не нужно сохранять порядок сообщений — вам не нужны потребительский поток и выходные циклические буферы. Вы просто делаете все, что вам нужно сделать с результатом обработки потока.
N должно быть равно num CPU's
— 3 в первом случае («- 3» — один поток ввода-вывода + один потребительский поток + один поток DHT) и num CPU's
— 2 во втором случае («- 2» — один поток ввода-вывода + один поток DHT). Это связано с тем, что ожидание занятости не может быть эффективным, если у вас переподписка процессоров.
Похоже, вы хотите скоординировать продюсера и потребителя, связанных каким-то общим состоянием. По крайней мере, в Java для таких шаблонов одним из способов избежать ожидания ожидания является использование wait и notify. При таком подходе поток № 2 может перейти в заблокированное состояние, если обнаружит, что очередь пуста, вызвав функцию wait и избегая вращения ЦП. Как только поток # 1 помещает некоторые вещи в очередь, он может сделать уведомление. Быстрый поиск таких механизмов в C ++ дает следующее:
Вы можете заставить поток № 2 перейти в спящий режим на X миллисекунд, когда очередь пуста.
X можно определить по длине очередей, которые вы хотите + какой-нибудь защитный диапазон.
Кстати, в пользовательском режиме (ring3) вы не можете использовать инструкции MONITOR / MWAIT, которые идеально подойдут для вашего вопроса.
редактировать
Вы должны обязательно дать TBB RWlock попытка (есть бесплатная версия). Похоже, что вы ищете.
Edit2
Другой вариант — использовать условные переменные. Они включают мьютекс и состояние. В основном вы ждете при условии, чтобы стать «правдой». Можно найти материал низкого уровня Вот.