Как я могу предотвратить неопределенное поведение и проблему ABA в этой функции стека без блокировки?

В настоящее время я работаю над свободным от блокировки односвязным списком в C ++ 11, и у меня возникла проблема с моим popFront() функция — или я должен, по крайней мере, сказать, что я знаю, что у него будут проблемы в определенных случаях.

В любом случае, это то, что у меня сейчас есть:

std::shared_ptr<T> popFront(void)
{
auto p = atomic_load(&head);
while(p && !atomic_compare_exchange_weak(&head, &p, p->next))
{}
return p ? p->data : std::shared_ptr<T>();
}

Обратите внимание, что head имеет тип shared_ptr,

Тем не менее, я ожидаю несколько вопросов. Первая — это ситуация, в которой выполняются два потока. popFront()они оба читают одно и то же headи один поток заканчивается первым. Перед завершением второго потока вызывающая сторона удаляет объект, на который было указано, поэтому второй поток теперь работает с удаленной памятью. Второй вопрос — классическая проблема ABA.

Идея этого связанного списка состоит в том, чтобы он был свободным от блокировки, поэтому я хочу избежать блокировки этой функции. К сожалению, я не уверен, как решить эти проблемы. Мы ценим любые предложения.

2

Решение

Существует множество решений для создания очереди без блокировки без проблем ABA.

это статья должен предоставить пару идей и некоторые общие инструменты для решения этой проблемы можно найти Вот.

Теперь о описанных вами проблемах:

Перед завершением второго потока вызывающая сторона удаляет объект, на который был указан объект, поэтому второй поток теперь работает с
удаленная память

Да, это может произойти, и решение для этого заключается в использовании помеченные указатели: на 32 битных архитектурах последние 2 (или больше) биты не используются, поэтому их можно использовать для тегирования, а на 64-битных архитектурах у нас есть как минимум 3 неиспользованных бита.

Таким образом, мы можем установить как логически удаленный указатель, но не физически удалить его, установив некоторые неиспользуемые биты указателя, например так:

__inline struct node* setTag(struct node* p, unsigned long TAG)
{
return (struct node*) ((uintptr_t)p | TAG);
}

__inline bool isTagged(struct node* p, unsigned long TAG)
{
return (uintptr_t)p == (uintptr_t)p &  ~TAG;
}

__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG)
{
return (struct node*)((uintptr_t)p &  ~TAG);
}

где TAG — до 4 (для 32-разрядных архитектур) и до 8 на 64-разрядных архитектурах (2/3 или более неиспользуемых бит в зависимости от архитектуры компьютера и выравнивания слов).

Теперь, делая CAS, мы игнорируем теговые указатели =>, таким образом, работая только с действительными указателями.

При выполнении очереди в очереди мы можем выполнить следующее:

int dequeue(qroot* root)
{
qnode* oldHead;

do
{
oldHead = root->head;

if (isTagged(root->head)) //disregard tagged addresses
return NULL;

oldHead = getUntaggedAddress(root->head); //we do a CAS only if the old head was unchanged

} while (root->head.compare_exchange_strong(oldHead, oldHead->next, std::memory_order_seq_cst));

return &(oldHead->data);
}

дано

typedef struct qnode
{
std::atomic<qnode*> next;
int data;
}qnode;

typedef struct qroot
{
std::atomic<qnode*> head; //Dequeue and peek will be performed from head
std::atomic<qnode*> tail; //Enqueue will be performed to tail
}qroot;
0

Другие решения

Одна вещь, которая помогает сделать потоки намного проще, это не освобождение памяти. Если вы работаете с несколькими связанными узлами списка, вы можете рассмотреть их пул. Вместо освобождения узла вы возвращаете его в пул. Это решает часть вашей проблемы.

АБА это просто. Вам нужно увеличивать счетчик каждый раз, когда вы меняете указатель головы. Вам нужно атомарно написать этот счетчик с указателем в то же время. Если используется 32-битная адресация, используйте 64-битное сравнение и своп (CAS) и сохраните счетчик в дополнительных 32 битах. Если используется 64-битная адресация, избегайте 128-битного сравнения и свопинга, потому что это может быть медленным (на наших ксеноновых чипах все до 64-битное быстро) Поскольку Windows и Linux не поддерживают полную 64-битную адресацию, вы можете использовать некоторые из 64-битных для ABA. Я использую союзы, чтобы сделать это для 32- и 64-битных режимов адресации.

Вам не нужно считать каждое изменение. Вам просто нужно ловить каждое изменение. Даже при попытке вызвать ABA с большим количеством потоков, изменяющих голову как можно быстрее, это все равно случится редко. В реальной жизни это редко, редко, редко случается. То есть Вам не нужно считать очень высоко. Я обычно использую 4 бита и позволяю ему катиться. Я мог бы попасть в неприятности за это. Используйте больше, если хотите.

В этом примере я использовал 64 бита и использовал CAS () для сравнения и замены, поэтому вам придется заменить все, что ваш компилятор использует для CAS:

typedef unsigned __int64_t U8;

struct TNode {
TNode* m_pNext;
};

template<class T>
union THead {
struct {
U8  m_nABA : 4,
m_pNode:60;  // Windows only supports 44 bits addressing anyway.
};
U8  m_n64; // for CAS
// this constructor will make an atomic copy on intel
THead(THead& r)         { m_n64 = r.m_n64; }
T* Node()               { return (T*)m_pNode; }
// changeing Node bumps aba
void Node(T* p)         { m_nABA++; m_pNode = (U8)p; return this; }
};

// pop pNode from head of list.
template<class T>
T* Pop(volatile THead<T>& Head) {
while (1) { // race loop
// Get an atomic copy of head and call it old.
THead<T> Old(Head);
if (!Old.Node())
return NULL;
// Copy old and call it new.
THead<T> New(Old);
// change New's Node, which bumps internal aba
New.Node(Old.Node()->m_pNext);
// compare and swap New with Head if it still matches Old.
if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
return Old.Node(); // success
// race, try again
}
}

// push pNode onto head of list.
template<class T>
void Push(volatile THead<T>& Head, T* pNode) {
while (1) { // race loop
// Get an atomic copy of head and call it old.
// Copy old and call it new.
THead<T> Old(Head), New(Old);
// Wire node t Head
pNode->m_pNext = New.Node();
// change New's head ptr, which bumps internal aba
New.Node(pNode);
// compare and swap New with Head if it still matches Old.
if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
break; // success
// race, try again
}
}
0

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector