python

asyncio结合进程与线程(译) - 我的小米粥分你一半

文章暂存

systemime
2021-02-07
6 min

摘要.

注意: 这是一篇译文, 原文为Combining Coroutines with Threads and Processes

现有的许多 Python 库还没有准备好结合 asyncio. 它们可能会阻塞, 或是依赖于模块未提供的 并发功能. 我们仍然有可能在基于 asyncio 的程序中使用到这些库. 方法就是使用 concurrent.futures 提供的 executor 来将这些代码运行在单独的线程或是进程中.

# 线程

event loop 中的run_in_executor方法需要一个 executor 对象, 一个可以被调用的对象, 以及一些需要被传递的参数. 它会返回一个Future对象, 这个对象可以等待函数结束, 并传递返回值. 如果我们没有传递executor对象, 一个ExecutorPoolExecutor将会 被创建, 下面这个例子显式的创建了executor来限制最大的并发线程数.

一个ThreadPoolExecutor会开启自己的线程, 而后会在线程中调用每个传入的函数. 这个例子显示了如何结合run_in_executor()wait(), 使得 event loop 在这些 阻塞函数运行的时候仍然有yield的能力, 而后在这些函数结束时, event loop 将会重新激活调用者.


import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

asyncio_executor_thread.py使用logging记录可以方便的表示是哪个函数或是线程生成. 因为每个阻塞函数中调用了单独的 logger, 输出也显示了一些线程被重用, 以此 来完成工作.

$ python asyncio_exector_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0          blocks(0): running
ThreadPoolExecutor-0_1          blocks(1): running
ThreadPoolExecutor-0_2          blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_0          blocks(3): running
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_2          blocks(4): running
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_1          blocks(5): running
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(4): done
ThreadPoolExecutor-0_1          blocks(5): done
MainThread run_blocking_tasks: results: [4, 9, 0, 16, 25, 1]
MainThread run_blocking_tasks: exiting

# 进程

一个ProcessPoolExecutor的工作与上面的类似, 只是创建了一些工作进程而不是线程. 使用单独的线程需要更多的系统资源, 但是对于计算密集型的操作, 它能够充分的利用 每个 CPU 核.




if __name__ == '__main__':
    
    
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

代码中与进程不同的就是创建了不同类型的executor. 这个例子中也改变了日志格式, 打印了线程的 id 而不是进程名称, 以次来表示任务是在单独的进程中运行.

$ python asyncio_exector_process.py
PID 23876 run_blocking_tasks: starting
PID 23876 run_blocking_tasks: creating executor tasks
PID 23876 run_blocking_tasks: waiting for executor tasks
PID 23877          blocks(0): running
PID 23878          blocks(1): running
PID 23879          blocks(2): running
PID 23877          blocks(0): done
PID 23878          blocks(1): done
PID 23879          blocks(2): done
PID 23878          blocks(3): running
PID 23877          blocks(4): running
PID 23879          blocks(5): running
PID 23878          blocks(3): done
PID 23877          blocks(4): done
PID 23879          blocks(5): done
PID 23876 run_blocking_tasks: results: [4, 0, 25, 9, 16, 1]
PID 23876 run_blocking_tasks: exiting

# 我的理解与拓展

文章中第一部分关于线程的介绍, 给人一种线程池的感觉. 首先, 你创建一些线程, executor 就是一种池, 这里只是给定最大值, 而后提交任务. 不同的是, 这里使用 await 将线程的结果与当前 event loop 线程的上下文粘合在了一起, 可以避免写回调.

可能这的确是一种过渡方式, 将某些暂时不支持 asyncio 的库, 强行拓展支持.

官方文档对于run_in_executor的介绍, 相比于原始的上面的博客, 可能更 偏基础一些, 正常的使用还是依照上面的例子比较合适. https://corvo.myseu.cn/2019/05/22/2019-05-22-asyncio%E7%BB%93%E5%90%88%E8%BF%9B%E7%A8%8B%E4%B8%8E%E7%BA%BF%E7%A8%8B/ https://corvo.myseu.cn/2019/05/22/2019-05-22-asyncio%E7%BB%93%E5%90%88%E8%BF%9B%E7%A8%8B%E4%B8%8E%E7%BA%BF%E7%A8%8B/

上次编辑于: 5/20/2021, 7:26:49 AM