系列「企业级 AI Agent 实现拆解」E24 篇。上一篇讲了 Callback 系统:给 Agent 管道装上"监听器"。这篇讲中间件(Middleware)——在 Agent 执行流的关键节点主动修改行为:改消息历史、替换工具输出、包装模型调用。

读完这篇你会知道

  • TypedChatModelAgentMiddleware 接口的 9 个钩子点:每个点能做什么
  • TypedBaseChatModelAgentMiddleware:嵌入它省掉 8 个空实现
  • 工具错误处理:WrapToolWithErrorHandler 把 error 变成字符串返回
  • patchtoolcalls:为什么会有"悬空工具调用",怎么自动修补
  • reduction:两阶段上下文压缩——截断(Truncation)+ 清理(Clear)
  • 中间件 vs Callback:该用哪个

一、为什么需要中间件

Callback 是只读的观测——你看到发生了什么,但不能改它。

中间件是可写的拦截——你可以:

  • 在模型调用前改写消息历史(删掉太长的工具输出)
  • 在工具调用返回后修改结果(把 error 转成友好字符串)
  • 在 Agent 开始时动态加工具或修改 instruction
  • 用自己的模型替换框架的模型(failover)

这些能力都需要中间件,Callback 做不到。


二、接口:9 个钩子

// adk/handler.go
type TypedChatModelAgentMiddleware[M MessageType] interface {
    // 1. Agent 运行前:改 instruction / 工具列表
    BeforeAgent(ctx, runCtx *ChatModelAgentContext) (ctx, *ChatModelAgentContext, error)

    // 2. Agent 成功结束后(最终回答 / ReturnDirectly)
    AfterAgent(ctx, state *TypedChatModelAgentState[M]) (ctx, error)

    // 3. 每次调模型前:改写消息历史 + ToolInfos
    BeforeModelRewriteState(ctx, state *TypedChatModelAgentState[M], mc *TypedModelContext[M]) (ctx, *TypedChatModelAgentState[M], error)

    // 4. 模型返回后:对模型回复做后处理
    AfterModelRewriteState(ctx, state *TypedChatModelAgentState[M], mc *TypedModelContext[M]) (ctx, *TypedChatModelAgentState[M], error)

    // 5-6. 包装同步 / 流式工具调用
    WrapInvokableToolCall(ctx, endpoint InvokableToolCallEndpoint, tCtx *ToolContext) (InvokableToolCallEndpoint, error)
    WrapStreamableToolCall(ctx, endpoint StreamableToolCallEndpoint, tCtx *ToolContext) (StreamableToolCallEndpoint, error)

    // 7-8. 包装 Enhanced 同步 / 流式工具调用
    WrapEnhancedInvokableToolCall(ctx, endpoint EnhancedInvokableToolCallEndpoint, tCtx *ToolContext) (EnhancedInvokableToolCallEndpoint, error)
    WrapEnhancedStreamableToolCall(ctx, endpoint EnhancedStreamableToolCallEndpoint, tCtx *ToolContext) (EnhancedStreamableToolCallEndpoint, error)

    // 9. 包装模型本身(retry / failover / 流式事件注入)
    WrapModel(ctx, m model.BaseModel[M], mc *TypedModelContext[M]) (model.BaseModel[M], error)
}

9 个方法全部实现太繁琐。嵌入 TypedBaseChatModelAgentMiddleware 可以得到所有方法的空实现(no-op),只需覆盖关心的那个:

type MyMiddleware struct {
    *adk.TypedBaseChatModelAgentMiddleware[*schema.Message]  // 提供 8 个默认 no-op
}

// 只实现需要的钩子
func (m *MyMiddleware) BeforeModelRewriteState(ctx context.Context,
    state *adk.ChatModelAgentState, mc *adk.ModelContext) (context.Context, *adk.ChatModelAgentState, error) {

    // 在这里改写消息历史
    return ctx, state, nil
}

注册到 Agent:

agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
    Name:        "my_agent",
    Model:       m,
    Middlewares: []adk.ChatModelAgentMiddleware{&MyMiddleware{}},
})

三、钩子时序

BeforeAgent
    │
    ├── [ReAct 循环开始]
    │
    ├── BeforeModelRewriteState  ← 改消息历史、改 ToolInfos
    │       │
    │       ├── WrapModel → 模型调用
    │       │
    │       └── AfterModelRewriteState  ← 处理模型回复
    │
    ├── WrapInvokableToolCall / WrapStreamableToolCall  ← 工具执行时包装
    │
    ├── [如果还需继续:回到 BeforeModelRewriteState]
    │
    └── AfterAgent  ← 成功结束

关键点

  • BeforeModelRewriteState 返回的 state 会持久化到 Agent 内部状态,影响后续所有轮次;WrapModel 对消息的修改不持久化,只影响当次调用
  • WrapInvokableToolCall 在每次工具执行时调用,不是编译时一次性调用
  • AfterAgent 只在成功结束时触发;超出最大迭代次数、context 取消时不触发

四、工具错误处理:WrapToolWithErrorHandler

最常用的工具层中间件——把工具的 error 转成字符串,避免 Agent 因工具报错而崩溃(来自 components/tool/utils/error_handler.go):

import "github.com/cloudwego/eino/components/tool/utils"

wrappedTool := utils.WrapToolWithErrorHandler(myTool,
    func(ctx context.Context, err error) string {
        // 把 error 变成对 LLM 友好的文字
        return fmt.Sprintf("工具调用失败:%v。请尝试其他方式。", err)
    })

内部逻辑error_handler.go:141):

func (s *errorHelper) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
    result, err := s.i(ctx, argumentsInJSON, opts...)
    if _, ok := compose.IsInterruptRerunError(err); ok {
        return result, err  // 中断错误不拦截,透传
    }
    if err != nil {
        return s.h(ctx, err), nil  // 普通错误 → 转字符串,不报错
    }
    return result, nil
}

一个关键细节:中断错误(InterruptRerunError)不会被拦截——HITL 中断需要透传给框架,ErrorHandler 不应该把它吃掉。


五、patchtoolcalls:修补悬空工具调用

什么是悬空工具调用?

模型输出了一个 tool_call(assistant 消息),但还没等工具返回结果,用户就发了新消息(比如说"算了,不用了")。这时候消息历史里有一个 assistant 消息声称调用了某工具,但后面没有对应的 tool 消息。许多 LLM API 对这种情况会报错。

patchtoolcalls 中间件在每次调模型前扫描消息历史,自动为没有对应响应的工具调用插入占位 tool 消息:

import "github.com/cloudwego/eino/adk/middlewares/patchtoolcalls"

mw, _ := patchtoolcalls.New(ctx, &patchtoolcalls.Config{
    // 可选:自定义占位内容,不填用默认值
    PatchedContentGenerator: func(ctx context.Context, toolName, toolCallID string) (string, error) {
        return fmt.Sprintf("工具 %s(ID: %s)的调用已被取消", toolName, toolCallID), nil
    },
})

agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
    Middlewares: []adk.ChatModelAgentMiddleware{mw},
    // ...
})

默认占位内容(patchtoolcalls.go:261):

"Tool call %s with id %s was canceled - another message came in before it could be completed."
// 中文环境:
"工具调用 %s(ID 为 %s)已被取消——在其完成之前收到了另一条消息。"

框架会根据运行环境自动选中文还是英文提示(internal.SelectPrompt)。


六、reduction:两阶段上下文压缩

这是最复杂也最实用的中间件,解决一个长期运行的 Agent 的核心问题:工具输出越积越多,Token 总量超过模型上下文窗口限制

它分两个阶段处理:

阶段一:截断(Truncation)

触发时机:工具执行完毕后立即

如果工具输出超过 MaxLengthForTrunc(默认 50000 字符),把全文保存到 Backend(文件系统或自定义存储),把工具消息替换成一段"截断通知",通知 Agent 可以用 read_file 工具去取完整内容:

[原始工具输出 123456 字符已被截断。
文件路径:/tmp/trunc/tool_call_id_xxx
预览(前 12500 字):... 
预览(后 12500 字):...]

阶段二:清理(Clear)

触发时机:每次调模型前BeforeModelRewriteState)。

如果当前消息历史的总 Token 数超过 MaxTokensForClear(默认 160000),遍历历史消息,把老的工具调用+响应对替换成占位符,并把原内容卸载到 Backend。Agent 需要时可以用 read_file 取回。

import (
    "github.com/cloudwego/eino/adk/middlewares/reduction"
    "github.com/cloudwego/eino/adk/filesystem"
)

mw, _ := reduction.New(ctx, &reduction.Config{
    Backend:          filesystem.NewLocalBackend("/tmp/agent_offload"),  // 存储后端
    ReadFileToolName: "read_file",    // Agent 用这个工具取回卸载的内容
    MaxLengthForTrunc: 50000,         // 单次工具输出超过这个字符数就截断
    MaxTokensForClear: 160000,        // 总 Token 超过这个就触发清理
    ClearRetentionSuffixLimit: 2,     // 保留最近 2 轮工具调用不清理
    TokenCounter: func(ctx context.Context, msgs []*schema.Message, tools []*schema.ToolInfo) (int64, error) {
        // 推荐用真实 tokenizer 代替默认的 字符数/4 估算
        return countTokens(msgs, tools), nil
    },
})

两阶段的分工

  • 截断是即时的——单个超大工具输出立刻处理,避免单次就撑爆上下文
  • 清理是累积触发的——多轮积累后统一压缩,减少频繁清理的开销

ClearAtLeastTokens(可选):如果设置了这个值,只有当清理操作能实际节省至少 N 个 Token 时才执行,避免因清理量太少而破坏 Prompt Cache(KV Cache)。


七、自定义中间件示例:限速工具调用

type RateLimitMiddleware struct {
    *adk.TypedBaseChatModelAgentMiddleware[*schema.Message]
    limiter *rate.Limiter
}

func NewRateLimitMiddleware(rps float64) *RateLimitMiddleware {
    return &RateLimitMiddleware{
        limiter: rate.NewLimiter(rate.Limit(rps), 1),
    }
}

// 只覆盖 WrapInvokableToolCall,其余 8 个方法用 TypedBase 提供的 no-op
func (m *RateLimitMiddleware) WrapInvokableToolCall(ctx context.Context,
    endpoint adk.InvokableToolCallEndpoint,
    tCtx *adk.ToolContext) (adk.InvokableToolCallEndpoint, error) {

    return func(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
        if err := m.limiter.Wait(ctx); err != nil {
            return "", fmt.Errorf("rate limit: %w", err)
        }
        return endpoint(ctx, argumentsInJSON, opts...)
    }, nil
}

八、中间件 vs Callback:选哪个

需求 用什么
记录日志 / 上报 trace Callback
统计 Token 用量 Callback
不改数据,只观测 Callback
改写消息历史 Middleware(BeforeModelRewriteState
工具出错转字符串 WrapToolWithErrorHandler
给工具调用加限速 Middleware(WrapInvokableToolCall
动态修改 instruction Middleware(BeforeAgent
模型 failover / retry Middleware(WrapModel
上下文 Token 超限压缩 reduction 中间件
修复悬空工具调用 patchtoolcalls 中间件

小结

Eino 的中间件系统通过 TypedChatModelAgentMiddleware 接口提供 9 个钩子,覆盖 Agent 执行流的每一个关键节点。
TypedBaseChatModelAgentMiddleware 嵌入让你只实现关心的方法。三个现成中间件解决了最常见的工程问题:WrapToolWithErrorHandler 把工具错误变成友好字符串;patchtoolcalls 修补用户打断造成的悬空工具调用;reduction 用两阶段策略(立即截断 + 累积清理)管理长对话的 Token 上限。
核心区别:Callback 只读,Middleware 可写——观测用 Callback,改行为用 Middleware。

下篇继续。


代码来源:cloudwego/eino · cloudwego/eino-examples

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐