Создание стандартной карты, которая является поточно-ориентированной

В моем текущем сценарии скорость важна, у меня есть карта, которая читается только несколькими потоками, и это отлично работает. Теперь возникло требование, которое может потребовать время от времени записи в статическую карту, пока карты читаются другими потоками. Я считаю, что это изменит правила игры, поскольку мне нужно было бы заблокировать свои карты для безопасности потоков. Это создает проблему, поскольку у меня есть несколько потоков 10-12 потоков, которые будут читать карту. Если одна карта берет блокировку на карте (с момента ее чтения), я считаю, что блокировка будет необходима, поскольку что-то может быть записано на карту. В любом случае, как я уже говорил ранее, если одна карта читает, то другие карты не будут иметь параллельного доступа к карте, как раньше. Есть ли способ, которым я могу обойти эту проблему?

11

Решение

Вы можете использовать shared_mutex рядом с вашей картой, чтобы приобрести общий или же уникальный доступ. Как правило, операция записи требует уникального доступа, в то время как операции чтения требуют общего доступа.

Доступ к общему доступу может получить любое количество потоков, если только у потоков нет уникального доступа. Если поток пытается получить уникальный доступ, он ожидает, пока не будет освобожден весь общий доступ.

Стандартная библиотека и Boost обеспечивают shared_lock<T> а также unique_lock<T> для ограниченного объема приобретения shared_mutex,

Остерегайтесь того, что некоторые люди утверждают shared_mutex работает плохо, хотя я не видел каких-либо доказательств или сильного анализа в поддержку этих утверждений. Это может стоить изучить, если это имеет значение для вас.

13

Другие решения

просто для вашего удовольствия c ++, прочитайте эту книгу, вы найдете ПУТЬ больше, чем потраченные деньги, ваш мир параллелизма будет широко открыт
C ++ — Параллелизм в действии Практическая многопоточность
В этих книгах рассматриваются самые разные проблемы и практические решения, касающиеся обмена данными между потоками, способов пробуждения потоков, создания пулов потоков и многого другого … больше … и больше

вот пример совместного использования данных между потоками без использования atomic или shared_locks

template<class T>
class TaskQueue
{
public:
TaskQueue(){}
TaskQueue& operator = (TaskQueue&) = delete;

void Push(T value){
std::lock_guard<std::mutex> lk(mut);
data.push(value);
condition.notify_one(); //if you have many threads trying to access the data at same time, this will wake one thread only
}

void Get(T& value){
std::unique_lock<std::mutex> lk(mut);
condition.wait(lk, [this]{ return !data.empty(); }); // in this case it waits if queue is empty, if not needed  you can remove this line
value = data.front();
data.pop();
lk.unlock();
}

private:
std::mutex mut;
std::queue<T> data; //in your case change this to a std::map
std::condition_variable condition;
};
6

Одним из решений может быть сохранение указателя на эту карту, а когда вам нужно ее изменить — сделать копию, изменить эту копию и затем атомарно поменять указатель на новый экземпляр. Это решение потребовало бы больше памяти, но могло бы быть более эффективным, если у вас много потоков чтения, так как этот метод свободен от блокировки.

В приведенном ниже примере только один поток может изменить карту. Это не означает, что один поток за раз, это означает, что один и тот же поток для жизни структуры данных. В противном случае модификацию нужно будет выполнять, удерживая мьютекс, который защищает весь код в updateMap, Читатель темы может получить доступ theData как обычно — без какой-либо блокировки.

typedef std::map<...> Data;

std::atomic<Data *> theData;

void updateMap( ... )
{
Data *newData = new Data( *theData );
// modify newData here
Data *old = theData.exchange( newData );
delete old;
}
4

Вот моя реализация поточного безопасного изменяемого размера hashmap без использования контейнеров stl:

#pragma once

#include <iomanip>
#include <exception>
#include <mutex>
#include <condition_variable>

/*
*  wrapper for items stored in the map
*/
template<typename K, typename V>
class HashItem {
public:
HashItem(K key, V value) {
this->key = key;
this->value = value;
this->nextItem = nullptr;
}

/*
* copy constructor
*/
HashItem(const HashItem & item) {
this->key = item.getKey();
this->value = item.getValue();
this->nextItem = nullptr;
}

void setNext(HashItem<K, V> * item) {
this->nextItem = item;
}

HashItem * getNext() {
return nextItem;
}

K getKey() {
return key;
}

V getValue() {
return value;
}

void setValue(V value) {
this->value = value;
}

private:
K key;
V value;
HashItem * nextItem;

};

/*
* template HashMap for storing items
* default hash function HF = std::hash<K>
*/
template <typename K, typename V, typename HF = std::hash<K>>
class HashMap {
public:
/*
* constructor
* @mSize specifies the bucket size og the map
*/
HashMap(std::size_t mSize) {
// lock initialization for single thread
std::lock_guard<std::mutex>lock(mtx);
if (mSize < 1)
throw std::exception("Number of buckets ust be greater than zero.");

mapSize = mSize;
numOfItems = 0;
// initialize
hMap = new HashItem<K, V> *[mapSize]();
}

/*
* for simplicity no copy constructor
* anyway we want test how different threads
* use same instance of the map
*/
HashMap(const HashMap & hmap) = delete;

/*
* inserts item
* replaces old value with the new one when item already exists
* @key key of the item
* @value value of the item
*/
void insert(const K & key, const V & value) {
std::lock_guard<std::mutex>lock(mtx);
insertHelper(this->hMap, this->mapSize, numOfItems, key, value);
condVar.notify_all();
}

/*
* erases item with key when siúch item exists
* @key of item to erase
*/
void erase(const K & key) {
std::lock_guard<std::mutex>lock(mtx);
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * prev = nullptr;
HashItem<K, V> * item = hMap[hVal];

while ((item != nullptr) && (item->getKey() != key)) {
prev = item;
item = item->getNext();
}
// no item found with the given key
if (item == nullptr) {
return;
}
else {
if (prev == nullptr) {
// item found is the first item in the bucket
hMap[hVal] = item->getNext();
}
else {
// item found in one of the entries in the bucket
prev->setNext(item->getNext());
}
delete item;
numOfItems--;
}
condVar.notify_all();
}

/*
* get element with the given key by reference
* @key is the key of item that has to be found
* @value is the holder where the value of item with key will be copied
*/
bool getItem(const K & key, V & value) const {
std::lock_guard<std::mutex>lock(mtx);
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * item = hMap[hVal];

while ((item != nullptr) && (item->getKey() != key))
item = item->getNext();
// item not found
if (item == nullptr) {
return false;
}

value = item->getValue();
return true;
}/*
* get element with the given key by reference
* @key is the key of item that has to be found
* shows an example of thread waitung for some condition
* @value is the holder where the value of item with key will be copied
*/
bool getWithWait(const K & key, V & value) {
std::unique_lock<std::mutex>ulock(mtxForWait);
condVar.wait(ulock, [this] {return !this->empty(); });
// calculate the bucket where item must be inserted
std::size_t hVal = hashFunc(key) % mapSize;
HashItem<K, V> * item = hMap[hVal];

while ((item != nullptr) && (item->getKey() != key))
item = item->getNext();
// item not found
if (item == nullptr) {
return false;
}

value = item->getValue();
return true;
}/*
* resizes the map
* creates new map on heap
* copies the elements into new map
* @newSize specifies new bucket size
*/
void resize(std::size_t newSize) {
std::lock_guard<std::mutex>lock(mtx);
if (newSize < 1)
throw std::exception("Number of buckets must be greater than zero.");

resizeHelper(newSize);
condVar.notify_all();
}

/*
* outputs all items of the map
*/
void outputMap() const {
std::lock_guard<std::mutex>lock(mtx);
if (numOfItems == 0) {
std::cout << "Map is empty." << std::endl << std::endl;
return;
}
std::cout << "Map contains " << numOfItems << " items." << std::endl;
for (std::size_t i = 0; i < mapSize; i++) {
HashItem<K, V> * item = hMap[i];
while (item != nullptr) {
std::cout << "Bucket: " << std::setw(3) << i << ", key: " << std::setw(3) << item->getKey() << ", value:" << std::setw(3) << item->getValue() << std::endl;
item = item->getNext();
}
}
std::cout << std::endl;
}

/*
* returns true when map has no items
*/
bool empty() const {
std::lock_guard<std::mutex>lock(mtx);
return numOfItems == 0;
}

void clear() {
std::lock_guard<std::mutex>lock(mtx);
deleteMap(hMap, mapSize);
numOfItems = 0;
hMap = new HashItem<K, V> *[mapSize]();
}

/*
* returns number of items stored in the map
*/
std::size_t size() const {
std::lock_guard<std::mutex>lock(mtx);
return numOfItems;
}

/*
* returns number of buckets
*/
std::size_t bucket_count() const {
std::lock_guard<std::mutex>lock(mtx);
return mapSize;
}

/*
* desctructor
*/
~HashMap() {
std::lock_guard<std::mutex>lock(mtx);
deleteMap(hMap, mapSize);
}

private:
std::size_t mapSize;
std::size_t numOfItems;
HF hashFunc;
HashItem<K, V> ** hMap;
mutable std::mutex mtx;
mutable std::mutex mtxForWait;
std::condition_variable condVar;

/*
* help method for inserting key, value item into the map hm
* mapSize specifies the size of the map, items - the number
* of stored items, will be incremented when insertion is completed
* @hm HashMap
* @mSize specifies number of buckets
* @items holds the number of items in hm, will be incremented when insertion successful
* @key - key of item to insert
* @value - value of item to insert
*/
void insertHelper(HashItem<K, V> ** hm, const std::size_t & mSize, std::size_t & items, const K & key, const V & value) {
std::size_t hVal = hashFunc(key) % mSize;
HashItem<K, V> * prev = nullptr;
HashItem<K, V> * item = hm[hVal];

while ((item != nullptr) && (item->getKey() != key)) {
prev = item;
item = item->getNext();
}

// inserting new item
if (item == nullptr) {
item = new HashItem<K, V>(key, value);
items++;
if (prev == nullptr) {
// insert new value as first item in the bucket
hm[hVal] = item;
}
else {
// append new item on previous in the same bucket
prev->setNext(item);
}
}
else {
// replace existing value
item->setValue(value);
}
}

/*
* help method to resize the map
* @newSize specifies new number of buckets
*/
void resizeHelper(std::size_t newSize) {
HashItem<K, V> ** newMap = new HashItem<K, V> *[newSize]();
std::size_t items = 0;
for (std::size_t i = 0; i < mapSize; i++) {
HashItem<K, V> * item = hMap[i];
while (item != nullptr) {
insertHelper(newMap, newSize, items, item->getKey(), item->getValue());
item = item->getNext();
}
}

deleteMap(hMap, mapSize);
hMap = newMap;
mapSize = newSize;
numOfItems = items;
newMap = nullptr;

}

/*
* help function for deleting the map hm
* @hm HashMap
* @mSize number of buckets in hm
*/
void deleteMap(HashItem<K, V> ** hm, std::size_t mSize) {
// delete all nodes
for (std::size_t i = 0; i < mSize; ++i) {
HashItem<K, V> * item = hm[i];
while (item != nullptr) {
HashItem<K, V> * prev = item;
item = item->getNext();
delete prev;
}
hm[i] = nullptr;
}
// delete the map
delete[] hm;
}
};
4

Два других ответа довольно хороши, но я подумал, что мне следует добавить немного цвета:

Клифф Клик написал параллельная хэш-карта без блокировки в Java. Было бы нетривиально адаптировать его к C ++ (без GC, другой модели памяти и т. Д.), Но это лучшая реализация структуры данных без блокировки, которую я когда-либо видел. Если вы можете использовать JAva вместо C ++, это может быть путь.

Я не знаю ни о каких сбалансированных бинарных древовидных структурах без блокировки. Это не значит, что они не существуют.

Вероятно, проще всего пойти с одним из двух других ответов (массовое копирование / атомарный обмен / что-то вроде shared_ptr или блокировки чтения-записи) для контроля доступа к map вместо. Один из двух будет быстрее в зависимости от относительного количества операций чтения и записи и размера map; Вы должны проверить, какой из них вы должны использовать.

0

То, что вам нужно, эквивалентно ConcurrentHashMap в Java, который позволяет одновременно читать и записывать в базовую хеш-таблицу. Этот класс является частью пакета java.util.concurrent и обеспечивает одновременное чтение и запись (до уровня параллелизма, по умолчанию 16).

Вы можете найти больше информации в Javadoc. Я цитирую Javadoc здесь:

Хеш-таблица, поддерживающая полный параллелизм получения и настраиваемый ожидаемый параллелизм для обновлений. Этот класс подчиняется той же функциональной спецификации, что и Hashtable, и включает версии методов, соответствующие каждому методу Hashtable. Однако, несмотря на то, что все операции являются поточно-ориентированными, операции извлечения не влекут за собой блокировку, и нет никакой поддержки для блокировки всей таблицы таким образом, чтобы предотвратить любой доступ. Этот класс полностью совместим с Hashtable в программах, которые полагаются на безопасность потоков, но не на детали синхронизации.

0
По вопросам рекламы [email protected]