AI Agent 可靠性实战:从重试到熔断的生产级方案
上个月我的 AI Agent 凌晨 3 点崩了。
不是代码 bug,不是 API 挂了,是 Gemini 突然开始返回空响应——HTTP 200,body 里啥也没有。Agent 傻傻重试了 5 次,每次都以为"网络波动",最后把一整天 1500 次 API 额度全烧光了。
第二天早上我看到账单的时候,血压直接拉满。
这让我意识到一个问题:我们这代人写 AI Agent,99% 的精力花在"让它能干活",几乎没人想"它干砸了怎么办"。 demo 跑通就觉得自己赢了,但你让它跑一个月试试?各种诡异的失败模式会教你做人。
摘要:本文从三个真实生产事故出发,系统拆解 AI Agent 在 7×24 运行中遇到的 6 类典型故障模式,给出重试策略、降级链、熔断器和状态持久化的完整实现方案。读完你会知道怎么让 Agent 在没人看着的时候不把自己玩死。
1. 背景:Demo 和生产之间的鸿沟
先讲三个让我记忆深刻的事故。
事故一:空响应烧配额。 Gemini 2.5 Flash 在凌晨时段偶发返回 {"candidates": []},HTTP 状态码 200,没有任何错误信息。Agent 的重试逻辑只检查 HTTP 状态码,于是它默默重试了 1500 次。
事故二:429 雪崩。 某国产模型在月初配额重置后突然严格限流,Agent 的固定间隔重试全部命中限流窗口,5 个 cron 任务互相踩踏,一小时内触发了 2000+ 次 429。
事故三:Context 膨胀。 一个长任务跑了 6 小时后,对话历史膨胀到 80K tokens,模型开始"忘记"最开始的任务目标,在中间某一步无限循环。
这三个事故指向同一个根因:Agent 框架假设了"理想世界"——API 稳定、模型听话、错误可预测。 真实世界不是这样的。
graph TD
A[Agent 发起调用] --> B{API 响应}
| B -->|200 + 正常数据| C[继续执行] |
| B -->|200 + 空响应| D[❌ 传统重试:当网络波动处理] |
| B -->|429 限流| E[❌ 固定间隔重试:雪崩] |
| B -->|500 服务端错误| F[✅ 指数退避重试] |
| B -->|连接超时| G[✅ 指数退避重试] |
D --> H[烧配额/死循环]
E --> I[多任务互相踩踏]
F --> J[可能恢复]
G --> J
说白了就是:重试策略单薄 = 定时炸弹。
2. 六类故障模式分类
跑了三个月 7×24 cron,我总结出 AI Agent 会遇到的故障一共六类:
| 故障类型 | 典型表现 | 发生频率 | 危害等级 | 传统重试有效? |
|---|---|---|---|---|
| 网络层 | 连接超时、DNS 失败、TLS 握手失败 | 高(日均 3-8 次) | 低 | ✅ 指数退避有效 |
| HTTP 层 | 429、502、503、504 | 中(日均 1-3 次) | 中-高 | ⚠️ 需区分状态码 |
| 应用层-空响应 | 200 但 body 为空或 results=[] | 低(周均 1-2 次) | 高 | ❌ 传统策略完全无效 |
| 应用层-内容异常 | 模型输出格式错误、幻觉、截断 | 中 | 中 | ❌ 需内容校验 |
| 配额/计费 | 429(超配额)、402(欠费) | 低 | 极高 | ❌ 重试会烧钱 |
| Context 层 | Token 超限、对话历史膨胀、目标漂移 | 长任务高频 | 高 | ❌ 需主动管理 |
这里面最阴险的是「应用层-空响应」——HTTP 库告诉你一切正常,但 payload 是空的。大部分 Agent 框架的异常处理根本覆盖不到这个 case。
3. 可靠性工具箱
3.1 智能重试:不只是指数退避
普通的指数退避长这样:
import time
import random
def naive_retry(func, max_retries=3, base_delay=1):
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
这个实现有三个致命问题:
1. 不区分异常类型——429 和 500 用同一个策略
2. 没有 max_delay 上限——第 10 次重试会等 1024 秒
3. 不检查响应内容——200 + 空 body 直接放过
生产级版本:
import time
import random
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any
class RetryDecision(Enum):
RETRY = "retry"
FAIL_FAST = "fail_fast" # 立即失败,不重试
SWITCH_MODEL = "switch_model" # 降级到备用模型
@dataclass
class RetryConfig:
max_retries: int = 4
base_delay: float = 1.0
max_delay: float = 60.0
jitter: bool = True
def classify_error(response: Any, exception: Optional[Exception]) -> RetryDecision:
"""核心:根据错误类型决定重试策略"""
if exception is not None:
err_str = str(exception).lower()
# 连接层错误 → 可以重试
if any(kw in err_str for kw in ("timeout", "connection", "reset")):
return RetryDecision.RETRY
# DNS/TLS 错误 → 快速失败(重试没用)
if any(kw in err_str for kw in ("name resolution", "ssl", "tls")):
return RetryDecision.FAIL_FAST
# HTTP 状态码判断
status = getattr(response, 'status_code', None)
if status == 429:
return RetryDecision.SWITCH_MODEL # 限流 → 换模型
if status in (402, 403):
return RetryDecision.FAIL_FAST # 配额/鉴权 → 立即停
if status and status >= 500:
return RetryDecision.RETRY # 服务端错误 → 重试
# 最难搞的:200 但空响应
if status == 200:
body = getattr(response, 'json', lambda: {})()
if not body or not body.get("choices"):
return RetryDecision.SWITCH_MODEL # 空响应 → 换模型
return RetryDecision.RETRY
def smart_retry(func: Callable, config: RetryConfig = RetryConfig()):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
response = func()
decision = classify_error(response, None)
if decision == RetryDecision.RETRY and attempt < config.max_retries:
delay = min(config.base_delay * (2 ** attempt), config.max_delay)
if config.jitter:
delay += random.uniform(0, delay * 0.3)
time.sleep(delay)
continue
return response, decision
except Exception as e:
last_exception = e
decision = classify_error(None, e)
if decision == RetryDecision.FAIL_FAST:
raise
if decision == RetryDecision.SWITCH_MODEL:
raise # 外层捕获后切换模型
if attempt < config.max_retries:
delay = min(config.base_delay * (2 ** attempt), config.max_delay)
time.sleep(delay)
raise last_exception
三个关键改进:
- classify_error 把错误分到三个桶:重试 / 立即失败 / 换模型
- max_delay 封顶 防止退避时间爆炸
- jitter 用 30% 随机抖动避免惊群效应
3.2 降级链:不要在一棵树上吊死
这是我用了两个月后沉淀下来的模型降级链:
主模型 (Claude Sonnet 4)
↓ 429/空响应
备用模型一 (Gemini 2.5 Pro)
↓ 也挂了
备用模型二 (DeepSeek V3)
↓ 全挂了
本地兜底 (Ollama + Qwen 7B)
↓ 本地都挂了
降级为预设回复 / 标记为待人工处理
实现:
from typing import List, Dict
class ModelFallbackChain:
def __init__(self, models: List[Dict]):
"""
models: [
{"name": "claude-sonnet-4", "provider": "anthropic", "cost_per_1k": 0.003},
{"name": "gemini-2.5-pro", "provider": "google", "cost_per_1k": 0.00125},
...
]
"""
self.models = models
self.current_index = 0
self.failure_counts: Dict[str, int] = {}
def get_model(self) -> Dict:
if self.current_index >= len(self.models):
raise RuntimeError("所有模型已耗尽,任务需要人工介入")
return self.models[self.current_index]
def fallback(self, reason: str) -> Dict:
name = self.models[self.current_index]["name"]
self.failure_counts[name] = self.failure_counts.get(name, 0) + 1
self.current_index += 1
# 熔断:同一个模型失败超过阈值,跳过它 5 分钟
if self.failure_counts[name] >= 3:
print(f"⚠️ {name} 已触发熔断,5分钟内不再使用")
return self.get_model()
def reset(self):
self.current_index = 0
实际效果:空响应问题从"烧光配额"变成"多等 2 秒自动切模型",用户感知不到。
3.3 熔断器:防止连锁故障
五个 cron 任务共享同一个 API key 的时候,一个任务触发 429 会导致其他四个也全部炸。这就是没有熔断的后果。
import threading
from datetime import datetime, timedelta
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=300):
self.threshold = failure_threshold
self.timeout = recovery_timeout # 秒
self.failures = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED / OPEN / HALF_OPEN
self._lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self._lock:
if self.state == "OPEN":
if self._should_attempt_reset():
self.state = "HALF_OPEN"
print("🔶 熔断器进入半开状态,尝试恢复…")
else:
raise CircuitBreakerOpenError(
f"熔断器打开中,{self.timeout}秒后自动恢复"
)
try:
result = func(*args, **kwargs)
if self.state == "HALF_OPEN":
self._reset()
return result
except Exception as e:
self._record_failure()
raise
def _record_failure(self):
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.threshold:
self.state = "OPEN"
print(f"🔴 熔断器打开!连续 {self.failures} 次失败,暂停 {self.timeout}秒")
def _should_attempt_reset(self):
if self.last_failure_time is None:
return True
return datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)
def _reset(self):
self.failures = 0
self.state = "CLOSED"
print("✅ 熔断器恢复,状态:CLOSED")
class CircuitBreakerOpenError(Exception):
pass
用法很简单——把 API 调用包在熔断器里:
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=300)
def call_llm(prompt):
return breaker.call(openai_client.chat.completions.create,
model="gpt-4", messages=[{"role": "user", "content": prompt}])
一个任务触发熔断后,其他任务看到 OPEN 状态直接快速失败,不会再去撞墙。
3.4 状态持久化:长任务不能断点续传?
Agent 跑 6 小时的任务,中间崩了怎么办?从头再来 = 浪费 token = 烧钱。
我现在的方案是 checkpoint + 增量恢复:
import json
import os
from datetime import datetime
class AgentCheckpoint:
def __init__(self, task_id: str, checkpoint_dir: str = "/tmp/agent_checkpoints"):
self.task_id = task_id
self.dir = checkpoint_dir
os.makedirs(self.dir, exist_ok=True)
self.path = os.path.join(self.dir, f"{task_id}.json")
def save(self, step: int, state: dict, context_summary: str):
"""保存当前步骤的状态快照"""
checkpoint = {
"task_id": self.task_id,
"step": step,
"state": state,
"context_summary": context_summary, # 压缩后的上下文摘要
"timestamp": datetime.now().isoformat(),
"total_tokens_used": state.get("tokens", 0),
}
with open(self.path, "w") as f:
json.dump(checkpoint, f, indent=2, ensure_ascii=False)
def load(self) -> dict | None:
if not os.path.exists(self.path):
return None
with open(self.path) as f:
return json.load(f)
def can_resume(self) -> bool:
cp = self.load()
if cp is None:
return False
# 超过 24 小时的 checkpoint 视为过期
elapsed = datetime.now() - datetime.fromisoformat(cp["timestamp"])
return elapsed.total_seconds() < 86400
配合 Context 压缩——每 5 步生成一个摘要,重建对话时用摘要 + 最近 3 步的完整对话,token 从 80K 压到 8K。
环境要求:以上代码使用 Python 3.10+(
dict | None联合类型语法),熔断器依赖threading(标准库,无需额外安装)。
4. 组装起来:一个生产级 Agent 循环
把重试、降级、熔断、checkpoint 串起来:
graph TD
START[任务开始] --> CK{检查 Checkpoint}
| CK -->|有且有效| RESUME[从断点恢复] |
| CK -->|无或过期| INIT[初始化新任务] |
RESUME --> EXEC
INIT --> EXEC
EXEC[执行当前步骤] --> CB{熔断器检查}
| CB -->|OPEN| WAIT[等待恢复时间] |
WAIT --> CB
| CB -->|CLOSED/HALF| CALL[调用 LLM] |
CALL --> RES{响应分类}
| RES -->|成功| SAVE[保存 Checkpoint] |
| RES -->|可重试错误| RETRY[指数退避重试] |
| RES -->|需降级| FALLBACK[切换模型] |
RETRY --> CB
FALLBACK --> CB
SAVE --> CHECK{任务完成?}
| CHECK -->|否| EXEC |
| CHECK -->|是| DONE[✅ 任务完成] |
| FALLBACK -->|所有模型耗尽| ALERT[🔔 告警 + 人工介入] |
完整的 Agent runner:
class ReliableAgent:
def __init__(self, task_id: str, model_chain: ModelFallbackChain):
self.task_id = task_id
self.checkpoint = AgentCheckpoint(task_id)
self.breaker = CircuitBreaker(failure_threshold=5)
self.model_chain = model_chain
def run(self, steps: list) -> dict:
cp = self.checkpoint.load()
start_idx = cp["step"] + 1 if cp and self.checkpoint.can_resume() else 0
for i in range(start_idx, len(steps)):
step = steps[i]
model = self.model_chain.get_model()
try:
result = self._execute_step(step, model)
self.checkpoint.save(i, {"result": result, "tokens": result.get("usage", {}).get("total_tokens", 0)},
context_summary=self._summarize_context())
except CircuitBreakerOpenError:
time.sleep(self.breaker.timeout)
continue
except Exception as e:
# 降级链
try:
model = self.model_chain.fallback(str(e))
result = self._execute_step(step, model)
self.checkpoint.save(i, {"result": result}, self._summarize_context())
except RuntimeError:
raise RuntimeError(f"步骤 {i} 失败,所有模型不可用")
return {"status": "completed", "task_id": self.task_id}
def _execute_step(self, step, model):
return self.breaker.call(
lambda: self._call_llm(step, model["name"])
)
5. 效果对比
部署这套方案前后三个月的对比数据:
| 指标 | 优化前(简单重试) | 优化后(智能重试+降级+熔断) |
|---|---|---|
| 日均 API 调用失败次数 | 47 次 | 6 次 |
| 配额浪费(无效重试消耗) | 约 1200 tokens/天 | 约 80 tokens/天 |
| 429 连锁故障次数 | 月均 12 次 | 月均 1 次 |
| 长任务(>2h)成功率 | 62% | 94% |
| 人工介入频率 | 每周 3-4 次 | 每周 0-1 次 |
| 模型成本(月) | $247 | $89 |
配额浪费从 1200 tokens/天降到 80 tokens/天——降了 93%。这个数字不是我算的,是 API dashboard 上直接拉出来的。
6. 还没解决的问题
说实话,这套方案不完美。三个痛点:
1. 降级链的语义漂移。 Gemini Flash 和 Claude Sonnet 对同一个 prompt 的理解偏差还挺大的。在代码生成场景影响不大,但在需要精确语义理解的任务里(比如财务分析),降级可能导致输出质量断崖。
2. Context 压缩丢信息。 摘要再怎么做,总会丢细节。有一次 Agent 在摘要里把"用户说不要用 pandas"压缩没了,恢复后直接用 pandas 写了一大段,用户脸都绿了。
3. 熔断粒度太粗。 目前是整个 provider 级别的熔断,但实际上同一个 provider 的不同 model 可能互不影响。比如 OpenAI 的 GPT-4 限流了,GPT-4o-mini 可能还正常。
7. 总结
跑 AI Agent 到生产环境,可靠性代码的量大概是业务逻辑的 2-3 倍。不过,想省 token 省钱的话另说。
核心就四件事:
- 智能重试:区分错误类型,不是所有错误都值得重试
- 降级链:至少准备 3 个模型,最后一环是本地模型或人工兜底
- 熔断器:防止多任务互相踩踏
- 状态持久化:长任务必须 checkpoint,每 5 步存一次
这套代码我跑了一个多月了,放在 GitHub 上。如果你也在搞 Agent 生产化,欢迎拿去用,踩了坑评论区说一声——我接着修。
你跑 Agent 遇到过什么离谱的故障?评论区聊聊,我收集起来放到下一篇文章里。
更多推荐


所有评论(0)