Я реализую шаблон разрушителя для межпотокового взаимодействия в C ++ для нескольких производителей. В реализации от LMAX next(n)
метод в MultiProducerSequencer.java
использует целые числа со знаком (хорошо, это Java), но порт C ++ (disruptor—) использует целые числа со знаком. После очень (очень) длительного времени переполнение приведет к неопределенному поведению.
Целые числа без знака имеют несколько преимуществ:
Вот мой подход к требуя n слотов (источник прилагается в конце): next_
индекс следующего свободного слота, который может быть востребован, tail_
последний свободный слот, который может быть востребован (будет обновлен где-то еще). n
меньше размера буфера. Мой подход заключается в нормализации следующего и хвостового положения для промежуточных вычислений путем вычитания хвоста из следующего. Добавление n
нормализовать следующий norm
должен быть меньше размера буфера, чтобы успешно запросить интервалы между next_
а также next_+n
, Предполагается, что norm + n
не переполнится.
1) это правильно или делает next_
пройти tail_
в некоторых случаях? Это работает с меньшими целочисленными типами как uint32_t
или же uint16_t
если размер буфера и n
ограничены, например, до 1/10 * максимальное целое число этих типов.
2) Если это не правильно, то я хотел бы знать конкретный случай.
3) Что-то еще не так или что можно улучшить? (Я опустил кешлайн)
class msg_ctrl
{
public:
inline msg_ctrl();
inline int claim(size_t n, uint64_t& seq);
inline int publish(size_t n, uint64_t seq);
inline int tail(uint64_t t);
public:
std::atomic<uint64_t> next_;
std::atomic<uint64_t> head_;
std::atomic<uint64_t> tail_;
};
// Implementation -----------------------------------------
msg_ctrl::msg_ctrl() : next_(2), head_(1), tail_(0)
{}
int msg_ctrl::claim(size_t n, uint64_t& seq)
{
uint64_t const size = msg_buffer::size();
if (n > 1024) // please do not try to reserve too much slots
return -1;
uint64_t curr = 0;
do
{
curr = next_.load();
uint64_t tail = tail_.load();
uint64_t norm = curr - tail;
uint64_t next = norm + n;
if (next > size)
std::this_thread::yield(); // todo: some wait strategy
else if (next_.compare_exchange_weak(curr, curr + n))
break;
} while (true);
seq = curr;
return 0;
}
int msg_ctrl::publish(size_t n, uint64_t seq)
{
uint64_t tmp = seq-1;
uint64_t val = seq+n-1;
while (!head_.compare_exchange_weak(tmp, val))
{
tmp = seq-1;
std::this_thread::yield();
}
return 0;
}
int msg_ctrl::tail(uint64_t t)
{
tail_.store(t);
return 0;
}
Публикация в кольцевом буфере будет выглядеть так:
size_t n = 15;
uint64_t seq = 0;
msg_ctrl->claim(n, seq);
//fill items in buffer
buffer[seq + 0] = an item
buffer[seq + 1] = an item
...
buffer[seq + n-1] = an item
msg_ctrl->publish(n, seq);
Задача ещё не решена.
Других решений пока нет …