DDP

DDP是PyTorch中的一个库,它支持跨多个设备的梯度同步。这意味着您可以通过跨多个GPU并行处理,线性地加快模型训练。DDP的工作原理是为每个GPU创建一个单独的Python进程,每个进程都使用一个不重叠的数据子集。

比起DP来,DDP训练速度更快,显卡负载也更为均衡。目前官方开发者推荐使用DDP代替DP,DP很少维护了,导致有许多bug。比如:nn.ParameterListnn.ParameterDict在DP中会存在其他卡无法复制,变成空值的Bug,而这在DDP中就正常。

使用

① 添加超参数:

1
2
3
4
5
6
7
def parse():
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)
...
...
args = parser.parse_args()
return args
  • local_rank:进程内GPU编号,非显式参数,由 torch.distributed.launch 内部指定,默认为GPU可用列表中的第一个GPU,这个是必须加的。

② 在主函数添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import torch.distributed as dist
parser = argparse.ArgumentParser(description='Distributed Training')
# 该参数由torch.distributed.launch自动传递,代表当前进程处理的GPU编号
parser.add_argument('--local_rank', type=int, default=0)

# 所创建的进程数,也就是所使用的GPU数量
num_gpus = int(os.environ['WORLD_SIZE']) if 'WORLD_SIZE' in os.environ else 1
args.is_distributed = num_gpus > 1
args.num_gpus = num_gpus

if args.is_distributed:
torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')
dist.barrier() # 用dist.barrier()来同步不同进程间的快慢
  • world size:表示全局进程个数。

③ 修改导入数据的接口:

1
2
3
4
5
6
dataset = DAVIS2017(root, 'training')
num_workers = 4 if cuda else 0
# 使用DistributedSampler来为各个进程分发数据,用于将数据集等分成不重叠的数个子集
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
# 在Dataloader中指定sampler时,其中的shuffle必须为False,而DistributedSampler中的shuffle项默认为True,因此训练过程默认执行shuffle
dataloader = DataLoader(dataset,batch_size=batchsize,shuffle=False, num_workers=num_workers,pin_memory=cuda, drop_last=True, sampler=train_sampler)

④ 准备模型:

1
2
3
4
5
6
7
8
# 初始化模型
model = create_model()
# 创建CUDA设备
device = device = torch.device('cuda:{}'.format(args.local_rank))
# 把模型放到cuda上
model = model.to(device)
# 将模型用DDP封装
model = DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank)

⑤ 权重加载:

1
2
3
4
# 加载模型前后用dist.barrier()来同步不同进程间的快慢
dist.barrier()
model.load_state_dict(torch.load('model.pth'), map_location=torch.device('cpu'))
dist.barrier()

⑥ 学习率:

1
2
3
4
# 学习率应扩大GPU数的倍数,因为DDP每张显卡分别跑batchsize
# 而DP是多张显卡跑一个batchsize
# 也可以将batchsize // args.num_gpus作为batchsize,lr不变
lr *= args.num_gpus

⑦ 迭代:

1
2
3
4
5
6
def train(epoch):
# 在多GPU环境下,采样器必须知道哪个epoch
# 因此,每次迭代开始需要为sampler设置当前epoch
train_sampler.set_epoch(epoch)
...
...

⑧ 输出loss:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import torch.distributed as dist

def get_world_size():
if not dist.is_available():
return 1
if not dist.is_initialized():
return 1
return dist.get_world_size()

def reduce_value(value, average=True):
world_size = get_world_size()
if world_size < 2:
return value
with torch.no_grad():
dist.all_reduce(value)
if average:
value /= world_size
return value

def train(epoch):
...
for batch_idx, (inputs, targets) in enumerate(trainloader):
...
loss = criterion(outputs, targets)
...
...
# loss为每个GPU上loss的累加,因此会很大
# 如果想输出一个GPU上的loss,可对loss取平均
if is_main_process():
reduced_loss = reduce_tensor(loss)
print('loss: %.3f' % reduced_loss.item())

⑨ 模型保存、日志输出、控制台打印等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import torch.distributed as dist

def get_rank():
if not dist.is_available():
return 0
if not dist.is_initialized():
return 0
return dist.get_rank()

def is_main_process():
return get_rank() == 0

# 分布式训练会开启多个进程,因此模型的保存、日志输出、控制台打印等也会进行多次
# 因此直接保存会发生冲突
# 遇到此类操作都需要加判断,只在主进程时进行
if is_main_process():
torch.save(xxxx)

⑩ 训练:

1
2
# 在原来基础上加上python -m torch.distributed.launch --nproc_per_node=GPU数量即可
python -m torch.distributed.launch --nproc_per_node=GPU数量 train.py --arg1 --arg2 --arg3