专栏名称: Python之美
《Python web开发实战》作者的公众号。发现Python之美,主要包含Web开发、Python进阶、架构设计、Python开发招聘信息等方面内容
目录
相关文章推荐
51好读  ›  专栏  ›  Python之美

深入asyncio.shield

Python之美  · 公众号  · Python  · 2019-07-25 18:24

正文

请到「今天看啥」查看全文


在之前的深入理解asyncio(二)一文中我认为正确使用 asyncio . shield 的步骤是:

  1. 先创建 GatheringFuture 对象 ts

  2. 取消任务

  3. await ts

当时举的例子:

  1. import asyncio



  2. async def a():

  3. print('Suspending a')

  4. await asyncio.sleep(2)

  5. print('Resuming a')

  6. return 'A'



  7. async def b():

  8. print('Suspending b')

  9. await asyncio.sleep(1)

  10. print('Resuming b')

  11. return 'B'



  12. async def c1 ():

  13. task1 = asyncio.shield(a())

  14. task2 = asyncio.create_task(b())

  15. task1.cancel()

  16. await asyncio.gather(task1, task2, return_exceptions=True)



  17. async def c2():

  18. task1 = asyncio.shield(b())

  19. task2 = asyncio.create_task(a())

  20. task1.cancel()

  21. await asyncio.gather(task1, task2, return_exceptions=True)


  22. async def c3():

  23. task1 = asyncio.shield(a())

  24. task2 = asyncio.create_task(b())

  25. ts = asyncio.gather(task1, task2, return_exceptions=True)

  26. task1.cancel()

  27. await ts

按当时我的理解,c1和c2中先cancel再gather的用法是错误的,正确的是c3的写法。

昨天一位买过我书的读者(网名: 秋月风夏, 感谢反馈)QQ问我为什么不能复现,一番讨论发现我被IPython提供的await欺骗了:

  1. In [1]: from coro import *


  2. In [2]: await c3()

  3. Suspending a

  4. Suspending b

  5. Resuming b # 第一次执行await并没有完全执行a的逻辑


  6. In [3]: await c3()

  7. Suspending a

  8. Suspending b

  9. Resuming a

  10. Resuming b # 之后执行await才会让a执行完整


  11. In [4]: await c1()

  12. Suspending a

  13. Suspending b

  14. Resuming a

  15. Resuming b


  16. In [5]: await c1()

  17. Suspending a

  18. Suspending b

  19. Resuming a

  20. Resuming b

可以看到除了第一次,c1也都是正常完成的,不存在我说的「陷阱」。

研究了半天IPython源码没发现这个问题是什么造成的,但是我们可以不使用IPython提供的await支持:

  1. In [6]: asyncio.run(c1())

  2. Suspending a

  3. Suspending b

  4. Resuming b


  5. In [7]: asyncio.run(c1())

  6. Suspending a

  7. Suspending b

  8. Resuming b


  9. In [8]: asyncio.run(c3())

  10. Suspending a

  11. Suspending b

  12. Resuming b


  13. In [9]: asyncio.run(c3())

  14. Suspending a

  15. Suspending b

  16. Resuming b

看到了吧?直接用 asyncio . run 运行这些发现他们都是不对的!!那回过头来,看看c2:

  1. In [10]: asyncio.run(c2())

  2. Suspending b

  3. Suspending a

  4. Resuming b

  5. Resuming a


  6. In [11]: asyncio.run(c2())

  7. Suspending b

  8. Suspending a

  9. Resuming b

  10. Resuming a

它是正常的。而c1和c2的区别仅仅是取消协程a还是协程b的问题,那么为什么会造成取消的任务有些执行完整情况不同呢?大家可以先思考一下。

提示:协程a/b的 asyncio . sleep 时间不同:a任务执行完整至少要3秒,b任务执行完成至少要1秒。

所以在没有看源码确认前,我「猜」是 asyncio . gather 在执行任务时不考虑那些被取消了的任务的感受,现有任务都完成即结束 。所以:

  1. 如果取消任务a,当b任务1秒结束后,a还没完成,任务收集就结束了。所以任务a没有执行完整

  2. 如果取消任务b,协程b要早于协程a的执行时间

所以,是「先cancel再gather」还是「先gather再cancel」没关系,要看取消的任务(们)来不来得及赶上gather最后一班车!

学习不能靠猜,刚才我读了源码和大家分享下gather是怎么处理取消了的任务(Task)的。
 我们就拿前面说的c3里面的逻辑来说。

asyncio . gather 中首先会对所有Task(task1和task2)加一个done_callback,由于task1被取消了,所以task1.done()的结果为True(只要不是pending状态就为done),在asyncio.futures.Future的实现中,task1会立刻回调(详细的可以看延伸阅读链接1):

  1. static PyObject *

  2. future_add_done_callback(FutureObj *fut, PyObject *arg, PyObject *ctx)

  3. {

  4. ...

  5. if (fut->fut_state != STATE_PENDING) {

  6. /* The future is done/cancelled, so schedule the callback

  7. right away. */

  8. if (call_soon(fut->fut_loop, arg, (PyObject*) fut, ctx)) {

  9. return NULL;

  10. }

  11. }

  12. }

接着看一下gather里面的回调 _done_callback (详细的可以看延伸阅读链接2):

  1. def _done_callback(fut):

  2. nonlocal nfinished

  3. nfinished += 1

  4. ...

  5. if nfinished == nfuts:

  6. results = []


  7. for fut in children:

  8. if fut.cancelled():

  9. res = exceptions.CancelledError()

  10. else:

  11. res = fut.exception()

  12. if res is None:

  13. res = fut.result()

  14. results.append(res)

  15. ...

每次有任务完成回调,一开始先让nfinished累加1,nfuts是任务总数。如果完成数(nfinished)等于任务总数时就开始对全部任务设置结果,如果任务被取消就会设置为 exceptions . CancelledError () (哪怕它正常完成了也不会通过result方法把结果返回出来)。

前面说到task1在gather一开始就回调了,但是由于nfuts为2,而nfinished为1,所以不符合条件,需要等待task2(也就是未取消的任务完成)才能返回。

asyncio . shield 所谓的保护其实就是让协程a作为一个Inner Future(内部的),再在事件循环上创建一个新的 Outer Future(外部的),所以a的逻辑继续进行(inner),而我们取消的task1只是outer。

那怎么用Shield才正确?

第一种情况:符合短板理论。也就是说「取消的任务耗时小于正常任务耗时」,那么在gather搜集结果时被取消的任务已经完成。可以感受到前面例子中的 c2 是正确的。

接着说第二种,也就是官网提到的场景。在官方文档中对它的描述非常模糊,就说了一句:

Protect an awaitable object from being cancelled.

我的理解就是「保护一个可await对象,防止其被取消」。这里就不得不吐槽官方文档不够明确的描述和例子了。我觉得应该这样说:

假如有个Task(叫做something)被shield保护,如下:

  1. outer = shield(something())

  2. res = await outer

如果 outer 被取消了,不会影响Task本身(something)的执行。

所以官网里面说了一句:

From the point of view of something(), the cancellation did not happen.

也就是「从something() 的角度看来,取消操作并没有发生」。官网没有给出完整的例子,我用2个例子来帮助理解:

  1. async def stop_after(loop, when):

  2. await asyncio .sleep(when)

  3. loop.stop()



  4. def c4():

  5. loop = asyncio.get_event_loop()

  6. outer = asyncio.shield(a())

  7. outer.cancel()

  8. loop.create_task(stop_after(loop, 3))


  9. loop.run_forever()

  10. print(outer.cancelled())

注意c4不是异步函数,在里面使用 loop . run_forever 让事件循环一直运行下去,里面有3个任务: a () outer stop_after ( loop , 3 ) ,第三个任务会在3秒后把停掉,这个例子可以用来验证上面说的「如果 outer 被取消了,不会影响Task本身(something)的执行」

运行一下:

  1. In [14]: c4()

  2. Suspending a

  3. Resuming a

  4. True

可以看到这样做,虽然outer取消了,但是异步函数a的逻辑执行完整了。

基于这种思路,我突然想到为什么「IPython用await第一次没有执行完整,之后每次都能执行完整」,说结论前先看另外一个例子:

  1. async def cancel_after(task, when):

  2. await asyncio.sleep(when)

  3. task.cancel()



  4. async def d(n):

  5. print(f'Suspending a with {n}')

  6. await asyncio.sleep(2)

  7. print(f'Resuming a with {n}')

  8. return 'A'



  9. async def c5():

  10. loop = asyncio.get_event_loop()

  11. n = random.randint(1, 100)

  12. outer = asyncio.shield(d(n))

  13. loop.create_task(cancel_after(outer, 1))

  14. try:

  15. await outer

  16. except asyncio.CancelledError:

  17. print('Cancelled!')

这个例子中有2个任务, outer cancel_after ( outer , 1 ) ,cancel_after会在1秒后把outer取消掉。另外这次用了新的一步函数d,接受一个随机参数(一会就能感受到用意)而且这次要重新进入IPython交互环境(防止之前的测试对其产生影响),而且不再用 aysncio . run 了:

  1. In [1]: from coro import *


  2. In [2]: import asyncio


  3. In [3]: import random


  4. In [4]: loop = asyncio.get_event_loop()


  5. In [5]: loop.run_until_complete(c5())

  6. Suspending a with 72

  7. Cancelled! # 注意,这就是第一次,也没有把d运行完整


  8. In [6]: loop.run_until_complete(c5())

  9. Suspending a with 48 # 启动了下一轮随机数

  10. Resuming a with 72 # 结束了上一轮随机数

  11. Cancelled!


  12. In [7]: loop.run_until_complete(c5())

  13. Suspending a with 26

  14. Resuming a with 48

  15. Cancelled!


  16. In [8]: loop.run_until_complete(c5())

  17. Suspending a with 38

  18. Resuming a with 26

  19. Cancelled!

每次事件循环会把全部任务轮询一遍,outer.cancel虽然取消了Outer任务,但是Inner任务还是running状态的(有兴趣的同学可以写个例子证实一下),等待下次事件循环( loop.run until complete的目标是等待c5执行结束,它并不关注d运行状态)。而之前在IPython里面用 await 就会造成第一次不完整也解释通了:

  1. async def c6():

  2. n = random.randint(1, 100)

  3. task1 = asyncio.shield(d(n))

  4. task2 = asyncio.create_task(b())

  5. task1.cancel()

  6. await asyncio.gather(task1, task2, return_exceptions=True)


  7. ipython


  8. In [1]: from coro import *


  9. In [2]: await c6()

  10. Suspending a with 34

  11. Suspending b

  12. Resuming b


  13. In [3]: await c6()

  14. Suspending a with 41

  15. Suspending b

  16. Resuming a with 34

  17. Resuming b


  18. In [4]: await c6()

  19. Suspending a with 71

  20. Suspending b

  21. Resuming a with 41

  22. Resuming b

看到了吧。那为什么说用 asyncio . run 不行呢:

  1. In [5]: asyncio.run(c6())

  2. Suspending a with 62

  3. Suspending b

  4. Resuming b


  5. In [6]: asyncio.run(c6())

  6. Suspending a with 94

  7. Suspending b

  8. Resuming b

其实可以猜出来:每次都创建了一个新的loop。当然也得看源码确认一下(详见延伸阅读链接3):

  1. def run(main, *, debug=False):

  2. ...

  3. loop = events.new_event_loop()

  4. try:

  5. events.set_event_loop(loop)

  6. loop.set_debug(debug)

  7. return loop.run_until_complete(main)

  8. finally:

  9. ...

里面用 events . new_event_loop 创建了新的事件循环,所以拿不到之前的事件循环上的待执行任务。

感觉这类场景这个比较适合Web框架中使用。

后记

希望这些内容让你对asyncio和 asyncio . shield 有更深的了解。

延伸阅读

  1. https://github.com/python/cpython/blob/b9a0376b0dedf16a2f82fa43d851119d1f7a2707/Modules/_asynciomodule.c#L627-L632

  2. https://github.com/python/cpython/blob/0baa72f4b2e7185298d09cf64c7b591efcd22af0/Lib/asyncio/tasks.py#L690-L724

  3. https://github.com/python/cpython/blob/master/Lib/asyncio/runners.py#L39-L41








请到「今天看啥」查看全文