python

python asyncio aiohttp 异步下载 完整例子

文章暂存

systemime
2020-08-09
4 min

摘要.

asyncio 基础:python asyncio 协程
之前完整的说明了并发下载例子: 并发下载
使用了aiohttp,
注意, 下面写文件没有使用异步写入.一般情况下可以配合ThreadPoolExecutor来结合写入文件 : 关于aiohttp下载大文件的方式

1. import os,sys,time,asyncio,aiohttp
2. import tqdm
3. FLAGS = ('CN IN US ID BR PK NG BD RU JP '
4. 'MX PH VN ET EG DE IR TR CD FR').split()
5. BASE_URL = 'http://flupy.org/data/flags'        #下载url
6. DEST_DIR = 'downloads/'                           #保存目录
7. #获取链接,下载文件
8. async def fetch(session:aiohttp.ClientSession,url:str,path:str,flag:str):
9.     print(flag, ' 开始下载')
10. async with session.get(url) as resp:
11. with open(path,'wb') as fd:
12. while 1:
13.                 chunk = await resp.content.read(1024)    #每次获取1024字节
14. if not chunk:
15. break
16.                 fd.write(chunk)
17. return flag
18. 
19. async def begin_download(sem,session:aiohttp.ClientSession,url:str,path:str,flag:str):
20. #控制协程并发数量
21. with (await sem):
22. return await fetch(session,url,path,flag)
23. 
24. async def download(sem:asyncio.Semaphore):
25.     tasks = []
26. async with aiohttp.ClientSession() as session:
27. for flag in FLAGS:
28. #创建路径以及url
29.             path = os.path.join(DEST_DIR, flag.lower() + '.gif')
30.             url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=flag.lower())
31. #构造一个协程列表
32.             tasks.append(asyncio.ensure_future(begin_download(sem,session, url, path, flag)))
33. #等待返回结果
34.         tasks_iter = asyncio.as_completed(tasks)
35. #创建一个进度条
36.         fk_task_iter = tqdm.tqdm(tasks_iter,total=len(FLAGS))
37. for coroutine in fk_task_iter:
38. #获取结果
39.             res = await coroutine
40.             print(res, '下载完成')
41. 
42. #创建目录
43. os.makedirs(DEST_DIR,exist_ok=True)
44. #获取事件循环
45. lp = asyncio.get_event_loop()
46. start = time.time()
47. #创建一个信号量以防止DDos
48. sem = asyncio.Semaphore(4)
49. lp.run_until_complete(download(sem))
50. end = time.time()
51. lp.close()
52. print('耗时:',end-start)


在写文件那块代码可以使用 run_in_executor 这个函数执行,事件循环中包含一个默认的线程池(ThreadPoolExecutor).
run_in_executor(executor, func, *args) 原型 . executor == None, 使用默认的. 可以自己创建一个;
具体代码:

1. #线程运行的函数
2. def save_file(fd:io.BufferedWriter,chunk):
3.     fd.write(chunk)
4. #获取链接,下载文件
5. async def fetch(session:aiohttp.ClientSession,url:str,path:str,flag:str):
6.     print(flag, ' 开始下载')
7. async with session.get(url) as resp:
8. with open(path,'wb') as fd:
9. while 1:
10.                 chunk = await resp.content.read(8192)
11. if not chunk:
12. break
13.                 lp = asyncio.get_event_loop()
14.                 lp.run_in_executor(None,save_file,fd,chunk)
15. # fd.write(chunk)
16. return flag

上次编辑于: 5/20/2021, 7:26:49 AM