disruptor: получение слотов перед лицом целочисленного переполнения

Я реализую шаблон разрушителя для межпотокового взаимодействия в C ++ для нескольких производителей. В реализации от LMAX next(n) метод в MultiProducerSequencer.java использует целые числа со знаком (хорошо, это Java), но порт C ++ (disruptor—) использует целые числа со знаком. После очень (очень) длительного времени переполнение приведет к неопределенному поведению.

Целые числа без знака имеют несколько преимуществ:

  • правильное поведение при переполнении
  • нет необходимости в 64-битных целых

Вот мой подход к требуя n слотов (источник прилагается в конце): next_ индекс следующего свободного слота, который может быть востребован, tail_ последний свободный слот, который может быть востребован (будет обновлен где-то еще). n меньше размера буфера. Мой подход заключается в нормализации следующего и хвостового положения для промежуточных вычислений путем вычитания хвоста из следующего. Добавление n нормализовать следующий norm должен быть меньше размера буфера, чтобы успешно запросить интервалы между next_ а также next_+n, Предполагается, что norm + n не переполнится.

1) это правильно или делает next_ пройти tail_ в некоторых случаях? Это работает с меньшими целочисленными типами как uint32_t или же uint16_t если размер буфера и n ограничены, например, до 1/10 * максимальное целое число этих типов.
2) Если это не правильно, то я хотел бы знать конкретный случай.
3) Что-то еще не так или что можно улучшить? (Я опустил кешлайн)

class msg_ctrl
{
public:
inline msg_ctrl();

inline int claim(size_t n, uint64_t& seq);
inline int publish(size_t n, uint64_t seq);
inline int tail(uint64_t t);

public:
std::atomic<uint64_t> next_;
std::atomic<uint64_t> head_;
std::atomic<uint64_t> tail_;
};

// Implementation -----------------------------------------

msg_ctrl::msg_ctrl() : next_(2), head_(1), tail_(0)
{}

int msg_ctrl::claim(size_t n, uint64_t& seq)
{
uint64_t const size = msg_buffer::size();
if (n > 1024) // please do not try to reserve too much slots
return -1;

uint64_t curr = 0;
do
{
curr = next_.load();

uint64_t tail = tail_.load();
uint64_t norm = curr - tail;
uint64_t next = norm + n;

if (next > size)
std::this_thread::yield(); // todo: some wait strategy
else if (next_.compare_exchange_weak(curr, curr + n))
break;

} while (true);

seq = curr;
return 0;
}

int msg_ctrl::publish(size_t n, uint64_t seq)
{
uint64_t tmp = seq-1;
uint64_t val = seq+n-1;
while (!head_.compare_exchange_weak(tmp, val))
{
tmp = seq-1;
std::this_thread::yield();
}
return 0;
}

int msg_ctrl::tail(uint64_t t)
{
tail_.store(t);
return 0;
}

Публикация в кольцевом буфере будет выглядеть так:

size_t n = 15;
uint64_t seq = 0;
msg_ctrl->claim(n, seq);

//fill items in buffer
buffer[seq + 0] = an item
buffer[seq + 1] = an item
...
buffer[seq + n-1] = an item

msg_ctrl->publish(n, seq);

1

Решение

Задача ещё не решена.

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]