Как работает параллельный метод и распределенный метод pytorch?

Я не эксперт в распределенных системах и CUDA. Но есть одна действительно интересная особенность поддержки PyTorch, которая nn.DataParallel а также nn.DistributedDataParallel, Как они на самом деле реализованы? Как они разделяют общие вложения и синхронизируют данные?

Вот основной пример DataParallel,

import torch.nn as nn
from torch.autograd.variable import Variable
import numpy as np

class Model(nn.Module):
def __init__(self):
super().__init__(
embedding=nn.Embedding(1000, 10),
rnn=nn.Linear(10, 10),
)

def forward(self, x):
x = self.embedding(x)
x = self.rnn(x)
return x

model = nn.DataParallel(Model())
model.forward(Variable.from_numpy(np.array([1,2,3,4,5,6], dtype=np.int64)).cuda()).cpu()

PyTorch может разделить входные данные и отправить их во многие графические процессоры и объединить результаты обратно.

Как он управляет внедрением и синхронизацией для параллельной модели или распределенной модели?
Я бродил по коду PyTorch, но очень трудно понять, как работают основы.

9

Решение

Из того, что я могу проследить, код реализован в parallel_apply.py

[Редактировать: Вставьте код сюда для удобства использования]
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
r"""Applies each `module` in :attr:`modules` in parallel on arguments
contained in :attr:`inputs` (positional) and :attr:`kwargs_tup` (keyword)
on each of :attr:`devices`.
Args:
modules (Module): modules to be parallelized
inputs (tensor): inputs to the modules
devices (list of int or torch.device): CUDA devices
:attr:`modules`, :attr:`inputs`, :attr:`kwargs_tup` (if given), and
:attr:`devices` (if given) should all have same length. Moreover, each
element of :attr:`inputs` can either be a single object as the only argument
to a module, or a collection of positional arguments.
"""assert len(modules) == len(inputs)
if kwargs_tup is not None:
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = list(map(lambda x: _get_device_index(x, True), devices))
lock = threading.Lock()
results = {}
grad_enabled = torch.is_grad_enabled()

def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
with torch.cuda.device(device):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs)
with lock:
results[i] = output
except Exception as e:
with lock:
results[i] = e

if len(modules) > 1:
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]

for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

outputs = []
for i in range(len(inputs)):
output = results[i]
if isinstance(output, Exception):
raise output
outputs.append(output)
return outputs
  • modules являются модулями, которые будут параллельны.
  • inputs являются тензорами для модулей
  • devices устройства CUDA
  • results а также output сохранить окончательный результат
  • _worker() является основной функцией, которая должна выполняться потоком
-1

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]