如何使用torchrun启动单机多卡DDP并行训练

如何使用torchrun启动单机多卡DDP并行训练

这是一个最近项目中需要使用的方式,新近的数据集大概在40w的规模,而且载入的原始特征都比较大(5~7M),所以准备尝试DistributedDataParallel;

主要目的有两个:

  • 1是通过并行训练有效降低训练耗时,之前也使用过单机单卡,以及单机多卡DataParallel的方式;
  • 2是通过使用并行训练的方式,倍增有效batch_size的数量,提高单卡对显存的利用率,仍然是为了加速;

类似的实现代码你可以在许多技术博客上检索到,以下是我实际实践的主要代码;

单机单卡到单机多卡的代码修改

# 导入库
import math
# import torch
from torch.utils.data import Dataset, DataLoader
import torch.distributed as dist ## DDP
from torch.utils.data.distributed import DistributedSampler ## DDP
from torch.nn.parallel import DistributedDataParallel as DDP ## DDP
from torch.distributed import init_process_group, destroy_process_group ## DDP

# 设置可见的GPU
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# 忽略冗余数据存在的潜在问题
# os.environ["KMP_DUPLICATE_LIB_OK"] = "True"


def train():
    # setup
    local_rank = int(os.environ["LOCAL_RANK"]) ## DDP   
    init_process_group(backend="nccl")
    torch.cuda.set_device(local_rank)

    # ...

    # 
    local_rank = dist.get_rank()
    total_rank = dist.get_world_size()
    device = torch.device("cuda", local_rank)
    
    # random reset
    seed = 1234
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    np.random.seed(seed)

    # 
    model = Model().to(device)
    if local_rank <= 0:
        print(model)
    # model.load(local_model_path)
    lr *= math.sqrt(total_rank)
    # optimizer = ...
    # scheduler = ...


    #
    # train_datas = Dataset("train")
    sampler = DistributedSampler(train_datas)
    # 或者自定义
    # sampler = MyDistributedSampler(train_data_path)
    train_dataloader = DataLoader(
        train_datas,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=(sampler is None),
        sampler=sampler,
        pin_memory=True,
        drop_last=True,
    )
    train_dataloaders = [train_dataloader]

    # warp model
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
    model = DDP(model, device_ids=[local_rank], output_device=local_rank, find_unused_parameters=True)


    for epoch in range(max_epoch):
        e_INDEX = train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)

    dist.destroy_process_group()

def train_epoch(epoch, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):
    model.train()
    if local_rank >= 0:
        torch.distributed.barrier()

    # 如果使用了自定义的Sampler 记得实现一个对应set_epoch方法功能的新方法,并由采样器进行调用
    if local_rank >= 0:
        for i, train_dataloader in enumerate(train_dataloaders):
            # train_dataloader.sampler.set_epoch(epoch)
            train_dataloader.sampler.shuffle_data_on_ranks(seed=epoch + i)
    
    e_loss = 0
    e_num = 0
    # ...

    for train_dataloader in train_dataloaders:
        for data in train_dataloader:
            loss = train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler)

            e_loss += loss[0]
            e_num += loss[1]

    if local_rank >= 0:
        torch.distributed.barrier()
        total = torch.tensor(
            copy.deepcopy([e_loss, e_num]),
            dtype=torch.float32,
            device=device,
        )
        torch.distributed.all_reduce(
            total, torch.distributed.ReduceOp.SUM, async_op=False
        )
        e_loss, e_num = total.tolist()

    e_loss /= e_num

    # 
    if local_rank <= 0:
        if isinstance(model, torch.nn.parallel.DistributedDataParallel):
            model.module.save(opt.notes)   
        else:
            model.save(opt.notes)   

    scheduler.step(e_loss) # if need e_loss
    # scheduler.step()

    # 测试:每个GPU进程输出的模型第一层参数是相同的
    for param in model.parameters():
        print("    GPU{} Model param layer1=>".format(local_rank), param)
        break
    e_INDEX = eval_epcoh(model, val_dataloader, epoch, local_rank)

    return e_INDEX

def train_step(data, local_rank, total_rank, device, train_dataloaders, model, optimizer, scheduler):

    data_a,label_a = data

    data_a = data_a.to(device)
    label_a = label_a.to(device)

    # 这里要注意,如果使用的是自定义api 如这里的compute_loss该方法中 包含了模型的推理和损失计算过程,那么就需要使用model.module进行调用
    # 如果直接使用模型推理 那么直接调用 forward函数即可,如model(data_a)
    if isinstance(model, torch.nn.parallel.DistributedDataParallel):
        loss, _ = model.module.compute_loss(data_a, label_a)
    else:
        loss, _loss_dict_ = model.compute_loss(data_a, label_a)   

    optimizer.zero_grad() 
    loss.backward() 
    optimizer.step() 

    return loss.item(), label_a.shape[0]



对于测试集的验证,可以不使用Sampler(验证集/测试集在每个GPU进程中都相同),仅在某一个GPU中,完整测试即可;

启动方式:

  • torchrun -m --nnodes=1 --nproc_per_node=4 main
  • nohup torchrun -m --nnodes=1 --nproc_per_node=4 main 127.0.0.1 5301 > /dev/null 2>&1 & disown

测试模型的第一层输出(part),可以看到它们都是相同的:

    GPU0 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],

    GPU3 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],

    GPU1 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],

    GPU2 Model param layer1=> Parameter containing:
tensor([[[[ 0.1158,  0.0723, -0.0191],
          [ 0.0715, -0.1204, -0.0295],
          [-0.0790,  0.0723,  0.0963],                    

一点经验

梯度同步后的模型参数不一致

其实主要的代码并不复杂,但是这里我要提到的一点经验是:如果你的模型参数在每个epoch结束之后(梯度同步更新完成)的打印结果,并不一致,并且你的DDP主要代码也与上述代码大同小异,那么很有可能是你的模型有问题(并不是不正确,只不过无法得到DDP的支持);

这也是我的项目所面临的问题:我使用了一个相对简单的模型,可以很好的实践DDP,但另外一个复杂许多的模型就无法通过测试(每个epoch后模型参数在各个GPU进程间会变的不同),具体的后续补充;

整理附上GPT的回答,仅供参考:

我正在使用一个以Conformer为主要网络结构的模型,训练一个音频分类模型,但是我发现这个模型并不能在DP和DDP中得到很好的支持,主要表现在每个epoch训练梯度同步之后,各个GPU进程中的模型参数不一致,而如果我换成一个简单的卷积网实现分类模型,就可以被DP和DDP很好的支持,它们会在每个epoch训练梯度同步之后,使得各个GPU进程中的模型参数完全一致;请根据我所描述的情况,分析为什么这个基于Conformer的模型结构无法被DP和DDP支持?

R:
在使用 DataParallel(DP)和 DistributedDataParallel(DDP)时,确保模型能够正确同步梯度并保持参数一致是关键。对于复杂的模型结构,如基于 Conformer 的音频分类模型,可能会出现一些挑战,导致同步问题。以下是可能的原因:

1. **自定义操作或模块:** 如果你的 Conformer 模型包含自定义的操作或模块,这可能导致 DP 或 DDP 的同步问题。确保所有操作都能够正确地在 GPU 上执行,并且它们的梯度能够被正确地计算和同步。

2. **非标准的层:** Conformer 模型可能包含一些非标准的层,如深度卷积块、多头自注意力等。确保这些层在 DP 和 DDP 中能够正确地进行梯度同步。你可能需要在这些层的实现中查看是否有专门为 DP 和 DDP 提供支持的选项。

3. **权重初始化:** DP 和 DDP 要求每个 GPU 上的模型初始权重是一致的。确保你的模型在每个 GPU 上的权重初始化是相同的,可以考虑在初始化之后显式地同步一次模型参数。

4. **梯度累积:** 在某些情况下,可能会发生梯度累积的情况,导致模型参数不一致。确保在 DP 和 DDP 中使用梯度累积时,模型的参数同步仍然能够正确进行。

5. **DDP 启动方式:** 使用 DDP 时,确保在启动时每个 GPU 上的进程都能够正确初始化。这可能需要查看 DDP 初始化的方式,确保每个进程都按预期地启动。

6. **PyTorch 版本:** 有时,特定版本的 PyTorch 可能对某些模型结构的 DP 和 DDP 支持更好。确保你使用的是最新版本的 PyTorch。

在调试时,可以尝试通过在每个 GPU 上输出和比较梯度、权重等信息,以找出在同步中出现问题的具体位置。此外,查看 PyTorch 的官方文档和社区讨论,以获取与你的模型结构相关的最新支持和最佳实践。

NCCL timeout

Watchdog caught collective operation timeout: WorkNCCL(SeqNum=1363168, OpType=BROADCAST, Timeout(ms)=1800000) ran for 1805862 milliseconds before timing out.

我在测试的时候,每个GPU进程都做了验证集的val,这可能并不必要,有效验证应该仅在Rank0进程进行即可;

当然还有可能是其他原因造成的这个异常,可=>参考链接;

LR的设置

可以是简单的线性增大,我查了一些资料因此有上述代码的实现,但要注意的是,具体增大多少要结合具体的任务,也就是要调参,LR保持不变或更小可能会使得收敛十分缓慢,如果太大又可能使训练崩溃;

一些参考链接:

  • 参考链接1
  • 参考链接2
  • 参考链接3
  • 参考链接4
  • 参考链接5
  • 参考链接6
  • 参考链接7
  • 参考链接8

关于MyDistributedSampler

from torch.utils.data.sampler import Sampler


class MyDistributedSampler(Sampler):
    def __init__(self, data_path, m, batch_size, local_rank, total_rank, device, distribute=False):
        # 这里的读取 与 Dataset中 读取原始数据是一致的(我们使用的文件是excel)
        data_lines = pd.read_excel(data_path)
        self.distribute = distribute

        self.total_rank = -1
        self.local_rank = -1
        self.device = device

        if distribute:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            self.total_rank = total_rank
            self.local_rank = local_rank

        self._m_per_class = m
        self._batch_size = batch_size

        # 拿到与单卡一致的所有数据
        self._labels_to_indices = self._get_labels_to_indices(data_lines)
        # 拿到与单卡一致的索引数据
        self._global_labels = (
            self._labels_to_indices.reset_index()["a_cls"]
            .drop_duplicates()
            .values.tolist()
        )

        if not self.distribute:
            # 单卡情况 待操作数据 就是 所有的索引数据
            self.labels = self._global_labels
        else:
            # 多卡时,则要按照 当前的local_rank 进行划分
            self.labels = self._split_label_randoms(seed=0)[self.local_rank]
        assert (
            self._batch_size % self._m_per_class
        ) == 0, "m_per_class must divide batch_size without any remainder"
        # 采样器的样本数
        self._sample_length = self._get_sample_length()

        print(
            "Init Sampler with Mper with {} items, and m = {}, batch_num = {}"
            "\n".format(self._sample_length, m, self.num_iters())
        )

        return

    def __iter__(self):
        # 创建一个迭代器:迭代长度 与样本数量一致 并初始化为0
        idx_list = [0] * self._sample_length
        i = 0
        num_iters = self.num_iters()
        for _ in range(num_iters):
            random.shuffle(self.labels)
            # 为使用HardTripletLoss计算损失,这里随机 固定个数的分类 以batch_size=32 m=4为例,则这里去的个数为8
            # 对于我们的数据集:实际count的最小关联数是6 因此将m设为4是合理的
            curr_label_set = self.labels[: self._batch_size // self._m_per_class]
            for label in curr_label_set:
                if len(self._labels_to_indices.loc[label]) >= self._m_per_class:
                    items = self._labels_to_indices.loc[label].sample(
                        self._m_per_class
                    ) 
                else:
                    items = self._labels_to_indices.loc[label].sample(
                        self._m_per_class,
                        replace=True
                    )  
                t = []
                for item in items.index.values:
                    cur_id = np.random.choice(
                        self._labels_to_indices.loc[label, item]["ids"],
                        1,
                        replace=False,
                    )[0]
                    t.append(cur_id)
                # 确定采样规范 并返回,可以看到这里已经是一个符合要求的完整序列
                # 注意:对应的 Dataloader 不可以使用shuffle
                idx_list[i : i + self._m_per_class] = t[: self._m_per_class]
                i += self._m_per_class
        return iter(idx_list)

    # set_epoch的实现: 仅多卡的时候才需要调用
    def shuffle_data_on_ranks(self, seed, display_details=False):
        # 每个epoch都要重新划分各个GPU进程所分的的数据(它们彼此不重复)
        self.labels = self._split_label_randoms(seed)[self.local_rank]
        self._sample_length = self._get_sample_length()
        if display_details:
            display = sorted(self.labels)[:5]
            print(
                "Shuffle sampler with {} items, rank = {}, total rank = {}, batch_num = {}, label of head{} is {}".format(
                    self._sample_length,
                    self.local_rank,
                    self.total_rank,
                    self.num_iters(),
                    len(display),
                    display,
                )
            )

    def num_iters(self):
        # 样本数 整除 batch_size的 值,即整个训练集被迭代一次 所需要的次数
        return self._sample_length // self._batch_size

    def _get_sample_length(self):
        # 这就是实际要加载的样本数,由于这里的self.labels 已经根据local_rank完成了划分,因此这个sample_len就是每个GPU进程所分得得样本数
        sample_length = sum([self._labels_to_indices.loc[k]["count"].sum() for k in self.labels])

        # 由于梯度同步时,需要每个GPU进程都有数据,因此如下这段同步只要是为了 保证所有GPU进程中的采样数量是一致的(与最小值相同)(如果你的数据很规整 这段同步实际用不到)
        if self.total_rank >= 0:
            torch.distributed.barrier()
            total = torch.tensor(
                [sample_length],
                dtype=torch.float32,
                device=self.device,
            )
            torch.distributed.all_reduce(
                total, torch.distributed.ReduceOp.MIN, async_op=True
            )
            sample_length = int(total.tolist()[0])

        sample_length -= sample_length % self._batch_size
        return sample_length
    
    def _split_label_randoms(self, seed):
        split_label = []
        global_label = self._global_labels.copy()
        random.Random(seed).shuffle(global_label)
        for i in range(self.total_rank):
            # 可以看到 索引数据会根据 total_rank 间隔取值
            split_label.append(global_label[i :: self.total_rank])
        return split_label

    # @staticmethod
    def _get_labels_to_indices(self, data_lines):
        df = data_lines.groupby(["a_cls", "a_item"]).agg(
            count=("a_id", "count"), ids=("a_id", "unique")
        )
        return df

    def __len__(self):
        return self._sample_length

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2023年12月6日
下一篇 2023年12月6日

相关推荐