多 Agent系统的编排工程实践:LangGraph vs AutoGen vs 自研调度器,我们踩的 5 个生产坑

一个 AI Agent 是个功能,五十个 Agent 是个分布式系统问题,但没人讨论后者。
去年我们把团队的智能客服系统从单 Agent 升级到多 Agent 协作架构,前后花了三个月,踩了无数坑,中途还把框架从 AutoGen 0.x 换成了 LangGraph,最后部分场景还是选择了自研调度器。这三个月里最痛的体验不是代码写不出来,而是每次以为搞定了,又冒出一个新的问题:成本超预算了、某个 Agent 在循环了、出了问题不知道是哪一步的锅。
这篇文章不是框架教程——GitHub 上那些要多少有多少。我想写的是:在真实生产环境里,三种方案各自在哪个节点上让你欲哭无泪,以及我们是怎么做决策的。
先说结论:三个框架各有一个「致命缺陷」
在进入细节之前,我们先把结论摆出来,避免你花了 2 小时读完文章再发现方向不对:
- AutoGen:对话循环的 token 消耗以二次方增长,没有终止保护就敢上生产等于慢性失血;加上 0.x → Microsoft Agent Framework 的迁移压力,新项目不建议从它开始。
- LangGraph:图结构的 StateGraph 需要 10-14 个工程日才能熟练,初期投入高;State schema 一旦上线就很难改动,早期设计错误代价极高。
- 自研调度器:灵活性最高,但 observability 几乎为零,调试成本是其他方案的 3-5 倍;没有 30+ 工程日的投入不要轻易尝试。
没有银弹。我们最终的选择是:核心生产 pipeline 用 LangGraph,超长异步任务用自研队列,新功能快速验证用 AutoGen 原型但不上生产。
为什么多 Agent 系统的工程复杂度远超预期
多数团队进入多 Agent 架构的方式是这样的:先做出一个表现不错的单 Agent,某天发现任务复杂到单 Agent 撑不住,于是拆解角色、引入协作,以为多几个 Agent 只是量变。
错了。
单 Agent 的核心问题是「智能够不够」。多 Agent 系统的核心问题是「分布式系统的一致性、调度、状态同步和故障恢复」,跟你熟悉的微服务架构本质上是同一类问题,但 LLM 调用的非确定性让每个环节的调试成本都翻了几倍。
我们在升级过程中遭遇的核心挑战:
- 状态传播:谁负责维护全局上下文?每个 Agent 各存一份还是集中管理?
- 调用链追踪:某个 Agent 给出了错误答案,如何定位是哪个上游 Agent 的输入有问题?
- 成本爆炸:多 Agent 对话循环导致 token 消耗以二次方增长。
- 故障恢复:Agent B 调用超时,整个 pipeline 如何重试?已完成的步骤要不要重跑?
- 终止条件:没有硬性限制时,Agent 会无限循环寻找「更好」的答案。
在这个背景下,我们依次评估并实践了三种方案。
另外还有一个很多团队低估的问题:多 Agent 系统的错误率是单 Agent 的指数级。假设每个 Agent 的准确率是 90%,一个 5 个 Agent 串行的 pipeline 整体准确率只有 0.9^5 = 59%。这意味着你在引入多 Agent 之前必须先想清楚:哪些步骤真的需要 Agent 而不是确定性函数?
我们的经验是:把多 Agent 链路中每个「Agent」的调用都分类:
- LLM 必须的:需要语义理解、生成或推理的步骤
- 可以是普通函数的:数据转换、格式化、条件判断
- 可以是工具调用的:API 请求、数据库查询、文件操作
把第二、三类从 Agent 链路里拿出来,变成普通函数或工具,既降低错误率,又节省大量 token 成本。
方案一:AutoGen(0.x 时代,2025 年 Q1)
为什么最初选它
AutoGen 是 某海外知名研究机构 出品,文档齐全,GitHub Star 数高,当时社区案例最多,上手极快。我们两天就搭出了原型。
核心模型是「对话式多智能体」——Agent 之间互发消息,通过 ConversableAgent 互相协作,天然支持角色扮演场景。
import autogen
# 定义两个 Agent
researcher = autogen.AssistantAgent(
name="Researcher",
system_message="你负责检索和分析信息,给出有据可查的研究结论。",
llm_config={"model": "DeepSeek-V3", "temperature": 0}
)
writer = autogen.AssistantAgent(
name="Writer",
system_message="你负责把研究结论整理成清晰的用户回复。",
llm_config={"model": "DeepSeek-V3", "temperature": 0.3}
)
user_proxy = autogen.UserProxyAgent(
name="UserProxy",
human_input_mode="NEVER", # 生产环境自动运行
max_consecutive_auto_reply=10,
is_termination_msg=lambda x: "TASK_COMPLETE" in x.get("content", "")
)
# 启动对话
user_proxy.initiate_chat(researcher, message="分析最近一周的用户投诉趋势")
坑 #1:对话历史不裁剪,token 消耗爆炸
AutoGen 的对话循环默认把所有历史消息都发给每个 Agent。
这意味着:
- 第 1 轮:Researcher 消耗 500 token
- 第 5 轮:每个 Agent 调用都携带前 4 轮完整历史,消耗变成 ~2500 token
- 第 10 轮:~5000 token
以二次方增长。一次看似简单的多 Agent 分析任务,最终账单让我们惊出一身冷汗。
# AutoGen 默认行为:每次调用都带完整历史
# [system] + [user_turn_1] + [assistant_turn_1] + ... + [user_turn_n]
# 随着对话深入,成本 = O(n²)
# 缓解方案:强制 max_consecutive_auto_reply + 自定义消息裁剪
def trim_messages(messages, max_tokens=3000):
"""保留最近 N 条消息,保证不超 token 预算"""
trimmed = []
total = 0
for msg in reversed(messages):
msg_tokens = estimate_tokens(msg["content"])
if total + msg_tokens > max_tokens:
break
trimmed.insert(0, msg)
total += msg_tokens
return trimmed
但这是补丁,不是架构级解法。
坑 #2:终止条件设计不当导致无限循环
is_termination_msg 是基于字符串匹配的软性终止。如果 Agent 没有生成预期的结束信号(LLM 输出是概率性的),对话会一直继续。
我们遇到过 Researcher Agent 陷入「我需要更多信息 → 搜索 → 还不够 → 继续搜索」的死循环,max_consecutive_auto_reply=10 触发之前已经烧了 $3。
正确姿势:硬性 + 软性双保险
user_proxy = autogen.UserProxyAgent(
name="UserProxy",
human_input_mode="NEVER",
max_consecutive_auto_reply=5, # 硬性上限:最多 5 轮自动回复
is_termination_msg=lambda x: (
"TASK_COMPLETE" in x.get("content", "") or
"无法完成" in x.get("content", "") or
len(x.get("content", "")) < 10 # 空回复也视为终止
)
)
AutoGen 2026 年现状:迁移警告
值得特别注意的是:AutoGen 0.x/0.4.x 正在向 Microsoft Agent Framework 迁移(官方迁移指南发布于 2026 年 4 月)。如果你现在新建项目选择 AutoGen,要做好未来迁移成本的心理准备。
AutoGen 适合谁:Microsoft 技术栈(国产大模型 API)、对话式多智能体原型、短期项目。不适合需要长期维护的生产系统。
补充一点:AutoGen 0.4.x 引入了 AgentChat API,相比 0.x 在会话管理上有所改进,也支持更细粒度的 termination_condition 配置。但架构上的对话全量历史问题并没有根本性解决,需要手动配置 SelectorGroupChat 加上 MaxMessageTermination:
# AutoGen 0.4.x 更好的终止配置
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
termination = MaxMessageTermination(max_messages=10) | TextMentionTermination("TASK_COMPLETE")
team = RoundRobinGroupChat(
[researcher, writer, reviewer],
termination_condition=termination,
max_turns=10 # 额外的硬上限
)
# 运行(异步)
result = await team.run(task="分析用户投诉趋势")
这比 0.x 的方式更可控,但如果你已经在用 AutoGen 0.4,建议关注 Microsoft Agent Framework 的迁移时间线,早做准备。
方案二:LangGraph(2025 年 Q3 迁移)
我们在 AutoGen 上撑了半年,问题越积越多,最终决定迁移到 LangGraph。
LangGraph 的核心心智模型
LangGraph 把 Agent 工作流建模为有向图(StateGraph),每个节点是一个处理单元,状态(State)在节点间流转,边(Edge)决定执行路径。
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
# 定义全局状态 schema
class AgentState(TypedDict):
messages: Annotated[List[dict], operator.add] # 消息追加
task: str
research_result: str
draft: str
review_passed: bool
iteration: int
# 创建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("researcher", research_agent)
workflow.add_node("writer", writing_agent)
workflow.add_node("reviewer", review_agent)
# 添加边(条件路由)
workflow.add_conditional_edges(
"reviewer",
lambda state: "writer" if not state["review_passed"] and state["iteration"] < 3 else END,
{
"writer": "writer",
END: END
}
)
workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")
# 编译(带 checkpointing)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("agents.db")
app = workflow.compile(checkpointer=checkpointer)
为什么图结构解决了核心问题
状态传播清晰:AgentState 是显式的、类型化的共享状态,每个节点只修改自己负责的字段,不会误改其他 Agent 的数据。相比 AutoGen 的全量对话历史传递,这个设计让每个 Agent 的职责边界一目了然,也让新加入项目的工程师能快速理解整个系统的数据流,减少了不少「这个字段是哪个 Agent 写进来的?」的困惑。
token 成本可控:LangGraph 的节点之间传递的是状态 delta,不是完整对话历史。每个节点的 LLM 调用只接收它需要的字段,token 消耗是线性的。从工程角度看,这类似于微服务里的 event-driven delta 更新代替全量同步,避免了 AutoGen 式的二次方成本增长。
一个 5 节点 Agent 工作流,LangGraph 的 token 消耗大约是 CrewAI 的 1/3,是原始 AutoGen 对话模式的 1/5(这是我们自己在相同任务上的测量数据,任务:分析 100 条用户反馈并分类)。
坑 #3:StateGraph 调试需要专门工具,裸 print 不够
图结构的另一面是:当工作流出错时,你不能简单 print 每一步输出,因为节点是并发执行的,日志会乱序。
正确做法:
# 方案 A:用 LangSmith(推荐生产环境)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-production"
# 方案 B:用 stream 逐步观察状态变化
config = {"configurable": {"thread_id": "debug-run-001"}}
for step in app.stream(initial_state, config):
for node_name, output in step.items():
print(f"节点 [{node_name}] 输出: {output}")
# 逐节点检查,比 print 有用得多
checkpointing 的真正价值在生产里才体现出来:
# 任务中途失败?从最后一个 checkpoint 恢复,不用从头重跑
config = {"configurable": {"thread_id": "task-20260613-001"}}
# 恢复执行
result = app.invoke(None, config) # None = 从 checkpoint 加载状态
# 人工审核介入(Human-in-the-Loop)
# 图执行到需要审批的节点时自动暂停,等待人工确认后恢复
这个能力在我们的场景里价值巨大:一个分析任务跑到第 8 个 Agent 时 LLM 超时,AutoGen 时代需要从头重跑,LangGraph 时代直接从第 8 个 checkpoint 恢复,节省了 80% 的重试成本。
从 Aerospike 的 LangGraph 生产基准测试数据来看,同一个 5 Agent 工作流(100 次重复执行),LangGraph 的完成时间是 CrewAI 的不到一半。关键原因是 LangGraph 的状态 delta 传递机制:每次节点调用只携带必要的字段,而不是整个对话历史。
生产部署注意事项:LangGraph Server(官方托管版)和自托管版在 checkpointing 存储后端上有差异:
# 开发/测试:内存 checkpointer(重启后丢失)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# 生产推荐:PostgreSQL(持久化,支持横向扩展)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/agents"
)
# 或 Redis(高吞吐量场景)
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
app = workflow.compile(checkpointer=checkpointer)
不要在生产环境使用 MemorySaver——进程重启后所有 checkpoint 消失,等于回到 AutoGen 时代的困境。
坑 #4:图状态设计错了,后期改动成本极高
这是我们付出最大代价的坑。LangGraph 的 AgentState 定义一旦上线,修改 schema 需要同步修改所有相关节点,并处理历史 checkpoint 的兼容性。
我们在上线三周后想给 State 加一个 context_window 字段,结果:
- 7 个节点需要修改
- 历史 checkpoint 反序列化报错
- 不得不做一次带数据迁移的 hotfix
State 设计原则(血泪经验):
还有一个不那么明显的坑:Annotated[List[dict], operator.add] 的追加语义在节点并发执行时会导致竞争条件。如果你有并行节点(add_node 后设置为 parallel),要么改用 Annotated[List[dict], lambda a, b: a + b],要么确保并行节点不操作同一个 list 字段。
# ❌ 错误:字段太细,后期难以扩展
class AgentState(TypedDict):
user_query: str
search_keyword_1: str
search_keyword_2: str
search_result_1: str
search_result_2: str
# ✅ 正确:用 dict 做扩展点,避免 schema 频繁变更
class AgentState(TypedDict):
messages: Annotated[List[dict], operator.add]
task: str
metadata: dict # 灵活扩展字段,不改 schema
artifacts: dict # 各 Agent 产物,key = agent_name
control: dict # 控制信息:iteration, max_iterations, status
# 示例:artifacts 字段使用
state["artifacts"]["researcher"] = {
"sources": [...],
"summary": "...",
"confidence": 0.85
}
state["artifacts"]["writer"] = {
"draft": "...",
"word_count": 1200
}
方案三:自研调度器(部分场景)
LangGraph 非常好,但它并不适合所有场景。我们最终在以下场景选择了自研调度器:
- 超长任务(>30 分钟):LangGraph 的 checkpoint 在内存中,进程重启后需要额外的持久化配置
- 动态 Agent 数量:某些任务需要根据输入动态生成 N 个并发 Agent,图结构的静态定义不够灵活
- 与现有任务队列集成:我们已经有 Celery + Redis 基础设施,不想再引入 LangGraph Server
自研调度器的核心实现:
import asyncio
import json
from datetime import datetime
from redis import asyncio as aioredis
class AgentTaskQueue:
"""基于 Redis 的 Agent 任务调度器"""
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
async def submit_task(
self,
task_id: str,
agent_chain: list[str], # ["researcher", "writer", "reviewer"]
payload: dict,
max_retries: int = 3
) -> str:
task = {
"task_id": task_id,
"agent_chain": agent_chain,
"current_step": 0,
"payload": payload,
"results": {},
"status": "pending",
"created_at": datetime.now().isoformat(),
"max_retries": max_retries,
"retry_count": 0
}
# 持久化到 Redis
await self.redis.set(f"task:{task_id}", json.dumps(task), ex=3600)
await self.redis.lpush("task_queue", task_id)
return task_id
async def advance_task(self, task_id: str, step_result: dict):
"""某个 Agent 完成后,推进任务到下一步"""
raw = await self.redis.get(f"task:{task_id}")
task = json.loads(raw)
current_agent = task["agent_chain"][task["current_step"]]
task["results"][current_agent] = step_result
task["current_step"] += 1
if task["current_step"] >= len(task["agent_chain"]):
task["status"] = "completed"
else:
# 把下一步 Agent 的输入推入队列
next_agent = task["agent_chain"][task["current_step"]]
await self.redis.lpush(f"agent_queue:{next_agent}", task_id)
await self.redis.set(f"task:{task_id}", json.dumps(task), ex=3600)
async def handle_failure(self, task_id: str, error: str):
"""失败处理:重试或死信队列"""
raw = await self.redis.get(f"task:{task_id}")
task = json.loads(raw)
task["retry_count"] += 1
if task["retry_count"] <= task["max_retries"]:
task["status"] = "retrying"
current_agent = task["agent_chain"][task["current_step"]]
# 指数退避重试
delay = 2 ** task["retry_count"]
await asyncio.sleep(delay)
await self.redis.lpush(f"agent_queue:{current_agent}", task_id)
else:
task["status"] = "failed"
task["error"] = error
await self.redis.lpush("dead_letter_queue", task_id)
await self.redis.set(f"task:{task_id}", json.dumps(task), ex=86400)
坑 #5:自研调度器缺乏 observability,调试成本极高
自研的最大风险:没有 LangSmith 这样的开箱即用可观测性工具。
我们早期的自研版本没有 tracing,出了问题全靠日志查,定位一个 Agent 链路 bug 平均要花 2-3 小时。
最小可行 observability 方案:
import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
# 结构化日志 + OTel trace
log = structlog.get_logger()
tracer = trace.get_tracer("multi-agent")
class InstrumentedAgent:
def __init__(self, name: str, agent_fn):
self.name = name
self.agent_fn = agent_fn
async def run(self, task_id: str, input_data: dict) -> dict:
with tracer.start_as_current_span(f"agent.{self.name}") as span:
span.set_attribute("task.id", task_id)
span.set_attribute("agent.name", self.name)
span.set_attribute("input.token_estimate", estimate_tokens(str(input_data)))
try:
result = await self.agent_fn(input_data)
span.set_attribute("output.token_estimate", estimate_tokens(str(result)))
span.set_attribute("status", "success")
log.info("agent_success",
agent=self.name,
task_id=task_id,
input_tokens=estimate_tokens(str(input_data)),
output_tokens=estimate_tokens(str(result))
)
return result
except Exception as e:
span.record_exception(e)
span.set_attribute("status", "error")
log.error("agent_failed",
agent=self.name,
task_id=task_id,
error=str(e)
)
raise
实际性能对比:三个框架在同一任务上的数据
以下是我们在一个真实任务上的测量数据(任务:分析 50 条用户反馈,分类并生成摘要报告):
| 框架 | 总 token 消耗 | 完成时间 | 失败率(100次测试) | 成本(DeepSeek-V3 定价) |
|---|---|---|---|---|
| AutoGen 0.x(默认配置) | ~85,000 | 4.2 min | 12%(超时/循环) | ~$1.70 |
| AutoGen 0.x(加终止保护) | ~42,000 | 2.8 min | 4% | ~$0.84 |
| LangGraph | ~18,000 | 1.6 min | 1% | ~$0.36 |
| 自研调度器 | ~16,000 | 1.8 min | 2% | ~$0.32 |
说明:
- AutoGen 默认配置的高 token 消耗来自全量历史传递
- LangGraph 的 token 效率接近自研方案的原因是 delta 传递机制
- 自研方案的失败率略高于 LangGraph,因为我们当时的重试逻辑还不够成熟
- 测试时间:2025 年 Q4,模型版本 DeepSeek-V3-2024-11-20
重要提醒:以上数据基于特定任务和配置,不代表所有场景。你的任务复杂度、Agent 数量、状态大小都会显著影响结果。这里提供数字的目的是给出量级参考,而不是作为选型的唯一依据。
一个容易忽视的问题:多 Agent 系统的容量规划
单 Agent 系统的容量规划相对简单:QPS × 平均 token/请求 = token 消耗率。
多 Agent 系统复杂得多,因为每个外部请求可能触发多个 LLM 调用:
1 个用户请求
→ Supervisor Agent(~500 token)
→ Research Agent × N(~1500 token each)
→ Writer Agent(~2000 token)
→ Review Agent(~800 token)
→ 合计:~5000-8000 token/用户请求(含并发)
如果你的应用有 100 QPS,实际 LLM 调用可能是 400-800 QPS,每个调用还有并发的 latency tail risk。
容量规划公式:
实际 LLM QPS = 用户 QPS × 平均 Agent 数 × 平均分支因子
预留 headroom = 1.5x(Agent 循环重试 + 工具调用重试)
我们犯过的错误:按单 Agent 的 QPS 做了 Rate Limiting 配置,结果多 Agent 系统一上线就撞 429,而且因为错误是在 Agent 链路中间发生的,没有做好 backpressure,整个 pipeline 直接崩了。这个教训催生了我们之前文章写的[请求队列与背压工程实践]。
框架选型决策矩阵
基于上述实践,这是我们总结的选型参考:
| 维度 | LangGraph | AutoGen | 自研 |
|---|---|---|---|
| 学习曲线 | 高(图结构心智模型) | 低(对话式) | 极高 |
| 生产可靠性 | ★★★★★ | ★★★(0.x)/ ★★★★(迁移后) | 取决于投入 |
| Token 成本可控性 | ★★★★★(delta 传递) | ★★(历史全量传递) | ★★★★★ |
| Observability | ★★★★★(LangSmith) | ★★★ | ★★(需自建) |
| Human-in-the-Loop | ★★★★★(原生支持) | ★★★(需 HumanProxyAgent) | 灵活 |
| 故障恢复 | ★★★★★(checkpointing) | ★★ | 取决于实现 |
| 动态 Agent 数量 | ★★★ | ★★★★ | ★★★★★ |
| 生态稳定性 | ★★★★★ | ★★★(正在迁移) | ★★★★★(自主) |
| 初始投入 | 10-14 engineer-days | 2-5 engineer-days | 30+ engineer-days |
决策建议:
- 新项目,需要长期维护 → LangGraph,接受学习曲线,换取生产可靠性
- 快速验证 POC,Azure 技术栈 → AutoGen(注意迁移风险)
- 已有任务队列基础设施,需极度灵活 → 自研,但必须同步投入 observability
- 任何情况都不推荐 → 在没有终止条件和 token 上限保护的情况下上生产
有一个细节值得单独说明:上表里「初始投入」的估算是基于从零开始的情况。如果你们团队已经熟悉 Python 异步编程和有向图的概念(比如做过 Airflow DAG 或者 Celery workflow),LangGraph 的学习曲线会大幅缩短,实际可能 5-7 个工程日就能上手。反之,如果团队背景主要是前端或者数据科学,这个曲线可能比估算的更陡。
另外,三个框架的社区活跃度和文档质量也在影响实际上手速度:LangGraph 的官方文档在 2026 年经过了一次大幅重组,现在质量明显好于 2024 年;AutoGen 的文档因为 0.x 到 0.4.x 再到 Microsoft Agent Framework 的迁移,目前有些碎片化,找到正确版本的文档需要额外注意。
从自研迁移到 LangGraph:迁移路径与成本估算
如果你和我们当时一样,已经有一套跑着的自研调度器或 AutoGen 代码,想迁移到 LangGraph,这里是一些实际经验。
迁移成本估算
| 现有方案 | 规模 | 迁移工作量估算 |
|---|---|---|
| AutoGen 0.x(<5 Agent) | 小型 | 3-5 天 |
| AutoGen 0.x(5-15 Agent) | 中型 | 2-3 周 |
| 自研串行调度器 | 小型 | 1-2 周 |
| 自研复杂调度器(含并发) | 大型 | 1-2 个月 |
影响迁移成本的关键因素:
- 状态 schema 设计是否清晰:如果现有方案的 Agent 间通信靠 dict 乱传,迁移时需要先整理 schema
- 有没有完善的测试用例:LangGraph 的图行为是确定性的,但需要测试用例来验证行为等价
- 对历史会话的依赖程度:LangGraph 的 checkpoint 格式与现有存储不兼容,历史会话需要迁移或放弃
迁移策略:双写期
我们采用的是「双写期」策略,用 4 周时间把流量逐步从 AutoGen 切到 LangGraph:
第 1 周:新功能只在 LangGraph 实现,老功能继续用 AutoGen
第 2 周:10% 新增请求路由到 LangGraph,监控错误率
第 3 周:50% 新增请求路由到 LangGraph
第 4 周:100% 新增请求,老 AutoGen 进入只读状态
第 5 周:彻底下线 AutoGen,清理旧代码
路由层的实现非常简单:
import hashlib
def route_to_framework(user_id: str, feature_flags: dict) -> str:
"""基于用户 ID 的一致性路由,同一用户总是进同一个框架"""
if feature_flags.get("langgraph_rollout_pct", 0) == 100:
return "langgraph"
# 一致性哈希,保证同一用户体验一致
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
rollout_pct = feature_flags.get("langgraph_rollout_pct", 0)
return "langgraph" if hash_val < rollout_pct else "autogen"
双写期的核心价值:给你时间发现 LangGraph 在你特定业务场景下的行为差异,而不是靠大爆炸式迁移发现问题。
多 Agent 系统中最值得关注的安全问题
多 Agent 系统引入了一类传统微服务没有的安全威胁:Agent 间的信任边界。
Prompt Injection 在多 Agent 场景的放大效应
单 Agent 的 Prompt Injection 攻击是已知问题(我们之前写过专门的文章)。多 Agent 系统会放大这个问题:
攻击者构造恶意输入 → Research Agent 处理 → 恶意指令被提取为「研究结论」
→ Writer Agent 把恶意指令误认为合法输入 → 生成攻击者期望的输出
即使每个 Agent 自身对 Prompt Injection 有防御,Agent 链路之间的信任传递也可能绕过防御。
防御策略:
- 输出清洗(Output Sanitization):每个 Agent 的输出在传给下一个 Agent 之前,过滤掉可能是指令的内容
- 最小权限原则:Writer Agent 不应该有调用外部 API 的工具,即使 Researcher Agent 的输出里包含了这类指令
- Agent 间调用签名:在 State 里记录每一步数据的来源 Agent 和处理时间,便于事后审计
# 在 State 里记录数据来源链
class AgentState(TypedDict):
artifacts: dict # 各 Agent 产物
provenance: dict # 数据来源追踪
# 每个 Agent 写出数据时记录来源
def research_agent(state: AgentState) -> dict:
result = do_research(state["task"])
return {
"artifacts": {"researcher": result},
"provenance": {
"researcher": {
"agent": "researcher",
"timestamp": datetime.now().isoformat(),
"input_hash": hashlib.md5(state["task"].encode()).hexdigest()
}
}
}
这个设计不只是安全层面的,在调试异常输出时也非常有用:你能直接追溯「这段内容是哪个 Agent 在什么时候写入的」。
结语:多 Agent 是个分布式系统问题
回头看这段经历,最大的教训是:多 Agent 架构不是 AI 问题,是分布式系统问题。
- 状态管理 → 和微服务的状态一致性是同一个问题
- 故障恢复 → 和任务队列的 at-least-once 语义是同一个问题
- 成本控制 → 和数据库查询优化是同一个问题(token 是计算资源)
- Observability → 和分布式 tracing 是同一个问题
唯一的新变量是 LLM 的非确定性:你的 Agent 链路可能因为模型输出的一个词不同就走向完全不同的分支。这要求你的调度层有比传统微服务更强的防御性设计。
LangGraph 在这个维度上目前是工具选择里最成熟的。如果你现在刚开始构建多 Agent 系统,我的建议是先花 2 周学好 LangGraph 的图模型,这个投入值得。
最后说一个反直觉的经验:我们花在「如何让 Agent 更聪明」上的时间,远不如花在「如何让框架更可靠」上的时间带来的收益大。模型在进步,聪明这件事交给模型厂商去做。工程师的价值在于把这些模型能力稳定、可预期、有成本意识地组合成业务价值。
这就是为什么框架选型值得认真对待。它不只是技术选择,也是你的团队未来几年要面对什么样的工程问题的选择。
附录:快速检查清单——上生产前必须确认的 10 件事
不管你用哪个框架,在多 Agent 系统上生产之前,对照这个清单逐项确认:
终止保护(必须)
- 每条 Agent 链路都有硬性轮次上限(max_iterations / max_consecutive_auto_reply)
- 每个 LLM 调用都有超时配置(timeout_seconds)
- 整个 pipeline 有全局超时兜底
成本控制(必须)
- 每次 LLM 调用的 token 上限已设置(max_tokens)
- 有 per-task 的 token budget 监控,超预算发告警
- 对话历史有裁剪策略,防止二次方增长
故障恢复(强烈建议)
- 已测试「Agent B 失败后整个 pipeline 的行为」
- 重试逻辑有指数退避,而不是立即重试
- 长任务有 checkpoint 机制,避免全量重跑
可观测性(强烈建议)
- 每个 Agent 的调用链路有 trace id 关联
- 关键指标已接入监控:token 消耗、延迟、错误率
- 有能力回放单次失败 task 的完整执行链路
驉側话:什么时候不应该上多 Agent
这篇文章里是如何应对多 Agent 系统的工程问题。但有一个问题同样重要:你的场景真的需要多 Agent 吗?
以下情况下,单 Agent 就够了,不要引入多 Agent 复杂度:
1. 任务内在就是线性的
用户输入 → 检索 → 生成 → 输出。这个流程没有多个其并发的步骤,也没有需要动态路由的判断逻辑。此时用多 Agent 只是增加了调用延迟和复杂度。
2. 任务对 latency 极度敏感且需要达到 500ms 以内
每个额外的 Agent 层就是一次额外的 LLM API RTT。如果业务需要 sub-second 响应,优先考虑单 Agent 加工具调用,而不是多 Agent 并发。
3. 问题本身简单,是模型能力不够
如果你引入多 Agent 的原因是「单 Agent 没办法解决这个问题」,先检查是否是 prompt 设计问题,或者就应该换一个更强的模型。多 Agent 系统不是弥补模型能力不足的戞鸟工具。
4. 团队没有多 Agent 系统的运维经验
多 Agent 系统的调试和运维复杂度远超单 Agent。如果团队还没有成熟的 Agent 开发经验,引入多 Agent 只会把运维复杂度迅速叠加。
判断树:
需要多个具有不同层次知识的専家协作?
└ 是 → 多 Agent
└ 否 → 单 Agent + 工具调用
任务需要并发处理且单个 Agent 没办法同时运行多个子任务?
└ 是 → 多 Agent
└ 否 → 单 Agent + 多次工具调用
工作流需要其中某些步骤的输出为另一些步骤的输入,且有内在岐1循环逐代逻辑?
└ 是 → 多 Agent
└ 否 → 单 Agent + 结构化输出
简单的语言则:如果一个工具调用能解决的,就不要用 Agent。留着 Agent 劳动,就要确保至少有 Agent 层面的智能和自主性在起作用。
实战建议:从第一个多 Agent 任务开始的正确姿势
如果你现在刚开始,以下是我们最想告诉过去的自己的几条建议:
第一步:先把单 Agent 做到极限
在引入第二个 Agent 之前,用尽一切手段优化单 Agent:更好的 prompt、更强的模型、工具调用、结构化输出。单 Agent 的极限往往比你想象的高很多。我们团队有一次把「必须多 Agent」的任务用更精细的 prompt 设计加工具调用解决了,省掉了整套多 Agent 架构的开发成本。
第二步:在纸上画出你的 Agent 图,然后问自己三个问题
- 这张图里哪些节点可以是普通函数?
- 这张图里哪些节点之间的通信可以是同步 API 调用而不是 LLM 调用?
- 这张图里最长的路径有多少步?每一步预期的延迟是多少?
如果第三个问题的答案超过 5 步,仔细想想是否真的需要这么多层。
第三步:从最简单的线性 pipeline 开始,而不是从复杂的层级结构开始
层级结构(Supervisor + 多个 Sub-Agent)听起来很优雅,但调试起来非常痛苦。先做线性的,证明每一步的输出质量可以接受之后,再引入动态路由和条件分支。
第四步:在生产环境的第一个月,把 token 成本监控和错误率监控放在最显眼的位置
不是 dashboard 深处,是每天工作时能看到的地方。多 Agent 系统的问题很容易在你不注意的时候悄悄演化成大问题。早发现早处理,比出了大事故再修要好得多。
第五步:为你的框架选择写下决策记录
无论你选了 LangGraph、AutoGen 还是自研,把选择的理由、当时考虑的替代方案、放弃其他方案的原因记录下来。六个月后你的团队可能会有新成员,或者框架本身又出了新版本,这份记录能帮你快速评估是否需要重新审视当初的决策。
多 Agent 工程是一个快速演化的领域。今天的最佳实践,明年可能就有更好的方案。保持学习,同时也保持批判性思维——不是所有新框架都值得立即迁移。
如果你在多 Agent 编排上踩过别的坑,欢迎在评论区交流,很期待听到不同场景下的实战经验。
更多推荐

所有评论(0)