多 Agent 系统的编排工程实践:LangGraph vs AutoGen vs 自研调度器,我们踩的 5 个生产坑

一个 AI Agent 是个功能,五十个 Agent 是个分布式系统问题,但没人讨论后者。

去年我们把团队的智能客服系统从单 Agent 升级到多 Agent 协作架构,前后花了三个月,踩了无数坑,中途还把框架从 AutoGen 0.x 换成了 LangGraph,最后部分场景还是选择了自研调度器。这三个月里最痛的体验不是代码写不出来,而是每次以为搞定了,又冒出一个新的问题:成本超预算了、某个 Agent 在循环了、出了问题不知道是哪一步的锅。

这篇文章不是框架教程——GitHub 上那些要多少有多少。我想写的是:在真实生产环境里,三种方案各自在哪个节点上让你欲哭无泪,以及我们是怎么做决策的。


先说结论:三个框架各有一个「致命缺陷」

在进入细节之前,我们先把结论摆出来,避免你花了 2 小时读完文章再发现方向不对:

  • AutoGen:对话循环的 token 消耗以二次方增长,没有终止保护就敢上生产等于慢性失血;加上 0.x → Microsoft Agent Framework 的迁移压力,新项目不建议从它开始。
  • LangGraph:图结构的 StateGraph 需要 10-14 个工程日才能熟练,初期投入高;State schema 一旦上线就很难改动,早期设计错误代价极高。
  • 自研调度器:灵活性最高,但 observability 几乎为零,调试成本是其他方案的 3-5 倍;没有 30+ 工程日的投入不要轻易尝试。

没有银弹。我们最终的选择是:核心生产 pipeline 用 LangGraph,超长异步任务用自研队列,新功能快速验证用 AutoGen 原型但不上生产


为什么多 Agent 系统的工程复杂度远超预期

多数团队进入多 Agent 架构的方式是这样的:先做出一个表现不错的单 Agent,某天发现任务复杂到单 Agent 撑不住,于是拆解角色、引入协作,以为多几个 Agent 只是量变。

错了。

单 Agent 的核心问题是「智能够不够」。多 Agent 系统的核心问题是「分布式系统的一致性、调度、状态同步和故障恢复」,跟你熟悉的微服务架构本质上是同一类问题,但 LLM 调用的非确定性让每个环节的调试成本都翻了几倍。

我们在升级过程中遭遇的核心挑战:

  1. 状态传播:谁负责维护全局上下文?每个 Agent 各存一份还是集中管理?
  2. 调用链追踪:某个 Agent 给出了错误答案,如何定位是哪个上游 Agent 的输入有问题?
  3. 成本爆炸:多 Agent 对话循环导致 token 消耗以二次方增长。
  4. 故障恢复:Agent B 调用超时,整个 pipeline 如何重试?已完成的步骤要不要重跑?
  5. 终止条件:没有硬性限制时,Agent 会无限循环寻找「更好」的答案。

在这个背景下,我们依次评估并实践了三种方案。

另外还有一个很多团队低估的问题:多 Agent 系统的错误率是单 Agent 的指数级。假设每个 Agent 的准确率是 90%,一个 5 个 Agent 串行的 pipeline 整体准确率只有 0.9^5 = 59%。这意味着你在引入多 Agent 之前必须先想清楚:哪些步骤真的需要 Agent 而不是确定性函数?

我们的经验是:把多 Agent 链路中每个「Agent」的调用都分类:

  • LLM 必须的:需要语义理解、生成或推理的步骤
  • 可以是普通函数的:数据转换、格式化、条件判断
  • 可以是工具调用的:API 请求、数据库查询、文件操作

把第二、三类从 Agent 链路里拿出来,变成普通函数或工具,既降低错误率,又节省大量 token 成本。


方案一:AutoGen(0.x 时代,2025 年 Q1)

为什么最初选它

AutoGen 是 某海外知名研究机构 出品,文档齐全,GitHub Star 数高,当时社区案例最多,上手极快。我们两天就搭出了原型。

核心模型是「对话式多智能体」——Agent 之间互发消息,通过 ConversableAgent 互相协作,天然支持角色扮演场景。

import autogen

# 定义两个 Agent
researcher = autogen.AssistantAgent(
    name="Researcher",
    system_message="你负责检索和分析信息,给出有据可查的研究结论。",
    llm_config={"model": "DeepSeek-V3", "temperature": 0}
)

writer = autogen.AssistantAgent(
    name="Writer",
    system_message="你负责把研究结论整理成清晰的用户回复。",
    llm_config={"model": "DeepSeek-V3", "temperature": 0.3}
)

user_proxy = autogen.UserProxyAgent(
    name="UserProxy",
    human_input_mode="NEVER",  # 生产环境自动运行
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: "TASK_COMPLETE" in x.get("content", "")
)

# 启动对话
user_proxy.initiate_chat(researcher, message="分析最近一周的用户投诉趋势")

坑 #1:对话历史不裁剪,token 消耗爆炸

AutoGen 的对话循环默认把所有历史消息都发给每个 Agent。

这意味着:

  • 第 1 轮:Researcher 消耗 500 token
  • 第 5 轮:每个 Agent 调用都携带前 4 轮完整历史,消耗变成 ~2500 token
  • 第 10 轮:~5000 token

以二次方增长。一次看似简单的多 Agent 分析任务,最终账单让我们惊出一身冷汗。

# AutoGen 默认行为:每次调用都带完整历史
# [system] + [user_turn_1] + [assistant_turn_1] + ... + [user_turn_n]
# 随着对话深入,成本 = O(n²)

# 缓解方案:强制 max_consecutive_auto_reply + 自定义消息裁剪
def trim_messages(messages, max_tokens=3000):
    """保留最近 N 条消息,保证不超 token 预算"""
    trimmed = []
    total = 0
    for msg in reversed(messages):
        msg_tokens = estimate_tokens(msg["content"])
        if total + msg_tokens > max_tokens:
            break
        trimmed.insert(0, msg)
        total += msg_tokens
    return trimmed

但这是补丁,不是架构级解法。

坑 #2:终止条件设计不当导致无限循环

is_termination_msg 是基于字符串匹配的软性终止。如果 Agent 没有生成预期的结束信号(LLM 输出是概率性的),对话会一直继续。

我们遇到过 Researcher Agent 陷入「我需要更多信息 → 搜索 → 还不够 → 继续搜索」的死循环,max_consecutive_auto_reply=10 触发之前已经烧了 $3。

正确姿势:硬性 + 软性双保险

user_proxy = autogen.UserProxyAgent(
    name="UserProxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=5,  # 硬性上限:最多 5 轮自动回复
    is_termination_msg=lambda x: (
        "TASK_COMPLETE" in x.get("content", "") or
        "无法完成" in x.get("content", "") or
        len(x.get("content", "")) < 10  # 空回复也视为终止
    )
)

AutoGen 2026 年现状:迁移警告

值得特别注意的是:AutoGen 0.x/0.4.x 正在向 Microsoft Agent Framework 迁移(官方迁移指南发布于 2026 年 4 月)。如果你现在新建项目选择 AutoGen,要做好未来迁移成本的心理准备。

AutoGen 适合谁:Microsoft 技术栈(国产大模型 API)、对话式多智能体原型、短期项目。不适合需要长期维护的生产系统。

补充一点:AutoGen 0.4.x 引入了 AgentChat API,相比 0.x 在会话管理上有所改进,也支持更细粒度的 termination_condition 配置。但架构上的对话全量历史问题并没有根本性解决,需要手动配置 SelectorGroupChat 加上 MaxMessageTermination

# AutoGen 0.4.x 更好的终止配置
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat

termination = MaxMessageTermination(max_messages=10) | TextMentionTermination("TASK_COMPLETE")

team = RoundRobinGroupChat(
    [researcher, writer, reviewer],
    termination_condition=termination,
    max_turns=10  # 额外的硬上限
)

# 运行(异步)
result = await team.run(task="分析用户投诉趋势")

这比 0.x 的方式更可控,但如果你已经在用 AutoGen 0.4,建议关注 Microsoft Agent Framework 的迁移时间线,早做准备。


方案二:LangGraph(2025 年 Q3 迁移)

我们在 AutoGen 上撑了半年,问题越积越多,最终决定迁移到 LangGraph。

LangGraph 的核心心智模型

LangGraph 把 Agent 工作流建模为有向图(StateGraph),每个节点是一个处理单元,状态(State)在节点间流转,边(Edge)决定执行路径。

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

# 定义全局状态 schema
class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]  # 消息追加
    task: str
    research_result: str
    draft: str
    review_passed: bool
    iteration: int

# 创建图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("researcher", research_agent)
workflow.add_node("writer", writing_agent)
workflow.add_node("reviewer", review_agent)

# 添加边(条件路由)
workflow.add_conditional_edges(
    "reviewer",
    lambda state: "writer" if not state["review_passed"] and state["iteration"] < 3 else END,
    {
        "writer": "writer",
        END: END
    }
)

workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# 编译(带 checkpointing)
from langgraph.checkpoint.sqlite import SqliteSaver
checkpointer = SqliteSaver.from_conn_string("agents.db")
app = workflow.compile(checkpointer=checkpointer)

为什么图结构解决了核心问题

状态传播清晰AgentState 是显式的、类型化的共享状态,每个节点只修改自己负责的字段,不会误改其他 Agent 的数据。相比 AutoGen 的全量对话历史传递,这个设计让每个 Agent 的职责边界一目了然,也让新加入项目的工程师能快速理解整个系统的数据流,减少了不少「这个字段是哪个 Agent 写进来的?」的困惑。

token 成本可控:LangGraph 的节点之间传递的是状态 delta,不是完整对话历史。每个节点的 LLM 调用只接收它需要的字段,token 消耗是线性的。从工程角度看,这类似于微服务里的 event-driven delta 更新代替全量同步,避免了 AutoGen 式的二次方成本增长。

一个 5 节点 Agent 工作流,LangGraph 的 token 消耗大约是 CrewAI 的 1/3,是原始 AutoGen 对话模式的 1/5(这是我们自己在相同任务上的测量数据,任务:分析 100 条用户反馈并分类)。

坑 #3:StateGraph 调试需要专门工具,裸 print 不够

图结构的另一面是:当工作流出错时,你不能简单 print 每一步输出,因为节点是并发执行的,日志会乱序。

正确做法

# 方案 A:用 LangSmith(推荐生产环境)
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
os.environ["LANGCHAIN_PROJECT"] = "multi-agent-production"

# 方案 B:用 stream 逐步观察状态变化
config = {"configurable": {"thread_id": "debug-run-001"}}
for step in app.stream(initial_state, config):
    for node_name, output in step.items():
        print(f"节点 [{node_name}] 输出: {output}")
        # 逐节点检查,比 print 有用得多

checkpointing 的真正价值在生产里才体现出来

# 任务中途失败?从最后一个 checkpoint 恢复,不用从头重跑
config = {"configurable": {"thread_id": "task-20260613-001"}}

# 恢复执行
result = app.invoke(None, config)  # None = 从 checkpoint 加载状态

# 人工审核介入(Human-in-the-Loop)
# 图执行到需要审批的节点时自动暂停,等待人工确认后恢复

这个能力在我们的场景里价值巨大:一个分析任务跑到第 8 个 Agent 时 LLM 超时,AutoGen 时代需要从头重跑,LangGraph 时代直接从第 8 个 checkpoint 恢复,节省了 80% 的重试成本。

从 Aerospike 的 LangGraph 生产基准测试数据来看,同一个 5 Agent 工作流(100 次重复执行),LangGraph 的完成时间是 CrewAI 的不到一半。关键原因是 LangGraph 的状态 delta 传递机制:每次节点调用只携带必要的字段,而不是整个对话历史。

生产部署注意事项:LangGraph Server(官方托管版)和自托管版在 checkpointing 存储后端上有差异:

# 开发/测试:内存 checkpointer(重启后丢失)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()

# 生产推荐:PostgreSQL(持久化,支持横向扩展)
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/agents"
)

# 或 Redis(高吞吐量场景)
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")

app = workflow.compile(checkpointer=checkpointer)

不要在生产环境使用 MemorySaver——进程重启后所有 checkpoint 消失,等于回到 AutoGen 时代的困境。

坑 #4:图状态设计错了,后期改动成本极高

这是我们付出最大代价的坑。LangGraph 的 AgentState 定义一旦上线,修改 schema 需要同步修改所有相关节点,并处理历史 checkpoint 的兼容性。

我们在上线三周后想给 State 加一个 context_window 字段,结果:

  • 7 个节点需要修改
  • 历史 checkpoint 反序列化报错
  • 不得不做一次带数据迁移的 hotfix

State 设计原则(血泪经验):

还有一个不那么明显的坑:Annotated[List[dict], operator.add] 的追加语义在节点并发执行时会导致竞争条件。如果你有并行节点(add_node 后设置为 parallel),要么改用 Annotated[List[dict], lambda a, b: a + b],要么确保并行节点不操作同一个 list 字段。

# ❌ 错误:字段太细,后期难以扩展
class AgentState(TypedDict):
    user_query: str
    search_keyword_1: str
    search_keyword_2: str
    search_result_1: str
    search_result_2: str

# ✅ 正确:用 dict 做扩展点,避免 schema 频繁变更
class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]
    task: str
    metadata: dict          # 灵活扩展字段,不改 schema
    artifacts: dict         # 各 Agent 产物,key = agent_name
    control: dict           # 控制信息:iteration, max_iterations, status

# 示例:artifacts 字段使用
state["artifacts"]["researcher"] = {
    "sources": [...],
    "summary": "...",
    "confidence": 0.85
}
state["artifacts"]["writer"] = {
    "draft": "...",
    "word_count": 1200
}

方案三:自研调度器(部分场景)

LangGraph 非常好,但它并不适合所有场景。我们最终在以下场景选择了自研调度器:

  1. 超长任务(>30 分钟):LangGraph 的 checkpoint 在内存中,进程重启后需要额外的持久化配置
  2. 动态 Agent 数量:某些任务需要根据输入动态生成 N 个并发 Agent,图结构的静态定义不够灵活
  3. 与现有任务队列集成:我们已经有 Celery + Redis 基础设施,不想再引入 LangGraph Server

自研调度器的核心实现:

import asyncio
import json
from datetime import datetime
from redis import asyncio as aioredis

class AgentTaskQueue:
    """基于 Redis 的 Agent 任务调度器"""
    
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url)
    
    async def submit_task(
        self, 
        task_id: str,
        agent_chain: list[str],  # ["researcher", "writer", "reviewer"]
        payload: dict,
        max_retries: int = 3
    ) -> str:
        task = {
            "task_id": task_id,
            "agent_chain": agent_chain,
            "current_step": 0,
            "payload": payload,
            "results": {},
            "status": "pending",
            "created_at": datetime.now().isoformat(),
            "max_retries": max_retries,
            "retry_count": 0
        }
        # 持久化到 Redis
        await self.redis.set(f"task:{task_id}", json.dumps(task), ex=3600)
        await self.redis.lpush("task_queue", task_id)
        return task_id
    
    async def advance_task(self, task_id: str, step_result: dict):
        """某个 Agent 完成后,推进任务到下一步"""
        raw = await self.redis.get(f"task:{task_id}")
        task = json.loads(raw)
        
        current_agent = task["agent_chain"][task["current_step"]]
        task["results"][current_agent] = step_result
        task["current_step"] += 1
        
        if task["current_step"] >= len(task["agent_chain"]):
            task["status"] = "completed"
        else:
            # 把下一步 Agent 的输入推入队列
            next_agent = task["agent_chain"][task["current_step"]]
            await self.redis.lpush(f"agent_queue:{next_agent}", task_id)
        
        await self.redis.set(f"task:{task_id}", json.dumps(task), ex=3600)
    
    async def handle_failure(self, task_id: str, error: str):
        """失败处理:重试或死信队列"""
        raw = await self.redis.get(f"task:{task_id}")
        task = json.loads(raw)
        
        task["retry_count"] += 1
        if task["retry_count"] <= task["max_retries"]:
            task["status"] = "retrying"
            current_agent = task["agent_chain"][task["current_step"]]
            # 指数退避重试
            delay = 2 ** task["retry_count"]
            await asyncio.sleep(delay)
            await self.redis.lpush(f"agent_queue:{current_agent}", task_id)
        else:
            task["status"] = "failed"
            task["error"] = error
            await self.redis.lpush("dead_letter_queue", task_id)
        
        await self.redis.set(f"task:{task_id}", json.dumps(task), ex=86400)

坑 #5:自研调度器缺乏 observability,调试成本极高

自研的最大风险:没有 LangSmith 这样的开箱即用可观测性工具。

我们早期的自研版本没有 tracing,出了问题全靠日志查,定位一个 Agent 链路 bug 平均要花 2-3 小时。

最小可行 observability 方案

import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

# 结构化日志 + OTel trace
log = structlog.get_logger()
tracer = trace.get_tracer("multi-agent")

class InstrumentedAgent:
    def __init__(self, name: str, agent_fn):
        self.name = name
        self.agent_fn = agent_fn
    
    async def run(self, task_id: str, input_data: dict) -> dict:
        with tracer.start_as_current_span(f"agent.{self.name}") as span:
            span.set_attribute("task.id", task_id)
            span.set_attribute("agent.name", self.name)
            span.set_attribute("input.token_estimate", estimate_tokens(str(input_data)))
            
            try:
                result = await self.agent_fn(input_data)
                span.set_attribute("output.token_estimate", estimate_tokens(str(result)))
                span.set_attribute("status", "success")
                
                log.info("agent_success",
                    agent=self.name,
                    task_id=task_id,
                    input_tokens=estimate_tokens(str(input_data)),
                    output_tokens=estimate_tokens(str(result))
                )
                return result
                
            except Exception as e:
                span.record_exception(e)
                span.set_attribute("status", "error")
                log.error("agent_failed",
                    agent=self.name,
                    task_id=task_id,
                    error=str(e)
                )
                raise

实际性能对比:三个框架在同一任务上的数据

以下是我们在一个真实任务上的测量数据(任务:分析 50 条用户反馈,分类并生成摘要报告):

框架 总 token 消耗 完成时间 失败率(100次测试) 成本(DeepSeek-V3 定价)
AutoGen 0.x(默认配置) ~85,000 4.2 min 12%(超时/循环) ~$1.70
AutoGen 0.x(加终止保护) ~42,000 2.8 min 4% ~$0.84
LangGraph ~18,000 1.6 min 1% ~$0.36
自研调度器 ~16,000 1.8 min 2% ~$0.32

说明

  • AutoGen 默认配置的高 token 消耗来自全量历史传递
  • LangGraph 的 token 效率接近自研方案的原因是 delta 传递机制
  • 自研方案的失败率略高于 LangGraph,因为我们当时的重试逻辑还不够成熟
  • 测试时间:2025 年 Q4,模型版本 DeepSeek-V3-2024-11-20

重要提醒:以上数据基于特定任务和配置,不代表所有场景。你的任务复杂度、Agent 数量、状态大小都会显著影响结果。这里提供数字的目的是给出量级参考,而不是作为选型的唯一依据。


一个容易忽视的问题:多 Agent 系统的容量规划

单 Agent 系统的容量规划相对简单:QPS × 平均 token/请求 = token 消耗率。

多 Agent 系统复杂得多,因为每个外部请求可能触发多个 LLM 调用:

1 个用户请求
  → Supervisor Agent(~500 token)
    → Research Agent × N(~1500 token each)
    → Writer Agent(~2000 token)
    → Review Agent(~800 token)
  → 合计:~5000-8000 token/用户请求(含并发)

如果你的应用有 100 QPS,实际 LLM 调用可能是 400-800 QPS,每个调用还有并发的 latency tail risk。

容量规划公式

实际 LLM QPS = 用户 QPS × 平均 Agent 数 × 平均分支因子
预留 headroom = 1.5x(Agent 循环重试 + 工具调用重试)

我们犯过的错误:按单 Agent 的 QPS 做了 Rate Limiting 配置,结果多 Agent 系统一上线就撞 429,而且因为错误是在 Agent 链路中间发生的,没有做好 backpressure,整个 pipeline 直接崩了。这个教训催生了我们之前文章写的[请求队列与背压工程实践]。


框架选型决策矩阵

基于上述实践,这是我们总结的选型参考:

维度 LangGraph AutoGen 自研
学习曲线 高(图结构心智模型) 低(对话式) 极高
生产可靠性 ★★★★★ ★★★(0.x)/ ★★★★(迁移后) 取决于投入
Token 成本可控性 ★★★★★(delta 传递) ★★(历史全量传递) ★★★★★
Observability ★★★★★(LangSmith) ★★★ ★★(需自建)
Human-in-the-Loop ★★★★★(原生支持) ★★★(需 HumanProxyAgent) 灵活
故障恢复 ★★★★★(checkpointing) ★★ 取决于实现
动态 Agent 数量 ★★★ ★★★★ ★★★★★
生态稳定性 ★★★★★ ★★★(正在迁移) ★★★★★(自主)
初始投入 10-14 engineer-days 2-5 engineer-days 30+ engineer-days

决策建议

  • 新项目,需要长期维护 → LangGraph,接受学习曲线,换取生产可靠性
  • 快速验证 POC,Azure 技术栈 → AutoGen(注意迁移风险)
  • 已有任务队列基础设施,需极度灵活 → 自研,但必须同步投入 observability
  • 任何情况都不推荐 → 在没有终止条件和 token 上限保护的情况下上生产

有一个细节值得单独说明:上表里「初始投入」的估算是基于从零开始的情况。如果你们团队已经熟悉 Python 异步编程和有向图的概念(比如做过 Airflow DAG 或者 Celery workflow),LangGraph 的学习曲线会大幅缩短,实际可能 5-7 个工程日就能上手。反之,如果团队背景主要是前端或者数据科学,这个曲线可能比估算的更陡。

另外,三个框架的社区活跃度和文档质量也在影响实际上手速度:LangGraph 的官方文档在 2026 年经过了一次大幅重组,现在质量明显好于 2024 年;AutoGen 的文档因为 0.x 到 0.4.x 再到 Microsoft Agent Framework 的迁移,目前有些碎片化,找到正确版本的文档需要额外注意。


从自研迁移到 LangGraph:迁移路径与成本估算

如果你和我们当时一样,已经有一套跑着的自研调度器或 AutoGen 代码,想迁移到 LangGraph,这里是一些实际经验。

迁移成本估算

现有方案 规模 迁移工作量估算
AutoGen 0.x(<5 Agent) 小型 3-5 天
AutoGen 0.x(5-15 Agent) 中型 2-3 周
自研串行调度器 小型 1-2 周
自研复杂调度器(含并发) 大型 1-2 个月

影响迁移成本的关键因素:

  1. 状态 schema 设计是否清晰:如果现有方案的 Agent 间通信靠 dict 乱传,迁移时需要先整理 schema
  2. 有没有完善的测试用例:LangGraph 的图行为是确定性的,但需要测试用例来验证行为等价
  3. 对历史会话的依赖程度:LangGraph 的 checkpoint 格式与现有存储不兼容,历史会话需要迁移或放弃

迁移策略:双写期

我们采用的是「双写期」策略,用 4 周时间把流量逐步从 AutoGen 切到 LangGraph:

第 1 周:新功能只在 LangGraph 实现,老功能继续用 AutoGen
第 2 周:10% 新增请求路由到 LangGraph,监控错误率
第 3 周:50% 新增请求路由到 LangGraph
第 4 周:100% 新增请求,老 AutoGen 进入只读状态
第 5 周:彻底下线 AutoGen,清理旧代码

路由层的实现非常简单:

import hashlib

def route_to_framework(user_id: str, feature_flags: dict) -> str:
    """基于用户 ID 的一致性路由,同一用户总是进同一个框架"""
    if feature_flags.get("langgraph_rollout_pct", 0) == 100:
        return "langgraph"
    
    # 一致性哈希,保证同一用户体验一致
    hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100
    rollout_pct = feature_flags.get("langgraph_rollout_pct", 0)
    
    return "langgraph" if hash_val < rollout_pct else "autogen"

双写期的核心价值:给你时间发现 LangGraph 在你特定业务场景下的行为差异,而不是靠大爆炸式迁移发现问题。


多 Agent 系统中最值得关注的安全问题

多 Agent 系统引入了一类传统微服务没有的安全威胁:Agent 间的信任边界

Prompt Injection 在多 Agent 场景的放大效应

单 Agent 的 Prompt Injection 攻击是已知问题(我们之前写过专门的文章)。多 Agent 系统会放大这个问题:

攻击者构造恶意输入 → Research Agent 处理 → 恶意指令被提取为「研究结论」
→ Writer Agent 把恶意指令误认为合法输入 → 生成攻击者期望的输出

即使每个 Agent 自身对 Prompt Injection 有防御,Agent 链路之间的信任传递也可能绕过防御。

防御策略

  1. 输出清洗(Output Sanitization):每个 Agent 的输出在传给下一个 Agent 之前,过滤掉可能是指令的内容
  2. 最小权限原则:Writer Agent 不应该有调用外部 API 的工具,即使 Researcher Agent 的输出里包含了这类指令
  3. Agent 间调用签名:在 State 里记录每一步数据的来源 Agent 和处理时间,便于事后审计
# 在 State 里记录数据来源链
class AgentState(TypedDict):
    artifacts: dict  # 各 Agent 产物
    provenance: dict  # 数据来源追踪

# 每个 Agent 写出数据时记录来源
def research_agent(state: AgentState) -> dict:
    result = do_research(state["task"])
    return {
        "artifacts": {"researcher": result},
        "provenance": {
            "researcher": {
                "agent": "researcher",
                "timestamp": datetime.now().isoformat(),
                "input_hash": hashlib.md5(state["task"].encode()).hexdigest()
            }
        }
    }

这个设计不只是安全层面的,在调试异常输出时也非常有用:你能直接追溯「这段内容是哪个 Agent 在什么时候写入的」。


结语:多 Agent 是个分布式系统问题

回头看这段经历,最大的教训是:多 Agent 架构不是 AI 问题,是分布式系统问题

  • 状态管理 → 和微服务的状态一致性是同一个问题
  • 故障恢复 → 和任务队列的 at-least-once 语义是同一个问题
  • 成本控制 → 和数据库查询优化是同一个问题(token 是计算资源)
  • Observability → 和分布式 tracing 是同一个问题

唯一的新变量是 LLM 的非确定性:你的 Agent 链路可能因为模型输出的一个词不同就走向完全不同的分支。这要求你的调度层有比传统微服务更强的防御性设计。

LangGraph 在这个维度上目前是工具选择里最成熟的。如果你现在刚开始构建多 Agent 系统,我的建议是先花 2 周学好 LangGraph 的图模型,这个投入值得。

最后说一个反直觉的经验:我们花在「如何让 Agent 更聪明」上的时间,远不如花在「如何让框架更可靠」上的时间带来的收益大。模型在进步,聪明这件事交给模型厂商去做。工程师的价值在于把这些模型能力稳定、可预期、有成本意识地组合成业务价值。

这就是为什么框架选型值得认真对待。它不只是技术选择,也是你的团队未来几年要面对什么样的工程问题的选择。


附录:快速检查清单——上生产前必须确认的 10 件事

不管你用哪个框架,在多 Agent 系统上生产之前,对照这个清单逐项确认:

终止保护(必须)

  • 每条 Agent 链路都有硬性轮次上限(max_iterations / max_consecutive_auto_reply)
  • 每个 LLM 调用都有超时配置(timeout_seconds)
  • 整个 pipeline 有全局超时兜底

成本控制(必须)

  • 每次 LLM 调用的 token 上限已设置(max_tokens)
  • 有 per-task 的 token budget 监控,超预算发告警
  • 对话历史有裁剪策略,防止二次方增长

故障恢复(强烈建议)

  • 已测试「Agent B 失败后整个 pipeline 的行为」
  • 重试逻辑有指数退避,而不是立即重试
  • 长任务有 checkpoint 机制,避免全量重跑

可观测性(强烈建议)

  • 每个 Agent 的调用链路有 trace id 关联
  • 关键指标已接入监控:token 消耗、延迟、错误率
  • 有能力回放单次失败 task 的完整执行链路

驉側话:什么时候不应该上多 Agent

这篇文章里是如何应对多 Agent 系统的工程问题。但有一个问题同样重要:你的场景真的需要多 Agent 吗?

以下情况下,单 Agent 就够了,不要引入多 Agent 复杂度:

1. 任务内在就是线性的

用户输入 → 检索 → 生成 → 输出。这个流程没有多个其并发的步骤,也没有需要动态路由的判断逻辑。此时用多 Agent 只是增加了调用延迟和复杂度。

2. 任务对 latency 极度敏感且需要达到 500ms 以内

每个额外的 Agent 层就是一次额外的 LLM API RTT。如果业务需要 sub-second 响应,优先考虑单 Agent 加工具调用,而不是多 Agent 并发。

3. 问题本身简单,是模型能力不够

如果你引入多 Agent 的原因是「单 Agent 没办法解决这个问题」,先检查是否是 prompt 设计问题,或者就应该换一个更强的模型。多 Agent 系统不是弥补模型能力不足的戞鸟工具。

4. 团队没有多 Agent 系统的运维经验

多 Agent 系统的调试和运维复杂度远超单 Agent。如果团队还没有成熟的 Agent 开发经验,引入多 Agent 只会把运维复杂度迅速叠加。

判断树

需要多个具有不同层次知识的専家协作?
  └ 是 → 多 Agent
  └ 否 → 单 Agent + 工具调用

任务需要并发处理且单个 Agent 没办法同时运行多个子任务?
  └ 是 → 多 Agent
  └ 否 → 单 Agent + 多次工具调用

工作流需要其中某些步骤的输出为另一些步骤的输入,且有内在岐1循环逐代逻辑?
  └ 是 → 多 Agent
  └ 否 → 单 Agent + 结构化输出

简单的语言则:如果一个工具调用能解决的,就不要用 Agent。留着 Agent 劳动,就要确保至少有 Agent 层面的智能和自主性在起作用。


实战建议:从第一个多 Agent 任务开始的正确姿势

如果你现在刚开始,以下是我们最想告诉过去的自己的几条建议:

第一步:先把单 Agent 做到极限

在引入第二个 Agent 之前,用尽一切手段优化单 Agent:更好的 prompt、更强的模型、工具调用、结构化输出。单 Agent 的极限往往比你想象的高很多。我们团队有一次把「必须多 Agent」的任务用更精细的 prompt 设计加工具调用解决了,省掉了整套多 Agent 架构的开发成本。

第二步:在纸上画出你的 Agent 图,然后问自己三个问题

  1. 这张图里哪些节点可以是普通函数?
  2. 这张图里哪些节点之间的通信可以是同步 API 调用而不是 LLM 调用?
  3. 这张图里最长的路径有多少步?每一步预期的延迟是多少?

如果第三个问题的答案超过 5 步,仔细想想是否真的需要这么多层。

第三步:从最简单的线性 pipeline 开始,而不是从复杂的层级结构开始

层级结构(Supervisor + 多个 Sub-Agent)听起来很优雅,但调试起来非常痛苦。先做线性的,证明每一步的输出质量可以接受之后,再引入动态路由和条件分支。

第四步:在生产环境的第一个月,把 token 成本监控和错误率监控放在最显眼的位置

不是 dashboard 深处,是每天工作时能看到的地方。多 Agent 系统的问题很容易在你不注意的时候悄悄演化成大问题。早发现早处理,比出了大事故再修要好得多。

第五步:为你的框架选择写下决策记录

无论你选了 LangGraph、AutoGen 还是自研,把选择的理由、当时考虑的替代方案、放弃其他方案的原因记录下来。六个月后你的团队可能会有新成员,或者框架本身又出了新版本,这份记录能帮你快速评估是否需要重新审视当初的决策。

多 Agent 工程是一个快速演化的领域。今天的最佳实践,明年可能就有更好的方案。保持学习,同时也保持批判性思维——不是所有新框架都值得立即迁移。

如果你在多 Agent 编排上踩过别的坑,欢迎在评论区交流,很期待听到不同场景下的实战经验。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐