AI Agent开发实战⑥|Agent工业化指南:日志、监控、告警与故障自愈的实战架构
·
AI Agent开发实战⑥|Agent工业化指南:日志、监控、告警与故障自愈的实战架构
Agent上线后最怕什么?不是回答质量差,而是"不知道为什么变差了"。线上跑了一晚上,第二天发现所有请求都失败了,但你完全不知道发生了什么。本文给出完整的Agent可观测性方案,让你像监控微服务一样监控Agent。
一、Agent可观测性的特殊性
传统微服务的监控是标准化的:请求进来 → 记录日志 → Prometheus采集 → Grafana展示。但Agent有几个独特挑战:
Agent监控难点:
├── 执行路径不固定(同一任务可能走不同工具组合)
├── LLM输出不可预测(无法预设"正常响应"的边界)
├── Token消耗波动大(不同任务差异可达100倍)
├── 工具调用成功率直接影响最终质量
└── 错误传播链路长(某一步失败可能导致全链路异常)
所以Agent可观测性需要五层监控:
第一层:请求层(进来了多少请求,响应时间如何)
↓
第二层:Agent行为层(调用了哪些工具,走了几步)
↓
第三层:LLM层(模型输出质量、token消耗)
↓
第四层:工具层(每个工具的成功率、延迟)
↓
第五层:业务层(任务完成率、用户满意度)
二、请求层:基础指标监控
import asyncio
import inspect
import json
import logging
import time
from collections import defaultdict
from datetime import datetime
from functools import wraps
class AgentMetrics:
    """In-process collector for core Agent request metrics.

    Every event is appended to ``self.metrics`` (a ``defaultdict(list)``) and
    nothing is ever pruned, so memory grows with traffic; for long-running
    services export these numbers to a real metrics backend instead.
    """

    def __init__(self):
        # metric name -> list of raw records / timestamps
        self.metrics = defaultdict(list)
        self.logger = logging.getLogger("agent_metrics")

    def record_request(self, request_id: str, task: str, start_time: float):
        """Record that a request started.

        The record has no ``timestamp`` key, so ``get_summary`` ignores it;
        only completed responses are aggregated.
        """
        self.metrics["requests_total"].append({
            "request_id": request_id,
            "task": task[:100],  # truncate to keep stored records small
            "start_time": start_time,
            "status": "pending"
        })

    def record_response(self, request_id: str,
                        status: str,  # success/timeout/error
                        duration: float,  # total elapsed seconds
                        steps: int,  # number of steps taken; -1 means unknown
                        tokens: int,  # total token consumption
                        error: str = None):
        """Record a completed request and update the running aggregates."""
        self.metrics["requests_total"].append({
            "request_id": request_id,
            "status": status,
            "duration": duration,
            "steps": steps,
            "tokens": tokens,
            "error": error,
            "timestamp": time.time()
        })
        self._update_aggregates(status, duration, steps, tokens)

    def _update_aggregates(self, status, duration, steps, tokens):
        """Append per-response values to the aggregate series.

        Only "success" and "error" statuses are counted; other statuses
        (e.g. "timeout") contribute to the duration/step/token series only.
        """
        if status == "success":
            self.metrics["success_count"].append(time.time())
        elif status == "error":
            self.metrics["error_count"].append(time.time())
        self.metrics["durations"].append(duration)
        self.metrics["step_counts"].append(steps)
        self.metrics["token_counts"].append(tokens)

    def get_summary(self, window_seconds: int = 3600) -> dict:
        """Summarise responses recorded within the last ``window_seconds``.

        Returns:
            A dict with request count, success rate, average and p95 duration
            (milliseconds), average steps and average tokens, or
            ``{"error": "no data in window"}`` when nothing was recorded.
        """
        cutoff = time.time() - window_seconds
        recent = [r for r in self.metrics["requests_total"]
                  if r.get("timestamp", 0) > cutoff]
        if not recent:
            return {"error": "no data in window"}

        durations = sorted(r["duration"] for r in recent)
        # steps == -1 is the "unknown" sentinel recorded for failed requests;
        # averaging it in would drag avg_steps below the real value.
        steps = [r["steps"] for r in recent if r["steps"] >= 0]
        tokens = [r["tokens"] for r in recent if "tokens" in r]
        successes = sum(1 for r in recent if r["status"] == "success")

        # Nearest-rank 95th percentile: index ceil(0.95 * n) - 1, computed with
        # exact integer arithmetic. The former int(n * 0.95) returned the
        # maximum for n = 20.
        p95_index = (95 * len(durations) + 99) // 100 - 1

        return {
            "request_count": len(recent),
            "success_rate": successes / len(recent),
            "avg_duration_ms": sum(durations) / len(durations) * 1000,
            "p95_duration_ms": durations[p95_index] * 1000,
            "avg_steps": sum(steps) / len(steps) if steps else 0,
            "avg_tokens": sum(tokens) / len(tokens) if tokens else 0,
            "window_seconds": window_seconds
        }
# Global metrics instance shared by the request entry point.
metrics = AgentMetrics()


async def agent_endpoint(task: str, request_id: str):
    """Agent request entry point with request-level metrics.

    The function instruments itself via ``metrics``; no decorator is needed.

    Args:
        task: The user task passed to the agent.
        request_id: Identifier used to correlate metric records.

    Returns:
        The agent's result dict.

    Raises:
        Re-raises any exception from the agent after recording it.
    """
    start = time.time()
    metrics.record_request(request_id, task, start)
    try:
        # NOTE(review): `agent` is a module-level object defined elsewhere.
        result = await agent.run(task)
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11, asyncio.TimeoutError is not the builtin
        # TimeoutError, so both are listed to classify timeouts correctly.
        metrics.record_response(request_id, "timeout", time.time() - start, steps=-1, tokens=0)
        raise
    except Exception as e:
        metrics.record_response(request_id, "error", time.time() - start, steps=-1, tokens=0, error=str(e))
        raise
    # Record success outside the try so a failure in metric bookkeeping is
    # not miscounted as an agent error.
    metrics.record_response(
        request_id=request_id,
        status="success",
        duration=time.time() - start,
        steps=len(result.get("steps", [])),
        tokens=result.get("total_tokens", 0)
    )
    return result
三、Agent行为层:追踪每一步执行
这是Agent监控最有价值的部分——记录Agent的"思考过程"。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
# Initialise distributed tracing. ConsoleSpanExporter prints finished spans to
# stdout; swap in an OTLP/Jaeger exporter for production.
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

# The Resource attaches a service name so spans are attributable in the backend.
provider = TracerProvider(resource=Resource.create({"service.name": "agent"}))
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent")
# Agent execution tracer
class AgentTracer:
    """Trace Agent steps as OpenTelemetry spans and keep a local summary.

    Spans are created from the module-level ``tracer``. Each ``trace_step``
    call opens and immediately closes one span, so the span's own duration
    covers only the attribute bookkeeping, not the step's real work. Pass real
    timings through ``metadata`` instead.
    """

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        # One summary dict per traced step, in call order.
        self.spans = []

    def trace_step(self, step_name: str, step_type: str, metadata: dict = None):
        """Record one Agent step as a span.

        Args:
            step_name: Step identifier, e.g. "init" or "execute_1".
            step_type: Category, e.g. planning / execution / tool_call / llm_call.
            metadata: Extra span attributes; each value is stringified and
                truncated to 200 characters.

        Returns:
            The span, already ended.
        """
        span = tracer.start_span(f"agent.{step_type}.{step_name}")
        # Leaving the `with` block ends the span, which is what sets end_time.
        with span:
            span.set_attribute("agent.trace_id", self.trace_id)
            span.set_attribute("agent.step_name", step_name)
            span.set_attribute("agent.step_type", step_type)  # planning/execution/tool_call/llm_call
            for key, value in (metadata or {}).items():
                span.set_attribute(f"agent.{key}", str(value)[:200])

        # SDK spans expose start_time/end_time as integer nanoseconds since the
        # epoch; non-recording spans (no SDK provider configured) may lack them.
        start_ns = getattr(span, "start_time", None)
        end_ns = getattr(span, "end_time", None)
        if start_ns is not None and end_ns is not None:
            duration_ms = (end_ns - start_ns) / 1e6  # ns -> ms
        else:
            duration_ms = None

        self.spans.append({
            "step": step_name,
            "type": step_type,
            "start": start_ns,
            "end": end_ns,
            "duration_ms": duration_ms
        })
        return span
# Tracing inside the Agent main loop
class ObservableAgent:
    """Plan-then-execute Agent loop that reports each step to a tracer.

    ``_plan`` and ``_execute_step`` are not defined here. Presumably a subclass
    or the full implementation supplies them. From how ``run`` uses them,
    ``_plan`` returns a list of step dicts and ``_execute_step`` returns a dict
    with ``success``/``error`` keys (inferred; verify).
    """

    def __init__(self, llm, tools, tracer: "AgentTracer"):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.tracer = tracer

    async def run(self, task: str) -> dict:
        """Plan ``task``, execute the steps in order, and trace everything.

        Execution stops at the first step whose result is not successful.

        Returns:
            ``{"trace_id", "steps", "results"}``. ``steps`` holds the tracer's
            step summaries; ``results`` holds the per-step results actually
            executed.
        """
        self.tracer.trace_step("init", "planning", {"task_length": len(task)})

        # Planning phase
        plan = await self._plan(task)
        self.tracer.trace_step("plan_created", "planning",
                               {"steps": len(plan), "first_step": plan[0] if plan else None})

        # Execution phase
        results = []
        for i, step in enumerate(plan, start=1):
            step_start = time.time()
            self.tracer.trace_step(f"execute_{i}", "execution",
                                   {"tool": step.get("tool"), "args": str(step.get("args", {}))[:100]})
            result = await self._execute_step(step)
            self.tracer.trace_step(f"result_{i}", "execution",
                                   {"success": result.get("success"),
                                    "duration_ms": (time.time() - step_start) * 1000})
            results.append(result)

            # On failure, record it and stop executing further steps.
            if not result.get("success"):
                # str(... or "") tolerates an explicit None or non-string error,
                # which the former result.get("error", "")[:200] crashed on.
                self.tracer.trace_step(f"failure_handling_{i}", "error_handling",
                                       {"error": str(result.get("error") or "")[:200]})
                break

        # The step summaries live on the tracer; this class has no `spans`.
        return {"trace_id": self.tracer.trace_id, "steps": self.tracer.spans, "results": results}
追踪数据的实际价值:如果某类任务的平均步数突然从3步增加到7步,往往意味着Agent在某些工具调用上反复失败、不断重试——这类信号通常比"成功率"下降出现得更早,能帮你提前发现问题。
四、工具层:每个工具的健康度
class ToolHealthMonitor:
    """Track per-tool success rate, latency and error types.

    Call records are kept in memory and never pruned.
    """

    def __init__(self):
        # tool_name -> list of call records (dicts with success/duration_ms/error_type/timestamp)
        self.calls = defaultdict(list)

    def record_call(self, tool_name: str,
                    success: bool, duration_ms: float, error_type: str = None):
        """Append one call record for ``tool_name``, timestamped now."""
        self.calls[tool_name].append({
            "success": success,
            "duration_ms": duration_ms,
            "error_type": error_type,
            "timestamp": time.time()
        })

    def get_health_report(self, window_minutes: int = 30) -> dict:
        """Build a per-tool health report over the last ``window_minutes``.

        Tools with no calls in the window get ``{"status": "无数据", "health": -1}``.
        Otherwise "health" is one of good / warning / concerning / critical.
        "good" additionally requires an average latency under 1000 ms, so a
        fully successful but slow tool is reported as "warning".
        """
        window = time.time() - window_minutes * 60
        report = {}

        for tool_name, call_history in self.calls.items():
            recent = [c for c in call_history if c["timestamp"] > window]
            if not recent:
                report[tool_name] = {"status": "无数据", "health": -1}
                continue

            success_count = sum(1 for c in recent if c["success"])
            avg_duration = sum(c["duration_ms"] for c in recent) / len(recent)
            error_types = defaultdict(int)
            for c in recent:
                if not c["success"] and c.get("error_type"):
                    error_types[c["error_type"]] += 1

            # Health grading by success rate (and latency for the top grade)
            success_rate = success_count / len(recent)
            if success_rate >= 0.99 and avg_duration < 1000:
                status = "🟢 健康"
                health = "good"
            elif success_rate >= 0.95:
                status = "🟡 轻微问题"
                health = "warning"
            elif success_rate >= 0.8:
                status = "🟠 需要关注"
                health = "concerning"
            else:
                status = "🔴 严重问题"
                health = "critical"

            report[tool_name] = {
                "status": status,
                "health": health,
                "calls_in_window": len(recent),
                # Formatted percentage string, e.g. "90.0%".
                "success_rate": f"{success_rate:.1%}",
                "avg_duration_ms": f"{avg_duration:.0f}",
                "common_errors": dict(error_types) if error_types else None
            }

        return report

    def get_alerts(self, window_minutes: int = 30) -> list[dict]:
        """Turn critical/concerning tools into alert dicts.

        Args:
            window_minutes: Window passed to ``get_health_report``; lets callers
                align alerts with the report window they already use.
        """
        alerts = []
        report = self.get_health_report(window_minutes=window_minutes)
        for tool_name, status in report.items():
            if status["health"] == "critical":
                alerts.append({
                    "severity": "critical",
                    "tool": tool_name,
                    "message": f"工具{tool_name}成功率仅{status['success_rate']},请立即检查"
                })
            elif status["health"] == "concerning":
                alerts.append({
                    "severity": "warning",
                    "tool": tool_name,
                    "message": f"工具{tool_name}出现异常,常见错误:{status.get('common_errors')}"
                })
        return alerts
# Usage example
tool_monitor = ToolHealthMonitor()


# Record every tool call automatically
async def monitored_tool_call(tool, **kwargs):
    """Call ``tool.execute(**kwargs)`` and record the outcome in ``tool_monitor``.

    Works with both synchronous and ``async`` ``execute`` implementations.
    A dict result's ``"success"`` key (default True) decides success; any
    other result type counts as success. Exceptions are recorded with their
    class name as ``error_type`` and re-raised.
    """
    start = time.time()
    try:
        result = tool.execute(**kwargs)
        # An async execute() returns a coroutine; without awaiting it the tool
        # never runs and `.get` below would fail on the coroutine object.
        if inspect.isawaitable(result):
            result = await result
    except Exception as e:
        tool_monitor.record_call(
            tool.name,
            success=False,
            duration_ms=(time.time() - start) * 1000,
            error_type=type(e).__name__
        )
        raise

    # Evaluated outside the try so a non-dict result is not misrecorded as a
    # failed call (it used to raise AttributeError on `.get`).
    success = result.get("success", True) if isinstance(result, dict) else True
    tool_monitor.record_call(
        tool.name,
        success=success,
        duration_ms=(time.time() - start) * 1000
    )
    return result
五、告警系统:什么情况下需要立即知道
不是所有指标都要告警。下面是生产验证过的告警规则:
class AgentAlertManager:
    """Evaluate alert rules against metric snapshots and send notifications.

    ``notification_channel`` must provide ``send(message: str)``. Each rule
    has a per-rule cooldown so repeated breaches do not spam the channel.
    """

    def __init__(self, notification_channel):
        self.channel = notification_channel
        self.alert_rules = [
            {
                "name": "成功率低于95%",
                "condition": lambda m: m.get("success_rate", 1) < 0.95,
                "severity": "critical",
                "cooldown_minutes": 15
            },
            {
                "name": "P95延迟超过30秒",
                "condition": lambda m: m.get("p95_duration_ms", 0) > 30000,
                "severity": "warning",
                "cooldown_minutes": 10
            },
            {
                # Tool health reports format success_rate as e.g. "45.0%";
                # _ratio converts it so the comparison works. The name
                # describes what the condition actually checks.
                "name": "某工具成功率低于50%",
                "condition": lambda m: any(
                    self._ratio(v.get("success_rate")) < 0.5
                    for v in m.get("tools", {}).values()
                ),
                "severity": "critical",
                "cooldown_minutes": 5
            },
            {
                # NOTE(review): nothing in this module computes
                # token_growth_rate; the caller must supply it.
                "name": "Token消耗异常增长50%",
                "condition": lambda m: m.get("token_growth_rate", 0) > 0.5,
                "severity": "warning",
                "cooldown_minutes": 30
            }
        ]
        self.last_alert_time = {}  # rule name -> time.time() of the last alert sent
        self.cooldown = {}  # currently unused

    @staticmethod
    def _ratio(value, default: float = 1.0) -> float:
        """Coerce a success rate (float, or percentage string like "45.0%") to a float.

        Comparing the percentage string directly with a float raises TypeError,
        which used to be swallowed, so the rule silently never fired.
        Unparseable or missing values yield ``default``.
        """
        if value is None:
            return default
        if isinstance(value, str):
            text = value.strip()
            try:
                if text.endswith("%"):
                    return float(text[:-1]) / 100
                return float(text)
            except ValueError:
                return default
        return float(value)

    def check_and_alert(self, metrics_summary: dict, tool_health: dict):
        """Evaluate every rule and send alerts for breaches not in cooldown."""
        combined = {**metrics_summary, "tools": tool_health}
        for rule in self.alert_rules:
            try:
                if not rule["condition"](combined):
                    continue
                now = time.time()
                last = self.last_alert_time.get(rule["name"], 0)
                if now - last < rule["cooldown_minutes"] * 60:
                    continue  # still cooling down; do not repeat the alert
                self._send_alert(rule, combined)
                self.last_alert_time[rule["name"]] = now
            except Exception:
                # The alerting loop must never crash, but failures must stay
                # visible: log them with a traceback.
                logging.getLogger("agent_alerts").exception("告警检查异常: %s", rule["name"])

    def _send_alert(self, rule: dict, data: dict):
        """Format the alert message and push it to the notification channel."""
        message = f"""
🚨 【{rule['severity'].upper()}】Agent告警
告警规则:{rule['name']}
触发时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
当前指标:
{json.dumps(data, indent=2, ensure_ascii=False)[:500]}
请及时检查。
"""
        self.channel.send(message)
六、故障自愈:Agent自己处理问题
除了监控告警,Agent还应该具备自动故障恢复能力:
class SelfHealingAgent:
    """Wrap an agent with bounded retries and simple automatic recovery.

    Relies on exception types ``ToolError`` (with a ``tool_name`` attribute),
    ``ContextOverflowError`` and ``RateLimitError``, which are not defined in
    this snippet. Presumably the agent framework provides them; verify.
    """

    def __init__(self, base_agent, tool_monitor, max_attempts: int = 3,
                 rate_limit_backoff: float = 30.0):
        """
        Args:
            base_agent: Agent exposing ``run``, ``disable_tool``,
                ``compress_history`` and ``reduce_iterations``.
            tool_monitor: Object exposing ``get_health_report`` and ``get_alerts``.
            max_attempts: Maximum number of ``run`` attempts.
            rate_limit_backoff: Seconds to wait after a rate-limit error
                before the next attempt.
        """
        self.agent = base_agent
        self.monitor = tool_monitor
        self.max_attempts = max_attempts
        self.rate_limit_backoff = rate_limit_backoff

    async def run_with_healing(self, task: str) -> dict:
        """Run ``task``, automatically handling common failure modes.

        - Tool errors: disable the tool and retry if its health is degraded,
          otherwise re-raise.
        - Context overflow: compress history, lower iterations, retry.
        - Rate limit: back off, then retry (no wait after the final attempt).
        - Any other exception: return an error dict instead of raising.

        On success, current tool alerts (if any) are attached under "warnings".
        """
        attempt = 0
        while attempt < self.max_attempts:
            attempt += 1
            try:
                result = await self.agent.run(task)
                # Post-run health check: attach alerts but keep the result.
                alerts = self.monitor.get_alerts()
                if alerts:
                    result["warnings"] = alerts
                return result
            except ToolError as e:
                # Tool error: degrade by disabling the tool if it is unhealthy
                if self._should_fallback(e.tool_name):
                    self.agent.disable_tool(e.tool_name)
                    continue  # retry without the failing tool
                raise
            except ContextOverflowError:
                # Context overflow: compress history and reduce max_iterations
                self.agent.compress_history()
                self.agent.reduce_iterations()
                continue
            except RateLimitError:
                # Waiting only makes sense if another attempt will follow.
                if attempt < self.max_attempts:
                    await asyncio.sleep(self.rate_limit_backoff)
                continue
            except Exception as e:
                # Unknown error: report it instead of retrying
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt,
                    "message": "任务执行遇到问题,建议重试或简化任务"
                }

        return {
            "success": False,
            "error": f"重试{attempt}次后仍未成功",
            "attempts": attempt,
            "message": "请稍后重试或联系技术支持"
        }

    def _should_fallback(self, tool_name: str) -> bool:
        """Return True if ``tool_name`` is critical/concerning over the last 10 minutes."""
        health = self.monitor.get_health_report(window_minutes=10)
        tool_health = health.get(tool_name, {})
        return tool_health.get("health") in ["critical", "concerning"]
七、总结:Agent可观测性三步走
| 阶段 | 做什么 | 收益 |
|---|---|---|
| 第一步:记录 | 接入请求日志、工具调用追踪 | 能回溯任何一次请求的执行路径 |
| 第二步:分析 | 配置成功率/P95延迟/Token消耗指标 | 能判断Agent整体健康状态 |
| 第三步:自愈 | 根据故障类型自动降级工具、重试、限流退避 | 常见故障无需人工介入 |
推荐工具栈:
- 日志采集:ELK Stack 或 Loki
- 链路追踪:Jaeger 或 Apache SkyWalking
- 指标采集:Prometheus + Grafana
- 告警通道:企业微信机器人 / Slack / 飞书
下篇文章预告:「Agent框架横评2026:LangChain vs LangGraph vs CrewAI vs AutoGen 实测选型」——每个框架的真实性能对比、适用场景分析和选型决策树。
需要完整监控代码模板和Grafana配置的同学,可以看我主页的付费资源专栏。
有问题欢迎评论区留言,大家一起讨论!
所有评论(0)