C ++ без блокировки стек поврежден

я пытаюсь реализовать стек без блокировки, чтобы его можно было использовать с внешней управляемой памятью из ограниченного простого массива c. Я знаю справочные реализации (например, от Энтони Уильямса: параллелизм в действии) и другие книги и блоги / статьи в Интернете.

Реализация следует этим ссылкам и позволяет избежать проблемы ABA, поскольку адреса внешней памяти адресуются с использованием уникальных индексов, а не переработанных указателей. Поэтому он вообще не должен иметь дело с управлением памятью и прост.

Я написал несколько тестов, которые выполняют операции pop и push для этого стека при высокой нагрузке и конкуренции (стресс-тесты) и однопоточные. Первые терпят неудачу со странными проблемами, которые я не понимаю и для меня выглядит неясным.

Может быть, у кого-то есть идея?

  1. Проблема: не удается отправить уже извлеченный узел обратно в стек, поскольку нарушается предварительное условие, что у узла нет преемника (следующий).

    BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next);
    

    Настройка воспроизведения: минимум 3 потока и емкость ~ 16. Около 500 проходов. Тогда нажать оп не получается.

  2. Проблема: количество элементов, вытолкнутых всеми потоками, и количество элементов, оставшихся в стеке после объединения, не соответствуют емкости (узлы теряются при переходе).

    BOOST_ASSERT(aNodes.size()+nPopped == nCapacity);
    

    Настройка воспроизведения: 2 потока и емкость 2. Требуется много проходов, для меня не менее 700. После этого заголовок стека равен 0, но в вытесненном контейнере присутствует только один узел. Узел {2,0} болтается.

Я скомпилирован с vs2005, vs2013 и vs2015. У всех одна и та же проблема (vs2005 также является причиной того, что код выглядит как C ++ 03).

Вот основной код для узла + стека

template <typename sizeT> struct node
{
sizeT         cur;  //!< construction invariant
atomic<sizeT> next;
atomic<sizeT> data;

explicit node() // invalid node
: cur(0), next(0), data(0)
{}

explicit node(sizeT const& nCur, sizeT const& nNext, sizeT const& nData)
: cur(nCur), next(nNext), data(nData)
{}

node& operator=(node const& rhs)
{
cur  = rhs.cur;
next.store(rhs.next.load(memory_order_relaxed));
data.store(rhs.data.load(memory_order_relaxed));
return *this;
}
};

template <typename sizeT> struct stack
{
private:
static memory_order const relaxed = memory_order_relaxed;
atomic<sizeT> m_aHead;

public:
explicit stack(sizeT const& nHead) : m_aHead(nHead) {}

template <typename tagT, typename T, std::size_t N>
typename enable_if<is_same<tagT,Synchronized>,sizeT>::type
pop(T (&aNodes)[N])
{
sizeT nOldHead = m_aHead.load();

for(;;)
{
if(!nOldHead) return 0;

BOOST_ASSERT(nOldHead <= N);
T& aOldHead = aNodes[nOldHead-1];
sizeT const nNewHead = aOldHead.next.load(/*relaxed*/);
BOOST_ASSERT(nNewHead <= N);
sizeT const nExpected = nOldHead;

if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
/*,std::memory_order_acquire,std::memory_order_relaxed*/))
{
BOOST_ASSERT(nExpected == nOldHead);

// <--- from here on aOldHead is thread local ---> //
aOldHead.next.store(0 /*,relaxed*/);

return nOldHead;
}

// TODO: add back-off strategy under contention (use loop var)
}
}

template <typename tagT, typename T, std::size_t N>
typename enable_if<is_same<tagT,Synchronized>,void>::type
push(T (&aNodes)[N], sizeT const& nNewHead)
{
#ifndef NDEBUG
{
BOOST_ASSERT(0 < nNewHead && nNewHead <= N);
sizeT const nNext = aNodes[nNewHead-1].next;
BOOST_ASSERT(!nNext);
}
#endif

sizeT nOldHead = m_aHead.load(/*relaxed*/);

for(;;)
{
aNodes[nNewHead-1].next.store(nOldHead /*,relaxed*/);
sizeT const nExpected = nOldHead;
BOOST_ASSERT(nOldHead <= N);

if(m_aHead.compare_exchange_weak(nOldHead,nNewHead
/*,std::memory_order_release,std::memory_order_relaxed*/))
{
BOOST_ASSERT(nExpected == nOldHead);
return;
}

// TODO: add back-off strategy under contention (use loop var)
}
}
};

и довольно шумный тестовый класс

class StackTest
{
private:

typedef boost::mpl::size_t<64> Capacity;
//typedef boost::uint_t<static_log2_ceil<Capacity::value>::value>::least    size_type;
typedef std::size_t size_type;

static size_type const nCapacity = Capacity::value;
static size_type const nNodes = Capacity::value;

typedef node<size_type>  Node;
typedef stack<size_type> Stack;

typedef mt19937                                        Twister;
typedef random::uniform_int_distribution<std::size_t>  Distribution;
typedef variate_generator<Twister,Distribution>        Die;

struct Data //!< shared along threads
{
Node  m_aNodes[nNodes];
Stack m_aStack;

explicit Data() : m_aStack(nNodes)
{
m_aNodes[0] = Node(1,0,0); // tail of stack

for(size_type i=1; i<nNodes; ++i)
{
m_aNodes[i] = Node(static_cast<size_type>(i+1),i,0);
}
}

template <typename syncT>
void Run(
uuids::random_generator& aUUIDGen,
std::size_t const&       nPasses,
std::size_t const&       nThreads)
{
std::vector<ThreadLocalData>  aThreadLocalDatas(nThreads,ThreadLocalData(*this));

{
static std::size_t const N = 100000;
Die aRepetition(Twister(hash_value(aUUIDGen())),Distribution(0,N));
Die aAction(Twister(hash_value(aUUIDGen())),Distribution(0,1));

for(std::size_t i=0; i<nThreads; ++i)
{
std::vector<bool>& aActions = aThreadLocalDatas[i].m_aActions;
std::size_t const nRepetition = aRepetition();
aActions.reserve(nRepetition);

for(std::size_t k=0; k<nRepetition; ++k)
{
aActions.push_back(static_cast<bool>(aAction()));
}
}
}

std::size_t nPopped = 0;

if(nThreads == 1)
{
std::size_t const i = 0;
aThreadLocalDatas[i].Run<syncT>(i);
nPopped += aThreadLocalDatas[i].m_aPopped.size();
}
else
{
std::vector<boost::shared_ptr<thread> > aThreads;
aThreads.reserve(nThreads);

for(std::size_t i=0; i<nThreads; ++i)
{
aThreads.push_back(boost::make_shared<thread>(boost::bind(&ThreadLocalData::Run<syncT>,&aThreadLocalDatas[i],i)));
}

for(std::size_t i=0; i<nThreads; ++i)
{
aThreads[i]->join();
nPopped += aThreadLocalDatas[i].m_aPopped.size();
}
}

std::vector<size_type> aNodes;
aNodes.reserve(nCapacity);

while(size_type const nNode = m_aStack.pop<syncT>(m_aNodes))
{
aNodes.push_back(nNode);
}

std::clog << dump(m_aNodes,4) << std::endl;

BOOST_ASSERT(aNodes.size()+nPopped == nCapacity);
}
};struct ThreadLocalData //!< local to each thread
{
Data&                  m_aData;    //!< shared along threads
std::vector<bool>      m_aActions; //!< either pop or push
std::vector<size_type> m_aPopped;   //!< popp'ed nodes

explicit ThreadLocalData(Data& aData)
: m_aData(aData), m_aActions(), m_aPopped()
{
m_aPopped.reserve(nNodes);
}

template <typename syncT>
void Run(std::size_t const& k)
{
BOOST_FOREACH(bool const& aAction, m_aActions)
{
if(aAction)
{
if(size_type const nNode = m_aData.m_aStack.pop<syncT>(m_aData.m_aNodes))
{
BOOST_ASSERT(!m_aData.m_aNodes[nNode-1].next);
m_aPopped.push_back(nNode);
}
}
else
{
if(!m_aPopped.empty())
{
size_type const nNode = m_aPopped.back();
size_type const nNext = m_aData.m_aNodes[nNode-1].next;
ASSERT_IF(!nNext,"nNext=" << nNext << " for " << m_aData.m_aNodes[nNode-1] << "\n\n" << dump(m_aData.m_aNodes));
m_aData.m_aStack.push<syncT>(m_aData.m_aNodes,nNode);
m_aPopped.pop_back();
}
}
}
}
};template <typename syncT>
static void PushPop(
uuids::random_generator& aUUIDGen,
std::size_t const&       nPasses,
std::size_t const&       nThreads)
{
BOOST_ASSERT(nThreads > 0);
BOOST_ASSERT(nThreads == 1 || (is_same<syncT,Synchronized>::value));

std::clog << BOOST_CURRENT_FUNCTION << " with threads=" << nThreads << std::endl;

for(std::size_t nPass=0; nPass<nPasses; ++nPass)
{
std::ostringstream s;
s << "  " << nPass << "/" << nPasses << ": ...";
std::clog << s.str() << std::endl;

Data().Run<syncT>(aUUIDGen,nPass,nThreads);
}
}

public:

static void Run()
{
typedef StackTest self_t;

uuids::random_generator aUUIDGen;

static std::size_t const nMaxPasses = 1000;
Die aPasses(Twister(hash_value(aUUIDGen())),Distribution(0,nMaxPasses));

{
//std::size_t const nThreads = 2; // thread::hardware_concurrency()+1;
std::size_t const nThreads = thread::hardware_concurrency()+1;
self_t().PushPop<Synchronized>(aUUIDGen,aPasses(),nThreads);
}
}
};

Вот ссылка на сайт скачать все необходимые файлы.

1

Решение

Обе проблемы являются еще одним аспектом проблемы ABA.

стек: {2,1}, {1,0}

  1. Нить А
    1. поп
      new_head = 1
      … превышен интервал времени
  2. Нить Б
    1. поп
      стек: {1,0}, вытолкнутый: {2,0}
    2. поп
      стек: {}, выбрано: {2,0}, {1,0}
    3. толчок ({2,0})
      стек: {2,0}
  3. Нить А
    1. поп продолжил
      cmp_exch успешно, потому что голова 2
      stack: {}, head = 1 — НЕПРАВИЛЬНО, 0 будет правильным

Могут возникнуть любые проблемы, потому что доступ к узлам больше не является локальным для потока. Это включает в себя неожиданные модификации следующего для извлеченных узлов (проблема 1) или потерянных узлов (проблема 2).

head + next необходимо изменить в одном cmp_exch, чтобы избежать этой проблемы.

0

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector