DDP
DDP是PyTorch中的一个库,它支持跨多个设备的梯度同步。这意味着您可以通过跨多个GPU并行处理,线性地加快模型训练。DDP的工作原理是为每个GPU创建一个单独的Python进程,每个进程都使用一个不重叠的数据子集。
比起DP来,DDP训练速度更快,显卡负载也更为均衡。目前官方开发者推荐使用DDP代替DP,DP很少维护了,导致有许多bug。比如:nn.ParameterList
和nn.ParameterDict
在DP中会存在其他卡无法复制,变成空值的Bug,而这在DDP中就正常。
使用
① 添加超参数:
1 2 3 4 5 6 7
| def parse(): parser = argparse.ArgumentParser() parser.add_argument('--local_rank', type=int, default=0) ... ... args = parser.parse_args() return args
|
- local_rank:进程内GPU编号,非显式参数,由
torch.distributed.launch
内部指定,默认为GPU可用列表中的第一个GPU,这个是必须加的。
② 在主函数添加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import os import torch.distributed as dist parser = argparse.ArgumentParser(description='Distributed Training')
parser.add_argument('--local_rank', type=int, default=0)
num_gpus = int(os.environ['WORLD_SIZE']) if 'WORLD_SIZE' in os.environ else 1 args.is_distributed = num_gpus > 1 args.num_gpus = num_gpus
if args.is_distributed: torch.cuda.set_device(args.local_rank) torch.distributed.init_process_group(backend='nccl', init_method='env://') dist.barrier()
|
③ 修改导入数据的接口:
1 2 3 4 5 6
| dataset = DAVIS2017(root, 'training') num_workers = 4 if cuda else 0
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
dataloader = DataLoader(dataset,batch_size=batchsize,shuffle=False, num_workers=num_workers,pin_memory=cuda, drop_last=True, sampler=train_sampler)
|
④ 准备模型:
1 2 3 4 5 6 7 8
| model = create_model()
device = device = torch.device('cuda:{}'.format(args.local_rank))
model = model.to(device)
model = DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank)
|
⑤ 权重加载:
1 2 3 4
| dist.barrier() model.load_state_dict(torch.load('model.pth'), map_location=torch.device('cpu')) dist.barrier()
|
⑥ 学习率:
⑦ 迭代:
1 2 3 4 5 6
| def train(epoch): train_sampler.set_epoch(epoch) ... ...
|
⑧ 输出loss:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import torch.distributed as dist
def get_world_size(): if not dist.is_available(): return 1 if not dist.is_initialized(): return 1 return dist.get_world_size()
def reduce_value(value, average=True): world_size = get_world_size() if world_size < 2: return value with torch.no_grad(): dist.all_reduce(value) if average: value /= world_size return value def train(epoch): ... for batch_idx, (inputs, targets) in enumerate(trainloader): ... loss = criterion(outputs, targets) ... ... if is_main_process(): reduced_loss = reduce_tensor(loss) print('loss: %.3f' % reduced_loss.item())
|
⑨ 模型保存、日志输出、控制台打印等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import torch.distributed as dist
def get_rank(): if not dist.is_available(): return 0 if not dist.is_initialized(): return 0 return dist.get_rank()
def is_main_process(): return get_rank() == 0
if is_main_process(): torch.save(xxxx)
|
⑩ 训练:
1 2
| # 在原来基础上加上python -m torch.distributed.launch --nproc_per_node=GPU数量即可 python -m torch.distributed.launch --nproc_per_node=GPU数量 train.py --arg1 --arg2 --arg3
|