引言

在现代软件开发中,微服务架构已成为构建复杂系统的标准方式。但当我们需要让语言模型(LLM)能够利用这些服务时,就需要一种标准化的协议来实现工具调用。今天,我将分享如何使用MCP(Model Control Protocol)协议构建强大的工具服务,并通过LangGraph智能体实现自然语言驱动的工作流。

什么是MCP协议?

MCP协议是一种轻量级、通用的协议,允许语言模型通过统一接口调用各种工具和服务。它特别适合用于连接LLM与外部系统,让AI能够"使用工具"完成复杂任务。

MCP的核心优势:

  • 标准化接口:统一方式调用所有工具
  • 异步支持:高效处理I/O密集型操作
  • 服务无关性:支持多种传输协议(SSE、WebSocket等)
  • 资源管理:统一管理和描述可用资源

构建MCP工具服务

让我们首先构建一个包含多种实用工具的服务端。以下是核心代码片段:

from fastmcp import FastMCP
from datetime import datetime
import httpx
import os

# 初始化服务器
mcp_server = FastMCP("增强型工具服务器")

# === 文件管理系统 ===
@mcp_server.resource(
    uri="file://data/files",
    name="file_manager",
    description="文件管理系统资源",
    mime_type="application/json"
)
class FileManager:
    """文件资源管理器,支持读写操作"""
    # 文件读写实现...

# === API工具 ===
@mcp_server.tool()
async def get_weather(city: str) -> Dict[str, Any]:
    """获取指定城市的天气信息"""
    # 天气API集成...

@mcp_server.tool()
async def currency_converter(amount: float, from_currency: str, to_currency: str) -> Dict[str, Any]:
    """货币转换工具"""
    # 汇率API集成...

# === 计算工具 ===
@mcp_server.tool()
def calculate_sum(a: int, b: int) -> Dict[str, Any]:
    """计算两个整数的和"""
    return {"result": a + b, "timestamp": datetime.now().isoformat()}

# === 文本处理 ===
@mcp_server.tool()
def text_analysis(text: str) -> Dict[str, Any]:
    """文本分析工具(字数统计)"""
    words = len(text.split())
    chars = len(text)
    sentences = text.count('.') + text.count('!') + text.count('?')
    return {"word_count": words, "character_count": chars, "sentence_count": sentences}

# === 状态管理 ===
@mcp_server.resource(
    uri="app://state",
    name="state_manager",
    description="应用程序状态管理器",
    mime_type="application/json"
)
class StateManager:
    """状态管理器,用于存储应用状态"""
    # 状态管理实现...

# === 启动服务器 ===
if __name__ == "__main__":
    os.makedirs("data/files", exist_ok=True)  # 确保数据目录存在
    mcp_server.run(transport="sse", host="0.0.0.0", port=8000)

这个服务端提供了多种实用工具:

  • 文件管理系统:支持文件读写和目录管理
  • API工具:天气查询和货币转换
  • 计算工具:数学运算和BMI计算
  • 文本分析:字数统计和语法分析
  • 状态管理:会话状态存储

测试工具服务

在使用智能体之前,我们应该先验证工具服务是否正常工作:

import asyncio
from fastmcp import Client

async def main():
    async with Client("http://localhost:8000/sse") as client:
        # 测试加法工具
        sum_result = await client.call_tool("calculate_sum", {"a": 15, "b": 7})
        print(f"15 + 7 = {sum_result['result']}")
        
        # 测试文本分析
        text_result = await client.call_tool("text_analysis", {"text": "你好世界!这是个测试。"})
        print(f"字数统计: {text_result['word_count']} 字, {text_result['character_count']} 字符")

if __name__ == "__main__":
    asyncio.run(main())

输出示例:

15 + 7 = 22
字数统计: 4 字, 15 字符

LangGraph智能体集成

现在是最精彩的部分:通过LangGraph让语言模型智能地使用这些工具!

from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import asyncio

# 初始化本地语言模型
llm = ChatOpenAI(
    openai_api_key="EMPTY",
    openai_api_base="http://localhost:1234/v1",
    model_name="qwen3-0.6b",
    temperature=0.7,
    max_tokens=5120
)

async def main():
    # 创建多服务客户端
    client = MultiServerMCPClient({
        "calculate_sum": {"url": "http://localhost:8000/sse"},
        "text_analysis": {"url": "http://localhost:8000/sse"},
        "file_manager": {"url": "http://localhost:8000/sse"}
    })
    
    # 获取所有可用工具
    tools = await client.get_tools()
    
    # 创建反应式智能体
    agent = create_react_agent(llm, tools)
    
    # 复杂任务:计算大数之和并分析结果
    response = await agent.ainvoke({
        "messages": [{
            "role": "user", 
            "content": "计算1111111111+2的结果,并将结果保存到output.txt文件中"
        }]
    })
    print(response)

if __name__ == "__main__":
    asyncio.run(main())

在这个智能体工作流程中:

  1. 用户提出自然语言请求:“计算1111111111+2的结果,并将结果保存到output.txt文件中”
  2. LangGraph智能体分析需求,决定调用工具的顺序
  3. 首先调用calculate_sum进行加法计算
  4. 然后调用file_manager将结果写入文件
  5. 智能体生成最终响应,包含任务完成确认

实际应用场景

这种架构在各种场景下都非常有用:

  1. 数据分析工作流

    • 查询数据库 -> 计算结果 -> 生成可视化 -> 保存报告
  2. 自动化办公

    • 读取邮件 -> 分析内容 -> 制作回复 -> 发送邮件
  3. 智能客服

    • 理解问题 -> 查询知识库 -> 生成答案 -> 记录对话
  4. 研究助手

    • 收集数据 -> 统计分析 -> 总结发现 -> 格式化报告

性能优化技巧

在生产环境中使用时,考虑以下优化:

  1. 异步批处理:同时处理多个工具请求

    # 同时获取天气和货币信息
    task1 = client.call_tool("get_weather", {"city": "Beijing"})
    task2 = client.call_tool("currency_converter", {"amount": 100, "from": "USD", "to": "CNY"})
    results = await asyncio.gather(task1, task2)
    
  2. 结果缓存:减少重复计算

    @mcp_server.tool(cache=True, cache_duration=60)  # 缓存60秒
    def expensive_calculation(data: str) -> Dict:
        # 耗时计算...
    
  3. 服务发现:动态管理多个工具服务

    # 动态添加新服务
    client.register_service("new_tool", url="http://new-server:8080")
    

总结

通过MCP协议构建工具服务和LangGraph实现智能体调用,我们创建了强大的AI-工具协作系统:

  1. 服务端:统一接口管理各种工具
  2. MCP协议:标准化工具调用机制
  3. LangGraph:智能决策和工具调度
  4. 语言模型:自然语言理解和生成

这种架构让AI真正具备了"使用工具"的能力,可以完成传统语言模型无法处理的复杂任务。随着工具集的扩展,AI的应用场景将无限广阔!

未来展望:这种架构为构建完全自主的AI Agent奠定了基础。下一步我们将探索记忆模块和计划制定系统,实现真正具备"思考"能力的智能体。

mcp 服务

from fastmcp import FastMCP
from datetime import datetime

import aiofiles
import httpx
import os

from typing import Dict, Any

# 初始化服务器
mcp_server = FastMCP("增强型工具服务器")


# ====== 文件资源 ======
@mcp_server.resource(
    uri="file://data/files",
    name="file_manager",
    description="文件管理系统资源",
    mime_type="application/json"
)
class FileManager:
    """文件资源管理器,支持读写操作"""

    async def read_file(self, file_path: str) -> Dict[str, Any]:
        """读取指定文件的内容"""
        full_path = os.path.join("data/files", file_path)
        if not os.path.exists(full_path):
            return {"error": f"文件不存在: {file_path}"}

        try:
            async with aiofiles.open(full_path, "r", encoding="utf-8") as f:
                content = await f.read()
                return {"success": True, "content": content}
        except Exception as e:
            return {"error": f"读取文件失败: {str(e)}"}

    async def write_file(self, file_path: str, content: str) -> Dict[str, Any]:
        """写入内容到指定文件"""
        full_path = os.path.join("data/files", file_path)

        # 确保目录存在
        os.makedirs(os.path.dirname(full_path), exist_ok=True)

        try:
            async with aiofiles.open(full_path, "w", encoding="utf-8") as f:
                await f.write(content)
                return {"success": True, "message": f"文件已保存: {file_path}"}
        except Exception as e:
            return {"error": f"写入文件失败: {str(e)}"}

    async def list_files(self, directory: str = "") -> Dict[str, Any]:
        """列出指定目录下的文件"""
        full_dir = os.path.join("data/files", directory)
        if not os.path.exists(full_dir):
            return {"error": f"目录不存在: {directory}"}

        files = []
        for file in os.listdir(full_dir):
            if os.path.isfile(os.path.join(full_dir, file)):
                files.append(file)

        return {"success": True, "directory": directory, "files": files}


# ====== API 工具 ======
@mcp_server.tool()
async def get_weather(city: str) -> Dict[str, Any]:
    """获取指定城市的天气信息"""
    api_key = "YOUR_OPENWEATHERMAP_API_KEY"  # 替换为您的API密钥
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric&lang=zh_cn"

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            if response.status_code == 200:
                data = response.json()
                return {
                    "city": city,
                    "temperature": data["main"]["temp"],
                    "description": data["weather"][0]["description"],
                    "humidity": data["main"]["humidity"],
                    "wind_speed": data["wind"]["speed"]
                }
            else:
                return {"error": f"API错误: {response.status_code} {response.reason_phrase}"}
    except Exception as e:
        return {"error": f"获取天气失败: {str(e)}"}


@mcp_server.tool()
async def currency_converter(amount: float, from_currency: str, to_currency: str) -> Dict[str, Any]:
    """货币转换工具"""
    try:
        api_key = "YOUR_EXCHANGERATE_API_KEY"  # 替换为您的API密钥
        url = f"https://v6.exchangerate-api.com/v6/{api_key}/pair/{from_currency}/{to_currency}/{amount}"

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            if response.status_code == 200:
                data = response.json()
                if data.get("result") == "success":
                    return {
                        "from_currency": from_currency,
                        "to_currency": to_currency,
                        "amount": amount,
                        "converted": data["conversion_result"]
                    }
                else:
                    return {"error": f"转换失败: {data.get('error-type')}"}
            else:
                return {"error": f"API错误: {response.status_code}"}
    except Exception as e:
        return {"error": f"货币转换失败: {str(e)}"}


# ====== 计算工具 ======
@mcp_server.tool()
def calculate_sum(a: int, b: int) -> Dict[str, Any]:
    """计算两个整数的和"""
    return {"result": a + b, "timestamp": datetime.now().isoformat()}


@mcp_server.tool()
def calculate_product(a: int, b: int) -> Dict[str, Any]:
    """计算两个整数的乘积"""
    return {"result": a * b, "timestamp": datetime.now().isoformat()}


@mcp_server.tool()
def calculate_bmi(weight: float, height: float) -> Dict[str, Any]:
    """计算身体质量指数(BMI)"""
    bmi = weight / ((height / 100) ** 2)
    return {"bmi": round(bmi, 2), "timestamp": datetime.now().isoformat()}


# ====== 文本处理工具 ======
@mcp_server.tool()
def text_analysis(text: str) -> Dict[str, Any]:
    """文本分析工具(字数统计)"""
    words = len(text.split())
    chars = len(text)
    sentences = text.count('.') + text.count('!') + text.count('?')
    return {
        "word_count": words,
        "character_count": chars,
        "sentence_count": sentences
    }


# ====== 系统工具 ======
@mcp_server.tool()
def get_system_time() -> Dict[str, Any]:
    """获取服务器当前时间"""
    return {"system_time": datetime.now().isoformat()}


@mcp_server.tool()
def get_system_info() -> Dict[str, Any]:
    """获取系统基本信息"""
    import platform
    return {
        "platform": platform.platform(),
        "python_version": platform.python_version(),
        "system_time": datetime.now().isoformat()
    }


# ====== 状态管理资源 ======
@mcp_server.resource(
    uri="app://state",
    name="state_manager",
    description="应用程序状态管理器",
    mime_type="application/json"
)
class StateManager:
    """状态管理器,用于存储应用状态"""

    def __init__(self):
        self.state = {}

    async def get_state(self, key: str = None) -> Dict[str, Any]:
        """获取状态值"""
        if key:
            return {key: self.state.get(key)}
        else:
            return self.state

    async def set_state(self, key: str, value: Any) -> Dict[str, Any]:
        """设置状态值"""
        self.state[key] = value
        return {"success": True, "message": f"状态 {key} 已更新"}

    async def remove_state(self, key: str) -> Dict[str, Any]:
        """删除状态值"""
        if key in self.state:
            del self.state[key]
            return {"success": True, "message": f"状态 {key} 已删除"}
        else:
            return {"error": f"状态 {key} 不存在"}


# ====== 配置管理 ======
@mcp_server.tool()
def get_server_config() -> Dict[str, Any]:
    """获取服务器配置信息"""
    return {
        "server_name": "增强型工具服务器",
        "supported_tools": list(mcp_server.get_tool_names()),
        "supported_resources": list(mcp_server.get_resource_names())
    }


# ====== 启动服务器 ======
if __name__ == "__main__":
    # 确保数据目录存在
    os.makedirs("data/files", exist_ok=True)

    print("=" * 50)
    print(f"启动 FastMCP 工具服务器")
    print("=" * 50)

    mcp_server.run(transport="sse", host="0.0.0.0", port=8000)

mcp 测试

import asyncio
from fastmcp import Client


async def main():
    async with Client("http://localhost:8000/sse") as client:
        # 1. 调用加法工具(正确访问方式)
        sum_result = await client.call_tool("calculate_sum", {"a": 15, "b": 7})
        print(f"加法结果: {sum_result}")  # 直接访问result属性


if __name__ == "__main__":
    asyncio.run(main())

llm mcp 工具调用

from langchain_mcp_adapters.client import MultiServerMCPClient  # 修正导入路径
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import asyncio

# 初始化本地模型
llm = ChatOpenAI(
    openai_api_key="EMPTY",
    openai_api_base="http://localhost:1234/v1",
    model_name="qwen3-0.6b",
    temperature=0.7,
    max_tokens=5120
)


async def main():
    # 1. 创建客户端实例(无需 async with)
    client = MultiServerMCPClient({
        "calculate_sum": {"url": "http://localhost:8000/sse", "transport": "sse"},
        "text_analysis": {"url": "http://localhost:8000/sse", "transport": "sse"}  # 注意:两个服务端口应不同!
    })

    # 2. 获取所有工具
    tools = await client.get_tools()

    # 3. 创建Agent
    agent = create_react_agent(llm, tools)

    # 4. 调用Agent
    response = await agent.ainvoke({
        "messages": [{"role": "user", "content": "计算1111111111+2 并统计数字的个数。"}]
    })
    print(response)

if __name__ == "__main__":
    asyncio.run(main())
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐