предотвращение столкновений при сжатии бесконечного бесконтактного буфера в циклический буфер

Я решаю два канала арбитражной проблемы БЫСТРО протокол.
Пожалуйста, не волнуйтесь, если вы не знакомы с этим, мой вопрос довольно общий. Но я добавляю описание проблемы для тех, кто заинтересован (вы можете пропустить это).


Данные во всех каналах UDP распространяются в двух идентичных каналах (A и B) на двух разных IP-адресах многоадресной рассылки. Настоятельно рекомендуется, чтобы клиент получал и обрабатывал оба канала из-за возможной потери пакетов UDP. Обработка двух одинаковых каналов позволяет статистически снизить вероятность потери пакетов.
Не указано, в каком конкретном фиде (A или B) сообщение появляется впервые. Для арбитража этих каналов следует использовать порядковый номер сообщения, найденный в преамбуле или в теге 34-MsgSeqNum. Использование Преамбулы позволяет определить порядковый номер сообщения без декодирования сообщения FAST.
Обработка сообщений из каналов A и B должна выполняться по следующему алгоритму:

  1. Слушайте каналы A и B
  2. Обрабатывать сообщения в соответствии с их порядковыми номерами.
  3. Игнорировать сообщение, если оно уже было обработано с таким же порядковым номером.
  4. Если появляется разрыв в порядковом номере, это указывает на потерю пакетов в обоих каналах (A и B). Клиент должен инициировать один из процессов восстановления. Но прежде всего клиент должен подождать разумное время, возможно, потерянный пакет придет чуть позже из-за переупорядочения пакета. Протокол UDP не может гарантировать доставку пакетов в последовательности.

    // алгоритм восстановления TCP


Я написал такой очень простой класс. Он предварительно выделяет все необходимые классы, а затем первый поток, который получает определенные seqNum может обработать это. Другая нить добавит это позже:

class MsgQueue
{
public:
MsgQueue();
~MsgQueue(void);
bool Lock(uint32_t msgSeqNum);
Msg& Get(uint32_t msgSeqNum);
void Commit(uint32_t msgSeqNum);
private:
void Process();
static const int QUEUE_LENGTH = 1000000;

// 0 - available for use; 1 - processing; 2 - ready
std::atomic<uint16_t> status[QUEUE_LENGTH];
Msg updates[QUEUE_LENGTH];
};

Реализация:

MsgQueue::MsgQueue()
{
memset(status, 0, sizeof(status));
}

MsgQueue::~MsgQueue(void)
{
}

// For the same msgSeqNum should return true to only one thread
bool MsgQueue::Lock(uint32_t msgSeqNum)
{
uint16_t expected = 0;
return status[msgSeqNum].compare_exchange_strong(expected, 1);
}

void MsgQueue::Commit(uint32_t msgSeqNum)
{
status[msgSeqNum] = 2;
Process();
}

// this method probably should be combined with "Lock" but please ignore! :)
Msg& MsgQueue::Get(uint32_t msgSeqNum)
{
return updates[msgSeqNum];
}

void MsgQueue::Process()
{
// ready packets must be processed,
}

Использование:

if (!msgQueue.Lock(seq)) {
return;
}
Msg msg = msgQueue.Get(seq);
msg.Ticker = "HP"msg.Bid = 100;
msg.Offer = 101;
msgQueue.Commit(seq);

Это прекрасно работает, если мы предположим, что QUEUE_LENGTH бесконечность. Потому что в этом случае один msgSeqNum = один updates элемент массива.

Но я должен сделать буфер круглым, потому что невозможно хранить всю историю (много миллионов пакетов), и нет никаких причин для этого. На самом деле мне нужно буферизовать достаточно пакетов, чтобы восстановить сеанс, и как только сеанс восстановлен, я могу их отбросить.

Но наличие кругового буфера существенно усложняет алгоритм. Например, предположим, что у нас есть кольцевой буфер длиной 1000. И в то же время мы пытаемся обработать seqNum = 10 000 и seqNum = 11 000 (это ОЧЕНЬ маловероятно, но все же возможно). Оба эти пакета будут сопоставлены с массивом updates по указателю 0 и так происходит столкновение. В этом случае буфер должен «отбрасывать» старые пакеты и обрабатывать новые пакеты.

Тривиально реализовать то, что я хочу, используя locks но писать lock-free код кругового буфера, используемый из разных потоков, действительно сложен. Поэтому я приветствую любые предложения и советы, как это сделать. Спасибо!

1

Решение

Я не верю, что вы можете использовать кольцевой буфер. хешированный индекс может быть использован в status[] массив. То есть, hash = seq % 1000, Проблема в том, что порядковый номер диктуется сетью, и вы не можете контролировать ее порядок. Вы хотите замок на основе этого порядкового номера. Ваш массив не должен быть бесконечный, только диапазон порядковый номер; но это, вероятно, больше, чем практично.

Я не уверен, что происходит, когда порядковый номер заблокирован Означает ли это, что другой поток обрабатывает его? Если это так, вы должны поддерживать подсписок для коллизий хешей, чтобы решить конкретный порядковый номер.

Вы также можете рассматривать размер массива как степень 2. Например, 1024 позволит hash = seq & 1023; который должен быть довольно эффективным.

0

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]