深入理解asyncio(二)
/ / / 阅读数:28500Asyncio.gather vs asyncio.wait
在上篇文章已经看到多次用asyncio.gather
了,还有另外一个用法是asyncio.wait
,他们都可以让多个协程并发执行。那为什么提供 2 个方法呢?他们有什么区别,适用场景是怎么样的呢?其实我之前也是有点困惑,直到我读了 asyncio 的源码。我们先看 2 个协程的例子:
async def a(): print('Suspending a') await asyncio.sleep(3) print('Resuming a') return 'A' async def b(): print('Suspending b') await asyncio.sleep(1) print('Resuming b') return 'B' |
在 IPython 里面用 gather 执行一下:
In : return_value_a, return_value_b = await asyncio.gather(a(), b()) Suspending a Suspending b Resuming b Resuming a In : return_value_a, return_value_b Out: ('A', 'B') |
Ok,asyncio.gather
方法的名字说明了它的用途,gather 的意思是「搜集」,也就是能够收集协程的结果,而且要注意,它会按输入协程的顺序保存的对应协程的执行结果。
接着我们说asyncio.await
,先执行一下:
In : done, pending = await asyncio.wait([a(), b()]) Suspending b Suspending a Resuming b Resuming a In : done Out: {<Task finished coro=<a() done, defined at <ipython-input-5-5ee142734d16>:1> result='A'>, <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>} In : pending Out: set() In : task = list(done)[0] In : task Out: <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'> In : task.result() Out: 'B' |
asyncio.wait
的返回值有 2 项,第一项表示完成的任务列表 (done),第二项表示等待 (Future) 完成的任务列表 (pending),每个任务都是一个 Task 实例,由于这 2 个任务都已经完成,所以可以执行task.result()
获得协程返回值。
Ok, 说到这里,我总结下它俩的区别的第一层区别:
asyncio.gather
封装的 Task 全程黑盒,只告诉你协程结果。asyncio.wait
会返回封装的 Task (包含已完成和挂起的任务),如果你关注协程执行结果你需要从对应 Task 实例里面用 result 方法自己拿。
为什么说「第一层区别」,asyncio.wait
看名字可以理解为「等待」,所以返回值的第二项是 pending 列表,但是看上面的例子,pending 是空集合,那么在什么情况下,pending 里面不为空呢?这就是第二层区别:asyncio.wait
支持选择返回的时机。
asyncio.wait
支持一个接收参数return_when
,在默认情况下,asyncio.wait
会等待全部任务完成 (return_when='ALL_COMPLETED'),它还支持 FIRST_COMPLETED(第一个协程完成就返回)和 FIRST_EXCEPTION(出现第一个异常就返回):
In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED) Suspending a Suspending b Resuming b In : done Out: {<Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>} In : pending Out: {<Task pending coro=<a() running at <ipython-input-5-5ee142734d16>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108065e58>()]>>} |
看到了吧,这次只有协程 b 完成了,协程 a 还是 pending 状态。
在大部分情况下,用 asyncio.gather 是足够的,如果你有特殊需求,可以选择 asyncio.wait,举 2 个例子:
- 需要拿到封装好的 Task,以便取消或者添加成功回调等
- 业务上需要 FIRST_COMPLETED/FIRST_EXCEPTION 即返回的
asyncio.create_task vs loop.create_task vs asyncio.ensure_future
创建一个 Task 一共有 3 种方法,如这小节的标题。在上篇文章我说过,从 Python 3.7 开始可以统一的使用更高阶的asyncio.create_task
。其实asyncio.create_task
就是用的loop.create_task
:
def create_task(coro): loop = events.get_running_loop() return loop.create_task(coro) |
loop.create_task
接受的参数需要是一个协程,但是asyncio.ensure_future
除了接受协程,还可以是 Future 对象或者 awaitable 对象:
- 如果参数是协程,其实底层还是用的
loop.create_task
,返回 Task 对象 - 如果是 Future 对象会直接返回
- 如果是一个 awaitable 对象会 await 这个对象的__await__方法,再执行一次
ensure_future
,最后返回 Task 或者 Future
所以就像ensure_future
名字说的,确保这个是一个 Future 对象:Task 是 Future 子类,前面说过一般情况下开发者不需要自己创建 Future
其实前面说的asyncio.wait
和asyncio.gather
里面都用了asyncio.ensure_future
。对于绝大多数场景要并发执行的是协程,所以直接用asyncio.create_task
就足够了~
shield
接着说asyncio.shield
,用它可以屏蔽取消操作。一直到这里,我们还没有见识过 Task 的取消。看一个例子:
In : loop = asyncio.get_event_loop() In : task1 = loop.create_task(a()) In : task2 = loop.create_task(b()) In : task1.cancel() Out: True In : await asyncio.gather(task1, task2) Suspending a Suspending b --------------------------------------------------------------------------- CancelledError Traceback (most recent call last) cell_name in async-def-wrapper() CancelledError: |
在上面的例子中,task1 被取消了后再用asyncio.gather
收集结果,直接抛 CancelledError 错误了。这里有个细节,gather 支持return_exceptions
参数:
In : await asyncio.gather(task1, task2, return_exceptions=True) Out: [concurrent.futures._base.CancelledError(), 'B'] |
可以看到,task2 依然会执行完成,但是 task1 的返回值是一个 CancelledError 错误,也就是任务被取消了。如果一个创建后就不希望被任何情况取消,可以使用asyncio.shield
保护任务能顺利完成:
In : task1 = asyncio.shield(a()) In : task2 = loop.create_task(b()) In : ts = asyncio.gather(task1, task2, return_exceptions=True) In : task1.cancel() Out: True In : await ts Suspending a Suspending b Resuming a Resuming b Out: [concurrent.futures._base.CancelledError(), 'B'] |
可以看到虽然结果是一个 CancelledError 错误,但是看输出能确认协程实际上是执行了的。
注:此处之前有一个理解错误,已经在 深入 asyncio.shield 中重新解释和理解,推荐阅读。
asynccontextmanager
如果你了解 Python,之前可能听过或者用过 contextmanager ,一个上下文管理器。通过一个计时的例子就理解它的作用:
from contextlib import contextmanager async def a(): await asyncio.sleep(3) return 'A' async def b(): await asyncio.sleep(1) return 'B' async def s1(): return await asyncio.gather(a(), b()) @contextmanager def timed(func): start = time.perf_counter() yield asyncio.run(func()) print(f'Cost: {time.perf_counter() - start}') |
timed 函数用了 contextmanager 装饰器,把协程的运行结果 yield 出来,执行结束后还计算了耗时:
In : from contextmanager import * In : with timed(s1) as rv: ...: print(f'Result: {rv}') ...: Result: ['A', 'B'] Cost: 3.0052654459999992 |
大家先体会一下。在 Python 3.7 添加了 asynccontextmanager,也就是异步版本的 contextmanager,适合异步函数的执行,上例可以这么改:
@asynccontextmanager async def async_timed(func): start = time.perf_counter() yield await func() print(f'Cost: {time.perf_counter() - start}') async def main(): async with async_timed(s1) as rv: print(f'Result: {rv}') In : asyncio.run(main()) Result: ['A', 'B'] Cost: 3.00414147500004 |
async 版本的 with 要用async with
,另外要注意yield await func()
这句,相当于 yield +await func()
PS: contextmanager 和 asynccontextmanager 最好的理解方法是去看源码注释,可以看延伸阅读链接 2,另外延伸阅读链接 3 包含的 PR 中相关的测试代码部分也能帮助你理解
代码目录
本文代码可以在 mp 项目 找到
手动点个赞