马哥linux致力于linux运维培训,连续多年排名第一,订阅者可免费获得学习机会和相关Linux独家实战资料! |
|
运维 · 再见,CDN 巨头:Akamai 宣布 ... · 3 天前 |
作者:国夫君
来源:见文末
ioloop
`ioloop` 是 `tornado` 的核心模块 , 也是个调度模块 , 各种异步事件都是由他调度的 , 所以必须弄清他的执行逻辑
源码分析
而`ioloop`的核心部分则是 `while True`这个循环内部的逻辑,贴上他的代码下
|
def start ( self ) :
if self . _running :
raise RuntimeError ( "IOLoop is already running" )
self . _setup_logging ()
if self . _stopped :
self . _stopped = False
return
old_current = getattr ( IOLoop . _current , "instance" , None )
IOLoop . _current . instance = self
self . _thread_ident = thread . get_ident ()
self . _running = True
old_wakeup_fd = None
if hasattr ( signal , 'set_wakeup_fd' ) and os . name == 'posix' :
try :
old_wakeup_fd = signal . set_wakeup_fd ( self . _waker . write_fileno ())
if old_wakeup_fd != - 1 :
signal . set_wakeup_fd ( old_wakeup_fd )
old_wakeup_fd = None
except ValueError :
old_wakeup_fd = None
try :
while True :
with self . _callback_lock :
callbacks = self . _callbacks
self . _callbacks = []
due_timeouts = []
if self . _timeouts :
now = self . time ()
while self . _timeouts :
if self . _timeouts [ 0 ]. callback is None :
heapq . heappop ( self . _timeouts )
self . _cancellations -= 1
elif self . _timeouts [ 0 ]. deadline <= now :
due_timeouts . append ( heapq . heappop ( self . _timeouts ))
else :
break
if ( self . _cancellations > 512
and self . _cancellations > ( len ( self . _timeouts ) >> 1 )) :
self . _cancellations = 0
self . _timeouts = [ x for x in self . _timeouts
if x . callback is not None ]
heapq . heapify ( self . _timeouts )
for callback in callbacks :
self . _run_callback ( callback )
for timeout in due_timeouts :
if timeout . callback is not None :
self . _run_callback ( timeout . callback )
callbacks = callback = due_timeouts = timeout = None
if self . _callbacks :
poll_timeout = 0.0
elif self . _timeouts :
poll_timeout = self . _timeouts [ 0 ]. deadline - self . time ()
poll_timeout = max ( 0 , min ( poll_timeout , _POLL_TIMEOUT ))
else :
poll_timeout = _POLL_TIMEOUT
if not self . _running :
break
if self . _blocking_signal_threshold is not None :
signal . setitimer ( signal . ITIMER_REAL , 0 , 0 )
try :
event_pairs = self . _impl . poll ( poll_timeout )
except Exception as e :
if errno_from_exception ( e ) == errno . EINTR :
continue
else :
raise
if self . _blocking_signal_threshold is not None :
signal . setitimer ( signal . ITIMER_REAL ,
self . _blocking_signal_threshold , 0 )
self . _events . update ( event_pairs )
while self . _events :
fd , events = self . _events . popitem ()
try :
fd_obj , handler_func = self . _handlers [ fd ]
handler_func ( fd_obj , events )
except ( OSError , IOError ) as e :
if errno_from_exception ( e ) == errno . EPIPE :
pass
else :
self . handle_callback_exception ( self . _handlers . get ( fd ))
except Exception :
self . handle_callback_exception ( self . _handlers . get ( fd ))
fd_obj = handler_func = None
finally :
self . _stopped = False
if self . _blocking_signal_threshold is not None :
signal . setitimer ( signal . ITIMER_REAL , 0 , 0 )
IOLoop . _current . instance = old_current
if old_wakeup_fd is not None :
signal . set_wakeup_fd ( old_wakeup_fd )
除去注释,代码其实没多少行. 由while 内部代码可以看出ioloop主要由三部分组成:
1.回调 callbacks
他是ioloop回调的基础部分,通过IOLoop.instance().add_callback()添加到self._callbacks,他们将在每一次loop中被运行.
主要用途是将逻辑分块,在适合时机将包装好的callback添加到self._callbacks让其执行.
例如ioloop中的add_future
def add_future ( self , future , callback ) :
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
The callback is invoked with one argument, the
`.Future`.
"""
assert is_future ( future )
callback = stack_context . wrap ( callback )
future . add_done_callback (
lambda future : self . add_callback ( callback , future ))
future对象得到result的时候会调用future.add_done_callback添加的callback,再将其转至ioloop执行
2.定时器 due_timeouts
这是定时器,在指定的事件执行callback.
跟1中的callback类似,通过IOLoop.instance().add_callback在每一次循环,会计算timeouts回调列表里的事件,运行已到期的callback.
当然不是无节操的循环.
因为poll操作会阻塞到有io操作发生,所以只要计算最近的timeout, 然后用这个时间作为self._impl.poll(poll_timeout) 的 poll_timeout ,就可以达到按时运行了
但是,假设poll_timeout的时间很大时,self._impl.poll一直在堵塞中(没有io事件,但在处理某一个io事件), 那添加刚才1中的callback不是要等很久才会被运行吗? 答案当然是不会.
ioloop中有个waker对象,他是由两个fd组成,一个读一个写.
ioloop在初始化的时候把waker绑定到epoll里了,add_callback时会触发waker的读写.
这样ioloop就会在poll中被唤醒了,接着就可以及时处理timeout callback了
用这样的方式也可以自己封装一个小的定时器功能玩玩
3.io事件的event loop
处理epoll事件的功能
通过IOLoop.instance().add_handler(fd, handler, events)绑定fd event的处理事件
在httpserver.listen的代码内,
netutil.py中的netutil.py的add_accept_handler绑定accept handler处理客户端接入的逻辑
如法炮制,其他的io事件也这样绑定,业务逻辑的分块交由ioloop的callback和future处理
关于epoll的用法的内容.详情见我第一篇文章吧,哈哈
总结
ioloop由callback(业务分块), timeout callback(定时任务) io event(io传输和解析) 三块组成,互相配合完成异步的功能,构建gen,httpclient,iostream等功能。
串联大致的流程是,tornado 绑定io event,处理io传输解析,传输完成后(结合Future)回调(callback)业务处理的逻辑和一些固定操作 . 定时器则是较为独立的模块
Futrue
个人认为Future是tornado仅此ioloop重要的模块,他贯穿全文,所有异步操作都有他的身影。顾名思义,他主要是关注日后要做的事,类似jquery的Deferred吧
一般的用法是通过ioloop的add_future定义future的done callback, 当future被set_result的时候,future的done callback就会被调用. 从而完成Future的功能.
具体可以参考gen.coroutine的实现,本文后面也会讲到
他的组成不复杂,只有几个重要的方法,最重要的是 add_done_callback , set_result
tornado用Future和ioloop,yield实现了gen.coroutine
1. add_done_callback
跟ioloop的callback类似 , 存储事件完成后的callback在self._callbacks里
def add_done_callback ( self , fn ) :
if self . _done :
fn ( self )
else :
self . _callbacks . append ( fn )
2.set_result
设置事件的结果,并运行之前存储好的callback
def set_result ( self , result ) :
self . _result = result
self . _set_done ()
def _set_done ( self ) :
self . _done = True
for cb in self . _callbacks :
try :
cb ( self )
except Exception :
app_log . exception ( 'Exception in callback %r for %r' ,
cb , self )
self . _callbacks = None
为了验证之前所说的,上一段测试代码
#! /usr/bin/env python
#coding=utf-8
import tornado . web
import tornado . ioloop
from tornado . gen import coroutine
from tornado . concurrent import Future
def test () :
def pp ( s ) :
print s
future = Future ()
iol = tornado . ioloop . IOLoop . instance ()
print 'init future %s' % future
iol . add_future ( future , lambda f : pp ( 'ioloop callback after future done,future is %s' % f ))
#模拟io延迟操作
iol . add_timeout ( iol . time () + 5 , lambda : future . set_result ( 'set future is done' ))
print 'init complete'
tornado . ioloop . IOLoop . instance (). start ()
if __name__ == "__main__" :
test ()
运行结果:
gen.coroutine
接着继续延伸,看看coroutine的实现。
gen.coroutine实现的功能其实是将原来的callback的写法,用yield的写法代替. 即以yield为分界,将代码分成两部分.
如:
#! /usr/bin/env python
#coding=utf-8
import tornado . ioloop
from tornado . gen import coroutine
from tornado . httpclient import AsyncHTTPClient
@ coroutine
def cotest () :
client = AsyncHTTPClient ()
res = yield client . fetch ( "http://www.segmentfault.com/" )
print res
if __name__ == "__main__" :
f = cotest ()
print f #这里返回了一个future哦
tornado . ioloop . IOLoop . instance (). start ()
运行结果:
源码分析
接下来分析下coroutine的实现
def _make_coroutine_wrapper ( func , replace_callback ) :
@ functools . wraps ( func )
def wrapper ( * args , ** kwargs ) :
future = TracebackFuture ()
if replace_callback and 'callback' in kwargs :
callback = kwargs . pop ( 'callback' )
IOLoop . current (). add_future (
future , lambda future : callback ( future . result ()))
try :
result = func ( * args , ** kwargs )
except ( Return , StopIteration ) as e :
result = getattr ( e , 'value' , None )
except Exception :
future . set_exc_info ( sys . exc_info ())
return future
else :
if isinstance ( result , types . GeneratorType ) :
try :
orig_stack_contexts = stack_context . _state . contexts
yielded = next ( result )
if stack_context . _state . contexts is not orig_stack_contexts :
yielded = TracebackFuture ()
yielded . set_exception (
stack_context . StackContextInconsistentError (
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)' ))
except ( StopIteration , Return ) as e :
future . set_result ( getattr
|
运维 · 再见,CDN 巨头:Akamai 宣布 2026 年终止中国服务 3 天前 |
|
冷兔 · 一位网友发现了邻居家的狗有一个特殊的喜好。。 7 年前 |
|
中国经济网 · 最好的嫁妆是贞操、女人常挨揍不闹病…这个“老师”的讲座为什么网友都在怼? 7 年前 |
|
药物一致性评价 · 地方 | 湖北省提高药物临床试验质量 推进仿制药一致性评价 7 年前 |
|
小学生作文 · 小学语文教师核心技能培训预告(语文辅导教师资格证) 7 年前 |
|
药渡 · 麻醉用药之全麻药市场分析:平稳增长,格局稳定 7 年前 |