构建智能工具服务:MCP协议与LangGraph智能体的完美结合
MCP协议是一种轻量级、通用的协议,允许语言模型通过统一接口调用各种工具和服务。它特别适合用于连接LLM与外部系统,让AI能够"使用工具"完成复杂任务。服务端:统一接口管理各种工具MCP协议:标准化工具调用机制LangGraph:智能决策和工具调度语言模型:自然语言理解和生成这种架构让AI真正具备了"使用工具"的能力,可以完成传统语言模型无法处理的复杂任务。随着工具集的扩展,AI的应用场景将无限广
引言
在现代软件开发中,微服务架构已成为构建复杂系统的标准方式。但当我们需要让语言模型(LLM)能够利用这些服务时,就需要一种标准化的协议来实现工具调用。今天,我将分享如何使用MCP(Model Control Protocol)协议构建强大的工具服务,并通过LangGraph智能体实现自然语言驱动的工作流。
什么是MCP协议?
MCP协议是一种轻量级、通用的协议,允许语言模型通过统一接口调用各种工具和服务。它特别适合用于连接LLM与外部系统,让AI能够"使用工具"完成复杂任务。
MCP的核心优势:
- 标准化接口:统一方式调用所有工具
- 异步支持:高效处理I/O密集型操作
- 服务无关性:支持多种传输协议(SSE、WebSocket等)
- 资源管理:统一管理和描述可用资源
构建MCP工具服务
让我们首先构建一个包含多种实用工具的服务端。以下是核心代码片段:
from fastmcp import FastMCP
from datetime import datetime
import httpx
import os
# 初始化服务器
mcp_server = FastMCP("增强型工具服务器")
# === 文件管理系统 ===
@mcp_server.resource(
uri="file://data/files",
name="file_manager",
description="文件管理系统资源",
mime_type="application/json"
)
class FileManager:
"""文件资源管理器,支持读写操作"""
# 文件读写实现...
# === API工具 ===
@mcp_server.tool()
async def get_weather(city: str) -> Dict[str, Any]:
"""获取指定城市的天气信息"""
# 天气API集成...
@mcp_server.tool()
async def currency_converter(amount: float, from_currency: str, to_currency: str) -> Dict[str, Any]:
"""货币转换工具"""
# 汇率API集成...
# === 计算工具 ===
@mcp_server.tool()
def calculate_sum(a: int, b: int) -> Dict[str, Any]:
"""计算两个整数的和"""
return {"result": a + b, "timestamp": datetime.now().isoformat()}
# === 文本处理 ===
@mcp_server.tool()
def text_analysis(text: str) -> Dict[str, Any]:
"""文本分析工具(字数统计)"""
words = len(text.split())
chars = len(text)
sentences = text.count('.') + text.count('!') + text.count('?')
return {"word_count": words, "character_count": chars, "sentence_count": sentences}
# === 状态管理 ===
@mcp_server.resource(
uri="app://state",
name="state_manager",
description="应用程序状态管理器",
mime_type="application/json"
)
class StateManager:
"""状态管理器,用于存储应用状态"""
# 状态管理实现...
# === 启动服务器 ===
if __name__ == "__main__":
os.makedirs("data/files", exist_ok=True) # 确保数据目录存在
mcp_server.run(transport="sse", host="0.0.0.0", port=8000)
这个服务端提供了多种实用工具:
- 文件管理系统:支持文件读写和目录管理
- API工具:天气查询和货币转换
- 计算工具:数学运算和BMI计算
- 文本分析:字数统计和语法分析
- 状态管理:会话状态存储
测试工具服务
在使用智能体之前,我们应该先验证工具服务是否正常工作:
import asyncio
from fastmcp import Client
async def main():
async with Client("http://localhost:8000/sse") as client:
# 测试加法工具
sum_result = await client.call_tool("calculate_sum", {"a": 15, "b": 7})
print(f"15 + 7 = {sum_result['result']}")
# 测试文本分析
text_result = await client.call_tool("text_analysis", {"text": "你好世界!这是个测试。"})
print(f"字数统计: {text_result['word_count']} 字, {text_result['character_count']} 字符")
if __name__ == "__main__":
asyncio.run(main())
输出示例:
15 + 7 = 22
字数统计: 4 字, 15 字符
LangGraph智能体集成
现在是最精彩的部分:通过LangGraph让语言模型智能地使用这些工具!
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import asyncio
# 初始化本地语言模型
llm = ChatOpenAI(
openai_api_key="EMPTY",
openai_api_base="http://localhost:1234/v1",
model_name="qwen3-0.6b",
temperature=0.7,
max_tokens=5120
)
async def main():
# 创建多服务客户端
client = MultiServerMCPClient({
"calculate_sum": {"url": "http://localhost:8000/sse"},
"text_analysis": {"url": "http://localhost:8000/sse"},
"file_manager": {"url": "http://localhost:8000/sse"}
})
# 获取所有可用工具
tools = await client.get_tools()
# 创建反应式智能体
agent = create_react_agent(llm, tools)
# 复杂任务:计算大数之和并分析结果
response = await agent.ainvoke({
"messages": [{
"role": "user",
"content": "计算1111111111+2的结果,并将结果保存到output.txt文件中"
}]
})
print(response)
if __name__ == "__main__":
asyncio.run(main())
在这个智能体工作流程中:
- 用户提出自然语言请求:“计算1111111111+2的结果,并将结果保存到output.txt文件中”
- LangGraph智能体分析需求,决定调用工具的顺序
- 首先调用
calculate_sum
进行加法计算 - 然后调用
file_manager
将结果写入文件 - 智能体生成最终响应,包含任务完成确认
实际应用场景
这种架构在各种场景下都非常有用:
-
数据分析工作流
- 查询数据库 -> 计算结果 -> 生成可视化 -> 保存报告
-
自动化办公
- 读取邮件 -> 分析内容 -> 制作回复 -> 发送邮件
-
智能客服
- 理解问题 -> 查询知识库 -> 生成答案 -> 记录对话
-
研究助手
- 收集数据 -> 统计分析 -> 总结发现 -> 格式化报告
性能优化技巧
在生产环境中使用时,考虑以下优化:
-
异步批处理:同时处理多个工具请求
# 同时获取天气和货币信息 task1 = client.call_tool("get_weather", {"city": "Beijing"}) task2 = client.call_tool("currency_converter", {"amount": 100, "from": "USD", "to": "CNY"}) results = await asyncio.gather(task1, task2)
-
结果缓存:减少重复计算
@mcp_server.tool(cache=True, cache_duration=60) # 缓存60秒 def expensive_calculation(data: str) -> Dict: # 耗时计算...
-
服务发现:动态管理多个工具服务
# 动态添加新服务 client.register_service("new_tool", url="http://new-server:8080")
总结
通过MCP协议构建工具服务和LangGraph实现智能体调用,我们创建了强大的AI-工具协作系统:
- 服务端:统一接口管理各种工具
- MCP协议:标准化工具调用机制
- LangGraph:智能决策和工具调度
- 语言模型:自然语言理解和生成
这种架构让AI真正具备了"使用工具"的能力,可以完成传统语言模型无法处理的复杂任务。随着工具集的扩展,AI的应用场景将无限广阔!
未来展望:这种架构为构建完全自主的AI Agent奠定了基础。下一步我们将探索记忆模块和计划制定系统,实现真正具备"思考"能力的智能体。
mcp 服务
from fastmcp import FastMCP
from datetime import datetime
import aiofiles
import httpx
import os
from typing import Dict, Any
# 初始化服务器
mcp_server = FastMCP("增强型工具服务器")
# ====== 文件资源 ======
@mcp_server.resource(
uri="file://data/files",
name="file_manager",
description="文件管理系统资源",
mime_type="application/json"
)
class FileManager:
"""文件资源管理器,支持读写操作"""
async def read_file(self, file_path: str) -> Dict[str, Any]:
"""读取指定文件的内容"""
full_path = os.path.join("data/files", file_path)
if not os.path.exists(full_path):
return {"error": f"文件不存在: {file_path}"}
try:
async with aiofiles.open(full_path, "r", encoding="utf-8") as f:
content = await f.read()
return {"success": True, "content": content}
except Exception as e:
return {"error": f"读取文件失败: {str(e)}"}
async def write_file(self, file_path: str, content: str) -> Dict[str, Any]:
"""写入内容到指定文件"""
full_path = os.path.join("data/files", file_path)
# 确保目录存在
os.makedirs(os.path.dirname(full_path), exist_ok=True)
try:
async with aiofiles.open(full_path, "w", encoding="utf-8") as f:
await f.write(content)
return {"success": True, "message": f"文件已保存: {file_path}"}
except Exception as e:
return {"error": f"写入文件失败: {str(e)}"}
async def list_files(self, directory: str = "") -> Dict[str, Any]:
"""列出指定目录下的文件"""
full_dir = os.path.join("data/files", directory)
if not os.path.exists(full_dir):
return {"error": f"目录不存在: {directory}"}
files = []
for file in os.listdir(full_dir):
if os.path.isfile(os.path.join(full_dir, file)):
files.append(file)
return {"success": True, "directory": directory, "files": files}
# ====== API 工具 ======
@mcp_server.tool()
async def get_weather(city: str) -> Dict[str, Any]:
"""获取指定城市的天气信息"""
api_key = "YOUR_OPENWEATHERMAP_API_KEY" # 替换为您的API密钥
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric&lang=zh_cn"
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
if response.status_code == 200:
data = response.json()
return {
"city": city,
"temperature": data["main"]["temp"],
"description": data["weather"][0]["description"],
"humidity": data["main"]["humidity"],
"wind_speed": data["wind"]["speed"]
}
else:
return {"error": f"API错误: {response.status_code} {response.reason_phrase}"}
except Exception as e:
return {"error": f"获取天气失败: {str(e)}"}
@mcp_server.tool()
async def currency_converter(amount: float, from_currency: str, to_currency: str) -> Dict[str, Any]:
"""货币转换工具"""
try:
api_key = "YOUR_EXCHANGERATE_API_KEY" # 替换为您的API密钥
url = f"https://v6.exchangerate-api.com/v6/{api_key}/pair/{from_currency}/{to_currency}/{amount}"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
if response.status_code == 200:
data = response.json()
if data.get("result") == "success":
return {
"from_currency": from_currency,
"to_currency": to_currency,
"amount": amount,
"converted": data["conversion_result"]
}
else:
return {"error": f"转换失败: {data.get('error-type')}"}
else:
return {"error": f"API错误: {response.status_code}"}
except Exception as e:
return {"error": f"货币转换失败: {str(e)}"}
# ====== 计算工具 ======
@mcp_server.tool()
def calculate_sum(a: int, b: int) -> Dict[str, Any]:
"""计算两个整数的和"""
return {"result": a + b, "timestamp": datetime.now().isoformat()}
@mcp_server.tool()
def calculate_product(a: int, b: int) -> Dict[str, Any]:
"""计算两个整数的乘积"""
return {"result": a * b, "timestamp": datetime.now().isoformat()}
@mcp_server.tool()
def calculate_bmi(weight: float, height: float) -> Dict[str, Any]:
"""计算身体质量指数(BMI)"""
bmi = weight / ((height / 100) ** 2)
return {"bmi": round(bmi, 2), "timestamp": datetime.now().isoformat()}
# ====== 文本处理工具 ======
@mcp_server.tool()
def text_analysis(text: str) -> Dict[str, Any]:
"""文本分析工具(字数统计)"""
words = len(text.split())
chars = len(text)
sentences = text.count('.') + text.count('!') + text.count('?')
return {
"word_count": words,
"character_count": chars,
"sentence_count": sentences
}
# ====== 系统工具 ======
@mcp_server.tool()
def get_system_time() -> Dict[str, Any]:
"""获取服务器当前时间"""
return {"system_time": datetime.now().isoformat()}
@mcp_server.tool()
def get_system_info() -> Dict[str, Any]:
"""获取系统基本信息"""
import platform
return {
"platform": platform.platform(),
"python_version": platform.python_version(),
"system_time": datetime.now().isoformat()
}
# ====== 状态管理资源 ======
@mcp_server.resource(
uri="app://state",
name="state_manager",
description="应用程序状态管理器",
mime_type="application/json"
)
class StateManager:
"""状态管理器,用于存储应用状态"""
def __init__(self):
self.state = {}
async def get_state(self, key: str = None) -> Dict[str, Any]:
"""获取状态值"""
if key:
return {key: self.state.get(key)}
else:
return self.state
async def set_state(self, key: str, value: Any) -> Dict[str, Any]:
"""设置状态值"""
self.state[key] = value
return {"success": True, "message": f"状态 {key} 已更新"}
async def remove_state(self, key: str) -> Dict[str, Any]:
"""删除状态值"""
if key in self.state:
del self.state[key]
return {"success": True, "message": f"状态 {key} 已删除"}
else:
return {"error": f"状态 {key} 不存在"}
# ====== 配置管理 ======
@mcp_server.tool()
def get_server_config() -> Dict[str, Any]:
"""获取服务器配置信息"""
return {
"server_name": "增强型工具服务器",
"supported_tools": list(mcp_server.get_tool_names()),
"supported_resources": list(mcp_server.get_resource_names())
}
# ====== 启动服务器 ======
if __name__ == "__main__":
# 确保数据目录存在
os.makedirs("data/files", exist_ok=True)
print("=" * 50)
print(f"启动 FastMCP 工具服务器")
print("=" * 50)
mcp_server.run(transport="sse", host="0.0.0.0", port=8000)
mcp 测试
import asyncio
from fastmcp import Client
async def main():
async with Client("http://localhost:8000/sse") as client:
# 1. 调用加法工具(正确访问方式)
sum_result = await client.call_tool("calculate_sum", {"a": 15, "b": 7})
print(f"加法结果: {sum_result}") # 直接访问result属性
if __name__ == "__main__":
asyncio.run(main())
llm mcp 工具调用
from langchain_mcp_adapters.client import MultiServerMCPClient # 修正导入路径
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import asyncio
# 初始化本地模型
llm = ChatOpenAI(
openai_api_key="EMPTY",
openai_api_base="http://localhost:1234/v1",
model_name="qwen3-0.6b",
temperature=0.7,
max_tokens=5120
)
async def main():
# 1. 创建客户端实例(无需 async with)
client = MultiServerMCPClient({
"calculate_sum": {"url": "http://localhost:8000/sse", "transport": "sse"},
"text_analysis": {"url": "http://localhost:8000/sse", "transport": "sse"} # 注意:两个服务端口应不同!
})
# 2. 获取所有工具
tools = await client.get_tools()
# 3. 创建Agent
agent = create_react_agent(llm, tools)
# 4. 调用Agent
response = await agent.ainvoke({
"messages": [{"role": "user", "content": "计算1111111111+2 并统计数字的个数。"}]
})
print(response)
if __name__ == "__main__":
asyncio.run(main())
更多推荐
所有评论(0)