PyTorch多进程模型推理

进程和线程

进程:一个在内存中运行的应用程序,每个进程有自己独立的一块内存空间。资源分配的最小单位

线程:进程中的一个执行单元,程序执行的最小单位。一个进程可以有多个线程。

Python的多线程特点:在Python中,由于GIL的存在,在多线程的时候,同一时间只能有一个线程在CPU上运行,而且是单个CPU,不管CPU核数为多少。所以,Python不能利用多线程发挥多核的优势,但是,可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

什么时候使用多线程/多进程:在python中,如果一个进程包含多个线程,做CPU密集型任务时,多线程并不能有多少效率提升,相反可能还会因为线程的频繁切换导致效率下降,此时推荐使用多进程;如果做IO密集型任务,多线程的进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

Python中单线程、多线程和多进程的效率对比实验 | 菜鸟教程 (runoob.com)

Python多进程实现方法

多进程的实现

Python的多进程是通过multiprocessing模块实现,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多,也有start(), run(), join()等方法

from multiprocessing import  Process

def fun1(name):
    print('测试%s多进程' %name)

if __name__ == '__main__':
    process_list = []
    for i in range(5):  # 开启 5 个子进程执行fun1函数
        p = Process(target=fun1,args=('Python',))  # 实例化进程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    print('结束测试')

多进程之间的通信

由于每个进程有自己独立的一块内存空间,系统独立分配资源(CPU、内存),因此进程之间是独立的。每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能共享,这是多进程使用时与多线程的区别。所以,不同的多进程之间需要通信

常用的多进程之间的通信方式有:队列Queue、管道Pipe、Managers。

Queue和Pipe实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据,需要用到Managers来共享内存。

以Queue为例,Python中多进程的通信如下:

def func1(i):
    time.sleep(1)
    print(f'args {i}')

def run__queue():
    from multiprocessing import Process, Queue

    queue = Queue(maxsize=4)  # the following attribute can call in anywhere
    queue.put(True)
    queue.put([0, None, object])  # you can put deepcopy thing
    queue.qsize()  # the length of queue
    print(queue.get())  # First In First Out
    print(queue.get())  # First In First Out
    queue.qsize()  # the length of queue

    process = [Process(target=func1, args=(queue,)),
               Process(target=func1, args=(queue,)), ]
    [p.start() for p in process]
    [p.join() for p in process]

if __name__ =='__main__':
    run__queue()

进程池

进程池维护一个进程序列,使用时去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:apply:同步,一般不使用;apply_async:异步,常用。

但是pool.apply_async不能和pytorch推理一起用,受到spwan的影响。因为GPU模型Pytorch规定多进程的启动方法必须是“spawn”,使用map_async或者apply_async这类方法都不行。

Pytorch 多进程在单卡上测试_咆哮的阿杰的博客-CSDN博客_单卡多进程计算

并且,进程池pool的两个父子进程之间通信不能用Queue,需要Manager。

  • Pool和Process的区别

  • Process需要自己管理进程,起一个Process就是起一个新进程;

  • Pool是进程池,它可以开启固定数量的进程,然后将任务放到一个池子里,系统来调度多进程执行池子里的任务;

参考

  • Python如何使用多进程Process、Pool、Queue、Manager等

一篇文章搞定Python多进程(全) – 知乎 (zhihu.com)

在Python中优雅地用多进程 – 知乎 (zhihu.com)

  • Python使用 Pool.apply_async 和 Manager.Queue实现进程池通信

Python高级——消息队列(Queue)与进程池(Pool)_HMMHMH的博客-CSDN博客

Python中的Queue与多进程(multiprocessing)_SQZHAO的博客-CSDN博客_python queue 多进程

Pytorch结合多进程的使用场景

场景1:读取图片数据,判断是否损坏

只需要给函数open_image使用多进程,不需要考虑进程通信,采用进程池Pool的imap方法

def open_image(img_name):
    try:
        Image.open(os.path.join(image_path, img_name))
    except:
        return img_name

def detect_broken_delete(img_path, delete=False):
    listiter = os.listdir(img_path)
    process_num = max(cpu_count() - 2, 1)
    with Pool(process_num) as pool:
        output = set(tqdm(pool.imap(open_image, listiter), total=len(listiter)))
    print("broken images:", output)
    if delete:
        # delete broken images
        for img in output:
            if img:
                os.remove(os.path.join(img_path, img))
        print("broken images have been deleted.")

场景2:Pytorch减少模型推理时间

在有些时候,pytorch模型的性能瓶颈可能不在模型推断,而是在图像的预处理和后处理。这时候将torch模型转为onnx或者tensorRT收益不大,但可以使用多进程缩短前后处理时间和推理时间。

下面的例子为人脸解析face parsing模型推理案例:推理1w张图片大概从20min缩短为10min。

简单描述:

  1. 首先定义三个函数,分别为预处理preprocess_img,模型推断inference,后处理afterprocess_img(将人脸解析的分割mask作用到原图上,提取想要的人脸部分)

  1. 然后需要定义进程函数,用于读取Queue的数据并用前三个函数处理,实现进程之间的通信。进程函数的输入为queue相关的或者全局变量,然后通过queue的get和put方法实现数据传递。

  1. 总的流程为,第一个进程函数getimgpath_process获取图像,存到第一个队列img_path_queue;第二个进程函数preprocessimg_process,读取第一个队列,并预处理,结果存到第二个队列img_queue;第三个进程函数inference_process读取第二个队列,做出推断,结果存在第三个队列result_queue;第四个进程函数afterprocessimg_process读取第三个队列,做后处理并保存。

  1. 一共有三个Queue存数据、四个Process处理数据。在evaluate_multiprocess中,第一个函数读取全部图像路径,只采用单个进程;后面三个进程函数都采用了8个进程数量。最后关闭进程。

  1. 注意,需要通过 torch.multiprocessing.set_start_method(“spawn”)来设置pytorch多进程和cuda的使用。这样可以在单卡上同时处理多个图片,8个进程就可以同时单卡处理8张图片;当然,需要注意显存的使用,可以和onnx等结合加速推理和减少显存占用。

from model import BiSeNet
import torch
import os
import os.path as osp
import numpy as np
from PIL import Image, ImageFile
from tqdm import tqdm
import time
import torchvision.transforms as transforms
from multiprocessing import Pool, cpu_count, Process, Queue
import torch.multiprocessing as mp
ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.MAX_IMAGE_PIXELS = None


to_tensor = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

def load_net(model_path="res/cp/79999_iter.pth"):
    n_classes = 19
    net = BiSeNet(n_classes=n_classes)
    net.cuda()
    net.load_state_dict(torch.load(model_path))
    net.eval()
    return net

def preprocess_img(img_path):
    img = Image.open(img_path)
    image = img.resize((512, 512), Image.BILINEAR).convert('RGB')
    img_arr = np.array(image)
    img = to_tensor(image)
    img = torch.unsqueeze(img, 0)
    return img, img_arr

def inference(net, img):
    with torch.no_grad():
        img = img.cuda()
        out = net(img)[0]
        parsing = out.squeeze(0).cpu().numpy().argmax(0)
        return parsing

def afterprocess_img(parsing, img_arr, img_path, res_path):
    mask = np.zeros_like(img_arr)
    indices = np.isin(parsing, [1, 2, 3, 10, 12, 13])  # face_dataset config: 1:skin, 2:l_brow, 3:r_brow, 4:nose, 12:u_lip, 13:l_lip
    mask[indices] = img_arr[indices]
    img_mask = Image.fromarray(mask)
    img_mask.save(osp.join(res_path, osp.basename(img_path)))
    print(f"save {osp.basename(img_path)}")


# accelerate infer by multiprocess, useing Queue in communication between processes (4 Processes & 3 Queues)
# ==================================================================================
# P(getimgpath)-----P(preprocess)-----P(inference)-----P(afterprocess)-->save img
#                |                 |                |
#        Queue(img_path)        Queue(img)     Queue(result)
# ==================================================================================

# get img_path process
def getimgpath_process(root_path, img_path_queue):
    for img_name in os.listdir(root_path):
        img_path = osp.join(root_path, img_name)
        img_path_queue.put(img_path)

# preprocess img process
def preprocessimg_process(img_path_queue, img_queue):
    while True:
        img_path = img_path_queue.get()
        img, img_arr = preprocess_img(img_path)
        img_queue.put((img, img_arr, img_path))

# inference process
def inference_process(net, img_queue, result_queue):
    while True:
        img, img_arr, img_path = img_queue.get()
        parsing = inference(net, img)
        result_queue.put((parsing, img_arr, img_path))

# afterprocess img process
def afterprocessimg_process(result_queue, res_path):
    while True:
        parsing, img_arr, img_path = result_queue.get()
        afterprocess_img(parsing, img_arr, img_path, res_path)


def evaluate_multiprocess(net, root_path, res_path):
    if not os.path.exists(res_path):
        os.makedirs(res_path)
    mp.set_start_method("spawn")
    img_path_queue, img_queue, result_queue = Queue(), Queue(), Queue()
    # pool.apply_async can not use in spawn.
    Imagepath_Process = Process(target=getimgpath_process, args=(root_path, img_path_queue))
    Imagepath_Process.start()
    for i in range(8):
        Preprocess_Process = Process(target=preprocessimg_process, args=(img_path_queue, img_queue))
        Preprocess_Process.start()
    for i in range(8):
        Inference_Process = Process(target=inference_process, args=(net, img_queue, result_queue))
        Inference_Process.start()
    for i in range(8):
        Afterprocess_Process = Process(target=afterprocessimg_process, args=(result_queue, res_path))
        Afterprocess_Process.start()
    # Imagepath_Process.start()
    # Preprocess_Process.start()
    # Inference_Process.start()
    # Afterprocess_Process.start()
    time.sleep(1)  # wait process starting, replace of `join`
    stime = time.time()
    while True:
        if(result_queue.empty() and (img_queue.empty()) and (img_path_queue.empty())):
            time.sleep(1) # wait final process ending
            Imagepath_Process.terminate()
            Preprocess_Process.terminate()
            Inference_Process.terminate()
            Afterprocess_Process.terminate()
            img_path_queue.close()
            img_queue.close()
            result_queue.close()
            break
        else:
            pass
    etime = time.time()
    print(f"all images cost {etime - stime} seconds")


if __name__ == "__main__":
    net = load_net()
    root_path="test"
    res_path='testtest'
    evaluate_multiprocess(net, root_path, res_path)

参考

多进程缩短推理时间 – 知乎 (zhihu.com)

pytorch多进程最佳实践_小篆的博客-CSDN博客_pytorch 多进程

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
xiaoxingxing的头像xiaoxingxing管理团队
上一篇 2023年6月26日
下一篇 2023年6月26日

相关推荐