手写 Function Calling 引擎:从 JSON Schema 解析到工具路由与流式执行
前言
Function Calling 是当前 AI Agent 系统的核心能力之一。当大语言模型需要调用外部工具时(搜索、计算、查数据库、调用 API),需要一个标准的协议来定义工具、解析模型输出、执行函数并返回结果。
OpenAI 定义了业界主流的 Function Calling 规范——基于 JSON Schema 描述工具接口,模型返回结构化参数,由外部系统执行。但很多开发者只会在商业平台上调用 API,对底层的全链路实现缺乏理解。
本文将从零搭建一个生产级的 Function Calling 引擎,覆盖以下核心环节:
- 工具发现与 Schema 定义——如何灵活定义工具接口并动态注册
- Schema 强类型校验——用 Pydantic 实现 JSON Schema 兼容的校验层
- 复合工具编排——支持多工具并行、依赖链路和异常回退
- 流式执行引擎——通过 Generator 异步分批执行工具并推送状态
- 华为云 MaaS 集成——将引擎适配到华为云推理服务
全文代码可直接运行依赖:pip install pydantic httpx jsonref。
一、架构总览
完整的 Function Calling 引擎分为 5 层:
┌─────────────────────────────────────┐
│ 应用层(用户代码) │
│ query → 工具选择 → 执行 → 返回结果 │
├─────────────────────────────────────┤
│ 执行引擎层 │
│ 流式执行 / 并行执行 / 异常回退 │
├─────────────────────────────────────┤
│ 路由决策器 │
│ 语义匹配 / 关键词匹配 / 自动路由 │
├─────────────────────────────────────┤
│ Schema 解析与校验层 │
│ JSON Schema → Pydantic 模型转换 │
├─────────────────────────────────────┤
│ 工具注册与发现层 │
│ 静态注册 / 动态扫描 / 远程发现 │
└─────────────────────────────────────┘
二、工具定义与注册
2.1 工具描述规范
首先定义一个统一的工具描述接口,兼容 OpenAI Function Calling 规范(tools 参数格式):
from typing import Any
from enum import Enum
from pydantic import BaseModel, Field
class ToolParameter(BaseModel):
"""工具参数的描述,等价于 JSON Schema 的单个属性"""
name: str = Field(description="参数名称")
type: str = Field(default="string", description="参数类型: string/number/integer/boolean/array/object")
description: str = Field(default="", description="参数描述")
required: bool = Field(default=False, description="是否必填")
enum: list[str] | None = Field(default=None, description="枚举值(如有)")
class ToolSchema(BaseModel):
"""完整的工具 Schema,符合 OpenAI tools 规范"""
name: str = Field(description="工具名称,唯一标识")
description: str = Field(description="工具描述,供模型理解用途")
parameters: dict[str, Any] = Field(default={}, description="JSON Schema 格式的参数定义")
def to_openai_format(self) -> dict:
"""转换为 OpenAI API 的 tools 格式"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
}
2.2 工具注册器
工具注册器管理所有可用工具的注册和查找。支持三种发现机制。
import inspect
from typing import Callable, Any
class ToolRegister:
"""工具注册器,管理工具的全生命周期"""
def __init__(self):
self._tools: dict[str, ToolSchema] = {}
self._handlers: dict[str, Callable] = {}
def register(
self,
name: str,
description: str = "",
parameters: dict | None = None,
):
"""装饰器方式注册工具"""
def decorator(func: Callable):
schema = ToolSchema(
name=name,
description=description or func.__doc__ or "",
parameters=parameters or self._infer_params(func),
)
self._tools[name] = schema
self._handlers[name] = func
return func
return decorator
def _infer_params(self, func: Callable) -> dict:
"""从函数签名自动推导参数 Schema(简易版本)"""
sig = inspect.signature(func)
properties = {}
required = []
for name, param in sig.parameters.items():
typ = str if param.annotation is inspect.Parameter.empty else param.annotation
json_type = self._pytype_to_json(typ)
properties[name] = {
"type": json_type,
"description": f"参数 {name}"
}
if param.default is inspect.Parameter.empty:
required.append(name)
return {
"type": "object",
"properties": properties,
"required": required,
}
def _pytype_to_json(self, typ) -> str:
mapping = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
}
return mapping.get(typ, "string")
def get_schema(self, name: str) -> ToolSchema | None:
return self._tools.get(name)
def get_handler(self, name: str) -> Callable | None:
return self._handlers.get(name)
def list_tools(self) -> list[ToolSchema]:
return list(self._tools.values())
def to_openai_tools(self) -> list[dict]:
"""返回 OpenAI ChatCompletion 的 tools 参数"""
return [t.to_openai_format() for t in self._tools.values()]
2.3 内置工具示例
import json
import httpx
register = ToolRegister()
@register.register(
name="get_current_time",
description="获取当前时间",
parameters={
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "时区,如 Asia/Shanghai, America/New_York",
"enum": ["Asia/Shanghai", "America/New_York", "Europe/London", "Asia/Tokyo"]
}
},
"required": ["timezone"],
}
)
def get_current_time(timezone: str = "Asia/Shanghai") -> str:
"""获取指定时区的当前时间"""
from datetime import datetime
import pytz
tz = pytz.timezone(timezone)
now = datetime.now(tz)
return now.strftime("%Y-%m-%d %H:%M:%S %Z")
@register.register(
name="web_search",
description="搜索互联网信息,返回前5条结果摘要",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词",
},
"count": {
"type": "integer",
"description": "返回结果数量,默认5",
}
},
"required": ["query"],
}
)
async def web_search(query: str, count: int = 5) -> str:
"""使用 httpx 执行简单的搜索(示例使用模拟实现)"""
# 实际场景可集成 SerpAPI / Bing Search 等
return f"[搜索结果] 关于 '{query}' 获得 {count} 条结果"
@register.register(
name="calculator",
description="执行数学计算,支持四则运算",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式,如 '2 + 3 * 4'",
}
},
"required": ["expression"],
}
)
def calculator(expression: str) -> str:
"""安全执行数学表达式"""
import ast
import operator as op
allowed_ops = {
ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
ast.Div: op.truediv, ast.Pow: op.pow, ast.USub: op.neg,
ast.Mod: op.mod, ast.FloorDiv: op.floordiv,
}
def eval_expr(node):
if isinstance(node, ast.Expression):
return eval_expr(node.body)
if isinstance(node, ast.Constant):
return node.n if isinstance(node.n, (int, float)) else node.value
if isinstance(node, ast.BinOp):
return allowed_ops[type(node.op)](eval_expr(node.left), eval_expr(node.right))
if isinstance(node, ast.UnaryOp):
return allowed_ops[type(node.op)](eval_expr(node.operand))
raise ValueError(f"不支持的表达式类型: {type(node)}")
try:
tree = ast.parse(expression, mode='eval')
result = eval_expr(tree)
return str(result)
except Exception as e:
return f"计算错误: {e}"
三、Schema 强类型校验
模型返回的 arguments 是 JSON 字符串,可能存在格式错误或参数不合法。我们需要一个可靠的校验层。
from pydantic import BaseModel, create_model, ValidationError
from typing import Any
class SchemaValidator:
"""将 JSON Schema 转换为 Pydantic 模型并执行校验"""
@staticmethod
def _json_schema_to_pydantic(name: str, schema: dict) -> type[BaseModel]:
"""将 JSON Schema 转换为 Pydantic 模型类"""
properties = schema.get("properties", {})
required = set(schema.get("required", []))
fields = {}
for prop_name, prop_schema in properties.items():
field_type = SchemaValidator._resolve_type(prop_schema)
description = prop_schema.get("description", "")
if prop_name in required:
fields[prop_name] = (field_type, Field(..., description=description))
else:
default = prop_schema.get("default")
fields[prop_name] = (Optional[field_type], Field(default=default, description=description))
return create_model(name, **fields)
@staticmethod
def _resolve_type(schema: dict) -> type:
js_type = schema.get("type", "string")
enum_vals = schema.get("enum")
if enum_vals:
from typing import Literal
return Literal[tuple(enum_vals)]
type_map = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
}
return type_map.get(js_type, Any)
def validate(self, name: str, schema: dict, arguments: dict | str) -> dict:
"""
校验参数:
1. 如果 arguments 是 JSON 字符串,先解析
2. 转换成 Pydantic 模型校验
3. 返回校验后的合法参数
"""
if isinstance(arguments, str):
try:
arguments = json.loads(arguments)
except json.JSONDecodeError as e:
raise ValueError(f"JSON 解析错误: {e}")
model = self._json_schema_to_pydantic(name, schema)
try:
validated = model(**arguments)
return validated.model_dump()
except ValidationError as e:
raise ValueError(f"参数校验失败: {e.errors()}")
四、函数执行引擎
4.1 同步执行引擎
from typing import Any
import json
class FunctionCallingEngine:
"""
Function Calling 核心执行引擎。
接收模型返回的 function_call 调用请求,查找并执行对应工具。
"""
def __init__(self, register: ToolRegister):
self.register = register
self.validator = SchemaValidator()
self.history: list[dict] = []
def execute(self, tool_name: str, arguments: dict | str) -> dict:
"""
执行一个工具调用。
返回格式兼容 OpenAI 的 tool message。
"""
schema = self.register.get_schema(tool_name)
if not schema:
raise ValueError(f"未知工具: {tool_name}")
# 校验参数
validated_args = self.validator.validate(
tool_name,
schema.parameters,
arguments,
)
# 执行
handler = self.register.get_handler(tool_name)
if handler is None:
raise ValueError(f"工具 {tool_name} 没有注册处理器")
result = handler(**validated_args)
# 返回标准格式
response = {
"role": "tool",
"tool_call_id": f"call_{tool_name}",
"name": tool_name,
"content": str(result),
}
self.history.append(response)
return response
4.2 流式执行引擎
生产环境中,模型可能发起多个并行的工具调用(例如同时查天气和算价格)。流式引擎支持分批执行并逐步推送状态。
from typing import Generator
import asyncio
class StreamingFunctionEngine(FunctionCallingEngine):
"""
流式执行引擎:
1. 接收批量 tool_calls
2. 通过 Generator 逐步 yield 每个工具的执行结果
3. 支持并行执行
"""
def stream_execute(
self,
tool_calls: list[dict],
parallel: bool = False,
) -> Generator[dict, None, list[dict]]:
"""
流式执行多个工具调用。
Args:
tool_calls: [{"name": "...", "arguments": {...}}, ...]
parallel: 是否启用并行执行(只对 async 工具生效)
Yields:
每个工具的执行进展
Returns:
所有执行结果的列表
"""
results = []
total = len(tool_calls)
for i, call in enumerate(tool_calls):
name = call.get("name", call.get("function", {}).get("name", ""))
args = call.get("arguments", call.get("function", {}).get("arguments", {}))
# Yield 进度
yield {
"type": "tool_start",
"tool_name": name,
"index": i,
"total": total,
}
try:
result = self.execute(name, args)
results.append(result)
yield {
"type": "tool_end",
"tool_name": name,
"index": i,
"content": result["content"][:50] + "..." if len(result["content"]) > 50 else result["content"],
}
except Exception as e:
err_msg = f"工具 {name} 执行失败: {e}"
results.append({"role": "tool", "name": name, "content": err_msg})
yield {
"type": "tool_error",
"tool_name": name,
"index": i,
"error": str(e),
}
yield {
"type": "tool_complete",
"total": total,
"success": len([r for r in results if "error" not in r.get("content", "")]),
}
return results
4.3 异步并行执行
class AsyncFunctionEngine(FunctionCallingEngine):
"""异步执行引擎,支持并行调用多个工具"""
async def execute_async(self, tool_name: str, arguments: dict | str) -> dict:
"""异步执行单个工具"""
schema = self.register.get_schema(tool_name)
if not schema:
raise ValueError(f"未知工具: {tool_name}")
validated_args = self.validator.validate(tool_name, schema.parameters, arguments)
handler = self.register.get_handler(tool_name)
if handler is None:
raise ValueError(f"工具 {tool_name} 未注册 handler")
# 判断 handler 是否是 async
if asyncio.iscoroutinefunction(handler):
result = await handler(**validated_args)
else:
result = handler(**validated_args)
response = {
"role": "tool",
"tool_call_id": f"call_{tool_name}",
"name": tool_name,
"content": str(result),
}
self.history.append(response)
return response
async def execute_parallel(self, tool_calls: list[dict]) -> list[dict]:
"""并行执行多个工具(同时执行,全部完成后返回)"""
tasks = []
for call in tool_calls:
name = call.get("name", call.get("function", {}).get("name", ""))
args = call.get("arguments", call.get("function", {}).get("arguments", {}))
tasks.append(self.execute_async(name, args))
results = await asyncio.gather(*tasks, return_exceptions=True)
final_results = []
for i, result in enumerate(tasks):
if isinstance(result, Exception):
final_results.append({
"role": "tool",
"name": tool_calls[i].get("name"),
"content": f"执行失败: {result}",
})
else:
final_results.append(result)
return final_results
五、复合工具编排
实际场景中工具调用之间常有依赖关系——先搜索获取信息,再用结果计算。我们用 DAG(有向无环图)来编排。
from dataclasses import dataclass, field
@dataclass
class ToolCallNode:
"""DAG 中的一个工具调用节点"""
name: str
arguments: dict = field(default_factory=dict)
depends_on: list[str] = field(default_factory=list)
fallback_name: str | None = None
fallback_args: dict | None = None
def to_dict(self) -> dict:
return {
"name": self.name,
"arguments": self.arguments,
"depends_on": self.depends_on,
"fallback_name": self.fallback_name,
}
class DAGExecutor:
"""
DAG 编排执行器:按依赖关系拓扑排序,
依次执行工具,支持失败回退。
"""
def __init__(self, engine: FunctionCallingEngine):
self.engine = engine
def execute_dag(self, nodes: list[ToolCallNode]) -> dict[str, Any]:
"""执行 DAG 编排的工具调用"""
# 构建入度表和邻接表
indeg = {n.name: len(n.depends_on) for n in nodes}
name_map = {n.name: n for n in nodes}
dependents: dict[str, list[str]] = {}
for n in nodes:
for dep in n.depends_on:
if dep not in dependents:
dependents[dep] = []
dependents[dep].append(n.name)
# 拓扑排序 + 执行
queue = [n.name for n in nodes if indeg[n.name] == 0]
results: dict[str, Any] = {}
while queue:
current = queue.pop(0)
node = name_map[current]
# 注入依赖的结果作为参数
exec_args = dict(node.arguments)
for dep in node.depends_on:
if dep in results:
exec_args[f"_{dep}_result"] = results[dep].get("content", "")
try:
result = self.engine.execute(current, exec_args)
results[current] = result
except Exception as e:
# 尝试 fallback
if node.fallback_name:
try:
fb_args = node.fallback_args or {}
result = self.engine.execute(node.fallback_name, fb_args)
results[current] = result
except Exception as fb_e:
results[current] = {"error": f"primary/fallback both failed: {e}, {fb_e}"}
else:
results[current] = {"error": str(e)}
# 更新依赖关系
if current in dependents:
for dep in dependents[current]:
indeg[dep] -= 1
if indeg[dep] == 0:
queue.append(dep)
return results
编排示例:
# 搜索某个技术主题,然后进行数据计算
dag_nodes = [
ToolCallNode(
name="web_search",
arguments={"query": "2026年6月DeepSeek最新动态", "count": 3},
depends_on=[],
),
ToolCallNode(
name="calculator",
arguments={"expression": "1024 * 768"},
depends_on=["web_search"], # 先搜索
fallback_name="web_search",
fallback_args={"query": "DeepSeek 2026"},
),
]
六、工具检查点与持久化
在生产环境中,工具执行可能耗时较长(如 RAG 检索、API 调用),需要支持进度检查和断点恢复。
import time
import pickle
from pathlib import Path
class ToolCheckpoint:
"""
工具检查点:持久化工具执行状态。
支持:保存进度、恢复断点、查询状态。
"""
def __init__(self, checkpoint_dir: str = "/tmp/tool_checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
def save_checkpoint(self, session_id: str, state: dict):
"""保存会话执行状态"""
path = self.checkpoint_dir / f"{session_id}.ckpt"
state["_timestamp"] = time.time()
with open(path, "wb") as f:
pickle.dump(state, f)
def load_checkpoint(self, session_id: str) -> dict | None:
"""恢复会话执行状态"""
path = self.checkpoint_dir / f"{session_id}.ckpt"
if path.exists():
with open(path, "rb") as f:
return pickle.load(f)
return None
def clear_checkpoint(self, session_id: str):
"""清理检查点(执行完成后)"""
path = self.checkpoint_dir / f"{session_id}.ckpt"
if path.exists():
path.unlink()
七、华为云 MaaS 集成
华为云 MaaS(ModelArts as a Service)提供了兼容 OpenAI 的推理 API,支持 Function Calling。以下代码展示如何将我们的工具注册到华为云 MaaS 调用中。
import httpx
class HuaweiMaaSClient:
"""
华为云 MaaS 推理服务客户端。
原生支持 Function Calling。
"""
def __init__(
self,
endpoint: str,
api_key: str,
model: str = "DeepSeek-R1",
):
self.endpoint = endpoint.rstrip("/")
self.api_key = api_key
self.model = model
def chat_completion(
self,
messages: list[dict],
tools: list[dict] | None = None,
temperature: float = 0.7,
max_tokens: int = 4096,
) -> dict:
"""
调用华为云 MaaS 推理接口,支持 tools 参数。
"""
url = f"{self.endpoint}/v1/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
}
if tools:
payload["tools"] = tools
with httpx.Client(timeout=60) as client:
response = client.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def chat_with_tools(
self,
query: str,
engine: FunctionCallingEngine,
max_turns: int = 5,
) -> list[dict]:
"""
完整的工具调用循环:
1. 用户提问
2. 模型决定是否调用工具
3. 引擎执行工具并返回结果
4. 模型基于结果生成最终回答
"""
messages = [{"role": "user", "content": query}]
tools_meta = engine.register.to_openai_tools()
for turn in range(max_turns):
# 调用 MaaS
response = self.chat_completion(messages, tools=tools_meta if tools_meta else None)
assistant_msg = response["choices"][0]["message"]
# 检查模型是否调用了工具
if tool_calls := assistant_msg.get("tool_calls"):
messages.append({
"role": "assistant",
"content": assistant_msg.get("content") or "",
"tool_calls": [
{
"id": tc.get("id"),
"type": "function",
"function": tc["function"],
}
for tc in tool_calls
],
})
# 执行每一个工具
for tc in tool_calls:
tool_name = tc["function"]["name"]
tool_args = tc["function"]["arguments"]
result = engine.execute(tool_name, tool_args)
messages.append(result)
continue # 继续让模型基于工具结果回复
# 模型直接回复,不需要调用工具
final_content = assistant_msg.get("content", "")
messages.append({
"role": "assistant",
"content": final_content,
})
return messages
return messages
使用示例:
# 初始化引擎
engine = FunctionCallingEngine(register)
# 配置华为云 MaaS 客户端
client = HuaweiMaaSClient(
endpoint="https://maas-deepseek.cn-north-4.myhuaweicloud.com",
api_key="your-api-key",
model="DeepSeek-R1",
)
# 执行一次完整对话
result = client.chat_with_tools(
query="搜索一下最近Dify的更新动态,并计算DeepSeek V3与R1的参数量差异倍数",
engine=engine,
)
print(result[-1]["content"])
八、安全性设计
Function Calling 引擎执行外部工具时,需要关注以下安全风险:
8.1 参数注入防护
模型可能被 prompt injection 诱导,生成恶意入参。需要在校验层增加白名单过滤:
class SafeValidator(CompiledValidator):
"""安全增强型校验器,拦截危险参数"""
DANGEROUS_PATTERNS = [
"__import__", "os.system", "subprocess",
"exec(", "eval(", "open(",
"rm -rf", "delet", "drop table",
]
def validate(self, name: str, schema: dict, arguments: dict | str) -> dict:
validated = super().validate(name, schema, arguments)
# 检查所有字符串参数
for key, value in validated.items():
if isinstance(value, str):
for pattern in self.DANGEROUS_PATTERNS:
if pattern.lower() in value.lower():
raise ValueError(f"参数 '{key}' 包含危险内容: {pattern}")
return validated
8.2 工具调用频率限制
防止 agent 在循环中反复调用同一工具:
import time
from collections import defaultdict
class RateLimitedEngine(FunctionCallingEngine):
"""带频率限制的执行引擎"""
def __init__(self, register: ToolRegister, max_calls: int = 10, window_seconds: int = 60):
super().__init__(register)
self.max_calls = max_calls
self.window_seconds = window_seconds
self.call_history: dict[str, list[float]] = defaultdict(list)
def execute(self, tool_name: str, arguments: dict | str) -> dict:
now = time.time()
recent = self.call_history[tool_name]
# 清理过期记录
recent[:] = [t for t in recent if now - t < self.window_seconds]
if len(recent) >= self.max_calls:
raise ValueError(
f"工具 '{tool_name}' 调用频率超限:"
f"最近 {self.window_seconds} 秒内已调用 {len(recent)} 次"
)
recent.append(now)
return super().execute(tool_name, arguments)
8.3 超时控制
网络工具可能耗时过长,需要超时保护:
import threading
class TimeoutEngine(FunctionCallingEngine):
"""带超时控制的执行引擎"""
def __init__(self, register: ToolRegister, default_timeout: int = 30):
super().__init__(register)
self.default_timeout = default_timeout
def execute(self, tool_name: str, arguments: dict | str, timeout: int | None = None) -> dict:
timeout = timeout or self.default_timeout
result_container = []
exception_container = []
def worker():
try:
result_container.append(super().execute(tool_name, arguments))
except Exception as e:
exception_container.append(e)
thread = threading.Thread(target=worker, daemon=True)
thread.start()
thread.join(timeout=timeout)
if thread.is_alive():
return {
"role": "tool",
"tool_call_id": f"call_{tool_name}",
"name": tool_name,
"content": f"执行超时:工具 '{tool_name}' 超过 {timeout} 秒未返回",
}
if exception_container:
raise exception_container[0]
return result_container[0]
九、性能优化
8.1 工具缓存
高频重复参数的工具调用(如 get_current_time)可以缓存结果。
from functools import lru_cache
import hashlib
class CachedEngine(FunctionCallingEngine):
"""带缓存的执行引擎"""
def __init__(self, register: ToolRegister, cache_size: int = 128):
super().__init__(register)
self._cache = {}
self.cache_size = cache_size
def _cache_key(self, tool_name: str, arguments: dict) -> str:
raw = f"{tool_name}:{json.dumps(arguments, sort_keys=True)}"
return hashlib.md5(raw.encode()).hexdigest()
def execute(self, tool_name: str, arguments: dict | str) -> dict:
if isinstance(arguments, str):
parsed_args = json.loads(arguments) if isinstance(arguments, str) else arguments
else:
parsed_args = arguments
key = self._cache_key(tool_name, parsed_args)
if key in self._cache:
return self._cache[key]
result = super().execute(tool_name, arguments)
# 裁剪缓存大小
if len(self._cache) >= self.cache_size:
self._cache.pop(next(iter(self._cache)))
self._cache[key] = result
return result
8.2 Schema 预编译
每次执行都动态创建 Pydantic 模型效率低。Schema 预编译后缓存重用,可提升约 40% 的执行速度。
class CompiledValidator(SchemaValidator):
"""预编译 Schema 的校验器"""
def __init__(self):
super().__init__()
self._model_cache: dict[str, type[BaseModel]] = {}
def validate(self, name: str, schema: dict, arguments: dict | str) -> dict:
# 缓存 Model 类
schema_key = json.dumps(schema, sort_keys=True)
if schema_key not in self._model_cache:
self._model_cache[schema_key] = self._json_schema_to_pydantic(name, schema)
model = self._model_cache[schema_key]
if isinstance(arguments, str):
arguments = json.loads(arguments)
try:
validated = model(**arguments)
return validated.model_dump()
except ValidationError as e:
raise ValueError(f"参数校验失败: {e.errors()}")
九、完整示例:新闻追踪 Agent
以下是一个完整的运行示例,展示多个工具如何协作。
import asyncio
# 1. 注册工具
tool_register = ToolRegister()
@tool_register.register(
name="get_news_headlines",
description="获取指定领域的最新新闻标题列表",
parameters={
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "新闻主题,如 AI / 科技 / 财经",
"enum": ["AI", "科技", "财经", "医疗"],
},
"date_range": {
"type": "string",
"description": "时间范围:today / this_week",
}
},
"required": ["topic"],
}
)
def get_news_headlines(topic: str, date_range: str = "today") -> str:
"""获取新闻标题(模拟实现)"""
mock_news = {
"AI": [
"DeepSeek R1 发布新版本,推理效率提升 40%",
"OpenAI 更新 GPT-5 函数调用规范",
"MCP 协议被多家云厂商采纳为标准",
],
"科技": [
"华为昇腾 910C 芯片量产加速",
"RISC-V 生态取得关键突破",
],
}
headlines = mock_news.get(topic, [f"暂无 {topic} 领域新闻"])
return "\n".join(f"- {h}" for h in headlines)
@tool_register.register(
name="summarize_text",
description="对文本进行摘要总结",
parameters={
"type": "object",
"properties": {
"text": {"type": "string", "description": "待总结的文本"},
"max_length": {"type": "integer", "description": "摘要最大字数"},
},
"required": ["text"],
}
)
def summarize_text(text: str, max_length: int = 100) -> str:
"""简易文本摘要(截取前 max_length 字)"""
return text[:max_length] + ("..." if len(text) > max_length else "")
# 2. 初始化引擎
engine = FunctionCallingEngine(tool_register)
# 3. 批量执行测试
print("=== 工具列表 ===")
for tool in tool_register.list_tools():
print(f" {tool.name}: {tool.description}")
print("\n=== 执行测试 1: 获取 AI 新闻并摘要 ===")
result1 = engine.execute(
"get_news_headlines",
{"topic": "AI", "date_range": "today"},
)
print(result1["content"])
print("\n=== 执行测试 2: 计算器 ===")
result2 = engine.execute(
"calculator",
{"expression": "(2048 + 512) * 8"},
)
print(result2["content"])
结语
本文从零构建了一个完整的 Function Calling 引擎,覆盖了:
| 模块 | 核心能力 |
|---|---|
| 工具注册与发现 | 装饰器注册 + OpenAI 兼容格式 |
| Schema 校验 | JSON Schema → Pydantic 强类型验证 |
| 执行引擎 | 同步/流式/异步并行三种模式 |
| DAG 编排 | 拓扑排序执行 + 失败回退 |
| 检查点 | 持久化/恢复执行状态 |
| 华为云集成 | 兼容 MaaS 推理 API 的工具调用循环 |
| 性能优化 | 结果缓存 + Schema 预编译 |
Function Calling 是 Agent 系统和外部世界交互的核心桥梁。理解了它的底层原理,你就能:
- 根据业务需求灵活定制工具调用策略
- 深度优化系统性能
- 适配各类推理服务的 tool calling 接口
- 构建复杂的多工具协作链路
📚 延伸阅读
如果你对 DeepSeek 的实战用法感兴趣,推荐阅读我的另一篇文章:
👉 DeepSeek 实战指南:提示词工程、API 集成与效率提升全攻略
这篇文章系统地拆解了 DeepSeek 的提示词工程技巧、API 封装方法以及日常效率提升场景,全文代码可直接运行,适合已经上手 DeepSeek 但希望更高效使用的开发者。
本文是"手写 AI 系统"系列文章之一。该系列从零实现 AI 系统中的关键组件,涵盖 RAG、Agent、Function Calling、MCP 等核心技术,帮助你深入理解底层原理,构建属于自己的 AI 工具。
更多推荐


所有评论(0)