专栏名称: Python之美
《Python web开发实战》作者的公众号。发现Python之美,主要包含Web开发、Python进阶、架构设计、Python开发招聘信息等方面内容
目录
相关文章推荐
51好读  ›  专栏  ›  Python之美

深入理解asyncio(一)

Python之美  · 公众号  · Python  · 2019-05-23 19:47

正文

请到「今天看啥」查看全文


前言

这几天看asyncio相关的pycon视频又重温了asyncio 的官方文档,收获很多。之前asyncio被吐槽的一点就是文档写的不好,Python 3.7 时 asyncio 的官方文档被 Andrew Svetlov 以及 Yury Selivanov 等核心开发者重写了,新的版本我觉得已经好很多了。借着这篇笔记记录一下我对asyncio的一些理解。

核心概念

asyncio里面主要有4个需要关注的基本概念

Eventloop

Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。

把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。

Coroutine

协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程:

  1. cat coro1.py

  2. import asyncio



  3. async def a():

  4. print('Suspending a')

  5. await asyncio.sleep(0)

  6. print('Resuming a')



  7. async def b():

  8. print('In b')



  9. async def main():

  10. await asyncio.gather(a(), b())



  11. if __name__ == '__main__':

  12. asyncio .run(main())

这里面有4个重要关键点:

  1. 协程要用 async def 声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。

  2. asyncio.gather用来并发运行任务,在这里表示协同的执行a和b2个协程

  3. 在协程a中,有一句 await asyncio . sleep ( 0 ) ,await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。

  4. asyncio.run是Python 3.7新加的接口,要不然你得这么写:

  1. loop = asyncio.get_event_loop()

  2. loop.run_until_complete(main())

  3. loop.close()

好了,我们先运行一下看看:

  1. python coro1.py

  2. Suspending a

  3. In b

  4. Resuming a

看到了吧,在并发执行中,协程a被挂起又恢复过。

Future

接着说Future,它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。我在这里只是演示一下:

  1. In : def c():

  2. ...: print('Inner C')

  3. ...: return 12

  4. ...:


  5. In : future = loop.run_in_executor(None, c) # 这里没用await,None 表示默认的 executor

  6. Inner C


  7. In : future # 虽然c已经执行了,但是状态还是 pending。

  8. Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>


  9. In : future.done() # 还没有完成

  10. Out: False


  11. In : for a in dir(future):

  12. ...: if not a.startswith('_'):

  13. ...: print(a)

  14. ...:

  15. add_done_callback

  16. cancel

  17. cancelled

  18. done

  19. exception

  20. get_loop

  21. remove_done_callback

  22. result

  23. set_exception

  24. set_result

可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:

  1. In : await future

  2. Out: 12


  3. In : future

  4. Out: <Future finished result=12>


  5. In : future.done()

  6. Out: True


  7. In : future.result()

  8. Out: 12

看到了吧,await之后状态成了finished。这里顺便说一下,一个对象怎么样就可以被await(或者说怎么样就成了一个awaitable对象)呢?给类实现一个__await__方法,Python版本的Future的实现大概如下:

  1. def __await_(self):

  2. if not self.done():

  3. self._asyncio_future_blocking = True

  4. yield self

  5. if not self.done():

  6. raise RuntimeError("await wasn't used with future" )

  7. return self.result() # May raise too.

这样就可以 await future 了,那为什么 await future 后Future的状态就能改变呢,这是因为用 loop . run_in_executor 创建的Future注册了一个回调(通过 asyncio . futures . wrap_future ,加了一个 _call_set_state 回调, 有兴趣的可以通过延伸阅读链接2找上下文)。

__await__ 里面的 yield self 不要奇怪,主要是为了兼容 __iter__ ,给旧的 yield from 用:

  1. In : future = loop.run_in_executor(None, c)

  2. Inner C


  3. In : future

  4. Out: <Future pending cb=[_chain_future .<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>


  5. In : def spam():

  6. ...: yield from future

  7. ...:


  8. In : s = spam()


  9. In : next(s)

  10. Out: <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.7/asyncio/futures.py:348]>

新的替代 yield from 的用法await必须在异步函数(用 async def申明)中使用:

  1. In : def spam():

  2. ...: await future

  3. ...:

  4. File "cell_name", line 5

  5. SyntaxError: 'await' outside async function

Task

Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?

先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。

Task非常容易创建和使用:

  1. # 或者用task = loop.create_task(a())

  2. In : task = asyncio.ensure_future(a())


  3. In : task

  4. Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>>


  5. In : task.done()

  6. Out: False


  7. In : await task

  8. Suspending a

  9. Resuming a


  10. In : task

  11. Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None>


  12. In : task.done()

  13. Out: True

asyncio并发的正确/错误姿势

在代码中使用async/await是不是就能发挥asyncio的并发优势么,其实是不对的,我们先看个例子:

  1. async def a():

  2. print('Suspending a')

  3. await asyncio.sleep(3)

  4. print('Resuming a')



  5. async def b():

  6. print('Suspending b')

  7. await asyncio.sleep(1)

  8. print('Resuming b')



  9. async def s1():

  10. await a()

  11. await b()

有2个协程a和b,分别sleep1秒和3秒,如果协程可以并发执行,那么执行时间应该是sleep最大的那个值(3秒),现在它们都在s1协程里面被调用。大家先猜一下s1会运行几秒?

我们写个小程序验证一下:

  1. def show_perf(func):

  2. print('*' * 20)

  3. start = time.perf_counter()

  4. asyncio.run(func())

  5. print(f'{func.__name__} Cost: {time.perf_counter() - start}')

大家注意我这个时间计数用的方法,没有用time.time,而是用了Python 3.3新增的time.perf_counter它是现在推荐的用法。我们在IPython里面验证下:

  1. In : from coro2 import *


  2. In : show_perf(s1)

  3. ********************

  4. Suspending a

  5. Resuming a

  6. Suspending b

  7. Resuming b

  8. s1 Cost: 4.009796932999961

看到了吧,4秒!!!,相当于串行的执行了(sleep 3 + 1)。这是错误的用法,应该怎么用呢,前面的asyncio.gather就可以:

  1. async def c1():

  2. await asyncio.gather(a(), b())


  3. In : show_perf( c1)

  4. ********************

  5. Suspending a

  6. Suspending b

  7. Resuming b

  8. Resuming a

  9. c1 Cost: 3.002452698999832

看到了吧,3秒!另外一个是asyncio.wait:

  1. async def c2():

  2. await asyncio.wait([a(), b()])


  3. In : show_perf(c2)

  4. ...

  5. c2 Cost: 3.0066957049998564

同样是3秒。先别着急,gather和wait下篇文章还会继续对比。还有一个方案就是用asyncio.create_task:

  1. async def c3():

  2. task1 = asyncio.create_task(a())

  3. task2 = asyncio .create_task(b())

  4. await task1

  5. await task2



  6. async def c4():

  7. task = asyncio.create_task(b())

  8. await a()

  9. await task


  10. In : show_perf(c3)

  11. ...

  12. c3 Cost: 3.002332438999929


  13. In : show_perf(c4)

  14. ...

  15. c4 Cost: 3.002270970000154

都是3秒。asyncio.create_task相当于把协程封装成Task。不过大家要注意一个错误的用法:

  1. async def s2():

  2. await asyncio.create_task(a())

  3. await asyncio .create_task(b())


  4. In : show_perf(s2)

  5. ...

  6. s2 Cost: 4.004671427999938

直接await task不会对并发有帮助* 。asyncio.create task是Python 3.7新增的高阶API, 是推荐的用法 ,其实你还可以用asyncio.ensure_future和loop.create task:

  1. async def c5():

  2. task = asyncio.ensure_future(b())

  3. await a()

  4. await task



  5. async def c6():

  6. loop = asyncio.get_event_loop()

  7. task = loop.create_task(b())

  8. await a()

  9. await task


  10. In : show_perf(c5)

  11. ...

  12. c5 Cost: 3.0033873750003295


  13. In : show_perf(c6)

  14. ...

  15. c6 Cost: 3.006120122000084

到这里,我们一共看到2种错误的,6种正确的写法。你学到了么?

代码目录

本文代码可以在mp项目找到

延伸阅读

  1. https://www.python.org/dev/peps/pep-0492/

  2. https://github.com/python/cpython/blob/3.7/Lib/asyncio/futures.py#L365








请到「今天看啥」查看全文