AI Agent 错误处理:从工具调用失败到 LLM 幻觉的防御性设计

cover

一、Agent 崩溃的 N 种方式:不只是 API 超时

AI Agent 的错误场景远比传统软件复杂。传统软件的错误主要来自网络超时、数据校验失败、资源不足等确定性原因。但 Agent 的错误来源多了一层不确定性——LLM 本身的输出不可控。

一个典型的工作流 Agent 执行路径:接收用户指令 → LLM 规划步骤 → 调用工具 A → 解析返回结果 → 调用工具 B → 生成最终回复。这条链路上至少有 5 个可能出错的节点:

LLM 输出格式错误。 要求返回 JSON,模型返回了带注释的 JSON 或格式错误的 JSON。解析失败,后续流程中断。

工具调用参数错误。 LLM 生成的函数参数类型不匹配或缺少必填参数,工具执行直接报错。

工具执行环境错误。 数据库连接断开、第三方 API 限流、文件权限不足——这些是传统意义上的运行时错误。

LLM 幻觉。 模型编造了不存在的工具名称、捏造了不存在的参数值、或者对工具返回结果做了错误解读。这类错误不会抛异常,但会导致业务逻辑错误。

死循环。 LLM 反复调用同一个工具、反复重试同一个失败的操作,消耗大量 Token 和时间而不产生有效进展。

如果 Agent 没有针对这些错误场景的防御性设计,任何一个节点的失败都会导致整个工作流崩溃,且错误信息对用户毫无意义。

二、Agent 错误分类与防御策略

graph TB
    subgraph 错误分类
        A[LLM 输出错误] --> A1[格式错误]
        A --> A2[幻觉输出]
        A --> A3[拒绝回答]
        B[工具调用错误] --> B1[参数校验失败]
        B --> B2[执行超时]
        B --> B3[环境异常]
        C[流程控制错误] --> C1[死循环]
        C --> C2[步骤超限]
        C --> C3[上下文溢出]
    end

    subgraph 防御策略
        D[输出校验 + 重试] --> A1
        E[结果交叉验证] --> A2
        F[降级回复] --> A3
        G[参数 Schema 校验] --> B1
        H[超时 + 重试] --> B2
        I[熔断 + 降级] --> B3
        J[步骤计数 + 强制终止] --> C1
        K[最大步数限制] --> C2
        L[上下文预算管理] --> C3
    end

三、生产级 Agent 错误处理框架

3.1 LLM 输出校验与自动修复

import json
import re
from dataclasses import dataclass
from typing import Optional, Any

@dataclass
class ParsedOutput:
    """LLM 输出解析结果"""
    success: bool
    data: Optional[dict] = None
    error: Optional[str] = None
    raw_output: str = ""

class LLMOutputParser:
    """LLM 输出解析器:带自动修复能力

    设计思路:
    - 优先严格解析,失败后尝试修复常见格式问题
    - 修复仍失败则要求 LLM 重新生成
    - 最多重试 2 次,避免无限循环
    """

    def __init__(self, llm_client=None, max_retries: int = 2):
        self.llm_client = llm_client
        self.max_retries = max_retries

    def parse_json(self, raw: str) -> ParsedOutput:
        """解析 LLM 输出为 JSON,带自动修复"""
        # 第一步:直接解析
        data = self._try_parse_json(raw)
        if data is not None:
            return ParsedOutput(success=True, data=data, raw_output=raw)

        # 第二步:尝试提取 JSON 块(模型可能包裹在 ```json ... ``` 中)
        extracted = self._extract_json_block(raw)
        if extracted:
            data = self._try_parse_json(extracted)
            if data is not None:
                return ParsedOutput(success=True, data=data, raw_output=raw)

        # 第三步:尝试修复常见格式问题
        fixed = self._try_fix_json(raw)
        if fixed:
            data = self._try_parse_json(fixed)
            if data is not None:
                return ParsedOutput(success=True, data=data, raw_output=raw)

        return ParsedOutput(
            success=False,
            error=f"JSON 解析失败,原始输出: {raw[:200]}",
            raw_output=raw,
        )

    def _try_parse_json(self, text: str) -> Optional[dict]:
        """尝试解析 JSON"""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None

    def _extract_json_block(self, text: str) -> Optional[str]:
        """从 Markdown 代码块中提取 JSON"""
        pattern = r'```(?:json)?\s*\n?(.*?)\n?```'
        match = re.search(pattern, text, re.DOTALL)
        if match:
            return match.group(1).strip()
        return None

    def _try_fix_json(self, text: str) -> Optional[str]:
        """修复常见的 JSON 格式问题"""
        # 移除行内注释:// comment
        fixed = re.sub(r'//.*?$', '', text, flags=re.MULTILINE)
        # 移除尾随逗号:, } 或 , ]
        fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
        # 修复单引号为双引号
        fixed = fixed.replace("'", '"')
        return fixed if fixed != text else None

    def parse_with_retry(
        self, prompt: str, schema: dict = None
    ) -> ParsedOutput:
        """解析 LLM 输出,失败时自动重试

        重试时将错误信息反馈给 LLM,让它修正输出
        """
        for attempt in range(self.max_retries + 1):
            if attempt > 0 and self.llm_client:
                # 重试时将错误信息反馈给模型
                retry_prompt = (
                    f"上次的输出格式有误,请修正后重新输出。\n"
                    f"原始提示:{prompt}\n"
                    f"要求:严格输出 JSON 格式,不要添加注释或额外文本"
                )
                response = self.llm_client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": retry_prompt}],
                    max_tokens=1000,
                    temperature=0,
                    timeout=15,
                )
                raw = response.choices[0].message.content
            else:
                # 首次调用由外部完成,这里只解析
                raw = prompt

            result = self.parse_json(raw)
            if result.success:
                # 如果有 Schema,校验字段完整性
                if schema:
                    missing = self._validate_schema(result.data, schema)
                    if missing:
                        result.success = False
                        result.error = f"缺少必填字段: {missing}"
                        continue
                return result

        return result

3.2 工具调用错误处理与熔断

from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional
import time

class CircuitState(Enum):
    CLOSED = "closed"       # 正常状态
    OPEN = "open"           # 熔断状态:直接拒绝请求
    HALF_OPEN = "half_open" # 半开状态:允许少量请求探测

@dataclass
class CircuitBreaker:
    """熔断器:防止对故障工具的持续调用

    设计思路:
    - 连续失败 N 次后进入 OPEN 状态,直接拒绝请求
    - 冷却期后进入 HALF_OPEN 状态,允许 1 次探测
    - 探测成功则恢复 CLOSED,失败则重回 OPEN
    """
    name: str
    failure_threshold: int = 5      # 连续失败阈值
    recovery_timeout: float = 30.0  # 冷却期(秒)
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0

    def can_execute(self) -> bool:
        """判断是否允许执行"""
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            # 冷却期已过,进入半开状态
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return True  # 半开状态允许 1 次探测
        return False

    def record_success(self):
        """记录成功:重置计数器"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        """记录失败:累加计数,超阈值则熔断"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


@dataclass
class ToolCallResult:
    """工具调用结果"""
    success: bool
    data: Any = None
    error: Optional[str] = None
    tool_name: str = ""
    execution_time: float = 0.0

class ToolExecutor:
    """工具执行器:带超时、重试和熔断

    设计思路:
    - 每个工具独立熔断,一个工具故障不影响其他工具
    - 超时保护:防止工具执行挂起
    - 参数预校验:在调用工具前校验参数,避免无效调用
    """

    def __init__(self, timeout: float = 10.0, max_retries: int = 2):
        self.timeout = timeout
        self.max_retries = max_retries
        self.circuit_breakers: dict[str, CircuitBreaker] = {}

    def execute(
        self,
        tool_name: str,
        tool_fn: Callable,
        params: dict,
        param_schema: dict = None,
    ) -> ToolCallResult:
        """执行工具调用,带完整的错误处理链"""

        # 第一步:参数预校验
        if param_schema:
            validation_error = self._validate_params(params, param_schema)
            if validation_error:
                return ToolCallResult(
                    success=False,
                    error=f"参数校验失败: {validation_error}",
                    tool_name=tool_name,
                )

        # 第二步:熔断检查
        cb = self._get_circuit_breaker(tool_name)
        if not cb.can_execute():
            return ToolCallResult(
                success=False,
                error=f"工具 {tool_name} 已熔断,请稍后重试",
                tool_name=tool_name,
            )

        # 第三步:带重试的执行
        last_error = None
        for attempt in range(self.max_retries + 1):
            start_time = time.time()
            try:
                result = tool_fn(**params)
                execution_time = time.time() - start_time

                cb.record_success()
                return ToolCallResult(
                    success=True,
                    data=result,
                    tool_name=tool_name,
                    execution_time=execution_time,
                )
            except Exception as e:
                execution_time = time.time() - start_time
                last_error = str(e)
                cb.record_failure()

        return ToolCallResult(
            success=False,
            error=f"工具 {tool_name} 执行失败(重试 {self.max_retries} 次): {last_error}",
            tool_name=tool_name,
            execution_time=time.time() - start_time,
        )

    def _get_circuit_breaker(self, tool_name: str) -> CircuitBreaker:
        """获取或创建工具的熔断器"""
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker(name=tool_name)
        return self.circuit_breakers[tool_name]

    def _validate_params(self, params: dict, schema: dict) -> Optional[str]:
        """校验参数是否符合 Schema"""
        required = schema.get("required", [])
        for field_name in required:
            if field_name not in params:
                return f"缺少必填参数: {field_name}"

        properties = schema.get("properties", {})
        for key, value in params.items():
            if key not in properties:
                return f"未知参数: {key}"
            expected_type = properties[key].get("type")
            if expected_type and not self._check_type(value, expected_type):
                return f"参数 {key} 类型错误: 期望 {expected_type}"

        return None

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """检查值类型"""
        type_map = {
            "string": str,
            "integer": int,
            "number": (int, float),
            "boolean": bool,
            "array": list,
            "object": dict,
        }
        expected = type_map.get(expected_type)
        if expected is None:
            return True
        return isinstance(value, expected)

3.3 Agent 执行循环的防护机制

class AgentExecutor:
    """Agent 执行器:带步骤限制和死循环检测

    设计思路:
    - 最大步数限制:防止 Agent 无限执行
    - 重复检测:检测 Agent 是否在重复相同的操作
    - 错误累积:连续错误超过阈值时终止执行
    """

    def __init__(
        self,
        max_steps: int = 15,
        max_consecutive_errors: int = 3,
        repeat_threshold: int = 3,
    ):
        self.max_steps = max_steps
        self.max_consecutive_errors = max_consecutive_errors
        self.repeat_threshold = repeat_threshold

    def execute(self, agent, user_input: str) -> dict:
        """执行 Agent 循环,带防护机制"""
        step_count = 0
        consecutive_errors = 0
        action_history = []  # 记录动作历史,用于检测重复
        result = {"success": False, "steps": [], "error": None}

        while step_count < self.max_steps:
            step_count += 1

            # 让 Agent 决定下一步动作
            action = agent.decide_next_action(user_input, result["steps"])

            # 检测死循环:连续执行相同动作
            action_key = f"{action.tool_name}:{json.dumps(action.params, sort_keys=True)}"
            action_history.append(action_key)
            if self._is_repeating(action_history):
                result["error"] = (
                    f"检测到重复操作: {action.tool_name},"
                    f"可能陷入死循环,强制终止"
                )
                break

            # 执行动作
            tool_result = agent.execute_action(action)

            step_record = {
                "step": step_count,
                "action": action.tool_name,
                "success": tool_result.success,
                "error": tool_result.error,
            }
            result["steps"].append(step_record)

            if tool_result.success:
                consecutive_errors = 0
                # 检查 Agent 是否认为任务完成
                if agent.is_task_complete():
                    result["success"] = True
                    result["final_answer"] = agent.generate_final_answer()
                    break
            else:
                consecutive_errors += 1
                if consecutive_errors >= self.max_consecutive_errors:
                    result["error"] = (
                        f"连续 {consecutive_errors} 步执行失败,"
                        f"最后一次错误: {tool_result.error}"
                    )
                    break

        else:
            # 步数超限
            result["error"] = f"执行步数超过 {self.max_steps} 步限制"

        result["total_steps"] = step_count
        return result

    def _is_repeating(self, history: list[str]) -> bool:
        """检测是否在重复相同操作"""
        if len(history) < self.repeat_threshold:
            return False
        # 检查最近 N 次操作是否完全相同
        recent = history[-self.repeat_threshold:]
        return len(set(recent)) == 1

四、防御性设计的代价与适用边界

重试机制可能放大故障。 当下游服务过载时,Agent 的重试会加重下游压力。必须配合熔断器使用——熔断打开后不再重试,直接降级。

输出校验增加延迟。 JSON 解析、Schema 校验、重试生成都会增加单次调用的延迟。对于实时性要求极高的场景(如语音助手),需要权衡校验深度和响应速度。

死循环检测的误判。 某些合法场景需要多次调用同一工具(如分页查询),重复检测可能误判为死循环。对策:在动作中附加"调用原因"字段,只有原因也相同时才判定为重复。

适用场景: 所有生产级 Agent 系统,尤其是涉及工具调用和多步骤工作流的 Agent。不适用场景: 单轮问答、纯文本生成——这些场景的错误类型简单,不需要复杂的防御框架。

五、总结

AI Agent 的错误处理必须覆盖三个层面:LLM 输出的格式校验与自动修复、工具调用的超时重试与熔断、执行循环的步数限制与死循环检测。每一层防御都有其代价——重试增加延迟、熔断导致降级、步数限制可能中断合法操作。防御性设计的目标不是消除所有错误,而是在错误发生时保证系统不崩溃、用户有反馈、问题可追溯。生产级 Agent 的可靠性不是靠 LLM 的能力保证的,而是靠工程化的错误处理框架保证的。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐