python

tornado 中的异步调用模式 | 三月沙

文章暂存

systemime
2021-02-03
8 min

摘要.

# fundamental concepts

# 同步

调用者等待调用结果的返回

# 异步

调用者不等待调用结果的返回,而是通过间隔轮询、通知等方式得知结果

# 阻塞

调用方式影响后续指令的执行

# 非阻塞

调用方式不影响后续指令的执行


一般来说,阻塞是绝对的,非阻塞则是相对的,因为任何指令或调用的执行都要占用 CPU 周期,或网络,或 IO。非阻塞只是说调用或者资源消耗不影响后续逻辑执行,他们经得起等待。非阻塞往往和异步一起出现。

tornado 是一个异步非阻塞 httpserver,同时也是一个 web framework。

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user。

在 tornado 中异步和非阻塞是通过 ioloop 和 Future 实现的,Future 是借助 python 的协程来实现非阻塞调用。tornado 提供了不同的异步调用形式来适配不同的调用场景,本文的目的就是为了说明不同形式是如何使用的以及它们的差别。

为了演示方便,这里使用了一个公共模块 util.py 来提供 ioloop 的启动和销毁,下文不做特殊说明皆是如此。

# util.py 
import tornado.ioloop

COUNTER = 0

def stop_loop(times):
    global COUNTER
    COUNTER += 1
    if COUNTER == times:
        tornado.ioloop.IOLoop.instance().stop()
        print('====> ioloop end')


def start_loop():
    print('====> ioloop start')
    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

# 最常用的调用方式

在 tornado 中最普遍的使用方式是把函数的调用结果封装成 Future,利用 ioloop 执行并异步获得结果。

# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen

from util import stop_loop, start_loop


# fetch 返回的是 future
def asyn_fetch_future(url):
    http_client = AsyncHTTPClient()
    return http_client.fetch(url)


def asyn_fetch_future_callback(future):
    result = future.result()
    print('future_callback')
    print(result.request.url, result.code, result.reason, result.request_time)
    stop_loop(1)


if __name__ == '__main__':
    result_future = asyn_fetch_future('http://www.apple.com/cn/')  #1
    result_future.add_done_callback(asyn_fetch_future_callback)
    start_loop()

#1 处通过 tornado 提供的异步 httpclient 获得一个 Future,为 Future 添加执行结果回调函数获得执行结果,Future 的执行结果也是一个 Future。

输出结果如下

====> ioloop start
future_callback
('http://www.apple.com/cn/', 200, 'OK', 0.5546278953552246)
====> ioloop end
        0.71 real         0.06 user         0.06 sys

这种调用方式必须手动启动 ioloop 并在调用结束之后手动销毁 ioloop,想要获得调用结果必须为 Future 添加回调。

# 只关心调用,不在乎结果

有时候我们并不在乎函数的调用结果,只要函数正确执行即可,ioloop 提供了 spawn_callback 来执行这样的操作:

# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop

from util import start_loop, stop_loop


@tornado.gen.coroutine
def divide(x, y):
    return x / y


if __name__ == '__main__':
    # The IOLoop will catch the exception and print a stack trace in
    # the logs. Note that this doesn't look like a normal call, since
    # we pass the function object to be called by the IOLoop.
    tornado.ioloop.IOLoop.current().spawn_callback(divide, 1, 0)
    tornado.ioloop.IOLoop.current().add_timeout(
        tornado.ioloop.time.time() + 1, stop_loop, 1)
    start_loop()

借助 spawn_callback 可以直接执行函数调用,但是依然需要手动处理 ioloop 的开闭。

# 一次性执行

run_sync 可以自动开启 ioloop 并在函数执行结束之后关闭 ioloop,此种调用在执行一次性操作时非常有用,比如要对数据库进行一次性修改:

@tornado.gen.coroutine
def init_tag():
    tb_tag = Tag() #1
    for i in range(10):
        result = yield tb_tag.insert({'name': random.choice('abcdefghjklpoiuytrewq')})
        print(result)


if __name__ == '__main__':
    tornado.ioloop.IOLoop.current().run_sync(lambda: init_tag())

#1 是一个 motor Mongodb collection,借助 run_sync 可以非常方便对数据库执行操作,由于 run_sync 只接受一个参数,如果函数调用需要传参,可以把函数封装成 lambda。

# 使用 callback*

如果调用需要与 callback 函数进行交互,可以利用 gen.task

# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop

from util import start_loop, stop_loop


@tornado.gen.coroutine
def call_task():
    result = yield tornado.gen.Task(some_function, 1, 2)
    raise tornado.gen.Return(result)


def fetch_coroutine_callback(future):
    print('coroutine callback ==> ', future.result())
    stop_loop(1)


def some_function(x, y, callback=None):
    print('some_function called')
    callback(x * y)


if __name__ == '__main__':
    future = call_task()
    future.add_done_callback(fetch_coroutine_callback)
    start_loop()

使用 gen.task 可以直接返回一个 Future ,被调用的函数会自动增加一个 callback 函数,用来在函数执行结束之后把结果进行回调通知。

# 通过 threadPool 来调用阻塞操作

#-*- coding:utf-8 -*-
from tornado.concurrent import Future
from tornado import gen
import time
from concurrent.futures import ThreadPoolExecutor

from util import start_loop, stop_loop


EXECUTOR = ThreadPoolExecutor(max_workers=4)


def fetch_coroutine_callback(future):
    print('coroutine callback')
    print(future.result())
    stop_loop(2)


def sleep_func(t):
    print('sleep_func call')
    time.sleep(t)
    return 'blocking func result'


@gen.coroutine
def blocking():
    result = yield EXECUTOR.submit(sleep_func, *(4, ))
    raise gen.Return(result)


@gen.coroutine
def not_blocking():
    future = Future()
    future.set_result('not blocking func result')
    result = yield future
    raise gen.Return(result)


if __name__ == '__main__':
    """
    用多线程的方式借助协程实现异步调用
    """
    blocking().add_done_callback(fetch_coroutine_callback)
    not_blocking().add_done_callback(fetch_coroutine_callback)
    start_loop()

如果代码中有阻塞操作,可以借助 ThreadPoolExecutor 来完成异步的调用,这样把耗时的操作交给别的线程,可以使当前的线程继续执行后续的操作。这里的阻塞操作一般是 IO 或者其他系统调用。

# 并行执行

tornado 的 Future 是支持并行执行的,可以对多个 future 进行 yield 操作并返回多个结果。

# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen

from util import stop_loop, start_loop


@gen.coroutine
def fetch_many_coroutine(urls):
    http_client = AsyncHTTPClient()
    response1, response2 = yield [http_client.fetch(urls[0]), http_client.fetch(urls[1])]
    raise gen.Return([response1, response2])

@gen.coroutine
def fetch_coroutine(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url in urls]
    raise gen.Return(responses)


def fetch_coroutine_callback(future):
    print('coroutine callback')
    for result in future.result():
        print(
            result.request.url,
            result.code, result.reason, result.request_time)
    stop_loop(2)


if __name__ == '__main__':
    """
    使用gen.coroutine 可以很方便让包含 yield 的函数返回future
    """
    result_future = fetch_coroutine(['https://baidu.com', 'https://baidu.com'])
    result_future.add_done_callback(fetch_coroutine_callback)
    result_future = fetch_many_coroutine(['https://baidu.com', 'https://baidu.com'])
    result_future.add_done_callback(fetch_coroutine_callback)
    start_loop()

文中所有代码见 tornado-asyn,欢迎指正。 https://sanyuesha.com/2017/02/08/tornado-async-style/ https://sanyuesha.com/2017/02/08/tornado-async-style/

上次编辑于: 5/20/2021, 7:26:49 AM