[AI]大模型MCP快速入门及智能体执行模式介绍

一、MCP入门

介绍

MCP(Model Context Protocol,模型上下文协议)是一种由Anthropic公司于2024年提出的开放标准协议,旨在为大型语言模型(LLM)提供统一接口,使其能够安全、便捷地连接外部数据源、工具和系统,从而突破传统AI模型因数据孤岛限制而无法实时获取信息的瓶颈。

在MCP协议出来之前,各个大型AI公司各自为战,大模型对外接口不统一,以至于我们开发了一个功能,比如AI操作数据库。适配了ChatGPT,但是对于Deepseek模型却不通用,导致需要重新开发一遍适配Deepseek的接口。效率极低,并且都是重复操作。

将MCP想象成一个“万能插座”或“通用翻译官”。就像不同电器通过统一的USB-C接口充电一样,MCP为所有AI模型和数据源制定了标准化的通信规则。例如,当你想让AI助手查询公司数据库时,MCP就像一位翻译官,将AI的请求转换成数据库能理解的语言,再将结果翻译回AI能处理的形式,整个过程无需开发者编写复杂的定制代码

工作流程:

用户提问 → AI Host生成请求(运行AI的程序,比如IDE,cursor等) → MCP Client转发 → MCP Server执行操作 → 返回结果 → AI生成最终回答

##基础入门

我们将手动编写一个mcp基础案例,通过调用大模型完成对本地文件的写入。
主要包含:

  • client.py:连接mcp server,并操作大模型调用server.py提供的功能。
  • server.py:对外暴露功能,供大模型调用。
  • .env:env配置文件中配置大模型API_KEY、BASE_URL、MODEL等

本案例将通过FastMCP来演示如何快速搭建一个MCP Server,并且与大模型适配上。

实战

①MCP Server搭建

这里将使用FastMCP来演示如何快速搭建mcp server。

1. 封装mcp server提供的能力为func

其实这里的步骤就类似于我们之前编写function_call的时候,将能力封装为一个函数。然后将func上添加上@mcp.tool()注解。

from mcp.server.fastmcp import FastMCP

@mcp.tool()
def write_to_txt(filename: str, content: str) -> str:
    """
    将指定内容写入文本文件并且保存到本地。
    参数:
      filename: 文件名(例如 "output.txt")
      content: 要写入的文本内容
    返回:
      写入成功或失败的提示信息
    """
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        return f"成功写入文件 {filename}。"
    except Exception as e:
        return f"写入文件失败:{e}"

2. 通过mcp协议暴露该能力

这里我们通过stdio传输,因为本案例我们的mcp server和mcp client都会放在一个机器上,所以通过标准的io传输数据即可。
目前mcp支持两种协议:

  1. stdio:standard input output,标准输入输出,适用于mcp client和mcp server部署在同一个机器上,不涉及跨机器。
  2. sse:Server-Sent Events,服务器发送事件,是一个基于HTTP的协议。适用于mcp client与mcp server不在同一个机器中的场景,涉及网络传输。
if __name__ == "__main__":
    # 因为我们这里mcp的server端和服务端都在同一个机器上,所以使用stdio
    mcp.run(transport='stdio')  # 默认使用 stdio 传输

全部代码

server.py

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("WriterServer")


@mcp.tool()
def write_to_txt(filename: str, content: str) -> str:
    """
    将指定内容写入文本文件并且保存到本地。
    参数:
      filename: 文件名(例如 "output.txt")
      content: 要写入的文本内容
    返回:
      写入成功或失败的提示信息
    """
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        return f"成功写入文件 {filename}。"
    except Exception as e:
        return f"写入文件失败:{e}"


if __name__ == "__main__":
    # 因为我们这里mcp的server端和服务端都在同一个机器上,所以使用stdio
    mcp.run(transport='stdio')  # 默认使用 stdio 传输

②MCP Client搭建

1. 封装MCPClient工具

  1. chat_loop:
  • process_query:封装与大模型的交流,将client读取到的tools和上下文内容传给大模型,然后由大模型返回内容
  1. cleanup:清理资源

2. 封装connect_to_server连接MCP工具

读取mcp 服务端文件,通过stdio协议连接

async def connect_to_server(self, server_script_path: str):
    """连接到 MCP 服务器

    参数:
        server_script_path: 服务器脚本路径 (.py 或 .js)
    """
    is_python = server_script_path.endswith('.py')
    is_js = server_script_path.endswith('.js')
    if not (is_python or is_js):
        raise ValueError("服务器脚本必须是 .py 或 .js 文件")

    command = "python" if is_python else "node"
    # 创建 StdioServerParameters 对象
    server_params = StdioServerParameters(
        command=command,
        args=[server_script_path],
        env=None
    )

    # 使用 stdio_client 创建与服务器的 stdio 传输
    stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))

    # 解包 stdio_transport,获取读取和写入句柄
    self.stdio, self.write = stdio_transport

    # 创建 ClientSession 对象,用于与服务器通信
    self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

    # 初始化会话
    await self.session.initialize()
    # 列出可用工具
    response = await self.session.list_tools()
    tools = response.tools
    print("\n连接到服务器,工具列表:", [tool.name for tool in tools])

全部代码

import os
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import json
import traceback

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv
import sys

load_dotenv()  # 加载环境变量从 .env文件中加载


class MCPClient:
    def __init__(self):
        # 初始化会话和客户端对象
        self.session: Optional[ClientSession] = None  # 会话对象
        self.exit_stack = AsyncExitStack()  # 退出堆栈
        self.openai = OpenAI(api_key=os.getenv("API_KEY"), base_url=os.getenv("BASE_URL"))
        self.model = os.getenv("MODEL")

    def get_response(self, messages: list, tools: list):
        response = self.openai.chat.completions.create(
            model=self.model,
            max_tokens=1000,
            messages=messages,
            tools=tools,
        )
        return response

    async def get_tools(self):
        # 列出可用工具
        response = await self.session.list_tools()
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,  # 工具描述
                "parameters": tool.inputSchema  # 工具输入模式
            }
        } for tool in response.tools]

        return available_tools

    async def connect_to_server(self, server_script_path: str):
        """连接到 MCP 服务器

        参数:
            server_script_path: 服务器脚本路径 (.py 或 .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("服务器脚本必须是 .py 或 .js 文件")

        command = "python" if is_python else "node"
        # 创建 StdioServerParameters 对象
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        # 使用 stdio_client 创建与服务器的 stdio 传输
        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))

        # 解包 stdio_transport,获取读取和写入句柄
        self.stdio, self.write = stdio_transport

        # 创建 ClientSession 对象,用于与服务器通信
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        # 初始化会话
        await self.session.initialize()
        # 列出可用工具
        response = await self.session.list_tools()
        tools = response.tools
        print("\n连接到服务器,工具列表:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """使用 OpenAI 和可用工具处理查询"""

        # 创建消息列表
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]

        # 列出可用工具
        available_tools = await self.get_tools()
        print(f"\n可用工具: {json.dumps([t['function']['name'] for t in available_tools], ensure_ascii=False)}")

        # 处理消息
        response = self.get_response(messages, available_tools)

        # 处理LLM响应和工具调用
        tool_results = []
        final_text = []
        for choice in response.choices:
            message = choice.message
            is_function_call = message.tool_calls

            # 如果不调用工具,则添加到 final_text 中
            if not is_function_call:
                final_text.append(message.content)
            # 如果是工具调用,则获取工具名称和输入
            else:
                # 解包tool_calls
                tool_name = message.tool_calls[0].function.name
                tool_args = json.loads(message.tool_calls[0].function.arguments)
                print(f"准备调用工具: {tool_name}")
                print(f"参数: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")

                try:
                    # 执行工具调用,获取结果
                    result = await self.session.call_tool(tool_name, tool_args)
                    print(f"\n工具调用返回结果类型: {type(result)}")
                    print(f"工具调用返回结果属性: {dir(result)}")
                    print(
                        f"工具调用content类型: {type(result.content) if hasattr(result, 'content') else '无content属性'}")

                    # 安全处理content
                    content = None
                    if hasattr(result, 'content'):
                        if isinstance(result.content, list):
                            content = "\n".join(str(item) for item in result.content)
                            print(f"将列表转换为字符串: {content}")
                        else:
                            content = str(result.content)
                            print(f"工具调用content值: {content}")
                    else:
                        content = str(result)
                        print(f"使用完整result作为字符串: {content}")

                    tool_results.append({"call": tool_name, "result": content})
                    final_text.append(f"[调用工具 {tool_name} 参数: {json.dumps(tool_args, ensure_ascii=False)}]")

                    # 继续与工具结果进行对话
                    if message.content and hasattr(message.content, 'text'):
                        messages.append({
                            "role": "assistant",
                            "content": message.content
                        })

                    # 将工具调用结果添加到消息
                    messages.append({
                        "role": "user",
                        "content": content
                    })

                    # 获取下一个LLM响应
                    print("获取下一个LLM响应...")
                    response = self.get_response(messages, available_tools)
                    # 将结果添加到 final_text
                    content = response.choices[0].message.content or ""
                    final_text.append(content)
                except Exception as e:
                    print(f"\n工具调用异常: {str(e)}")
                    print(f"异常详情: {traceback.format_exc()}")
                    final_text.append(f"工具调用失败: {str(e)}")

        return "\n".join(final_text)

    async def chat_loop(self):
        """运行交互式聊天循环(没有记忆)"""
        print("\nMCP Client 启动!")
        print("输入您的查询或 'quit' 退出.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                print("\n处理查询中...")
                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\n错误: {str(e)}")
                print(f"错误详情: {traceback.format_exc()}")

    async def cleanup(self):
        """清理资源"""
        await self.exit_stack.aclose()


async def main():
    """
    主函数:初始化并运行 MCP 客户端
    此函数执行以下步骤:
    1. 检查命令行参数是否包含服务器脚本路径
    2. 创建 MCPClient 实例
    3. 连接到指定的服务器
    4. 运行交互式聊天循环
    5. 在结束时清理资源
    用法:
    python client.py <path_to_server_script>
    如:python client.py server.py
    """
    # 检查命令行参数
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    # 创建 MCPClient 实例
    client = MCPClient()
    try:
        # 连接到服务器
        await client.connect_to_server(sys.argv[1])
        # 运行聊天循环
        await client.chat_loop()
    finally:
        # 确保在任何情况下都清理资源
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())

运行效果

# 运行项目 
python client.py server.py

在这里插入图片描述

成功写入到本地:
在这里插入图片描述

二、拓展(智能体的执行模式)

目前业界较为流行的智能体执行模式主要有以下5种:

  1. ReAct:全Agent,边思考边执行
  2. Plan-and-Execute:全Agent,先思考任务需要哪几步(列计划),然后一步步执行
  3. 静态Workflow:几乎无Agent,通过任务编排把步骤都规划好,让Agent执行极少数具体步骤
  4. Workflow + 局部智能:半Agent,通过任务编排将主要步骤都规划好,具体执行让Agent操作
  5. 模块化分层规划(多级Agent):全Agent且多级,一个总Agent调配下面多个子Agent完成任务

在这里插入图片描述

①ReAct:思考-行动交替的动态规划执行

概念:Agent边推理边完成任务,响应速度快,成本低,但结果不确定性较高。

适用场景:任务流程不固定。

  • ReAct 模式适用于相对中等复杂度的任务,尤其当任务步骤需根据中间结果动态调整时(如某个任务需要根据查询资料来决定给后续);如果任务流程无法提前确定或需要频繁工具调用,ReAct 能提供较好的灵活性和实时反应能力。

在这里插入图片描述

优点:

  1. 降低幻觉,提升可信度:ReAct将推理过程显性地记录下来,提升了模型的可信度和人类可解释性。相比直接让模型一蹴而就给出答案,ReAct 通过逐步推理有效降低了幻觉率。
  2. 响应速度快,成本较低:由于每一步只需考虑当前子问题,ReAct 响应速度较快,成本也较低。

缺点:

  1. 缺乏全局规划,可能偏离用户期望:因为一次只规划一步,缺乏全局规划,有时会使智能体短视,模型可能会在局部反复横跳,重复思路。在没有外部干预时,ReAct 智能体可能一直执行下去却偏离用户期望,无法适时收敛结果以完成任务。

②Plan-and-Execute:先规划后调整

概念:先思考详细步骤后执行。智能体在行动之前,先生成一个较完整的计划。也就是将任务拆解成子任务清单,然后逐一执行。该方式任务完成度高,结果相比ReAct更准确,但响应较慢,成本较高。

适用场景:多步骤任务且步骤较为明确的场景。

  • 适合较复杂的多步骤任务,尤其是可以在一定程度上预见步骤的场景 。例如数据分析任务可以先规划“获取数据->清洗->分析->可视化”的步骤。
  • 当正确性比速度更重要时,相对ReAct来说,Plan-and-Execute 是值得选择的策略 。

在这里插入图片描述

优点:

  1. 任务完成度较高:预先规划赋予智能体一个全局视野,有助于提升复杂任务的准确率和完备性 ,特别是对于多工具、多步骤的复杂场景,能更好地分配步骤、协调顺序。
  2. 结果相比ReAct更加准确:流程更可控 — 可以审查或调整生成的计划,从而对最终执行有一定把控。一些测试证明在复杂任务中Plan-and-Execute 模式准确率要高于ReAct。

缺点:

  1. 成本相比ReAct更高:需要先额外一次(或多次)LLM调用来规划,再逐步执行,整体响应速度比ReAct慢,token消耗也更高 (有测试结果表明上升约50%)。
  2. 最终结果强依赖模型生成的整体任务规划:如果初始计划不佳,执行阶段可能走弯路甚至失败。虽然可以动态调整,但调整本身又需要额外逻辑和模型交互。

③静态Workflow:预设流程图式的执行

概念:几乎无Agent参与或只参与极少部分。该模式整体可控更稳定,但智能化程度低。

适用场景:流程明确固定,变化少的场景。

  • 静态工作流适合规则明确、变化少的任务。比如企业中的表单处理、固定报表生成、数据转换管道等。特别在企业场景下,如果业务流程高度重复且标准化,静态工作流能提供稳健的自动化方案 ,不必担心AI“越俎代庖”引入不确定性和风险。

在这里插入图片描述
静态工作流(Static Workflow)方式则几乎不让智能体自主决定流程,而是由开发者根据对任务的理解,将任务拆分成固定流程的子任务,并把这些子任务串起来执行。某些子任务可能由LLM完成(例如生成一段文字),但LLM在此不决定下一步做什么,因为下一步已经在程序固化。

也就是说,智能体遵循一个事先画好的脚本/流程图来执行,没有决策自由度。Workflow的实现可以借助很多支持Workflow编排的框架来完成,比如LangGraph、LlamaIndex Workflow等低层框架,或Dify、FastGPT这样低代码平台。

优点:

  1. 整体输出更可控、更稳定:静态工作流最大的优点是确定性和可控性。所有步骤由开发者掌控,因而系统行为可预测、易测试,避免了让LLM自己规划可能带来的不确定性。从工程角度看,这种方式更像传统软件开发,调试和监控相对简单。
  2. 成本低、速度快:静态流程通常执行速度更快、成本更低,因为不需要额外的决策推理步骤。每个LLM调用都有明确目的,减少了无效对话。

缺点:

  1. 智能化不足:最大缺点是缺乏灵活性,智能化不足。一旦预设流程无法完全匹配实际任务需求,Agent 就会表现不佳甚至失败。
  2. 有局限性,不通用:不具有通用智能,只能覆盖开发者想到的那些路径。特别对于未知领域或复杂任务,开发者往往难以提前设计出完善的流程图。如果业务流程发生变化,通常需要进行应用的调整或升级,成本较高,不如让智能体自主学习来得方便。

④Workflow+局部智能:兼顾确定性与智能化

概念:人为固定大流程,让Agent去做每个细小分支的任务。该模式兼顾可控性与灵活性,但对开发者要求高,开发者需要评估哪些地方需要智能体参与。

适用场景:流程较固定,但部分分支需要发散泛化思维,智能决策。

  • 混合法适用于流程较固定但存在关键智能决策点的任务场景。又或者一些长流程的子任务本身是复杂AI问题(如代码生成、数据分析),就特别适合拆出来让智能体发挥。实际项目中可以采用“静态框架 + 智能插件”的思路:框架提供流程壳子,插件(Agent)完成具体智能任务。

在这里插入图片描述
这种折衷的思路是将静态规划与智能体局部决策相结合。在整体上采用固定流程,但在特定步骤上授予智能体一定的自主规划或推理权限。设计主流程时,识别出其中具有不确定性或需要动态决策的步骤,交给LLM智能体以子任务的形式在内部自行规划或调用工具,完成后,流程继续按照预定顺序执行后续步骤。

换言之,大的流程图是固定的,只有某些节点是“智能节点”,里面运行一个受控的Agent子流程。

优点:

  1. 兼顾可控性与灵活性:与全自主Agent相比,整体行为更可控,因为智能部分被限制在局部范围内,不会干涉整个流程结构。相比纯静态流程,又具备了一定灵活应变能力——至少在那些标记出的复杂环节上,智能体可以随机应变。
  2. 过渡性好:开发者可以逐步引入智能节点,从全静态开始逐步引入智能环节。

缺点:

  1. 系统复杂度较高:增加系统复杂度,既要开发静态逻辑又要集成Agent。
  2. 对开发者要求较高:如何划分哪些步骤静态哪些智能并无定式,依赖开发者对任务的理解和持续调整。
  3. 可控性随智能节点增多而下降:局部智能体的表现仍然可能不稳定,如果智能节点过多,可控性也会相应下降。

⑤模块化的分层规划:化大为小逐层细化

概念:多级Agent。一个Agent当领导人,安排下面的Agent去完成各个模块的任务。这种模式,职责分明,从单个智能体来看复杂度低。但后续整个流程调试以及优化较为繁琐。

  • 对于复杂场景,可以构建多个智能体形成一个层次化结构:由“高层”Agent负责宏观规划和决策,“低层”Agent执行具体子任务,各司其职又互相配合。这种模式最具代表性的就是Supervisor模式的多智能体系统。
    分层规划包含至少两个层级:
    1. 高层Agent(规划者/经理):面向最终目标,制定子任务或子目标清单,分配给低层Agent。高层Agent关注全局进展,可能不直接与环境交互,而是通过检查下级完成情况来决定接下来做什么。
    2. 低层Agent(执行者/员工):接收高层指派的具体子任务,在其自己能力范围内完成。低层Agent可能本身用ReAct或其他模式来解决子任务,然后将结果汇报给高层。

适用场景:任务模块多,且每个模块设计领域不同。

  • 当任务规模庞大或专业模块众多时,分层/多Agent是很自然的选择。例如一项软件工程任务,从需求分析、设计、编码、测试到文档,每一步都可由不同Agent完成,由总负责人Agent协调。再如学术研究Agent,一个负责制定研究计划,几个分别去查文献、做实验、分析数据,最后综合。

在这里插入图片描述
这种架构下,高层和低层可以都是LLM实例,扮演不同角色进行多轮协作:高层发号施令,低层报告结果,循环往复直到任务完成。

这种模式常借助多智能体系统的开发框架来完成。比如LangGraph、AutoGen、CrewAI等。

优点:

  1. 职责分离:充分利用了职责分离的思想,每个Agent专注于其擅长的层面,提高效率和效果。高层Agent擅长宏观计划,确保不偏离大方向;低层Agent专注微观执行,可以投入更多细节推理(团队协作胜过一人包办)。
  2. 分治思想,单一智能体复杂度较低:在需要使用大量工具完成复杂任务的场景下,通过这种分治的模式可以大问题转小问题,降低单一智能体的决策复杂度。而对于上层任务规划,只需在低层Agent的“黑盒”接口层面做规划和调度,决策空间与推理复杂度大大减小
    在这里插入图片描述
  3. 并发度高,速度快:多子任务并行处理,提高速度(比如高层把任务分给两个低层Agent同时做不同部分)。
  4. 健壮性好:某个子任务失败可以局部重新规划与执行,提高健壮性。

缺点:

  1. 整体复杂度较高:多智能体系统的实现复杂度高。需要处理Agent间的通信、上下文共享、结果整合等问题。
  2. 调优及问题排查较为困难:错误责任归属问题:任务失败需要鉴别是高层计划不当还是低层执行不力,调试困难度增加。

参考文章:https://mp.weixin.qq.com/s/TLekMTd3K6jLoMHYI_XIig

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐