AI Agent开发实战⑥|Agent工业化指南:日志、监控、告警与故障自愈的实战架构

Agent上线后最怕什么?不是回答质量差,而是"不知道为什么变差了"。线上跑了一晚上,第二天发现所有请求都失败了,但你完全不知道发生了什么。本文给出完整的Agent可观测性方案,让你像监控微服务一样监控Agent。

一、Agent可观测性的特殊性

传统微服务的监控是标准化的:请求进来 → 记录日志 → Prometheus采集 → Grafana展示。但Agent有几个独特挑战:

Agent监控难点:
├── 执行路径不固定(同一任务可能走不同工具组合)
├── LLM输出不可预测(无法预设"正常响应"的边界)
├── Token消耗波动大(不同任务差异可达100倍)
├── 工具调用成功率直接影响最终质量
└── 错误传播链路长(某一步失败可能导致全链路异常)

所以,Agent可观测性需要五层监控:

第一层:请求层(进来了多少请求,响应时间如何)
    ↓
第二层:Agent行为层(调用了哪些工具,走了几步)
    ↓
第三层:LLM层(模型输出质量、token消耗)
    ↓
第四层:工具层(每个工具的成功率、延迟)
    ↓
第五层:业务层(任务完成率、用户满意度)

二、请求层:基础指标监控

import asyncio
import inspect
import json
import logging
import time
from collections import defaultdict
from datetime import datetime
from functools import wraps

class AgentMetrics:
    """In-memory collector for core agent request metrics.

    Everything is stored in ``self.metrics`` (a ``defaultdict(list)``). Nothing is
    evicted by this class, so the lists grow for as long as the process runs.
    """

    def __init__(self):
        self.metrics = defaultdict(list)
        self.logger = logging.getLogger("agent_metrics")

    def record_request(self, request_id: str, task: str, start_time: float):
        """Record that a request has started.

        The entry is appended to ``requests_total`` with status ``"pending"`` and no
        ``timestamp`` key, so ``get_summary`` never counts it.
        """
        self.metrics["requests_total"].append({
            "request_id": request_id,
            "task": task[:100],  # truncate so stored entries stay small
            "start_time": start_time,
            "status": "pending"
        })

    def record_response(self, request_id: str,
                       status: str,          # success/timeout/error
                       duration: float,       # total elapsed time (seconds)
                       steps: int,            # number of steps taken; negative = unknown
                       tokens: int,           # total token consumption
                       error: str = None):
        """Record a finished request and update the aggregate series.

        Args:
            request_id: Identifier of the request.
            status: One of ``"success"``, ``"timeout"`` or ``"error"``.
            duration: Elapsed time; ``get_summary`` multiplies it by 1000 to
                report milliseconds, so it is treated as seconds.
            steps: Number of steps executed. A negative value means "unknown"
                and is excluded from the step average.
            tokens: Total tokens consumed.
            error: Optional error description.
        """
        self.metrics["requests_total"].append({
            "request_id": request_id,
            "status": status,
            "duration": duration,
            "steps": steps,
            "tokens": tokens,
            "error": error,
            "timestamp": time.time()
        })

        # Keep the flat aggregate series in sync with the per-request log.
        self._update_aggregates(status, duration, steps, tokens)

    def _update_aggregates(self, status, duration, steps, tokens):
        """Append the response's values to the flat aggregate series."""
        now = time.time()
        if status == "success":
            self.metrics["success_count"].append(now)
        elif status == "error":
            self.metrics["error_count"].append(now)
        elif status == "timeout":
            # Timeouts are a documented status; count them instead of dropping them.
            self.metrics["timeout_count"].append(now)

        self.metrics["durations"].append(duration)
        self.metrics["step_counts"].append(steps)
        self.metrics["token_counts"].append(tokens)

    def get_summary(self, window_seconds: int = 3600) -> dict:
        """Summarise completed requests from the last ``window_seconds`` seconds.

        Returns:
            ``{"error": "no data in window"}`` when no completed request falls in
            the window; otherwise a dict with ``request_count``, ``success_rate``,
            ``avg_duration_ms``, ``p95_duration_ms``, ``avg_steps``,
            ``avg_tokens`` and ``window_seconds``.
        """
        cutoff = time.time() - window_seconds

        # Only completed responses carry both a timestamp and a duration;
        # pending entries written by record_request are skipped here.
        recent = [r for r in self.metrics["requests_total"]
                  if r.get("timestamp", 0) > cutoff and "duration" in r]

        if not recent:
            return {"error": "no data in window"}

        durations = sorted(r["duration"] for r in recent)
        # Failed requests are recorded with a negative step count; averaging
        # that sentinel in would drag the mean below the real value.
        steps = [r["steps"] for r in recent if r.get("steps", -1) >= 0]
        tokens = [r["tokens"] for r in recent if "tokens" in r]
        successes = sum(1 for r in recent if r["status"] == "success")

        return {
            "request_count": len(recent),
            "success_rate": successes / len(recent),
            "avg_duration_ms": sum(durations) / len(durations) * 1000,
            "p95_duration_ms": durations[int(len(durations) * 0.95)] * 1000,
            "avg_steps": sum(steps) / len(steps) if steps else 0,
            "avg_tokens": sum(tokens) / len(tokens) if tokens else 0,
            "window_seconds": window_seconds
        }


# Global metrics instance
metrics = AgentMetrics()


@metrics_decorator
async def agent_endpoint(task: str, request_id: str):
    """Agent request entry point that records metrics around ``agent.run``.

    The start of the request is recorded first. On success the elapsed time,
    step count and token total are recorded and the result is returned. On
    failure the outcome is recorded as "timeout" (``TimeoutError``) or "error"
    (any other exception) with ``steps=-1`` and the exception is re-raised.
    """
    began = time.time()
    metrics.record_request(request_id, task, began)

    try:
        outcome = await agent.run(task)

        elapsed = time.time() - began
        step_total = len(outcome.get("steps", []))
        token_total = outcome.get("total_tokens", 0)
        metrics.record_response(
            request_id=request_id,
            status="success",
            duration=elapsed,
            steps=step_total,
            tokens=token_total,
        )

        return outcome

    except TimeoutError:
        elapsed = time.time() - began
        metrics.record_response(request_id, "timeout", elapsed, steps=-1, tokens=0)
        raise
    except Exception as exc:
        elapsed = time.time() - began
        metrics.record_response(request_id, "error", elapsed, steps=-1, tokens=0, error=str(exc))
        raise

三、Agent行为层:追踪每一步执行

这是Agent监控最有价值的部分——记录Agent的"思考过程"。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource

# Initialise distributed tracing.
# ConsoleSpanExporter was used below without being imported; it is exported by the
# same SDK module as BatchSpanProcessor, so bring it into scope here.
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())  # or JaegerExporter/LightstepExporter
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent")

# Agent execution tracer
class AgentTracer:
    """Record agent steps as OpenTelemetry spans plus a local summary list.

    Each call to ``trace_step`` opens and immediately closes one span, so a span
    marks the moment a step was reported rather than timing the step itself.
    """

    def __init__(self, trace_id: str):
        self.trace_id = trace_id
        self.spans = []  # one summary dict per traced step, in call order

    def trace_step(self, step_name: str, step_type: str, metadata: dict = None):
        """Emit one span for an agent step and append its summary to ``self.spans``.

        Args:
            step_name: Name of the step; becomes part of the span name.
            step_type: Category, e.g. planning/execution/tool_call/llm_call.
            metadata: Optional extra attributes. Each value is stringified and
                truncated to 200 characters before being attached.

        Returns:
            The (already ended) span.
        """
        span = tracer.start_span(f"agent.{step_type}.{step_name}")

        # Leaving the ``with`` block ends the span, which is what populates end_time.
        with span:
            span.set_attribute("agent.trace_id", self.trace_id)
            span.set_attribute("agent.step_name", step_name)
            span.set_attribute("agent.step_type", step_type)  # planning/execution/tool_call/llm_call
            if metadata:
                for k, v in metadata.items():
                    span.set_attribute(f"agent.{k}", str(v)[:200])

        # Non-recording spans (no SDK provider configured) carry no timestamps.
        started = getattr(span, "start_time", None)
        ended = getattr(span, "end_time", None)
        if started is None or ended is None:
            duration_ms = None
        else:
            # SDK span timestamps are integer nanoseconds since the epoch.
            duration_ms = (ended - started) / 1_000_000

        self.spans.append({
            "step": step_name,
            "type": step_type,
            "start": started,
            "end": ended,
            "duration_ms": duration_ms
        })

        return span


# Tracing inside the agent main loop
class ObservableAgent:
    """Agent whose planning and execution steps are reported to a tracer.

    ``run`` calls ``self._plan`` and ``self._execute_step``, which are not defined
    in this class as shown; presumably a subclass or another part of the project
    provides them.
    """

    def __init__(self, llm, tools, tracer: "AgentTracer"):
        self.llm = llm
        self.tools = {t.name: t for t in tools}  # tool name -> tool object
        self.tracer = tracer

    async def run(self, task: str) -> dict:
        """Plan the task, execute the plan step by step, and trace each stage.

        Execution stops at the first step whose result is not successful.

        Returns:
            A dict with ``trace_id``, ``steps`` (the tracer's span summaries) and
            ``results`` (one entry per executed step, including a failing one).
        """
        self.tracer.trace_step("init", "planning", {"task_length": len(task)})

        # Planning phase
        plan = await self._plan(task)
        self.tracer.trace_step("plan_created", "planning",
                                {"steps": len(plan), "first_step": plan[0] if plan else None})

        # Execution phase
        results = []
        for i, step in enumerate(plan, start=1):
            step_start = time.time()

            self.tracer.trace_step(f"execute_{i}", "execution",
                                  {"tool": step.get("tool"), "args": str(step.get("args", {}))[:100]})

            result = await self._execute_step(step)

            self.tracer.trace_step(f"result_{i}", "execution",
                                  {"success": result.get("success"), "duration_ms": (time.time()-step_start)*1000})

            results.append(result)

            # On failure, record it and stop executing the remaining steps.
            if not result.get("success"):
                # The error may be None or a non-string; normalise before slicing.
                error_text = str(result.get("error") or "")[:200]
                self.tracer.trace_step(f"failure_handling_{i}", "error_handling",
                                      {"error": error_text})
                break

        # Span summaries are kept on the tracer; this class has no ``spans`` attribute.
        return {"trace_id": self.tracer.trace_id, "steps": self.tracer.spans, "results": results}

追踪数据的实际价值:当你发现某类任务的平均步数突然从3步增加到7步,说明Agent可能在某些工具调用上反复失败——这比只看"成功率"能更早发现问题。

四、工具层:每个工具的健康度

class ToolHealthMonitor:
    """Track per-tool call outcomes and derive health reports and alerts."""

    def __init__(self):
        # tool_name -> list of call records, in the order they were recorded
        self.calls = defaultdict(list)

    def record_call(self, tool_name: str,
                   success: bool, duration_ms: float, error_type: str = None):
        """Append one call outcome for ``tool_name``, stamped with the current time."""
        entry = {
            "success": success,
            "duration_ms": duration_ms,
            "error_type": error_type,
            "timestamp": time.time(),
        }
        self.calls[tool_name].append(entry)

    @staticmethod
    def _grade(ratio, mean_ms):
        """Map a success ratio and mean latency to a (status label, health code) pair."""
        if ratio < 0.8:
            return "🔴 严重问题", "critical"
        if ratio < 0.95:
            return "🟠 需要关注", "concerning"
        if ratio >= 0.99 and mean_ms < 1000:
            return "🟢 健康", "good"
        return "🟡 轻微问题", "warning"

    def get_health_report(self, window_minutes: int = 30) -> dict:
        """Build a health entry for every known tool over the last ``window_minutes``.

        Tools with no calls in the window get ``{"status": "无数据", "health": -1}``.
        Other entries hold the status label, health code, call count, formatted
        success rate, formatted mean duration and a count of error types (or None).
        """
        cutoff = time.time() - window_minutes * 60
        summary = {}

        for name, history in self.calls.items():
            in_window = [rec for rec in history if rec["timestamp"] > cutoff]

            if not in_window:
                summary[name] = {"status": "无数据", "health": -1}
                continue

            total = len(in_window)
            ok_calls = len([rec for rec in in_window if rec["success"]])
            mean_ms = sum(rec["duration_ms"] for rec in in_window) / total

            # Count failures by error type, ignoring failures without a type.
            failures = {}
            for rec in in_window:
                kind = rec.get("error_type")
                if rec["success"] or not kind:
                    continue
                failures[kind] = failures.get(kind, 0) + 1

            ratio = ok_calls / total
            label, code = self._grade(ratio, mean_ms)

            summary[name] = {
                "status": label,
                "health": code,
                "calls_in_window": total,
                "success_rate": f"{ratio:.1%}",
                "avg_duration_ms": f"{mean_ms:.0f}",
                "common_errors": failures if failures else None,
            }

        return summary

    def get_alerts(self) -> list[dict]:
        """Turn the default-window health report into a list of alert dicts.

        "critical" tools produce a critical alert; "concerning" tools produce a
        warning alert. Every other health value produces nothing.
        """
        raised = []

        for tool_name, status in self.get_health_report().items():
            level = status["health"]
            if level == "critical":
                severity = "critical"
                message = f"工具{tool_name}成功率仅{status['success_rate']},请立即检查"
            elif level == "concerning":
                severity = "warning"
                message = f"工具{tool_name}出现异常,常见错误:{status.get('common_errors')}"
            else:
                continue
            raised.append({"severity": severity, "tool": tool_name, "message": message})

        return raised


# Usage example
tool_monitor = ToolHealthMonitor()

# Record every tool invocation automatically
async def monitored_tool_call(tool, **kwargs):
    """Run ``tool.execute(**kwargs)`` and record the outcome in ``tool_monitor``.

    Both synchronous and asynchronous tools are supported: if ``execute``
    returns an awaitable it is awaited before the result is inspected.

    Args:
        tool: Object exposing ``name`` and ``execute``.
        **kwargs: Forwarded unchanged to ``tool.execute``.

    Returns:
        Whatever the tool returned.

    Raises:
        Any exception raised by the tool, after a failed call has been recorded
        with the exception's class name as the error type.
    """
    start = time.time()
    try:
        result = tool.execute(**kwargs)
        # An async tool hands back a coroutine; without awaiting it the call
        # would be recorded before the tool has actually run.
        if inspect.isawaitable(result):
            result = await result
    except Exception as e:
        tool_monitor.record_call(
            tool.name,
            success=False,
            duration_ms=(time.time() - start) * 1000,
            error_type=type(e).__name__
        )
        raise

    # Recording happens outside the try block so a bookkeeping problem is never
    # misreported as a tool failure. Non-dict results count as successful.
    succeeded = result.get("success", True) if isinstance(result, dict) else True
    tool_monitor.record_call(
        tool.name,
        success=succeeded,
        duration_ms=(time.time() - start) * 1000
    )
    return result

五、告警系统:什么情况下需要立即知道

不是所有指标都要告警。下面是生产验证过的告警规则:

class AgentAlertManager:
    """Evaluate alert rules against metric snapshots and notify a channel.

    Each rule has a name, a predicate over the combined metrics dict, a severity
    and a cooldown. A rule that fires is not re-sent until its cooldown elapses.
    """

    def __init__(self, notification_channel):
        """Store the channel (any object with ``send(message)``) and build the rules."""
        self.channel = notification_channel
        self.alert_rules = [
            {
                "name": "成功率低于95%",
                "condition": lambda m: m.get("success_rate", 1) < 0.95,
                "severity": "critical",
                "cooldown_minutes": 15
            },
            {
                "name": "P95延迟超过30秒",
                "condition": lambda m: m.get("p95_duration_ms", 0) > 30000,
                "severity": "warning",
                "cooldown_minutes": 10
            },
            {
                # NOTE(review): the name says "10 consecutive failures" but the
                # predicate checks for a success rate below 50% — confirm intent.
                "name": "某工具连续失败10次",
                "condition": lambda m: any(
                    self._as_ratio(v.get("success_rate", 1)) < 0.5
                    for v in m.get("tools", {}).values()
                ),
                "severity": "critical",
                "cooldown_minutes": 5
            },
            {
                "name": "Token消耗异常增长50%",
                "condition": lambda m: m.get("token_growth_rate", 0) > 0.5,
                "severity": "warning",
                "cooldown_minutes": 30
            }
        ]

        self.last_alert_time = {}  # rule name -> time of the last alert sent
        self.cooldown = {}         # per-rule cooldown state (not used by this class)

    @staticmethod
    def _as_ratio(value) -> float:
        """Convert a success rate to a float in ratio form.

        Accepts plain numbers as well as strings such as ``"20.0%"``. Tool health
        reports may carry the rate as a formatted percentage, and comparing that
        string with a float would raise a TypeError.
        """
        if isinstance(value, str):
            text = value.strip()
            if text.endswith("%"):
                return float(text[:-1]) / 100
            return float(text)
        return float(value)

    def check_and_alert(self, metrics_summary: dict, tool_health: dict):
        """Evaluate every rule and send alerts for those that fire.

        Args:
            metrics_summary: Request-level metrics.
            tool_health: Per-tool health entries, exposed to rules under ``"tools"``.

        A failure while evaluating or sending one rule is printed and does not
        stop the remaining rules from being checked.
        """
        combined = {**metrics_summary, "tools": tool_health}

        for rule in self.alert_rules:
            try:
                if not rule["condition"](combined):
                    continue

                # Cooldown check: skip if this rule alerted too recently.
                now = time.time()
                last = self.last_alert_time.get(rule["name"], 0)
                if now - last < rule["cooldown_minutes"] * 60:
                    continue

                self._send_alert(rule, combined)
                # Only start the cooldown once the alert has actually gone out.
                self.last_alert_time[rule["name"]] = now

            except Exception as e:
                # The alerting path must never crash its caller.
                print(f"告警检查异常: {e}")

    def _send_alert(self, rule: dict, data: dict):
        """Format the alert text for ``rule`` and hand it to the channel.

        The metrics dump is truncated to 500 characters.
        """
        message = f"""
🚨 【{rule['severity'].upper()}】Agent告警

告警规则:{rule['name']}
触发时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

当前指标:
{json.dumps(data, indent=2, ensure_ascii=False)[:500]}

请及时检查。
        """

        self.channel.send(message)

六、故障自愈:Agent自己处理问题

除了监控告警,Agent还应该具备自动故障恢复能力:

class SelfHealingAgent:
    """Wrap an agent with retry, tool fallback and rate-limit handling.

    ``ToolError``, ``ContextOverflowError`` and ``RateLimitError`` are referenced
    below but not defined in the visible source; they are assumed to come from
    the surrounding project.
    """

    def __init__(self, base_agent, tool_monitor, *, max_attempts: int = 3,
                 rate_limit_wait: float = 30):
        """Store collaborators and retry settings.

        Args:
            base_agent: The wrapped agent; ``run`` is awaited on it.
            tool_monitor: Provides ``get_health_report`` and ``get_alerts``.
            max_attempts: Maximum number of runs before giving up.
            rate_limit_wait: Seconds to sleep after a rate-limit error.
        """
        self.agent = base_agent
        self.monitor = tool_monitor
        self.max_attempts = max_attempts
        self.rate_limit_wait = rate_limit_wait

    async def run_with_healing(self, task: str) -> dict:
        """Run the task, recovering automatically from known failure kinds.

        Returns:
            The agent's result, with a ``"warnings"`` key added when the monitor
            reports alerts, or a failure dict (``success`` False) after an
            unknown error or once all attempts are used up.

        Raises:
            ToolError: When a tool fails and its health does not justify
                disabling it.
        """
        attempt = 0

        while attempt < self.max_attempts:
            attempt += 1

            try:
                result = await self.agent.run(task)

                # Post-run check: the task succeeded, so alerts are attached as
                # warnings rather than treated as a failure.
                alerts = self.monitor.get_alerts()
                if alerts:
                    result["warnings"] = alerts

                return result

            except ToolError as e:
                # Tool failure: disable the tool and retry only if it looks unhealthy.
                if self._should_fallback(e.tool_name):
                    self.agent.disable_tool(e.tool_name)
                    continue
                raise

            except ContextOverflowError:
                # Context overflow: compress history and lower the iteration limit.
                self.agent.compress_history()
                self.agent.reduce_iterations()
                continue

            except RateLimitError:
                # Rate limited: wait, then retry.
                await asyncio.sleep(self.rate_limit_wait)
                continue

            except Exception as e:
                # Unknown error: report it to the caller instead of retrying.
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt,
                    "message": "任务执行遇到问题,建议重试或简化任务"
                }

        return {
            "success": False,
            "error": f"重试{attempt}次后仍未成功",
            "attempts": attempt,
            "message": "请稍后重试或联系技术支持"
        }

    def _should_fallback(self, tool_name: str) -> bool:
        """Return True when the tool's 10-minute health is critical or concerning."""
        health = self.monitor.get_health_report(window_minutes=10)
        tool_health = health.get(tool_name, {})
        return tool_health.get("health") in ["critical", "concerning"]

七、总结:Agent可观测性三步走

| 阶段 | 做什么 | 收益 |
| --- | --- | --- |
| 第一步:记录 | 接入请求日志、工具调用追踪 | 能回溯任何一次请求的执行路径 |
| 第二步:分析 | 配置成功率/P95延迟/Token消耗指标 | 能判断Agent整体健康状态 |
| 第三步:自愈 | 告警触发自动降级、重试、限流 | 大部分问题无需人工介入 |

推荐工具栈

  • 日志采集:ELK Stack 或 Loki
  • 链路追踪:Jaeger 或 Apache SkyWalking
  • 指标采集:Prometheus + Grafana
  • 告警通道:企业微信机器人 / Slack / 飞书

下篇文章预告:「Agent框架横评2026:LangChain vs LangGraph vs CrewAI vs AutoGen 实测选型」——每个框架的真实性能对比、适用场景分析和选型决策树。


需要完整监控代码模板和Grafana配置的同学,可以看我主页的付费资源专栏。

有问题欢迎评论区留言,大家一起讨论!

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!