引言:从Demo到生产,可靠性是最大的鸿沟

过去两年,LLM智能体从概念验证快速走向实际应用。然而,行业数据揭示了令人警醒的现实:大量AI智能体永远停留在原型阶段,它们在演示环境下表现得智能而流畅,一旦投入生产,就暴露出不可预测的故障、安全漏洞和失控的成本。

一个典型的生产级智能体失败场景是这样的:用户提交了一个复杂的多步骤请求,Agent在ReAct循环中逐步推理、执行工具调用。但某个中间步骤返回了意外结果,Agent陷入无限重试循环,每次迭代都在消耗Token和API费用。更糟糕的是,它可能在某个时刻被恶意提示注入,绕过安全控制,执行了未经授权的操作。

这些问题的根源,往往不是模型能力不足,而是架构设计缺乏对"可靠性"的系统性思考。本文聚焦Plan-then-Execute(计划-执行)模式,通过将战略规划与战术执行分离,构建具备可预测性、安全性和可恢复性的LLM智能体服务。以下所有代码示例均基于生产实践提炼,可直接用于项目参考。

一、为什么选择Plan-then-Execute模式?

1.1 ReAct模式的局限性

ReAct(Reason-Act)是最常见的Agent设计模式。Agent在一个循环中运行:思考 → 行动 → 观察 → 再思考。这种模式简单灵活,但存在一个致命弱点——“短视思维”

# ReAct模式的典型实现——每个步骤独立决策
while not done:
    thought = llm.think(context)      # 思考下一步
    action = llm.select_action(thought) # 决定动作
    observation = execute(action)      # 执行工具
    context.append(observation)        # 追加观察结果
    # 问题:Agent看不到全局,容易路径依赖

在复杂多步骤任务中,Agent可能因为某个中间观察结果而偏离主线,陷入低效路径甚至死循环。更关键的是,安全风险:如果某个步骤触发了间接提示注入攻击,恶意内容可能渗透后续所有步骤的上下文。

1.2 Plan-then-Execute的核心优势

Plan-then-Execute模式通过显式分离规划与执行,从根本上解决了ReAct模式存在的短视决策和安全隐患问题。该模式将整个Agent工作流划分为三个核心层次:

第一层:Planner(战略规划层)

Planner是整个Agent的"大脑",承担高层次的战略决策职责。它的输入是用户提交的原始目标,输出则是一份结构化、可执行的计划,通常以JSON格式或DAG(有向无环图)形式呈现。这份计划包含完整的步骤序列、每一步所需调用的工具、步骤间的依赖关系以及预期输出格式说明。

Planner的一个关键设计原则是:仅在此阶段调用昂贵的大模型(如GPT-4),且调用次数严格控制在1次,仅在必要时进行少量重规划。这种设计大幅降低了Token消耗,使成本变得可预测。更重要的是,Planner的输入仅限于用户最初提交的目标,不包含任何执行阶段的观察结果或中间数据,从而确保恶意内容无法渗透到规划阶段,污染后续的控制流。

第二层:Executor(战术执行层)

Executor是Agent的"手脚",负责将Planner生成的计划忠实地转化为具体行动。它按照计划中定义的顺序和依赖关系逐步执行,每一步可以调用外部工具、访问API接口或启动子Agent来完成特定任务。

与Planner不同,Executor可以使用更轻量级的小模型甚至纯确定性代码来实现,因为它的职责是"执行既定计划"而非"制定策略"。这种分层设计不仅降低了推理成本,还大幅提升了执行效率。Executor每次执行前都会验证当前步骤是否仍与原始计划一致,这种"控制流固化"机制有效防止了运行时注入攻击对Agent行为路径的篡改。

第三层:Verifier(验证与审批层,可选但强烈推荐)

Verifier在整个架构中扮演"质检员"和"安全阀"的角色,虽然它是可选组件,但在生产环境中几乎是不可或缺的。Verifier的职责分为两个阶段:

  • 执行前验证:在Planner生成计划之后、Executor启动执行之前,Verifier会主动检查计划的合理性、安全性和可行性。例如,它能够识别出是否包含高风险操作(如删除数据、修改系统配置),是否存在逻辑矛盾或依赖循环,以及所需工具是否真正可用。对于识别出的中低风险问题,Verifier会记录告警但不阻塞流程;对于高风险或明显不合理的计划,Verifier会标记为需要人工介入。

  • 执行后验证:在Executor完成某些关键步骤后,Verifier可以对执行结果进行校验,判断输出是否符合预期格式和内容要求,防止因模型幻觉或工具异常而产生错误结果。

此外,Verifier还承担着Human-in-the-Loop(人工介入)的协调职责。当遇到高风险操作、计划验证不通过或执行结果异常时,Verifier会暂停整个Agent的执行流程,生成待审批请求并交由人工决策,只有在获得明确批准后才允许继续执行。这种机制确保了AI系统在关键决策点上始终处于人类的监督之下。

通过这三层的协同配合,Plan-then-Execute模式成功将智能体的行为从不可预测的"自由发挥"转变为可观察、可控制、可恢复的工程化流程,为LLM智能体的生产级部署奠定了坚实的架构基础。

二、生产级实现:从零构建可靠Agent

2.1 核心组件实现

以下代码实现一个生产级的Plan-then-Execute Agent,包含计划生成、执行器、验证器和人工介入机制。

2.1.1 状态定义

使用类型安全的状态管理,基于LangGraph的状态图模式:

from typing import TypedDict, Annotated, Sequence, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.checkpoint import MemorySaver
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator

class AgentState(TypedDict):
    """Agent的全局状态 - 单一可信源"""
    messages: Annotated[Sequence[BaseMessage], operator.add]
    user_goal: str
    plan: Optional[List[dict]]          # 结构化计划: [{"step_id": 1, "task": "...", "tool": "...", "depends_on": []}]
    current_step: int
    execution_results: Annotated[List[dict], operator.add]
    step_status: dict                    # 每个步骤的执行状态: {"step_id": "pending|running|done|failed"}
    human_approval_needed: bool
    error_count: int
    max_retries: int = 3
2.1.2 Planner:生成结构化计划

Planner负责将用户目标转换为可执行的步骤序列。关键设计:使用结构化输出(JSON Schema)强制计划格式

from pydantic import BaseModel, Field
from typing import List, Optional
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser

class PlanStep(BaseModel):
    """计划步骤的结构化定义"""
    step_id: int = Field(description="步骤编号")
    task_description: str = Field(description="该步骤要完成的任务描述")
    required_tool: str = Field(description="执行该步骤所需的工具名称")
    depends_on: List[int] = Field(default=[], description="依赖的前置步骤ID列表")
    expected_output: str = Field(description="该步骤期望的输出格式说明")

class PlanResult(BaseModel):
    """完整的计划结果"""
    steps: List[PlanStep] = Field(description="步骤列表")
    overall_goal: str = Field(description="对整体目标的描述")
    estimated_steps: int = Field(description="总步骤数")

def create_planner(llm: ChatOpenAI) -> callable:
    """创建Planner节点"""
    parser = PydanticOutputParser(pydantic_object=PlanResult)
    
    planner_prompt = """你是一个任务规划专家。请将以下用户目标分解为具体的、可执行的步骤序列。

用户目标: {goal}

**重要约束**:
1. 每个步骤必须可独立执行
2. 必须指明步骤间的依赖关系
3. 步骤总数不超过10步
4. 只规划你知道如何执行的步骤

**可用工具**: {available_tools}

{format_instructions}
"""
    
    def planner_node(state: AgentState) -> dict:
        messages = state.get("messages", [])
        user_goal = state.get("user_goal", "")
        
        # 只基于用户原始输入规划,不依赖中间执行结果
        prompt = planner_prompt.format(
            goal=user_goal,
            available_tools=", ".join(list_available_tools()),
            format_instructions=parser.get_format_instructions()
        )
        
        response = llm.invoke(prompt)
        plan = parser.parse(response.content)
        
        return {
            "plan": [step.dict() for step in plan.steps],
            "current_step": 0,
            "step_status": {step.step_id: "pending" for step in plan.steps}
        }
    
    return planner_node

关键设计决策:Planner的输入只包含用户原始目标,不包含任何执行阶段的上下文。这确保了即使执行过程中遇到恶意输入,Planner阶段生成的计划也不会被污染。

2.1.3 Executor:安全执行计划

Executor接收固化的计划,按顺序(或依赖关系)执行每个步骤。每个步骤执行前需要验证该步骤是否仍在计划中,防止控制流被篡改。

from typing import Dict, Any
import asyncio
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class ToolRegistry:
    """工具注册中心 - 所有工具调用经过统一入口"""
    _tools: Dict[str, callable] = {}
    
    @classmethod
    def register(cls, name: str, func: callable):
        cls._tools[name] = func
    
    @classmethod
    def execute(cls, tool_name: str, **kwargs) -> Any:
        if tool_name not in cls._tools:
            raise ValueError(f"未知工具: {tool_name}")
        # 执行前记录审计日志
        logger.info(f"执行工具: {tool_name}, 参数: {kwargs}")
        return cls._tools[tool_name](**kwargs)

def create_executor(llm: Optional[ChatOpenAI] = None) -> callable:
    """创建Executor节点 - 执行计划中的每一步"""
    
    def execute_step(step: dict, state: AgentState) -> dict:
        """执行单个步骤,带重试和超时控制"""
        step_id = step["step_id"]
        tool_name = step["required_tool"]
        task = step["task_description"]
        
        # 检查前置依赖
        for dep_id in step.get("depends_on", []):
            dep_status = state["step_status"].get(dep_id)
            if dep_status != "done":
                raise ValueError(f"依赖步骤 {dep_id} 尚未完成")
        
        # 执行工具调用(带超时和重试)
        max_retries = state.get("max_retries", 3)
        for attempt in range(max_retries):
            try:
                result = asyncio.wait_for(
                    ToolRegistry.execute(tool_name, task=task),
                    timeout=30
                )
                return {
                    "execution_results": [{
                        "step_id": step_id,
                        "result": result,
                        "status": "success",
                        "timestamp": datetime.now().isoformat()
                    }],
                    "step_status": {step_id: "done"}
                }
            except asyncio.TimeoutError:
                logger.warning(f"步骤 {step_id} 超时, 重试 {attempt+1}/{max_retries}")
            except Exception as e:
                logger.error(f"步骤 {step_id} 执行失败: {e}")
                if attempt == max_retries - 1:
                    return {
                        "execution_results": [{
                            "step_id": step_id,
                            "error": str(e),
                            "status": "failed"
                        }],
                        "step_status": {step_id: "failed"},
                        "error_count": state.get("error_count", 0) + 1
                    }
        return {}
    
    def executor_node(state: AgentState) -> dict:
        plan = state.get("plan", [])
        current_idx = state.get("current_step", 0)
        
        if current_idx >= len(plan):
            return {"current_step": current_idx}  # 所有步骤完成
        
        # 获取当前步骤
        step = plan[current_idx]
        
        # 【安全关键】检查当前步骤是否仍在原始计划中
        # 防止执行阶段被注入篡改控制流
        if step["step_id"] != state["plan"][current_idx]["step_id"]:
            raise SecurityError("检测到控制流篡改!")
        
        # 执行步骤
        result = execute_step(step, state)
        result["current_step"] = current_idx + 1
        
        return result
    
    return executor_node

安全要点

  1. 工具调用统一入口:所有工具调用经过ToolRegistry,便于审计、限流和安全策略注入。
  2. 步骤编号验证:执行前验证当前步骤编号与原始计划一致,防止注入攻击篡改控制流。
  3. 超时与重试:每个步骤独立超时控制,防止单个卡住阻塞整个流程。

2.2 可靠性增强:验证器与人工介入

仅靠Planner+Executor还不够。生产环境中,我们需要计划验证和**人工介入(HITL)**机制。

2.2.1 Verifier:计划合理性校验
def create_verifier(llm: ChatOpenAI) -> callable:
    """计划验证器 - 在执行前检查计划是否安全、合理"""
    
    verifier_prompt = """请验证以下计划是否安全、合理:

计划: {plan}
用户目标: {goal}

验证维度:
1. 安全合规: 是否有高风险操作?是否包含删除、修改敏感数据的步骤?
2. 逻辑合理性: 步骤依赖关系是否正确?是否遗漏关键步骤?
3. 可行性: 所有工具是否可用?步骤描述是否足够清晰?

请以JSON格式输出验证结果:
{
    "is_valid": true/false,
    "issues": ["问题列表"],
    "risk_level": "low|medium|high",
    "suggestions": ["改进建议"]
}
"""
    
    def verifier_node(state: AgentState) -> dict:
        plan = state.get("plan", [])
        goal = state.get("user_goal", "")
        
        # 如果计划为空,无需验证
        if not plan:
            return {"human_approval_needed": False}
        
        response = llm.invoke(verifier_prompt.format(
            plan=plan,
            goal=goal
        ))
        
        try:
            validation = json.loads(response.content)
        except:
            # 解析失败,默认通过但标记为需要人工复核
            return {"human_approval_needed": True}
        
        # 高风险或无效计划 → 需要人工介入
        if not validation.get("is_valid") or validation.get("risk_level") == "high":
            return {
                "human_approval_needed": True,
                "validation_issues": validation.get("issues", [])
            }
        
        # 中风险 → 记录但不阻塞
        if validation.get("risk_level") == "medium":
            logger.warning(f"计划存在中风险问题: {validation.get('issues')}")
        
        return {"human_approval_needed": False}
    
    return verifier_node
2.2.2 人工介入(Human-in-the-Loop)

对于高风险操作,Agent应只提议,不执行,等待人工审批。

from typing import Literal

def create_human_approval_node() -> callable:
    """人工介入节点 - 暂停执行等待审批"""
    
    def human_node(state: AgentState) -> dict:
        """返回待审批状态,由外部系统处理"""
        pending_step = state["plan"][state["current_step"]]
        
        # 生成审批请求
        approval_request = {
            "step_id": pending_step["step_id"],
            "task": pending_step["task_description"],
            "tool": pending_step["required_tool"],
            "risk_level": "high",
            "human_review_required": True
        }
        
        # 状态持久化,等待外部回调
        # 外部系统通过 update_state() 注入审批结果
        return {
            "pending_approval": approval_request,
            "step_status": {pending_step["step_id"]: "pending_approval"}
        }
    
    return human_node

def approve_step(step_id: int, approved: bool, signature: str) -> dict:
    """外部审批回调函数"""
    if not approved:
        return {
            "step_status": {step_id: "rejected"},
            "execution_results": [{
                "step_id": step_id,
                "status": "rejected_by_human",
                "human_signature": signature
            }]
        }
    
    return {
        "step_status": {step_id: "approved"},
        "human_approval_needed": False
    }

2.3 组装完整Agent

使用LangGraph将各节点组装成有状态图:

from langgraph.graph import StateGraph, END
from langgraph.checkpoint import MemorySaver  # 生产环境建议用PostgresSaver

def build_agent(llm: ChatOpenAI) -> StateGraph:
    """构建完整的Plan-then-Execute Agent"""
    
    # 创建节点
    planner = create_planner(llm)
    verifier = create_verifier(llm)
    executor = create_executor(llm)
    human_approval = create_human_approval_node()
    
    # 构建状态图
    workflow = StateGraph(AgentState)
    
    # 添加节点
    workflow.add_node("planner", planner)
    workflow.add_node("verifier", verifier)
    workflow.add_node("executor", executor)
    workflow.add_node("human_approval", human_approval)
    
    # 设置入口
    workflow.set_entry_point("planner")
    
    # 定义边
    workflow.add_edge("planner", "verifier")
    
    # Verifier后的条件路由
    def after_verifier(state: AgentState) -> Literal["human_approval", "executor", "__end__"]:
        if state.get("human_approval_needed"):
            return "human_approval"
        if not state.get("plan"):
            return "__end__"
        return "executor"
    
    workflow.add_conditional_edges("verifier", after_verifier)
    
    # Human Approval后的路由
    def after_approval(state: AgentState) -> Literal["executor", "__end__"]:
        pending = state.get("pending_approval")
        if pending:
            step_status = state["step_status"].get(pending["step_id"])
            if step_status == "approved":
                return "executor"
            # 拒绝或其他状态 → 终止
        return "__end__"
    
    workflow.add_conditional_edges("human_approval", after_approval)
    
    # Executor后的路由 - 循环或结束
    def after_executor(state: AgentState) -> Literal["executor", "__end__"]:
        current = state.get("current_step", 0)
        total = len(state.get("plan", []))
        
        # 检查是否有步骤失败
        if state.get("error_count", 0) > 3:
            return "__end__"  # 重试耗尽
        
        # 检查是否有步骤需要人工介入
        for step_id, status in state["step_status"].items():
            if status == "pending_approval":
                return "human_approval"
        
        if current >= total:
            return "__end__"
        return "executor"  # 继续下一步
    
    workflow.add_conditional_edges("executor", after_executor)
    
    return workflow

# 使用示例
def run_agent(user_goal: str):
    llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.1)
    
    # 生产环境使用PostgresSaver持久化状态
    checkpoint_saver = MemorySaver()
    
    app = build_agent(llm).compile(checkpointer=checkpoint_saver)
    
    # 配置线程ID支持恢复
    config = {"configurable": {"thread_id": "user_session_123"}}
    
    initial_state = {
        "messages": [HumanMessage(content=user_goal)],
        "user_goal": user_goal
    }
    
    # 流式执行,可处理人工介入暂停
    for event in app.stream(initial_state, config):
        print(event)
    
    # 检查最终状态
    final_state = app.get_state(config)
    return final_state

三、可靠性设计模式总结

3.1 五种核心稳健性模式

生产级Agent系统通常组合使用以下设计模式:

模式 适用场景 关键实现
Plan-then-Execute 复杂多步骤任务 规划与执行分离,计划前置固化
有界循环 防止无限重试 步骤数、Token数、时间三重预算限制
双LLM模式 防止提示注入 特权LLM隔离处理,阻断恶意数据传递
审查器-评判器 内容质量保证 独立模型验证,减少幻觉和偏差
检查点恢复 长时间任务 状态持久化,支持断点续传和人工介入

3.2 生产环境部署清单

从演示到生产,确保以下核心能力就位:

  • 状态持久化:用PostgresSaver替代MemorySaver,支持任务恢复
  • 预算限制:每轮最多10步、8000 Token、120秒
  • 可观测性:OpenTelemetry追踪、Prometheus指标、Grafana仪表盘
  • 策略引擎:Open Policy Agent (OPA) 控制工具调用权限
  • 金丝雀发布:先影子模式→内部小范围→1%流量→逐步放量
  • CI/CD:黄金测试集验证每次变更,准确率下降即构建失败

结语

构建高可靠性的LLM智能体,核心不在于让模型"更聪明",而在于用工程化手段驯服其非确定性。Plan-then-Execute模式通过分离规划与执行、固化控制流、引入验证和人工介入,将Agent从"不可预测的提示链"转变为"可观察、可控制、可恢复的生产级服务"。

正如一位从业者所言:“优秀智能体的衡量标准不是智力,而是它对安全边界的服从。” 当你的Agent能够优雅地处理失败、主动寻求帮助、并让每次行为都可审计时,它才真正准备好面对真实世界的复杂与不确定。


Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐