摘要.
# fundamental concepts
# 同步
调用者等待调用结果的返回
# 异步
调用者不等待调用结果的返回,而是通过间隔轮询、通知等方式得知结果
# 阻塞
调用方式影响后续指令的执行
# 非阻塞
调用方式不影响后续指令的执行
一般来说,阻塞是绝对的,非阻塞则是相对的,因为任何指令或调用的执行都要占用 CPU 周期,或网络,或 IO。非阻塞只是说调用或者资源消耗不影响后续逻辑执行,他们经得起等待。非阻塞往往和异步一起出现。
tornado 是一个异步非阻塞 httpserver,同时也是一个 web framework。
Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user。
在 tornado 中异步和非阻塞是通过 ioloop 和 Future 实现的,Future 是借助 python 的协程来实现非阻塞调用。tornado 提供了不同的异步调用形式来适配不同的调用场景,本文的目的就是为了说明不同形式是如何使用的以及它们的差别。
为了演示方便,这里使用了一个公共模块 util.py 来提供 ioloop 的启动和销毁,下文不做特殊说明皆是如此。
# util.py
import tornado.ioloop
COUNTER = 0
def stop_loop(times):
global COUNTER
COUNTER += 1
if COUNTER == times:
tornado.ioloop.IOLoop.instance().stop()
print('====> ioloop end')
def start_loop():
print('====> ioloop start')
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()
# 最常用的调用方式
在 tornado 中最普遍的使用方式是把函数的调用结果封装成 Future,利用 ioloop 执行并异步获得结果。
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen
from util import stop_loop, start_loop
# fetch 返回的是 future
def asyn_fetch_future(url):
http_client = AsyncHTTPClient()
return http_client.fetch(url)
def asyn_fetch_future_callback(future):
result = future.result()
print('future_callback')
print(result.request.url, result.code, result.reason, result.request_time)
stop_loop(1)
if __name__ == '__main__':
result_future = asyn_fetch_future('http://www.apple.com/cn/') #1
result_future.add_done_callback(asyn_fetch_future_callback)
start_loop()
在 #1
处通过 tornado 提供的异步 httpclient 获得一个 Future,为 Future 添加执行结果回调函数获得执行结果,Future 的执行结果也是一个 Future。
输出结果如下
====> ioloop start
future_callback
('http://www.apple.com/cn/', 200, 'OK', 0.5546278953552246)
====> ioloop end
0.71 real 0.06 user 0.06 sys
这种调用方式必须手动启动 ioloop 并在调用结束之后手动销毁 ioloop,想要获得调用结果必须为 Future 添加回调。
# 只关心调用,不在乎结果
有时候我们并不在乎函数的调用结果,只要函数正确执行即可,ioloop 提供了 spawn_callback 来执行这样的操作:
# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop
from util import start_loop, stop_loop
@tornado.gen.coroutine
def divide(x, y):
return x / y
if __name__ == '__main__':
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
tornado.ioloop.IOLoop.current().spawn_callback(divide, 1, 0)
tornado.ioloop.IOLoop.current().add_timeout(
tornado.ioloop.time.time() + 1, stop_loop, 1)
start_loop()
借助 spawn_callback 可以直接执行函数调用,但是依然需要手动处理 ioloop 的开闭。
# 一次性执行
run_sync 可以自动开启 ioloop 并在函数执行结束之后关闭 ioloop,此种调用在执行一次性操作时非常有用,比如要对数据库进行一次性修改:
@tornado.gen.coroutine
def init_tag():
tb_tag = Tag() #1
for i in range(10):
result = yield tb_tag.insert({'name': random.choice('abcdefghjklpoiuytrewq')})
print(result)
if __name__ == '__main__':
tornado.ioloop.IOLoop.current().run_sync(lambda: init_tag())
#1
是一个 motor Mongodb collection,借助 run_sync 可以非常方便对数据库执行操作,由于 run_sync 只接受一个参数,如果函数调用需要传参,可以把函数封装成 lambda。
# 使用 callback*
如果调用需要与 callback 函数进行交互,可以利用 gen.task
# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop
from util import start_loop, stop_loop
@tornado.gen.coroutine
def call_task():
result = yield tornado.gen.Task(some_function, 1, 2)
raise tornado.gen.Return(result)
def fetch_coroutine_callback(future):
print('coroutine callback ==> ', future.result())
stop_loop(1)
def some_function(x, y, callback=None):
print('some_function called')
callback(x * y)
if __name__ == '__main__':
future = call_task()
future.add_done_callback(fetch_coroutine_callback)
start_loop()
使用 gen.task 可以直接返回一个 Future ,被调用的函数会自动增加一个 callback 函数,用来在函数执行结束之后把结果进行回调通知。
# 通过 threadPool 来调用阻塞操作
#-*- coding:utf-8 -*-
from tornado.concurrent import Future
from tornado import gen
import time
from concurrent.futures import ThreadPoolExecutor
from util import start_loop, stop_loop
EXECUTOR = ThreadPoolExecutor(max_workers=4)
def fetch_coroutine_callback(future):
print('coroutine callback')
print(future.result())
stop_loop(2)
def sleep_func(t):
print('sleep_func call')
time.sleep(t)
return 'blocking func result'
@gen.coroutine
def blocking():
result = yield EXECUTOR.submit(sleep_func, *(4, ))
raise gen.Return(result)
@gen.coroutine
def not_blocking():
future = Future()
future.set_result('not blocking func result')
result = yield future
raise gen.Return(result)
if __name__ == '__main__':
"""
用多线程的方式借助协程实现异步调用
"""
blocking().add_done_callback(fetch_coroutine_callback)
not_blocking().add_done_callback(fetch_coroutine_callback)
start_loop()
如果代码中有阻塞操作,可以借助 ThreadPoolExecutor 来完成异步的调用,这样把耗时的操作交给别的线程,可以使当前的线程继续执行后续的操作。这里的阻塞操作一般是 IO 或者其他系统调用。
# 并行执行
tornado 的 Future 是支持并行执行的,可以对多个 future 进行 yield 操作并返回多个结果。
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen
from util import stop_loop, start_loop
@gen.coroutine
def fetch_many_coroutine(urls):
http_client = AsyncHTTPClient()
response1, response2 = yield [http_client.fetch(urls[0]), http_client.fetch(urls[1])]
raise gen.Return([response1, response2])
@gen.coroutine
def fetch_coroutine(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url in urls]
raise gen.Return(responses)
def fetch_coroutine_callback(future):
print('coroutine callback')
for result in future.result():
print(
result.request.url,
result.code, result.reason, result.request_time)
stop_loop(2)
if __name__ == '__main__':
"""
使用gen.coroutine 可以很方便让包含 yield 的函数返回future
"""
result_future = fetch_coroutine(['https://baidu.com', 'https://baidu.com'])
result_future.add_done_callback(fetch_coroutine_callback)
result_future = fetch_many_coroutine(['https://baidu.com', 'https://baidu.com'])
result_future.add_done_callback(fetch_coroutine_callback)
start_loop()
文中所有代码见 tornado-asyn,欢迎指正。 https://sanyuesha.com/2017/02/08/tornado-async-style/ https://sanyuesha.com/2017/02/08/tornado-async-style/