Я должен прочитать некоторые данные (которые поступают с невероятной скоростью — до 5000 сообщений в секунду) из многоадресного (UDP) потока. Поскольку поток является многоадресным (и данные весьма критичны), поставщик данных предоставил два потока, которые отправляют идентичные данные (их логика заключается в том, что вероятность отбрасывания одного и того же пакета в обоих потоках очень близка к нулю). Все пакеты данных помечены порядковым номером для отслеживания.
Кроме того, приложение настолько критично ко времени, что я вынужден слушать оба потока параллельно и выбирать следующий порядковый номер из того потока многоадресной рассылки, который был получен первым — когда тот же пакет поступает в зеркальный поток, я просто отбрасываю его ,
Я планирую реализовать эту функцию удаления, используя общую переменную sequence_number между двумя функциями, которые, кстати, работают в разных потоках. Порядковый номер atomic
как это будет читаться и обновляться из двух разных потоков.
Очевидный алгоритм, который приходит на ум,
if (sequence number received from the stream > sequence_number)
{
process packet;
sequence_number = sequence number received from the stream;
}
(Приведенный выше алгоритм необходимо модифицировать для случаев, когда порядковые номера выходят из строя — и они могут, поскольку это поток UDP, — но давайте пока забудем об этом)
У меня вопрос такой:
С того времени я std::load
мой sequence_number
проверьте, меньше ли он порядкового номера, который я получил из потока, примите пакет и в конце концов std::store
новый порядковый номер sequence_number
; если другой поток принимает тот же пакет (с тем же порядковым номером) и выполняет те же операции (до завершения первого потока) std::store
по этому порядковому номеру), я по сути получу один и тот же пакет дважды в моей системе. Как можно преодолеть эту ситуацию?
Не откладывайте на потом беспокойство по поводу обработки неупорядоченных пакетов, потому что решение, которое также обеспечивает наиболее элегантное решение для синхронизации потоков.
Элементы массива являются уникальными ячейками памяти для гонок данных. Если вы поместите каждый пакет (атомарно с помощью записи указателя) в другой элемент массива в соответствии с его порядковым номером, вы избавитесь от большей части разногласий. Также используйте сравнительный обмен, чтобы определить, видел ли другой поток (другой поток) этот пакет.
Обратите внимание, что у вас не будет повторного цикла, обычно связанного с Compare-Exchange, либо у вас есть первая копия пакета, и сравнительный обмен успешно выполнен, либо пакет уже существует, и ваша копия может быть отброшена. Так что этот подход не только без блокировки, но и без ожидания 🙂
Вот один из вариантов, если вы используете std::atomic
значения, используя compare_exchange
.
Не показано, как инициализировать last_processed_seqnum
, так как вам нужно будет установить его на допустимое значение, а именно на единицу меньше, чем последовательность следующего пакета, чтобы прибыть.
Он должен быть адаптирован для случая, в котором есть разрывы порядкового номера. Вы упоминаете как часть своего предположения, что не будет опущенных последовательностей; но приведенный ниже пример прекратит обработку пакетов (то есть катастрофически завершится) при любых пробелах в последовательности.
std::atomic<int> last_processed_seqnum;
// sync last_processed_seqnum to first message(s).
int seqnum_from_stream = ...;
int putative_last_processed_seqnum = seqnum_from_stream - 1;if (last_processed_seqnum.compare_exchange_strong(putative_last_processed_seqnum,
seqnum_from_stream))
{
// sequence number has been updated in compare_exchange_strong
// process packet;
}
В идеале, то, что мы хотим, это compare_exchange
функция, которая использует больше чем не равно. Я не знаю ни одного способа добиться такого поведения за одну операцию. ТАК вопрос, на который я ссылался на ссылки ответ об итерации по всем значениям, меньшим, чем цель, для обновления.
Вы, вероятно, реализуете обработчик ценового потока, какой это обмен и какой протокол? Это ITCH или FIX Fast? Я бы не рекомендовал две темы для одного и того же канала, поскольку вам, вероятно, придется объединить несколько групп многоадресной рассылки для разных сегментов рынка / плат.