免费编程软件「python+pycharm」
链接:https://pan.quark.cn/s/48a86be2fdc0
一个让我加班到凌晨的故事
先跟你说个真事。
上个月,我接了个任务:写一个爬虫,要爬取一万个网页。每个网页里又包含几十个图片链接,需要把这些图片也下载下来。
这活儿听着简单对吧?用requests循环一万次,里面再循环几十次,完事。
但我不能这么干。一万个网页,每个网页几十张图,加起来几十万次网络请求。要是同步地一个一个等,估计跑完得等到下个版本上线。
所以我用了异步。asyncio、aiohttp都安排上了。
代码写完了,一跑——慢得要命。跟同步差不多,完全没有体现出异步的优势。
我懵了。
折腾了一整个晚上,翻了无数篇帖子,最后发现原因就藏在一个我完全没注意到的细节里。
今天我就把这个坑完完整整地讲给你听。保证你听完之后,不光知道怎么避坑,还能真正理解异步循环嵌套到底是怎么回事。
先搭个场景
我们做一个小例子。假设你要从三个网站上抓数据,每个网站需要先请求page接口(耗时1秒),然后再根据返回的结果请求detail接口(也耗时1秒)。
同步写法很简单:
import time def fetch_page(site): time.sleep(1) # 模拟网络请求 return f"{site} 的数据" def fetch_detail(site): time.sleep(1) return f"{site} 的详细信息" def main(): sites = ["site_a", "site_b", "site_c"] for site in sites: page = fetch_page(site) detail = fetch_detail(site) print(page, detail) start = time.time() main() print(f"耗时: {time.time() - start:.2f}秒")跑一下,耗时大概6秒。每个站点2秒,三个站点就是6秒。这没问题。
异步版本呢?理想情况下,三个站点的请求可以同时进行,总共只需要2秒左右。
我们来写一个异步版本:
import asyncio async def fetch_page(site): await asyncio.sleep(1) return f"{site} 的数据" async def fetch_detail(site): await asyncio.sleep(1) return f"{site} 的详细信息" async def process_site(site): page = await fetch_page(site) detail = await fetch_detail(site) return page, detail async def main(): sites = ["site_a", "site_b", "site_c"] tasks = [process_site(site) for site in sites] results = await asyncio.gather(*tasks) for result in results: print(result) start = time.time() asyncio.run(main()) print(f"耗时: {time.time() - start:.2f}秒")这个版本耗时多少?2秒左右。完美。
这个例子看起来很简单,对吧?但就是在这个基础上,稍微嵌套一层循环,问题就来了。
我的真实代码长这样
我当时的代码大概是这个结构:
async def fetch_page(site, page_num): await asyncio.sleep(0.1) # 模拟请求 return f"{site} 第{page_num}页的数据" async def fetch_images(page_data): await asyncio.sleep(0.05) # 模拟请求图片 return [f"image_{i}" for i in range(3)] async def process_site(site): all_images = [] # 外层循环:这个站点的每一页 for page_num in range(1, 11): # 假设每个站点10页 page_data = await fetch_page(site, page_num) # 内层循环:这一页的每一张图片 images = await fetch_images(page_data) all_images.extend(images) return all_images async def main(): sites = ["site_a", "site_b", "site_c"] tasks = [process_site(site) for site in sites] results = await asyncio.gather(*tasks)乍一看没问题啊?外层循环是每个站点,内层循环是每个站点里的每一页,每页里的图片又是异步请求的。这不挺好的吗?
但跑起来发现,三个站点之间确实是并发的,但每个站点内部的10页是顺序执行的——先请求第1页,等返回了,再请求第1页的图片,然后才能开始第2页,再等图片,再第3页……
这就相当于:三个站点各自排成一队,一页一页地处理。完全没有利用到“一页里的多张图片可以同时下”这个优化机会。
更糟糕的是,如果我每个站点有100页,每页有50张图,那这个顺序执行的问题会被放大100倍。
我当时愣是没看出来问题在哪。直到我在纸上把执行顺序画出来。
画出执行顺序你就懂了
咱们用手画一下这个执行过程。假设只有两个站点,每个站点只有两页,每页两张图。
我当时的代码执行顺序是这样的:
站点A:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图 站点A:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图 站点B:请求第1页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图 站点B:请求第2页 → 等待 → 拿到数据 → 请求图1 → 等待 → 请求图2 → 等待 → 存图每个“等待”的位置,CPU其实都闲着,但程序就是不去做别的事,非得等这个请求回来。
这就是问题核心:**await会挂起当前这个异步函数,但它只挂起自己,不会影响到同一层级的其他任务**。
等等,这句话有点绕。我用大白话再说一遍:
当你在一个异步函数里写await something(),这个函数就会停在这里,等something()完成。但这不意味着整个程序都停了——程序可以去执行别的异步任务,比如另一个站点的任务。
所以在我上面的代码里,process_site('site_a')这个任务在等待第1页返回的时候,程序确实可以去处理process_site('site_b')。这一点是好的,所以三个站点之间是并发的。
问题在于:在同一个process_site任务内部,for循环里的每一次await都会让这个任务停下来,直到这次请求完成,才会进入下一次循环。内层循环也是这样。
所以每个站点内部,所有请求是串行的。
正确的做法是什么?
如果你想让一个站点内部的多个请求也并发执行,你需要把那些独立的请求批量收集起来,然后用asyncio.gather或asyncio.wait一次性发出去。
拿图片下载来说,正确的做法应该是:先把这个页面的所有图片链接收集好,然后一次性创建所有图片的异步任务,同时等待它们全部完成。
代码大概是这样的:
async def process_site_correct(site): all_images = [] for page_num in range(1, 11): page_data = await fetch_page(site, page_num) # 先获取这一页的所有图片链接 image_urls = extract_image_urls(page_data) # 关键在这里:一次性创建所有图片任务,并发执行 image_tasks = [fetch_image(url) for url in image_urls] images = await asyncio.gather(*image_tasks) all_images.extend(images) return all_images这样改完之后,执行顺序就变成了:
站点A:请求第1页 → 等待 (等待期间,站点B可以做自己的事) 第1页返回 → 同时请求该页的所有图片(假设10张图同时发请求) 等待所有图片返回 → 然后继续第2页图片下载这部分就从串行变成了并发。
但这里还有一个优化空间:页面请求本身能不能也并发?比如一个站点有10页,我可不可以同时请求这10页?
可以。但要注意:同时请求10页可能会对目标服务器造成压力,也可能导致你自己的网络连接数爆掉。合理控制并发数是个单独的话题,今天不展开。
更隐蔽的坑:嵌套循环里的 gather
再深入一层。假设我每个站点的每一页,返回的数据里包含的不只是图片链接,还有另外的 API 需要调用(比如每个图片需要额外请求一个评论接口)。
这时候代码可能变成这样:
async def fetch_image_with_comments(image_url): image_data = await fetch_image(image_url) comments = await fetch_comments(image_url) return {"image": image_data, "comments": comments} async def process_page(page_num): page_data = await fetch_page(page_num) image_urls = extract_urls(page_data) # 这里看起来是并发的 tasks = [fetch_image_with_comments(url) for url in image_urls] results = await asyncio.gather(*tasks) return results这个看起来没问题吧?每个fetch_image_with_comments内部其实是串行的(先等图,再等评论),但不同图片之间是并发的。
这已经很好了。
但如果你写出这样的代码:
# 错误示范 async def fetch_image_with_comments_wrong(image_url): # 里面又套了一层循环?或者又用了 gather 但忘了 await? tasks = [fetch_image(image_url), fetch_comments(image_url)] # 这里没有 await,返回的是一个协程对象,不是结果 return asyncio.gather(*tasks) # 注意:这里没有 await你会在某个地方发现结果不对,或者更糟——程序根本没执行这些请求,因为你返回的是一个还没被调度的协程对象。
这属于另一个经典错误:asyncio.gather返回的是一个 awaitable 对象,你必须await它,或者用asyncio.run去跑,否则它不会真正执行。
调试方法:打日志看时间
如果你不确定自己的异步代码是不是真的并发了,最简单的办法就是打时间戳。
import time async def fetch_with_log(name, delay): start = time.time() print(f"[{start:.3f}] 开始 {name}") await asyncio.sleep(delay) end = time.time() print(f"[{end:.3f}] 结束 {name},耗时 {end-start:.2f}秒") return name async def test_serial(): print("串行版本:") for i in range(3): await fetch_with_log(f"任务{i}", 0.5) async def test_concurrent(): print("并发版本:") tasks = [fetch_with_log(f"任务{i}", 0.5) for i in range(3)] await asyncio.gather(*tasks) # 跑一下你就看到区别了 # 串行:开始时间依次相差0.5秒 # 并发:三个任务的开始时间几乎相同这个技巧我用了无数遍。每当你怀疑某个地方的循环是不是串行的,就把里面的关键操作加上日志,看看开始时间是不是挤在一起的。
如果开始时间是连成一串的,那就是串行。如果几乎同时打印出来,那就是并发。
我后来总结的几条简单规则
经过那次加班之后,我给自己定了几条规则,你可以参考:
规则1:看见await在循环里,就要警惕
for循环里面如果直接await一个异步函数,那这个循环一定是串行的。除非你就是想要串行,否则要考虑改成先收集任务再gather。
规则2:搞清楚“谁和谁可以并发”
不同站点之间:可以并发
同一个站点的不同页面:如果服务器扛得住,可以并发
同一个页面里的不同图片:可以并发
同一张图片的下载和评论请求:一般不能并发(因为有依赖关系)
规则3:gather不是万能的,它只是“同时等待”
很多人以为用了gather就自动并发了。其实gather做的事情很简单:把你传给它的多个协程任务同时调度起来,然后等待它们全部完成。但前提是这些任务本身要独立。
如果你传给gather的是一堆[fetch_page(1), fetch_page(2), fetch_page(3)],这三个请求会同时发出去,很好。
但如果你传给gather的是一堆[process_page(1), process_page(2), process_page(3)],而每个process_page内部又是串行的,那gather也救不了你。
规则4:异步不是自动并行
这是最容易被误解的一点。async/await给你的只是“在等待的时候不阻塞”,而不是“自动把循环拆成多线程”。并发需要你显式地用gather、create_task、wait等工具来组织。
回到我那个爬虫
最后我的爬虫改成了这样:
async def process_site_optimized(site): # 先获取这个站点所有需要抓的页面列表 page_tasks = [fetch_page(site, page_num) for page_num in range(1, 101)] # 同时请求所有页面(限制并发数,用 semaphore) pages_data = await limited_gather(page_tasks, max_concurrent=10) # 收集所有图片 URL all_image_tasks = [] for page_data in pages_data: image_urls = extract_image_urls(page_data) all_image_tasks.extend([fetch_image(url) for url in image_urls]) # 同时下载所有图片(同样限制并发) images = await limited_gather(all_image_tasks, max_concurrent=20) return images这里的limited_gather是自己写的一个包装,用asyncio.Semaphore控制同时进行的请求数量。这样既利用了异步并发的优势,又不会把服务器打爆或者把自己的连接池耗尽。
改完之后,原来要跑20分钟的活儿,1分多钟就跑完了。
最后说几句
异步编程的难点不在于async/await这两个关键字,而在于思维的转换。
在同步编程里,你写for循环,脑子里想的是“一个一个来”。在异步编程里,你需要想的是“哪些事情可以同时做,哪些事情必须等”。
当你看到嵌套循环的时候,不要急着写代码。先在纸上画一下:外层循环的每一次迭代,是否依赖上一次的结果?内层循环的每一次迭代,是否互相依赖?
如果不依赖,那它们就可以并发。并发的方式就是先把所有任务收集到一个列表里,然后一次性await gather。
就这么简单。
但就是这么简单的事情,我当时愣是想了一整个晚上。
希望你看完这篇文章之后,不用再像我一样加班到凌晨了。