
Reading the Tornado Source: A First Look

马哥Linux运维 · WeChat official account · Ops · 2018-08-23 18:00

Author: 国夫君

Source: see the end of the article

ioloop


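`ioloop` is the core module of `tornado` and also its scheduler: every asynchronous event is dispatched through it, so its execution logic has to be understood first.

Source analysis

The heart of `ioloop` is the logic inside its `while True` loop. Here is the code of `start()`: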


    def start(self):
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:
                old_wakeup_fd = None

        try:
            while True:
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                due_timeouts = []
                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)
                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                    poll_timeout = 0.0
                elif self._timeouts:
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)


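With the comments stripped there is not much code. The body of the while loop shows that ioloop is made of three main parts:

1. Callbacks (callbacks)

This is the foundation of ioloop's callback handling. Callbacks are added to self._callbacks through IOLoop.instance().add_callback(), and the queued callbacks run on every iteration of the loop.

Their main use is to split logic into chunks: at the appropriate moment a wrapped callback is appended to self._callbacks so that the loop executes it.

One example is add_future in ioloop: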


    def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))


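When the future object receives its result, it invokes the callbacks registered with future.add_done_callback. The lambda registered here then hands the user's callback over to ioloop through add_callback, so it runs on the loop.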
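The list swap at the top of the while loop can be modelled in a few lines. The sketch below is a toy model, not Tornado's code: MiniLoop and run_once are hypothetical names, and only the "swap the list under a lock, then run the batch" idea is taken from start(). It shows that a callback queued while a batch is running is deferred to the next iteration.

```python
import threading


class MiniLoop:
    """Toy model of the callback part of an event loop (not Tornado's code)."""

    def __init__(self):
        self._callbacks = []
        self._lock = threading.Lock()

    def add_callback(self, callback, *args):
        # Queue the callback; it is never run inline.
        with self._lock:
            self._callbacks.append((callback, args))

    def run_once(self):
        # Swap the list out under the lock, as start() does, so callbacks
        # queued while this batch runs wait for the next iteration.
        with self._lock:
            callbacks, self._callbacks = self._callbacks, []
        for callback, args in callbacks:
            callback(*args)
        return len(callbacks)


loop = MiniLoop()
order = []


def second():
    order.append("second")


def first():
    order.append("first")
    loop.add_callback(second)  # queued during the first iteration


loop.add_callback(first)
ran_in_first_pass = loop.run_once()   # runs only first()
ran_in_second_pass = loop.run_once()  # runs second()
```

Swapping the list instead of iterating over it in place keeps one iteration bounded: a callback that keeps re-queuing itself cannot starve the timers and I/O events handled later in the same iteration.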


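2. Timers (due_timeouts)

This is the timer: it runs a callback at a specified time.

Like the callbacks in part 1, timeouts are registered on the loop, here through IOLoop.instance().add_timeout(). On every iteration the loop examines the timeouts list and runs the callbacks whose deadline has passed.

Of course, the loop does not spin without restraint.

The poll call blocks until an I/O event occurs, so it is enough to compute the nearest timeout and pass the remaining time as the poll_timeout of self._impl.poll(poll_timeout); the loop then wakes up in time to run the timer.

But suppose poll_timeout is large and self._impl.poll stays blocked because no I/O event arrives. Would a callback added as in part 1 then have to wait a long time before it runs? The answer is no.

ioloop has a waker object made of two fds, one for reading and one for writing.

ioloop registers the waker with epoll during initialization, and add_callback writes to the waker when the loop may be blocked in poll (for example, when it is called from another thread).

This wakes ioloop out of poll, so the pending callbacks and timeouts are processed promptly.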
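The waker idea can be tried in isolation. The following is a minimal sketch under stated assumptions: it uses socket.socketpair() and select.select() from the standard library rather than Tornado's own waker and epoll wrapper, and the Waker class and poll_until_woken function are hypothetical names. A second thread plays the role of the code that adds a callback while the loop is blocked.

```python
import select
import socket
import threading
import time


class Waker:
    """A connected socket pair: writing to one end makes the other readable."""

    def __init__(self):
        self.reader, self.writer = socket.socketpair()
        self.reader.setblocking(False)
        self.writer.setblocking(False)

    def wake(self):
        try:
            self.writer.send(b"x")
        except BlockingIOError:
            pass  # buffer full: a wake-up is already pending

    def consume(self):
        # Drain everything that was written so the fd stops being readable.
        try:
            while self.reader.recv(1024):
                pass
        except BlockingIOError:
            pass

    def close(self):
        self.reader.close()
        self.writer.close()


def poll_until_woken(waker, poll_timeout):
    """Block in select() for up to poll_timeout seconds.

    Returns (elapsed_seconds, was_woken).
    """
    started = time.monotonic()
    readable, _, _ = select.select([waker.reader], [], [], poll_timeout)
    if readable:
        waker.consume()
    return time.monotonic() - started, bool(readable)


waker = Waker()
# Another thread "adds a callback" shortly after the loop starts polling.
threading.Timer(0.05, waker.wake).start()
elapsed, woken = poll_until_woken(waker, poll_timeout=5.0)
waker.close()
```

Although the poll timeout is five seconds, the call returns as soon as the other thread writes to the socket pair, which is the effect described above.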


The same combination of a timeout list and a bounded poll can be used to build a small timer of your own for fun.
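A small timer along those lines might look like the sketch below. It is an illustration, not Tornado's implementation: MiniTimer, Timeout, pop_due and the POLL_TIMEOUT value are assumptions made for this example. What it borrows from start() is the heap ordered by deadline, the lazy cancellation (a cancelled entry only has its callback cleared), and the clamped poll timeout. Times are passed in explicitly so the example is deterministic.

```python
import heapq
import itertools

POLL_TIMEOUT = 3600.0  # hypothetical upper bound for one poll, in seconds


class Timeout:
    _counter = itertools.count()

    def __init__(self, deadline, callback):
        self.deadline = deadline
        self.callback = callback
        self._order = next(Timeout._counter)  # tie-breaker for equal deadlines

    def __lt__(self, other):
        return (self.deadline, self._order) < (other.deadline, other._order)


class MiniTimer:
    def __init__(self):
        self._timeouts = []  # heap ordered by deadline

    def add_timeout(self, deadline, callback):
        timeout = Timeout(deadline, callback)
        heapq.heappush(self._timeouts, timeout)
        return timeout

    def remove_timeout(self, timeout):
        # Lazy cancellation: leave the entry in the heap, clear its callback.
        timeout.callback = None

    def pop_due(self, now):
        due = []
        while self._timeouts:
            head = self._timeouts[0]
            if head.callback is None:
                heapq.heappop(self._timeouts)  # discard cancelled entry
            elif head.deadline <= now:
                due.append(heapq.heappop(self._timeouts))
            else:
                break
        return due

    def poll_timeout(self, now):
        # How long the loop may block before the next timer is due.
        if not self._timeouts:
            return POLL_TIMEOUT
        return max(0, min(self._timeouts[0].deadline - now, POLL_TIMEOUT))


timer = MiniTimer()
fired = []
timer.add_timeout(5.0, lambda: fired.append("a"))
cancelled = timer.add_timeout(2.0, lambda: fired.append("b"))
timer.add_timeout(9.0, lambda: fired.append("c"))
timer.remove_timeout(cancelled)

for timeout in timer.pop_due(now=6.0):
    timeout.callback()
next_poll = timer.poll_timeout(now=6.0)
```

At time 6.0 only the timeout with deadline 5.0 fires; the cancelled one is discarded when it reaches the top of the heap, and the loop may sleep for the 3.0 seconds left until the deadline at 9.0.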


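3. The event loop for I/O events

This part handles epoll events.

A handler for the events on an fd is bound with IOLoop.instance().add_handler(fd, handler, events).

In the code behind httpserver.listen, add_accept_handler in netutil.py binds the accept handler that deals with incoming client connections.

Other I/O events are bound in the same way, while the business logic is split into chunks that are handled by ioloop callbacks and futures.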
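The handler registration and dispatch described here can be sketched with the standard library. This is an assumption-laden illustration, not Tornado's code: it uses the selectors module instead of Tornado's epoll wrapper, and MiniIOLoop, poll_once and on_readable are hypothetical names. The fd-to-handler dictionary and the handler(fd_obj, events) call mirror the tail of the while loop in start().

```python
import selectors
import socket


class MiniIOLoop:
    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._handlers = {}  # fd -> (file object, handler)

    def add_handler(self, fileobj, handler, events):
        fd = fileobj.fileno()
        self._handlers[fd] = (fileobj, handler)
        self._selector.register(fd, events)

    def remove_handler(self, fileobj):
        fd = fileobj.fileno()
        self._handlers.pop(fd, None)
        self._selector.unregister(fd)

    def poll_once(self, timeout):
        # Wait for events, then dispatch each one to its registered handler.
        ready = self._selector.select(timeout)
        for key, events in ready:
            fileobj, handler = self._handlers[key.fd]
            handler(fileobj, events)
        return len(ready)

    def close(self):
        self._selector.close()


received = []


def on_readable(sock, events):
    received.append(sock.recv(1024))


left, right = socket.socketpair()
io_loop = MiniIOLoop()
io_loop.add_handler(right, on_readable, selectors.EVENT_READ)

idle = io_loop.poll_once(timeout=0)     # nothing to read yet
left.send(b"ping")
busy = io_loop.poll_once(timeout=1.0)   # handler runs for the readable fd

io_loop.remove_handler(right)
io_loop.close()
left.close()
right.close()
```

An accept handler follows the same pattern: the listening socket is registered for read events, and its handler calls accept() and registers the new connection in turn.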


For details on how epoll is used, see my first article.


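Summary

ioloop is made of three parts: callbacks (chunks of business logic), timeout callbacks (scheduled tasks), and I/O events (I/O transfer and parsing). Working together they provide asynchronous behavior, and features such as gen, httpclient, and iostream are built on top of them.

The overall flow is roughly this: tornado binds I/O events and handles the transfer and parsing; once a transfer completes, it calls back (through a callback, in combination with Future) into the business logic and some fixed operations. The timer is a relatively independent module.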


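Future

In my opinion Future is the most important module in tornado after ioloop. It runs through the whole codebase, and every asynchronous operation involves it. As the name suggests, it is about work to be done later, somewhat like jQuery's Deferred.

The usual pattern is to define the future's done callback through ioloop's add_future. When set_result is called on the future, its done callback is invoked, and that completes the Future's job.

For a concrete case, see the implementation of gen.coroutine, which is covered later in this article.

Its structure is not complicated. It has only a few important methods, the most important being add_done_callback and set_result.

tornado implements gen.coroutine with Future, ioloop, and yield.

1. add_done_callback

Similar to ioloop's callbacks: it stores the callbacks to run after the event completes in self._callbacks (or runs the callback immediately if the future is already done).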


    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)


2. set_result

Sets the result of the event and runs the callbacks stored earlier.


    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r',
                                  cb, self)
        self._callbacks = None
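The two methods can be exercised on their own with a stripped-down stand-in. MiniFuture below is a hypothetical class written for this example; it copies only the control flow of the two listings above and leaves out exception handling and logging. It shows both paths of add_done_callback: a callback registered before the result is stored, and one registered after the future is already done.

```python
class MiniFuture:
    """Stand-in that mimics only add_done_callback and set_result."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready")
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)                   # already finished: run right away
        else:
            self._callbacks.append(fn)  # otherwise keep it for set_result

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)


seen = []
future = MiniFuture()
future.add_done_callback(lambda f: seen.append(("early", f.result())))
seen_before = list(seen)  # still empty: nothing has run yet
future.set_result(42)     # runs the stored "early" callback
future.add_done_callback(lambda f: seen.append(("late", f.result())))
```

The "early" callback runs inside set_result, while the "late" one runs immediately inside add_done_callback because the future is already done.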


To verify the above, here is a piece of test code:


    #!/usr/bin/env python
    # coding=utf-8
    import tornado.web
    import tornado.ioloop
    from tornado.gen import coroutine
    from tornado.concurrent import Future


    def test():
        def pp(s):
            print(s)

        future = Future()
        iol = tornado.ioloop.IOLoop.instance()

        print('init future %s' % future)

        iol.add_future(future, lambda f: pp('ioloop callback after future done,future is %s' % f))

        # Simulate a delayed I/O operation
        iol.add_timeout(iol.time() + 5, lambda: future.set_result('set future is done'))

        print('init complete')
        tornado.ioloop.IOLoop.instance().start()


    if __name__ == "__main__":
        test()


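Result: the script prints the 'init future ...' line and 'init complete' immediately; about five seconds later, once the timeout has set the future's result, the ioloop runs the callback and prints the 'ioloop callback after future done' line.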



gen.coroutine


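Next, let's go a step further and look at how coroutine is implemented.

What gen.coroutine does is replace the callback style with a yield style: the yield acts as a boundary that splits the code into two parts.

For example: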


    #!/usr/bin/env python
    # coding=utf-8
    import tornado.ioloop
    from tornado.gen import coroutine
    from tornado.httpclient import AsyncHTTPClient


    @coroutine
    def cotest():
        client = AsyncHTTPClient()
        res = yield client.fetch("http://www.segmentfault.com/")
        print(res)


    if __name__ == "__main__":
        f = cotest()
        print(f)  # this returns a future
        tornado.ioloop.IOLoop.instance().start()


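Result: the future returned by cotest() is printed first; the response object is printed later, once the fetch has completed.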
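The split around yield can be modelled without Tornado. The sketch below is a simplified illustration under stated assumptions, not how gen.coroutine is written: mini_coroutine and MiniFuture are hypothetical names, the decorated function must be a generator that yields MiniFuture objects, exceptions are not handled, and the generator is resumed inline instead of being scheduled on an ioloop. It runs the generator up to its first yield, registers a done callback on the yielded future, and resumes the generator with the result when that future completes.

```python
import functools


class MiniFuture:
    """Minimal future: just enough for the runner below."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)


def mini_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        outer = MiniFuture()       # what the caller gets back immediately
        gen = func(*args, **kwargs)

        def step(value):
            try:
                yielded = gen.send(value)  # run until the next yield
            except StopIteration as stop:
                outer.set_result(stop.value)  # generator returned
                return
            # Resume the generator when the yielded future completes.
            yielded.add_done_callback(lambda f: step(f.result()))

        step(None)  # first step: equivalent to next(gen)
        return outer

    return wrapper


pending = MiniFuture()  # stands in for an in-flight I/O operation
log = []


@mini_coroutine
def fetch_and_double():
    log.append("before yield")
    value = yield pending
    log.append("after yield")
    return value * 2


outer = fetch_and_double()
done_before = outer.done()  # False: the generator is parked at the yield
pending.set_result(21)      # simulated I/O completion resumes it
```

The code before the yield runs when the function is called, and the code after it runs as the done callback of the yielded future, which is the callback style written in a linear form.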



Source analysis

Next, let's analyze the implementation of coroutine.


    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            future = TracebackFuture()

            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))

            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, 'value', None)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, types.GeneratorType):
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(getattr






