python

tornado协程解析

文章暂存

systemime
2020-07-20
25 min

摘要.

# 协程与异步

协程是Tornado中进行异步I/O代码开发的方法。协程使用了Python关键字yield将调用者挂起和恢复执行。

# 1. 迭代器

迭代器(lterator)是访问集合内元素的一种方式。迭代器对象从集合的第一个元素开始访问,知道所有元素都被访问一遍后结束,迭代器只能向前迭代,不能后退


python中最常见调用迭代器对象的场景是for循环,for是迭代器封装集合,可以逐个访问集合元素并循环

迭代器相对于普通的python对象,多出一个__next__()方法,每次调用该方法将返回一个元素,for做的就是不断的调用__next__()方法;迭代器并不能无限迭代,当最后一个集合元素被访问后,会返回Stoplteration异常,早期开发者使用yield时,通过捕捉这个异常进行返回或赋值

# 2. yield

调用任何定义包含yield关键字的函数都不会执行,而是会获得一个对应于该函数当迭代器


网上资料很多,yield就不赘述了

# tornado异步

# 1. IOLoop的run_sync运行流程

# 2. 一个Demo

import random
import time
from tornado import gen
from tornado.ioloop import IOLoop


@gen.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield gen.sleep(wait_time)
    print('URL {} took {}s to get!'.format(url, wait_time))
    raise gen.Return((url, wait_time))


@gen.coroutine
def outer_coroutine():
    before = time.time()
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    result = yield coroutines
    after = time.time()
    print(result)
    print('total time: {} seconds'.format(after - before))

if __name__ == '__main__':
    IOLoop.current().run_sync(outer_coroutine)

输出类似于这样:

URL URL1 took 1s to get!
URL URL2 took 2s to get!
URL URL3 took 2s to get!
[('URL1', 1), ('URL2', 2), ('URL3', 2)]
total time: 2.00353884697 seconds

# 3. Coroutine

起初我以为调用协程后,返回的是一个生成器对象,毕竟gen.coroutine装饰在一个函数或者生成器上。看了源码发现,其实每次调用一个协程,它在获取了生成器对象之后,同时又对它执行了next操作来获取生成器内部yield出来的值,这个可以是一个值,当然也可以是一个由内部协程嵌套调用返回的future对象。

协程coroutine中则有一个中断挂起的概念,比如说有任务A和B,A执行过程中发现自己需要被挂起或者线程发现要把A挂起,那么就挂起A去执行B,知道B被挂起然后A继续执行。如此反复。

最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 协程 - 廖雪峰的官方网站

使用协程必须要application层实现协程的调度,同时需要语言本身的支持,我们来看一下tornado官方文档中使用协程的例子。

class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")

几乎是用同步方式来写代码,避免了callback的存在。
其实我们都知道callback不可避免,如果使用框架的时候没有显式使用callback,那么一定是这个框架做了一些工作(挂起协程重新得到执行需要callback)。
我想了一下怎么写出tornado风格的程序,也就是程序对yield的处理,这个很有意思。来看下面这段代码:

from backports_abc import Generator as GeneratorType


class Return(Exception):
    def __init__(self, value=None):
        self.value = value


def coroutine(func):
    def wrapper(*args, **kwargs):
        def _dispatch(yielded):
            if isinstance(yielded, GeneratorType):
                return _execute_yield(yielded)
            else:
                return _send(yielded)

        def _send(yielded):
            try:
                yielded = origin_gen.send(yielded)
                return _dispatch(yielded)
            except (StopIteration, Return) as e:
                return getattr(e, 'value', None)
            except Exception as error:
                print 'terrible error happened: %r' % error

        def _execute_yield(gen):
            yielded = next(gen)
            return _dispatch(yielded)

        result = func(*args, **kwargs)
        origin_gen = result
        return _execute_yield(result)

    return wrapper


def get_value2():
    return 10086


def get_value1():
    yield get_value2()


@coroutine
def test():
    value1 = yield get_value1()
    print 'got value1: %d' % value1

    value2 = yield get_value2()
    print 'got value2: %d' % value2

    raise Return(value1 == value2)


if __name__ == '__main__':
    result = test()
    print result

"""
>>> got value1: 10086
>>> got value2: 10086
>>> True
"""

主要在处理yield挂起的协程怎么继续执行,并且在包含yield的函数中实现同步的返回。

这里面有很关键的两行

yielded = next(gen)


next()是生成器的一次执行, 执行完可能等到一个结果实体,也可能还是一个生成器

yielded = origin_gen.send(yielded)


send()方法用户将挂起的协程唤醒,继续执行这个挂起的协程

# 现在协程的关键在哪里呢?

关键就在于什么时候执行next()

我们知道返回一个生成器是基本不会耗费什么资源的, 但是生成器执行一次就说不准了, 生成器中的代码可能是CPU密集,可能包含io,这些都会影响主线程的执行。

所以现在有一个想法,为了获得运行效率,我们要避开在执行next()的时候其中的io等待。

如何避开呢?就是把所有的io操作全部注册到ioloop上,收到ioloop通知说io事件已经完成了,请执行next()吧

这样的好处是在协程执行过程中没有io等待时间,CPU不会因为io等待被抢占。

tornado主要有4个组件

  1. Return() 用户同步返回的特殊异常
  2. Future() 被coroutine装饰的函数/方法的返回值
  3. Runner 调度器
  4. coroutine装饰器


先看一下最简单的Return

class Return(Exception):
    def __init__(self, value=None):
        self.value = value

然后是Future类

class Future(object):
    def __init__(self):
        self.result = None
        self.exc_info = None
        self.done = False
        self.callbacks = []
    def set_result(self, result):
        self.result = result
        self.done = True
        for cb in self.callbacks:
            cb(self)
    def add_done_callback(self, fn):
        if self.done:
            fn(self)
        else:
            self.callbacks.append(fn)

上面没有什么好说的,都是字面意思

然后是coroutine装饰器

def coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            result = func(*args, **kwargs)
        except (StopIteration, Return) as e:
            result = getattr(e, 'value', None)
        except Exception:
            future.exc_info = sys.exc_info()
            return future
        else:
            if isinstance(result, GeneratorType):
                try:
                    yielded = next(result)
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, 'value', None))
                except Exception:
                    future.exc_info = sys.exc_info()
                else:
                    # result is generator, yielded is Future
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.result = result
        future.done = True
        return future
    return wrapper

coroutine装饰器会直接执行一次next()方法:

  1. 如果直接有返回那么被装饰的函数就直接返回了.
  2. 如果返回Future对象, 那么生成一个Runner实例来处理


那么我们看一下Runner

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.running = False
        self.finished = False
        self.ioloop = IOLoop.instance()
        if self.handle_yield(first_yielded):
            self.run()
    def handle_yield(self, yielded):
        # yielded is definitely Future
        self.future = yielded
        if not self.future.done:
            self.ioloop.add_future(self.future, lambda f: self.run())
            return False
        return True
    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done:
                    return
                self.future = None
                try:
                    value = future.result
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(getattr(e, 'value', None))
                    self.result_future = None
                    return
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False


首先这里值得注意的是,任何coroutine装饰的函数都会直接返回一个Future, 也就是说next()这个过程可以随便执行,因为生成一个Future对象也是不耗费资源的。

然后Runner会检查Future的done的状态,如果完成了,那么就切换至协程的断点继续执行,否则注册到ioloop中,Future done之后由ioloop来通知Runner进行调度。

好,我们来总结一下执行过程

  1. 生成Future,协程挂起
  2. Runner检查Future是否完成,完成则恢复协程执行,否则添加至ioloop中
  3. ioloop通知Runner 某个Future完成了,Runner恢复协程执行


同样的,我们看到利用了ioloop避开了io等待,从而实现了高效。

协程跟人来处理事情其实非常类似:

  1. 你准备焖饭,把饭放进电饭煲,焖好了会有铃声提醒
  2. 现在你会去炒菜,不会在电饭煲前傻等,相当与协程的切换
  3. 收到饭焖好了通知,停下炒菜去看一眼饭糊了没有,完成焖饭事件,删除改协程
  4. 回来继续炒菜


因为我们的目的是避开io等待,利用协程就是为了达到这个目的。

值得注意的是:
tornado中的io事件都会注册到callback中,比如用来进行网络请求tornado.httpclient.AsyncHTTPClient这个client,如果一个第三方库完全没有适配tornado的ioloop,如requests, 那么你使用它就会是一个灾难, 相当于在一个非阻塞的环境里强行进行阻塞操作,所以请勿使用任何未经适配tornado的第三方package在你基于tornado的项目中。

# gen部分源码

# gen.py
def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        # 省略n行
        try:
            result = func(*args, **kwargs)
        # 省略n个except
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)    
                    # 如果func内部有yield关键字,result是一个生成器
                    # 如果func内部又调用了其它协程,yielded将会是由嵌套协程返回的future对象
                    # 省略n行
                # 省略n个except
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper


# 4. Future

Future的设计目标是作为协程(coroutine)和IOLoop的媒介,从而将协程和IOLoop关联起来。

Future在concurrent.py中定义,是异步操作结果的占位符,用于等待结果返回。通常作为函数IOLoop.add_future()的参数或gen.coroutine协程中yield的返回值。

等到结果返回时,外部可以通过调用set_result()设置真正的结果,然后调用所有回调函数,恢复协程的执行

我觉得Future在tornado中是一个很奇妙的对象,它是一个穿梭于协程和调度器之间的信使。提供了回调函数注册(当异步事件完成后,调用注册的回调)、中间结果保存、嵌套协程唤醒父协程(通过Runner实现)等功能。Coroutine和Future是一一对应的,可以从上节gen.coroutine装饰器的实现中看到。

每调用一个协程,表达式所返回的就是一个Future对象,它所表达的意义为:这个协程的内部各种异步逻辑执行完毕后,会把结果保存在这个Future中,同时调用这个Future中指定的回调函数,而future中的回调函数是什么时候被注册的呢?那就是当前——你通过调用协程,返回了这个future对象的时候:

我们看看demo代码中run_sync的实现:

# ioloop.py IOLoop
def run_sync(self, func, timeout=None):
    future_cell = [None]

    def run():
        try:
            result = func()
        except Exception:
            future_cell[0] = TracebackFuture()
            future_cell[0].set_exc_info(sys.exc_info())
        else:
            if is_future(result):
                future_cell[0] = result
            else:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_result(result)
        self.add_future(future_cell[0], lambda future: self.stop())
    self.add_callback(run)
    if timeout is not None:
        timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
    self.start()
    if timeout is not None:
        self.remove_timeout(timeout_handle)
    if not future_cell[0].done():
        raise TimeoutError('Operation timed out after %s seconds' % timeout)
    return future_cell[0].result()

代码中先给IOLoop注册一个回调函数,等下个事件循环再执行内部定义的run函数。在run中通过result = func()执行协程outer_coroutine,result则是该协程对应的future对象。如果这个时候不对future作任何操作,最后这个future完成后也不会执行任何回调。

所以在源码中通过add_future给这个future添加回调函数,也就是self.stop(),表明这个协程执行完毕后触发的操作是退出事件循环。

其实IOLoop::add_future这个函数的命名会有些奇怪,刚读代码还不知道它是干嘛的(给IOLoop添加future是什么鬼?如果说是add_callback那还容易理解),看了add_future的实现就明白了:

# ioloop.py IOLoop
def add_future(self, future, callback):
    """Schedules a callback on the ``IOLoop`` when the given
    `.Future` is finished.

    The callback is invoked with one argument, the
    `.Future`.
    """
    assert is_future(future)
    callback = stack_context.wrap(callback)
    future.add_done_callback(
        lambda future: self.add_callback(callback, future))

它并不会给IOLoop添加future(也没有什么意义),它只是给这个future添加回调函数而已,而这个回调函数是当这个future完成以后给IOLoop添加一个回调函数(有点绕,哈哈~给IOLoop添加的回调函数在这里就是stop)。因此当一个future完成以后,到最后future的回调函数真正被执行将会隔着一个IOLoop的事件循环,而不是马上会被执行

# 5. Runner

如果说tornado是一辆车,那么Runner对象就是它的发动机,由它来调度各种协程来完成异步事件的操作。

Coroutine和Runner也是一一对应的,每个Coroutine都是由一个Runner实例去执行的。协程包装着生成器(当然也有可能是函数,本文考虑比较复杂的协程嵌套调用的情况),在生成器内部,也有可能会调用其它的协程,从而把内部协程的future对象yield出来,这个runner就会通过调用返回的方式(future = next(gen))接到内部出来的future,并把它纳入执行的loop中,先是handle_yielded,再是run(中间会隔着一个或者多个IOLoop的事件循环,因此图中是用虚线表示的)。

调度器中有两个比较重要的函数: handle_yieldedrun,先来看handle_yielded:

# gen.py Runner
def handle_yield(self, yielded):
    # Lists containing YieldPoints require stack contexts;
    # other lists are handled via multi_future in convert_yielded.
    if (isinstance(yielded, list) and
            any(isinstance(f, YieldPoint) for f in yielded)):
        yielded = Multi(yielded)
    elif (isinstance(yielded, dict) and
          any(isinstance(f, YieldPoint) for f in yielded.values())):
        yielded = Multi(yielded)

    if isinstance(yielded, YieldPoint):
        # 省略n行
    else:
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError:
            self.future = TracebackFuture()
            self.future.set_exc_info(sys.exc_info())

    if not self.future.done() or self.future is moment:
        self.io_loop.add_future(
            self.future, lambda f: self.run())
        return False
    return True

在runner中,handle_yielded用于处理generator返回的内部协程future对象。因为协程处理的大部分是异步的事件,所以内部协程yield出来的future对象状态多半还是处于未完成。这个时候收到该future的Runner所能做的也仅仅只是注册一个回调函数而已(上面源码的最后几行)。

再来看看run:

# gen.py Runner
def run(self):
    """Starts or resumes the generator, running until it reaches a
    yield point that is not ready.
    """
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            future = self.future
            if not future.done():
                return
            self.future = None
            try:
                orig_stack_contexts = stack_context._state.contexts
                exc_info = None

                try:
                    value = future.result()
                except Exception:
                    self.had_exception = True
                    exc_info = sys.exc_info()

                if exc_info is not None:
                    yielded = self.gen.throw(*exc_info)
                    exc_info = None
                else:
                    yielded = self.gen.send(value)

                if stack_context._state.contexts is not orig_stack_contexts:
                    self.gen.throw(
                        stack_context.StackContextInconsistentError(
                            'stack_context inconsistency (probably caused '
                            'by yield within a "with StackContext" block)'))
            except (StopIteration, Return) as e:
                self.finished = True
                self.future = _null_future
                if self.pending_callbacks and not self.had_exception:
                    # If we ran cleanly without waiting on all callbacks
                    # raise an error (really more of a warning).  If we
                    # had an exception then some callbacks may have been
                    # orphaned, so skip the check in that case.
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                self.result_future.set_result(getattr(e, 'value', None))
                self.result_future = None
                self._deactivate_stack_context()
                return
            except Exception:
                self.finished = True
                self.future = _null_future
                self.result_future.set_exc_info(sys.exc_info())
                self.result_future = None
                self._deactivate_stack_context()
                return
            if not self.handle_yield(yielded):
                return
    finally:
        self.running = False

run函数中的注释很好得诠释了它的作用,它就是不断地给传入Runner的generator执行next或者send操作(next或send都会让生成器继续运行,区别就是send会传一个参数进去),直到generator返回的future对象状态还未完成,需要等待异步响应,这个时候它会调用handle_yielded。

异步响应来了以后,就会调用这个run,为什么呢?因为在handle_yielded中给这个future注册了回调函数,回调函数就是run函数。然后在run函数中执行send(value),让这个生成器继续运行,如此往复循环,直到generator退出。

generator退出就代表着这个Runner引擎所跑的Coroutine完成了,然后再给这个Coroutine所对应的Future对象执行set_result操作,表示这个协程的Future已完成了,可以执行它的回调函数了。
**
这个回调函数对于outer_coroutine的future来说就是执行IOLoop的stop操作。对于inner_coroutine的future来说就是outer_coroutine对应的Runner的run操作。 这句话很绕,但是要是真读懂了,相信对于它的运行原理也就了解的差不多了。

# 6. IOLoop

IOLoop是一个很常见的模块,就是多路复用IO机制,好多项目中都有这一块的封装,原理都差不多。

IOLoop是一个I/O事件循环,用于调度socket相关的连接、响应、异步读写等网络事件,并支持在事件循环中添加回调(callback)和定时回调(timeout)。在支持的平台上,默认使用epoll进行I/O多路复用处理。

IOLoop是Tornado的核心,绝大部分模块都依赖于IOLoop的调度。在协程运行环境中,IOLoop担任着协程调度器的角色,能够让暂停的协程重新获得控制权,从而能继续执行。

通过add_future()实现对Future对支持

def add_future(self, future, callback):
    assert is_future(future)
    callback = stack_context.wrap(callback)
    future.add_done_callback(lambda future: self.add_callback(callback, future))

通过调用future的add_done_callback(),使当future在操作完成时,能够通过add_callback将callback添加到IOLoop中,让callback在IOLoop下一次迭代中执行(不在本轮是为了避免饿死)。


也可以参考shadowsocks中的loop模块,它也是用python实现的基于多种不同操作系统io多路复用的封装。tornado的ioloop也是类似的,记录了一个个文件描述符和handler的pair,每当有io事件发生,就会调用该文件描述符对应的handler。如果这个handler是对future执行set_result操作,那连锁地就会执行Runner中的run,从而进入Runner的运行循环中,直到需要等待下一个异步事件,然后再向ioloop注册事件。。。如此循环往复。

# 协程具体流程


gen.coroutine是一个装饰器,负责将普通函数包装成future对象,用于外部设置等待的结果。

第一次协程调用yield释放控制权后

>  [Runner]handle_yield
处理yield返回的结果

->  [Runner]ioloop.add_future(self.future, lambda f: self.run())
将结果构造成future后添加到ioloop

->  [future]add_done_callback(lambda future: self.add_callback(callback, future))
将Runner.run()加入到完成时的回调函数列表中
->  [future]set_result
    已经得到future的结果,设置之

->  [future]_set_done
    调用future所有回调函数(_callbacks)

->  [ioloop]add_callback(callback, future)
    callback为[Runner]add_future添加的那个,即[Runner]self.run(),将在下一轮循环被执行

->  [Runner]self.run()
    取出Runner的self.future(上次yield的返回值)1. 如果future未完成,return,流程结束,等待下一次set_result

    2. 如果future完成
        ->  [Runner]yielded = self.gen.send(value)
        通过send把future的result发送给协程,并让其恢复执行:

        1. 如果协程结束(yield)
            ->  [Runner]self.result_future.set_result
            设置最终的结果result_future

        2. 未结束(再次遇到yield)
            ->  [Runner]handle_yield
            则再次调用handle_yield

# 实例


AsyncHTTPClientfetch()是一个异步操作,其构造了一个HTTP请求,然后调用fetch_impl(),返回一个futurefetch_impl()取决于AsyncHTTPClient的具体实现,默认情况下,AsyncHTTPClient生成的是子类SimpleAsyncHTTPClient的实例,所以主要看SimpleAsyncHTTPClientfetch_impl()

def fetch_impl(self, request, callback):
    key = object()
    self.queue.append((key, request, callback))
    if not len(self.active) < self.max_clients:
        timeout_handle = self.io_loop.add_timeout(
            self.io_loop.time() + min(request.connect_timeout,
                                      request.request_timeout),
            functools.partial(self._on_timeout, key))
    else:
        timeout_handle = None
    self.waiting[key] = (request, callback, timeout_handle)
    self._process_queue()
    if self.queue:
        gen_log.debug("max_clients limit reached, request queued. "
                      "%d active, %d queued requests." % (len(self.active), len(self.queue)))

fetch_impl()接受两个参数,request为fetch()中构造的HTTP请求,callback为fetch中的回调函数handle_response

def handle_response(response):
    if raise_error and response.error:
        future.set_exception(response.error)
    else:
        future.set_result(response)


handle_response()中,调用了我们期待的set_result()。所以我们把目光转移到fetch_impl()callback。在fetch_impl()中,函数先将callback加到队列中,然后通过_process_queue()处理掉,处理时调用_handle_request()

def _handle_request(self, request, release_callback, final_callback):
    self._connection_class()(
        self.io_loop, self, request, release_callback,
        final_callback, self.max_buffer_size, self.tcp_client,
        self.max_header_size, self.max_body_size)

这里构造了一个_connection_class对象,即HTTPConnectionHTTPConnection通过self.tcp_client.connect()来建立TCP连接,然后通过该连接发送HTTP请求, 在超时(timeout)或完成(finish)时调用callback

tcp_client在建立异步TCP连接时,先进行DNS解析(又是协程),然后建立socket来构造IOStream对象,最后调用IOStream.connect()。在IOStream.connect()的过程中,我们看到了关键操作:

self.io_loop.add_handler(self.fileno(), self._handle_events, self._state)


还记得我们前面说过的IOLoop吗?IOLoop可以添加socketcallbacktimeout,并当它们就绪时调用相应的回调函数。这里add_handler处理的就是socket的多路复用,默认的实现是**epoll**。当epoll中该socket就绪时,相关函数得以回调。于是tcp_client读取socket内容获得HTTP responsehandle_response()被调用,最终set_result()被调用。

到这里我们恍然大悟,AsyncHTTPClientset_result()调用依赖于IO多路复用方案,这里是**epoll**,在epoll中相应socket的就绪的是set_result()得到调用的根本原因。而这个就绪事件的传递,离不开Tornado内建的IOStream,异步TCPClient、异步HTTPConnection,这些类的存在为我们隐藏了简单调用后的复杂性。因此当我们在用yield返回耗时操作时,如果不是Tornado的内建组件,则必须自己负责设计set_result的方案,比如以下代码:

@gen.coroutine
def add(self, a, b):
    future = Future()
    def callback(a, b):
        print("calculating the sum of %d + %d:" % (a,b))
        future.set_result(a+b)
    tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)

    result = yield future
    print("%d + %d = %d" % (a, b, result))

通过手动将包含set_result()的回调函数加到IOLoop中,于是回调下一次迭代中执行,set_result()被调用,协程恢复控制权

# 参考


浅析tornado协程运行原理
我所理解的 tornado - concurrent 部分
分析tornado的协程实现






上次编辑于: 5/20/2021, 7:26:49 AM