0. 项目组织

  • 本文改写 MinGPT 库中的 chargpt 例程,这是一个 character-level 语言模型项目,组织如下
    Pytorch 多卡并行(3)—— 使用 DDP 加速 minGPT 训练-LMLPHP
  • 说明一下主要文件内容
    1. data/input.txt 是训练用的数据集
    2. char_dataset.py 定义了一个 char-level 的 torch.utils.data.Dataset
    3. gpt_snapshot.pt 是程序运行过程中保存的快照,使用 torchrun 时可以从此重启所有进程的训练
    4. gpt2_train_cfg.yaml 是 yaml 配置文件,记录了训练超参数
    5. main.log 是 hydra 生成的 logging 文件
    6. main.py 是程序入口,符合前文 使用 torchrun 进行容错处理 第1节给出的标准形式
    7. model.py 定义了 GPT 模型结构和 optimizer 的构造方法
    8. trainer.py 定义了训练过程,包括快照保存和加载等操作

1. 参数准备

  • 本项目使用 YAML文件存储超参数设置。YAML 是一种轻量级的数据序列化格式。相较于JSON等其他格式,YAML 更加易读易写,也更加适合用于配置文件等场景。YAML的语法结构主要包含键值对、列表、注释等几种元素
    data_config:
      path: ./data/input.txt
      block_size: 128   # 输入序列长度
      train_split: 0.9  # 训练集和测试集划分
      truncate: 0.02    # 只用5%的数据进行训练
    gpt_config:
      n_layer: 8
      n_head: 8
      n_embd: 512       
    trainer_config:
      max_epochs: 10
      batch_size: 216
      data_loader_workers: 4
      grad_norm_clip: 1.0
      snapshot_path: gpt_snapshot.pt
      save_every: 3
      use_amp: True
    optimizer_config:
      weight_decay: 0.1
      learning_rate: 0.0003
    
    hydra:
      run:
        dir: ./
    
  • 使用 Hydra 来管理超参数,它可以以装饰器的形式方便地加载不同路径下的 yaml 配置文件,最小用例如下
    import hydra
    from omegaconf import DictConfig
    
    @hydra.main(version_base=None, config_path='configs', config_name='config')
    def main(cfg: DictConfig) -> None:
        cfg['key'] # 获得对应的参数值
    
    if __name__ == '__main__':
        main()
    
    这样就把 ./configs/config.yaml 文件的参数加载到 main 函数中了,使用 cfg['key'] 这样的形式获取参数值
  • 使用 Hydra 还有一个好处是它对 logging 标准库进行了包装。在 hydra.main 装饰器中对 log 输出格式规范为 "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s",并设置 level 为 info,运行程序就会自动生成 main.log 日志文件。可以通过命令行的hydra.verbose 参数修改 log 的显示 level

2. 数据准备

  • 使用的数据是 tiny-shakespear 数据集,它是一个记录了一些英文对话的文本文档,截取如下
    First Citizen:
    Before we proceed any further, hear me speak.
    
    All:
    Speak, speak.
    
    First Citizen:
    You are all resolved rather to die than to famish?
    
    All:
    Resolved. resolved.
    
    First Citizen:
    First, you know Caius Marcius is chief enemy to the people.
    
    All:
    We know't, we know't.
    
    First Citizen:
    Let us kill him, and we'll have corn at our own price.
    Is't a verdict?
    
    All:
    No more talking on't; let it be done: away, away!
    
  • 下面来构造数据集,思路是把 txt 文件中所有字符去重排序生成 vocab table;样本生成时先把 txt 内容全部读取进来,然后构造 n-gram 样本。如下
    import torch
    from torch.utils.data import Dataset
    import fsspec
    from dataclasses import dataclass
    
    """
    Adapted from https://github.com/karpathy/minGPT/blob/master/projects/chargpt/chargpt.py
    """
    
    @dataclass
    class DataConfig:
        path: str = None
        block_size: int = None      # 输入序列长度    
        train_split: float = None   # 训练集和测试集划分
        truncate: float = 1.0       # 用于训练的数据占全体数据的比例
    
    class CharDataset(Dataset):
    
        def __init__(self, data_cfg: DataConfig): #data_path: str, block_size):
            # 加载所需比例的数据
            data = fsspec.open(data_cfg.path).open().read().decode('utf-8')
            data = data[ : int(len(data) * data_cfg.truncate)]
    
            # Set 去重,转 list 后排序得到数据集中的唯一字符列表作为词表
            chars = sorted(list(set(data))) 
            data_size, vocab_size = len(data), len(chars)
            print('Data has %d characters, %d unique.' % (data_size, vocab_size))
    
            # 得到字符和词表索引之间的双射
            self.stoi = {ch: i for i, ch in enumerate(chars)}   # 字符 -> 词表索引
            self.itos = {i: ch for i, ch in enumerate(chars)}   # 词表索引 -> 字符
            
            self.block_size = data_cfg.block_size  	# 模型输入序列长度
            self.vocab_size = vocab_size			# 词表尺寸
            self.data = data
    
        def __len__(self):
            return len(self.data) - self.block_size
    
        def __getitem__(self, idx):
            # grab a chunk of (block_size + 1) characters from the data
            chunk = self.data[idx:idx + self.block_size + 1]
            
            # encode every character to an integer
            dix = [self.stoi[s] for s in chunk]
            x = torch.tensor(dix[:-1], dtype=torch.long)
            y = torch.tensor(dix[1:], dtype=torch.long)
            return x, y
    

3. 程序入口

  • 使用 torchrun 命令进行容错,按前文 使用 torchrun 进行容错处理 给出的标准形式来编写程序入口(mian.py),如下
    import os
    import torch
    from torch.utils.data import random_split
    from torch.distributed import init_process_group, destroy_process_group
    from model import GPT, GPTConfig, OptimizerConfig, create_optimizer
    from trainer import Trainer, TrainerConfig
    from char_dataset import CharDataset, DataConfig
    from omegaconf import DictConfig
    import hydra
    
    
    def ddp_setup():
    	os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
        os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
        init_process_group(backend="nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def get_train_objs(gpt_cfg: GPTConfig, opt_cfg: OptimizerConfig, data_cfg: DataConfig):
        dataset = CharDataset(data_cfg)
        train_len = int(len(dataset) * data_cfg.train_split)
        train_set, test_set = random_split(dataset, [train_len, len(dataset) - train_len])
    
        gpt_cfg.vocab_size = dataset.vocab_size
        gpt_cfg.block_size = dataset.block_size
        model = GPT(gpt_cfg)
        optimizer = create_optimizer(model, opt_cfg)
        
        return model, optimizer, train_set, test_set
     
    @hydra.main(version_base=None, config_path=".", config_name="gpt2_train_cfg")
    def main(cfg: DictConfig):
        # 初始化进程池
        ddp_setup()
    
        # 从 yaml 文件读取超参数
        gpt_cfg = GPTConfig(**cfg['gpt_config'])
        opt_cfg = OptimizerConfig(**cfg['optimizer_config'])
        data_cfg = DataConfig(**cfg['data_config'])
        trainer_cfg = TrainerConfig(**cfg['trainer_config'])
    
        # 创建训练对象
        model, optimizer, train_data, test_data = get_train_objs(gpt_cfg, opt_cfg, data_cfg)
        trainer = Trainer(trainer_cfg, model, optimizer, train_data, test_data)
        
        # 开始训练
        trainer.train()
    
        # 训练完成后,销毁进程池
        destroy_process_group()
    
    
    if __name__ == "__main__":
        main()
    
  • 注意其中使用 hydra.main 装饰器来加载参数;运行时使用以下命令指定 GPU 运行
    CUDA_VISIBLE_DEVICES=1,2 torchrun --standalone --nproc_per_node=gpu main.py
    

4. 定义模型

  • 整个模型定义部分相比 MinGPT 原始代码逻辑上没有区别,只是换了一下写法看起来更清晰一点。首先定义两个 @dataclass 保存模型和优化器参数
    from dataclasses import dataclass
    import math
    import torch
    import torch.nn as nn
    from torch.nn import functional as F
    
    @dataclass
    class GPTConfig:
        model_type: str = 'gpt2'
        # model configurations
        n_layer: int = None
        n_head: int = None
        n_embd: int =  None
        # openai's values for gpt2
        vocab_size: int = 50257 
        block_size: int = 1024
        # dropout hyperparameters
        embd_pdrop: float = 0.1
        resid_pdrop: float = 0.1
        attn_pdrop: float = 0.1
    
    @dataclass
    class OptimizerConfig:
        learning_rate: float = 3e-4
        weight_decay: float = 0.1
    
  • 定义多头 masked self-attention 模块,原本 MinGPT 库是全部手写的,这里则用了 pytorch 自己的多头注意力模块。具体做法是使用 torch.nn.MultiheadAttention 定义普通多头注意力层,在 forward 方法中用同一个序列输入构造 qkv 实现 self-attention,再用过对注意力输出设置遮盖实现 mask
    class MultiheadAttentionLayer(nn.Module):
        """
        A multi-head masked self-attention layer with a projection at the end.
        """
    
        def __init__(self, config, device="cpu", dtype=torch.float32):
            super().__init__()
            assert config.n_embd % config.n_head == 0
            self.resid_drop = nn.Dropout(config.resid_pdrop)
            
            # output projection
            self.c_proj = nn.Linear(config.n_embd, config.n_embd, device=device, dtype=dtype)
    
            # Causal mask。注意这个mask是通过 self.register_buffer 方法登记的
            # 这样登记过的张量可以求梯度也可以随模型在 CPU/GPU 之间移动,但是不进行参数优化
            self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))
            
            self.attn = torch.nn.MultiheadAttention(
                embed_dim=config.n_embd,
                num_heads=config.n_head,
                dropout=config.attn_pdrop,
                batch_first=True,
                device=device,
                dtype=dtype
            )
    
        def forward(self, x):
            _, seq_size, _ = x.size()   # batch size, sequence length, embedding dimensionality (n_embd)
            y = self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0]
            y = self.resid_drop(self.c_proj(y))
            return y
    
  • 定义 Transformer block
    class Block(nn.Module):
        """ an unassuming Transformer block """
        def __init__(self, config: GPTConfig):
            super().__init__()
            self.ln1 = nn.LayerNorm(config.n_embd)
            self.ln2 = nn.LayerNorm(config.n_embd)
            self.attn = MultiheadAttentionLayer(config)
            self.mlp = nn.Sequential(
                nn.Linear(config.n_embd, 4 * config.n_embd),
                nn.GELU(),
                nn.Linear(4 * config.n_embd, config.n_embd),
                nn.Dropout(config.resid_pdrop),
            )
    
        def forward(self, x):
            x = x + self.attn(self.ln1(x))
            x = x + self.mlp(self.ln2(x))
            return x
    
  • 定义字符嵌入层,用 nn.Embedding 嵌入 token,再设置一个 nn.Parameter 作为可学习的位置编码
    class EmbeddingStem(nn.Module):
        def __init__(self, config: GPTConfig, device="cpu", dtype=torch.float32):
            super().__init__()
            self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd, device=device, dtype=dtype)
            self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, device=device, dtype=dtype))
            self.drop = nn.Dropout(config.embd_pdrop)
            self.block_size = config.block_size
    
        def reset_parameters(self): 
            self.tok_emb.reset_parameters() # 将 nn.Embedding 层参数初始化为正态分布采样
    
        def forward(self, idx):
            b, t = idx.size()
            assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"
    
            token_embeddings = self.tok_emb(idx)            # each index maps to a (learnable) embedding vector
            position_embeddings = self.pos_emb[:, :t, :]    # each position maps to a (learnable) position vector
            return self.drop(token_embeddings + position_embeddings)
    
  • 把以上组件合在一起,定义 GPT 模型
    class GPT(nn.Module):
        """ GPT Language Model """
    
        def __init__(self, config: GPTConfig):
            super().__init__()
            self.block_size = config.block_size
            config = self._set_model_config(config)
    
            # input embedding stem
            self.emb_stem = EmbeddingStem(config)
            # transformer
            self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
            # decoder head
            self.ln_f = nn.LayerNorm(config.n_embd)
            self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
    
            # init all weights, and apply a special scaled init to the residual projections, per GPT-2 paper
            self.apply(self._init_weights)
            for pn, p in self.named_parameters():
                if pn.endswith('c_proj.weight'):
                    p.data.normal_(mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))
    
            # report number of parameters (note we don't count the decoder parameters in lm_head)
            n_params = sum(p.numel() for p in self.blocks.parameters())
            print("number of parameters: %.2fM" % (n_params/1e6,))
    
        def _set_model_config(self, config):
            type_given = config.model_type is not None
            params_given = all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])
            # assert type_given ^ params_given # exactly one of these (XOR)
            if type_given and not params_given:
                # translate from model_type to detailed configuration
                config.__dict__.update({
                    # names follow the huggingface naming conventions
                    # GPT-1
                    'openai-gpt':   dict(n_layer=12, n_head=12, n_embd=768),  # 117M params
                    # GPT-2 configs
                    'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),  # 124M params
                    'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
                    'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
                    'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
                    # Gophers
                    'gopher-44m':   dict(n_layer=8, n_head=16, n_embd=512),
                    # (there are a number more...)
                    # I made these tiny models up
                    'gpt-mini':     dict(n_layer=6, n_head=6, n_embd=192),
                    'gpt-micro':    dict(n_layer=4, n_head=4, n_embd=128),
                    'gpt-nano':     dict(n_layer=3, n_head=3, n_embd=48),
                }[config.model_type])
            return config
        
        def _init_weights(self, module):
            if isinstance(module, (nn.Linear, nn.Embedding)):
                module.weight.data.normal_(mean=0.0, std=0.02)
                if isinstance(module, nn.Linear) and module.bias is not None:
                    module.bias.data.zero_()
            elif isinstance(module, nn.LayerNorm):
                module.bias.data.zero_()
                module.weight.data.fill_(1.0)
    
        def forward(self, idx, targets=None):
            x = self.emb_stem(idx)
            x = self.blocks(x)
            x = self.ln_f(x)
            logits = self.head(x)
    
            # if we are given some desired targets also calculate the loss
            loss = None
            if targets is not None:
                loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
    
            return logits, loss
    
        @torch.no_grad()
        def generate(self, idx, max_new_tokens, temperature=1.0, do_sample=False, top_k=None):
            """
            Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
            the sequence max_new_tokens times, feeding the predictions back into the model each time.
            Most likely you'll want to make sure to be in model.eval() mode of operation for this.
            """
            for _ in range(max_new_tokens):
                # if the sequence context is growing too long we must crop it at block_size
                idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
                # forward the model to get the logits for the index in the sequence
                logits, _ = self(idx_cond)
                # pluck the logits at the final step and scale by desired temperature
                logits = logits[:, -1, :] / temperature
                # optionally crop the logits to only the top k options
                if top_k is not None:
                    v, _ = torch.topk(logits, top_k)
                    logits[logits < v[:, [-1]]] = -float('Inf')
                # apply softmax to convert logits to (normalized) probabilities
                probs = F.softmax(logits, dim=-1)
                # either sample from the distribution or take the most likely element
                if do_sample:
                    idx_next = torch.multinomial(probs, num_samples=1)
                else:
                    _, idx_next = torch.topk(probs, k=1, dim=-1)
                # append sampled index to the running sequence and continue
                idx = torch.cat((idx, idx_next), dim=1)
    
            return idx
    
  • 最后我们来定义优化器,
    def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):
        """
        This long function is unfortunately doing something very simple and is being very defensive:
        We are separating out all parameters of the model into two buckets: those that will experience
        weight decay for regularization and those that won't (biases, and layernorm/embedding weights).
        We are then returning the PyTorch optimizer object.
        """
    
        # separate out all parameters to those that will and won't experience regularizing weight decay
        decay = set()
        no_decay = set()
        whitelist_weight_modules = (torch.nn.Linear, )
        blacklist_weight_modules = (torch.nn.LayerNorm, torch.nn.Embedding)
        for mn, m in model.named_modules():
            for pn, p in m.named_parameters():
                fpn = '%s.%s' % (mn, pn) if mn else pn # full param name
                # random note: because named_modules and named_parameters are recursive
                # we will see the same tensors p many many times. but doing it this way
                # allows us to know which parent module any tensor p belongs to...
                if pn.endswith('bias'):
                    # all biases will not be decayed
                    no_decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
                    # weights of whitelist modules will be weight decayed
                    decay.add(fpn)
                elif pn.endswith('in_proj_weight'):
                    # MHA projection layer
                    decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
                    # weights of blacklist modules will NOT be weight decayed
                    no_decay.add(fpn)
                elif pn.endswith('pos_emb'):
                    # positional embedding shouldn't be decayed
                    no_decay.add(fpn)
    
        # validate that we considered every parameter
        param_dict = {pn: p for pn, p in model.named_parameters()}
        inter_params = decay & no_decay
        union_params = decay | no_decay
        assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
        assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
                                                    % (str(param_dict.keys() - union_params), )
    
        # create the pytorch optimizer object
        optim_groups = [
            {"params": [param_dict[pn] for pn in sorted(list(decay))], "weight_decay": opt_config.weight_decay},
            {"params": [param_dict[pn] for pn in sorted(list(no_decay))], "weight_decay": 0.0},
        ]
        optimizer = torch.optim.AdamW(optim_groups, lr=opt_config.learning_rate, betas=(0.9, 0.95))
        return optimizer
    
    这里主要是通过权重衰减方法来进行正则化避免过拟合。注意到作者通过一个二重遍历考察 GPT 模型所有 sub module 的所有 parameters,仅对所有 torch.nn.Linear 层的 weight 参数进行衰减,bias 参数及所有 torch.nn.LayerNormtorch.nn.Embedding 模块的参数都不做处理。由于模块是递归组织的,这个二重遍历会重复访问很多参数,所以通过 set 自动去重,最后根据处理结果定义 torch.optim.AdamW 优化器返回

5. 定义 Trainer

  • Trainer 定义和原始 MinGPT 库主要有两个区别

    1. 按指定周期要求 rank0 进程保存 snapshot,本项目中应包含 epoch、模型参数和优化器参数三部分内容;初始化 Trainer 时应当加载可能存在的 snapshot 文件,这样在 torchrun 自动重启进程时可以从最近的 snapshot 恢复训练

    2. 可以使用 torch.cuda.amp.GradScaler 进行混合精度训练

      本项目使用 pytorch 的 amp 库进行混合精度训练,主要用到 GradScaler 和 autocast 两个组件。其中 Gradscalar 对会检查梯度是否发现溢出,并对优化器进行控制 (将丢弃的batches转换为 no-op);autocast 是一个上下文管理器,当进入 autocast 上下文后,tensor 的数据类型会自动转换为半精度浮点型,从而在不损失训练精度的情况下加快运算,而不需要手动调用 .half()。 一个最小实践示例为

      from torch.cuda.amp import autocast as autocast, GradScaler
      '''
      other code
      '''
       
      # 在训练最开始之前实例化一个GradScaler对象
      scaler = GradScaler()
      '''
      other code
      '''
              # 前向过程(model + loss)开启 autocast
              with autocast():
                  output = model(input)
                  loss = loss_fn(output, target)
       
              # Scales loss,这是因为半精度的数值范围有限,因此需要用它放大
              scaler.scale(loss).backward()
       
              # scaler.step() unscale之前放大后的梯度,但是scale太多可能出现inf或NaN
              # 故其会判断是否出现了inf/NaN
              # 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,
              # 如果检测到出现了inf或者NaN,就跳过这次梯度更新,同时动态调整scaler的大小
              scaler.step(optimizer)
       
              # 查看是否要更新scaler,这个要注意不能丢
              scaler.update()
       
      '''
      other code
      '''
      
  • 下面开始分析 trainer 代码,首先定义两个 @dataclass 存储 Trainer 参数和 snapshot 参数

    @dataclass
    class TrainerConfig:
        max_epochs: int = None
        batch_size: int = None
        data_loader_workers: int = None
        grad_norm_clip: float = None
        snapshot_path: Optional[str] = None
        save_every: int = None
        use_amp: bool = None
    
    @dataclass
    class Snapshot:
        model_state: 'OrderedDict[str, torch.Tensor]'
        optimizer_state: Dict[str, Any]
        finished_epoch: int
    
  • 定义 Trianer 的初始化方法

    class Trainer:
        def __init__(self, trainer_config: TrainerConfig, model, optimizer, train_dataset, test_dataset=None):
            self.config = trainer_config
            # set torchrun variables
            self.local_rank = int(os.environ["LOCAL_RANK"]) # 在所有node的所有进程中当前GPU进程的rank
            self.global_rank = int(os.environ["RANK"])      # 在当前node中当前GPU进程的rank
            
            # data stuff
            self.train_dataset = train_dataset
            self.train_loader = self._prepare_dataloader(train_dataset)
            self.test_loader = self._prepare_dataloader(test_dataset) if test_dataset else None
            
            # initialize train states
            self.epochs_run = 0
            self.model = model.to(self.local_rank)
            self.optimizer = optimizer        
            self.save_every = self.config.save_every
    
            # load snapshot if available. only necessary on the first node.
            if self.config.snapshot_path is None:
                self.config.snapshot_path = "snapshot.pt"
            self._load_snapshot()
    
            # wrap with DDP. this step will synch model across all the processes.
            self.model = DDP(self.model, device_ids=[self.local_rank])
    
            # torch.cuda.amp.GradScaler 是一个用于自动混合精度训练的 PyTorch 工具,它可以帮助加速模型训练并减少显存使用量
            # 具体来说,GradScaler 可以将梯度缩放到较小的范围,以避免数值下溢或溢出的问题,同时保持足够的精度以避免模型的性能下降
            if self.config.use_amp: 
                self.scaler = torch.cuda.amp.GradScaler()
    

    注意几点

    1. torchrun 帮助我们自动分发进程,通过环境变量获取当前运行代码的 GPU rank 信息
    2. 初始化 Trainer 时加载可能存在的 snapshot,实现断点续训
    3. 模型使用 DDP 进行包装
    4. 定义混合精度训练所需的 torch.cuda.amp.GradScaler()
  • 定义 DataLoder,注意使用 DistributedSampler 来分发训练数据

    def _prepare_dataloader(self, dataset: Dataset):
       return DataLoader(
            dataset,
            batch_size=self.config.batch_size,
            pin_memory=True,
            shuffle=False,
            num_workers=self.config.data_loader_workers,
            sampler=DistributedSampler(dataset)                 # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
        )
    
  • 定义 snapshot 的加载和保存方法

    def _save_snapshot(self, epoch):
    	# capture snapshot
    	model = self.model
    	raw_model = model.module if hasattr(model, "module") else model
    	snapshot = Snapshot(
    	    model_state=raw_model.state_dict(),
    	    optimizer_state=self.optimizer.state_dict(),
    	    finished_epoch=epoch
    	)
    	# save snapshot
    	snapshot = asdict(snapshot)
    	torch.save(snapshot, self.config.snapshot_path)
    	print(f"Snapshot saved at epoch {epoch}")
    
    def _load_snapshot(self):
        try:
            snapshot = fsspec.open(self.config.snapshot_path)   # fsspec 为各种后端存储系统提供统一的 Python 接口,可以用相同的语法打开本地、AWS S3 和 GCS 等各种云存储平台的文件
            with snapshot as f:
                snapshot_data = torch.load(f, map_location="cpu")
        except FileNotFoundError:
            print("Snapshot not found. Training model from scratch")
            return 
    
        snapshot = Snapshot(**snapshot_data)
        self.model.load_state_dict(snapshot.model_state)
        self.optimizer.load_state_dict(snapshot.optimizer_state)
        self.epochs_run = snapshot.finished_epoch
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
    
  • 定义训练流程

    def _run_batch(self, source, targets, train: bool = True) -> float:
        with torch.set_grad_enabled(train), torch.cuda.amp.autocast(dtype=torch.float16, enabled=(self.config.use_amp)):
            _, loss = self.model(source, targets)
        
        if train:
            self.optimizer.zero_grad(set_to_none=True)
            if self.config.use_amp: 
                self.scaler.scale(loss).backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                self.scaler.step(self.optimizer)
                self.scaler.update()
            else:
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                self.optimizer.step()
        
        #return loss.item()
        return loss
    
    def _run_epoch(self, epoch: int, dataloader: DataLoader, train: bool = True):
        dataloader.sampler.set_epoch(epoch)
        for iter, (source, targets) in enumerate(dataloader):
            step_type = "Train" if train else "Eval"
            source = source.to(self.local_rank)
            targets = targets.to(self.local_rank)
            batch_loss = self._run_batch(source, targets, train)
            if iter % 100 == 0:
                #print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                if train:
                    print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                else:
                    eval_loss_list = [torch.zeros_like(batch_loss) for _ in range(int(os.environ['WORLD_SIZE']))]
                    dist.gather(
                        batch_loss,
                        eval_loss_list if self.local_rank == 0 else None, 
                        dst=0
                    )
                    if self.local_rank == 0:
                        for i, loss in enumerate(eval_loss_list):
                            print(f"[GPU{i}] Epoch {epoch} | Iter {iter} | {step_type} Loss {loss.item():.5f}")
    
    def train(self):
        for epoch in range(self.epochs_run, self.config.max_epochs):
            epoch += 1
            
            # train for one epoch
            self._run_epoch(epoch, self.train_loader, train=True)
    
            # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 snapshot 以免重复保存
            if self.local_rank == 0 and epoch % self.save_every == 0:
                self._save_snapshot(epoch)
    
            # eval run
            if self.test_loader:
                self._run_epoch(epoch, self.test_loader, train=False)
    

    这里需要注意几点:

    1. 指定 rank0 进程保存 snapshot 以免重复保存
    2. _run_batch 方法中,计算 loss 的部分设置在 torch.amp.autocast 上下文中,启动混合精度训练
    3. _run_epoch 方法中,使用 torch.distributed.gather 原语汇聚各个 GPU 的验证损失信息到 rank0 上,常用这种操作进行 log 训练信息。除此以外 Pytorch 一共提供了六个进程通信原语,如下
      import torch.distributed as dist
      
      dist.broadcast(tensor, src, group)				# 将 tensor 从 src 复制到所有其他进程。
      dist.reduce(tensor, dst, op, group)				# 将 op 应用于每个 tensor 并将结果存储在 dst 中。
      dist.all_reduce(tensor, op, group)				# 与 reduce 相同,但结果存储在所有进程中。
      dist.scatter(tensor, scatter_list, src, group)	# 复制  tensor scatter_lost[i] 到  进程
      dist.gather(tensor,gather_list, dst, group)		# 从 dst 中的所有进程复制 tensor。
      dist.all_gather(tensor_list, tensor, group)		# 将所有进程的 tensor 复制到所有进程上的 tensor_list。
      dist.barrier(group)								# 阻塞组中的所有进程,直到每个进程都进入该函数。
      
      其中 op 操作有四种
      dist.ReduceOp.SUM,
      dist.ReduceOp.PRODUCT,
      dist.ReduceOp.MAX,
      dist.ReduceOp.MIN.
      
      这些方法在需要手动汇聚或分发信息时特别有用,具体用法可以参考 pytorch 官方文档
09-13 05:38