У меня есть объект, который обрабатывается несколькими задачами. Я копирую этот объект несколько раз и сохраняю в векторе задачу, чтобы получить собственную копию для работы в цикле parallel_for. Ниже приведен код со стандартным вектором.
Идея в том, что я начинаю с вектора размером 0 и увеличиваю его в соответствии с количеством задач, которые запускаются параллельно и нуждаются в собственной копии. Я использую атомарный _poolIndex для отслеживания глобального индекса за прогон.
Object& GetObject()
{
if (_poolIndex >= _objectPool.size())
{
lock_guard<mutex> lock(_mutex);
Object copy(_original);
_objectPool.push_back(move(copy));
}
int taskIndex = _poolIndex.fetch_add(1);
return _objectPool[taskIndex];
}
Я получаю индекс вне границ в приведенном ниже коде в векторном классе, даже если позиция < размер, когда отладчик ломается:
reference operator[](size_type _Pos)
{ // subscript mutable sequence
#if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
{ // report error
_DEBUG_ERROR("vector subscript out of range");
_SCL_SECURE_OUT_OF_RANGE;
}
Так что, очевидно, та часть, которая возвращает size () <= _Pos оценил что-то другое … Я в замешательстве, потому что у меня есть блокировка при нажатии на вектор.
Затем я попробовал concurrent_vector, и push_back давал мне проблемы с компиляцией, вот ошибки Visual Studio 2013:
Ошибка 35, ошибка C2059: синтаксическая ошибка: ‘&’c: \ программные файлы
(x86) \ Microsoft Visual Studio
12.0 \ vc \ include \ concurrent_vector.h 1492 1 UnitTestsОшибка 36, ошибка C2143: синтаксическая ошибка: отсутствует ‘;’ до ‘)’ c: \ program
файлы (x86) \ Microsoft Visual Studio
12.0 \ vc \ include \ concurrent_vector.h 1492 1 UnitTests
А в классе concurrent_vector приведен код, который дает проблему при переключении _objectPool на concurrent_vector из вектора:
void _Init(const void *_Src)
{
for(; _I < _N; ++_I)
new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
}
Если кто-то может дать рекомендации по двум вышеуказанным вопросам, это было бы здорово.
Я также пытаюсь минимизировать критические секции для эффективности. Идея состоит в том, что после запуска алгоритма и многократного его запуска _objectPool будет иметь большинство, если не все копии, уже помещенные в вектор.
Во-первых, существует гонка данных, потому что два значения считываются из _poolIndex
(в if
а также taskIndex
) не синхронизируются. Поменяйте их местами и используйте taskIndex
в состоянии вместо чтения разделяемого состояния в другой раз.
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1);
if (taskIndex >= _objectPool.size()) // issue #2: size() is not thread-safe
{
lock_guard<mutex> lock(_mutex);
//This: Object copy(_original);
// _objectPool.push_back(move(copy));
// can be simplified to:
_objectPool.push_back(_original); // issue #3: it can push at different index
}
return _objectPool[taskIndex];
}
Вторая проблема может быть не видна с std::vector
в некоторых условиях. Но это определенно нарушает использование concurrent_vector (понять, почему).
Третья проблема заключается в том, что taskIndex
не синхронизируется с порядком блокировки, таким образом, он может создать один объект, но вернуть еще не построенный или не выделенный (вне диапазона).
Если я правильно понимаю ваше намерение, вы хотите повторно использовать объекты, созданные на первом проходе, для второго и создавать дополнительные объекты, если это необходимо. Я постараюсь исправить проблемы в коде ниже:
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
{
lock_guard<mutex> lock(_mutex);
size_t sz = _objectPoolSize.load(memory_order_relaxed);
if (taskIndex >= sz) { // double-check under the lock
sz *= 2; // or any other factor, get a bunch of new objects at once
_objectPool.resize(sz, _original); // construct new copies of _original
_objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
}
}
return _objectPool[taskIndex];
}
Что касается concurrent_vector (доступно в обоих госзакупках а также ТВВ), вы можете использовать grow_to_at_least чтобы снять блокировку .. но:
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
// construct new copies of _original if free objects is about to ran out
_objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
return _objectPool[taskIndex]; // ISSUE: it might not be constructed yet in other thread
}
Страдает от та же проблема как size()
, Поэтому требуется либо некоторая синхронизация с _objectPoolSize, либо синхронизация по элементам с конструкторами на основе распределения с нулевым заполнением, как описано в тот же блог.