Table of Contents

0 Prerequisites

1 Development Environment Setup

1.1 Create the project Python environment

1.2 Create the project in PyCharm

1.3 Install project dependencies

2 Resource Preparation

3 Program Logic Implementation

3.1 Server-side implementation

3.1.1 Create the sse_weather_server.py file

3.1.2 Import dependencies

3.1.3 Initialize the MCP service

3.1.4 Define the OpenWeather configuration

3.1.5 Define the method for fetching city weather

3.1.6 Define the method for formatting weather data

3.1.7 Register the MCP tool

3.1.8 Define the SSE service endpoints

3.1.9 Define the main entry point

3.2 Client-side implementation

3.2.1 Create the agents_sdk_tool_mcp.py file

3.2.2 Import dependencies

3.2.3 Load the configuration file

3.2.4 Define the method for getting the OpenAI client

3.2.5 Define the method for getting the chat model

3.2.6 Define the agent method

3.2.7 Define the method for connecting to the MCP server

3.2.8 Define the main entry point

4 Complete Source Code

4.1 Complete server-side source code

4.2 Complete client-side source code

5 Testing

Appendix

Configuration file .env


0 Prerequisites

  • Ollama is running with the qwen:7b inference model already pulled
  • Miniconda is installed
  • You can reach the external services used here (e.g. the OpenWeather API)

1 Development Environment Setup

1.1 Create the project Python environment

Create the project's Python development environment with conda:

 conda create -n mcp_demo python=3.10
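
After the environment is created, activate it before installing any packages, for example:

 conda activate mcp_demo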

1.2 Create the project in PyCharm

  • Interpreter type: choose Custom environment
  • Environment: choose Existing
  • Type: choose Conda
  • Environment: select the environment created in the previous step

1.3 Install project dependencies

Install the OpenAI-related dependencies (openai, openai-agents), python-dotenv for reading the .env configuration file, plus httpx and mcp:

 pip install openai python-dotenv openai-agents httpx mcp

2 Resource Preparation

This project calls a weather API to retrieve weather information for a city, so you need to register on the OpenWeather site and obtain an API key; the key is later read from the .env file (see the appendix).

For the detailed registration steps, refer to chapter 2 of the following post: MCP实战-基于Ollama+qwen2.5以sse方式实现MCP协议工具调用_mcp sse-CSDN博客

3 Program Logic Implementation

3.1 Server-side implementation

3.1.1 Create the sse_weather_server.py file

3.1.2 Import dependencies

import argparse
import json
import os

import httpx
import uvicorn
from dotenv import load_dotenv
from mcp.server import FastMCP, Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount

3.1.3 Initialize the MCP service

mcp = FastMCP("sse_WeatherServer")

3.1.4 Define the OpenWeather configuration

load_dotenv()

openweather_api_key = os.getenv('OPENWEATHER_API_KEY')
openweather_base_url = os.getenv('OPENWEATHER_BASE_URL')
user_agent = os.getenv('USER_AGENT')

Note: set OPENWEATHER_API_KEY in the .env file to the API key you applied for (see the appendix).

3.1.5 Define the method for fetching city weather

async def get_weather(city):
    """
    Fetch weather information from the OpenWeather API.
    :param city: city name (must be in English, e.g. beijing)
    :return: weather data dict; on error, a dict containing an "error" message
    """
    params = {
        "q": city,
        "appid": openweather_api_key,
        "units": "metric",
        "lang": "zh_cn",
    }
    headers = {"User-Agent": user_agent}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(openweather_base_url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP error: {e}"}
        except Exception as e:
            return {"error": f"Error occurred: {e}"}

3.1.6 Define the method for formatting weather data

def format_weather_data(data):
    """
    Format the weather data.
    :param data: weather data dict
    :return: formatted string; on error, a string containing the error message
    """

    # If a string is passed in, convert it to a dict first
    if isinstance(data, str):
        data = json.loads(data)

    if "error" in data:
        return data["error"]
    weather = data["weather"][0]["description"]
    temperature = data["main"]["temp"]
    city = data["name"]
    country = data["sys"]["country"]
    humidity = data["main"]["humidity"]
    wind = data["wind"]["speed"]

    return f"City: {city}, {country}\nWeather: {weather}\nTemperature: {temperature}°C\nHumidity: {humidity}%\nWind speed: {wind} m/s"

3.1.7 Register the MCP tool


@mcp.tool()
async def get_weather_tool(city: str):
    """
    获取城市的天气信息
    :param city: 城市名称(需要试用英文,如 beijing)
    :return: 天气数据字典;若发生错误,返回包含error信息的字典
    """
    weather_data = await get_weather(city)
    return format_weather_data(weather_data)

3.1.8 Define the SSE service endpoints


def create_starlette_app(mcp_server: Server, *, debug: bool = False):
    """创建 Starlette 应用能通过sse提供mcp服务"""
    sse = SseServerTransport("/messages/")

    async def handle_sse(request):
        # Note: request._send is a private Starlette attribute; SseServerTransport
        # needs the raw ASGI send callable to manage the SSE stream itself.
        async with sse.connect_sse(
            request.scope,
            request.receive,
            request._send,
        ) as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options(),
            )

    return Starlette(
        debug=debug,
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )
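
Once the server is running (see the next section), you can sanity-check the SSE endpoint from a shell, for example with curl; the port assumes the default used below, and you should see an event stream stay open rather than an immediate error:

 curl -N http://localhost:18081/sse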

3.1.9 Define the main entry point

if __name__ == "__main__":
    # Access the underlying low-level Server instance wrapped by FastMCP
    mcp_server = mcp._mcp_server

    parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
    parser.add_argument("--host", default="0.0.0.0", help="MCP server host")
    parser.add_argument("--port", default=18081, type=int, help="MCP server port")
    args = parser.parse_args()

    starlette_app = create_starlette_app(mcp_server, debug=True)
    uvicorn.run(starlette_app, host=args.host, port=args.port)
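
With this entry point in place, start the server from the project environment; the host and port flags are optional and default to the values defined above:

 python sse_weather_server.py --host 0.0.0.0 --port 18081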

3.2 Client-side implementation

3.2.1 Create the agents_sdk_tool_mcp.py file

3.2.2 Import dependencies

import asyncio
import os
from agents import Runner, OpenAIChatCompletionsModel, Agent
from agents.mcp import MCPServer, MCPServerSse
from dotenv import load_dotenv
from openai import AsyncOpenAI

3.2.3 Load the configuration file

load_dotenv()

3.2.4 Define the method for getting the OpenAI client

def get_openai_client():
    api_key = os.getenv('OPENAI_API_KEY')
    base_url = os.getenv('BASE_URL')
    return AsyncOpenAI(
        api_key=api_key,
        base_url=base_url
    )

3.2.5 Define the method for getting the chat model

def get_chat_model():
    model_name = os.getenv('MODEL')
    return OpenAIChatCompletionsModel(
        model=model_name,
        openai_client=get_openai_client()
    )

3.2.6 Define the agent method

async def run(mcp_server: MCPServer):
    agent = Agent(
        name="Assistant",
        instructions="You are a weather query assistant",
        mcp_servers=[mcp_server],
        model=get_chat_model()
    )

    message = "What's the weather like in Wuhan today?"
    print(f"Running: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

3.2.7 Define the method for connecting to the MCP server

async def mcp_run():
    async with MCPServerSse(
        name="Weather Server",
        cache_tools_list=True,
        params={"url": "http://localhost:18081/sse"}
    ) as server:
        await run(server)

3.2.8 Define the main entry point

if __name__ == "__main__":
    asyncio.run(mcp_run())
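
With the SSE server from section 3.1 already running in another terminal, the client can then be started like this:

 python agents_sdk_tool_mcp.py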

4 Complete Source Code

4.1 Complete server-side source code

import argparse
import json
import os

import httpx
import uvicorn
from dotenv import load_dotenv
from mcp.server import FastMCP, Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount

load_dotenv()

mcp = FastMCP("sse_WeatherServer")

openweather_api_key = os.getenv('OPENWEATHER_API_KEY')
openweather_base_url = os.getenv('OPENWEATHER_BASE_URL')
user_agent = os.getenv('USER_AGENT')

async def get_weather(city):
    """
    Fetch weather information from the OpenWeather API.
    :param city: city name (must be in English, e.g. beijing)
    :return: weather data dict; on error, a dict containing an "error" message
    """
    params = {
        "q": city,
        "appid": openweather_api_key,
        "units": "metric",
        "lang": "zh_cn",
    }
    headers = {"User-Agent": user_agent}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(openweather_base_url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            return {"error": f"HTTP error: {e}"}
        except Exception as e:
            return {"error": f"Error occurred: {e}"}

def format_weather_data(data):
    """
    Format the weather data.
    :param data: weather data dict
    :return: formatted string; on error, a string containing the error message
    """

    # If a string is passed in, convert it to a dict first
    if isinstance(data, str):
        data = json.loads(data)

    if "error" in data:
        return data["error"]
    weather = data["weather"][0]["description"]
    temperature = data["main"]["temp"]
    city = data["name"]
    country = data["sys"]["country"]
    humidity = data["main"]["humidity"]
    wind = data["wind"]["speed"]

    return f"City: {city}, {country}\nWeather: {weather}\nTemperature: {temperature}°C\nHumidity: {humidity}%\nWind speed: {wind} m/s"


@mcp.tool()
async def get_weather_tool(city: str):
    """
    获取城市的天气信息
    :param city: 城市名称(需要试用英文,如 beijing)
    :return: 天气数据字典;若发生错误,返回包含error信息的字典
    """
    weather_data = await get_weather(city)
    return format_weather_data(weather_data)


def create_starlette_app(mcp_server: Server, *, debug: bool = False):
    """创建 Starlette 应用能通过sse提供mcp服务"""
    sse = SseServerTransport("/messages/")

    async def handle_sse(request):
        # Note: request._send is a private Starlette attribute; SseServerTransport
        # needs the raw ASGI send callable to manage the SSE stream itself.
        async with sse.connect_sse(
            request.scope,
            request.receive,
            request._send,
        ) as (read_stream, write_stream):
            await mcp_server.run(
                read_stream,
                write_stream,
                mcp_server.create_initialization_options(),
            )

    return Starlette(
        debug=debug,
        routes=[
            Route("/sse", endpoint=handle_sse),
            Mount("/messages/", app=sse.handle_post_message),
        ],
    )



if __name__ == "__main__":
    # Access the underlying low-level Server instance wrapped by FastMCP
    mcp_server = mcp._mcp_server

    parser = argparse.ArgumentParser(description='Run MCP SSE-based server')
    parser.add_argument("--host", default="0.0.0.0", help="MCP server host")
    parser.add_argument("--port", default=18081, type=int, help="MCP server port")
    args = parser.parse_args()

    starlette_app = create_starlette_app(mcp_server, debug=True)
    uvicorn.run(starlette_app, host=args.host, port=args.port)


4.2 Complete client-side source code

import asyncio
import os
from agents import Runner, OpenAIChatCompletionsModel, Agent
from agents.mcp import MCPServer, MCPServerSse
from dotenv import load_dotenv
from openai import AsyncOpenAI

load_dotenv()


def get_openai_client():
    api_key = os.getenv('OPENAI_API_KEY')
    base_url = os.getenv('BASE_URL')
    return AsyncOpenAI(
        api_key=api_key,
        base_url=base_url
    )

def get_chat_model():
    model_name = os.getenv('MODEL')
    return OpenAIChatCompletionsModel(
        model=model_name,
        openai_client=get_openai_client()
    )


async def run(mcp_server: MCPServer):
    agent = Agent(
        name="Assistant",
        instructions="You are a weather query assistant",
        mcp_servers=[mcp_server],
        model=get_chat_model()
    )

    message = "What's the weather like in Wuhan today?"
    print(f"Running: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)


async def mcp_run():
    async with MCPServerSse(
        name="Weather Server",
        cache_tools_list=True,
        params={"url": "http://localhost:18081/sse"}
    ) as server:
        await run(server)


if __name__ == "__main__":
    asyncio.run(mcp_run())

5 Testing

  • Inspect the call flow

Inspecting the new_items attribute of the run result shows how the Agents SDK invoked the MCP tool; a small sketch follows after this list.

  • Run result
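
As a minimal sketch (reusing the imports and helpers from section 3.2, so not standalone), the run function can dump each item generated during the run before printing the final answer; the exact item classes you see depend on the installed openai-agents version:

async def run(mcp_server: MCPServer):
    agent = Agent(
        name="Assistant",
        instructions="You are a weather query assistant",
        mcp_servers=[mcp_server],
        model=get_chat_model()
    )
    result = await Runner.run(starting_agent=agent, input="What's the weather like in Wuhan today?")
    # new_items records each step produced during the run, e.g. the model's
    # tool call, the tool's output, and the final assistant message.
    for item in result.new_items:
        print(type(item).__name__)
    print(result.final_output)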

Appendix

Configuration file .env
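
The variable names below are exactly the ones read via os.getenv in the server and client code; the values are placeholders and assumptions that must be adapted to your own setup (your own OpenWeather key, OpenWeather's current-weather endpoint, and Ollama's OpenAI-compatible endpoint serving the qwen:7b model from the prerequisites):

# OpenWeather settings used by sse_weather_server.py
# (placeholder key; the base URL is OpenWeather's current-weather endpoint)
OPENWEATHER_API_KEY=your_openweather_api_key
OPENWEATHER_BASE_URL=https://api.openweathermap.org/data/2.5/weather
USER_AGENT=weather-app/1.0

# Model settings used by agents_sdk_tool_mcp.py, assuming a local Ollama backend;
# Ollama's OpenAI-compatible endpoint does not check the API key, so any placeholder works
OPENAI_API_KEY=ollama
BASE_URL=http://localhost:11434/v1
MODEL=qwen:7b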
