MCP 协议深入解析:构建生产级 AI Agent 工具链
·
系列目录:本文是「AI 应用开发进阶实战」系列的第 2 篇。上一篇我们构建了生产级 RAG 系统,本篇聚焦 AI Agent 的核心能力——工具调用,深入 MCP 协议。
一、为什么 MCP 是 AI Agent 的基础设施?
1.1 Agent 工具调用的痛点
AI Agent 要真正"干活",必须能调用外部工具:读文件、调 API、查数据库、发邮件……传统做法是每个 LLM 平台定义一套 Function Calling 格式,导致:
- 碎片化:OpenAI Functions、Anthropic Tool Use、Gemini Function Calling 各有各的格式
- 耦合重:工具实现和 Agent 代码深度耦合,换模型就得改工具定义
- 不可复用:写的文件读取工具只能在当前应用里用,无法跨项目共享
1.2 MCP 的解决方案
MCP(Model Context Protocol)是 Anthropic 推出的开放协议,定义了 AI 应用与外部工具/数据源之间的标准通信方式。
传统方案:每个 Agent 各写一套工具
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │
│ │工具集│ │ │ │工具集│ │ │ │工具集│ │
│ └─────┘ │ │ └─────┘ │ │ └─────┘ │
└─────────┘ └─────────┘ └─────────┘
❌ 每个 Agent 独立实现,无法共享
MCP 方案:标准化工具服务器
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Agent A │ │ Agent B │ │ Agent C │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└──────────────┼──────────────┘
│ MCP 协议 (JSON-RPC)
┌──────────────┼──────────────┐
│ │ │
┌────┴────┐ ┌────┴────┐ ┌────┴────┐
│ 文件服务 │ │ API 服务 │ │ 数据库 │
│ MCP Srv │ │ MCP Srv │ │ MCP Srv │
└─────────┘ └─────────┘ └─────────┘
✅ 一次实现,所有 Agent 共享
二、MCP 协议核心概念
2.1 三大原语
# MCP 协议定义了三种核心能力:
1. Resources(资源)
# 暴露数据给模型,类似 GET 端点
# 例如:file://documents/report.pdf、postgres://users/table
2. Tools(工具)
# 模型可以调用的函数,类似 POST 端点
# 例如:create_file()、send_email()、search_database()
3. Prompts(提示模板)
# 预定义的交互模板
# 例如:"代码审查"模板、"写周报"模板
2.2 通信协议:JSON-RPC 2.0
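MCP 基于 JSON-RPC 2.0,所有消息都是 JSON 格式。带 `id` 的请求会收到同一 `id` 的响应;不带 `id` 的是通知(notification,例如客户端在 `initialize` 握手成功后发送的 `notifications/initialized`),接收方只处理、不回复: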
// 请求:列出所有可用工具
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
// 响应:返回工具列表
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"tools": [
{
"name": "read_file",
"description": "Read contents of a file",
"inputSchema": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"}
},
"required": ["path"]
}
}
]
}
}
// 请求:调用工具
{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "read_file",
"arguments": {"path": "/home/user/notes.txt"}
}
}
2.3 两种传输方式
| 传输方式 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| stdio | 标准输入输出 | 简单,无需网络 | 只能本地 | 本地 Agent、CLI 工具 |
| SSE (HTTP) | HTTP POST 发送请求 + Server-Sent Events 推送响应(2024-11-05 版规范;2025-03-26 版起由 Streamable HTTP 取代) | 远程访问,可共享 | 需要网络部署,需自行做鉴权 | 团队共享、云端 Agent |
三、构建第一个 MCP Server
3.1 项目结构
my-mcp-server/
├── server.py # MCP 服务器主逻辑
├── tools/
│ ├── __init__.py
│ ├── file_tools.py # 文件操作工具
│ └── web_tools.py # 网络请求工具
├── requirements.txt
└── README.md
3.2 核心 Server 实现
# server.py
import json
import sys
import asyncio
from typing import Any, Dict
from tools.file_tools import FileTools
from tools.web_tools import WebTools
class MCPServer:
    """Minimal MCP server speaking JSON-RPC 2.0 over the stdio transport.

    Tools live in ``self.tools`` as ``name -> {"handler": callable, "schema": dict}``.
    ``handle_request`` is transport-agnostic, so other transports can reuse it.
    """

    def __init__(self):
        self.tools: Dict[str, Any] = {}
        self._register_tools()

    def _register_tools(self):
        """Register the built-in file and web tools together with their input schemas."""
        file_tools = FileTools()
        web_tools = WebTools()

        # File-system tools
        self.register_tool("read_file", file_tools.read_file, {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute or relative file path"
                }
            },
            "required": ["path"]
        })
        self.register_tool("write_file", file_tools.write_file, {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path to write"},
                "content": {"type": "string", "description": "Content to write"}
            },
            "required": ["path", "content"]
        })
        self.register_tool("list_directory", file_tools.list_directory, {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Directory path"},
                "pattern": {"type": "string", "description": "Optional glob pattern"}
            },
            "required": ["path"]
        })

        # Network tools
        self.register_tool("http_get", web_tools.http_get, {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "Request URL"},
                "headers": {"type": "object", "description": "Optional headers"}
            },
            "required": ["url"]
        })
        self.register_tool("http_post", web_tools.http_post, {
            "type": "object",
            "properties": {
                "url": {"type": "string"},
                "body": {"type": "object"},
                "headers": {"type": "object"}
            },
            "required": ["url", "body"]
        })

    def register_tool(self, name: str, handler, schema: dict):
        """Register one tool; ``handler.__doc__`` becomes its advertised description."""
        self.tools[name] = {
            "handler": handler,
            "schema": schema
        }

    def handle_request(self, request: Any) -> "dict | None":
        """Dispatch one decoded JSON-RPC message and build its response.

        Returns the response object, or ``None`` when ``request`` is a
        notification (a message without an ``"id"`` member). JSON-RPC 2.0
        forbids replying to notifications, even with an error.
        """
        if not isinstance(request, dict):
            # Batches (arrays) and scalar payloads are not supported.
            return self._error(None, -32600, "Invalid Request: expected a JSON object")

        method = request.get("method", "")
        req_id = request.get("id")
        is_notification = "id" not in request
        # "params" may be omitted or sent as an explicit null.
        params = request.get("params") or {}

        try:
            if method == "tools/list":
                result = self._handle_list_tools()
            elif method == "tools/call":
                result = self._handle_call_tool(params)
            elif method == "initialize":
                result = self._handle_initialize(params)
            elif method == "ping":
                # MCP liveness check: the reply is an empty result.
                result = {}
            else:
                if is_notification:
                    # e.g. "notifications/initialized": nothing to do, no reply.
                    return None
                return self._error(req_id, -32601, f"Method not found: {method}")
        except Exception as e:
            if is_notification:
                return None
            return self._error(req_id, -32000, str(e))

        if is_notification:
            return None
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": result
        }

    def _handle_initialize(self, params: dict) -> dict:
        """Answer the initialize handshake with protocol version, capabilities and server info."""
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": "my-mcp-server",
                "version": "1.0.0"
            }
        }

    def _handle_list_tools(self) -> dict:
        """Describe every registered tool (name, docstring, input schema)."""
        tools = []
        for name, tool_info in self.tools.items():
            tools.append({
                "name": name,
                "description": tool_info["handler"].__doc__ or "",
                "inputSchema": tool_info["schema"]
            })
        return {"tools": tools}

    def _handle_call_tool(self, params: dict) -> dict:
        """Validate and run one tool, wrapping its JSON-serialized result as text content.

        Raises ValueError for an unknown tool, non-object arguments, or a
        missing required parameter.
        """
        tool_name = params.get("name")
        # "arguments" is optional in MCP and may also arrive as null.
        arguments = params.get("arguments") or {}
        if not isinstance(arguments, dict):
            raise ValueError("Tool arguments must be a JSON object")

        tool = self.tools.get(tool_name)
        if tool is None:
            raise ValueError(f"Unknown tool: {tool_name}")

        self._validate_args(arguments, tool["schema"])
        result = tool["handler"](**arguments)

        return {
            "content": [{
                "type": "text",
                "text": json.dumps(result, ensure_ascii=False, indent=2)
            }]
        }

    def _validate_args(self, args: dict, schema: dict):
        """Minimal validation: every name in ``schema["required"]`` must be present."""
        for required in schema.get("required", []):
            if required not in args:
                raise ValueError(f"Missing required parameter: {required}")

    def _error(self, req_id, code: int, message: str) -> dict:
        """Build a JSON-RPC error response."""
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message}
        }

    async def run_stdio(self):
        """Serve newline-delimited JSON-RPC on stdin/stdout until stdin reaches EOF.

        stdin is read synchronously, which blocks the event loop; that is
        acceptable while this coroutine is the only task running on it.
        """
        for raw_line in sys.stdin:
            line = raw_line.strip()
            if not line:
                continue
            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                response = self._error(None, -32700, f"Parse error: {e}")
            else:
                response = self.handle_request(request)
            # Notifications produce no response and must not be answered.
            if response is not None:
                sys.stdout.write(json.dumps(response) + "\n")
                sys.stdout.flush()
# Tool implementations. Shown inline for brevity; the project layout above
# places them in tools/file_tools.py and tools/web_tools.py.
class FileTools:
    """File-system tools exposed to the agent.

    The docstring of each public method is sent to clients verbatim as the
    tool description (MCPServer reads ``handler.__doc__``), so those
    docstrings are runtime text and are kept unchanged.
    """

    # Maximum number of characters of file content returned to the model.
    MAX_READ_CHARS = 10000

    def read_file(self, path: str) -> dict:
        """读取文件内容"""
        # Returns {"path", "size", "lines", "content", "truncated"}; "size"
        # counts characters (not bytes) and "content" is capped at
        # MAX_READ_CHARS. Failures are reported as {"error": ...}.
        import os

        if not os.path.exists(path):
            return {"error": f"File not found: {path}"}
        if not os.path.isfile(path):
            # Directories, devices and FIFOs are not regular files; reading
            # e.g. /dev/zero would never terminate.
            return {"error": f"Not a file: {path}"}
        try:
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
        except UnicodeDecodeError:
            return {"error": f"Not a UTF-8 text file: {path}"}

        # A trailing newline terminates the last line rather than starting a
        # new one, and an empty file has zero lines.
        line_count = content.count("\n")
        if content and not content.endswith("\n"):
            line_count += 1
        return {
            "path": path,
            "size": len(content),
            "lines": line_count,
            "content": content[:self.MAX_READ_CHARS],
            "truncated": len(content) > self.MAX_READ_CHARS,
        }

    def write_file(self, path: str, content: str) -> dict:
        """写入文件"""
        # Overwrites `path` with UTF-8 text, creating parent directories first.
        import os

        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        return {
            "path": path,
            "written": len(content),
            "success": True
        }

    def list_directory(self, path: str, pattern: str = "*") -> dict:
        """列出目录内容"""
        # Lists entries of `path` whose names match the glob `pattern`,
        # sorted by name so results are deterministic across platforms.
        import fnmatch
        import os

        if not os.path.isdir(path):
            return {"error": f"Not a directory: {path}"}
        pattern = pattern or "*"  # tolerate an explicit null / empty pattern

        items = []
        # scandir reuses directory metadata instead of separate stat calls.
        with os.scandir(path) as entries:
            for entry in sorted(entries, key=lambda e: e.name):
                if not fnmatch.fnmatch(entry.name, pattern):
                    continue
                is_file = entry.is_file()
                items.append({
                    "name": entry.name,
                    "type": "directory" if entry.is_dir() else "file",
                    "size": entry.stat().st_size if is_file else 0,
                })
        return {"path": path, "items": items, "count": len(items)}
class WebTools:
    """HTTP tools exposed to the agent.

    The docstrings of http_get/http_post are sent to clients verbatim as the
    tool descriptions, so they are runtime text and are kept unchanged.
    Transport failures are returned as {"error": ...} rather than raised.
    """

    # urllib would also open file://, ftp:// and data: URLs; restricting to
    # HTTP(S) stops the agent from reading local files through these tools.
    ALLOWED_SCHEMES = ("http", "https")
    # Cap on the returned body, in characters.
    MAX_BODY_CHARS = 5000
    # Per-request timeout, in seconds.
    TIMEOUT = 30
    USER_AGENT = "MCP-Server/1.0"

    def http_get(self, url: str, headers: dict = None) -> dict:
        """发送 HTTP GET 请求"""
        # Returns {"status", "headers", "body"}.
        return self._request(url, "GET", headers, include_headers=True)

    def http_post(self, url: str, body: dict, headers: dict = None) -> dict:
        """发送 HTTP POST 请求"""
        # Sends `body` as JSON; returns {"status", "body"}.
        import json as _json

        data = _json.dumps(body).encode("utf-8")
        return self._request(url, "POST", headers, data=data,
                             content_type="application/json")

    def _request(self, url, method, headers=None, *, data=None,
                 content_type=None, include_headers=False) -> dict:
        """Send one request and summarise the response as a JSON-able dict."""
        import urllib.error
        import urllib.parse
        import urllib.request

        scheme = urllib.parse.urlsplit(url).scheme.lower() if isinstance(url, str) else ""
        if scheme not in self.ALLOWED_SCHEMES:
            return {"error": f"Unsupported URL scheme: {scheme or '(none)'}"}

        req = urllib.request.Request(url, data=data, method=method)
        # Defaults first, so caller-supplied headers can override them.
        req.add_header("User-Agent", self.USER_AGENT)
        if content_type:
            req.add_header("Content-Type", content_type)
        for key, value in (headers or {}).items():
            req.add_header(key, value)

        try:
            with urllib.request.urlopen(req, timeout=self.TIMEOUT) as resp:
                return self._summarise(resp, resp.status, include_headers)
        except urllib.error.HTTPError as e:
            # 4xx/5xx responses still carry a status and body the model can use;
            # "error" is kept so callers checking for it still see the failure.
            try:
                result = self._summarise(e, e.code, include_headers)
            finally:
                e.close()
            result["error"] = str(e)
            return result
        except Exception as e:
            return {"error": str(e)}

    def _summarise(self, resp, status, include_headers) -> dict:
        """Read a bounded, decoded body from ``resp`` and package it with ``status``."""
        max_chars = self.MAX_BODY_CHARS
        # UTF-8 needs at most 4 bytes per character, so this bounded read
        # still yields max_chars characters for long bodies without pulling
        # an arbitrarily large response into memory.
        raw = resp.read(max_chars * 4)
        charset = resp.headers.get_content_charset() or "utf-8"
        try:
            text = raw.decode(charset, errors="replace")
        except LookupError:  # unknown charset label sent by the server
            text = raw.decode("utf-8", errors="replace")
        result = {"status": status}
        if include_headers:
            result["headers"] = dict(resp.headers)
        result["body"] = text[:max_chars]
        return result
if __name__ == "__main__":
    # Serve MCP over stdin/stdout until the client closes stdin.
    asyncio.run(MCPServer().run_stdio())
3.3 测试 MCP Server
# test_server.py
import json
import subprocess
import sys
def test_mcp_server():
    """Smoke-test server.py end to end over stdio, acting as a minimal MCP client.

    Every step asserts that the reply carries the matching id and no
    JSON-RPC error, so the final success message is printed only when all
    steps actually succeeded. The server process is always stopped.
    """
    import os
    import tempfile

    # Popen as a context manager closes the pipes and waits for the child.
    with subprocess.Popen(
        [sys.executable, "server.py"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    ) as proc:
        next_id = 0

        def send_request(method: str, params: dict = None) -> dict:
            """Send one JSON-RPC request and return its (successful) response."""
            nonlocal next_id
            next_id += 1
            request = {
                "jsonrpc": "2.0",
                "id": next_id,
                "method": method,
                "params": params or {}
            }
            proc.stdin.write(json.dumps(request) + "\n")
            proc.stdin.flush()
            line = proc.stdout.readline()
            assert line, f"server exited before answering {method}"
            response = json.loads(line)
            assert response.get("id") == next_id, f"unexpected reply: {response}"
            assert "error" not in response, f"{method} failed: {response['error']}"
            return response

        try:
            # 1. Handshake
            init_resp = send_request("initialize", {
                "protocolVersion": "2024-11-05",
                "capabilities": {}
            })
            print("初始化:", json.dumps(init_resp, indent=2, ensure_ascii=False))

            # 2. Tool discovery
            tools = send_request("tools/list")["result"]["tools"]
            print(f"\n可用工具: {len(tools)} 个")
            for tool in tools:
                print(f" - {tool['name']}: {tool['description']}")

            # 3. read_file on this very script
            read_resp = send_request("tools/call", {
                "name": "read_file",
                "arguments": {"path": __file__}
            })
            read_text = read_resp['result']['content'][0]['text']
            assert "error" not in json.loads(read_text), read_text
            print(f"\n读取文件: {read_text[:200]}...")

            # 4. write_file into the platform temp dir (not a hard-coded /tmp)
            write_resp = send_request("tools/call", {
                "name": "write_file",
                "arguments": {
                    "path": os.path.join(tempfile.gettempdir(), "test_mcp.txt"),
                    "content": "Hello from MCP Server!"
                }
            })
            written = json.loads(write_resp['result']['content'][0]['text'])
            assert written.get("success") is True, written
            print(f"\n写入文件: {write_resp['result']}")
        finally:
            proc.terminate()

    print("\n所有测试通过!")
if __name__ == "__main__":
    # Run the end-to-end smoke test when this file is executed directly.
    test_mcp_server()
# 输出示例:
# 初始化: {"jsonrpc": "2.0", "id": 1, "result": {"protocolVersion": "2024-11-05", ...}}
#
# 可用工具: 5 个
# - read_file: 读取文件内容
# - write_file: 写入文件
# - list_directory: 列出目录内容
# - http_get: 发送 HTTP GET 请求
# - http_post: 发送 HTTP POST 请求
#
# 读取文件: {"path": "...", "size": 1234, "content": "..."}...
# 写入文件: ...
# 所有测试通过!
四、进阶:SSE 远程 MCP Server
线上部署时,stdio 不够用——用 SSE (Server-Sent Events) 构建可通过 HTTP 访问的 MCP Server。
# server_sse.py
import json
import asyncio
from aiohttp import web
from server import MCPServer
class MCPSSEServer:
    """Expose an MCPServer over the HTTP+SSE transport (MCP spec 2024-11-05).

    A client opens GET /sse and receives an ``endpoint`` event naming the URL
    to POST JSON-RPC messages to; replies come back as SSE ``data:`` events on
    the same stream.

    NOTE(review): there is no authentication. Binding to 0.0.0.0 exposes the
    registered file/HTTP tools to anyone who can reach the port.
    """

    def __init__(self, host="0.0.0.0", port=8080):
        self.mcp = MCPServer()
        self.host = host
        self.port = port
        self.sessions: dict = {}  # session_id -> asyncio.Queue of outgoing messages

    async def handle_sse(self, request):
        """Open an SSE stream for a fresh session and relay that session's replies."""
        import uuid

        # The id is generated server-side. A client-chosen id (defaulting to a
        # shared "default") let one connection replace another's queue, and
        # the first disconnect then removed the live one.
        session_id = uuid.uuid4().hex

        response = web.StreamResponse()
        response.headers["Content-Type"] = "text/event-stream"
        response.headers["Cache-Control"] = "no-cache"
        response.headers["Connection"] = "keep-alive"
        await response.prepare(request)

        queue: asyncio.Queue = asyncio.Queue()
        self.sessions[session_id] = queue
        try:
            # Tell the client where to POST its JSON-RPC messages.
            endpoint = f"/message?session={session_id}"
            await response.write(
                f"event: endpoint\ndata: {endpoint}\n\n".encode()
            )
            while True:
                message = await queue.get()
                await response.write(
                    f"data: {json.dumps(message)}\n\n".encode()
                )
        except ConnectionResetError:
            # Client went away; cleanup happens in finally. Cancellation is
            # deliberately not swallowed so the framework can unwind normally.
            pass
        finally:
            self.sessions.pop(session_id, None)
        return response

    async def handle_message(self, request):
        """Accept one JSON-RPC message; its reply is delivered over the session's SSE stream."""
        session_id = request.query.get("session", "")
        queue = self.sessions.get(session_id)
        if queue is None:
            # Reject before executing anything: a reply would have nowhere to go.
            return web.Response(status=404, text="Unknown session")

        try:
            body = await request.json()
        except ValueError:  # json.JSONDecodeError is a ValueError
            return web.Response(status=400, text="Invalid JSON")

        # handle_request is synchronous and tools may block (file I/O, HTTP
        # calls with a 30 s timeout); run it in a worker thread so other
        # sessions keep streaming.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self.mcp.handle_request, body)

        # JSON-RPC notifications yield no reply.
        if response is not None:
            await queue.put(response)
        return web.Response(status=202)  # Accepted

    async def start(self):
        """Start the HTTP server and run until cancelled, then release its resources."""
        app = web.Application()
        app.router.add_get("/sse", self.handle_sse)
        app.router.add_post("/message", self.handle_message)

        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
            print(f"MCP SSE Server running at http://{self.host}:{self.port}")
            print(f"SSE endpoint: http://{self.host}:{self.port}/sse")
            # Block forever; cancellation (e.g. Ctrl+C) falls through to cleanup.
            await asyncio.Event().wait()
        finally:
            await runner.cleanup()
if __name__ == "__main__":
    # Serve the SSE transport on all interfaces, port 8080, until interrupted.
    asyncio.run(MCPSSEServer(port=8080).start())
五、安全最佳实践
MCP Server 让 Agent 获得了读写文件、发起网络请求等直接操作系统资源的能力,因此安全是第一要务。
class SecureMCPServer(MCPServer):
    """MCPServer with path/domain allow-lists, audit logging and per-tool rate limiting.

    Violations are raised as ValueError, which MCPServer.handle_request turns
    into a JSON-RPC error response.
    """

    def __init__(self, allowed_paths: list = None, allowed_domains: list = None,
                 rate_limit: int = 10, rate_window: float = 60.0):
        super().__init__()
        # Directory roots the file tools may touch (symlinks are resolved).
        self.allowed_paths = allowed_paths or ["/home/user/workspace"]
        # Hosts the HTTP tools may contact; subdomains of an entry are allowed too.
        self.allowed_domains = allowed_domains or ["api.example.com", "docs.internal.company.com"]
        # NOTE: not enforced anywhere yet, since no registered tool runs shell
        # commands. A command tool must check it itself, and a deny-list is
        # easy to bypass, so prefer an allow-list there.
        self.blocked_commands = ["rm -rf", "format", "shutdown"]
        # At most `rate_limit` calls per tool within any `rate_window` seconds.
        self.rate_limit = rate_limit
        self.rate_window = rate_window
        self._rate_limits = {}  # "rate:<tool>" -> deque of monotonic timestamps

    def _handle_call_tool(self, params: dict) -> dict:
        """Apply the security policy, then delegate to MCPServer._handle_call_tool."""
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}

        # Audit first so that denied and throttled attempts are recorded too.
        self._audit_log(tool_name, arguments)

        if isinstance(arguments, dict):
            # 1. File-path allow-list
            if "path" in arguments and not self._is_path_allowed(arguments["path"]):
                raise ValueError(
                    f"Access denied: {arguments['path']} is outside allowed paths"
                )
            # 2. Domain allow-list
            if "url" in arguments and not self._is_url_allowed(arguments["url"]):
                raise ValueError(
                    f"Access denied: {arguments['url']} is not in allowed domains"
                )

        # 3. Sliding-window rate limit per tool
        if not self._check_rate_limit(tool_name):
            raise ValueError(f"Rate limit exceeded for tool: {tool_name}")

        return super()._handle_call_tool(params)

    def _is_path_allowed(self, path: str) -> bool:
        """Return True if ``path`` resolves (following symlinks) inside an allowed root.

        Compares whole path components. A plain string-prefix check would let
        "/home/user/workspace-evil" pass for the root "/home/user/workspace".
        """
        import os

        if not isinstance(path, str):
            return False
        target = os.path.normcase(os.path.realpath(os.path.expanduser(path)))
        for allowed in self.allowed_paths:
            root = os.path.normcase(os.path.realpath(os.path.expanduser(allowed)))
            try:
                if os.path.commonpath([target, root]) == root:
                    return True
            except ValueError:
                # e.g. paths on different drives on Windows
                continue
        return False

    def _is_url_allowed(self, url: str) -> bool:
        """Return True for http(s) URLs whose host is an allowed domain or a subdomain of one.

        Matching on a dot boundary stops "evilapi.example.com" from passing
        for "api.example.com".
        """
        from urllib.parse import urlparse

        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
            hostname = (parsed.hostname or "").rstrip(".")
        except ValueError:  # e.g. malformed IPv6 literal
            return False
        if parsed.scheme not in ("http", "https") or not hostname:
            return False
        for domain in self.allowed_domains:
            domain = domain.lower()
            if hostname == domain or hostname.endswith("." + domain):
                return True
        return False

    def _audit_log(self, tool: str, args: dict):
        """Log the call at INFO on the "mcp.audit" logger.

        Records propagate to the root logger, whose default level (WARNING)
        drops them, so configure logging at INFO or lower to keep them.
        """
        import json
        import logging
        import time

        logging.getLogger("mcp.audit").info(
            "[AUDIT] %s | tool=%s | args=%s",
            time.time(), tool, json.dumps(args, ensure_ascii=False, default=str),
        )

    def _check_rate_limit(self, tool: str) -> bool:
        """Record a call to ``tool``; return False if it exceeds the sliding-window limit.

        Uses a monotonic clock so wall-clock adjustments cannot reset or
        extend the window. State is in-memory, per process.
        """
        import time
        from collections import deque

        now = time.monotonic()
        limits = self.__dict__.setdefault("_rate_limits", {})
        calls = limits.setdefault(f"rate:{tool}", deque())
        # Drop timestamps that have left the window.
        while calls and now - calls[0] >= getattr(self, "rate_window", 60.0):
            calls.popleft()
        if len(calls) >= getattr(self, "rate_limit", 10):
            return False
        calls.append(now)
        return True
安全清单
□ 路径白名单:限制文件访问范围
□ 域名白名单:限制网络请求目标
□ 命令过滤:禁止危险操作(rm -rf 等)——注意示例中的 blocked_commands 尚未被任何工具使用;若提供命令执行工具,须在其中实际校验,且黑名单容易绕过,优先使用命令白名单
□ 审计日志:记录所有工具调用
□ 速率限制:防止滥用(10次/分钟/工具)
□ 参数校验:严格验证所有输入
□ 超时控制:每个工具调用最多 30s(示例中只有 HTTP 工具通过 urlopen 设置了 30s 超时,文件工具没有)
□ 用户确认:危险操作(删除/覆盖)需要额外确认
六、在 Agent 中使用 MCP Server
# agent_client.py
import json
import subprocess
import sys
from typing import List, Dict
class MCPClient:
    """Minimal MCP client for AI agents, speaking JSON-RPC 2.0 over stdio.

    Spawns the server as a subprocess, performs the MCP handshake, caches the
    server's tools and converts them to OpenAI function-calling schemas.
    Usable as a context manager so the server process is always stopped.
    """

    def __init__(self, server_command: List[str]):
        """Start the server subprocess, perform the handshake and discover its tools.

        If anything fails during start-up, the process is stopped before the
        error propagates.
        """
        self.proc = subprocess.Popen(
            server_command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
            bufsize=1  # line-buffered: each request line is delivered promptly
        )
        self.tools: Dict = {}
        self._id_counter = 0
        try:
            self._initialize()
            self._discover_tools()
        except BaseException:
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _write(self, message: dict) -> None:
        """Write one JSON-RPC message as a single line on the server's stdin."""
        self.proc.stdin.write(json.dumps(message) + "\n")
        self.proc.stdin.flush()

    def _send(self, method: str, params: dict = None) -> dict:
        """Send a JSON-RPC request and return the response carrying its id.

        Other lines are skipped: server notifications, server-initiated
        requests, and stray replies such as an error some servers emit for a
        notification. Raises RuntimeError if the server closes stdout first.
        """
        req_id = self._next_id()
        self._write({
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
            "params": params or {}
        })
        while True:
            line = self.proc.stdout.readline()
            if not line:
                raise RuntimeError(f"MCP server exited while waiting for a reply to {method!r}")
            line = line.strip()
            if not line:
                continue
            message = json.loads(line)
            # Responses have our id and no "method"; server requests use their
            # own id space and always carry a "method".
            if isinstance(message, dict) and "method" not in message and message.get("id") == req_id:
                return message

    def _notify(self, method: str, params: dict = None) -> None:
        """Send a JSON-RPC notification (no id, so no reply is expected)."""
        message = {"jsonrpc": "2.0", "method": method}
        if params:
            message["params"] = params
        self._write(message)

    def _next_id(self):
        """Return the next request id (1, 2, 3, ...)."""
        self._id_counter = getattr(self, "_id_counter", 0) + 1
        return self._id_counter

    @staticmethod
    def _result(resp: dict) -> dict:
        """Return ``resp["result"]``, raising RuntimeError for a JSON-RPC error reply."""
        if "error" in resp:
            raise RuntimeError(resp["error"]["message"])
        return resp["result"]

    def _initialize(self):
        """Run the MCP handshake: initialize, then confirm with notifications/initialized."""
        result = self._result(self._send("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "mcp-agent-client", "version": "1.0.0"}
        }))
        print(f"Connected to MCP Server: {result.get('serverInfo')}")
        # The spec requires the client to confirm before normal requests.
        self._notify("notifications/initialized")

    def _discover_tools(self):
        """Fetch the server's tool list and cache it by tool name."""
        result = self._result(self._send("tools/list"))
        self.tools = {
            tool["name"]: tool
            for tool in result.get("tools", [])
        }
        print(f"Discovered {len(self.tools)} tools")

    def call_tool(self, name: str, **kwargs) -> object:
        """Invoke tool ``name`` with ``kwargs`` as its arguments.

        Returns the first text content item parsed as JSON, the raw text when
        it is not JSON, or the full content list for non-text results.
        Raises RuntimeError on a JSON-RPC error or a result flagged ``isError``.
        """
        result = self._result(self._send("tools/call", {
            "name": name,
            "arguments": kwargs
        }))
        content = result.get("content") or []
        if result.get("isError"):
            texts = [c.get("text", "") for c in content if c.get("type") == "text"]
            raise RuntimeError("\n".join(texts) or f"Tool {name} failed")
        if content and content[0].get("type") == "text":
            text = content[0].get("text", "")
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                # Many MCP servers return plain text rather than JSON.
                return text
        return content

    def get_tool_schemas_for_llm(self) -> list:
        """Convert the cached MCP tools to OpenAI function-calling tool definitions."""
        return [
            {
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.get("description", ""),
                    # OpenAI expects an object schema; fall back to an empty one
                    # if a server omits inputSchema.
                    "parameters": tool.get("inputSchema") or {"type": "object", "properties": {}}
                }
            }
            for name, tool in self.tools.items()
        ]

    def close(self):
        """Stop the server process and release its pipes; safe to call more than once."""
        if self.proc.poll() is None:
            self.proc.terminate()
            try:
                self.proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.proc.kill()
                self.proc.wait()
        for pipe in (self.proc.stdin, self.proc.stdout):
            if pipe is not None and not pipe.closed:
                try:
                    pipe.close()
                except OSError:
                    pass
# === Using the MCP client ===
if __name__ == "__main__":
    from openai import OpenAI

    # The SDK reads the key from the OPENAI_API_KEY environment variable;
    # never hard-code credentials in source.
    client = OpenAI()
    mcp = MCPClient([sys.executable, "server.py"])
    try:
        # Tool definitions handed to the LLM
        tools = mcp.get_tool_schemas_for_llm()

        messages = [
            {"role": "system", "content": "你可以使用工具操作文件和发送网络请求。"},
            {"role": "user", "content": "读取 /tmp/test.txt 的内容,然后将其中的域名替换为 example.com 后写回"}
        ]

        # Agent loop: let the model call tools, feed the results back, and
        # repeat until it answers without requesting more tools. A single
        # round would only perform the read and never write the file back.
        for _ in range(10):  # hard cap against runaway tool loops
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                tools=tools
            )
            msg = response.choices[0].message
            messages.append(msg)
            if not msg.tool_calls:
                print(msg.content)
                break

            for tc in msg.tool_calls:
                args = json.loads(tc.function.arguments or "{}")
                try:
                    result = mcp.call_tool(tc.function.name, **args)
                except RuntimeError as e:
                    # Report tool failures to the model instead of aborting the run.
                    result = {"error": str(e)}
                print(f"Tool: {tc.function.name}({args})")
                print(f"Result: {json.dumps(result, ensure_ascii=False, indent=2)}")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc.id,
                    "content": json.dumps(result, ensure_ascii=False)
                })
    finally:
        mcp.close()
七、总结与下一篇预告
MCP 协议解决了 AI Agent 工具调用的三个核心问题:
1. 标准化 → JSON-RPC 2.0 + 统一工具描述格式
2. 解耦 → 工具实现与 Agent 代码分离,换模型不改工具
3. 可复用 → 一次编写 MCP Server,所有 Agent 共享
关键代码回顾:
- MCPServer:处理 JSON-RPC 请求,注册/调用工具
- SecureMCPServer:路径白名单、速率限制、审计日志
- MCPClient:启动 Server 子进程,发现工具,转换 LLM 格式
下一篇:Graph RAG——知识图谱 + 大模型的融合推理。当文档之间存在复杂实体关系时,纯向量检索不够用,我们将用 Neo4j + LangChain 构建图增强的 RAG 系统。
本文完整代码已开源。下一篇:Graph RAG(即将发布)
更多推荐

所有评论(0)