摘要.
# 协程与异步
协程是Tornado中进行异步I/O代码开发的方法。协程使用了Python关键字yield将调用者挂起和恢复执行。
# 1. 迭代器
迭代器(lterator)是访问集合内元素的一种方式。迭代器对象从集合的第一个元素开始访问,知道所有元素都被访问一遍后结束,迭代器只能向前迭代,不能后退
python中最常见调用迭代器对象的场景是for循环,for
是迭代器封装集合,可以逐个访问集合元素并循环
迭代器相对于普通的python对象,多出一个__next__()
方法,每次调用该方法将返回一个元素,for
做的就是不断的调用__next__()
方法;迭代器并不能无限迭代,当最后一个集合元素被访问后,会返回Stoplteration
异常,早期开发者使用yield时,通过捕捉这个异常进行返回或赋值
# 2. yield
调用任何定义包含yield关键字的函数都不会执行,而是会获得一个对应于该函数当迭代器
# tornado异步
# 1. IOLoop的run_sync运行流程
# 2. 一个Demo
import random
import time
from tornado import gen
from tornado.ioloop import IOLoop
@gen.coroutine
def get_url(url):
wait_time = random.randint(1, 4)
yield gen.sleep(wait_time)
print('URL {} took {}s to get!'.format(url, wait_time))
raise gen.Return((url, wait_time))
@gen.coroutine
def outer_coroutine():
before = time.time()
coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
result = yield coroutines
after = time.time()
print(result)
print('total time: {} seconds'.format(after - before))
if __name__ == '__main__':
IOLoop.current().run_sync(outer_coroutine)
输出类似于这样:
URL URL1 took 1s to get!
URL URL2 took 2s to get!
URL URL3 took 2s to get!
[('URL1', 1), ('URL2', 2), ('URL3', 2)]
total time: 2.00353884697 seconds
# 3. Coroutine
起初我以为调用协程后,返回的是一个生成器对象,毕竟gen.coroutine
装饰在一个函数或者生成器上。看了源码发现,其实每次调用一个协程,它在获取了生成器对象之后,同时又对它执行了next
操作来获取生成器内部yield出来的值,这个可以是一个值,当然也可以是一个由内部协程嵌套调用返回的future对象。
协程coroutine
中则有一个中断挂起的概念,比如说有任务A和B,A执行过程中发现自己需要被挂起或者线程发现要把A挂起,那么就挂起A去执行B,知道B被挂起然后A继续执行。如此反复。
最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 协程 - 廖雪峰的官方网站
使用协程必须要application层实现协程的调度,同时需要语言本身的支持,我们来看一下tornado官方文档中使用协程的例子。
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")
几乎是用同步方式来写代码,避免了callback的存在。
其实我们都知道callback不可避免,如果使用框架的时候没有显式使用callback,那么一定是这个框架做了一些工作(挂起协程重新得到执行需要callback)。
我想了一下怎么写出tornado风格的程序,也就是程序对yield的处理,这个很有意思。来看下面这段代码:
from backports_abc import Generator as GeneratorType
class Return(Exception):
def __init__(self, value=None):
self.value = value
def coroutine(func):
def wrapper(*args, **kwargs):
def _dispatch(yielded):
if isinstance(yielded, GeneratorType):
return _execute_yield(yielded)
else:
return _send(yielded)
def _send(yielded):
try:
yielded = origin_gen.send(yielded)
return _dispatch(yielded)
except (StopIteration, Return) as e:
return getattr(e, 'value', None)
except Exception as error:
print 'terrible error happened: %r' % error
def _execute_yield(gen):
yielded = next(gen)
return _dispatch(yielded)
result = func(*args, **kwargs)
origin_gen = result
return _execute_yield(result)
return wrapper
def get_value2():
return 10086
def get_value1():
yield get_value2()
@coroutine
def test():
value1 = yield get_value1()
print 'got value1: %d' % value1
value2 = yield get_value2()
print 'got value2: %d' % value2
raise Return(value1 == value2)
if __name__ == '__main__':
result = test()
print result
"""
>>> got value1: 10086
>>> got value2: 10086
>>> True
"""
主要在处理yield挂起的协程怎么继续执行,并且在包含yield的函数中实现同步的返回。
这里面有很关键的两行
yielded = next(gen)
next()是生成器的一次执行, 执行完可能等到一个结果实体,也可能还是一个生成器
yielded = origin_gen.send(yielded)
send()方法用户将挂起的协程唤醒,继续执行这个挂起的协程
# 现在协程的关键在哪里呢?
关键就在于什么时候执行next()
我们知道返回一个生成器是基本不会耗费什么资源的, 但是生成器执行一次就说不准了, 生成器中的代码可能是CPU密集,可能包含io,这些都会影响主线程的执行。
所以现在有一个想法,为了获得运行效率,我们要避开在执行next()的时候其中的io等待。
如何避开呢?就是把所有的io操作全部注册到ioloop上,收到ioloop通知说io事件已经完成了,请执行next()吧
这样的好处是在协程执行过程中没有io等待时间,CPU不会因为io等待被抢占。
tornado主要有4个组件
- Return() 用户同步返回的特殊异常
- Future() 被coroutine装饰的函数/方法的返回值
- Runner 调度器
- coroutine装饰器
先看一下最简单的Return
class Return(Exception):
def __init__(self, value=None):
self.value = value
然后是Future类
class Future(object):
def __init__(self):
self.result = None
self.exc_info = None
self.done = False
self.callbacks = []
def set_result(self, result):
self.result = result
self.done = True
for cb in self.callbacks:
cb(self)
def add_done_callback(self, fn):
if self.done:
fn(self)
else:
self.callbacks.append(fn)
上面没有什么好说的,都是字面意思
然后是coroutine装饰器
def coroutine(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = Future()
try:
result = func(*args, **kwargs)
except (StopIteration, Return) as e:
result = getattr(e, 'value', None)
except Exception:
future.exc_info = sys.exc_info()
return future
else:
if isinstance(result, GeneratorType):
try:
yielded = next(result)
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.exc_info = sys.exc_info()
else:
# result is generator, yielded is Future
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.result = result
future.done = True
return future
return wrapper
coroutine装饰器会直接执行一次next()方法:
- 如果直接有返回那么被装饰的函数就直接返回了.
- 如果返回Future对象, 那么生成一个Runner实例来处理
那么我们看一下Runner
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.running = False
self.finished = False
self.ioloop = IOLoop.instance()
if self.handle_yield(first_yielded):
self.run()
def handle_yield(self, yielded):
# yielded is definitely Future
self.future = yielded
if not self.future.done:
self.ioloop.add_future(self.future, lambda f: self.run())
return False
return True
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done:
return
self.future = None
try:
value = future.result
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
return
if not self.handle_yield(yielded):
return
finally:
self.running = False
首先这里值得注意的是,任何coroutine装饰的函数都会直接返回一个Future, 也就是说next()这个过程可以随便执行,因为生成一个Future对象也是不耗费资源的。
然后Runner会检查Future的done的状态,如果完成了,那么就切换至协程的断点继续执行,否则注册到ioloop中,Future done之后由ioloop来通知Runner进行调度。
好,我们来总结一下执行过程
- 生成Future,协程挂起
- Runner检查Future是否完成,完成则恢复协程执行,否则添加至ioloop中
- ioloop通知Runner 某个Future完成了,Runner恢复协程执行
同样的,我们看到利用了ioloop避开了io等待,从而实现了高效。
协程跟人来处理事情其实非常类似:
- 你准备焖饭,把饭放进电饭煲,焖好了会有铃声提醒
- 现在你会去炒菜,不会在电饭煲前傻等,相当与协程的切换
- 收到饭焖好了通知,停下炒菜去看一眼饭糊了没有,完成焖饭事件,删除改协程
- 回来继续炒菜
因为我们的目的是避开io等待,利用协程就是为了达到这个目的。
值得注意的是:
tornado中的io事件都会注册到callback中,比如用来进行网络请求tornado.httpclient.AsyncHTTPClient
这个client,如果一个第三方库完全没有适配tornado的ioloop,如requests
, 那么你使用它就会是一个灾难, 相当于在一个非阻塞的环境里强行进行阻塞操作,所以请勿使用任何未经适配tornado的第三方package在你基于tornado的项目中。
# gen部分源码
# gen.py
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()
# 省略n行
try:
result = func(*args, **kwargs)
# 省略n个except
else:
if isinstance(result, types.GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
# 如果func内部有yield关键字,result是一个生成器
# 如果func内部又调用了其它协程,yielded将会是由嵌套协程返回的future对象
# 省略n行
# 省略n个except
else:
Runner(result, future, yielded)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
# 4. Future
Future的设计目标是作为协程(coroutine)和IOLoop的媒介,从而将协程和IOLoop关联起来。
Future在concurrent.py中定义,是异步操作结果的占位符,用于等待结果返回。通常作为函数IOLoop.add_future()的参数或gen.coroutine协程中yield的返回值。
等到结果返回时,外部可以通过调用set_result()设置真正的结果,然后调用所有回调函数,恢复协程的执行
我觉得Future
在tornado中是一个很奇妙的对象,它是一个穿梭于协程和调度器之间的信使。提供了回调函数注册(当异步事件完成后,调用注册的回调)、中间结果保存、嵌套协程唤醒父协程(通过Runner实现)等功能。Coroutine和Future是一一对应的,可以从上节gen.coroutine装饰器的实现中看到。
每调用一个协程,表达式所返回的就是一个Future对象,它所表达的意义为:这个协程的内部各种异步逻辑执行完毕后,会把结果保存在这个Future中,同时调用这个Future中指定的回调函数,而future中的回调函数是什么时候被注册的呢?那就是当前——你通过调用协程,返回了这个future对象的时候:
我们看看demo代码中run_sync的实现:
# ioloop.py IOLoop
def run_sync(self, func, timeout=None):
future_cell = [None]
def run():
try:
result = func()
except Exception:
future_cell[0] = TracebackFuture()
future_cell[0].set_exc_info(sys.exc_info())
else:
if is_future(result):
future_cell[0] = result
else:
future_cell[0] = TracebackFuture()
future_cell[0].set_result(result)
self.add_future(future_cell[0], lambda future: self.stop())
self.add_callback(run)
if timeout is not None:
timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
self.start()
if timeout is not None:
self.remove_timeout(timeout_handle)
if not future_cell[0].done():
raise TimeoutError('Operation timed out after %s seconds' % timeout)
return future_cell[0].result()
代码中先给IOLoop
注册一个回调函数,等下个事件循环再执行内部定义的run函数。在run中通过result = func()
执行协程outer_coroutine
,result则是该协程对应的future对象。如果这个时候不对future作任何操作,最后这个future完成后也不会执行任何回调。
所以在源码中通过add_future
给这个future添加回调函数,也就是self.stop()
,表明这个协程执行完毕后触发的操作是退出事件循环。
其实IOLoop::add_future这个函数的命名会有些奇怪,刚读代码还不知道它是干嘛的(给IOLoop添加future是什么鬼?如果说是add_callback那还容易理解),看了add_future的实现就明白了:
# ioloop.py IOLoop
def add_future(self, future, callback):
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.
The callback is invoked with one argument, the
`.Future`.
"""
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))
它并不会给IOLoop添加future(也没有什么意义),它只是给这个future添加回调函数而已,而这个回调函数是当这个future完成以后给IOLoop添加一个回调函数(有点绕,哈哈~给IOLoop添加的回调函数在这里就是stop)。因此当一个future完成以后,到最后future的回调函数真正被执行将会隔着一个IOLoop的事件循环,而不是马上会被执行。
# 5. Runner
如果说tornado是一辆车,那么Runner对象就是它的发动机,由它来调度各种协程来完成异步事件的操作。
Coroutine和Runner也是一一对应的,每个Coroutine都是由一个Runner实例去执行的。协程包装着生成器(当然也有可能是函数,本文考虑比较复杂的协程嵌套调用的情况),在生成器内部,也有可能会调用其它的协程,从而把内部协程的future对象yield出来,这个runner就会通过调用返回的方式(future = next(gen)
)接到内部出来的future,并把它纳入执行的loop中,先是handle_yielded
,再是run
(中间会隔着一个或者多个IOLoop的事件循环,因此图中是用虚线表示的)。
调度器中有两个比较重要的函数: handle_yielded
和run
,先来看handle_yielded
:
# gen.py Runner
def handle_yield(self, yielded):
# Lists containing YieldPoints require stack contexts;
# other lists are handled via multi_future in convert_yielded.
if (isinstance(yielded, list) and
any(isinstance(f, YieldPoint) for f in yielded)):
yielded = Multi(yielded)
elif (isinstance(yielded, dict) and
any(isinstance(f, YieldPoint) for f in yielded.values())):
yielded = Multi(yielded)
if isinstance(yielded, YieldPoint):
# 省略n行
else:
try:
self.future = convert_yielded(yielded)
except BadYieldError:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
return True
在runner中,handle_yielded
用于处理generator返回的内部协程future对象。因为协程处理的大部分是异步的事件,所以内部协程yield出来的future对象状态多半还是处于未完成。这个时候收到该future的Runner所能做的也仅仅只是注册一个回调函数而已(上面源码的最后几行)。
再来看看run
:
# gen.py Runner
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
if exc_info is not None:
yielded = self.gen.throw(*exc_info)
exc_info = None
else:
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return
if not self.handle_yield(yielded):
return
finally:
self.running = False
run函数中的注释很好得诠释了它的作用,它就是不断地给传入Runner的generator执行next或者send操作(next或send都会让生成器继续运行,区别就是send会传一个参数进去),直到generator返回的future对象状态还未完成,需要等待异步响应,这个时候它会调用handle_yielded。
异步响应来了以后,就会调用这个run,为什么呢?因为在handle_yielded
中给这个future注册了回调函数,回调函数就是run
函数。然后在run函数中执行send(value),让这个生成器继续运行,如此往复循环,直到generator退出。
generator退出就代表着这个Runner引擎所跑的Coroutine完成了,然后再给这个Coroutine所对应的Future对象执行set_result操作,表示这个协程的Future已完成了,可以执行它的回调函数了。
**
这个回调函数对于outer_coroutine的future来说就是执行IOLoop的stop操作。对于inner_coroutine的future来说就是outer_coroutine对应的Runner的run操作。 这句话很绕,但是要是真读懂了,相信对于它的运行原理也就了解的差不多了。
# 6. IOLoop
IOLoop是一个很常见的模块,就是多路复用IO机制,好多项目中都有这一块的封装,原理都差不多。
IOLoop是一个I/O事件循环,用于调度socket相关的连接、响应、异步读写等网络事件,并支持在事件循环中添加回调(callback)和定时回调(timeout)。在支持的平台上,默认使用epoll进行I/O多路复用处理。
IOLoop是Tornado的核心,绝大部分模块都依赖于IOLoop的调度。在协程运行环境中,IOLoop担任着协程调度器的角色,能够让暂停的协程重新获得控制权,从而能继续执行。
通过add_future()实现对Future对支持
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future.add_done_callback(lambda future: self.add_callback(callback, future))
通过调用future的add_done_callback(),使当future在操作完成时,能够通过add_callback将callback添加到IOLoop中,让callback在IOLoop下一次迭代中执行(不在本轮是为了避免饿死)。
也可以参考shadowsocks中的loop模块,它也是用python实现的基于多种不同操作系统io多路复用的封装。tornado的ioloop也是类似的,记录了一个个文件描述符和handler的pair,每当有io事件发生,就会调用该文件描述符对应的handler。如果这个handler是对future执行set_result操作,那连锁地就会执行Runner中的run,从而进入Runner的运行循环中,直到需要等待下一个异步事件,然后再向ioloop注册事件。。。如此循环往复。
# 协程具体流程
gen.coroutine是一个装饰器,负责将普通函数包装成future对象,用于外部设置等待的结果。
第一次协程调用yield释放控制权后
-> [Runner]handle_yield
处理yield返回的结果
-> [Runner]ioloop.add_future(self.future, lambda f: self.run())
将结果构造成future后添加到ioloop
-> [future]add_done_callback(lambda future: self.add_callback(callback, future))
将Runner.run()加入到完成时的回调函数列表中
-> [future]set_result
已经得到future的结果,设置之
-> [future]_set_done
调用future所有回调函数(_callbacks)
-> [ioloop]add_callback(callback, future)
callback为[Runner]add_future添加的那个,即[Runner]self.run(),将在下一轮循环被执行
-> [Runner]self.run()
取出Runner的self.future(上次yield的返回值):
1. 如果future未完成,return,流程结束,等待下一次set_result
2. 如果future完成
-> [Runner]yielded = self.gen.send(value)
通过send把future的result发送给协程,并让其恢复执行:
1. 如果协程结束(没yield了)
-> [Runner]self.result_future.set_result
设置最终的结果result_future
2. 未结束(再次遇到yield)
-> [Runner]handle_yield
则再次调用handle_yield
# 实例
AsyncHTTPClient
的fetch()
是一个异步操作,其构造了一个HTTP请求,然后调用fetch_impl()
,返回一个future
。fetch_impl()
取决于AsyncHTTPClient
的具体实现,默认情况下,AsyncHTTPClient
生成的是子类SimpleAsyncHTTPClient
的实例,所以主要看SimpleAsyncHTTPClient
的fetch_impl()
:
def fetch_impl(self, request, callback):
key = object()
self.queue.append((key, request, callback))
if not len(self.active) < self.max_clients:
timeout_handle = self.io_loop.add_timeout(
self.io_loop.time() + min(request.connect_timeout,
request.request_timeout),
functools.partial(self._on_timeout, key))
else:
timeout_handle = None
self.waiting[key] = (request, callback, timeout_handle)
self._process_queue()
if self.queue:
gen_log.debug("max_clients limit reached, request queued. "
"%d active, %d queued requests." % (len(self.active), len(self.queue)))
fetch_impl()
接受两个参数,request为fetch()
中构造的HTTP请求,callback
为fetch中的回调函数handle_response
:
def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
在handle_response()
中,调用了我们期待的set_result()
。所以我们把目光转移到fetch_impl()
的callback
。在fetch_impl()
中,函数先将callback
加到队列中,然后通过_process_queue()
处理掉,处理时调用_handle_request()
:
def _handle_request(self, request, release_callback, final_callback):
self._connection_class()(
self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.tcp_client,
self.max_header_size, self.max_body_size)
这里构造了一个_connection_class
对象,即HTTPConnection
。HTTPConnection
通过self.tcp_client.connect()
来建立TCP连接,然后通过该连接发送HTTP请求, 在超时(timeout)或完成(finish)时调用callback
。
tcp_client
在建立异步TCP连接时,先进行DNS解析(又是协程),然后建立socket来构造IOStream对象,最后调用IOStream.connect()
。在IOStream.connect()
的过程中,我们看到了关键操作:
self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)
还记得我们前面说过的IOLoop
吗?IOLoop
可以添加socket
、callback
和timeout
,并当它们就绪时调用相应的回调函数。这里add_handler
处理的就是socket的多路复用,默认的实现是**epoll
**。当epoll中该socket就绪时,相关函数得以回调。于是tcp_client
读取socket内容获得HTTP response
,handle_response()
被调用,最终set_result()
被调用。
到这里我们恍然大悟,AsyncHTTPClient
的set_result()
调用依赖于IO多路复用方案,这里是**epoll**
,在epoll
中相应socket
的就绪的是set_result()
得到调用的根本原因。而这个就绪事件的传递,离不开Tornado内建的IOStream
,异步TCPClient
、异步HTTPConnection
,这些类的存在为我们隐藏了简单调用后的复杂性。因此当我们在用yield返回耗时操作时,如果不是Tornado的内建组件,则必须自己负责设计set_result的方案,比如以下代码:
@gen.coroutine
def add(self, a, b):
future = Future()
def callback(a, b):
print("calculating the sum of %d + %d:" % (a,b))
future.set_result(a+b)
tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
result = yield future
print("%d + %d = %d" % (a, b, result))
通过手动将包含set_result()的回调函数加到IOLoop中,于是回调下一次迭代中执行,set_result()被调用,协程恢复控制权