摘要.
注意: 这是一篇译文, 原文为Combining Coroutines with Threads and Processes
现有的许多 Python 库还没有准备好结合 asyncio. 它们可能会阻塞, 或是依赖于模块未提供的 并发功能. 我们仍然有可能在基于 asyncio 的程序中使用到这些库. 方法就是使用 concurrent.futures 提供的 executor 来将这些代码运行在单独的线程或是进程中.
# 线程
event loop 中的run_in_executor
方法需要一个 executor 对象, 一个可以被调用的对象, 以及一些需要被传递的参数. 它会返回一个Future
对象, 这个对象可以等待函数结束, 并传递返回值. 如果我们没有传递executor
对象, 一个ExecutorPoolExecutor
将会 被创建, 下面这个例子显式的创建了executor
来限制最大的并发线程数.
一个ThreadPoolExecutor
会开启自己的线程, 而后会在线程中调用每个传入的函数. 这个例子显示了如何结合run_in_executor()
与wait()
, 使得 event loop 在这些 阻塞函数运行的时候仍然有yield
的能力, 而后在这些函数结束时, event loop 将会重新激活调用者.
import asyncio
import concurrent.futures
import logging
import sys
import time
def blocks(n):
log = logging.getLogger('blocks({})'.format(n))
log.info('running')
time.sleep(0.1)
log.info('done')
return n ** 2
async def run_blocking_tasks(executor):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, i)
for i in range(6)
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
asyncio_executor_thread.py
使用logging
记录可以方便的表示是哪个函数或是线程生成. 因为每个阻塞函数中调用了单独的 logger, 输出也显示了一些线程被重用, 以此 来完成工作.
$ python asyncio_exector_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_2 blocks(4): running
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_1 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(4): done
ThreadPoolExecutor-0_1 blocks(5): done
MainThread run_blocking_tasks: results: [4, 9, 0, 16, 25, 1]
MainThread run_blocking_tasks: exiting
# 进程
一个ProcessPoolExecutor
的工作与上面的类似, 只是创建了一些工作进程而不是线程. 使用单独的线程需要更多的系统资源, 但是对于计算密集型的操作, 它能够充分的利用 每个 CPU 核.
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(name)18s: %(message)s',
stream=sys.stderr,
)
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
代码中与进程不同的就是创建了不同类型的executor
. 这个例子中也改变了日志格式, 打印了线程的 id 而不是进程名称, 以次来表示任务是在单独的进程中运行.
$ python asyncio_exector_process.py
PID 23876 run_blocking_tasks: starting
PID 23876 run_blocking_tasks: creating executor tasks
PID 23876 run_blocking_tasks: waiting for executor tasks
PID 23877 blocks(0): running
PID 23878 blocks(1): running
PID 23879 blocks(2): running
PID 23877 blocks(0): done
PID 23878 blocks(1): done
PID 23879 blocks(2): done
PID 23878 blocks(3): running
PID 23877 blocks(4): running
PID 23879 blocks(5): running
PID 23878 blocks(3): done
PID 23877 blocks(4): done
PID 23879 blocks(5): done
PID 23876 run_blocking_tasks: results: [4, 0, 25, 9, 16, 1]
PID 23876 run_blocking_tasks: exiting
# 我的理解与拓展
文章中第一部分关于线程的介绍, 给人一种线程池的感觉. 首先, 你创建一些线程, executor 就是一种池, 这里只是给定最大值, 而后提交任务. 不同的是, 这里使用 await 将线程的结果与当前 event loop 线程的上下文粘合在了一起, 可以避免写回调.
可能这的确是一种过渡方式, 将某些暂时不支持 asyncio 的库, 强行拓展支持.
官方文档对于run_in_executor
的介绍, 相比于原始的上面的博客, 可能更 偏基础一些, 正常的使用还是依照上面的例子比较合适.
https://corvo.myseu.cn/2019/05/22/2019-05-22-asyncio%E7%BB%93%E5%90%88%E8%BF%9B%E7%A8%8B%E4%B8%8E%E7%BA%BF%E7%A8%8B/ https://corvo.myseu.cn/2019/05/22/2019-05-22-asyncio%E7%BB%93%E5%90%88%E8%BF%9B%E7%A8%8B%E4%B8%8E%E7%BA%BF%E7%A8%8B/