В моем проекте я использую pybind11 привязать код C ++ к Python. Недавно мне пришлось иметь дело с очень большими наборами данных (более 70 ГБ), и я столкнулся с необходимостью разделения данных из одного std::deque
между несколькими std::deque
«S. Так как мой набор данных очень большой, я ожидаю, что разделение не будет иметь большой объем памяти. Поэтому я выбрал одну стратегию «толчок — один толчок», которая в целом должна обеспечивать выполнение моих требований.
Это все в теории. На практике мой процесс был убит. Таким образом, я боролся в течение последних двух дней и в итоге нашел следующий минимальный пример, демонстрирующий проблему.
Как правило, минимальный пример создает кучу данных в deque
(~ 11GB), возвращает его в Python, затем снова вызывает C++
переместить элементы. Просто как тот. Перемещение части выполняется в исполнителе.
Интересно, что если я не использую executor, использование памяти будет таким, как ожидалось, а также, когда накладываются ограничения на виртуальную память с помощью ulimit, программа действительно соблюдает эти ограничения и не дает сбоя.
test.py
from test import _test
import asyncio
import concurrent
async def test_main(loop, executor):
numbers = _test.generate()
# moved_numbers = _test.move(numbers) # This works!
moved_numbers = await loop.run_in_executor(executor, _test.move, numbers) # This doesn't!
if __name__ == '__main__':
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(1)
task = loop.create_task(test_main(loop, executor))
loop.run_until_complete(task)
executor.shutdown()
loop.close()
test.cpp
#include <deque>
#include <iostream>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
namespace py = pybind11;
PYBIND11_MAKE_OPAQUE(std::deque<uint64_t>);
PYBIND11_DECLARE_HOLDER_TYPE(T, std::shared_ptr<T>);
template<class T>
void py_bind_opaque_deque(py::module& m, const char* type_name) {
py::class_<std::deque<T>, std::shared_ptr<std::deque<T>>>(m, type_name)
.def(py::init<>())
.def(py::init<size_t, T>());
}
PYBIND11_PLUGIN(_test) {
namespace py = pybind11;
pybind11::module m("_test");
py_bind_opaque_deque<uint64_t>(m, "NumbersDequeue");
// Generate ~11Gb of data.
m.def("generate", []() {
std::deque<uint64_t> numbers;
for (uint64_t i = 0; i < 1500 * 1000000; ++i) {
numbers.push_back(i);
}
return numbers;
});
// Move data from one dequeue to another.
m.def("move", [](std::deque<uint64_t>& numbers) {
std::deque<uint64_t> numbers_moved;
while (!numbers.empty()) {
numbers_moved.push_back(std::move(numbers.back()));
numbers.pop_back();
}
std::cout << "Done!\n";
return numbers_moved;
});
return m.ptr();
}
Тест / __ init__.py
import warnings
warnings.simplefilter("default")
компиляция:
g++ -std=c++14 -O2 -march=native -fPIC -Iextern/pybind11 `python3.5-config --includes` `python3.5-config --ldflags` `python3.5-config --libs` -shared -o test/_test.so test.cpp
Замечания:
moved_numbers = _test.move(numbers)
, все работает как положено, использование памяти, показанное htop, остается 11Gb
, Отлично!.Когда введены ограничения на виртуальную память (~ 15 Гб), все работает нормально, что, пожалуй, самая интересная часть.
ulimit -Sv 15000000 && python3.5 test.py
>> Done!
,
При увеличении лимита программа вылетает (150Gb> моей оперативной памяти).
ulimit -Sv 150000000 && python3.5 test.py
>> [1] 2573 killed python3.5 test.py
Использование метода deque shrink_to_fit
не помогает (и не должно)
Используемое программное обеспечение
Ubuntu 14.04
gcc version 5.4.1 20160904 (Ubuntu 5.4.1-2ubuntu1~14.04)
Python 3.5.2
pybind11 latest release - v1.8.1
Заметка
Обратите внимание, что этот пример был сделан просто для демонстрации проблемы. Использование asyncio
а также pybind
необходимо для возникновения проблемы.
Любые идеи о том, что может происходить, приветствуются.
Проблема оказалась вызвана тем, что данные создавались в одном потоке, а затем освобождались в другом. Это так из-за malloc арен в glibc (для справки см. это). Это можно хорошо продемонстрировать, выполнив:
executor1 = concurrent.futures.ThreadPoolExecutor(1)
executor2 = concurrent.futures.ThreadPoolExecutor(1)
numbers = await loop.run_in_executor(executor1, _test.generate)
moved_numbers = await loop.run_in_executor(executor2, _test.move, numbers)
что займет в два раза больше памяти, выделенной _test.generate
а также
executor = concurrent.futures.ThreadPoolExecutor(1)
numbers = await loop.run_in_executor(executor, _test.generate)
moved_numbers = await loop.run_in_executor(executor, _test.move, numbers)
который не был
Эту проблему можно решить либо переписав код, чтобы он не перемещал элементы из одного контейнера в другой (мой случай), либо установив переменную среды export MALLOC_ARENA_MAX=1
что ограничит количество арен malloc до 1. Это, однако, может повлиять на производительность (есть веская причина иметь несколько арен).
Других решений пока нет …