Converting an MCP project from the stdio transport to the SSE transport with a few small changes
Changing the client's connect-to-server function and one line in the server (the transport passed to mcp.run) is enough. The way things are launched also differs: with stdio, server.py never needs to be started by hand, because the client receives the server script path as an argument, and you can run MCP Inspector directly to test the server; with SSE, the server has to be started first, and only then can MCP Inspector or the client connect to it.
Client

- Modify the connect-to-server function.
- Launch:
  - stdio:
    uv run client.py search_server.py
  - sse:
    uv run sse_client.py http://localhost:8000/sse
Server

- Change one line:
  mcp.run(transport='sse')
  instead of mcp.run(transport='stdio')

Differences in how things are launched

- stdio: no need to start server.py by hand; the client is given the server script path as an argument. You can also run
  mcp dev server/search_server.py
  to test the server with MCP Inspector.
- sse: first start the server with
  python server/sse_search_server.py
  and only then run
  mcp dev server/sse_search_server.py
  to launch MCP Inspector, or start the client and point it at the server.

The sketch right after this list shows just these two code-level changes; the full listings follow.
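Below is a minimal sketch written for this comparison rather than taken from the project. It assumes the same FastMCP server and the official MCP Python SDK client helpers (stdio_client, sse_client) used in the full listings; the helper function names are illustrative.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client


async def connect_stdio(server_script: str):
    # stdio: the client spawns the server process itself, so the server is never started by hand
    params = StdioServerParameters(command="python", args=[server_script], env=None)
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            print(await session.list_tools())


async def connect_sse(server_url: str):
    # SSE: the server must already be running (python server/sse_search_server.py);
    # the client only receives its URL, e.g. http://localhost:8000/sse
    async with sse_client(url=server_url) as streams:
        async with ClientSession(*streams) as session:
            await session.initialize()
            print(await session.list_tools())


# On the server side the only difference is the transport argument:
#   mcp.run(transport='stdio')   vs.   mcp.run(transport='sse')

if __name__ == "__main__":
    asyncio.run(connect_sse("http://localhost:8000/sse"))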
Full code
stdio
server
import json
import os

from mcp.server.fastmcp import FastMCP
from exa_py import Exa

# Initialize the FastMCP server
mcp = FastMCP("qwen-docs-server")

# Directory constants
DOCS_DIR = ""
RESULT_DIR = "../log"

# Make sure the result directory exists
os.makedirs(RESULT_DIR, exist_ok=True)

# Exa API key (left blank here)
EXA_API_KEY = ""


@mcp.tool(description="Query a question via the Exa search API")
def request_exa(question: str) -> str:
    """Search for content related to a question with the Exa search API.

    Args:
        question: the question to search for

    Returns:
        str: the search results, or an error message
    """
    try:
        # Initialize the Exa client
        exa = Exa(api_key=EXA_API_KEY)
        print('server: calling the search tool...')
        # Send the API request
        search_results = exa.search_and_contents(
            question,
            text={"max_characters": 1000}
        )
        # Format the results
        formatted_results = []
        for index, result in enumerate(search_results.results):
            # Strip newlines outside the f-string (a backslash inside an
            # f-string expression is a SyntaxError before Python 3.12)
            text = result.text.replace('\n', '')
            formatted_results.append(
                f"title {index}: {result.title}\n"
                f"content {index}: {text}"
            )
        return '\n\n'.join(formatted_results)
    except Exception as e:
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# ===== Entry point =====
# mcp dev server/search_server.py
if __name__ == "__main__":
    # Run the MCP server over standard I/O
    mcp.run(transport='stdio')
    # mcp.run(transport='sse')
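The server above leaves EXA_API_KEY blank. As a small aside that is not part of the original listing, the key can be read from a .env file the same way the client reads its OpenAI key; the environment-variable name EXA_API_KEY below is an assumption.

# Optional: read the Exa key from a .env file instead of hardcoding it
import os
from dotenv import load_dotenv

load_dotenv()
EXA_API_KEY = os.getenv("EXA_API_KEY", "")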
client
# client.py
import asyncio
import json
import os
import sys
from typing import Optional, Dict, List
from contextlib import AsyncExitStack
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Load the .env file so the API key stays out of the source
load_dotenv()
class MCPClient:
    """MCP client that talks to the OpenAI API and calls MCP tools."""

    def __init__(self):
        """Initialize the MCP client."""
        # Configuration check and initialization
        self.stdio = None
        self.write = None
        self.openai_api_key = os.getenv("OPENAI_API_KEY", "")
        self.base_url = ""  # OpenAI-compatible endpoint base URL (fill in)
        self.model = ""     # model name (fill in)
        if not self.openai_api_key:
            raise ValueError(
                "❌ OpenAI API Key not found; set OPENAI_API_KEY in the .env file"
            )
        # Initialize components
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        self.session: Optional[ClientSession] = None
        self.resources_dict = {}

    async def connect_to_server(self, server_script_path: str):
        """Connect to the MCP server and initialize the session."""
        # Check the script type
        if server_script_path.endswith(".py"):
            command = "python"
        elif server_script_path.endswith(".js"):
            command = "node"
        else:
            raise ValueError("The server script must be a .py or .js file")
        # Build the server parameters and establish the connection
        server_params = StdioServerParameters(
            command=command, args=[server_script_path], env=None
        )
        # Initialize the connection and session
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()
        # Load the server's tools and resources
        await self._load_tools_and_resources()
    async def _load_tools_and_resources(self):
        """Load the tools and resources exposed by the server."""
        # Load tools
        tools_response = await self.session.list_tools()
        tools = tools_response.tools
        print(f"\nConnected to server with tools: {[tool.name for tool in tools]}")
        # Load resources
        resources_response = await self.session.list_resources()
        resources_names = [resource.name for resource in resources_response.resources]
        # Read the content of every resource
        for resource_name in resources_names:
            resource = await self.session.read_resource(resource_name)
            self.resources_dict[resource_name] = resource.contents[0].text
    async def transform_json(self, tools_data: List[Dict]) -> List[Dict]:
        """Convert Claude-style function-calling tool specs into the OpenAI format."""
        result = []
        for item in tools_data:
            old_func = item["function"]
            # Build the new function object
            new_func = {
                "name": old_func["name"],
                "description": old_func["description"],
                "parameters": {},
            }
            # Convert input_schema into parameters
            if "input_schema" in old_func and isinstance(
                old_func["input_schema"], dict
            ):
                schema = old_func["input_schema"]
                new_func["parameters"] = {
                    "type": schema.get("type", "object"),
                    "properties": schema.get("properties", {}),
                    "required": schema.get("required", []),
                }
            result.append({"type": item["type"], "function": new_func})
        return result
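    # Illustration (not in the original post) of what transform_json produces for the
    # request_exa tool defined by the server above:
    #   input:  {"type": "function", "function": {"name": "request_exa",
    #            "description": "...", "input_schema": {"type": "object",
    #            "properties": {"question": {"type": "string"}}, "required": ["question"]}}}
    #   output: {"type": "function", "function": {"name": "request_exa",
    #            "description": "...", "parameters": {"type": "object",
    #            "properties": {"question": {"type": "string"}}, "required": ["question"]}}}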
    async def process_query(self, query: str) -> str:
        """Process a user query, calling tools where needed."""
        if not self.session:
            return "❌ Not connected to an MCP server"
        try:
            # 1. Fetch the available tools
            tools_response = await self.session.list_tools()
            tools_data = [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "input_schema": tool.inputSchema,
                    },
                }
                for tool in tools_response.tools
            ]
            # 2. Convert the tool format
            available_tools = await self.transform_json(tools_data)
            # 3. Send the request to the OpenAI-compatible API
            messages = [{"role": "user", "content": query}]
            response = self.client.chat.completions.create(
                model=self.model, messages=messages, tools=available_tools
            )
            # print('model response: ', response)
            # 4. Handle tool calls
            max_tool_calls = 5  # cap the number of tool calls
            call_count = 0
            message_log = []
            while (
                response.choices[0].message.tool_calls and call_count < max_tool_calls
            ):
                tool_call = response.choices[0].message.tool_calls[0]
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                # Call the tool
                print(f"\n[Calling tool {tool_name} with arguments: {tool_args}]")
                result = await self.session.call_tool(tool_name, tool_args)
                # print('tools response:', result)
                # Update the message history
                messages.append(response.choices[0].message.model_dump())
                messages.append(
                    {
                        "role": "tool",
                        "content": result.content[0].text,
                        "tool_call_id": tool_call.id,
                    }
                )
                # Ask the model again
                response = self.client.chat.completions.create(
                    model=self.model, messages=messages, tools=available_tools
                )
                # print('message: ', messages)
                message_log.append(list(messages))  # snapshot, not the live list
                print('resp: ', response)
                call_count += 1
                with open('../message.json', "w", encoding="utf-8") as f:
                    json.dump(message_log, f, ensure_ascii=False, indent=2)
            # 5. Return the final answer
            return response.choices[0].message.content
        except Exception as e:
            return f"❌ Error while processing the query: {str(e)}"
    async def chat_loop(self):
        """Run the interactive chat loop."""
        print("\nMCP client started! Type 'quit' to exit")
        while True:
            try:
                query = input("\nYou: ").strip()
                if query.lower() == "quit":
                    break
                print("\nProcessing...")
                response = await self.process_query(query)
                print(f"\nReply: {response}")
            except KeyboardInterrupt:
                print("\n\nSession terminated")
                break
            except Exception as e:
                print(f"\n⚠️ Error: {str(e)}")

    async def cleanup(self):
        """Release resources."""
        if self.exit_stack:
            await self.exit_stack.aclose()
        print("\nResources released and connection closed")
# No need to start the server separately; starting the client is enough:
# uv run client.py search_server.py
async def main():
    """Entry point."""
    if len(sys.argv) < 2:
        print("Usage: python client.py <path to server script>")
        sys.exit(1)
    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    except Exception as e:
        print(f"\n⚠️ Program error: {str(e)}")
    finally:
        await client.cleanup()


# uv run client.py search_server.py
if __name__ == "__main__":
    asyncio.run(main())
sse
server
import json
import os

from mcp.server.fastmcp import FastMCP
from exa_py import Exa

# Initialize the FastMCP server
mcp = FastMCP(
    name="weather"
)

# Directory constants
DOCS_DIR = ""
RESULT_DIR = "../log"

# Make sure the result directory exists
os.makedirs(RESULT_DIR, exist_ok=True)

# Exa API key (left blank here)
EXA_API_KEY = ""


@mcp.tool(description="Query a question via the Exa search API")
def request_exa(question: str) -> str:
    """Search for content related to a question with the Exa search API.

    Args:
        question: the question to search for

    Returns:
        str: the search results, or an error message
    """
    try:
        # Initialize the Exa client
        exa = Exa(api_key=EXA_API_KEY)
        print('server: calling the search tool...')
        # Send the API request
        search_results = exa.search_and_contents(
            question,
            text={"max_characters": 1000}
        )
        # Format the results
        formatted_results = []
        for index, result in enumerate(search_results.results):
            # Strip newlines outside the f-string (a backslash inside an
            # f-string expression is a SyntaxError before Python 3.12)
            text = result.text.replace('\n', '')
            formatted_results.append(
                f"title {index}: {result.title}\n"
                f"content {index}: {text}"
            )
        return '\n\n'.join(formatted_results)
    except Exception as e:
        return json.dumps({"error": str(e)}, ensure_ascii=False)


# ===== Entry point =====
# For the SSE server, first run `python server/sse_search_server.py` to start the server,
# then run `mcp dev server/sse_search_server.py` to launch MCP Inspector.
if __name__ == "__main__":
    # Run the MCP server over SSE
    # mcp.run(transport='stdio')
    mcp.run(transport='sse')
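By default the FastMCP SSE transport listens on 0.0.0.0:8000 and exposes the SSE endpoint at /sse, which is why the client connects to http://localhost:8000/sse. If a different host or port is needed, recent versions of the MCP Python SDK accept them as FastMCP settings; the exact keyword arguments below are an assumption, so check them against the installed SDK version.

# Assumed FastMCP settings kwargs (verify against your mcp SDK version)
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("weather", host="127.0.0.1", port=8001)

if __name__ == "__main__":
    mcp.run(transport='sse')
    # the client would then connect to http://127.0.0.1:8001/sse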
client
# sse_client.py
import asyncio
import json
import os
import sys
from typing import Optional, Dict, List
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession

# Load the .env file so the API key stays out of the source
load_dotenv()
class MCPClient:
    """MCP client that talks to the OpenAI API and calls MCP tools."""

    def __init__(self):
        """Initialize the MCP client."""
        # Configuration check and initialization
        self.stdio = None
        self.write = None
        self._streams_context = None
        self._session_context = None
        self.openai_api_key = os.getenv("OPENAI_API_KEY", "")
        self.base_url = ""  # OpenAI-compatible endpoint base URL (fill in)
        self.model = ""     # model name (fill in)
        if not self.openai_api_key:
            raise ValueError(
                "❌ OpenAI API Key not found; set OPENAI_API_KEY in the .env file"
            )
        # Initialize components
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
        self.session: Optional[ClientSession] = None
        self.resources_dict = {}

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server running with SSE transport."""
        # Create the SSE client connection context manager
        self._streams_context = sse_client(url=server_url)
        # Enter the SSE connection asynchronously and get the stream pair
        streams = await self._streams_context.__aenter__()
        # Create the MCP client session context from the streams
        self._session_context = ClientSession(*streams)
        # Enter the session context to obtain the session object
        self.session: ClientSession = await self._session_context.__aenter__()
        # Perform the MCP protocol initialization handshake
        await self.session.initialize()
        # Load the server's tools and resources
        await self._load_tools_and_resources()
    async def _load_tools_and_resources(self):
        """Load the tools and resources exposed by the server."""
        # Load tools
        tools_response = await self.session.list_tools()
        tools = tools_response.tools
        print(f"\nConnected to server with tools: {[tool.name for tool in tools]}")
        # Load resources
        resources_response = await self.session.list_resources()
        resources_names = [resource.name for resource in resources_response.resources]
        # Read the content of every resource
        for resource_name in resources_names:
            resource = await self.session.read_resource(resource_name)
            self.resources_dict[resource_name] = resource.contents[0].text
    async def transform_json(self, tools_data: List[Dict]) -> List[Dict]:
        """Convert Claude-style function-calling tool specs into the OpenAI format."""
        result = []
        for item in tools_data:
            old_func = item["function"]
            # Build the new function object
            new_func = {
                "name": old_func["name"],
                "description": old_func["description"],
                "parameters": {},
            }
            # Convert input_schema into parameters
            if "input_schema" in old_func and isinstance(
                old_func["input_schema"], dict
            ):
                schema = old_func["input_schema"]
                new_func["parameters"] = {
                    "type": schema.get("type", "object"),
                    "properties": schema.get("properties", {}),
                    "required": schema.get("required", []),
                }
            result.append({"type": item["type"], "function": new_func})
        return result
    async def process_query(self, query: str) -> str:
        """Process a user query, calling tools where needed."""
        if not self.session:
            return "❌ Not connected to an MCP server"
        try:
            # 1. Fetch the available tools
            tools_response = await self.session.list_tools()
            tools_data = [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "input_schema": tool.inputSchema,
                    },
                }
                for tool in tools_response.tools
            ]
            # 2. Convert the tool format
            available_tools = await self.transform_json(tools_data)
            # 3. Send the request to the OpenAI-compatible API
            messages = [{"role": "user", "content": query}]
            response = self.client.chat.completions.create(
                model=self.model, messages=messages, tools=available_tools
            )
            # print('model response: ', response)
            # 4. Handle tool calls
            max_tool_calls = 5  # cap the number of tool calls
            call_count = 0
            message_log = []
            while (
                response.choices[0].message.tool_calls and call_count < max_tool_calls
            ):
                tool_call = response.choices[0].message.tool_calls[0]
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                # Call the tool
                print(f"\n[Calling tool {tool_name} with arguments: {tool_args}]")
                result = await self.session.call_tool(tool_name, tool_args)
                # print('tools response:', result)
                # Update the message history
                messages.append(response.choices[0].message.model_dump())
                messages.append(
                    {
                        "role": "tool",
                        "content": result.content[0].text,
                        "tool_call_id": tool_call.id,
                    }
                )
                # Ask the model again
                response = self.client.chat.completions.create(
                    model=self.model, messages=messages, tools=available_tools
                )
                # print('message: ', messages)
                message_log.append(list(messages))  # snapshot, not the live list
                print('resp: ', response)
                call_count += 1
                with open('../message.json', "w", encoding="utf-8") as f:
                    json.dump(message_log, f, ensure_ascii=False, indent=2)
            # 5. Return the final answer
            return response.choices[0].message.content
        except Exception as e:
            return f"❌ Error while processing the query: {str(e)}"
    async def chat_loop(self):
        """Run the interactive chat loop."""
        print("\nMCP client started! Type 'quit' to exit")
        while True:
            try:
                query = input("\nYou: ").strip()
                if query.lower() == "quit":
                    break
                print("\nProcessing...")
                response = await self.process_query(query)
                print(f"\nReply: {response}")
            except KeyboardInterrupt:
                print("\n\nSession terminated")
                break
            except Exception as e:
                print(f"\n⚠️ Error: {str(e)}")

    async def cleanup(self):
        """Release resources."""
        # Close the manually entered session and SSE stream contexts
        if self._session_context:
            await self._session_context.__aexit__(None, None, None)
        if self._streams_context:
            await self._streams_context.__aexit__(None, None, None)
        if self.exit_stack:
            await self.exit_stack.aclose()
        print("\nResources released and connection closed")
# For SSE, start the server first (python server/sse_search_server.py),
# then start the client with the server URL:
# uv run client/sse_client.py http://localhost:8000/sse
async def main():
    """Entry point."""
    if len(sys.argv) < 2:
        print("Usage: python sse_client.py <server SSE URL, e.g. http://localhost:8000/sse>")
        sys.exit(1)
    client = MCPClient()
    try:
        await client.connect_to_sse_server(sys.argv[1])
        await client.chat_loop()
    except Exception as e:
        print(f"\n⚠️ Program error: {str(e)}")
    finally:
        await client.cleanup()


# uv run client/sse_client.py http://localhost:8000/sse
if __name__ == "__main__":
    asyncio.run(main())
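connect_to_sse_server above enters the sse_client and ClientSession contexts by hand, which is why cleanup has to call __aexit__ on both of them. Below is a sketch of an alternative, not from the original post: a drop-in replacement for that method that hands both contexts to the AsyncExitStack the class already owns, mirroring the stdio client, so cleanup only needs exit_stack.aclose().

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server over SSE, letting AsyncExitStack manage the contexts."""
        # The exit stack owns both contexts and closes them in reverse order on aclose()
        streams = await self.exit_stack.enter_async_context(sse_client(url=server_url))
        self.session = await self.exit_stack.enter_async_context(ClientSession(*streams))
        await self.session.initialize()
        await self._load_tools_and_resources()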