🚀 作者 :“大数据小禅@yopai”

🚀 专栏简介 :本专栏后续将持续更新大模型相关文章,从开发到微调到RAG、多Agent等,个V: 【yopa66】交流,持续分享前沿AI实战。

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
在这里插入图片描述

一、async/await 到底在解决什么问题

很多人第一次学 asyncioasync/await,会被一堆词绕晕:

异步
协程
事件循环
Task
Future
非阻塞
高并发
ASGI
worker
线程池

其实先别急着背概念。你先记住一句话:

async/await 主要解决的是:程序在“等待”的时候,不要傻站着。

比如一个接口要做这几件事:

查数据库
请求 OpenAI / Claude / Gemini
查 Redis
请求向量数据库
调用一个外部 API
把结果返回给用户

这些事情里面,CPU 真正在计算的时间可能很少,大量时间都浪费在“等”上:

等数据库返回
等大模型返回
等网络返回
等 Redis 返回
等文件读取完成
等第三方服务响应

如果是同步阻塞代码,程序遇到一个等待就卡住:

import time

def get_data():
    time.sleep(3)
    return "data"

这 3 秒里,程序什么都做不了。

但是异步代码的想法是:

我现在要等数据库,那我先让出执行权;
你们其他请求谁能跑,先去跑;
等数据库返回了,再叫我回来继续干活。

这就是 await 最核心的意思。

Python 官方文档对 asyncio 的定义也很直接:它是一个用 async/await 语法编写并发代码的标准库,很多异步 Web 框架、网络库、数据库库都会基于它或兼容它。


二、async/await 和 asyncio 的关系

很多人会把这几个东西混在一起:

async
await
asyncio
协程
事件循环
Task

它们不是一个东西。

你可以这样理解:

async/await 是语法
asyncio 是工具箱
协程是任务本身
事件循环是调度员
Task 是被调度员正式登记的任务

举个生活里的例子。

你开了一家奶茶店:

顾客 A 点奶茶,要等珍珠煮好
顾客 B 点咖啡,要等咖啡机萃取
顾客 C 点果茶,要等水果切好

如果店员很死板:

先等 A 的珍珠煮好
再处理 B
再处理 C

那效率很差。

更聪明的方式是:

A 的珍珠在煮,店员去处理 B
B 的咖啡机在萃取,店员去处理 C
C 的水果在准备,店员再回来看 A 好了没

这就是异步。

在这个例子里:

async def 定义一个可以被暂停的任务
await 表示这里要等一下,可以先让别人执行
asyncio 是整套调度系统
event loop 是那个不停检查“谁能继续干活”的店长
Task 是被店长放进待办列表里的任务

所以:

async def hello():
    return "hello"

这只是定义了一个“可以异步执行的函数”。

调用它:

coro = hello()

并不会马上真正执行,它只是得到一个协程对象。

你要这样才会真正执行:

import asyncio

async def hello():
    return "hello"

async def main():
    result = await hello()
    print(result)

asyncio.run(main())

asyncio.run(main()) 的作用就是启动事件循环,然后让 main() 跑起来。


三、进程、线程、协程分别是怎样的概念?

这一块是面试最爱问的,因为它能看出来你到底懂不懂高并发!

进程:最重,但是隔离最好

进程可以理解成一个独立的程序运行空间。

比如你启动一个 FastAPI 服务:

uvicorn main:app --workers 4

这里的 4 个 worker,通常就是 4 个独立进程。

每个进程都有自己的:

内存
变量
事件循环
数据库连接池
缓存对象
全局变量

所以你在代码里写:

cache = {}

如果你开了 4 个 worker,那不是 4 个 worker 共用一个 cache,而是:

worker 1 有自己的 cache
worker 2 有自己的 cache
worker 3 有自己的 cache
worker 4 有自己的 cache

这点特别重要。

很多新手会踩这个坑:

users = {}

@app.post("/login")
async def login(user_id: str):
    users[user_id] = "online"

如果部署时开了多个 worker,请求 A 可能进 worker 1,请求 B 可能进 worker 3。你以为用户在线状态存在全局变量里,其实不同进程看到的数据不一样。

所以生产环境里,共享状态应该放到:

Redis
数据库
消息队列
外部缓存系统
分布式存储

而不是放 Python 全局变量。

FastAPI 官方部署文档也说明,多进程 worker 的目的之一就是利用多核 CPU,并处理更多请求。


线程:比进程轻,但共享内存,容易出并发问题

线程可以理解成同一个进程里的多个执行分支。

多个线程共享同一个进程里的内存,所以它们可以访问同一个变量:

counter = 0

但也正因为共享变量,所以容易出现问题。

比如两个线程同时改一个变量:

counter += 1

这行代码看起来简单,但底层不是一步完成的。它可能会经历:

读取 counter
加 1
写回 counter

如果两个线程同时干这件事,就可能互相覆盖结果。

这就是线程安全问题。

另外在 CPython 里,还有一个很重要的东西叫 GIL。简单说,GIL 会让同一个 Python 进程里,同一时刻通常只有一个线程执行 Python 字节码。所以 Python 多线程对 CPU 密集型任务帮助有限,更适合 I/O 等待类任务。Python 官方 threading 文档也明确提到,由于 GIL,CPU-bound 任务不能很好地靠线程获得性能提升。([Python documentation][3])

所以线程适合什么?

适合这种:

调用同步 HTTP 接口
调用同步数据库驱动
读写文件
处理一些阻塞 I/O
兼容老代码

不太适合这种:

大规模数学计算
图片压缩
视频转码
大量 Python 循环计算
复杂特征工程

这些 CPU 密集型任务,更适合多进程、任务队列,或者交给专门的计算服务。


协程:最轻,但必须自己在 await 的地方让出控制权

协程可以理解成:

一个函数执行到某个地方,发现要等,就主动暂停一下,让别人先跑。

协程不是操作系统线程,也不是进程。它更像 Python 代码层面的“轻量任务”。

协程切换很便宜,因为它不需要操作系统频繁切线程。它只是在 await 的地方暂停,等结果回来再继续。

比如:

import asyncio

async def task(name):
    print(f"{name} 开始")
    await asyncio.sleep(2)
    print(f"{name} 结束")

async def main():
    await asyncio.gather(
        task("A"),
        task("B"),
        task("C"),
    )

asyncio.run(main())

这段代码大概 2 秒结束,不是 6 秒。

因为 A、B、C 都在等 asyncio.sleep(2),等待期间事件循环可以来回调度它们。

Python 官方文档里也说得很清楚:事件循环是协作式调度,一次运行一个 Task;当一个 Task 在等待 Future 时,事件循环会运行其他 Task。([Python documentation][4])

注意这句话很关键:

asyncio 是并发,不等于 CPU 并行。

一个事件循环通常还是在一个线程里跑。它不是让多个 Python 代码同时在多个 CPU 核心上跑,而是让程序在等待 I/O 的时候更会安排时间。


四、用代码把 async/await 讲透

1. async def 调用后不会马上执行

普通函数是这样:

def hello():
    print("hello")
    return "done"

result = hello()
print(result)

输出:

hello
done

但是协程函数不一样:

async def hello():
    print("hello")
    return "done"

result = hello()
print(result)

你会看到类似:

<coroutine object hello at 0x...>

因为 hello() 只是创建了一个协程对象,还没有真正执行。

要执行它,得交给事件循环:

import asyncio

async def hello():
    print("hello")
    return "done"

async def main():
    result = await hello()
    print(result)

asyncio.run(main())

这里的执行顺序是:

asyncio.run 启动事件循环
main 开始执行
遇到 await hello()
hello 开始执行
hello 返回 done
main 继续执行

2. await 不是“开新线程”,而是“我先等等,别人先跑”

很多人误会 await

result = await fetch_data()

以为这行代码会开一个新线程。

不是。

它真正的意思是:

我要等 fetch_data 的结果;
如果它还没好,我就先暂停;
事件循环你去看看其他任务能不能跑;
等 fetch_data 好了,再回来继续我这段代码。

所以 await 不是魔法,也不是加速器。它只是一个很明确的让出点。


3. 连续 await 不一定是并发

这是面试非常常见的问题。

看这段:

async def main():
    a = await fetch_a()
    b = await fetch_b()
    c = await fetch_c()
    return a, b, c

很多人会说这是并发。

其实通常不是。

它的执行方式更像:

先等 fetch_a 完成
再等 fetch_b 完成
最后等 fetch_c 完成

如果每个请求 2 秒,总时间大概 6 秒。

如果你想让它们并发,要这样写:

async def main():
    a, b, c = await asyncio.gather(
        fetch_a(),
        fetch_b(),
        fetch_c(),
    )
    return a, b, c

这才是:

A、B、C 一起发出去
谁先返回都可以
最后等三个都完成

如果每个请求 2 秒,总时间大概 2 秒多一点。


4. create_task 是把协程真正丢给事件循环调度

比如:

task = asyncio.create_task(fetch_data())

意思是:

事件循环,这个任务你帮我安排一下。

如果只是:

coro = fetch_data()

那它只是一个协程对象,还没正式排队执行。

看一个更完整的例子:

import asyncio

async def fetch_data():
    print("开始请求")
    await asyncio.sleep(2)
    print("请求完成")
    return "data"

async def main():
    task = asyncio.create_task(fetch_data())

    print("我可以先干点别的")
    await asyncio.sleep(1)

    result = await task
    print(result)

asyncio.run(main())

大概流程是:

fetch_data 被创建成 task
它开始执行
遇到 sleep 后暂停
main 继续执行别的事情
最后 await task 拿结果

5. gather 适合批量并发

比如你要同时请求多个模型:

import asyncio

async def call_model(name: str):
    await asyncio.sleep(2)
    return f"{name} result"

async def main():
    results = await asyncio.gather(
        call_model("gpt"),
        call_model("claude"),
        call_model("gemini"),
    )
    print(results)

asyncio.run(main())

这个写法在 AI Agent 开发里非常常见:

同时调用多个模型做投票
同时跑多个工具
同时查多个数据源
同时请求多个检索服务
同时生成多个候选答案

6. 高并发一定要限制并发量

新手很容易这样写:

results = await asyncio.gather(
    *(call_api(item) for item in items)
)

如果 items 只有 10 个,没问题。

如果 items 有 10 万个,这就很危险。

你等于一次性创建 10 万个任务,可能会把这些东西打爆:

本机内存
网络连接
数据库连接池
第三方 API 限流
LLM API rate limit
向量数据库 QPS

更稳的写法是加 Semaphore

import asyncio

sem = asyncio.Semaphore(20)

async def safe_call_api(item):
    async with sem:
        return await call_api(item)

async def main(items):
    results = await asyncio.gather(
        *(safe_call_api(item) for item in items)
    )
    return results

这句话的意思是:

最多同时跑 20 个;
其他任务排队等着;
别一口气把系统冲垮。

实际项目里,这个非常重要。


五、FastAPI 里 async def、def、进程、线程池到底怎么工作

FastAPI 本身是 ASGI 框架,通常跑在 Uvicorn 这种 ASGI server 上。Uvicorn 官方文档也说明它是 Python 的 ASGI Web Server,并支持 HTTP 和 WebSocket。([Uvicorn][5])

你可以把 FastAPI 的执行链路想成这样:

用户请求
  ↓
Nginx / Load Balancer
  ↓
Uvicorn
  ↓
FastAPI
  ↓
你的接口函数
  ↓
数据库 / Redis / LLM / 向量库 / 外部 API

FastAPI 中 async def 适合什么

如果你的接口里用的是异步库,就用 async def

比如:

from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/agent/chat")
async def chat():
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.example.com/llm",
            json={"prompt": "hello"},
        )
    return response.json()

这里的 httpx.AsyncClient() 是异步客户端,所以你可以 await client.post(...)

这种写法适合:

异步 HTTP 请求
异步数据库
异步 Redis
异步向量数据库
WebSocket
SSE 流式输出
Agent 多工具并发调用

FastAPI 官方文档也建议:如果你调用的第三方库需要使用 await,那你的路径函数就应该用 async def。([FastAPI][6])


FastAPI 中 def 适合什么

如果你用的是同步阻塞库,比如:

requests
pymysql
psycopg2
普通文件处理
某些 SDK 的同步 client

你可以写普通 def

import requests
from fastapi import FastAPI

app = FastAPI()

@app.get("/sync-api")
def sync_api():
    response = requests.get("https://example.com")
    return response.json()

FastAPI 对普通 def 的处理不是直接在事件循环里跑,而是会把它放到线程池里执行,避免阻塞主事件循环。这一点来自 FastAPI/Starlette 的设计,Starlette 文档也说明同步 endpoint、文件处理、后台任务等场景会使用线程池。([Starlette][7])

但要注意:线程池不是无限的。

Starlette 文档里提到默认线程池容量是 40 个 token,也就是同一时间能跑的同步线程任务是有限的。([Starlette][7])

所以如果你大量接口都是同步阻塞代码,高并发下会变成:

请求进来
同步代码进入线程池
线程池满了
后面的请求排队
响应变慢
P95/P99 延迟飙升

最危险的是:在 async def 里面写阻塞代码

比如:

import time
from fastapi import FastAPI

app = FastAPI()

@app.get("/bad")
async def bad():
    time.sleep(5)
    return {"ok": True}

这段代码很糟糕。

为什么?

因为 time.sleep(5) 是阻塞的。

你在 async def 里写了阻塞代码,相当于把事件循环卡住了。事件循环一旦卡住,其他请求也会受影响。

正确写法应该是:

import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.get("/good")
async def good():
    await asyncio.sleep(5)
    return {"ok": True}

再比如:

import requests

@app.get("/bad-llm")
async def bad_llm():
    response = requests.post("https://api.example.com/llm")
    return response.json()

这也不好,因为 requests.post() 是同步阻塞的。

更好的写法:

import httpx

@app.get("/good-llm")
async def good_llm():
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post("https://api.example.com/llm")
    return response.json()

如果你必须用同步 SDK,可以先这样兜底:

import asyncio

def call_sync_llm():
    # 这里假设某个 SDK 只有同步接口
    return sync_llm_client.chat("hello")

@app.get("/agent")
async def agent():
    result = await asyncio.to_thread(call_sync_llm)
    return {"result": result}

asyncio.to_thread() 的意思是:

这个同步阻塞函数我没法 await;
那就把它丢到线程里跑;
不要堵住事件循环。

六、FastAPI 高并发不是只写 async 就完事

很多人以为:

FastAPI + async def = 高并发

这句话只对了一半。

真正的高并发要看整条链路。

1. 单个 worker 里靠协程处理大量 I/O

一个 Uvicorn worker 里面通常会跑一个事件循环。

这个事件循环可以同时管理很多请求:

请求 A 在等数据库
请求 B 在等 LLM
请求 C 在等 Redis
请求 D 在等向量数据库
请求 E 在等外部 API

只要这些等待都是异步非阻塞的,事件循环就能不断切换,效率很高。


2. 多个 worker 进程用来利用多核

单个事件循环通常跑在一个线程里,不能指望它把所有 CPU 核心都吃满。

所以生产环境经常开多个 worker:

uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

或者:

fastapi run main.py --workers 4

大概结构是:

worker 1:一个进程,一个事件循环
worker 2:一个进程,一个事件循环
worker 3:一个进程,一个事件循环
worker 4:一个进程,一个事件循环

多个 worker 的好处:

利用多核 CPU
提高吞吐
某个 worker 挂了,其他 worker 还能工作
减少单进程压力

但是 worker 不是越多越好。

如果你开太多 worker,会带来这些问题:

内存占用变高
数据库连接数暴涨
Redis 连接数变多
模型 client 重复初始化
缓存不共享
上下文切换成本上升

3. 数据库连接池经常是瓶颈

比如你部署了:

4 个 worker
每个 worker 一个数据库连接池
每个连接池最大 20 个连接

那总连接数就是:

4 * 20 = 80

如果你改成:

8 个 worker
每个连接池还是 20

总连接数变成:

8 * 20 = 160

数据库可能直接压力爆炸。

所以高并发不是只看 FastAPI,还要看:

worker 数量
数据库连接池大小
Redis 连接池大小
LLM API 限流
向量数据库 QPS
线程池容量
CPU
内存
网络带宽
超时设置
重试策略

4. CPU 密集型任务不要塞进接口里硬跑

比如这个接口:

@app.get("/calculate")
async def calculate():
    total = 0
    for i in range(10 ** 9):
        total += i
    return {"total": total}

这不是异步能救的。

这段代码是 CPU 密集型,它会一直占着 CPU 算。它没有 await,也不会让出控制权。

更合理的方案:

丢到 Celery / RQ / Dramatiq
丢到独立 worker 服务
丢到 ProcessPoolExecutor
用消息队列异步处理
拆成离线任务
让专门的模型推理服务处理

FastAPI 接口只负责:

接收请求
校验参数
提交任务
返回 task_id
让前端轮询或用 WebSocket/SSE 接收结果

七、面试汇总

问:asyncio 是什么?

你可以这样回答:

asyncio 是 Python 标准库里的异步 I/O 框架。它的核心是事件循环,事件循环负责调度协程。协程执行到 await 的时候,如果要等 I/O,就会暂停,把执行权交还给事件循环。这样一个线程就可以同时管理很多正在等待的任务。它适合 I/O 密集型场景,比如网络请求、数据库、Redis、WebSocket、LLM API 调用,但不适合直接加速 CPU 密集型计算。


问:async 和 await 是什么?

可以这样回答:

async def 用来定义协程函数。协程函数调用后不会马上执行,而是返回一个 coroutine 对象。await 用来等待一个可等待对象的结果。等待期间当前协程会暂停,事件循环可以去执行其他任务。结果回来以后,再从暂停的位置继续执行。


问:协程和线程有什么区别?

可以这样回答:

线程是操作系统调度的,协程是用户态由事件循环调度的。线程可能在任何时刻被操作系统切走,所以共享变量要考虑锁和线程安全。协程通常只在 await 的地方主动让出控制权,切换成本更低,更适合大量 I/O 等待任务。但协程不是 CPU 并行,它不能让一个 Python 进程里的 CPU 密集代码自动跑满多个核心。


问:FastAPI 里什么时候用 async def,什么时候用 def?

可以这样回答:

如果接口内部用的是异步库,比如 async 数据库驱动、httpx.AsyncClient、异步 Redis、WebSocket,就用 async def。如果内部调用的是同步阻塞库,比如 requests 或老的同步数据库驱动,可以用普通 def,FastAPI 会把它放进线程池里执行,避免直接堵住事件循环。但线程池容量有限,所以高并发服务里最好逐步换成真正的异步库。


问:为什么 async 能提高高并发?

可以这样回答:

因为大多数 Web 请求不是一直在算 CPU,而是在等数据库、网络、Redis、第三方 API。同步模型下,请求在等待时会占住线程。异步模型下,请求在等待 I/O 时会通过 await 让出控制权,事件循环继续处理其他请求。所以 async 提升的是 I/O 等待场景下的资源利用率,不是让 CPU 计算变快。


问:asyncio 和多进程是什么关系?

可以这样回答:

单个 asyncio 事件循环适合在一个进程里处理大量 I/O 并发。多进程是为了利用多个 CPU 核心和提升服务吞吐。FastAPI 部署时常见做法是多个 worker 进程,每个 worker 里有自己的事件循环。也就是说,进程负责横向扩展,协程负责单进程内的 I/O 并发。


八、AI Agent 开发里的实际应用场景

这一部分很重要,因为现在做 Agent,很容易遇到并发、异步、流式、工具调用、长任务这些问题。

场景 1:Agent 同时调用多个工具

假设用户问:

帮我分析今天特斯拉股价、相关新闻、财报数据,并给出投资风险总结。

Agent 可能要调用这些工具:

股票价格 API
新闻搜索 API
财报数据库
公司公告接口
LLM 总结接口

如果同步执行:

先查股票 2 秒
再查新闻 3 秒
再查财报 2 秒
再查公告 2 秒
最后总结 4 秒

总时间可能 13 秒。

异步并发后:

股票、新闻、财报、公告一起查
等它们都回来
再交给 LLM 总结

代码结构大概是:

async def agent_research(query: str):
    stock_task = asyncio.create_task(get_stock_price("TSLA"))
    news_task = asyncio.create_task(search_news("Tesla"))
    report_task = asyncio.create_task(get_financial_report("TSLA"))
    filing_task = asyncio.create_task(get_company_filings("TSLA"))

    stock, news, report, filing = await asyncio.gather(
        stock_task,
        news_task,
        report_task,
        filing_task,
    )

    final_answer = await call_llm(
        stock=stock,
        news=news,
        report=report,
        filing=filing,
        query=query,
    )

    return final_answer

这个场景非常适合 async。

因为这些工具调用大部分时间都在等网络。


场景 2:RAG 同时查多个知识源

一个企业 Agent 可能要查:

向量数据库
Elasticsearch
PostgreSQL
内部文档系统
历史工单系统
网页搜索

如果一个一个查,速度会很慢。

异步写法:

async def retrieve_context(question: str):
    vector_task = asyncio.create_task(search_vector_db(question))
    es_task = asyncio.create_task(search_elasticsearch(question))
    sql_task = asyncio.create_task(search_postgres(question))
    ticket_task = asyncio.create_task(search_tickets(question))

    vector_docs, es_docs, sql_rows, tickets = await asyncio.gather(
        vector_task,
        es_task,
        sql_task,
        ticket_task,
    )

    return merge_and_rerank(
        vector_docs,
        es_docs,
        sql_rows,
        tickets,
    )

实际业务里很常见:

客服 Agent 查知识库
销售 Agent 查 CRM
研发 Agent 查 GitHub Issue
法务 Agent 查合同库
HR Agent 查制度文档

异步能明显降低总等待时间。


场景 3:Agent 批量处理用户上传的文件

比如用户上传 20 个 PDF,让 Agent 总结。

流程可能是:

读取文件
切分 chunk
生成 embedding
写入向量库
让 LLM 总结

这里有些步骤是 I/O,有些步骤是 CPU。

比较合理的拆法:

文件读取:异步
调用 embedding API:异步并发,但要限流
写入向量库:异步
PDF 解析:如果很重,放线程池或进程池
大规模总结:任务队列

示例:

sem = asyncio.Semaphore(10)

async def embed_chunk(chunk: str):
    async with sem:
        return await call_embedding_api(chunk)

async def process_document(chunks: list[str]):
    embeddings = await asyncio.gather(
        *(embed_chunk(chunk) for chunk in chunks)
    )
    await save_to_vector_db(embeddings)
    return {"status": "done"}

这里一定要加 Semaphore

因为 embedding API 通常有限流。如果你一次性发 1000 个请求,可能马上被限流,甚至账号被封。


场景 4:Agent 调用 LLM 时做超时控制

大模型调用可能会慢,也可能会卡。

实际项目里不能无限等。

你应该加 timeout:

import asyncio

async def safe_call_llm(prompt: str):
    try:
        return await asyncio.wait_for(
            call_llm(prompt),
            timeout=30,
        )
    except asyncio.TimeoutError:
        return "模型响应超时,请稍后重试"

Agent 系统里,超时很重要。

否则一个 LLM 请求卡住,会导致:

接口迟迟不返回
连接一直占着
用户体验很差
worker 资源被拖住
后续请求排队

更成熟的做法是:

LLM 调用设置 timeout
工具调用设置 timeout
数据库查询设置 timeout
外部 API 设置 timeout
失败后做降级
必要时返回部分结果

场景 5:Agent 多模型投票

有些 Agent 会同时调用多个模型:

GPT
Claude
Gemini
本地模型

然后让它们分别给答案,再做投票或融合。

异步非常适合这个场景:

async def multi_model_vote(question: str):
    results = await asyncio.gather(
        call_gpt(question),
        call_claude(question),
        call_gemini(question),
        return_exceptions=True,
    )

    valid_results = [
        result for result in results
        if not isinstance(result, Exception)
    ]

    return await judge_and_merge(valid_results)

这里的 return_exceptions=True 很实用。

它的意思是:

某个模型失败了,不要让整个 gather 直接炸掉;
把异常当成结果返回;
我后面自己判断怎么处理。

在 Agent 系统里,外部依赖很多,失败是常态。

所以不能写得太脆。


场景 6:Agent 流式输出

比如 ChatGPT 这种效果:

模型一边生成,前端一边显示

这通常会用:

Streaming Response
SSE
WebSocket

FastAPI 里可以这样做:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def token_generator():
    tokens = ["你好", ",", "我是", "Agent", "。"]
    for token in tokens:
        await asyncio.sleep(0.5)
        yield token

@app.get("/stream")
async def stream():
    return StreamingResponse(
        token_generator(),
        media_type="text/plain",
    )

真实 Agent 里,流式输出很重要:

用户不用等完整答案
前端体验更好
长任务看起来不那么卡
模型边生成边展示

但是要注意:

流式连接会占用连接资源
超时要处理
客户端断开要处理
异常要处理
不要在流式生成里写阻塞代码

场景 7:Agent 后台执行长任务

有些任务不应该让用户一直等 HTTP 响应。

比如:

分析 100 个网页
总结 50 个 PDF
生成一份市场报告
批量跑 1000 条数据
做复杂代码审查
生成长文档

更合理的模式是:

用户提交任务
接口快速返回 task_id
后台 worker 慢慢处理
前端轮询任务状态
或者用 WebSocket/SSE 推送进度

简单版本可以用 FastAPI BackgroundTasks,但它更适合轻量任务。FastAPI 文档说明 BackgroundTasks 适合在返回响应后执行一些操作。([FastAPI][8])

更重的生产任务,建议用:

Celery
RQ
Dramatiq
Arq
Kafka + worker
Redis Stream
云函数
独立任务服务

示意代码:

@app.post("/agent/report")
async def create_report(request: ReportRequest):
    task_id = await submit_report_task(request)
    return {"task_id": task_id, "status": "submitted"}

@app.get("/agent/report/{task_id}")
async def get_report_status(task_id: str):
    status = await get_task_status(task_id)
    return status

这种架构更稳。


场景 8:Agent 工具调用需要限流

假设一个 Agent 可以查网页。

用户问:

帮我调研 100 家 AI 公司,整理融资信息、产品定位、创始团队。

如果 Agent 同时发 1000 个请求:

搜索 API 会限流
目标网站会拒绝
本机连接数会爆
代理 IP 会被封
任务失败率会很高

应该这么做:

search_sem = asyncio.Semaphore(5)
fetch_sem = asyncio.Semaphore(20)

async def safe_search(query: str):
    async with search_sem:
        return await search_api(query)

async def safe_fetch(url: str):
    async with fetch_sem:
        return await fetch_page(url)

实际项目里,不同资源要有不同并发限制:

LLM API:并发 5
Embedding API:并发 20
搜索 API:并发 3
网页抓取:并发 30
数据库写入:并发 10
向量库写入:批量写

这才是工程化 Agent。


场景 9:Agent 记忆系统异步写入

比如聊天 Agent 每次对话后,要写入:

短期记忆
长期记忆
用户画像
向量数据库
行为日志
审计日志

这些不一定都要阻塞主响应。

可以这样设计:

主流程先返回答案
记忆写入异步执行
失败了记录日志
必要时重试

示例:

@app.post("/chat")
async def chat(request: ChatRequest):
    answer = await run_agent(request.message)

    asyncio.create_task(
        save_memory_async(
            user_id=request.user_id,
            message=request.message,
            answer=answer,
        )
    )

    return {"answer": answer}

但这里要小心。

create_task 创建的后台任务如果没有管理好,进程重启时可能丢任务。所以生产环境里,重要任务不要只靠 create_task,更建议丢到队列:

Redis Queue
Celery
Kafka
RabbitMQ
云消息队列

简单任务可以 create_task,关键任务要进队列。


场景 10:Agent 编排多个步骤

一个复杂 Agent 可能是这样的:

理解用户问题
判断是否需要工具
并发调用工具
整理工具结果
再次调用 LLM
判断答案是否可信
必要时继续查
最终返回

伪代码:

async def run_agent(user_input: str):
    plan = await make_plan(user_input)

    tool_tasks = []

    for tool_call in plan.tool_calls:
        if tool_call.name == "search":
            tool_tasks.append(search_web(tool_call.query))
        elif tool_call.name == "db":
            tool_tasks.append(query_database(tool_call.sql))
        elif tool_call.name == "vector":
            tool_tasks.append(search_vector_db(tool_call.query))

    tool_results = await asyncio.gather(
        *tool_tasks,
        return_exceptions=True,
    )

    cleaned_results = handle_tool_errors(tool_results)

    final_answer = await generate_answer(
        user_input=user_input,
        context=cleaned_results,
    )

    return final_answer

这就是 Agent 和 asyncio 非常匹配的地方:

Agent 本质上经常要等很多外部工具
asyncio 正好擅长管理大量等待

九、实际开发时怎么选方案

你可以按这个判断。

适合 async/await 的

调用 LLM API
调用 embedding API
调用多个工具
RAG 检索
HTTP 请求
WebSocket
SSE 流式输出
异步数据库
异步 Redis
异步向量数据库
高并发聊天接口
大量网络 I/O

适合线程池的

某个 SDK 只有同步版本
同步 requests
同步数据库驱动
轻量文件读写
老项目同步代码迁移
短时间阻塞任务

适合多进程或任务队列的

PDF 大批量解析
图片处理
视频处理
本地模型推理
复杂数据清洗
大规模报表生成
代码静态分析
CPU 密集型计算
长时间 Agent 任务

适合消息队列的

任务不能丢
任务很慢
需要重试
需要记录状态
需要分布式 worker
用户不需要立即拿到结果
任务执行时间超过几十秒

十、常见坑总结

第一,async 不是加速按钮。

async def cpu_work():
    for i in range(10 ** 9):
        ...

这种 CPU 计算不会因为你加了 async 就变快。

第二,await 不代表并发。

await a()
await b()
await c()

这通常还是顺序执行。

第三,async def 里不要写阻塞代码。

time.sleep()
requests.get()
同步数据库查询
重 CPU 循环

这些都可能卡住事件循环。

第四,任务并发要限流。

asyncio.gather(*(task(x) for x in huge_list))

数据量大时很危险。

第五,多 worker 下不要依赖本地全局变量共享状态。

cache = {}

多进程时每个 worker 都有自己的 cache。

第六,数据库连接池要算总量。

总连接数 = worker 数量 * 每个 worker 的连接池大小

第七,Agent 里的外部依赖一定要考虑失败。

LLM 超时
工具失败
搜索 API 限流
向量库异常
数据库慢查询
用户断开连接

Agent 系统不是写通就完事,真正难的是稳定性。


小结

Python 的 async/await 解决的不是“让 CPU 算得更快”,而是“让程序在等待 I/O 的时候别闲着”。async def 定义协程,await 在等待时让出控制权,asyncio 的事件循环负责调度这些协程。FastAPI 里,async def 适合异步 I/O,普通 def 会进入线程池,多个 worker 进程用来利用多核和提高吞吐。真正的高并发不是只写 async,而是要同时考虑阻塞代码、线程池、数据库连接池、worker 数量、外部 API 限流、超时、重试和任务队列。

放到 AI Agent 开发里,async/await 非常重要,因为 Agent 经常要同时调用 LLM、搜索、数据库、向量库、工具 API、记忆系统和日志系统。这些操作大部分都是 I/O 等待,非常适合异步并发。但如果任务是 PDF 重解析、图片处理、本地模型推理、复杂计算,就不能指望 asyncio 解决,应该交给多进程、任务队列或独立计算服务。

等网络、等数据库、等 LLM,用 async;吃 CPU、跑重任务,用进程或队列;同步老库别硬塞进 async,必要时放线程池。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐