Python获取多进程执行的返回值

Python获取多进程执行的返回值

众所周知,因为GIL的原因,Python至今不支持真正的多线程。为了达到并行运行的目的,我们往往就需要运行多进程了。
一个任务由一个进程来运行,可是它的结果怎么来获取呢?

方法-1.

第一种方法是记录在全局变量中。当然这时候要注意可能会需要用到Lock. 下面是一个例子。

Program-1

import multiprocessing
from multiprocessing import Pool


info_manager = multiprocessing.Manager()
info_lock = info_manager.Lock()
info_dict = info_manager.dict()


def add(n):
    global info_dict, info_lock 
    
    s = 0
    for i in range(n+1):
        s += i
    
    info_lock.acquire()
    info_dict[n] = s
    info_lock.release()
    
    print("In task %d: %d -> %d" % (n, n, s))


def calculate():
    pool = Pool(processes=4) 

    tasks = range(10)
    for n in tasks:
        pool.apply_async(add, (n,))
        
    pool.close()
    pool.join()
    
    
def print_result():
    global info_dict
    
    key_list = sorted(info_dict.keys())
    
    for key in key_list:
        print("%s: %s" % (key, info_dict[key])) 
    
    
if __name__ == '__main__':
    calculate()
    print_result()
    

除了使用全局变量,还有没有其他的方法呢?毕竟全局变量似乎看起来有点危险,不小心就会被弄坏。

方法-2.

第二种方法,就是记录下multiprocessing.Pool.apply_async的返回值(假设称之为result),然后在Pool被join之后,利用result.get()方法来得到原任务函数的返回值。在这里,multiprocessing.Pool.apply_async的返回值的类型是multiprocessing.pool.ApplyResult,其get()方法会返回原任务函数的返回值。

下面是把上面的那个例子重新写一遍。
Program-2

import multiprocessing
from multiprocessing import Pool


def add(n):
    s = 0
    for i in range(n+1):
        s += i
    return (n, s)


def calculate():
    pool = Pool(processes=4)

    tasks = range(10)
    result_list = list()
    info_dict = dict()
    
    for n in tasks:
        result_list.append(pool.apply_async(add, (n,)))
        
    pool.close()
    pool.join()
    
    for result in result_list:
        k, v = result.get()
        info_dict[k] = v
        
    return info_dict
    
    
def print_result():
    info_dict = calculate()
    
    key_list = sorted(info_dict.keys())
    
    for key in key_list:
        print("%s: %s" % (key, info_dict[key])) 
    
    
if __name__ == '__main__':
    calculate()
    print_result()
    

另外,其实也可以不用等到 Pool join 之后才能调get(). 可以立刻调用get(), 但这可能会造成阻塞。
get()函数其实有一个参数,可以指定超时时间以免无限等下去,如,result.get(timeout=2), 就是设置超时为2秒。

其定义在Python3中如下:

get([timeout])
    Return the result when it arrives. 
    If timeout is not None and the result does not arrive within timeout seconds 
    then multiprocessing.TimeoutError is raised. 
    If the remote call raised an exception then that exception will be reraised by get().

也就是说,如果超时了,就会抛出一个multiprocessing.TimeoutError异常;
而如果该任务进程内抛了异常,也会被get()重新抛出来。

本文程序通过Python2和Python3的测试。

(END)

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2023年5月9日 下午10:41
下一篇 2023年5月9日

相关推荐