Python并行计算实战:多线程与多进程

文章目录

  • 一、简介
    • 1.1、线程 + 多线程 + 进程 + 多进程
      • 1.1.1、线程:是进程中的一个执行单元(共享同一进程的内存空间)
      • 1.1.2、多线程:同时运行多个线程(在一个进程中)
      • 1.1.3、进程:是操作系统中的一个执行单元
      • 1.1.4、多进程:同时运行多个独立的进程
    • 1.2、多线程与多进程(详细区别)
  • 二、CPU处理器的核心数量
    • 2.1、CPU处理器的核心:物理内核 + 逻辑内核
    • 2.2、CPU处理器的参数解析
    • 2.3、获取CPU处理器的逻辑内核数量:os.cpu_count()
    • 2.4、设置CPU处理器的逻辑内核数量:max_workers
  • 三、函数详解
    • 3.1、多线程并行计算的执行器:concurrent.futures.ThreadPoolExecutor()
    • 3.2、多进程并行计算的执行器:concurrent.futures.ProcessPoolExecutor()
    • 3.3、使用须知:executor.submit()
    • 3.3、使用须知:concurrent.futures.as_completed()
    • 3.4、使用须知:executor.shutdown()
  • 四、项目实战
    • 4.1、同时运行相同任务
      • 4.1.1、多线程:适用于IO密集型任务(读写操作)
      • 4.1.2、多进程:适用于CPU密集型任务(计算操作)
    • 4.2、同时运行不同任务
      • 4.2.1、方法一:多线程
      • 4.2.2、方法二:多进程
      • 4.2.3、方法三:协程(使用 asyncio)
      • 4.2.4、方法四:并行计算库(使用 concurrent.futures)速度极快

一、简介

1.1、线程 + 多线程 + 进程 + 多进程

1.1.1、线程:是进程中的一个执行单元(共享同一进程的内存空间)

1.1.2、多线程:同时运行多个线程(在一个进程中)

1.1.3、进程:是操作系统中的一个执行单元

1.1.4、多进程:同时运行多个独立的进程

1.2、多线程与多进程(详细区别)

  • 线程Thread、多线程Multithreading
  • 进程Process、多进程Multiprocessing
多线程(Multithreading)多进程(Multiprocessing)
任务IO密集型任务(任务执行的主要时间都花费在读写操作上)CPU密集型任务(任务执行的主要时间都花费在数值计算上)
功能单线程在等待IO时会被阻塞,而多线程在等待IO期间可以切换到其他任务利用多核处理器来实现真正的并行计算(一个进程的崩溃不会影响其他进程)
内存共享地址空间(在同一进程内部共享相同的内存空间,因此它们可以方便地访问和修改共享的数据)独立地址空间(每个进程有自己独立的地址空间,数据不共享,需要通过进程间通信(IPC)来传递数据)
消耗较小(创建和销毁线程的开销较小)较大(创建和销毁进程的开销较大)
领域1、文件读写;2、网络通信(下载文件、发送与接收网络请求);3、数据库操作(查询数据库、读写数据);4、图像处理(加载图像、保存图像)1、数值计算(大数据分析、统计、计算);2、图像处理(渲染图像)
Python存在全局解释器锁(GIL)在部分的CPU密集型任务中,多线程的性能不如多进程。每个进程都有自己的GIL,可以并行执行。

全局解释器锁(Global Interpreter Lock,简称GIL):是一种在解释器层面对多线程执行的控制机制。为了简化内存管理以及线程安全,但也引入了一些限制。一次只允许一个线程执行:GIL确保在解释器的任何时刻,即便在多核CPU上运行,同一时刻也只有一个线程在执行Python字节码。

二、CPU处理器的核心数量

2.1、CPU处理器的核心:物理内核 + 逻辑内核

CPU核心数:由处理器和操作系统决定

  • 处理器(硬件)
    • 物理内核(Physical Cores)是实际的处理器核心。每个物理内核都是独立的、实际存在的硬件单元。现代计算机通常配备有多核心的处理器。

      • 在多核处理器中,物理内核的数量表示处理器上实际存在的独立处理单元的数量。每个物理内核可以独立地执行指令,具有自己的执行单元、缓存和执行流水线。
    • 逻辑内核(Logical Cores)通过超线程技术(Hyper-Threading)实现的虚拟核心。超线程允许一个物理内核模拟两个逻辑内核,从而在同一时间执行两个线程。

      • 逻辑内核共享物理内核的执行单元、缓存和执行流水线,但它们可以独立地执行指令流。

      举例:计算机配备了四核处理器,每个核心都支持超线程,实现了4个物理内核和8个逻辑内核的处理能力,可同时并行处理8个线程。

  • 操作系统(软件)
    • 线程的分配和管理:决定哪些线程在哪个核心上运行
    • 支持的最大线程数:不同操作系统对支持的最大线程数有限制,这取决于其设计和内核管理能力。

2.2、CPU处理器的参数解析

处理器:12th Gen Intel( R ) Core( TM ) i7-12700 2.10 GHz

  • 12th Gen Intel(R) Core(TM) :表示英特尔第12代Core处理器
  • i7-12700:表示处理器属于i7系列,具体型号为12700
  • 2.10 GHz:表示处理器的时钟频率
  • 架构: 使用了Intel的Alder Lake架构。
  • 核心数: i7-12700有12个核心。
  • 线程数: 具有24个线程,支持超线程技术(每个核心可以同时执行两个线程)。

2.3、获取CPU处理器的逻辑内核数量:os.cpu_count()

# 方法一
import psutil
print("CPU处理器的逻辑内核数量:", psutil.cpu_count(logical=True))  # Ture:显示逻辑内核、False:显示物理内核
print("CPU处理器的物理内核数量:", psutil.cpu_count(logical=False))  # Ture:显示逻辑内核、False:显示物理内核
# CPU处理器的逻辑内核数量: 20
# CPU处理器的物理内核数量: 12
# 方法二
import os
print("CPU处理器的逻辑内核数量:", os.cpu_count())
# CPU处理器的逻辑内核数量: 20
# 方法三
import multiprocessing as mp
print("CPU处理器的逻辑内核数量:", mp.cpu_count())
# CPU处理器的逻辑内核数量: 20

2.4、设置CPU处理器的逻辑内核数量:max_workers

(1)手动指定CPU处理器的逻辑内核数量
(2)若不指定,则默认使用CPU处理器支持逻辑内核的最大数量(如:max_workers=20)

在不超过系统内存RAM的基础上,如何设置逻辑内核的最大数量?

以多线程为例

  • 需求:循环计算函数100次(每次输入参数不同)
  • 已知:系统内存RAM = 64GB,读取图像的内存损耗 = 6GB(只读取一次)计算图像的内存损耗 = 7.81GB(循环计算)
  • 计算逻辑内核的最大数量 = (64 - 6 - 系统内存) / 7.81GB = 7.42。即每轮最多并行计算7个循环max_workers = 7,否则将显示内存分配不足问题。

备注:每个循环对应的内存损耗都是不同的(函数的输入参数不同导致函数内部计算所损耗的内存不同)。故在指定max_workers时,必须以100次循环中的最大内存损耗为基准线(如:13.81GB)。

三、函数详解

3.1、多线程并行计算的执行器:concurrent.futures.ThreadPoolExecutor()

"""
#############################################################################
函数说明: concurrent.futures.ThreadPoolExecutor(max_workers=None)
输入参数: max_workers       指定最大线程数(默认使用CPU处理器支持逻辑内核的最大数量)
#############################################################################
使用方式:
    (1)executor.submit():    单个任务并行计算
            将所有参数只应用一次到函数,完成并行化计算。      返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。
    (2)executor.map():       多个任务并行计算
            将列表参数循环应用到函数,完成并行化计算。       返回一个map()迭代器,可以迭代获取每个任务的结果
    (3)executor.shutdown():  等待所有任务完成并关闭线程池
            若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
#############################################################################
"""
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    # (1)使用executor.submit
    future = executor.submit(my_fun, arg1, arg2)  # 其中:my_function是执行函数,arg1和arg2是函数的参数。
    result = future.result()  # 获取单个任务的执行结果

    for future in concurrent.futures.as_completed(futures):  # 返回一个迭代器,其是在任务完成时按照完成的顺序生成 Future 对象。
	    result = future.result()  # 在迭代过程中使用 future.result() 来获取每个已完成任务的结果。
    	print(result)
    # 若是单个任务,两者没有区别。
	# 若是多个任务,as_completed允许在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
	##############################################################
    # (2)使用executor.map
    future = executor.map(my_fun, [arg1, arg2])  # 其中:my_function是执行函数,[arg1, arg2]是一个包含函数参数的列表。
    results = list(future)  # 使用list()将迭代器转换为列表
    print(results)
	##############################################################
    # (3)关闭进程池
    executor.shutdown()

3.2、多进程并行计算的执行器:concurrent.futures.ProcessPoolExecutor()

  • 多线程共享内存,多进程独立内存:若需要在多个进程之间共享数据,确保使用适当的同步机制,以防止数据竞争和其他并发问题。
  • 性能测试: 在使用多进程之前,建议对单次循环进行性能测试,以确定是否有优化的空间。有时候,优化循环内的算法可能比并行计算更有效
"""
#############################################################################    
函数说明: concurrent.futures.ProcessPoolExecutor(max_workers=None)
输入参数: max_workers       指定最大进程数(默认使用CPU处理器支持逻辑内核的最大数量)
#############################################################################    
使用方式: 
    (1)executor.submit():    单个任务并行计算    
            将所有参数只应用一次到函数,完成并行化计算。      返回一个concurrent.futures.Future对象,可以用于获取任务的执行结果。    
    (2)executor.map():       多个任务并行计算    
            将列表参数循环应用到函数,完成并行化计算。       返回一个map()迭代器,可以迭代获取每个任务的结果
    (3)executor.shutdown():  等待所有任务完成并关闭进程池
            若不调用,程序可能会在所有任务完成之前提前结束,导致一些任务未能执行完毕。
#############################################################################  
"""
import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    # (1)使用executor.submit
    future = executor.submit(my_fun, arg1, arg2)  # 其中:my_function是执行函数,arg1和arg2是函数的参数。
    result = future.result()  # 获取单个任务的执行结果
    
    for future in concurrent.futures.as_completed(futures):  # 返回一个迭代器,其是在任务完成时按照完成的顺序生成 Future 对象。
	    result = future.result()  # 在迭代过程中使用 future.result() 来获取每个已完成任务的结果。
    	print(result)
    # 若是单个任务,两者没有区别。
	# 若是多个任务,as_completed允许在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
	##############################################################
    # (2)使用executor.map
    future = executor.map(my_fun, [arg1, arg2])  # 其中:my_function是执行函数,[arg1, arg2]是一个包含函数参数的列表。
    results = list(future)  # 使用list()将迭代器转换为列表
    print(results)
	##############################################################
    # (3)关闭进程池
    executor.shutdown()

3.3、使用须知:executor.submit()

"""适用于:多进程 + 多线程"""
import concurrent.futures

def process_target_gray(value):
    # 处理每个值的逻辑
    return [value * 2, 1]

if __name__ == '__main__':
    gray_values = [1, 2, 3, 4, 5]
    results = []  # 初始化结果列表
    # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        # 使用 submit 方法逐个提交任务(futures是保存所有结果的对象)
        futures = [executor.submit(process_target_gray, value) for value in gray_values]
        # 获取每个任务的结果
        for future in futures:
            result = future.result()
            results.append(result)
    print(results)

"""[[2, 1], [4, 1], [6, 1], [8, 1], [10, 1]]"""
"""只适用于:多线程"""
import concurrent.futures

def process_target_gray(value):
    # 处理每个值的逻辑
    return [value * 2, 1]

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:  # 创建线程池
    futures = []  # 用于存储每个任务的 Future 对象
    results = []  # 用于存储每个任务的结果
    gray_values = [1, 2, 3, 4, 5]
    for i in gray_values:
        future = executor.submit(process_target_gray, i)
        futures.append(future)
        # 获取每个任务的结果
    for future in futures:
        result = future.result()
        results.append(result)
    print(results)

"""[[2, 1], [4, 1], [6, 1], [8, 1], [10, 1]]"""

3.3、使用须知:concurrent.futures.as_completed()

使用 concurrent.futures.as_completed 在任务完成的时候立即获取结果,而不需要等待所有任务都完成。

import concurrent.futures

def process_target_gray(value):
    return [value * 2, 1]

if __name__ == '__main__':
    gray_values = [1, 2, 3, 4, 5]
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        # 使用 submit 方法逐个提交任务
        futures = [executor.submit(process_target_gray, value) for value in gray_values]
        # 使用 as_completed 在任务完成的时候立即获取结果,而不需要等待所有任务都完成。
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            print(result)

"""
[2, 1]
[4, 1]
[8, 1]
[10, 1]
[6, 1]
"""

3.4、使用须知:executor.shutdown()

若需要循环100次,每次提交一个任务到线程池,指定线程数为20,且没有显式调用 executor.shutdown(),那么情况会如下:

  • 线程重用: 当一个线程完成一个任务后,它可以被线程池重用来执行下一个任务,而不是被销毁。该方法避免了频繁地创建和销毁线程,提高性能。
  • 任务并发执行: 如果线程池中有多个线程,且任务的执行时间相对较短,那么在某个时刻可能有多个线程同时执行不同的任务。
  • 任务重复执行: 如果线程池中的线程已经执行完任务,但主线程仍在循环中提交新的任务,那么这些线程可能会被重用来执行新的任务。这样,在循环的过程中,20个线程可能会不断地重复执行不同的任务。

调用executor.shutdown() 等待线程池中的所有任务完成后才会返回,然后再继续执行主线程。

  • 在某一批次中)某些线程提前完成了任务,它们会等待其余线程完成任务后,然后再次投入到下一批次的任务。
  • 最后一批次中)已经完成的线程会等待其余的线程一起结束,确保线程池的所有线程都完成了任务。

四、项目实战

4.1、同时运行相同任务

4.1.1、多线程:适用于IO密集型任务(读写操作)

import numpy as np
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from scipy.ndimage import median_filter

def memory_usage():
    import psutil
    process = psutil.Process()  # 创建一个进程对象
    mem_info = process.memory_info()  # 获取当前进程在RAM中的内存使用量
    memory_usage_mb = mem_info.rss / (1024 ** 2)  # 表示进程在当前时刻的实际内存使用情况(字节 - MB)
    peak_memory_mb = mem_info.peak_wset / (1024 ** 2)  # 表示进程在任意时间点的内存使用的峰值(字节 - MB)

    return process, memory_usage_mb, peak_memory_mb
    # 备注: 每次只打印一个进程的结果。
    # (1)多进程(不同进程ID): 打印的峰值内存是不同进程ID对应的值而不是所有ID中的最大值
    # (2)多线程(相同进程ID): 打印的峰值内存是多个线程中最大的峰值内存(多线程在一个进程里)

def median_filter_slice(slice):
    print(memory_usage()[0], memory_usage()[1], memory_usage()[2])
    return median_filter(slice, size=3, mode='reflect')  # 中值滤波函数

def apply_median_filter_3d_array(data):
    # with ProcessPoolExecutor() as executor:  # 使用ProcessPoolExecutor创建进程池
    with ThreadPoolExecutor() as executor:  # 使用ThreadPoolExecutor创建线程池
        # 将每个深度方向的切片提交给进程池
        filtered_slices = list(executor.map(median_filter_slice, [data[d, :, :] for d in range(data.shape[0])]))
    result = np.stack(filtered_slices, axis=0)  # 将结果合并成一个新的三维数组
    return result


if __name__ == "__main__":
    """遍历3D图像的每个slice,并分别进行中值滤波"""
    test_array = np.random.randint(0, 100, size=(30, 10, 10))
    filtered_array = apply_median_filter_3d_array(test_array)

    import napari
    viewer = napari.Viewer()  # 创建napari视图
    viewer.layers.clear()  # 清空图层
    viewer.add_image(test_array, name="test_array")  # 添加图像
    viewer.add_image(filtered_array, name="filtered_array")  # 添加图像
    napari.run()  # 显示napari图形界面

4.1.2、多进程:适用于CPU密集型任务(计算操作)

import numpy as np
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from scipy.ndimage import median_filter

def memory_usage():
    import psutil
    process = psutil.Process()  # 创建一个进程对象
    mem_info = process.memory_info()  # 获取当前进程在RAM中的内存使用量
    memory_usage_mb = mem_info.rss / (1024 ** 2)  # 表示进程在当前时刻的实际内存使用情况(字节 - MB)
    peak_memory_mb = mem_info.peak_wset / (1024 ** 2)  # 表示进程在任意时间点的内存使用的峰值(字节 - MB)

    return process, memory_usage_mb, peak_memory_mb
    # 备注: 每次只打印一个进程的结果。
    # (1)多进程(不同进程ID): 打印的峰值内存是不同进程ID对应的值而不是所有ID中的最大值
    # (2)多线程(相同进程ID): 打印的峰值内存是多个线程中最大的峰值内存(多线程在一个进程里)

def median_filter_slice(slice):
    print(memory_usage()[0], memory_usage()[1], memory_usage()[2])
    return median_filter(slice, size=3, mode='reflect')  # 中值滤波函数

def apply_median_filter_3d_array(data):
    with ProcessPoolExecutor() as executor:  # 使用ProcessPoolExecutor创建进程池
    # with ThreadPoolExecutor() as executor:  # 使用ThreadPoolExecutor创建线程池
        # 将每个深度方向的切片提交给进程池
        filtered_slices = list(executor.map(median_filter_slice, [data[d, :, :] for d in range(data.shape[0])]))
    result = np.stack(filtered_slices, axis=0)  # 将结果合并成一个新的三维数组
    return result


if __name__ == "__main__":
    """遍历3D图像的每个slice,并分别进行中值滤波"""
    test_array = np.random.randint(0, 100, size=(30, 10, 10))
    filtered_array = apply_median_filter_3d_array(test_array)

    import napari
    viewer = napari.Viewer()  # 创建napari视图
    viewer.layers.clear()  # 清空图层
    viewer.add_image(test_array, name="test_array")  # 添加图像
    viewer.add_image(filtered_array, name="filtered_array")  # 添加图像
    napari.run()  # 显示napari图形界面

4.2、同时运行不同任务

  • 多线程适用于 I/O 密集型任务,因为线程切换的开销较小,可以有效地并行执行多个 I/O 操作,如文件读写、网络请求等。但由于 Python 的全局解释器锁(GIL),多线程在 CPU 密集型任务上性能有限。

  • 多进程适用于 CPU 密集型任务,因为每个进程都有独立的 Python 解释器和内存空间,不受 GIL 限制,可以充分利用多核处理器。对于 CPU 密集型任务,多进程通常比多线程更快。

  • 协程适用于高并发的 I/O 密集型任务,协程允许在单线程中执行多个任务,避免了线程切换的开销,但需要合理地设计异步代码。协程可以实现非常高的并发性能,但在 CPU 密集型任务上性能可能较差。

  • 并行计算库:如 concurrent.futures、joblib、dask 等可以提供简单的接口来管理并行任务,性能取决于底层的并行执行策略和硬件资源。

4.2.1、方法一:多线程

import threading
import time

# 定义任务1
def task1():
    for i in range(5):
        print("Task 1 - Step", i + 1)
        time.sleep(1)  # 模拟耗时操作

# 定义任务2
def task2():
    for i in range(3):
        print("Task 2 - Step", i + 1)
        time.sleep(1)  # 模拟耗时操作

if __name__ == "__main__":
	# 创建两个线程
	thread1 = threading.Thread(target=task1)
	thread2 = threading.Thread(target=task2)
	# 启动线程
	thread1.start()
	thread2.start()
	# 等待线程完成
	thread1.join()
	thread2.join()
	print("All tasks are completed.")

"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""

4.2.2、方法二:多进程

import multiprocessing
import time

# 定义任务1
def task1():
    for i in range(5):
        print("Task 1 - Step", i + 1)
        time.sleep(1)  # 模拟耗时操作

# 定义任务2
def task2():
    for i in range(3):
        print("Task 2 - Step", i + 1)
        time.sleep(1)  # 模拟耗时操作

if __name__ == "__main__":
    # 创建两个进程
    process1 = multiprocessing.Process(target=task1)
    process2 = multiprocessing.Process(target=task2)
    # 启动进程
    process1.start()
    process2.start()
    # 等待进程完成
    process1.join()
    process2.join()
    print("All tasks are completed.")
    
"""
Task 1 - Step 1
Task 2 - Step 1
Task 2 - Step 2
Task 1 - Step 2
Task 2 - Step 3
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
All tasks are completed.
"""

4.2.3、方法三:协程(使用 asyncio)

import asyncio

# 定义任务1
async def task1():
    for i in range(5):
        print("Task 1 - Step", i + 1)
        await asyncio.sleep(1)  # 模拟异步操作

# 定义任务2
async def task2():
    for i in range(3):
        print("Task 2 - Step", i + 1)
        await asyncio.sleep(1)  # 模拟异步操作

async def main():
    # 并行执行 task1 和 task2
    await asyncio.gather(task1(), task2())

if __name__ == "__main__":
    asyncio.run(main())
    
"""
Task 1 - Step 1
Task 2 - Step 1
Task 1 - Step 2
Task 2 - Step 2
Task 1 - Step 3
Task 2 - Step 3
Task 1 - Step 4
Task 1 - Step 5
"""

4.2.4、方法四:并行计算库(使用 concurrent.futures)速度极快

import concurrent.futures

# 定义任务1
def task1():
    for i in range(5):
        print("Task 1 - Step", i + 1)

# 定义任务2
def task2():
    for i in range(3):
        print("Task 2 - Step", i + 1)

if __name__ == "__main__":
    # 使用 ThreadPoolExecutor 创建线程池
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # 提交任务1和任务2给线程池
        future1 = executor.submit(task1)
        future2 = executor.submit(task2)
        # 获取任务1和任务2的结果
        result1 = future1.result()
        result2 = future2.result()
        
"""
Task 1 - Step 1
Task 1 - Step 2
Task 1 - Step 3
Task 1 - Step 4
Task 1 - Step 5
Task 2 - Step 1
Task 2 - Step 2
Task 2 - Step 3
"""

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
青葱年少的头像青葱年少普通用户
上一篇 2023年12月22日
下一篇 2023年12月22日

相关推荐