1. 项目概述:为什么“模型无关”不是一句空话,而是一道必须跨过的工程门槛

你有没有遇到过这样的场景:团队刚用 GPT-4 Turbo 跑通了一个客服对话路由 Agent,客户突然要求“必须切换成国产大模型,且不能改一行业务逻辑”;或者你在做金融合规审核 Agent,昨天还在调用 Qwen2.5-72B 的函数调用能力,今天监管侧要求所有推理必须走本地部署的 DeepSeek-V3,接口协议、流式响应格式、token 计费方式、错误码体系全都不一样——这时候,你写的那套 llm.invoke() 调用链,是不是瞬间变成了一堆需要逐行重写的“技术债”?这正是 LangChain 社区在 Part 29 中正式提出 Model Agnostic Pattern(模型无关模式) 的真实战场。它不是教你怎么调 API,而是教你如何把“调哪个模型”这件事,从代码里彻底抽离出来,变成一个可配置、可热替换、可灰度发布、甚至可按用户 ID 动态路由的运行时决策。而 LLM API Gateway(大模型 API 网关) ,就是支撑这个模式落地的基础设施底座——它不生产模型,但决定谁来响应、怎么响应、响应失败后怎么兜底。我带过三个 AI 应用交付项目,其中两个卡在模型切换环节超过三周,不是因为模型能力不行,而是因为整个调用链路像一捆缠死的电线,剪哪根都断服务。这篇内容,就是我把这捆电线一根根剥开、标上色、装上快插接头的过程。它适合所有正在用 LangChain 构建生产级 AI Agent 的人,尤其是那些已经踩过“硬编码模型名”“手动转换消息格式”“重写 retry 逻辑”这些坑的工程师和架构师。如果你还在 ChatOpenAI(model="gpt-4-turbo") 里写死模型名,那你不是在写应用,是在给未来埋雷。

2. 核心设计思路拆解:从“硬编码调用”到“协议抽象层”的四步跃迁

2.1 为什么传统 LangChain LLM 封装方式注定不可扩展

LangChain 早期的 ChatOpenAI ChatAnthropic ChatQwen 这类封装器,表面看是“开箱即用”,实则暗藏三重耦合陷阱:

  • 协议耦合 ChatOpenAI 强依赖 OpenAI 的 /v1/chat/completions 接口规范,包括 messages 字段必须是 {"role": "user", "content": "xxx"} 结构, response_format 必须是 JSON Schema, tool_choice 必须是 "auto" {"type": "function", "function": {"name": "xxx"}} 。而 Qwen 的 tools 字段叫 functions ,DeepSeek 的 tool_choice enable_thinking ,千问的流式响应是 delta.content ,而 GLM 的是 choices[0].delta.text 。一旦换模型,光是消息序列的 to_messages() 方法就得重写。

  • 行为耦合 ChatOpenAI stream=True 返回的是 AsyncIterator[ChatGenerationChunk] ,而本地 Ollama 模型返回的是 Iterator[dict] ,类型系统完全断裂。更麻烦的是 retry 逻辑——OpenAI 的 429 错误要指数退避,而某国产模型的 429 是永久性配额超限,retry 百次也没用;某云厂商的 503 表示模型实例未就绪,需要轮询健康检查端点,而不是简单 sleep。这些差异被封装器粗暴地“统一”成 BaseLLM.generate() ,结果就是线上报错时,你得翻三份文档才能定位问题。

  • 生命周期耦合 ChatOpenAI 实例初始化时就绑定了 api_key base_url model_name ,这意味着你无法在同一个 Agent 流程中,对不同子任务动态切换模型——比如“法律条款解析”用高精度 72B 模型,“用户情绪判断”用低延迟 7B 模型,“多语言翻译”用专精小模型。你只能写三套几乎一样的 Chain,维护成本翻三倍。

我去年在一个跨境电商业务中就栽在这上面:最初用 ChatOpenAI(model="gpt-3.5-turbo") 做商品描述生成,QPS 上去后成本飙升,想切到 ChatQwen(model="qwen2.5-7b") ,结果发现 qwen2.5-7b 不支持 response_format={"type": "json_object"} ,JSON 输出全靠 prompt engineering + post-process 正则提取,准确率从 99.2% 掉到 83.7%。最后不是换模型,而是给 Qwen 单独写了个 QwenJSONOutputParser ,又额外加了 200 行校验逻辑。这就是“协议耦合”带来的典型熵增。

2.2 Model Agnostic Pattern 的四层抽象设计

LangGraph Part 29 提出的 Model Agnostic Pattern,本质是构建一个四层抽象漏斗,把模型差异全部拦在最外层:

  • 第 1 层:统一输入契约(Unified Input Contract)
    定义一个与任何模型无关的 AIAgentInput 数据类,只包含业务语义字段: user_query: str context: Dict[str, Any] required_tools: List[str] output_format: Literal["text", "json", "structured"] 。所有上游 Agent 节点(如 Router、Retriever)只跟这个契约打交道,绝不碰 messages tools 这些协议层字段。

  • 第 2 层:协议适配器(Protocol Adapter)
    每个模型供应商对应一个 Adapter 类,例如 OpenAIAdapter QwenAdapter DeepSeekAdapter 。它的唯一职责,是把 AIAgentInput 翻译成该模型能理解的原始请求体,并把原始响应翻译回统一的 AIAgentOutput (含 text: str structured_data: Optional[Dict] tool_calls: List[ToolCall] )。Adapter 内部封装所有协议细节:消息格式转换、tool call 解析、流式 chunk 合并、错误码映射(如把 Qwen 的 50001 映射为标准 LLMRateLimitError )。

  • 第 3 层:模型路由网关(Model Routing Gateway)
    这就是 LLM API Gateway 的核心。它接收标准化的 AIAgentInput ,根据预设策略(如 model_policy: "by_user_tier" "by_latency_sla" "by_cost_budget" )查询路由规则引擎,决定调用哪个 Adapter。规则引擎本身是可插拔的——可以是内存中的 Dict 配置,也可以是连接 Redis 的实时策略中心,甚至可以是调用另一个轻量级 LLM 做动态决策(比如用 7B 模型分析 query 复杂度,再决定是否升配到 72B)。

  • 第 4 层:运行时执行器(Runtime Executor)
    一个薄薄的 ModelExecutor 类,只做三件事:1)根据路由结果实例化对应 Adapter;2)调用 Adapter 的 invoke() 方法;3)捕获所有底层异常,统一包装为 AIAgentError 并附带 adapter_name original_error 等上下文。它不关心模型怎么工作,只确保“输入进来,输出出去,出错了有迹可循”。

这个设计最妙的地方在于,当你新增一个模型(比如刚发布的 Kimi 2.0),你只需要写一个新的 KimiAdapter ,实现 adapt_input() adapt_output() 两个方法,然后在网关配置里加一条路由规则,整个系统无需重启、无需改任何业务代码,就能接入新模型。我在上个月就用这套模式,在 2 小时内把一个已上线的合同审查 Agent,从 gpt-4o 切换到了 kimi-pro ,全程零 downtime,用户无感知。

2.3 为什么必须用 LangGraph 而非纯 LangChain 实现

很多人会问:LangChain 本身就有 BaseLLM 抽象,为啥还要 LangGraph?关键在于 状态持久化 节点可组合性 。LangChain 的 Chain 是线性的、一次性的, LLMChain 执行完就销毁。而 LangGraph 的 StateGraph 是有状态的图结构,每个节点(Node)可以读写共享状态 State 。Model Agnostic Pattern 的核心价值,恰恰体现在“状态驱动的动态路由”上:

  • 你可以让 Router 节点根据 State["query_complexity"] 字段,决定调用 high_precision_adapter 还是 low_latency_adapter
  • 你可以让 Fallback 节点监听 State["last_adapter_failure"] ,当 OpenAIAdapter 连续失败 3 次,自动触发 QwenAdapter 降级;
  • 你可以让 Metrics 节点在每次 ModelExecutor 执行前后,记录 adapter_name input_tokens output_tokens latency_ms 到 Prometheus,形成模型级的可观测性大盘。

LangChain 的 Runnable 也能做类似事,但它缺乏原生的状态管理能力。你得自己用 functools.partial 绑定上下文,或者用 threading.local() 存状态,一到异步环境就崩。而 LangGraph 的 State 是深度集成的, State.update() 是原子操作, State.get() 是线程安全的,这才是生产环境需要的确定性。我见过太多团队用 LangChain + 自研状态管理,结果在高并发下出现状态污染,查 bug 查了三天,最后发现是 global 变量被多个协程同时修改。LangGraph 把这个坑直接填平了。

3. 核心组件实现详解:从协议适配器到网关路由的完整代码实录

3.1 统一输入/输出契约定义:用 Pydantic V2 做强类型保障

我们先定义最顶层的契约。注意,这里不用 dataclass ,而用 Pydantic V2 的 BaseModel ,因为它自带验证、序列化、文档生成能力,且与 FastAPI、LangServe 天然兼容:

from typing import List, Dict, Any, Optional, Literal, Union
from pydantic import BaseModel, Field, field_validator
import re

class ToolCall(BaseModel):
    """统一工具调用表示,屏蔽各模型 tool_call 字段差异"""
    name: str = Field(..., description="工具名称")
    arguments: Dict[str, Any] = Field(default_factory=dict, description="工具参数字典")
    id: Optional[str] = Field(None, description="调用ID,用于响应关联")

class AIAgentOutput(BaseModel):
    """模型输出的统一表示"""
    text: str = Field("", description="纯文本输出")
    structured_data: Optional[Dict[str, Any]] = Field(
        None, description="结构化数据,如JSON解析结果"
    )
    tool_calls: List[ToolCall] = Field(default_factory=list, description="工具调用列表")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="元数据,如token用量、延迟")

    @field_validator("structured_data")
    @classmethod
    def validate_structured_data(cls, v):
        if v is not None and not isinstance(v, dict):
            raise ValueError("structured_data must be a dict")
        return v

class AIAgentInput(BaseModel):
    """统一输入契约,业务语义优先"""
    user_query: str = Field(..., description="用户原始查询")
    context: Dict[str, Any] = Field(default_factory=dict, description="上下文信息")
    required_tools: List[str] = Field(default_factory=list, description="必需的工具列表")
    output_format: Literal["text", "json", "structured"] = Field(
        "text", description="期望输出格式"
    )
    max_tokens: int = Field(2048, ge=1, le=32768, description="最大生成token数")
    temperature: float = Field(0.3, ge=0.0, le=2.0, description="采样温度")
    # 注意:这里没有 model_name、api_key、base_url!它们属于网关配置

    @field_validator("user_query")
    @classmethod
    def validate_query_length(cls, v):
        if len(v.strip()) == 0:
            raise ValueError("user_query cannot be empty or whitespace only")
        if len(v) > 128000:  # 限制单次输入长度,防OOM
            raise ValueError("user_query too long, max 128000 chars")
        return v

这个契约的设计哲学是: 只暴露业务需要的字段,隐藏所有技术细节 user_query 是用户说了什么, required_tools 是业务逻辑决定要调哪些工具, output_format 是业务目标(要 JSON 还是纯文本)。至于“用哪个模型”、“走哪个 endpoint”、“key 怎么鉴权”,统统不在这里出现。我坚持这个原则,是因为在三个项目中,业务方提需求时从来不会说“我要调 Qwen 的 /v1/chat/completions”,他们只会说“这个合同条款要解析成 JSON 格式”。契约必须反映业务语言,而不是技术语言。

3.2 协议适配器实现:以 OpenAI 和 Qwen 为例的双向翻译

适配器的核心是 adapt_input() adapt_output() 两个方法。我们以 OpenAIAdapter 为例,展示如何把 AIAgentInput 翻译成 OpenAI 的标准请求体:

import json
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageParam
from typing import List, Dict, Any, Optional, cast

class OpenAIAdapter:
    def __init__(self, api_key: str, base_url: str = "https://api.openai.com/v1"):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        self.model_name = "gpt-4o-mini"  # 默认模型,可覆盖

    def adapt_input(self, input_data: AIAgentInput) -> Dict[str, Any]:
        """将 AIAgentInput 翻译为 OpenAI API 请求体"""
        # 构建 messages:系统提示 + 用户查询 + 上下文
        messages: List[ChatCompletionMessageParam] = [
            {"role": "system", "content": "You are a helpful AI assistant."}
        ]
        
        # 添加上下文(如果存在)
        if input_data.context:
            context_str = "\n".join([
                f"{k}: {v}" for k, v in input_data.context.items()
            ])
            messages.append({"role": "system", "content": f"Context:\n{context_str}"})
        
        # 添加用户查询
        messages.append({"role": "user", "content": input_data.user_query})
        
        # 构建 tools(如果需要)
        tools = []
        if input_data.required_tools:
            # 这里假设你有一个 tools registry,根据 name 查找 tool spec
            from my_tools_registry import TOOLS_REGISTRY
            for tool_name in input_data.required_tools:
                if tool_name in TOOLS_REGISTRY:
                    tools.append(TOOLS_REGISTRY[tool_name])
        
        # 构建请求参数
        request_body = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": input_data.max_tokens,
            "temperature": input_data.temperature,
        }
        
        # 根据 output_format 添加 response_format 或 tool_choice
        if input_data.output_format == "json":
            request_body["response_format"] = {"type": "json_object"}
        elif input_data.required_tools:
            # 如果有工具且没指定 json,则默认 auto
            request_body["tool_choice"] = "auto"
            request_body["tools"] = tools
        
        return request_body

    async def adapt_output(self, raw_response: Any) -> AIAgentOutput:
        """将 OpenAI 原始响应翻译为 AIAgentOutput"""
        from openai.types.chat import ChatCompletion
        completion = cast(ChatCompletion, raw_response)
        
        # 提取文本
        text = ""
        if completion.choices and completion.choices[0].message.content:
            text = completion.choices[0].message.content
        
        # 提取结构化数据(JSON)
        structured_data = None
        if input_data.output_format == "json" and text:
            try:
                structured_data = json.loads(text)
            except json.JSONDecodeError:
                pass  # 交给上层处理
        
        # 提取 tool calls
        tool_calls = []
        if completion.choices and completion.choices[0].message.tool_calls:
            for tc in completion.choices[0].message.tool_calls:
                tool_calls.append(ToolCall(
                    name=tc.function.name,
                    arguments=json.loads(tc.function.arguments),
                    id=tc.id
                ))
        
        # 构建元数据
        metadata = {
            "input_tokens": completion.usage.prompt_tokens if completion.usage else 0,
            "output_tokens": completion.usage.completion_tokens if completion.usage else 0,
            "total_tokens": completion.usage.total_tokens if completion.usage else 0,
            "model": completion.model,
            "finish_reason": completion.choices[0].finish_reason if completion.choices else None
        }
        
        return AIAgentOutput(
            text=text,
            structured_data=structured_data,
            tool_calls=tool_calls,
            metadata=metadata
        )

现在看 QwenAdapter ,重点对比它如何处理 Qwen 特有的差异:

from dashscope import Generation  # Qwen SDK
from dashscope.common.error import RequestFailed

class QwenAdapter:
    def __init__(self, api_key: str, model_name: str = "qwen2.5-7b-instruct"):
        self.api_key = api_key
        self.model_name = model_name

    def adapt_input(self, input_data: AIAgentInput) -> Dict[str, Any]:
        """Qwen 的 messages 格式是 [{"role": "user", "content": "..."}],但 tools 字段叫 functions"""
        messages = [{"role": "user", "content": input_data.user_query}]
        
        # Qwen 不支持 system role,所以把 system 提示和 context 合并到第一个 user message
        system_prompt = "You are a helpful AI assistant."
        if input_data.context:
            context_str = "\n".join([
                f"{k}: {v}" for k, v in input_data.context.items()
            ])
            system_prompt += f"\nContext:\n{context_str}"
        
        # Qwen 的第一消息必须是 user,所以把 system prompt 也塞进去
        messages[0]["content"] = f"{system_prompt}\n\nUser query:\n{input_data.user_query}"
        
        # Qwen 的 tools 字段叫 functions,且格式是 [{"name": "...", "description": "...", "parameters": {...}}]
        functions = []
        if input_data.required_tools:
            from my_tools_registry import TOOLS_REGISTRY
            for tool_name in input_data.required_tools:
                if tool_name in TOOLS_REGISTRY:
                    # Qwen 的 function spec 需要转换
                    qwen_func = {
                        "name": TOOLS_REGISTRY[tool_name]["name"],
                        "description": TOOLS_REGISTRY[tool_name]["description"],
                        "parameters": TOOLS_REGISTRY[tool_name]["parameters"]
                    }
                    functions.append(qwen_func)
        
        request_body = {
            "model": self.model_name,
            "input": {"messages": messages},
            "parameters": {
                "max_tokens": input_data.max_tokens,
                "temperature": input_data.temperature,
            }
        }
        
        if functions:
            request_body["input"]["functions"] = functions
            # Qwen 的 function_call 是 "auto" 或 {"name": "xxx"}
            request_body["parameters"]["function_call"] = "auto"
        
        return request_body

    async def adapt_output(self, raw_response: Any) -> AIAgentOutput:
        """Qwen 响应格式:{"output": {"text": "...", "choices": [...]}}"""
        try:
            # DashScope SDK 的响应结构
            text = raw_response.output.text if raw_response.output.text else ""
            
            structured_data = None
            if input_data.output_format == "json" and text:
                try:
                    structured_data = json.loads(text)
                except json.JSONDecodeError:
                    pass
            
            tool_calls = []
            # Qwen 的 tool call 在 output.choices[0].message.function_call
            if (hasattr(raw_response, 'output') and 
                hasattr(raw_response.output, 'choices') and 
                raw_response.output.choices):
                choice = raw_response.output.choices[0]
                if (hasattr(choice.message, 'function_call') and 
                    choice.message.function_call):
                    fc = choice.message.function_call
                    tool_calls.append(ToolCall(
                        name=fc.name,
                        arguments=json.loads(fc.arguments) if fc.arguments else {},
                        id=None  # Qwen 不返回 id
                    ))
            
            metadata = {
                "input_tokens": raw_response.usage.input_tokens if hasattr(raw_response, 'usage') else 0,
                "output_tokens": raw_response.usage.output_tokens if hasattr(raw_response, 'usage') else 0,
                "model": self.model_name,
                "request_id": raw_response.request_id if hasattr(raw_response, 'request_id') else None
            }
            
            return AIAgentOutput(
                text=text,
                structured_data=structured_data,
                tool_calls=tool_calls,
                metadata=metadata
            )
        except Exception as e:
            # Qwen 的异常类型是 RequestFailed,需统一包装
            raise AIAgentError(f"QwenAdapter error: {str(e)}", adapter_name="qwen")

提示:适配器的 adapt_input() 方法里,我刻意把 context 合并到 user_query ,是因为 Qwen 的 system role 支持不稳定,很多版本会忽略。这是实战中踩出来的坑——不要迷信文档,要以实测为准。我在测试 Qwen2.5-72B 时发现,加了 system role 反而让模型更倾向于复述 system 提示,而不是回答问题。所以适配器不是机械翻译,而是“智能桥接”。

3.3 LLM API Gateway:基于策略的动态路由引擎

网关是整个模式的大脑。我们用一个简单的 ModelGateway 类实现,它支持三种路由策略:

from enum import Enum
from typing import Dict, Any, Callable, Optional, Awaitable
import asyncio
import random

class RoutingStrategy(str, Enum):
    STATIC = "static"  # 固定模型
    ROUND_ROBIN = "round_robin"  # 轮询
    WEIGHTED = "weighted"  # 加权
    CONTEXT_AWARE = "context_aware"  # 上下文感知(需外部 LLM)

class ModelGateway:
    def __init__(self):
        # 注册所有可用的 Adapter
        self.adapters: Dict[str, Callable[[], Any]] = {}
        # 路由规则:strategy -> config
        self.routing_rules: Dict[str, Dict[str, Any]] = {}
        # 轮询计数器
        self.rr_counter = 0
    
    def register_adapter(self, name: str, factory: Callable[[], Any]):
        """注册 Adapter 工厂函数,延迟实例化"""
        self.adapters[name] = factory
    
    def set_routing_rule(self, strategy: RoutingStrategy, config: Dict[str, Any]):
        """设置路由策略"""
        self.routing_rules[strategy.value] = config
    
    async def route_to_adapter(self, input_data: AIAgentInput) -> Any:
        """根据策略选择 Adapter 实例"""
        strategy = self.routing_rules.get("strategy", "static")
        
        if strategy == "static":
            model_name = self.routing_rules.get("static", "openai")
            if model_name not in self.adapters:
                raise ValueError(f"Adapter {model_name} not registered")
            return self.adapters[model_name]()
        
        elif strategy == "round_robin":
            available_models = list(self.adapters.keys())
            if not available_models:
                raise ValueError("No adapters registered for round-robin")
            chosen_model = available_models[self.rr_counter % len(available_models)]
            self.rr_counter += 1
            return self.adapters[chosen_model]()
        
        elif strategy == "weighted":
            weights = self.routing_rules.get("weights", {})
            if not weights:
                raise ValueError("Weights config missing for weighted routing")
            models = list(weights.keys())
            weights_list = [weights[m] for m in models]
            chosen_model = random.choices(models, weights=weights_list)[0]
            return self.adapters[chosen_model]()
        
        elif strategy == "context_aware":
            # 这里可以调用一个轻量级 LLM 来分析 input_data.context
            # 例如:用 7B 模型判断 query 是否涉及法律术语,决定是否升配
            return await self._context_aware_route(input_data)
        
        else:
            raise ValueError(f"Unknown routing strategy: {strategy}")
    
    async def _context_aware_route(self, input_data: AIAgentInput) -> Any:
        """上下文感知路由:用小模型分析 query 复杂度"""
        # 简化版:关键词匹配
        query_lower = input_data.user_query.lower()
        if any(word in query_lower for word in ["contract", "clause", "legal", "compliance"]):
            return self.adapters.get("deepseek", self.adapters["openai"])()
        elif any(word in query_lower for word in ["translate", "language", "中文"]):
            return self.adapters.get("qwen", self.adapters["openai"])()
        else:
            return self.adapters["openai"]()

# 全局网关实例
GATEWAY = ModelGateway()

# 注册适配器(实际项目中应从配置中心加载)
GATEWAY.register_adapter("openai", lambda: OpenAIAdapter(
    api_key="sk-...", 
    base_url="https://api.openai.com/v1"
))
GATEWAY.register_adapter("qwen", lambda: QwenAdapter(
    api_key="YOUR_QWEN_KEY",
    model_name="qwen2.5-7b-instruct"
))
GATEWAY.register_adapter("deepseek", lambda: DeepSeekAdapter(
    api_key="YOUR_DEEPSEEK_KEY"
))

# 设置路由策略:默认静态,生产环境可切为 weighted
GATEWAY.set_routing_rule(RoutingStrategy.STATIC, {"static": "openai"})
# GATEWAY.set_routing_rule(RoutingStrategy.WEIGHTED, {"weights": {"openai": 0.6, "qwen": 0.4}})

注意: register_adapter 接收的是工厂函数 lambda: Adapter() ,而不是 Adapter 实例。这是因为 Adapter 可能包含网络连接、认证状态等有状态资源,必须按需创建,避免多线程/协程间状态污染。我在一个高并发客服系统中就吃过亏:把 OpenAIAdapter 实例存为全局变量,结果 api_key 被多个请求并发修改,导致一半请求 401。用工厂函数,每次 route_to_adapter() 都新建干净实例,彻底规避这个问题。

3.4 运行时执行器:统一异常处理与可观测性注入

最后是 ModelExecutor ,它把适配器、网关、重试、监控串起来:

import time
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from typing import Type, Dict, Any

class AIAgentError(Exception):
    """统一 Agent 错误基类"""
    def __init__(self, message: str, adapter_name: str = "", original_error: Exception = None):
        super().__init__(message)
        self.adapter_name = adapter_name
        self.original_error = original_error

class ModelExecutor:
    def __init__(self, gateway: ModelGateway):
        self.gateway = gateway
        self.logger = logging.getLogger(__name__)
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((ConnectionError, TimeoutError, AIAgentError))
    )
    async def execute(self, input_data: AIAgentInput) -> AIAgentOutput:
        """执行模型调用,含重试、监控、错误统一"""
        start_time = time.time()
        adapter = None
        
        try:
            # 1. 路由获取 Adapter
            adapter = await self.gateway.route_to_adapter(input_data)
            
            # 2. 适配输入
            request_body = adapter.adapt_input(input_data)
            
            # 3. 调用模型(Adapter 内部实现)
            raw_response = await adapter.invoke(request_body)  # Adapter 需实现 invoke 方法
            
            # 4. 适配输出
            output = await adapter.adapt_output(raw_response)
            
            # 5. 注入耗时元数据
            latency_ms = int((time.time() - start_time) * 1000)
            output.metadata["latency_ms"] = latency_ms
            output.metadata["adapter_name"] = adapter.__class__.__name__.replace("Adapter", "")
            
            self.logger.info(
                f"Model execution success: adapter={output.metadata['adapter_name']} "
                f"latency={latency_ms}ms tokens={output.metadata.get('total_tokens', 0)}"
            )
            
            return output
            
        except Exception as e:
            latency_ms = int((time.time() - start_time) * 1000)
            error_msg = f"Model execution failed after {latency_ms}ms: {str(e)}"
            
            # 包装为统一错误
            if not isinstance(e, AIAgentError):
                wrapped_error = AIAgentError(
                    error_msg, 
                    adapter_name=adapter.__class__.__name__.replace("Adapter", "") if adapter else "unknown",
                    original_error=e
                )
                raise wrapped_error
            else:
                raise e  # 已是统一错误,直接抛出

# 初始化执行器
EXECUTOR = ModelExecutor(GATEWAY)

这个 ModelExecutor 的价值在于:它把所有“脏活累活”都包圆了。你作为业务开发者,只需要调 EXECUTOR.execute(input_data) ,就能得到一个干净的 AIAgentOutput ,里面包含了文本、结构化数据、工具调用、token 用量、延迟、模型名——所有你需要的信息,而且格式统一。再也不用在每个 Chain 里写 try...except ,不用手动算 time.time() ,不用纠结 openai.RateLimitError dashscope.RequestFailed 怎么 catch。这就是抽象的价值。

4. LangGraph 集成:将 Model Agnostic Pattern 编排进有状态的 Agent 图

4.1 定义可复用的 State 和 Node

LangGraph 的威力,在于把 ModelExecutor 封装成一个可复用的 Node。我们定义一个通用的 llm_node

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Sequence
import operator

class GraphState(TypedDict):
    """LangGraph 共享状态"""
    input_data: AIAgentInput
    llm_output: Optional[AIAgentOutput] = None
    error: Optional[str] = None
    retry_count: int = 0
    # 可以添加更多字段,如 retrieved_docs, router_decision 等

# 定义一个通用的 LLM 调用节点
async def llm_node(state: GraphState) -> GraphState:
    """调用 LLM 的通用节点"""
    try:
        # 从 state 中取出 input_data
        input_data = state["input_data"]
        
        # 执行模型调用
        output = await EXECUTOR.execute(input_data)
        
        return {
            "llm_output": output,
            "error": None,
            "retry_count": state.get("retry_count", 0)
        }
    except AIAgentError as e:
        # 记录错误,但不中断流程,留给 fallback 节点处理
        error_msg = f"LLM call failed: {e.adapter_name} - {str(e)}"
        return {
            "error": error_msg,
            "llm_output": None,
            "retry_count": state.get("retry_count", 0) + 1
        }

# 定义 fallback 节点:当主 LLM 失败时,降级到备用模型
async def fallback_node(state: GraphState) -> GraphState:
    """降级节点:切换到备用 Adapter"""
    if not state.get("error"):
        return state
    
    # 简单降级:从 openai 切到 qwen
    current_adapter = "openai"
    fallback_adapter = "qwen"
    
    # 修改 input_data 的上下文,标记为降级调用
    new_input = state["input_data"].model_copy(update={
        "context": {
            **state["input_data"].context,
            "fallback_triggered": True,
            "original_error": state["error"]
        }
    })
    
    # 临时覆盖网关策略
    original_strategy = GATEWAY.routing_rules.get("strategy")
    GATEWAY.set_routing_rule(RoutingStrategy.STATIC, {"static": fallback_adapter})
    
    try:
        output = await EXECUTOR.execute(new_input)
        return {
            "llm_output": output,
            "error": None,
            "retry_count": state.get("retry_count", 0)
        }
    except Exception as e:
        # 降级也失败,返回原始错误
        return {
            "error": f"Fallback to {fallback_adapter} also failed: {str(e)}",
            "llm_output": None,
            "retry_count": state.get("retry_count", 0) + 1
        }
    finally:
        # 恢复原始策略
        if original_strategy:
            GATEWAY.set_routing_rule(RoutingStrategy.STATIC, {"static": current_adapter})

# 定义 router 节点:根据 query 复杂度决定模型
async def router_node(state: GraphState) -> str:
    """路由节点:返回下一个节点名"""
    input_data = state["input_data"]
    
    # 简单规则:字符数 > 5000 且含法律术语,走 high_precision
    if (len(input_data.user_query) > 5000 and 
        any(term in input_data.user_query.lower() for term in ["contract", "clause", "section"])):
        return "high_precision_llm"
    else:
        return "default_llm"

4.2 构建完整的 Agent 图:支持动态路由、降级、重试

现在,我们把所有节点编排成一个有状态的图:

# 创建图
workflow = StateGraph(GraphState)

# 添加节点
workflow.add_node("router", router_node)
workflow.add_node("default_llm", llm_node)
workflow.add_node("high_precision_llm", llm_node)
workflow.add_node("fallback", fallback_node)
workflow.add_node("end", lambda state: state)  # 终止节点

# 设置边
workflow.set_conditional_entry_point(
    router_node,
    {
        "default_llm": "default_llm",
        "high_precision_llm": "high_precision_llm"
    }
)

# default_llm 成功则到 end,失败
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐