• Pytorch 多卡并行(2)—— 使用 torchrun 进行容错处理



    1. torchrun

    • 在训练过程中,很容易遇到各种各样的错误,比如内存不足、网络故障、硬件故障等等。这些错误会导致训练过程中断或失败,从而浪费了训练时间和计算资源。 torchrun 允许我们在训练过程中按一定周期保存快照(snapshots),一旦某一并行进程出错退出,torchrun 会自动从最近 snapshots 重启所有进程。Snapshots 中要保存的参数由我们自行设定,它是模型 checkpoint 的超集,要包含恢复训练所需的全部参数,比如

      • 当前 epoch 值
      • 模型参数 model.state_dict()
      • 学习率调度器参数 lr_scheduler.state_dict()
      • 优化器参数 optimizer.state_dict()
      • 其他必要参数
    • 除了以上自动重启功能外,torchrun 还有其他一些功能

      1. torchrun 可以自动完成所有环境变量的设置,可以从环境变量中获取 rank 和 world size 等信息
        os.environ['RANK']          # 得到在所有node的所有进程中当前GPU进程的rank
        os.environ['LOCAL_RANK']    # 得到在当前node中当前GPU进程的rank
        os.environ['WORLD_SIZE']    # 得到GPU的数量
        
        • 1
        • 2
        • 3
      2. torchrun 可以完成进程分配工作,不再需要使用 mp.spawn 手动分发进程,只需要设置一个通用的 main() 函数入口,然后用 torchrun 命令启动脚本即可
      3. 快照功能允许进行断点续训
    • 使用 torchrun 时,程序通常有以下结构

      def main(args):
      	ddp_setup()				# 初始化进程池
      	load_train_objs(args)	# 设置 dataset, model, optimizer, trainer 等组件,若存在 snapshot 则从中加载参数
      	trian(args)				# 进行训练
      	destroy_process_group()	# 销毁进程池def train(args):
      	for batch in iter(dataset):
      		train_step(batch)if should_checkpoint:
      			save_snapshot(snapshot_path)	# 用 rank0 保存 snapshot
      
      if __name__ == "__main__":
      	# 加载参数
          args = parser.parse_args()	
          
          # 现在 torchrun 负责在各个 GPU 上生成进程并执行,不再需要 mp.spawn 了
          main(args)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
    • 使用 torchrun 命令来启动程序

      torchrun --standalone --nproc_per_node=gpu XXX.py
      
      • 1
      1. --standalone 代表单机运行
      2. --nproc_per_node=gpu 代表使用所有可用GPU。等于号后也可写gpu数量n,这样会使用前n个GPU

      如果想要进一步指定要运行的 GPU,可以通过 CUDA_VISIBLE_DEVICES 设置GPU可见性,比如

      CUDA_VISIBLE_DEVICES=2,3 torchrun --standalone --nproc_per_node=gpu multi_gpu_torchrun.py
      
      • 1

      这样会把本机上的 GPU2 和 GPU3 看做 GPU0 和 GPU1 运行

    2. 使用 torchrun 改写 DDP 代码

    • 使用 torchrun 改写以下 DDP 代码
      # 使用 DistributedDataParallel 进行单机多卡训练
      import torch
      import torch.nn.functional as F
      from torch.utils.data import Dataset, DataLoader
      import os
      
      # 对 python 多进程的一个 pytorch 包装
      import torch.multiprocessing as mp
      
      # 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
      from torch.utils.data.distributed import DistributedSampler     
      
      # 实现分布式数据并行的核心类        
      from torch.nn.parallel import DistributedDataParallel as DDP         
      
      # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
      # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
      from torch.distributed import init_process_group, destroy_process_group 
      
      
      def ddp_setup(rank, world_size):
          """
          setup the distribution process group
      
          Args:
              rank: Unique identifier of each process
              world_size: Total number of processes
          """
          # MASTER Node(运行 rank0 进程,多机多卡时的主机)用来协调各个 Node 的所有进程之间的通信
          os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
          os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
          init_process_group(
              backend="nccl",                     # Nvidia CUDA CPU 用这个 "nccl"
              rank=rank,                          
              world_size=world_size
          )
          torch.cuda.set_device(rank)
      
      class Trainer:
          def __init__(
              self,
              model: torch.nn.Module,
              train_data: DataLoader,
              optimizer: torch.optim.Optimizer,
              gpu_id: int,
              save_every: int,
          ) -> None:
              self.gpu_id = gpu_id
              self.model = model.to(gpu_id)
              self.train_data = train_data
              self.optimizer = optimizer
              self.save_every = save_every                    # 指定保存 ckpt 的周期
              self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下
      
          def _run_batch(self, source, targets):
              self.optimizer.zero_grad()
              output = self.model(source)
              loss = F.cross_entropy(output, targets)
              loss.backward()
              self.optimizer.step()
      
          def _run_epoch(self, epoch):
              b_sz = len(next(iter(self.train_data))[0])
              print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
              self.train_data.sampler.set_epoch(epoch)        # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的,这样才能打乱每个 epoch 的样本顺序
              for source, targets in self.train_data: 
                  source = source.to(self.gpu_id)
                  targets = targets.to(self.gpu_id)
                  self._run_batch(source, targets)
      
          def _save_checkpoint(self, epoch):
              ckp = self.model.module.state_dict()            # 由于多了一层 DDP 包装,通过 .module 获取原始参数 
              PATH = "checkpoint.pt"
              torch.save(ckp, PATH)
              print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
      
          def train(self, max_epochs: int):
              for epoch in range(max_epochs):
                  self._run_epoch(epoch)
                  # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存
                  if self.gpu_id == 0 and epoch % self.save_every == 0:
                      self._save_checkpoint(epoch)
      
      class MyTrainDataset(Dataset):
          def __init__(self, size):
              self.size = size
              self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
      
          def __len__(self):
              return self.size
          
          def __getitem__(self, index):
              return self.data[index]
      
      def load_train_objs():
          train_set = MyTrainDataset(2048)  # load your dataset
          model = torch.nn.Linear(20, 1)  # load your model
          optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
          return train_set, model, optimizer
      	
      def prepare_dataloader(dataset: Dataset, batch_size: int):
          return DataLoader(
              dataset,
              batch_size=batch_size,
              pin_memory=True,
              shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False 
              sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
          )
      
      def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
          # 初始化进程池
          ddp_setup(rank, world_size)
      
          # 进行训练
          dataset, model, optimizer = load_train_objs()
          train_data = prepare_dataloader(dataset, batch_size)
          trainer = Trainer(model, train_data, optimizer, rank, save_every)
          trainer.train(total_epochs)
         
          # 销毁进程池
          destroy_process_group()
      
      
      if __name__ == "__main__":
          import argparse
          parser = argparse.ArgumentParser(description='simple distributed training job')
          parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')
          parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')
          parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
          args = parser.parse_args()
          
          world_size = torch.cuda.device_count()
          
          # 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
          # 注意不需要 fn 的 rank 参数,它由 mp.spawn 自动分配
          mp.spawn(
              fn=main, 
              args=(world_size, args.save_every, args.total_epochs, args.batch_size), 
              nprocs=world_size
          )
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
    • 改写后的代码如下所示,请参考注释自行对比
      # 使用 DistributedDataParallel 进行单机多卡训练的基础上,使用 torchrun 进行容错处理,增强程序稳定性
      # torchrun 允许我们在训练过程中按一定保存 snapshots,其中应当包含当前 epoch、模型参数(ckpt)、优化器参数、lr调度器参数等恢复训练所需的全部参数
      # 一旦程序出错退出,torchrun 会自动从最近 snapshots 重启所有进程
      # 除了增强稳定性外,torchrun 还会自动完成所有环境变量设置和进程分配工作,所以不再需要手动设置 rank 或用 mp.spawn 生成并分配进程
      
      import torch
      import torch.nn.functional as F
      from torch.utils.data import Dataset, DataLoader
      import os
      
      # 对 python 多进程的一个 pytorch 包装
      import torch.multiprocessing as mp
      
      # 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
      from torch.utils.data.distributed import DistributedSampler     
      
      # 实现分布式数据并行的核心类        
      from torch.nn.parallel import DistributedDataParallel as DDP         
      
      # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
      # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
      from torch.distributed import init_process_group, destroy_process_group 
      
      
      def ddp_setup():
          # torchrun 会处理环境变量以及 rank & world_size 设置
          os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
          os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
          init_process_group(backend="nccl")
          torch.cuda.set_device(int(os.environ['LOCAL_RANK'])))
      
      class Trainer:
          def __init__(
              self,
              model: torch.nn.Module,
              train_data: DataLoader,
              optimizer: torch.optim.Optimizer,
              save_every: int,    
              snapshot_path: str,                                 # 保存 snapshots 的位置 
          ) -> None:
              self.gpu_id = int(os.environ['LOCAL_RANK'])         # torchrun 会自动设置这个环境变量指出当前进程的 rank
              self.model = model.to(self.gpu_id)
              self.train_data = train_data
              self.optimizer = optimizer
              self.save_every = save_every                        # 指定保存 snapshots 的周期
              self.epochs_run = 0                                 # 存储将要保存在 snapshots 中的 epoch num 信息
              self.snapshot_path = snapshot_path
      
              # 若存在 snapshots 则加载,这样重复运行指令就能自动继续训练了
              if os.path.exists(snapshot_path):
                  print('loading snapshot')
                  self._load_snapshot(snapshot_path)
              
              self.model = DDP(self.model, device_ids=[self.gpu_id])   # model 要用 DDP 包装一下
      
          def _load_snapshot(self, snapshot_path):
              ''' 加载 snapshot 并重启训练 '''
              loc = f"cuda:{self.gpu_id}"
              snapshot = torch.load(snapshot_path, map_location=loc)
              self.model.load_state_dict(snapshot["MODEL_STATE"])
              self.epochs_run = snapshot["EPOCHS_RUN"]
              print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
          
          def _run_batch(self, source, targets):
              self.optimizer.zero_grad()
              output = self.model(source)
              loss = F.cross_entropy(output, targets)
              loss.backward()
              self.optimizer.step()
      
          def _run_epoch(self, epoch):
              b_sz = len(next(iter(self.train_data))[0])
              print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
              self.train_data.sampler.set_epoch(epoch)
              for source, targets in self.train_data:
                  source = source.to(self.gpu_id)
                  targets = targets.to(self.gpu_id)
                  self._run_batch(source, targets)
      
          def _save_snapshot(self, epoch):
              # 在 snapshot 中保存恢复训练所必须的参数
              snapshot = {
                  "MODEL_STATE": self.model.module.state_dict(),  # 由于多了一层 DDP 包装,通过 .module 获取原始参数 
                  "EPOCHS_RUN": epoch,
              }
              torch.save(snapshot, self.snapshot_path)
              print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")
      
          def train(self, max_epochs: int):
              for epoch in range(self.epochs_run, max_epochs):    # 现在从 self.epochs_run 开始训练,统一重启的情况
                  self._run_epoch(epoch)
      
                  # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 snapshot 以免重复保存
                  if self.gpu_id == 0 and epoch % self.save_every == 0:
                      self._save_snapshot(epoch)
      
      class MyTrainDataset(Dataset):
          def __init__(self, size):
              self.size = size
              self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
      
          def __len__(self):
              return self.size
          
          def __getitem__(self, index):
              return self.data[index]
      
      def load_train_objs():
          train_set = MyTrainDataset(2048)  # load your dataset
          model = torch.nn.Linear(20, 1)  # load your model
          optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
          return train_set, model, optimizer
      
      def prepare_dataloader(dataset: Dataset, batch_size: int):
          return DataLoader(
              dataset,
              batch_size=batch_size,
              pin_memory=True,
              shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False 
              sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
          )
      
      def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str="snapshot.pt"):
          # 初始化进程池
          ddp_setup()
      
          # 进行训练
          dataset, model, optimizer = load_train_objs()
          train_data = prepare_dataloader(dataset, batch_size)
          trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
          trainer.train(total_epochs)
         
          # 销毁进程池
          destroy_process_group()
      
      if __name__ == "__main__":
          import argparse
          parser = argparse.ArgumentParser(description='simple distributed training job')
          parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')
          parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')
          parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
          args = parser.parse_args()
          
          # 现在 torchrun 负责在各个 GPU 上生成进程并执行,不再需要 mp.spawn 了
          main(args.save_every, args.total_epochs, args.batch_size)
      
      '''
      运行命令: 
          torchrun --standalone --nproc_per_node=gpu multi_gpu_torchrun.py
      
      参数说明:
          --standalone 代表单机运行 
          --nproc_per_node=gpu 代表使用所有可用GPU, 等于号后也可写gpu数量n, 这样会使用前n个GPU
      
      运行后获取参数:
          os.environ['RANK']          得到在所有机器所有进程中当前GPU的rank
          os.environ['LOCAL_RANK']    得到在当前node中当前GPU的rank
          os.environ['WORLD_SIZE']    得到GPU的数量
      
      通过 CUDA_VISIBLE_DEVICES 指定程序可见的GPU, 从而实现指定GPU运行:
          CUDA_VISIBLE_DEVICES=0,3 torchrun --standalone --nproc_per_node=gpu multi_gpu_torchrun.py
      '''
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162

    3. 调试代码

    • 如果使用 VScode 的话,可以如下编辑 launch.json 文件,然后像往常一样设置断点按 f5 调试即可
      {
          "version": "0.2.0",
          "configurations": [
              {
                  "name": "Python: torchrun",
                  "type": "python",
                  "request": "launch",
                  // 设置 program 的路径为 torchrun 脚本对应的绝对路径
                  "program": "/home/tim/anaconda3/envs/project/lib/python3.8/site-packages/torch/distributed/run.py",
                  // 设置 torchrun 命令的参数
                  "args":[
                      "--standalone",
                      "--nproc_per_node=gpu",
                      "multi_gpu_torchrun.py"
                  ],
                  "console": "integratedTerminal",
                  "justMyCode": true
              }
          ]
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      注意其中 “program” 是你的 torchrun 脚本路径,可使用 pip show torch 查看 torch 的安装路径进而找到它
  • 相关阅读:
    前端mounted的使用
    ECS框架浅析
    Spring如何解决循环依赖问题
    Linux安装Docker完整教程及配置阿里云镜像源
    elasticsearch安装 及 启动异常解决
    第二章:Jvm监控及诊断工具-命令行篇
    JVM监控:JMX组件与底层原理
    部署个人静态网站到阿里云服务器(含域名解析)
    React Native优质开源项目精选
    App Languages 批量导入管理Android多语言文案
  • 原文地址:https://blog.csdn.net/wxc971231/article/details/132827787