Python多进程(multiprocessing)

Python多进程(multiprocessing)

最近,我一直在做一些联邦学习方面的研究。因为联邦学习的过程中涉及到多个客户端的训练,所以我在写程序的一开始并没有在程序中添加多进程相关的代码,导致训练神经网络的速度比较。慢,服务器一直是“一核难,多核围观”😂后来查了一些相关资料,给程序加了多个进程,速度提升不少。这里记录一下自己的一些调试经验。

首先是为什么要使用多进程而不是多线程。因为Python的GIL(全局解释器锁)的存在,导致Python的多线程是虚假的多线程,所以如果要进行并行运算,我们要使用多进程来达到更好的效果

在这次的Python多进程实践中,我主要使用的是multiprocessing这个类。我们先来看一个简单的例子

img

Python多进程训练神经网络:

from datetime import time

import torch.multiprocessing as mp
import torch
from model import MNIST_Net
from torchvision import datasets, transforms


def train(model, data_loader):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    for data, labels in data_loader:
        data = data.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        loss = torch.nn.functional.cross_entropy(model(data), labels)
        loss = loss.to(device)
        loss.backward()
        optimizer.step() 
        print(loss)


if __name__ == '__main__':
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    device_cpu = "cpu"
    train_dataset = datasets.MNIST("", train=True, download=True, transform=transforms.ToTensor())
    eval_dataset = datasets.MNIST("", train=False, transform=transforms.ToTensor())
    data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=512)
    model_1 = MNIST_Net()
    model_2 = MNIST_Net()

    model_1 = model_1.to(device)
    model_2 = model_2.to(device)
    model = [model_1, model_2]
    optimizer_model_1 = torch.optim.Adam(model_1.parameters(), lr=0.001)
    optimizer_model_2 = torch.optim.Adam(model_2.parameters(), lr=0.001)
    processes = []
    process = [mp.Process(target=train, args=(m, data_loader)) for m in model]
    [p.start() for p in process]
    [p.join() for p in process]

假设我们有model_1和model_2两个神经网络,我们希望能够将两个神经网络放到不同的进程里面进行训练,来提高训练的速度。整段代码的核心在最后四行,我们用代码分别为这两个模型的训练创建了一个进程,从而达到了并行化的效果。

Python多进程(multiprocessing)

需要注意的是如果你使用的是Windows而不是Linux系统的话,如果用了cuda就会报错(如上图所示),解决办法是把程序放到Linux上面跑。

如果我们要使用进程间通信,我们可以选择Queue,Pipe或者Pool,一般来说Pipe的速度最快,Queue次之,Pool的速度最慢。下面用一个简单的例子来展示一下其中Queue的用法。

import multiprocessing as mp
from multiprocessing import Queue, Process


def test(queue_1):
    queue_1.put(1)


if __name__ == '__main__':
    candidates = [1, 2, 3]
    queue_1 = Queue()
    process = [Process(target=test, args=(queue_1,)) for c in candidates]
    [p.start() for p in process]
    [p.join() for p in process]
    res = [queue_1.get() for p in process]
    print(res)

结果:

Python多进程(multiprocessing)

从结果可以看出程序的多进程运行正常。

Python多进程中遇到bug,一些可能的解决方案:

  • 去掉多进程中的方法的return
  • 使用 process = [Process(target=test(queue_1)) for c in candidates] 代替原本的那行代码(以上面的例子为例)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2022年3月29日 下午4:43
下一篇 2022年3月29日 下午4:50

相关推荐