простой потокобезопасный и быстрая реализация пула памяти?

После переосмысления дизайна и некоторого вклада Пэдди я придумал что-то вроде этого, но меня интересует правильность этого, кажется, что хорошо, когда я его запускаю …
Идея состоит в том, что предварительно выделенные объекты наследуются от следующего:

struct Node
{
void* pool;
};

Таким образом, мы вводим в каждый выделенный объект указатель на его пул для последующего освобождения. Тогда мы имеем:

template<class T, int thesize>
struct MemPool
{
T* getNext();
void free(T* ptr);

struct ThreadLocalMemPool
{
T* getNextTL();
void freeTL();

int size;
vector<T*> buffer;
vector<int> freeList;
int freeListIdx;
int bufferIdx;
ThreadLocalMemPool* nextTlPool; //within a thread's context a linked list
};

int size;
threadlocal ThreadLocalMemPool* tlPool; //one of these per thread
};

Так что в основном я говорю MemPool<Cat, 100> и это дает мне mempool, который для каждого потока, который getNexts это, будет создавать экземпляр локального mempool. Размеры округляются внутри до ближайшей степени двойки для простоты по модулю (которую для простоты я опущу). Так как getNext() является локальным для каждого потока, он не требует блокировки, и я пытаюсь использовать атомику для освобождающей части следующим образом:

T* ThreadLocalMemPool::getNextTL()
{
int iHead = ++bufferIdx % size;
int iTail = freeListIdx % size;

if (iHead != iTail)  // If head reaches tail, the free list is empty.
{
int & idx = freeList[iHead];
while (idx == DIRTY) {}
return buffer[idx];
}
else
{
bufferIdx--; //we will recheck next time
if (nextTLPool)
return nextTLPool->getNextTL();
else
//set nextTLPool to a new ThreadLocalMemPool and return getNextTL() from it..
}
}

void ThreadLocalMemPool::free(T* ptr)
{
//the outer struct handles calling this in the right ThreadLocalMemPool

//we compute the index in the pool from which this pool came from by subtracting from
//its address the address of the first pointer in this guys buffer
int idx = computeAsInComment(ptr);

int oldListIdx = atomic_increment_returns_old_value(freeListIdx);
freeList[oldListIdx % size] = idx;
}

Теперь идея заключается в freeListIdx всегда будет следовать за bufferIdx в бассейне, потому что вы не можете (я предполагаю правильное использование)
бесплатно больше, чем вы выделили. Призывы к бесплатной синхронизации порядка, в котором они возвращают индексы буфера в свободный список
и getNext поймет это, когда вернется назад. Я думал об этом немного и не вижу ничего семантически неправильного
с логикой, кажется ли это звучит или есть что-то тонкое, что может сломать это?

0

Решение

Потокобезопасная проблема требует блокировки. Если вы хотите уменьшить это, вам нужно ограничение, что только один поток когда-либо использует пул. Вы можете расширить это до двух потоков, если вы используете циклический свободный список, который я опишу ниже, при условии, что один поток отвечает за распределение, а другой — за освобождение.

Что касается использования вектора без какого-либо другого управления, это плохая идея … Как только вы начинаете фрагментироваться, ваши ассигнования получают удар.

Хороший способ реализовать это — просто выделить большой блок T. Затем сделать круговую очередь достаточно большой, чтобы указывать на каждый из этих блоков. Это ваш «свободный список». Вы можете просто использовать индексы. Если вы ограничите каждый пул 65536 элементами, вы можете выбрать unsigned short для экономии места (на самом деле, это 65535, чтобы обеспечить эффективное циклическое управление очередью)

Используя циклическую очередь, вы разрешаете постоянное распределение и освобождение независимо от фрагментации. Вы также знаете, когда ваш пул заполнен (т. Е. Свободный список пуст), и вы можете создать другой пул. Очевидно, что когда вы создаете пул, вам нужно заполнить свободный список.

Итак, ваш класс будет выглядеть примерно так:

template<class T, size_t initSize>
class MemPool
{
vector<T> poolBuffer;              // The memory pool
vector<unsigned short> freeList;   // Ring-buffer (indices of free items)
unsigned short nHead, nTail;       // Ring-buffer management
int nCount;                        // Number of elements in ring-buffer
MemPool<T,initSize> *nextPool;     // For expanding memory pool

// etc...
};

Теперь для блокировки. Если у вас есть доступ к атомарным инструкциям приращения и убывания, и вы достаточно осторожны, вы можете поддерживать свободный список с безопасностью потоков. Единственная необходимая блокировка в стиле мьютекса — это когда вам нужно выделить новый пул памяти.

Я изменил свое первоначальное мышление. Вам нужны две атомарные операции, и вам нужно зарезервированное значение индекса (0xffff) для включения неатомарных операций в очереди:

// I'll have the functions atomic_incr() and atomic_decr().  The assumption here
// is that they do the operation and return the value as it was prior to the
// increment/decrement.  I'll also assume they work correctly for both int and
// unsigned short types.
unsigned short atomic_incr( unsigned short & );
int atomic_incr( int & );
int atomic_decr( int & );

Таким образом, распределение происходит примерно так:

T* alloc()
{
// Check the queue size.  If it's zero (or less) we need to pass on
// to the next pool and/or allocate a new one.
if( nCount <= 0 ) {
return alloc_extend();
}

int count = atomic_decr(nCount);
if( count <= 0 ) {
T *mem = alloc_extend();
atomic_incr(nCount);     // undo
return mem;
}

// We are guaranteed that there is at least 1 element in the list for us.
// This will overflow naturally to achieve modulo by 65536.  You can only
// deal with queue sizes that are a power of 2.  If you want 32768 values,
// for example, you must do this: head &= 0x7fff;
unsigned short head = atomic_incr(nHead);

// Spin until the element is valid (use a reference)
unsigned short & idx = freeList[head];
while( idx == 0xffff );

// Grab the pool item, and blitz the index from the queue
T * mem = &poolBuffer[idx];
idx = 0xffff;

return mem;
};

Выше используется новая закрытая функция-член:

T * alloc_extend()
{
if( nextPool == NULL ) {
acquire_mutex_here();
if( nextPool == NULL ) nextPool = new MemPool<T>;
release_mutex_here();
if( nextPool == NULL ) return NULL;
}
return nextPool->alloc();
}

Когда вы хотите бесплатно:

void free(T* mem)
{
// Find the right pool to free from.
if( mem < &poolBuffer.front() || mem > &poolBuffer.back() )
{
if( nextPool ) nextPool->free(mem);
return;
}

// You might want to maintain a bitset that indicates whether the memory has
// actually been allocated so you don't corrupt your pool here, but I won't
// do that in this example...

// Work out the index.  Hope my pointer arithmetic is correct here.
unsigned short idx = (unsigned short)(mem - &poolBuffer.front());

// Push index back onto the queue.  As with alloc(), you might want to
// use a mask on the tail to achieve modulo.
int tail = atomic_incr(nTail);
freeList[tail] = idx;

// Don't need to check the list size.  We assume everything is sane. =)
atomic_incr(nCount);
}

Обратите внимание, что я использовал значение 0xffff, фактически, как NULL. Установка, очистка и вращение на этом значении предназначены для предотвращения ситуации гонки. Вы не можете гарантировать, что безопасно оставлять старые данные в очереди, когда несколько потоков могут вызывать free пока у вас есть другие темы, вызывающие alloc, Ваша очередь будет циклически проходить, но данные в ней могут быть еще не установлены.

Конечно, вместо индексов вы можете просто использовать указатели. Но это составляет 4 байта (или 8 байтов в 64-разрядном приложении), и затраты памяти могут не стоить того, в зависимости от размера данных, которые вы объединяете. Лично я бы использовал указатели, но по некоторым причинам в этом ответе было проще использовать индексы.

3

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]