摘要

中间件是 OpenClaw 处理链路中最灵活的一环。本文从中间件的设计哲学出发,系统讲解中间件的三种模式(前置、后置、环绕)、洋葱模型执行链、请求/响应变换机制,以及流式消息处理的特殊考量。通过两个完整实战案例——“消息敏感词过滤中间件"和"请求耗时统计仪表盘中间件”——你将掌握从零开发生产级中间件的全部技能。读完你会理解:为什么 AI Agent 框架需要中间件,以及如何用它优雅地解决横切关注点。


1. 引言:为什么 AI Agent 也需要中间件

1.1 场景还原

先说一个真实遇到过的问题。

你的 OpenClaw Agent 接入了企业微信和飞书两个渠道,一切都运行得很好。直到有一天,安全部门找上门来:“你们 Agent 回复的消息里,有没有可能泄露敏感信息?能不能在消息发出去之前做一次检查?”

你第一反应可能是——在每个处理逻辑里加判断。但是 Agent 的回复路径不止一条:直接回复、自动摘要、子代理消息、心跳输出……每条路径都要改一遍?

更麻烦的是,一周后运维团队又提了新需求:“能不能统计每个渠道的消息响应耗时?我们想做性能监控。” 两周后产品说:“飞书渠道的消息需要自动添加加粗格式,但微信渠道不需要。”

面对这些横切关注点(cross-cutting concerns),逐个修改业务逻辑的代价越来越高。这正是中间件的用武之地。

1.2 中间件的本质

✅ 有中间件

消息进入

🔍 脱敏中间件

📊 统计中间件

🎨 格式中间件

Agent逻辑
(干净纯粹)

格式中间件

统计中间件

输出

❌ 无中间件

消息进入

Agent逻辑

直接输出

消息进入

Agent逻辑
+ 脱敏
+ 统计
+ 格式转换

输出

方案 扩展性 维护性 复用性
无中间件(修改核心) ❌ 每次加需求改核心 ❌ 核心代码膨胀 ❌ 无法复用
有中间件(拦截链) ✅ 添加新中间件即可 ✅ 核心保持干净 ✅ 中间件独立复用

一句话总结:中间件让你在不修改 Agent 核心逻辑的前提下,插入任何需要的前置和后置处理逻辑。


2. 中间件架构设计

2.1 三种中间件模式

OpenClaw 的中间件根据切入时机分为三种:

模式 切入时机 典型用途 能否修改数据
Before(前置) Agent 处理之前 鉴权、参数校验、输入清洗 ✅ 可修改请求
After(后置) Agent 处理之后 格式化、翻译、脱敏 ✅ 可修改响应
Around(环绕) 包裹整个处理过程 性能统计、异常捕获、缓存 ✅ 可控制全流程
后置中间件 AI Agent 前置中间件 消息来源 后置中间件 AI Agent 前置中间件 消息来源 原始消息 鉴权/校验/清洗 处理后的消息 AI 推理 AI 响应 脱敏/格式化/增强 最终响应

2.2 洋葱模型执行链

多个中间件会形成一个洋葱模型的执行链——请求从外层穿过一层层中间件到达核心,响应再从核心穿过一层层中间件回到外层:

🧅 洋葱模型执行链

📥 请求

⬇️ Mid 1
前置

⬇️ Mid 2
前置

🧠 Agent
处理

⬆️ Mid 2
后置

⬆️ Mid 1
后置

📤 响应

洋葱模型的执行顺序

请求阶段:Mid 1 → Mid 2 → Agent Core
响应阶段:Agent Core → Mid 2 → Mid 1

💡 这个设计保证了:最先拦截请求的中间件,最后看到响应。就像剥洋葱——先接触的外层,最后离开。

2.3 中间件配置体系

# openclaw.yaml
middleware:
  enabled: true
  
  # 全局中间件(对所有渠道生效)
  global:
    - name: "auth-validator"      # 身份验证
      enabled: true
      order: 10                    # 执行顺序(数字越小越靠外)
    - name: "rate-limiter"         # 速率限制
      enabled: true
      order: 20
    - name: "request-logger"       # 请求日志
      enabled: true
      order: 30
  
  # 按渠道配置中间件
  channels:
    feishu:
      - name: "feishu-formatter"   # 飞书格式转换
        enabled: true
        order: 50
      - name: "sensitive-filter"   # 敏感信息过滤
        enabled: true
        order: 60
        config:
          keywords: ["密码", "token", "密钥"]
          mask_char: "*"
    wecom:
      - name: "wecom-formatter"    # 企微格式转换
        enabled: true
        order: 50
    discord:
      - name: "discord-sanitizer"  # Discord Markdown清洗
        enabled: true
        order: 50

配置策略表

配置项 说明 建议
global 对所有渠道生效 放基础中间件(鉴权、日志、限流)
channels.<name> 针对特定渠道 放渠道特有的中间件(格式化、清洗)
order 执行顺序 鉴权(order:10) → 限流(20) → 日志(30) → 格式化(50) → 过滤(60)
config 中间件特定配置 通过 config 段传递自定义参数

3. 中间件开发实战:消息敏感词过滤

3.1 需求分析

目标:开发一个敏感信息过滤中间件,在 Agent 回复消息发送前自动检测并脱敏。不阻止消息发送(非阻塞模式),但标记并记录敏感信息。

3.2 完整实现

"""
middleware_sensitive_filter.py
消息敏感词过滤中间件

功能:
1. 检测消息中的敏感信息(手机号、身份证、银行卡、API Key)
2. 自动脱敏替换(如 138****1234)
3. 记录脱敏日志供审计
4. 支持自定义规则和豁免名单
"""

import re
import json
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field

# ============================================
# 1. 敏感信息检测规则定义
# ============================================

@dataclass
class DetectionRule:
    """检测规则定义"""
    name: str
    pattern: str
    mask_fn: callable  # 脱敏函数
    severity: str = "medium"  # low / medium / high
    enabled: bool = True


# 预置检测规则库
BUILTIN_RULES: List[DetectionRule] = [
    # 中国大陆手机号
    DetectionRule(
        name="phone_cn",
        pattern=r'1[3-9]\d{9}',
        mask_fn=lambda m: m.group()[:3] + "****" + m.group()[-4:],
        severity="high"
    ),
    # 身份证号码
    DetectionRule(
        name="id_card_cn",
        pattern=r'\d{17}[\dXx]',
        mask_fn=lambda m: m.group()[:4] + "**********" + m.group()[-4:],
        severity="high"
    ),
    # 银行卡号(16-19位)
    DetectionRule(
        name="bank_card",
        pattern=r'\d{16,19}',
        mask_fn=lambda m: m.group()[:4] + " **** **** " + m.group()[-4:],
        severity="high"
    ),
    # API Key(常见格式)
    DetectionRule(
        name="api_key",
        pattern=r'(?:api[_-]?key|apikey|token|secret)[:=]\s*["\']?([\w-]{20,})["\']?',
        mask_fn=lambda m: m.group(1)[:4] + "***..." + m.group(1)[-4:] 
            if len(m.group(1)) > 8 else "***",
        severity="high"
    ),
    # 邮箱地址
    DetectionRule(
        name="email",
        pattern=r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
        mask_fn=lambda m: m.group()[:2] + "****@" + m.group().split("@")[1],
        severity="medium"
    ),
]


# ============================================
# 2. 自定义关键词匹配器
# ============================================

class KeywordMatcher:
    """基于关键词列表的匹配器
    
    支持:
    - 精确匹配
    - 正则模式匹配  
    - 自定义脱敏字符
    """
    
    def __init__(self, keywords: List[str], mask_char: str = "*"):
        self.keywords = set(keywords)
        self.mask_char = mask_char
    
    def check(self, text: str) -> Dict[str, Any]:
        """检查文本是否包含关键词"""
        found = []
        clean_text = text
        for kw in self.keywords:
            if kw.lower() in clean_text.lower():
                found.append(kw)
                # 替换为脱敏字符
                masked = self.mask_char * len(kw)
                clean_text = clean_text.replace(kw, masked)
        
        return {
            "has_sensitive": len(found) > 0,
            "matched_keywords": found,
            "count": len(found),
            "masked_text": clean_text
        }


# ============================================
# 3. 核心中间件逻辑
# ============================================

class SensitiveFilterMiddleware:
    """敏感信息过滤中间件
    
    工作流程:
    1. 接收消息 → 正则匹配敏感模式
    2. 关键词匹配
    3. 脱敏替换
    4. 记录审计日志
    5. 返回脱敏后的消息
    """
    
    def __init__(self, config: Optional[Dict] = None):
        cfg = config or {}
        
        # 加载规则
        self.rules = BUILTIN_RULES.copy()
        
        # 关键词过滤器
        keywords = cfg.get("keywords", [])
        mask_char = cfg.get("mask_char", "*")
        self.keyword_matcher = KeywordMatcher(keywords, mask_char)
        
        # 豁免名单(不对特定用户做过滤)
        self.whitelist = set(cfg.get("whitelist", []))
        
        # 审计日志
        self.audit_log = []
        
        # 统计
        self.stats = {
            "total_checked": 0,
            "total_masked": 0,
            "by_rule": {}
        }
    
    def check(self, text: str, user_id: Optional[str] = None) -> Dict[str, Any]:
        """
        检查并脱敏一条消息
        
        Args:
            text: 待检查的文本
            user_id: 消息发送者ID(用于豁免检查)
        
        Returns:
            包含脱敏文本和检测报告的字典
        """
        # 豁免检查
        if user_id and user_id in self.whitelist:
            return {
                "original": text,
                "masked_text": text,
                "masked": False,
                "reason": "whitelist",
                "details": []
            }
        
        clean_text = text
        details = []
        masked_count = 0
        
        # 1. 正则规则检测
        for rule in self.rules:
            if not rule.enabled:
                continue
            
            matches = list(re.finditer(rule.pattern, clean_text, re.IGNORECASE))
            if matches:
                # 应用脱敏
                for match in matches:
                    mask_result = rule.mask_fn(match)
                    clean_text = clean_text.replace(match.group(), mask_result)
                
                details.append({
                    "rule": rule.name,
                    "severity": rule.severity,
                    "matches": len(matches),
                    "samples": [m.group()[:20] for m in matches[:3]]
                })
                masked_count += len(matches)
                
                # 更新统计
                self.stats["by_rule"][rule.name] = \
                    self.stats["by_rule"].get(rule.name, 0) + len(matches)
        
        # 2. 关键词检测
        kw_result = self.keyword_matcher.check(clean_text)
        if kw_result["has_sensitive"]:
            clean_text = kw_result["masked_text"]
            details.append({
                "rule": "keyword_match",
                "severity": "low",
                "matches": kw_result["count"],
                "samples": kw_result["matched_keywords"][:3]
            })
            masked_count += kw_result["count"]
        
        # 3. 更新统计
        self.stats["total_checked"] += 1
        if masked_count > 0:
            self.stats["total_masked"] += 1
        
        # 4. 审计日志(不记录原文内容,只记录元数据)
        audit_entry = {
            "timestamp": time.time(),
            "user_id": user_id,
            "text_length": len(text),
            "masked": masked_count > 0,
            "match_count": masked_count,
            "rule_names": [d["rule"] for d in details]
        }
        self.audit_log.append(audit_entry)
        
        return {
            "original_length": len(text),
            "masked_text": clean_text,
            "masked": masked_count > 0,
            "total_matched": masked_count,
            "details": details,
            "audit_id": len(self.audit_log)
        }


# ============================================
# 4. 中间件钩子函数(OpenClaw 接口)
# ============================================

middleware_instance = None

def on_load(config: Dict = None):
    """中间件加载初始化"""
    global middleware_instance
    middleware_instance = SensitiveFilterMiddleware(config)
    print(f"✅ 敏感信息过滤中间件已加载 "
          f"(规则: {len(middleware_instance.rules)}, "
          f"关键词: {len(middleware_instance.keyword_matcher.keywords)})")
    return {"status": "loaded"}

def before_message(message: Dict) -> Dict:
    """消息发送前拦截"""
    if not middleware_instance:
        return message
    
    text = message.get("content", "")
    user_id = message.get("user_id")
    
    result = middleware_instance.check(text, user_id)
    
    # 替换为脱敏后的内容
    message["content"] = result["masked_text"]
    
    # 附加元数据(不影响消息内容,供下游使用)
    message["_meta"] = message.get("_meta", {})
    message["_meta"]["sensitive_filter"] = {
        "masked": result["masked"],
        "count": result["total_matched"]
    }
    
    return message

def on_unload():
    """卸载前清理"""
    global middleware_instance
    if middleware_instance:
        print(f"📊 敏感信息过滤统计: "
              f"检查 {middleware_instance.stats['total_checked']} 条, "
              f"脱敏 {middleware_instance.stats['total_masked']} 条")
    middleware_instance = None
    print("🛑 敏感信息过滤中间件已卸载")
    return {"status": "unloaded"}

3.3 效果演示

# 测试敏感词过滤
from middleware_sensitive_filter import SensitiveFilterMiddleware

middleware = SensitiveFilterMiddleware({
    "keywords": ["密码", "密钥", "secret"],
    "mask_char": "*"
})

test_messages = [
    "您的手机号是13812341234,请记录",
    "我的身份证号是310101199001011234",
    "API密钥是sk-abcdefghijklmnopqrstuvw",
    "请帮我重置一下登录密码",
]

for msg in test_messages:
    result = middleware.check(msg)
    status = "🟢" if not result["masked"] else "🟡"
    print(f"{status} 原文: {result['original_length']}字")
    print(f"   脱敏后: {result['masked_text']}")
    if result["details"]:
        for d in result["details"]:
            print(f"   └─ {d['rule']}: {d['matches']}处匹配")
    print()

预期输出

🟡 原文: 16字
   脱敏后: 您的手机号是138****1234,请记录
   └─ phone_cn: 1处匹配

🟡 原文: 19字
   脱敏后: 我的身份证号是3101**********1234
   └─ id_card_cn: 1处匹配

🟡 原文: 18字
   脱敏后: API密钥是sk-a***...tuvw
   └─ api_key: 1处匹配

🟡 原文: 11字
   脱敏后: 请帮我重置一下登录**
   └─ keyword_match: 1处匹配

4. 实战案例二:请求耗时统计仪表盘中间件

4.1 需求分析

目标:统计每个渠道的消息处理耗时,按小时聚合,提供 API 查询仪表盘数据。

4.2 环绕中间件实现

"""
middleware_latency_stats.py  
请求耗时统计仪表盘中间件

采用环绕模式(Around)实现对 Agent 处理全流程的耗时统计,
并聚合为小时级数据供监控面板查询。
"""

import time
import json
from collections import defaultdict
from typing import Dict, Any, Optional
from datetime import datetime


class LatencyStatsMiddleware:
    """环绕中间件——包裹Agent处理过程并记录耗时"""
    
    def __init__(self, config: Optional[Dict] = None):
        # 小时级聚合数据 { "2026-06-21T12": { "feishu": {count, total, min, max} } }
        self.hourly_stats = defaultdict(lambda: defaultdict(
            lambda: {"count": 0, "total_ms": 0, "min": float("inf"), "max": 0, "p99": 0}
        ))
        self.all_durations = []  # 用于计算P99
    
    def get_hour_key(self) -> str:
        """获取当前小时标识"""
        return datetime.now().strftime("%Y-%m-%dT%H")
    
    def update_stats(self, channel: str, duration_ms: float):
        """更新统计指标"""
        hour = self.get_hour_key()
        stats = self.hourly_stats[hour][channel]
        
        stats["count"] += 1
        stats["total_ms"] += duration_ms
        stats["min"] = min(stats["min"], duration_ms)
        stats["max"] = max(stats["max"], duration_ms)
        
        # 保留最近1000条用于P99计算
        self.all_durations.append(duration_ms)
        if len(self.all_durations) > 1000:
            self.all_durations.pop(0)
    
    def get_stats(self, hours: int = 24) -> Dict:
        """获取最近 N 小时的统计数据,供监控面板 API 查询"""
        result = {}
        now = datetime.now()
        for h in range(hours):
            key = (now.replace(hour=now.hour - h)).strftime("%Y-%m-%dT%H")
            if key in self.hourly_stats:
                hour_data = {}
                for channel, stats in self.hourly_stats[key].items():
                    avg = stats["total_ms"] / stats["count"] if stats["count"] > 0 else 0
                    hour_data[channel] = {
                        **stats,
                        "avg_ms": round(avg, 2),
                        "min": round(stats["min"], 2) if stats["min"] != float("inf") else 0,
                        "max": round(stats["max"], 2)
                    }
                result[key] = hour_data
        return result
    
    def calculate_p99(self) -> float:
        """计算P99延迟"""
        if not self.all_durations:
            return 0
        sorted_durations = sorted(self.all_durations)
        idx = int(len(sorted_durations) * 0.99)
        return sorted_durations[idx]


# ============================================
# 环绕中间件——执行包裹
# ============================================

stats_middleware = None

def on_load(config: Dict = None):
    global stats_middleware
    stats_middleware = LatencyStatsMiddleware(config)
    print("✅ 请求耗时统计中间件已加载")
    return {"status": "loaded"}

# 环绕中间件需要一个包裹函数来同时处理请求前和请求后
# 在OpenClaw中,通过yield语法实现环绕
def around_message(message: Dict, next_handler: callable):
    """
    环绕中间件——包裹完整的消息处理流程
    
    prev_handler: 返回生成器(yield前是前置,yield后是后置)
    """
    channel = message.get("channel", "unknown")
    start = time.time()
    
    # ====== 前置:记录开始时间 ======
    # 可以在这里做限流判断
    print(f"📊 [{channel}] 开始处理消息...")
    
    # ====== 调用下一层(Agent处理) ======
    result = next_handler(message)
    
    # ====== 后置:计算耗时并更新统计 ======
    duration_ms = (time.time() - start) * 1000
    stats_middleware.update_stats(channel, duration_ms)
    
    print(f"📊 [{channel}] 处理完成, 耗时: {duration_ms:.2f}ms")
    
    # 将耗时信息附加到响应元数据
    if isinstance(result, dict):
        result["_meta"] = result.get("_meta", {})
        result["_meta"]["latency_ms"] = round(duration_ms, 2)
    
    return result

def get_dashboard_data():
    """查询仪表盘数据的API"""
    if not stats_middleware:
        return {"error": "中间件未初始化"}
    return {
        "p99_ms": round(stats_middleware.calculate_p99(), 2),
        "hourly": stats_middleware.get_stats(24)
    }

EXPORTS = {
    "middleware": {
        "around_message": around_message
    },
    "api": {
        "get_dashboard_data": get_dashboard_data
    }
}

4.3 模拟仪表盘数据查询

# 查询仪表盘数据
data = get_dashboard_data()
for hour_key, channels in sorted(data["hourly"].items()):
    print(f"\n⏰ {hour_key}")
    for channel, stats in channels.items():
        print(f"  📡 {channel}: "
              f"请求数={stats['count']}, "
              f"平均={stats['avg_ms']}ms, "
              f"P99={data['p99_ms']}ms")

5. 中间件开发最佳实践

5.1 中间件设计原则

原则 说明 反例
单一职责 一个中间件只做一件事 既做鉴权又做统计
无副作用 不改变业务逻辑的正确性 过滤中间件把正常内容也删了
可配置 行为由配置控制 硬编码所有参数
幂等 多次执行不会出错 重复脱敏导致信息丢失
快速失败 检查不通过快速拒绝 链路里卡30秒不返回

5.2 执行顺序的最佳实践

最外层(先拦截、最后返回)
  ├── 安全类中间件(鉴权、限流、CORS)
  ├── 日志类中间件(请求日志)
  ├── 业务类中间件(格式化、脱敏)
  ├── 监控类中间件(耗时、错误率)
  └── 最内层:Agent 核心处理
类型 order 建议 必须最先/最后
鉴权/限流 10-19 ✅ 必须最先(不通过则拒绝)
请求日志 20-29 记录原始请求
输入清洗 30-39 在 Agent 处理前完成
输出格式化 40-49 在 Agent 处理后立即执行
输出脱敏 50-59 ✅ 必须最后(保证干净输出)
性能统计 60-69 包裹整个流程

6. 总结

本文从零构建了两个生产级中间件,覆盖了 OpenClaw 中间件系统的完整知识点:

核心要点

  1. 三种模式,一个入口:前置(before)做输入校验,后置(after)做输出增强,环绕(around)做全流程监控

  2. 洋葱模型:多个中间件形成层层嵌套的执行链——请求从外向内穿过,响应从内向外返回

  3. 敏感词过滤:正则匹配 → 关键词筛 → 脱敏替换 → 审计日志,四步完成安全过滤

  4. 耗时统计:环绕中间件记录全流程耗时,按小时+渠道聚合 P99 延迟

  5. 横切关注点分离:中间件的核心价值——不改核心代码,独立添加任何横向能力

思考题

  1. 你有一个中间件需要耗时较长(如调用外部 API 做内容审核)。如何设计异步中间件,避免阻塞整个消息处理链路?

  2. 两个中间件可能产生冲突——比如"格式化中间件"添加了 Markdown 样式,但"脱敏中间件"把加粗语法也脱敏了。你会如何设计中间件的执行顺序避免这种冲突?

  3. 中间件本身也会出错(比如统计中间件的 Redis 挂了)。如何设计中间件的降级策略——是让整个流程失败,还是跳过出错的中间件继续处理?


参考资料

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐