摘要.
我想运行使用协程和多线程请求 URL 的服务。但是,我无法将协程传递给执行者中的工人。有关此问题的最小示例,请参见下面的代码:
import time
import asyncio
import concurrent.futures
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)
async def async_request(loop):
await asyncio.sleep(3)
def sync_request(_):
time.sleep(3)
async def main(loop):
futures = [loop.run_in_executor(EXECUTOR, async_request,loop)
for x in range(10)]
await asyncio.wait(futures)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
导致以下错误:
Traceback (most recent call last):
File "co_test.py", line 17, in <module>
loop.run_until_complete(main(loop))
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "co_test.py", line 10, in main
futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
File "co_test.py", line 10, in <listcomp>
futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor
raise TypeError("coroutines cannot be used with run_in_executor()")
TypeError: coroutines cannot be used with run_in_executor()
我知道我可以使用sync_request
函数而不是async_request
,在这种情况下,我可以通过将阻塞函数发送到另一个线程来获得协程。
我也知道我可以在事件循环中调用async_request
十次。类似于下面的代码:
loop = asyncio.get_event_loop()
futures = [async_request(loop) for i in range(10)]
loop.run_until_complete(asyncio.wait(futures))
但是在这种情况下,我将使用单个线程。
我如何使用两种方案,协程在多线程中工作?从代码中可以看到,我正在将(不使用)pool
传递给async_request
,希望我可以编写一些代码来告诉工作人员创造未来,将其发送到池中并异步进行(释放工人)等待结果。
我想要这样做的原因是使应用程序可伸缩。这是不必要的步骤吗?我应该每个网址都简单地拥有一个线程吗?就像是:
LEN = len(list_of_urls)
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN)
够好吗?
最佳答案
您必须在线程上下文中创建并设置新的事件循环,才能运行协程:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def run(corofn, *args):
loop = asyncio.new_event_loop()
try:
coro = corofn(*args)
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
finally:
loop.close()
async def main():
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=5)
futures = [
loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
for x in range(10)]
print(await asyncio.gather(*futures))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
https://www.coder.work/article/6186367 https://www.coder.work/article/6186367