点击上方
“
小白学视觉
”,选择加"
星标
"或“
置顶
”
重磅干货,第一时间送达
扫描下方二维码,加入前沿学术论文交流星球
!
可以获得最新顶会/顶刊论文的idea解读、解读的PDF
和
CV从入门到精通资料,及最前沿应用
!
来源丨https://zhuanlan.zhihu.com/p/489892744
由于工作需要,最近在补充分布式训练方面的知识。经过一番理论学习后仍觉得意犹未尽,很多知识点无法准确get到(例如:分布式原语scatter、all reduce等代码层面应该是什么样的,ring all reduce 算法在梯度同步时是怎么使用的,parameter server参数是如何部分更新的)。
著名物理学家,诺贝尔奖得主Richard Feynman办公室的黑板上写了:"What I cannot create, I do not understand."。在程序员界也经常有"show me the code"的口号。因此,我打算写一系列的分布式训练的文章,将以往抽象的分布式训练的概念以代码的形式展现出来,并保证每个代码可执行、可验证、可复现,并贡献出来源码让大家相互交流。
经过调研发现pytorch对于分布式训练做好很好的抽象且接口完善,因此本系列文章将以pytorch为主要框架进行,文章中的例子很多都来自pytorch的文档,并在此基础上进行了调试和扩充。
最后,由于分布式训练的理论介绍网络上已经很多了,理论部分的介绍不会是本系列文章的重点,我会将重点放在代码层面的介绍上面。
Pytorch - 分布式训练极简体验:https://zhuanlan.zhihu.com/p/477073906
Pytorch - 分布式通信原语(附源码):https://zhuanlan.zhihu.com/p/478953028
Pytorch - 手写allreduce分布式训练(附源码):https://zhuanlan.zhihu.com/p/482557067
Pytorch - 算子间并行极简实现(附源码):https://zhuanlan.zhihu.com/p/483640235
Pytorch - 多机多卡极简实现(附源码):https://zhuanlan.zhihu.com/p/486130584
1. 介绍
Pytorch在1.9.0引入了torchrun,用其替代1.9.0以前版本的
torch.distributed.launch
。torchrun在
torch.distributed.launch
功能的基础上主要新增了两个功能:
Failover: 当worker训练失败时,会自动重新启动所有worker继续进行训练;
Elastic: 可以动态增加或或删除node节点,本文将通过一个例子说明Elastic Training应该如何使用;
本例中会先在Node0上启动4 GPU的worker group ,等其训练一段时间后,会在Node1上再启动4 GPU的workers,并与Node1上的workers构成一个新的worker group,最终构成一个2机8卡的分布式训练。
2. 模型构建
一个简单的全连接模型神经网络模型
class ToyModel (nn.Module) : def __init__ (self) : super(ToyModel, self).__init__() self.net1 = nn.Linear(10 , 10 ) self.relu = nn.ReLU() self.net2 = nn.Linear(10 , 5 ) def forward (self, x) : return self.net2(self.relu(self.net1(x)))
3. checkpoint 处理
由于再每次增加或删除node时,会将所有worker kill掉,然后再重新启动所有worker进行训练。因此,在训练代码中要对训练的状态进行保存,以保证重启后能接着上次的状态继续训练。
需要保存的信息一般有如下内容:
save和load的代码如下所示
torch.save
:利用python的pickle将python的object 进行序列化,并保存到本地文件;
torch.load
: 将torch.save后的本地文件进行反序列化,并加载到内存中;
model.state_dict():
存储了model 每个layer和其对应的param信息
optimizer.state_dict()
:存储了优化器的参数信信息
def save_checkpoint (epoch, model, optimizer, path) : torch.save({ "epoch" : epoch, "model_state_dict" : model.state_dict(), "optimize_state_dict" : optimizer.state_dict(), }, path)def load_checkpoint (path) : checkpoint = torch.load(path) return checkpoint
4. 训练代码
初始化逻辑如下:
1~3行: 输出当前worker的关键环境变量,用于后面的结果展示
14~19行:如果存在checkpoint,则加载checkpoint,并赋值给model、optimizer和firt_epoch
local_rank = int(os.environ["LOCAL_RANK" ]) rank = int(os.environ["RANK" ]) print(f"[{os.getpid()} ] (rank = {rank} , local_rank = {local_rank} ) train worker starting..." ) model = ToyModel().cuda(local_rank) ddp_model = DDP(model, [local_rank]) loss_fn = nn.MSELoss() optimizer = optim.SGD(ddp_model.parameters(), lr=0.001 ) optimizer.zero_grad() max_epoch = 100 first_epoch = 0 ckp_path = "checkpoint.pt" if os.path.exists(ckp_path): print(f"load checkpoint from {ckp_path} " ) checkpoint = load_checkpoint(ckp_path) model.load_state_dict(checkpoint["model_state_dict" ]) optimizer.load_state_dict(checkpoint["optimize_state_dict" ]) first_epoch = checkpoint["epoch" ]
训练逻辑:
1行:epoch执行的次数为first_epoch到max_epoch,以便能够在worker被重启后继续原有的epoch继续训练;
2行:为了展示动态添加node效果,这里添加sleep函数来降低训练的速度;
9行:为了简单,文本每个epoch进行一次checkpoint保存;将当前的epoch,model和optimizer保存到checkpoint中;
for i in range(first_epoch, max_epoch): time.sleep(1 ) # 为了展示动态添加node效果,这里添加sleep函数来降低训练的速度 outputs = ddp_model(torch.randn(20 , 10 ).to(local_rank)) labels = torch.randn(20 , 5 ).to(local_rank) loss = loss_fn(outputs, labels) loss.backward() print(f"[{os.getpid()} ] epoch {i} (rank = {rank} , local_rank = {local_rank} ) loss = {loss.item()} \n" ) optimizer.step() save_checkpoint(i, model, optimizer, ckp_path)
5. 启动方式
由于我们使用torchrun来启动多机多卡任务,无需使用spawn接口来启动多个进程(torchrun会负责将我们的python script启动为一个process),因此直接调用上文编写的train函数,并在前后分别添加DistributedDataParallel的初始化和效果函数即可。
下面代码描述了上文train接口的调用。
def run () : env_dict = { key: os.environ[key] for key in ("MASTER_ADDR" , "MASTER_PORT" , "WORLD_SIZE" , "LOCAL_WORLD_SIZE" ) } print(f"[{os.getpid()}
] Initializing process group with: {env_dict} " ) dist.init_process_group(backend="nccl" ) train() dist.destroy_process_group()if __name__ == "__main__" : run()
本例中使用torchrun来执行多机多卡的分布式训练任务(注:
torch.distributed.launch
已经被pytorch淘汰了,尽量不要再使用)。启动脚本描述如下(注:node0和node1均通过该脚本进行启动)
--nnodes=1:3
:表示当前训练任务接受最少1个node,最多3个node参与分布式训练;
--nproc_per_node=4
:表示每个node上节点有4个process
--max_restarts=3
: worker group最大的重启次数;这里需要注意的是,node fail、node scale down和node scale up都会导致restart;
--rdzv_id=1
:一个unique的job id,所有node均使用同一个job id;
--rdzv_backend
: rendezvous的backend实现,默认支持c10d和etcd两种;rendezvous用于多个node之间的通信和协调;
--rdzv_endpoint
:rendezvous的地址,应该为一个node的host ip和port;
torchrun \ --nnodes=1:3\ --nproc_per_node=4\ --max_restarts=3\ --rdzv_id=1\ --rdzv_backend=c10d\ --rdzv_endpoint="192.0.0.1:1234"\ train_elastic.py
6. 结果分析
代码:BetterDL - train_elastic.py:https://github.com/tingshua-yts/BetterDL/blob/master/test/pytorch/DDP/train_elastic.py
运行环境: 2台4卡 v100机器
image: pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime gpu: v100
先在node0上执行执行启动脚本
torchrun \ --nnodes=1:3\ --nproc_per_node=4\ --max_restarts=3\ --rdzv_id=1\ --rdzv_backend=c10d\ --rdzv_endpoint="192.0.0.1:1234"\ train_elastic.py
得到如下结果
2~5行:当前启动的是单机4卡的训练任务,因此WORLD_SIZE为4, LOCAL_WORKD_SIZE也为4
6~9行:共有4个rank参与了分布式训练,rank0~rank3
10~18行: rank0~rank3 均从epoch=0开始训练
r/workspace/DDP# sh run_elastic.sh [4031] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'} [4029] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'} [4030] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'} [4032] Initializing process group with: {'MASTER_ADDR': '192.0.0.1', 'MASTER_PORT': '44901', 'WORLD_SIZE': '4', 'LOCAL_WORLD_SIZE': '4'} [4029] (rank = 0, local_rank = 0) train worker starting... [4030] (rank = 1, local_rank = 1) train worker starting... [4032] (rank = 3, local_rank = 3) train worker starting... [4031] (rank = 2, local_rank = 2) train worker starting... [4101] epoch 0 (rank = 1, local_rank = 1) loss = 0.9288564920425415 [4103] epoch 0 (rank = 3, local_rank = 3) loss = 0.9711472988128662 [4102] epoch 0 (rank = 2, local_rank = 2) loss = 1.0727070569992065 [4100] epoch 0 (rank = 0, local_rank = 0) loss = 0.9402943253517151 [4100] epoch 1 (rank = 0, local_rank = 0) loss = 1.0327017307281494 [4101] epoch 1 (rank = 1, local_rank = 1) loss = 1.4485043287277222 [4103] epoch 1 (rank = 3, local_rank = 3) loss = 1.0959293842315674 [4102] epoch 1 (rank = 2, local_rank = 2) loss = 1.0669530630111694 ...
在node1上执行与上面相同的脚本
torchrun \ --nnodes=1:3\ --nproc_per_node=4\ --max_restarts=3\ --rdzv_id=1\ --rdzv_backend=c10d\ --rdzv_endpoint="192.0.0.1:1234"\ train_elastic.py
node1上结果如下:
2~5行:由于添加node1,当前执行的是2机8卡的分布式训练任务,因此WORLD_SIZE=8, LOCAL_WORLD_SIZE=4
6~9行:当前node1上workers的rank为rank4 ~rank7