asyncio 概念和用法

声明:本文针对的是python3.4以后的版本的,因为从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。比如在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,但是在3.5以后的版本中使用asyncawait两个关键字代替,虽然语法上稍微有所差异,但是原理是一样的。本文用最通俗的语言解释了python asyncio背后的一些核心概念,简要解析了asyncio的设计架构,并给出了使用python进行asyncio异步编程的一般模板。

目录

一、一些最重要的概念

对于其他的并发模型大多数采取的都是线性的方式编写。并且依赖于语言运行时系统或操作系统的底层线程或进程来适当地改变上下文,而基于asyncio的应用要求应用代码显示的处理上下文切换。

asyncio提供的框架以事件循环(event loop)为中心,程序开启一个无限的循环,程序会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

1. 协程(coroutine)——本质就是一个函数

所谓的“协程”就是一个函数,这个函数需要有两个基本的组成要素:

  • 第一,需要使用@asyncio.coroutine进行装饰;
  • 第二,函数体内一定要有yield from返回的generator,或者是说使用yield from返回另一个协程对象。

当然,这两个条件并不是硬性规定的,如果没有这两个条件,依然是函数,只不过是普通函数而已。

怎么判断一个函数是不是协程?通过asyncio.iscoroutine(obj)asyncio.iscoroutinefunction(func)加以判断,返回True,则是。

import asyncio

async def time():
    asyncio.sleep(3)

t = time()
asyncio.iscoroutine(t)
asyncio.iscoroutinefunction(time)

返回都是True

2. 事件循环——event_loop

事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为「一种等待程序分配事件或消息的编程架构」,我们可以定义事件循环来简化使用轮询方法来监控事件,通俗的说法就是「当A发生时,执行B」。事件循环利用poller对象,使得程序员不用控制任务的添加、删除和事件的控制。事件循环使用回调方法来知道事件的发生。它是asyncio提供的「中央处理设备」,支持如下操作:

  • 注册、执行和取消延迟调用(超时)
  • 创建可用于多种类型的通信的服务端和客户端的Transports
  • 启动进程以及相关的和外部通信程序的Transports
  • 将耗时函数调用委托给一个线程池
  • 单线程(进程)的架构也避免的多线程(进程)修改可变状态的锁的问题。

与事件循环交互的应用要显示地注册将运行的代码,让事件循环在资源可用时向应用代码发出必要的调用。如:一个套接字再没有更多的数据可以读取,那么服务器会把控制全交给事件循环。

3. 什么是awaitable对象——即可暂停等待的对象

有三类对象是可等待的,即coroutinesTasksFutures

  • coroutine:本质上就是一个函数,以前面的生成器yieldyield from为基础;
  • Tasks: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
  • Future:它是一个“更底层”的概念,他代表一个一步操作的最终结果,因为一步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为Future。

三者的关系,coroutine可以自动封装成task,而Task是Future的子类。

4. 什么是task任务

task是Future的一个子类,它知道如何包装和管理一个协程的执行。任务所需的资源可用时,事件循环会调度任务允许,并生成一个结果,从而可以由其他协程消费。

如前所述,Task用来并发调度的协程,即对协程函数的进一步包装。那为什么还需要包装呢?因为单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控。

5. 什么是future?

future是一个数据结构,表示还未完成的工作结果。事件循环可以监视Future对象是否完成。从而允许应用的一部分等待另一部分完成一些工作。

Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。

Future是Task的父类,一般情况下,已不用去管它们两者的详细区别。

返回 future 对象的低级函数的一个很好的例子是loop.run_in_executor()

二、asyncio的基本架构

asyncio分为高层API和低层API,我们都可以使用,我们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。当然asyncio所涉及到的功能远不止于此,所谓的高层API主要是指那些asyncio.xxx()的方法。

下面是是高层API和低层API的概览:

一.常见的一些高层API方法

1. 运行异步协程 asyncio.run

asyncio.run(coro, *, debug=False)  #运行一个一步程序,python3.7 新添加的内容

协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,然后由事件循环去运行,单独运行协程函数是不会有结果的,看一个简单的例子:

import time
import asyncio
 
async def say_after_time(delay,what):
    await asyncio.sleep(delay)
    print(what)
    
async def main():
    print(f"开始时间为:{time.time()}")
    await say_after_time(1, "hello")
    await say_after_time(2, "world")
    print(f"结束时间为:{time.time()}")
    
loop=asyncio.get_event_loop()    #创建事件循环对象
#loop=asyncio.new_event_loop()   #与上面等价,创建新的事件循环
loop.run_until_complete(main())  #通过事件循环对象运行协程函数
loop.close()

如果我们单独像执行普通函数那样执行一个协程函数,只会返回一个coroutine对象。如下所示:

>>> main()
<coroutine object main at 0x000001ED74F89040>

获取事件循环对象的几种方式:

下面几种方式可以用来获取、设置、创建事件循环对象loop

  • loop=asyncio.get_running_loop()返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的。
  • loop=asyncio.get_event_loop()获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop。
  • loop=asyncio.set_event_loop(loop)设置一个事件循环为当前线程的事件循环。
  • loop=asyncio.new_event_loop()创建一个新的事件循环。

通过事件循环运行协程函数的两种方式:

  • 方式一:创建事件循环对象loop,即asyncio.get_event_loop(),通过事件循环运行协程函数
  • 方式二:直接通过asyncio.run(function_name)运行协程函数。但是需要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数总是会创建一个新的事件循环并在run结束之后关闭事件循环,所以,如果在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,因为同一个线程不能有两个事件循环,而且这个run函数不能同时运行两次,因为他已经创建一个了。即同一个线程中是不允许有多个事件循环loop的。

asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,因为它是高层API,后面会讲到它与asyncio.run_until_complete()的差异性,run_until_complete()是相对较低层的API。

注意:到底什么是事件循环?如何理解?

可以这样理解:线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from或者await就悬挂起来,然后又走到另外一个方法,依次进行下去,知道事件循环所有的方法执行完毕。实际上loop是BaseEventLoop的一个实例,我们可以查看定义,它到底有哪些方法可调用。

2. 创建任务 asyncio.create_task

(1) 创建任务(两种方法):

  • 方法一:task = asyncio.create_task(coro()) # 这是3.7版本新添加的
  • 方法二:task = asyncio.ensure_future(coro())

也可以使用

loop.create_future()
loop.create_task(coro)
备注:

loop.create_task接受的参数需要是一个协程,但是asyncio.ensure_future除了接受协程,还可以是Future对象或者awaitable对象:

  1. 如果参数是协程,其实底层还是用的loop.create_task,返回Task对。
  2. 如果是Future对象会直接返回。
  3. 如果是一个awaitable对象会await这个对象的__await__方法,再执行一次ensure_future,最后返回Task或者Future。

所以就像ensure_future名字说的,确保这个是一个Future对象:Task是Future 子类,前面说过一般情况下开发者不需要自己创建Future

其实前面说的asyncio.waitasyncio.gather里面都用了asyncio.ensure_future。对于绝大多数场景要并发执行的是协程,所以直接用asyncio.create_task就足够了。

(2) 获取某一个任务的方法:

  • 方法一:task=asyncio.current_task(loop=None)
    返回在某一个指定的loop中,当前正在运行的任务,如果没有任务正在运行,则返回None;如果loop为None,则默认为在当前的事件循环中获取。

  • 方法二:asyncio.all_tasks(loop=None)
    返回某一个loop中还没有结束的任务。需要注意的是,传入ensure_future() 的 coroutine 不会立马启动,需要有某个地方使用了await语句操作创建的 task 的时候它才会被执行。

3. 睡眠 asyncio.sleep

await asyncio.sleep(delay, result=None, *, loop=None)

这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而允许其他任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息。

另外如果提供了参数result,当当前任务(协程)结束的时候,它会返回;

loop参数将会在3.10中移除。

4. 多个协程函数时候的等候 asyncio.wait

await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发执行aws可迭代对象中的可等待对象并进入阻塞状态直到全部满足return_when的条件。

第一个参数aws是一个集合,要写成集合set的形式,比如:
{func(),func(),func3()}
表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是可以的。

注意:该函数的返回值是两个Tasks/Futures的集合:(done, pending)

  • done是一个集合,表示已经完成的任务tasks;
  • pending也是一个集合,表示还没有完成的任务。

常见的使用方法为:done, pending = await asyncio.wait(aws)

参数解释:

  • timeout (a float or int), 同上面的含义一样,需要注意的是,这个不会触发asyncio.TimeoutError异常,如果到了timeout还有任务没有执行完,那些没有执行完的tasks和futures会被返回到第二个集合pending里面。
  • return_when参数,顾名思义,他表示的是,什么时候wait函数该返回值。只能够去下面的几个值:
    • FIRST_COMPLETED:first_completes.当任何一个task或者是future完成或者是取消,wait函数就返回
    • FIRST_EXCEPTION :当任何一个task或者是future触发了某一个异常,就返回,.如果是所有的task和future都没有触发异常,则等价与下面的 ALL_COMPLETED.
    • ALL_COMPLETED:当所有的task或者是future都完成或者是都取消的时候,再返回。

例子一:

import asyncio
import time
 
a=time.time()
 
async def sleep1():  #大约1秒
   print("sleep1 begin")
   await asyncio.sleep(1)
   print("sleep1 end")
 
async def sleep2():  #大约2秒
    print("sleep2 begin")
    await asyncio.sleep(2)
    print("sleep2 end")
 
async def sleep3():  #大约3秒
    print("sleep3 begin")
    await asyncio.sleep(3)
    print("sleep3 end")
 
async def main():   #入口函数
    done,pending=await asyncio.wait({sleep1(),sleep2(),sleep3()},return_when=asyncio.FIRST_COMPLETED)
    for i in done:
        print(i)
    for j in pending:
        print(j)
 
asyncio.run(main()) #运行入口函数
b=time.time()
print('---------------------------------------')
print(b-a)

运行结果为:

sleep3 begin
sleep1 begin
sleep2 begin
sleep1 end
<Task finished name='Task-3' coro=<sleep1() done, defined at /data/py_code/asyncio_example/aio_wait.py:6> result=None>
<Task pending name='Task-4' coro=<sleep2() running at /data/py_code/asyncio_example/aio_wait.py:13> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6133a81c10>()]>>
<Task pending name='Task-2' coro=<sleep3() running at /data/py_code/asyncio_example/aio_wait.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6133a81bb0>()]>>
---------------------------------------
1.0069866180419922

从上面可以看出,sleep1()是运行结束了的,sleep2()sleep3()还没结束。

例子二:

import asyncio
 
async def num(n):
    try:
        await asyncio.sleep(n*0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
 
async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    for i in complete:
        print("当前数字",i.result())
    if pending:
        print("取消未完成的任务")
        for p in pending:
            p.cancel()
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

运行结果为:

当前数字 2
当前数字 1
当前数字 0
当前数字 3
当前数字 4
取消未完成的任务
数字5被取消
数字6被取消
数字8被取消
数字9被取消
数字7被取消

可以发现结果并没有按照数字的顺序显示,在内部wait()使用一个set保存它创建的Task实例。因为set是无序的所以这也就是我们的任务不是顺序执行的原因。 wait的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务。wait第二个参数为一个超时值。

达到这个超时时间后,未完成的任务状态变为pending,如果没有调用cancel方法取消任务,当程序退出时还有任务没有完成此时就会看到如下的错误提示。

Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732e20>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-9' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732eb0>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-10' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732ee0>()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-11' coro=<num() running at py_code/asyncio_example/aio_wait2.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f07ec732f10>()]>>

此时可以通过迭代调用cancel方法取消任务。也就是这段代码

    if pending:
        print("取消未完成的任务")
        for p in pending:
            p.cancel()

与 wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

5. 并发运行多个任务 asyncio.gather

await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

它本身也是awaitable的。

*coros_or_futures是一个序列拆分操作,如果是以个协程函数,则会自动转换成Task。当所有的任务都完成之后,返回的结果是一个列表的形式,列表中值的顺序和*coros_or_futures完成的顺序是一样的。

return_exceptions=False,这是他的默认值,第一个出发异常的任务会立即返回,然后其他的任务继续执行;
return_exceptions=True,对于已经发生了异常的任务,也会像成功执行了任务那样,等到所有的任务执行结束一起将错误的结果返回到最终的结果列表里面。

如果gather()本身被取消了,那么绑定在它里面的任务也就取消了。

gather的使用:

gather的作用和wait类似不同的是。

  • gather任务无法取消。
  • 返回值是一个结果列表
  • 可以按照传入参数的顺序,顺序输出。

我们将上面的代码改为gather的方式

import asyncio
 
async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print(f"数字{n}被取消")
        raise
 
async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("当前数字", i)
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

输出:

当前数字 0
当前数字 1
当前数字 2
当前数字 3
当前数字 4
当前数字 5
当前数字 6
当前数字 7
当前数字 8
当前数字 9

gather通常被用来阶段性的一个操作,做完第一步才能做第二步,比如下面这样

第二阶段完成
此时用时 2.003401517868042
第一阶段完成
此时用时 5.003954172134399
5
2
总用时 5.004146337509155

可以通过上面结果得到如下结论:

  • step1和step2是并行运行的。
  • gather会等待最耗时的那个完成之后才返回结果,耗时总时间取决于其中任务最长时间的那个。

6. 防止任务取消 asyncio.shield

await asyncio.shield(*arg, *, loop=None)

它本身也是awaitable。顾名思义,shield为屏蔽、保护的意思,即保护一个awaitable 对象防止取消,一般情况下不推荐使用,而且在使用的过程中,最好使用try语句块更好。

try:
   res = asyncio.shield(something())
except asyncio.CancelledError:
   res = None

7. 设置超时 asyncio.wait_for

await asyncio.wait_for(aw, timeout, *, loop=None)

等待aw可等待对象完成,指定timeout秒后超时。

  • 如果 aw 是一个协程函数,会自动包装成一个任务task。
  • 等待 aw 可等待对象完成,指定 timeout 秒数后超时。
  • 如果 aw 是一个协程,它将自动作为任务加入日程。
  • timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数。如果timeout 为 None,则等待直到完成。
  • 如果发生超时,任务将取消并引发asyncio.TimeoutError
  • 要避免任务取消,可以加上shield()
  • 函数将等待直到目标对象确实被取消,所以总等待时间可能超过 timeout 指定的秒数。
  • 如果等待被取消,则 aw 指定的对象也会被取消。
  • loop 参数已弃用,计划在 Python 3.10 中移除。

参见下面的例子:

import asyncio
 
async def eternity():
   print('我马上开始执行')
   await asyncio.sleep(3600)  #当前任务休眠1小时,即3600秒
   print('终于轮到我了')
 
async def main():
   # Wait for at most 1 second
    try:
        print('等你3秒钟哦')
        await asyncio.wait_for(eternity(), timeout=3)  #休息3秒钟了执行任务
    except asyncio.TimeoutError:
        print('超时了!')
 
asyncio.run(main())

运行结果为:

等你3秒钟哦
我马上开始执行
超时了!

首先调用main()函数,作为入口函数,当输出‘等你3秒钟哦’之后,main挂起,执行eternity,然后打印‘我马上开始执行’,然后eternity挂起,而且要挂起3600秒,大于3,这时候触发TimeoutError。修改一下:

import asyncio
    
async def eternity():
    print('我马上开始执行')
    await asyncio.sleep(2)  #当前任务休眠2秒钟,2<3
    print('终于轮到我了')
    
async def main():
    # Wait for at most 1 second
    try:
        print('等你3秒钟哦')
        await asyncio.wait_for(eternity(), timeout=3)  #给你3秒钟执行你的任务
    except asyncio.TimeoutError:
        print('超时了!')
    
asyncio.run(main())

运行结果为:

等你3秒钟哦
我马上开始执行
终于轮到我了

总结:当异步操作需要执行的时间超过wait_for设置的timeout,就会触发异常,所以在编写程序的时候,如果要给异步操作设置timeout,一定要选择合适,如果异步操作本身的耗时较长,而你设置的timeout太短,会涉及到她还没做完,就抛出异常了。

8. asyncio.as_completed()函数

as_complete是一个生成器,会管理指定的一个任务列表,并生成他们的结果。每个协程结束运行时一次生成一个结果。与wait一样,as_complete不能保证顺序,不过执行其他动作之前没有必要等待所以后台操作完成。

asyncio.as_completed(aws, *, loop=None, timeout=None)

第一个参数aws:同上面一样,是一个集合{}集合里面的元素是coroutine、task或者future
第三个参数timeout:意义和上面讲的的一样

import asyncio
import time
 
a=time.time()
 
async def sleep5():
    print("sleep5 begin")
    await asyncio.sleep(5)  #大约5秒
    print("sleep5 end")
    return '哈哈5'
  
async def sleep3():
   print("sleep3 begin")
   await asyncio.sleep(3) #大约3秒
   print("sleep3 end")
   return '哈哈3'
 
async def sleep4():
   print("sleep4 begin")
   await asyncio.sleep(4) #大约4秒
   print("sleep4 end")
   return '哈哈4'
 
async def main():
   s=asyncio.as_completed({sleep5(),sleep3(),sleep4()})
   for f in s:
       result=await f
       print(result)
   
asyncio.run(main())
b=time.time()
print('---------------------------------------')
print(b-a)

运行结果为:

sleep4 begin
sleep5 begin
sleep3 begin
sleep3 end
哈哈3
sleep4 end
哈哈4
sleep5 end
哈哈5
---------------------------------------
5.003020286560059

结论:asyncio.as_completed()函数返回的是一个可迭代(iterator)的对象,对象的每个元素就是一个future对象,很多小伙伴说,这不是相当于没变吗?其实返回的future集合是对参数的future集合重新组合,组合的顺序就是,最先执行完的协程函数(coroutine、task、future)最先返回,从上面的代码可知,参数为
aws={ {sleep5(),sleep3(),sleep4()}},因为sleep5大约花费5秒、sleep3大约花费3秒、sleep4大约花费4秒。返回的结果为
s={ {sleep3(),sleep4(),sleep5()}},因为sleep3()时间最短,故而放在前面,sleep5时间最长,故而放在最后面。然后对返回的集合s开始迭代。

例子二:

import asyncio
import time
 
async def foo(n):
    print('Waiting: ', n)
    await asyncio.sleep(n)
    return n
 
async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)
 
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))
 
now = lambda : time.time()
start = now()
 
loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

输出

Waiting:  1
Waiting:  2
Waiting:  4
Task ret: 1
Task ret: 2
Task ret: 4
4.004435062408447

可以发现结果逐个输出。

9. 协程中调用普通函数 call_soon, call_later, call_at

在协程中可以通过一些方法去调用普通的函数。可以使用的关键字有call_soon,call_later,call_at。

call_soon

可以通过字面意思理解调用立即返回。

loop.call_soon(callback, *args, context=None)

在下一个迭代的时间循环中立刻调用回调函数,大部分的回调函数支持位置参数,而不支持”关键字参数”,如果是想要使用关键字参数,则推荐使用functools.aprtial()对方法进一步包装。可选关键字context允许指定要运行的回调的自定义contextvars.Context。当没有提供上下文时使用当前上下文。在Python 3.7中, asyncio协程加入了对上下文的支持。使用上下文就可以在一些场景下隐式地传递变量,比如数据库连接session等,而不需要在所有方法调用显示地传递这些变量。

下面来看一下具体的使用例子。

import asyncio
import functools
 
def callback(args, *, kwargs="defalut"):
    print(f"普通函数做为回调函数,获取参数:{args},{kwargs}")
 
async def main(loop):
    print("注册callback")
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwargs="not defalut")
    loop.call_soon(wrapped, 2)
    await asyncio.sleep(0.2)
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()

输出结果

注册callback
普通函数做为回调函数,获取参数:1,defalut
普通函数做为回调函数,获取参数:2,not defalut

通过输出结果我们可以发现我们在协程中成功调用了一个普通函数,顺序的打印了1和2。

有时候我们不想立即调用一个函数,此时我们就可以call_later延时去调用一个函数了。

call_later

loop.call_later(delay, callback, *args, context=None)

首先简单的说一下它的含义,就是事件循环在delay多长时间之后才执行callback函数。配合上面的call_soon让我们看一个小例子:

import asyncio
 
def callback(n):
    print(f"callback {n} invoked")
 
async def main(loop):
    print("注册callbacks")
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)
    await asyncio.sleep(0.4)
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

输出

注册callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked

通过上面的输出可以得到如下结果:

  1. call_soon会在call_later之前执行,和它的位置在哪无关
  2. call_later的第一个参数越小,越先执行。

call_at

loop.call_at(when, callback, *args, context=None)

call_at第一个参数的含义代表的是一个单调时间,它和我们平时说的系统时间有点差异,这里的时间指的是事件循环内部时间,可以通过loop.time()获取,然后可以在此基础上进行操作。后面的参数和前面的两个方法一样。实际上call_later内部就是调用的call_at。


import asyncio
 
def call_back(n, loop):
    print(f"callback {n} 运行时间点{loop.time()}")
 
async def main(loop):
    now = loop.time()
    print("当前的内部时间", now)
    print("循环时间", now)
    print("注册callback")
    loop.call_at(now + 0.1, call_back, 1, loop)
    loop.call_at(now + 0.2, call_back, 2, loop)
    loop.call_soon(call_back, 3, loop)
    await asyncio.sleep(1)
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("进入事件循环")
        loop.run_until_complete(main(loop))
    finally:
        print("关闭循环")
        loop.close()

输出:

进入事件循环
当前的内部时间 21494.175900164
循环时间 21494.175900164
注册callback
callback 3 运行时间点21494.1760417
callback 1 运行时间点21494.276630947
callback 2 运行时间点21494.377943985
关闭循环

因为call_later内部实现就是通过call_at实现。

10. 协程队列 asyncio.Queue

mport asyncio
import random
 
async def product(queue,n):
    for x in range(n):
        print('producing {}/{}'.format(x,n))
        await asyncio.sleep(random.random())
        item = str(x)
        await queue.put(item)
    
async def consume(queue):
    while True:
        item = await queue.get()
        print('consuming{}...'.format(item))
        await asyncio.sleep(random.random())
        # 通知队列该项目已被处理
        queue.task_done()
 
async def main(n):
    queue = asyncio.Queue(maxsize=4)
    # 此时consume方法并没有真正开始运行
    consumer = asyncio.ensure_future(consume(queue))
    # 此时produce生产后,consume才开始运行
    await product(queue,n)
    # 等到消费者处理完所有项目
    await queue.join()
    # 消费者仍在等待商品,取消它
    consumer.cancel()
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(10))

程序通过produce方法把10个任务放入Queue中,通过consume方法进行消费回收。
在此,要想强调一下关于Queue.put_nowait方法,官方的解释是:

Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
意思是将item放入无阻塞的queue中,如果没有可用的插槽(即没有存放空间),就抛出QueueFull异常。把上面的程序改一下:

import asyncio
import random
 
async def produce(queue,n):
    for x in range(n):
        print('producing {}/{}'.format(x,n))
        await asyncio.sleep(random.random())
        item = str(x)
        #替换 await queue.put(item)
        queue.put_nowait(item)
        #打印当前queue里面item存放数量
        print('qsize:',queue.qsize())
    
async def main(n):
    queue = asyncio.Queue(maxsize=3)
    await produce(queue, n)
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(7))
    loop.close()

将await put方法替换成了put_nowait方法,将Queue的容量设为3个,而生产的任务设为6个。运行结果如下:

producing 0/7
qsize: 1
producing 1/7
qsize: 2
producing 2/7
qsize: 3
producing 3/7
Traceback (most recent call last):
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 20, in <module>
    loop.run_until_complete(main(7))
  File "/data/python3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 16, in main
    await produce(queue, n)
  File "/data/code/py_code/asyncio_example/aio_queue1.py", line 10, in produce
    queue.put_nowait(item)
  File "/data/python3/lib/python3.8/asyncio/queues.py", line 148, in put_nowait
    raise QueueFull
asyncio.queues.QueueFull

可以看出,在生产第4个任务的时候,因为Queue的容量只有3个,所以抛出QueueFull错误,符合预期要求。

注意,上面的程序只使用了produce方法,没有调用consume方法,因为我们一边调用produce,一边调用consume时,consume方法会将完成的任务进行回收。相当于出水管将水抽出水池,而入水管又将水抽入,有可能永远也不会满(即抛出QueueFull异常)。

11. 协程锁 asyncio.Lock

多个函数调用同一个异步函数的时候,被调用函数可能在执行同一个参数对象;

例如多个 func() 调用同一个包含 request 功能的异步函数,其请求的 url 可能是同一个 url,这样就会有重复的请求,实际生产中这是不能被允许的;

这个时候就需要使用 asyncio 内置的 Lock,保证 url 不被重复调用,此处的 Lock 是应用级别实现的,并没有没有像线程锁一样深入操作系统;示例代码如下:

import aiohttp
import asyncio
 
cache = {}
lock = asyncio.Lock()
 
async def get_stuff(url):
    # await lock.acquire()
    async with lock:
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request('GET', url)
        cache[url] = stuff
        return stuff
    # lock.release()
 
async def parse_sutff():
    stuff = await get_stuff()
    # do some parsing
    
async def use_stuff():
    stuff = await get_stuff()
    # use stuff to do something interesting
    
tasks = [parse_sutff(), use_stuff()]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

12. 异步加线程池或进程池 loop.run_in_executor

线程池:

前面的代码都是异步的,就如 sleep,需要用asyncio.sleep而不是阻塞的time.sleep,如果有同步逻辑,怎么;利用 asyncio 实现并发呢?答案是用run_in_executor
loop.run_in_executor(None, a)这里面第一个参数是要传递concurrent.futures.Executo实例的,传递 None 会选择默认的 executor:

import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor
 
def random_sleep(num):
    print('sleep start:', num, 's')
    time.sleep(num)
    print('sleep end:', num, 's')
    
async def main():
    executor = ThreadPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor, random_sleep, sleep_time)
        tasks.append(task)
    await asyncio.wait(tasks)
    
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(main())
    print('end time: {}'.format(time.time()-start_time))

运行结果如下:

sleep start: 1 s
sleep start: 4 s
sleep start: 2 s
sleep start: 5 s
sleep start: 2 s
sleep end: 1 s
sleep end: 2 s
sleep end: 2 s
sleep end: 4 s
sleep end: 5 s
end time: 5.010895490646362

进程池:

import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor
 
def random_sleep(num):
    print('sleep start:', num, 's')
    time.sleep(num)
    print('sleep end:', num, 's')
    
async def main():
    executor = ProcessPoolExecutor(5)
    tasks = []
    for _ in range(5):
        sleep_time = random.randint(1,5)
        task = loop.run_in_executor(executor, random_sleep, sleep_time)
        tasks.append(task)
    await asyncio.wait(tasks)
    
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(main())
    print('end time: {}'.format(time.time()-start_time))

13. 在其他线程执行协程 run_coroutine_threadsafe(这是线程安全的)

示例代码如下:

import time
import asyncio
from threading import Thread
from functools import partial
 
async def a():
    time.sleep(1)
    return 'A'
 
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
 
def shutdown(loop):
    loop.stop()
    
if __name__ == '__main__':
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    future = asyncio.run_coroutine_threadsafe(a(), new_loop)
    print(future)
    print(f'Result: {future.result(timeout=2)}')
    new_loop.call_soon_threadsafe(partial(shutdown, new_loop))

运行结果如下:

<Future at 0x7f6da03bcd60 state=pending>
Result: A

这里面有几个细节要注意:

  1. 协程应该从另一个线程中调用,而非事件循环运行所在线程,所以用asyncio.new_event_loop()新建一个事件循环。
  2. 在执行协程前要确保新创建的事件循环是运行着的,所以需要用start_loop之类的方式启动循环
  3. 接着就可以用asyncio.run_coroutine_threadsafe执行协程 a 了,它返回了一个 Future 对象。
  4. 可以通过输出感受到 future 一开始是 pending 的,因为协程 a 里面会 sleep 1 秒才返回结果。
  5. future.result (timeout=2)就可以获得结果,设置 timeout 的值要大于 a 协程执行时间,要不然会抛出 TimeoutError。
  6. 一开始我们创建的新的事件循环跑在一个线程里面,由于loop.run_forever会阻塞程序关闭,所以需要结束时杀掉线程,所以用call_soon_threadsafe回调函数 shutdown 去停止事件循环。

这里再说一下call_soon_threadsafe,看名字就知道它是线程安全版本的call_soon,其实就是在另外一个线程里面调度回调。BTW, 其实asyncio.run_coroutine_threadsafe底层也是用的它。

14 信号量 asyncio.Semaphore

用法:

class asyncio.Semaphore(value=1)

一个信号量对象。不是线程安全的。

信号量管理一个内部计数器,该计数器由每个acquire() 调用递减,并由每个release() 调用递增。计数器永远不会低于零;当 acquire() 发现它为零时,它会阻塞,等待某些任务调用 release() 。

可选的value 参数给出内部计数器的初始值(默认为1)。如果给定值小于 0,则会引发 ValueError。

在 3.10 版中更改:删除了loop范围。

使用信号量的首选方法是 async with 语句:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

这相当于:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

简单例子:

import httpx
import asyncio

semaphore = 5
tasks_list = []

base_url = 'http://httpbin.org/get?page='

sem = asyncio.Semaphore(semaphore)
    
async def download(client, url):
    async with sem:
        response = await client.get(url)
        print(response.json().get("args"))
        await asyncio.sleep(2)
            
async def read_data():
    async with httpx.AsyncClient(timeout=30) as client:
        for num in range(1,20):
            url = base_url+str(num)
            task = asyncio.create_task(download(client, url))
            tasks_list.append(task)
        await asyncio.gather(*tasks_list)

    
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(read_data())

if __name__ == '__main__':
    main()

运行结果示例如下:

{'page': '4'}
{'page': '3'}
{'page': '1'}
{'page': '2'}
{'page': '5'}
{'page': '7'}
{'page': '6'}
{'page': '8'}
{'page': '10'}
{'page': '9'}
{'page': '11'}
{'page': '12'}
{'page': '13'}
{'page': '14'}
{'page': '15'}
{'page': '16'}
{'page': '17'}
{'page': '18'}
{'page': '19'}

从运行过程得知,其中的返回不是按顺序的,所以是协程并发的。都是按照asyncio.Semaphore定义的5个并发运行,并且每请求完一个后asyncio.sleep2秒。

二. Task 类详解

先来看一下Task类的简单介绍(英文原文文档):

class asyncio.Task(coro, *, loop=None)
A Future-like object that runs a Python coroutine. Not thread-safe.
Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.
Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.
Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.
To cancel a running Task use the cancel() method. Calling it will cause the Task to throw a CancelledError exception into the wrapped coroutine. If a coroutine is awaiting on a Future object during cancellation, the Future object will be cancelled.
cancelled() can be used to check if the Task was cancelled. The method returns True if the wrapped coroutine did not suppress the CancelledError exception and was actually cancelled.
asyncio.Task inherits from Future all of its APIs except Future.set_result() and Future.set_exception().
Tasks support the contextvars module. When a Task is created it copies the current context and later runs its coroutine in the copied context.

上面的文字描述中推出了几个非常重要的信息,特在此总结如下:

  1. 他是作为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
  2. 使用高层API asyncio.ccreate_task()创建任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()创建任务对象;
  3. 相比于协程函数,任务时有状态的,可以使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。

下面介绍Task类常见的一些使用函数

cancel()

Request the Task to be cancelled.

其实前面已经有所介绍,最好是使用他会出发CancelledError异常,所以需要取消的协程函数里面的代码最好在try-except语句块中进行,这样方便触发异常,打印相关信息,但是Task.cancel()没有办法保证任务一定会取消,而Future.cancel()是可以保证任务一定取消的。可以参见下面的一个例子:

import asyncio
 
async def cancel_me():
   print('cancel_me(): before sleep')
   try:
       await asyncio.sleep(3600) #模拟一个耗时任务
   except asyncio.CancelledError:
       print('cancel_me(): cancel sleep')
       raise
   finally:
        print('cancel_me(): after sleep')
 
async def main():
   #通过协程创建一个任务,需要注意的是,在创建任务的时候,就会跳入到异步开始执行
   #因为是3.7版本,创建一个任务就相当于是运行了异步函数cancel_me
   task = asyncio.create_task(cancel_me()) 
   #等待一秒钟
   await asyncio.sleep(1)
   print('main函数休息完了')
   #发出取消任务的请求
   task.cancel()  
   try:
       await task  #因为任务被取消,触发了异常
   except asyncio.CancelledError:
       print("main(): cancel_me is cancelled now")
 
asyncio.run(main())

运行结果为:

cancel_me(): before sleep
main函数休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now

运行过程分析:

  1. 首先run函数启动主函数入口main,在main中,因为第一句话就是调用异步函数cancel_me()函数,所以先打印出了第一句话;
  2. 然后进入cancel_me中的try语句,遇到await,暂停,这时候返回main中执行,但是有在main中遇到了await,也会暂停,但是由于main中只需要暂停1秒,而cancel_me中要暂停3600秒,所以等到main的暂停结束后,接着运行main,所以打印出第二句话;
  3. 接下来遇到取消任务的请求task.cancel(),然后继续执行main里面的try,又遇到了await,接着main进入暂停,接下来进入到cancel_me函数中,但是由于main中请求了取消任务,所以那个耗时3600秒的任务就不再执行了,直接触发了Cancelled_Error异常,打印出第三句话,接下来又raise一个异常信息;
  4. 接下来执行cancel_me的finally,打印出第四句话,此时cancel_me执行完毕,由于他抛出了一个异常,返回到主程序main中,触发异常,打印出第五句话。

done()

当一个被包装得协程既没有触发异常、也没有被取消的时候,意味着它是done的,返回true。

result()

返回任务的执行结果,
当任务被正常执行完毕,则返回结果;
当任务被取消了,调用这个方法,会触发CancelledError异常;
当任务返回的结果是无用的时候,则调用这个方法会触发InvalidStateError;
当任务出发了一个异常而中断,调用这个方法还会再次触发这个使程序中断的异常。

exception()

返回任务的异常信息,触发了什么异常,就返回什么异常,如果任务是正常执行的无异常,则返回None;
当任务被取消了,调用这个方法会触发CancelledError异常;
当任务没有做完,调用这个方法会触发InvalidStateError异常。
下面还有一些不常用的方法,如下:

add_done_callback(callback, *, context=None)

remove_done_callback(callback)

get_stack(*, limit=None)

print_stack(*, limit=None, file=None)

all_tasks(loop=None),这是一个类方法

current_task(loop=None),这是一个类方法

三. 异步函数的结果获取

对于异步编程、异步函数而言,最重要的就是异步函数调用结束之后,获取异步函数的返回值,我们可以有以下几种方式来获取函数的返回值,第一是直接通过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。

1. 直接通过result来获取

import asyncio
import time
 
async def hello1(a,b):
   print("Hello world 01 begin")
   await asyncio.sleep(3)  #模拟耗时任务3秒
   print("Hello again 01 end")
   return a+b
 
coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                #第一步:创建事件循环
task=asyncio.ensure_future(coroutine)         #第二步:将多个协程函数包装成任务列表
loop.run_until_complete(task)                  #第三步:通过事件循环运行,run_until_complete的参数是一个futrue对象
print('-------------------------------------')
print(task.result())

运行结果为:

Hello world 01 begin
Hello again 01 end
-------------------------------------
15

2. 通过定义回调函数来获取

import asyncio
import time
 
async def hello1(a,b):
   print("Hello world 01 begin")
   await asyncio.sleep(3)  #模拟耗时任务3秒
   print("Hello again 01 end")
   return a+b
 
def callback(future):   #定义的回调函数
   print(future.result())
 
loop = asyncio.get_event_loop()                #第一步:创建事件循环
task=asyncio.ensure_future(hello1(10,5))       #第二步:将多个协程函数包装成任务
task.add_done_callback(callback)               #并被任务绑定一个回调函数
loop.run_until_complete(task)                  #第三步:通过事件循环运行
loop.close()                                   #第四步:关闭事件循环

运行结果为:

Hello world 01 begin
Hello again 01 end
15

注意:所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象,因为task是future的子类。

三、asyncio异步编程的基本写法

定义协程

协程的定义,需要使用async def语句。

async def do_some_work(x): 
    pass

do_some_work 便是一个协程。准确来说,do_some_work 是一个协程函数,可以通过 asyncio.iscoroutinefunction来验证:

import asyncio

async def do_some_work(x): 
    pass

print(asyncio.iscoroutinefunction(do_some_work))

运行结果:

True

这个协程什么都没做,我们让它睡眠几秒,以模拟实际的工作量 :

async def do_some_work(x): 
    print("waiting " + str(x))
    await asyncio.sleep(x)

在解释 await 之前,有必要说明一下协程可以做哪些事。协程可以:

  • 等待一个 future 结束
  • 等待另一个协程(产生一个结果,或引发一个异常)
  • 产生一个结果给正在等它的协程
  • 引发一个异常给正在等它的协程

asyncio.sleep也是一个协程,所以await asyncio.sleep(x)就是等待另一个协程。可参见asyncio.sleep的文档:

async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""

运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象,可以asyncio.iscoroutine 来验证:

print(asyncio.iscoroutinefunction(do_some_work()))

运行结果:

True

此处还会引发一条警告:

/data/code/temp.py:7: RuntimeWarning: coroutine 'do_some_work' was never awaited
  print(asyncio.iscoroutine(do_some_work(3)))
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

要让这个协程对象运行的话,有两种方式:

  • 在另一个已经运行的协程中用 await 等待它
  • 通过 ensure_future 函数计划它的执行

简单来说,只有 loop 运行了,协程才可能运行。下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到运行。

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。run_until_complete 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,通过 ensure_future 函数把协程对象包装(wrap)成了 future。所以,我们可以写得更明显一些:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整代码:

import asyncio
 
async def do_some_work(x): 
    print("waiting " + str(x))
    await asyncio.sleep(x)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

运行结果:

Waiting 3
<三秒钟后程序结束>

案例模板

1. python3.7之前的版本

a. 从协程中返回值

import asyncio
 
async def foo():
    print("这是一个协程")
    return "返回值"
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("开始运行协程")
        coro = foo()
        print("进入事件循环")
        result = loop.run_until_complete(coro)
        print(f"run_until_complete可以获取协程的{result},默认输出None")
    finally:
        print("关闭事件循环")
        loop.close()

运行结果如下:

开始运行协程
进入事件循环
这是一个协程
run_until_complete可以获取协程的返回值,默认输出None
关闭事件循环

run_until_complete可以获取协程的返回值,如果没有给定返回值,则像函数一样,默认返回None。

b. 协程调用协程

一个协程可以启动另一个协程,从而可以任务根据工作内容,封装到不同的协程中。我们可以在协程中使用await关键字,链式的调度协程,来形成一个协程任务流。向下面的例子一样。

import asyncio
 
async def main():
    print("主协程")
    print("等待result1协程运行")
    res1 = await result1()
    print("等待result2协程运行")
    res2 = await result2(res1)
    return (res1,res2)
 
async def result1():
    print("这是result1协程")
    return "result1"
 
async def result2(arg):
    print("这是result2协程")
    return f"result2接收了一个参数,{arg}"
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(main())
        print(f"获取返回值:{result}")
    finally:
        print("关闭事件循环")
        loop.close()

运行结果:

主协程
等待result1协程运行
这是result1协程
等待result2协程运行
这是result2协程
获取返回值:('result1', 'result2接收了一个参数,result1')
关闭事件循环

c. 无参数、无返回值

import asyncio
 
async def sleep1():
   print("Hello world 01 begin")
   await asyncio.sleep(1)  #模拟耗时任务1秒
   print("Hello again 01 end")
 
async def sleep2():
   print("Hello world 02 begin")
   await asyncio.sleep(2)   #模拟耗时任务2秒
   print("Hello again 02 end")
 
async def sleep3():
   print("Hello world 03 begin")
   await asyncio.sleep(3)   #模拟耗时任务3秒
   print("Hello again 03 end")
 
loop = asyncio.get_event_loop()                #第一步:创建事件循环
tasks = [sleep1(), sleep2(),sleep3()]          #第二步:将多个协程函数包装成任务列表
# loop.run_until_complete(asyncio.gather(*tasks))
loop.run_until_complete(asyncio.wait(tasks))   #第三步:通过事件循环运行
loop.close()                                   #第四步:取消事件循环

运行结果如下:

Hello world 01 begin
Hello world 03 begin
Hello world 02 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end

d. 有参数、有返回值

import asyncio
 
async def sleep1(a,b):
   print("Hello world 01 begin")
   await asyncio.sleep(1)  #模拟耗时任务1秒
   print("Hello again 01 end")
   return a+b
 
async def sleep2(a,b):
   print("Hello world 02 begin")
   await asyncio.sleep(2)   #模拟耗时任务2秒
   print("Hello again 02 end")
   return a-b
 
async def sleep3(a,b):
   print("Hello world 03 begin")
   await asyncio.sleep(4)   #模拟耗时任务3秒
   print("Hello again 03 end")
   return a*b
  
loop = asyncio.get_event_loop()                     # 第一步:创建事件循环
task1=asyncio.ensure_future(sleep1(10,5))
task2=asyncio.ensure_future(sleep2(10,5))
task3=asyncio.ensure_future(sleep3(10,5))
tasks = [task1,task2,task3]                         # 第二步:将多个协程函数包装成任务列表
loop.run_until_complete(asyncio.wait(tasks))        # 第三步:通过事件循环运行
# loop.run_until_complete(asyncio.gather(*tasks))
print(task1.result())                               # 并且在所有的任务完成之后,获取异步函数的返回值   
print(task2.result())
print(task3.result())
loop.close()                                        # 第四步:关闭事件循环

运行结果为:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end
15
5
50

e. 有参数、有返回值,并按列表顺序输出结果

import asyncio
 
async def sleep1():
    print("Hello world 01 begin")
    await asyncio.sleep(1)
    print("Hello world 01 end")
    return 'sleep1'
    
async def sleep2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)
    print("Hello world 02 end")
    return 'sleep2'
    
async def sleep3():
    print("Hello world 03 begin")
    await asyncio.sleep(3)
    print("Hello world 03 end")
    return 'sleep3'
    
async def run():
    task1 = asyncio.ensure_future(sleep1())
    task2 = asyncio.ensure_future(sleep2())
    task3 = asyncio.ensure_future(sleep3())
    tasks = [task3,task1,task2]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)
 
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

运行结果为:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello world 01 end
Hello world 02 end
Hello world 03 end
sleep3
sleep1
sleep2

总结:四步走(针对python3.7之前的版本)

第一步:构造事件循环

  • loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的
  • loop=asyncio.get_event_loop() #获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;
  • loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;
  • loop=asyncio.new_event_loop() #创建一个新的事件循环

第二步:将一个或者是多个协程函数包装成Task对象

#高层API

task = asyncio.create_task(coro(参数列表))   # 这是3.7版本新添加的
task = asyncio.ensure_future(coro(参数列表)) 

#低层API

loop.create_future(coro)
loop.create_task(coro)

‘’‘需要注意的是,在使用Task.result()获取协程函数结果的时候,使用asyncio.create_task()却会显示错
但是使用asyncio.ensure_future却正确’‘’

第三步:通过事件循环运行

  • loop.run_until_complete(asyncio.wait(tasks)) #通过asyncio.wait()整合多个task

  • loop.run_until_complete(asyncio.gather(tasks)) #通过asyncio.gather()整合多个task

  • loop.run_until_complete(task_1) #单个任务则不需要整合

  • loop.run_forever() #但是这个方法在新版本已经取消,不再推荐使用,因为使用起来不简洁

使用gather或者wait可以同时注册多个任务,实现并发,但他们的设计是完全不一样的,主要区别如下:

(1)参数形式不一样

gather的参数为 *coroutines_or_futures,即如这种形式

tasks = asyncio.gather(*[task1,task2,task3])或者
tasks = asyncio.gather(task1,task2,task3)

loop.run_until_complete(tasks)

wait的参数为列表或者集合的形式,如下

tasks = asyncio.wait([task1,task2,task3])

loop.run_until_complete(tasks)
(2)返回的值不一样

gather的定义如下,gather返回的是每一个任务运行的结果,

results = await asyncio.gather(*tasks) 

wait的定义如下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型

done, pending = yield from asyncio.wait(fs)

简单来说相同点和不同点:

  • 相同:从功能上看,asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
  • 不同:asyncio.wait 会返回两个值:done 和 pending,done 为已完成的协程 Task,pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;而asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。
results = await asyncio.gather(*tasks)
for result in results:
    print(result)

等效于:

done,pending = await asyncio.wait(tasks)
for done_task in done:
    print(done_task.result())
asyncio.gather 和asyncio.wait 它俩的区别的第一层区别:
  1. asyncio.gather封装的Task全程黑盒,只告诉你协程结果。
  2. asyncio.wait会返回封装的Task(包含已完成和挂起的任务),如果你关注协程执行结果你需要从对应Task实例里面用result方法自己拿。

为什么说「第一层区别」,asyncio.wait看名字可以理解为「等待」,所以返回值的第二项是pending列表,但是看上面的例子,pending是空集合,那么在什么情况下,pending里面不为空呢?这就是第二层区别:asyncio.wait支持选择返回的时机。

asyncio.wait支持一个接收参数return_when,在默认情况下,asyncio.wait会等待全部任务完成(return_when='ALL_COMPLETED'),它还支持FIRST_COMPLETED(第一个协程完成就返回)和FIRST_EXCEPTION(出现第一个异常就返回)

import asyncio
 
async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')
    return 'A'
 
async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'
 
async def main():
    done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
    print(done)
    print(pending)
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

运行结果如下:

Suspending b
Suspending a
Resuming b
{<Task finished name='Task-2' coro=<b() done, defined at /data/py_code/asyncio_example/aio_wait_gather.py:10> result='B'>}
{<Task pending name='Task-3' coro=<a() running at /data/py_code/asyncio_example/aio_wait_gather.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f6313d72ac0>()]>>}

这次只有协程b完成了,协程a还是pending状态。

在大部分情况下,用asyncio.gather是足够的,如果你有特殊需求,可以选择asyncio.wait,举2个例子:

  1. 需要拿到封装好的Task,以便取消或者添加成功回调等
  2. 业务上需要FIRST_COMPLETED/FIRST_EXCEPTION即返回的

第四步:关闭事件循环

loop.close()

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行:

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此处异常

建议调用loop.close,以彻底清理 loop 对象防止误用

2. python3.7版本

在python3.7版本中,asyncio又引进了一些新的特性和API

(1)例子一:无参数、无返回值

import asyncio
 
async def sleep1():
   print("Hello world 01 begin")
   await asyncio.sleep(1)  #模拟耗时任务1秒
   print("Hello again 01 end")
 
async def sleep2():
   print("Hello world 02 begin")
   await asyncio.sleep(2)   #模拟耗时任务2秒
   print("Hello again 02 end")
 
async def sleep3():
   print("Hello world 03 begin")
   await asyncio.sleep(3)   #模拟耗时任务3秒
   print("Hello again 03 end")
 
async def main():
   results=await asyncio.gather(sleep1(),sleep2(),sleep3())
   for result in results:
       print(result)     #因为没返回值,故而返回None
 
asyncio.run(main())

运行结果为:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 01 end
Hello again 02 end
Hello again 03 end
None
None
None

(2)例子二:有参数、有返回值

import asyncio
import time
 
a = time.time()
 
async def sleep1():
    print("Hello world 01 begin")
    await asyncio.sleep(1)
    print("Hello world 01 end")
    return 'sleep1'
    
async def sleep2():
    print("Hello world 02 begin")
    await asyncio.sleep(2)
    print("Hello world 02 end")
    return 'sleep2'
    
async def sleep3():
    print("Hello world 03 begin")
    await asyncio.sleep(3)
    print("Hello world 03 end")
    return 'sleep3'
    
async def main():
    task1 = asyncio.ensure_future(sleep1())
    task2 = asyncio.ensure_future(sleep2())
    task3 = asyncio.ensure_future(sleep3())
    tasks = [task3,task1,task2]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)
 
asyncio.run(main())

运行结果为:

Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello world 01 end
Hello world 02 end
Hello world 03 end
sleep3
sleep1
sleep2

总结:两步走(针对python3.7)

第一步:构建一个入口函数main

他也是一个异步协程函数,即通过async定义,并且要在main函数里面await一个或者是多个协程,同前面一样,我可以通过gather或者是wait进行组合,对于有返回值的协程函数,一般就在main里面进行结果的获取。

第二步:启动主函数main

这是python3.7新添加的函数,就一句话,即

asyncio.run(main())

注意:
不再需要显式的创建事件循环,因为在启动run函数的时候,就会自动创建一个新的事件循环。而且在main中也不需要通过事件循环去调用被包装的协程函数,只需要向普通函数那样调用即可 ,只不过使用了await关键字而已。

原文参考:
https://blog.csdn.net/qq_27825451/article/details/86218230
https://blog.csdn.net/qiuqiuit/article/details/86773310
https://zhuanlan.zhihu.com/p/59671241
https://mp.weixin.qq.com/s/bJKbHph1T0Z95rCioV4iPA
https://mozillazg.com/2017/08/python-asyncio-note-task-usage.html

文章出处登录后可见!

已经登录?立即刷新

共计人评分,平均

到目前为止还没有投票!成为第一位评论此文章。

(0)
心中带点小风骚的头像心中带点小风骚普通用户
上一篇 2023年3月8日 下午10:21
下一篇 2023年3月8日 下午10:23

相关推荐