专栏名称: Python开发者
人生苦短,我用 Python。伯乐在线旗下账号「Python开发者」分享 Python 相关的技术文章、工具资源、精选课程、热点资讯等。
目录
相关文章推荐
新机器视觉  ·  总结90条写Python程序的建议 ·  4 天前  
Python爱好者社区  ·  终于迈过了5W这道坎! ·  6 天前  
Python爱好者社区  ·  Python爬虫实战.pdf ·  5 天前  
Python爱好者社区  ·  偷偷浏览小网站,被问候了。。。 ·  5 天前  
51好读  ›  专栏  ›  Python开发者

初探 Python 3 的异步 IO 编程

Python开发者  · 公众号  · Python  · 2017-09-14 20:00

正文

(点击上方蓝字,快速关注我们)


来源:keakon

www.keakon.net/2015/09/07/初探Python3的异步IO编程

如有好文章投稿,请点击 → 这里了解详情


小编注:本文写于2015年9月7日


上周终于把知乎日报的新版本做完了,于是趁着这几天的休息,有精力折腾一些感兴趣的玩意了。


虽然工作时并不会接触到 Python 3,但还是对它抱有不少好奇心,于是把 Python 版本更新到了 3.4,开始了折腾之旅。在各种更新中,我最感兴趣的当属 asyncio 模块了,所以就从异步 IO 开始探索吧。 


探索之前,先简单介绍下各种 IO 模型:


最容易做的是阻塞 IO,即读写数据时,需要等待操作完成,才能继续执行。进阶的做法就是用多线程来处理需要 IO 的部分,缺点是开销会有些大。


接着是非阻塞 IO,即读写数据时,如果暂时不可读写,则立刻返回,而不等待。因为不知道什么时候是可读写的,所以轮询时可能会浪费 CPU 时间。


然后是 IO 复用,即在读写数据前,先检查哪些描述符是可读写的,再去读写。select 和 poll 就是这样做的,它们会遍历所有被监视的描述符,查看是否满足,这个检查的过程是阻塞的。而 epoll、kqueue 和 /dev/poll 则做了些改进,事先注册需要检查哪些描述符的哪些事件,当状态发生变化时,内核会调用对应的回调函数,将这些描述符保存下来;下次获取可用的描述符时,直接返回这些发生变化的描述符即可。


再之后是信号驱动,即描述符就绪时,内核发送 SIGIO 信号,再由信号处理程序去处理这些信号即可。不过信号处理的时机是从内核态返回用户态时,感觉也得把这些事件收集起来才好处理,有点像模拟 IO 复用了。


最后是异步 IO,即读写数据时,只注册事件,内核完成读写后(读取的数据会复制到用户态),再调用事件处理函数。这整个过程都不会阻塞调用线程,不过实现它的操作系统比较少,Windows 上有比较成熟的 IOCP,Linux 上的 AIO 则有不少缺点。


虽然真正的异步 IO 需要中间任何步骤都没有阻塞,这对于某些只是偶尔需要处理 IO 请求的情况确实有用(比如文本编辑器偶尔保存一下文件);但对于服务器端编程的大多数情况而言,它的主线程就是用来处理 IO 请求的,如果在空闲时不阻塞在 IO 等待上,也没有别的事情能做,所以本文就不纠结这个异步是否名副其实了。


在 Python 2 的时代,高性能的网络编程主要是使用 Twisted、Tornado 和 gevent 这三个库。


我对 Twisted 不熟,只知道它的缺点是比较重,性能相对而言并不算好。

Tornado 平时用得比较多,缺点是写异步调用时特别麻烦。gevent 我只能算接触过,缺点是不太干净。


由于它们都各自有一个 IO loop,不好混用,而 Tornado 的 web 框架相对而言比较完善,因此成了我的首选。


而从 Python 3.4 开始,标准库里又新增了 asyncio 这个模块。


从原理上来说,它和 Tornado 其实差不多,都是注册 IO 事件,然后在 IO loop 中等待事件发生,然后调用相应的处理函数。


不同之处在于 Python 3 增加了一些新的特性,而 Tornado 需要兼容 Python 2,所以写起来会比较麻烦。


举例来说,Python 3.3 可以在 generator 中 return 返回值(相当于 raise StopIteration),而 Tornado 中需要 raise 一个 Return 对象。此外,Python 3.3 还增加了 yield from 语法,减轻了在 generator 中处理另一个 generator 的工作量(省去了循环和 try … except …)。


不过,虽然 asyncio 有那么多得天独厚的优势,却不一定比 Tornado 的性能更好,所以我写个简单的例子测试一下。


比较方法就是写个最简单的 HTTP 服务器,不做任何检查,读取到任何内容都输出一个 hello world,并断开连接。


测试的客户端就懒得写了,直接用 ab 即可:


ab -n 10000 -c 10 "http://0.0.0.0:8000/"


Tornado 版是这样:


from tornado.gen import coroutine

from tornado.ioloop import IOLoop

from tornado.tcpserver import TCPServer

 

class Server(TCPServer):

    @coroutine

    def handle_stream(self, stream, address):

        try:

            yield stream.read_bytes(1024, partial=True)

            yield stream.write(b'HTTP 1.0 200 OKrnrnhello world')

        finally:

            stream.close()

 

server = Server()

server.bind(8000)

server.start(1)

IOLoop.current().start()


在我的电脑上大概 4000 QPS。


asyncio 版是这样:


import asyncio

 

class Server(asyncio.Protocol):

    def connection_made(self, transport):

        self.transport = transport

 

    def data_received(self, data):

        try:

            self.transport.write(b'HTTP/1.1 200 OKrnrnhello world')

        finally:

            self.transport.close()

 

loop = asyncio.get_event_loop()

server = loop.create_server(Server, '', 8000)

loop.run_until_complete(server)

loop.run_forever()


在我的电脑上大概 3000 QPS,比 Tornado 版慢了一些。此外,asyncio 的 transport 在 write 时不用 yield from,这点可能有些不一致。


asyncio 还有个高级版的 API:


import asyncio

 

@asyncio.coroutine

def handle(reader, writer):

    yield from reader.read(1024)

    writer.write(b'HTTP/1.1 200 OKrnrnhello world')

    yield from writer.drain()

    writer.close()

 

loop = asyncio.get_event_loop()

task = asyncio.start_server(handle, '', 8000, loop=loop)

server = loop.run_until_complete(task)

loop.run_forever()


在我的电脑上大概 2200 QPS。这下读写都要 yield from 了,一致性上来说会好些。


以框架的性能而言,其实都够用,开销都不超过 1 毫秒,而 web 请求一般都需要 10 毫秒的以上的处理时间。


于是顺便再测一下和 MySQL 的搭配,即在每个请求内调用一下 SELECT 1,然后输出返回值。


因为自己懒得写客户端了,于是就用现成的 tornado_mysql 和 aiomysql 来测试了。原理应该都差不多,发送写请求后就返回,等收到可读事件时再获取内容。


Tornado 版是这样:


from tornado.gen import coroutine

from tornado.ioloop import IOLoop

from tornado.tcpserver import TCPServer

from tornado_mysql import pools

 

class Server(TCPServer):

    @coroutine

    def handle_stream(self, stream, address):

        try:

            yield stream.read_bytes(1024, partial=True)

            cursor = yield POOL.execute(b'SELECT 1')

            data = cursor.fetchone()

            yield stream.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(data).encode())  # Python 3.5 的 bytes 才能用 % 格式化

        finally:

            stream.close()

 

POOL = pools.Pool(

    dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='mysql'),

    max_idle_connections=10,

    max_open_connections=10)

 

server = Server()

server.bind(8000)

server.start(1)

IOLoop.current().start()


在我的电脑上大概 680 QPS。


asyncio 版是这样:


import asyncio

 

import aiomysql

 

class Server(asyncio.Protocol):

    def connection_made(self, transport):

        self.transport = transport

 

class Server(asyncio.Protocol):

    def connection_made(self, transport):

        self.transport = transport

 

    def data_received(self, data):

        @asyncio.coroutine

        def handle():

            with (yield from pool) as conn:

                cursor = yield from conn.cursor()

                yield from cursor.execute(b'SELECT 1')

                result = yield from cursor.fetchone()

            try:

                self.transport.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(result).encode())

            finally:

                self.transport.close()

        loop.create_task(handle())  # 或者 asyncio.async(handle())

 

@asyncio.coroutine

def get_pool():

    return(yield from aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123', loop=loop))

 

loop = asyncio.get_event_loop()

pool = loop.run_until_complete(get_pool())

 

server = loop.create_server(Server, '', 8000)

loop.run_until_complete(server)

loop.run_forever()


在我的电脑上大概 1250 QPS,比 Tornado 版快了不少。不过写起来比较蛋疼,因为 data_received 方法里不能直接用 yield from。


用 cProfile 看了下,Tornado 版在 tornado.gen 和 functools 模块里花了不少时间,可能是异步调用过多了吧。


但如果不做异步库的开发者,而只就使用者的体验而言,Tornado 会显得更加灵活和易用。不过 asyncio 的高级 API 应该也能提供类似的体验。


顺便再用底层 socket 模块写个服务器试试。


先用 poll 看看,错误处理什么的就先不做了:


from functools import partial

import select

import socket

 

class Server:

    def __init__(self):

        self._sock = socket.socket()

        self._poll = select.poll()

        self._handlers = {}

        self._fd_events = {}

 

    def start(self):

        sock = self._sock

        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.setblocking(0)

        sock.bind(('', 8000))

        sock.listen(100)

 

        handlers = self._handlers

        poll = self._poll

        self.add_handler(sock.fileno(), self._accept, select.POLLIN)

 

        while True:

            poll_events = poll.poll(1)

            for fd, event in poll_events:

                handler = handlers.get(fd)

                if handler:

                    handler()

 

    def _accept(self):

        for i in range(100):

            try:

                conn, address = self._sock.accept()

            except OSError:

                break

            else:

                conn.setblocking(0)

                fd = conn.fileno()

                self.add_handler(fd, partial(self._read, conn), select.POLLIN)

 

    def _read(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.recv(1024)

        except:

            conn.close()

            raise

        else:

            self.add_handler(fd, partial(self._write, conn), select.POLLOUT)

 

    def _write(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.send(b'HTTP 1.0 200 OKrnrnhello world')

        finally:

            conn.close()

 

    def add_handler(self, fd, handler, event):

        self._handlers[fd] = handler

        self.register(fd, event)

 

    def remove_handler(self, fd):

        self._handlers.pop(fd, None)

        self.unregister(fd)

 

    def register(self, fd, event):

        if fd in self._fd_events:

            raise IOError("fd %s already registered" % fd)

        self._poll.register(fd, event)

        self._fd_events[fd] = event

 

    def unregister(self, fd):

        event = self._fd_events.pop(fd, None)

        if event is not None:

            self._poll.unregister(fd)

 

Server().start()


在我的电脑上大概 7700 QPS,优势巨大。


再用 kqueue 试试(我用的是 OS X):


from functools import partial

import select

import socket

 

class Server:

    def __init__(self):

        self._sock = socket.socket()

        self._kqueue = select.kqueue()

        self._handlers = {}

        self._fd_events = {}

 

    def start(self):

        sock = self._sock

        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.setblocking(0)

        sock.bind(('', 8000))

        sock.listen(100)

 

        self.add_handler(sock.fileno(), self._accept, select.KQ_FILTER_READ)

        handlers = self._handlers

 

        while True:

            kevents = self._kqueue.control(None, 1000, 1)

            for kevent in kevents:

                fd = kevent.ident

                handler = handlers.get(fd)

                if handler:

                    handler()

 

    def _accept(self):

        for i in range(100):

            try:

                conn, address = self._sock.accept()

            except OSError:

                break

            else:

                conn.setblocking(0)

                fd = conn.fileno()

                self.add_handler(fd, partial(self._read, conn), select.KQ_FILTER_READ)

 

    def _read(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.recv(1024)

        except:

            conn.close()

            raise

        else:

            self.add_handler(fd, partial(self._write, conn), select.KQ_FILTER_WRITE)

 

    def _write(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.send(b'HTTP 1.0 200 OKrnrnhello world')

        finally:

            conn.close()

 

    def add_handler(self, fd, handler, event):

        self._handlers[fd] = handler

        self.register(fd, event)

 

    def remove_handler(self, fd):

        self._handlers.pop(fd, None)

        self.unregister(fd)

 

    def register(self, fd, event):

        if fd in self._fd_events:

            raise IOError("fd %s already registered" % fd)

        self._control(fd, event, select.KQ_EV_ADD)

        self._fd_events[fd] = event

 

    def unregister(self, fd):

        event = self._fd_events.pop(fd, None)

        if event is not None:

            self._control(fd, event, select.KQ_EV_DELETE)

 

    def _control(self, fd, event, flags):

        change_list = (select.kevent(fd, event, flags),)

        self._kqueue.control(change_list, 0)

 

Server().start()


在我的电脑上大概 7200 QPS,比 poll 版稍慢。不过因为只有 10 个并发连接,而且没有慢速网络的影响,所以 poll 的性能好并不奇怪。


再试试 Python 3.4 新增的 selectors 模块,它的 DefaultSelector 会自动选择所在平台最高效的实现,asyncio 就用到了这个模块。


import selectors

import socket

 

class Server:

    def __init__(self):

        self._sock = socket.socket()

        self._selector = selectors.DefaultSelector()

 

    def start(self):

        sock = self._sock

        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        sock.setblocking(0)

        sock.bind(('', 8000))

        sock.listen(100)

 

        selector = self._selector

        self.add_handler(sock.fileno(), self._accept, selectors.EVENT_READ)

 

        while True:

            events = selector.select(1)

            for key, event in events:

                handler, data = key.data

                if data:

                    handler(**data)

                else:

                    handler()

 

    def _accept(self):

        for i in range(100):

            try:

                conn, address = self._sock.accept()

            except OSError:

                break

            else:

                conn.setblocking(0)

                fd = conn.fileno()

                self.add_handler(fd, self._read, selectors.EVENT_READ, {'conn': conn})

 

    def _read(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.recv(1024)

        except:

            conn.close()

            raise

        else:

            self.add_handler(fd, self._write, selectors.EVENT_WRITE, {'conn': conn})

 

    def _write(self, conn):

        fd = conn.fileno()

        self.remove_handler(fd)

        try:

            conn.send(b'HTTP 1.0 200 OKrnrnhello world')

        finally:

            conn.close()

 

    def add_handler(self, fd, handler, event, data=None):

        self._selector.register(fd, event, (handler, data))

 

    def remove_handler(self, fd):

        self._selector.unregister(fd)

 

Server().start()


在我的电脑上大概 6100 QPS,成绩也还不错。


从这些测试来看,如果想自己实现一个舍弃了一些功能和兼容性的 Tornado,应该能比它稍快一点,不过似乎没多大必要。


所以暂时不纠结性能了,还是从使用的便利性上来考虑。Tornado 可以用 yield 取代 callback,我们也来实现这个 feature。


实现前先得了解下 yield。


当一个函数内部出现了 yield 语句时,它就不再是一个单纯的函数了,而是一个生成器函数,调用它并不会执行它的代码,而是返回一个生成器。


调用这个生成器的 send 方法时,才会执行内部的代码。当执行到 yield 时,这个 send 方法就返回了,调用者可以得到其返回值。


send 方法在第一次调用时,参数必须为 None。Python 2 中可以用它的 next 方法,Python 3 中改成了 __next__ 方法,还可以用内置的 next 函数来调用。

send 方法可以被多次调用,参数会作为 yield 的返回值,回到生成器内上一次执行的地方,并继续执行下去。


当生成器的代码执行完时,会抛出一个 StopIteration 的异常。Python 3.3 开始可以在生成器里使用 return,返回值可以从 StopIteration 异常的 value 属性获取。


for … in … 循环会自动捕获 StopIteration 异常,并作为循环停止的条件。


由此可见,yield 可以用于跳转。而我们要做的,则是在遇到 IO 请求时,用 yield 返回 IO loop;当事件发生时,找到对应的生成器,用 send 方法继续执行即可。


为了简单起见,我就在 poll 版的基础上进行改造了:



在我的电脑上大概 5300 QPS。


虽然成绩比较尴尬,但毕竟用起来比前一个版本好多了。至于慢的原因,我估计是自己维护了一个堆栈的原因(也可能是有什么 bug,毕竟写这个感觉太跳跃了,能运行起来就谢天谢地了)。

实现时做了两点假设:


  1. handler 为 generator 时,视为异步方法。

  2. 在异步方法中 yield None 时,视为等待 IO;yield / yield from 异步方法时,则是等待方法返回。


实现细节也没什么好说的了,只是觉得在实现 Stream 的 read / write 方法时,调用 IOLoop.add_handler 方法不太优雅。其实可以直接 yield 一个 fd 和 event,在 IOLoop.start 方法中再去注册。不过这个重构其实蛮小的,我就不再贴一次代码了,感兴趣的可以自己试试。


于是这次初探就到此为止了,有空我也许会继续完善它。至少这次探索,让我觉得 Python 3 还是蛮有意思的。


看完本文有收获?请转发分享给更多人

关注「Python开发者」,提升Python技能