浅析python多线程中的锁

Python 中多线程主要有以下几种类型的锁:

  1. threading.Lock:这是最基础的锁对象,不属于任何线程,在Python中,它是一个同步原语对象。一次只有一个线程可以获得锁。如果一个线程试图获得一个已经被其他线程拿着的锁,那么这个线程就会被阻塞,直到拥有锁的线程释放。
  2. threading.RLock:可重入锁(RLock),允许同一线程多次获取锁,但是每次获取锁都需要释放,通常用于递归函数。如果使用Lock,那么在同一个线程多次获取锁时会产生死锁。
  3. threading.Semaphore:信号量,允许一定数量的线程同时获取锁。例如,如果你有一些资源,每次可以被5个线程同时访问,那么你就可以使用一个初始化为5的信号量。
  4. threading.BoundedSemaphore:有界信号量,与Semaphore类似,但是在释放锁的时候,会检查当前的信号量的值是否超过了初始值,如果超过了会抛出一个异常。
  5. threading.Condition:条件变量,允许一个或多个线程等待某个条件满足,然后唤醒。
  6. threading.Event:事件对象。事件对象内部有一个标志位,初始为False。如果标志位为True,wait()不做任何事情,如果标志位为False,wait()会阻塞。可以通过set()来设置标志位,clear()来清除标志位。
  7. threading.Barrier:栅栏对象,允许一定数量的线程同步,直到所有线程都到达栅栏位置,才会全部释放。

注意,在Python中,由于全局解释器锁(GIL)的存在,同一时刻只允许一个线程执行Python字节码,因此Python的多线程并不能实现真正的并行计算。如果需要进行并行计算,可以使用multiprocessing模块,或者使用其他的并行计算框架,如concurrent.futures

threading.Lock():

线程锁,可用于同步多个线程对共享资源的访问。

错误示范:

存在一个问题,就是在每次循环的时候都创建了一个新的锁。这样的话,每个锁都是独立的,它们之间不能保证对共享资源 counter 的互斥访问,因此在多线程环境下,counter += 1 操作可能会出现竞态条件,导致结果不是预期的 10000000

import threading
counter = 0  # 共享资源
def increment():
    global counter
    for _ in range(1000000):
        with threading.Lock():  # 获取线程锁
            counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 输出 10000000

正确示例

import threading
counter = 0  # 共享资源
lock = threading.Lock()  # 创建一个共享的锁
def increment():
    global counter
    for _ in range(1000000):
        with lock:  # 获取线程锁
            counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 输出应该是10000000

threading.RLock()

可重入锁,是一种特殊的线程锁,允许同一线程多次获得锁。

import threading
# 创建一个RLock对象
lock = threading.RLock()
def recursive_function(level):
    with lock:
        if level > 0:
            print('Entering level', level)
            recursive_function(level - 1)
            print('Exiting level', level)
        else:
            print('Base case')
# 在主线程中运行递归函数
recursive_function(5)

在这个例子中,recursive_function 是一个递归函数,它会在每个递归级别上获取一次锁。由于我们使用的是 threading.RLock,同一线程可以多次获取锁,所以这个代码能够正常运行。

但是如果我们使用 threading.Lock 替代 threading.RLock,那么在第二次尝试获取锁时,线程将被阻塞,因为 threading.Lock 不允许同一线程多次获取锁。这会导致死锁,程序将无法继续运行。

threading.RLock(Reentrant Lock)在Python中是一种可重入锁,也就是说,它允许同一线程在没有释放其所有权的情况下多次获取同一个锁。这在某些情况下是非常有用的,例如在递归函数或者嵌套调用中。

以下是一些具体的应用场景:

  1. 递归函数:在递归函数中,同一个线程可能需要多次获取同一个锁。如果使用普通的 threading.Lock,那么在第二次尝试获取锁的时候,线程将会阻塞,因为普通的锁不允许同一线程多次获取。而 threading.RLock 则允许这种行为。
  2. 复杂的同步问题:在某些复杂的同步问题中,你可能需要在同一线程中的多个函数或者多个不同的代码块中获取同一个锁。如果使用普通的 threading.Lock,那么你需要确保在每次获取锁之前都已经释放了锁,这在某些情况下可能会很复杂。而 threading.RLock 则可以简化这种情况的处理。
  3. 实现高级同步原语:你可以使用 threading.RLock 来实现一些更高级的同步原语,例如读写锁(ReadWriteLock)。在这种情况下,读锁可以被同一线程多次获取,而写锁只能被同一线程获取一次。你可以使用 threading.RLock 来实现读锁的行为。

需要注意的是,虽然 threading.RLock 在某些情况下非常有用,但是在大多数情况下,你仍然应该使用更简单的 threading.Lock。因为过度使用 threading.RLock 可能会使你的代码更复杂,更难以理解和维护。而且,不正确的使用 threading.RLock 可能会导致死锁。

threading.Semaphore():

信号量,用于限制同时访问某一资源的线程数量。

模拟一个有限资源池(例如数据库连接池),限制同时访问资源的线程数量。

import threading
import time
# 定义一个有限资源池
RESOURCE_POOL_SIZE = 3
semaphore = threading.Semaphore(RESOURCE_POOL_SIZE)
def access_resource(thread_id):
    print(f"Thread {thread_id} is requesting access to the resource pool.")
    with semaphore:
        print(f"Thread {thread_id} has acquired access to the resource pool.")
        time.sleep(1)  # 模拟资源使用
        print(f"Thread {thread_id} has released access to the resource pool.")
# 创建10个线程
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

在这个例子中,我们有一个有限的资源池,其大小由 RESOURCE_POOL_SIZE 定义。我们使用一个 threading.Semaphore 对象,将其初始值设为资源池的大小,以限制同时访问资源池的线程数量。

我们创建了10个线程,每个线程都尝试访问资源池。由于我们使用了信号量,一次只能有 RESOURCE_POOL_SIZE 个线程同时访问资源池。其他线程将等待,直到有线程释放资源。这样,我们可以限制同时访问资源的线程数量,防止资源竞争或过载。

threading.BoundedSemaphore:

有界信号量

import threading
# 创建一个有界信号量,初始值为2
semaphore = threading.BoundedSemaphore(2)
def access_resource(thread_id):
    print(f"Thread {thread_id} is requesting access to the resource.")
    with semaphore:
        print(f"Thread {thread_id} has acquired access to the resource.")
        # 模拟资源使用
# 创建3个线程
threads = [threading.Thread(target=access_resource, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# 尝试释放未获取的信号量,将引发 ValueError
try:
    semaphore.release()
except ValueError as e:
    print("Caught exception:", e)

在这个代码段中,semaphore.release() 尝试释放(即增加)信号量的计数。在Python中,信号量是一个用来限制线程并发数量的同步原语,它内部有一个计数器。当一个线程调用 acquire() 方法时,信号量的计数器减一;当一个线程调用 release() 方法时,信号量的计数器加一。

threading.BoundedSemaphorethreading.Semaphore 的行为基本相同,但有一点不同:如果在调用 release() 后,信号量的计数器的值大于创建信号量时设定的初始值,threading.BoundedSemaphore 将抛出 ValueError 异常。这种行为有助于检测程序中的一些错误,例如错误地多次释放了信号量。

所以在这个示例中,try/except 块是为了捕获并处理可能由 semaphore.release() 抛出的 ValueError 异常。如果在调用 release() 后,信号量的计数器的值大于创建信号量时设定的初始值,那么将会捕获到 ValueError,并打印出 “Caught exception:” 及其错误信息。

threading.Condition:

条件变量,允许一个或多个线程等待某个条件满足,然后唤醒。

import threading
import time
# 创建一个条件变量
condition = threading.Condition()
# 创建一个共享资源
resource = []
def producer():
    for i in range(5):
        time.sleep(1)  # 模拟生产过程
        with condition:
            resource.append(i)  # 向资源中添加数据
            condition.notify()  # 唤醒一个等待的线程
def consumer():
    while True:
        with condition:
            while not resource:  # 如果资源为空,则等待
                condition.wait()
            item = resource.pop(0)  # 从资源中获取数据
            print(f"Consumer consumed: {item}")
            if item == 4:  # 如果消费了所有的资源,就退出循环
                break
# 创建一个生产者线程和一个消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

threading.Condition 对象常常与一个锁一起使用,允许一个或多个线程等待直到满足某个特定的条件,然后唤醒。这在某些情况下很有用,比如当你需要一个或多个线程等待直到其他线程完成特定任务或者改变了某个状态。

threading.Barrier:

栅栏对象,允许一定数量的线程同步,直到所有线程都到达栅栏位置,才会全部释放

import threading
import time
# 设定栅栏,允许3个线程进行同步
barrier = threading.Barrier(3)
def worker(thread_id):
    print(f"Thread {thread_id} is starting.")
    time.sleep(thread_id)  # 模拟线程执行过程中的延时
    print(f"Thread {thread_id} is waiting at the barrier.")
    barrier.wait()  # 等待所有线程到达栅栏
    print(f"Thread {thread_id} is released from the barrier.")
# 创建3个线程
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

在这个例子中,我们创建了一个栅栏对象,允许3个线程进行同步。然后我们创建了3个线程,每个线程都会执行 worker 函数。在 worker 函数中,线程首先打印一个开始信息,然后等待一段时间(这里是线程ID,可以模拟线程执行过程中的延时),接着打印一个等待信息,然后调用 barrier.wait() 等待栅栏。

当所有3个线程都调用了 barrier.wait(),栅栏将释放所有等待的线程。这时,线程将继续执行并打印它们已经从栅栏中释放的信息。这个示例演示了如何使用 threading.Barrier 对象来同步一组线程,确保它们在某个点上相互等待。

到此这篇关于浅析python多线程中的锁的文章就介绍到这了,更多相关python多线程锁内容请搜索aitechtogether.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持aitechtogether.com!

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
青葱年少的头像青葱年少普通用户
上一篇 2023年9月12日
下一篇 2023年9月12日

相关推荐

此站出售,如需请站内私信或者邮箱!