concurrent_vector против вектора с мьютексом, проблемы потока с push_back

У меня есть объект, который обрабатывается несколькими задачами. Я копирую этот объект несколько раз и сохраняю в векторе задачу, чтобы получить собственную копию для работы в цикле parallel_for. Ниже приведен код со стандартным вектором.

Идея в том, что я начинаю с вектора размером 0 и увеличиваю его в соответствии с количеством задач, которые запускаются параллельно и нуждаются в собственной копии. Я использую атомарный _poolIndex для отслеживания глобального индекса за прогон.

    Object& GetObject()
{
if (_poolIndex >= _objectPool.size())
{
lock_guard<mutex> lock(_mutex);
Object copy(_original);
_objectPool.push_back(move(copy));
}

int taskIndex = _poolIndex.fetch_add(1);

return _objectPool[taskIndex];
}

Я получаю индекс вне границ в приведенном ниже коде в векторном классе, даже если позиция < размер, когда отладчик ломается:

reference operator[](size_type _Pos)
{   // subscript mutable sequence
#if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
{   // report error
_DEBUG_ERROR("vector subscript out of range");
_SCL_SECURE_OUT_OF_RANGE;
}

Так что, очевидно, та часть, которая возвращает size () <= _Pos оценил что-то другое … Я в замешательстве, потому что у меня есть блокировка при нажатии на вектор.

Затем я попробовал concurrent_vector, и push_back давал мне проблемы с компиляцией, вот ошибки Visual Studio 2013:

Ошибка 35, ошибка C2059: синтаксическая ошибка: ‘&’c: \ программные файлы
(x86) \ Microsoft Visual Studio
12.0 \ vc \ include \ concurrent_vector.h 1492 1 UnitTests

Ошибка 36, ошибка C2143: синтаксическая ошибка: отсутствует ‘;’ до ‘)’ c: \ program
файлы (x86) \ Microsoft Visual Studio
12.0 \ vc \ include \ concurrent_vector.h 1492 1 UnitTests

А в классе concurrent_vector приведен код, который дает проблему при переключении _objectPool на concurrent_vector из вектора:

    void _Init(const void *_Src)
{
for(; _I < _N; ++_I)
new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
}

Если кто-то может дать рекомендации по двум вышеуказанным вопросам, это было бы здорово.

Я также пытаюсь минимизировать критические секции для эффективности. Идея состоит в том, что после запуска алгоритма и многократного его запуска _objectPool будет иметь большинство, если не все копии, уже помещенные в вектор.

1

Решение

вопросы

Во-первых, существует гонка данных, потому что два значения считываются из _poolIndexif а также taskIndex) не синхронизируются. Поменяйте их местами и используйте taskIndex в состоянии вместо чтения разделяемого состояния в другой раз.

Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1);
if (taskIndex >= _objectPool.size())    // issue #2: size() is not thread-safe
{
lock_guard<mutex> lock(_mutex);
//This: Object copy(_original);
//      _objectPool.push_back(move(copy));
// can be simplified to:
_objectPool.push_back(_original); // issue #3: it can push at different index
}

return _objectPool[taskIndex];
}

Вторая проблема может быть не видна с std::vector в некоторых условиях. Но это определенно нарушает использование concurrent_vector (понять, почему).

Третья проблема заключается в том, что taskIndex не синхронизируется с порядком блокировки, таким образом, он может создать один объект, но вернуть еще не построенный или не выделенный (вне диапазона).

Правильный путь

Если я правильно понимаю ваше намерение, вы хотите повторно использовать объекты, созданные на первом проходе, для второго и создавать дополнительные объекты, если это необходимо. Я постараюсь исправить проблемы в коде ниже:

Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1);                // get current index in the pool
if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
{
lock_guard<mutex> lock(_mutex);
size_t sz = _objectPoolSize.load(memory_order_relaxed);
if (taskIndex >= sz) {                              // double-check under the lock
sz *= 2;                  // or any other factor, get a bunch of new objects at once
_objectPool.resize(sz, _original);              // construct new copies of _original
_objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
}
}
return _objectPool[taskIndex];
}

concurrent_vector

Что касается concurrent_vector (доступно в обоих а также ), вы можете использовать grow_to_at_least чтобы снять блокировку .. но:

Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
// construct new copies of _original if free objects is about to ran out
_objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
return _objectPool[taskIndex];  // ISSUE: it might not be constructed yet in other thread
}

Страдает от та же проблема как size(), Поэтому требуется либо некоторая синхронизация с _objectPoolSize, либо синхронизация по элементам с конструкторами на основе распределения с нулевым заполнением, как описано в тот же блог.

1

Другие решения


По вопросам рекламы [email protected]