第六章:mcp高级Server能力——集成数据库
6.1 超越文件系统:让AI理解结构化数据
在前面的章节中,我们已经成功地让AI通过MCP与文件系统进行交互,实现了对非结构化文本数据的读写。然而,真实世界的应用充满了结构化数据,它们通常存储在关系型数据库(如MySQL, PostgreSQL)或NoSQL数据库(如MongoDB, Redis)中。这些数据——用户信息、产品目录、销售记录、传感器读数——是企业运营的核心。
如果AI只能读写.txt文件,那它的能力将大打折扣。要让AI成为真正强大的业务助手,它必须能够理解并操作这些结构化数据。本章的核心目标就是:打破AI与数据库之间的壁垒。
我们将学习如何构建一个MCP Server,它将充当一个“数据库翻译官”的角色。这个Server会连接到一个数据库,并将复杂的SQL查询或数据库操作,封装成简单、易于AI理解的MCP工具。例如,一个db/queryProducts工具,AI只需提供产品类别,就能获得格式化的产品列表,而无需关心背后执行的是哪条SELECT语句,也无需处理数据库连接、认证和事务。
本章我们将实现:
- 设计数据库Schema:我们将使用轻量级的SQLite数据库,设计一个简单的产品信息表。
- 构建数据库MCP Server:编写一个Python MCP Server,它将:
- 安全地连接到SQLite数据库。
- 实现
project/queryProducts工具,根据条件查询产品。 - 实现
project/addProduct工具,向数据库中添加新产品。
- 安全性考量:探讨在暴露数据库能力时,如何防止SQL注入等安全风险。
- Host端交互:更新我们的AI研究助理(Host),使其能够发现并使用这个新的数据库Server,回答诸如“帮我查找所有电子产品”或“添加一个价格为999的新手机”等问题。
通过本章的学习,你将掌握将任何数据库集成到AI应用中的核心模式,极大地扩展AI的应用场景。
6.2 项目准备与数据库设计
我们将继续在上一章的ai-research-assistant项目基础上进行扩展。
6.2.1 项目结构扩展
首先,为我们的新Server创建一个目录,并准备数据库文件。
# 确保你位于ai-research-assistant目录下并激活了虚拟环境
cd ai-research-assistant
# 创建数据库Server目录
mkdir db_server
# 创建一个Python脚本来初始化数据库和表
touch db_server/setup_database.py
# 创建数据库Server的主程序文件
touch db_server/main.py
6.2.2 设计并初始化数据库 (db_server/setup_database.py)
我们将创建一个名为inventory.db的SQLite数据库文件,其中包含一个products表。
# db_server/setup_database.py
import sqlite3
import os
DB_FILE = "inventory.db"
def setup():
# 如果数据库文件已存在,先删除
if os.path.exists(DB_FILE):
os.remove(DB_FILE)
# 连接到SQLite数据库(如果文件不存在,会自动创建)
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# 创建 products 表
cursor.execute('''
CREATE TABLE products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
category TEXT NOT NULL,
price REAL NOT NULL,
stock INTEGER NOT NULL
)
''')
# 插入一些示例数据
sample_products = [
('Laptop Pro', 'Electronics', 1299.99, 50),
('Smartphone X', 'Electronics', 899.50, 120),
('Coffee Maker', 'Home Appliances', 49.99, 200),
('Desk Chair', 'Furniture', 150.00, 75),
('Python Programming Book', 'Books', 39.95, 300)
]
cursor.executemany('''
INSERT INTO products (name, category, price, stock)
VALUES (?, ?, ?, ?)
''', sample_products)
# 提交事务
conn.commit()
# 关闭连接
conn.close()
print(f"Database '{DB_FILE}' created and populated successfully.")
if __name__ == "__main__":
setup()
现在,运行这个脚本来创建我们的数据库:
cd db_server
python3 setup_database.py
# 你会看到输出:Database 'inventory.db' created and populated successfully.
# 同时,一个 inventory.db 文件会出现在 db_server 目录下
cd ..
6.3 构建数据库MCP Server
现在我们来编写db_server/main.py。这个Server的结构与我们之前构建的Server类似,但其核心逻辑是与数据库交互。
6.3.1 db_server/main.py 代码实现
# db_server/main.py
import asyncio
import json
import logging
import sqlite3
from typing import Dict, Any, List, Optional
DB_FILE = "inventory.db"
class DatabaseServer:
def __init__(self, db_path: str):
self.db_path = db_path
logging.basicConfig(level=logging.INFO, format='%(asctime)s - DB_SERVER - %(levelname)s - %(message)s')
def _get_db_connection(self):
# SQLite连接不是线程安全的,但对于我们这个异步单线程模型是OK的
# 在多线程或更复杂的应用中,需要使用连接池
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # 让查询结果可以像字典一样访问列
return conn
async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
try:
if method == "project/listTools":
result = self._list_tools()
elif method == "project/executeTool":
tool_name = params.get("name")
tool_params = params.get("parameters", {})
if tool_name == "db/queryProducts":
result = self._query_products(tool_params.get("category"))
elif tool_name == "db/addProduct":
result = self._add_product(tool_params)
else:
raise ValueError(f"Tool '{tool_name}' not found.")
else:
raise NotImplementedError(f"Method '{method}' not supported.")
return {"jsonrpc": "2.0", "id": request_id, "result": result}
except Exception as e:
logging.error(f"Error handling request: {e}", exc_info=True)
return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": str(e)}}
def _list_tools(self) -> List[Dict[str, Any]]:
return [
{
"name": "db/queryProducts",
"description": "Queries products from the inventory database, optionally filtering by category.",
"parameters": [
{"name": "category", "type": "string", "required": False, "description": "The category of products to query."}
]
},
{
"name": "db/addProduct",
"description": "Adds a new product to the inventory database.",
"parameters": [
{"name": "name", "type": "string", "required": True},
{"name": "category", "type": "string", "required": True},
{"name": "price", "type": "number", "required": True},
{"name": "stock", "type": "integer", "required": True}
]
}
]
def _query_products(self, category: Optional[str] = None) -> Dict[str, Any]:
conn = self._get_db_connection()
cursor = conn.cursor()
sql = "SELECT id, name, category, price, stock FROM products"
params = []
if category:
# 安全措施:使用参数化查询防止SQL注入
sql += " WHERE category = ?"
params.append(category)
cursor.execute(sql, params)
rows = cursor.fetchall()
conn.close()
products = [dict(row) for row in rows]
return {
"result": {"products": products},
"stdout": f"Found {len(products)} products.",
"stderr": None
}
def _add_product(self, params: Dict[str, Any]) -> Dict[str, Any]:
required_keys = ["name", "category", "price", "stock"]
if not all(key in params for key in required_keys):
raise ValueError("Missing required parameters for addProduct")
conn = self._get_db_connection()
cursor = conn.cursor()
# 安全措施:使用参数化查询
sql = "INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?)"
sql_params = (params['name'], params['category'], params['price'], params['stock'])
cursor.execute(sql, sql_params)
new_id = cursor.lastrowid
conn.commit()
conn.close()
return {
"result": {"productId": new_id},
"stdout": f"Successfully added product '{params['name']}' with ID {new_id}.",
"stderr": None
}
async def main():
# main函数与之前的Server完全相同
server = DatabaseServer(DB_FILE)
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_running_loop().connect_read_pipe(lambda: protocol, asyncio.get_event_loop()._default_reader)
writer = asyncio.StreamWriter(protocol, asyncio.get_event_loop(), None, None)
logging.info("Database Server started. Waiting for requests...")
while True:
line = await reader.readline()
if not line:
break
request = json.loads(line.decode('utf-8'))
response = await server.handle_request(request)
response_line = json.dumps(response) + '\n'
sys.stdout.write(response_line)
await sys.stdout.flush()
if __name__ == "__main__":
import sys
sys.stderr = sys.stdout
asyncio.run(main())
6.3.2 代码与安全解析
-
_get_db_connection: 这个辅助方法负责建立到SQLite数据库的连接。conn.row_factory = sqlite3.Row是一个非常有用的设置,它让查询结果的每一行都像一个字典,可以通过列名来访问数据,这使得后续转换为JSON格式非常方便。 -
_list_tools: 我们声明了两个新的工具:db/queryProducts和db/addProduct。注意它们的命名空间db/,这有助于在Host端区分其来源和功能。参数定义清晰地说明了每个工具需要哪些输入。 -
_query_products和_add_product: 这是Server的核心业务逻辑。最重要的一点是安全性。- 防止SQL注入:我们绝对没有使用字符串拼接来构建SQL查询(例如
f"... WHERE category = '{category}'")。而是使用了参数化查询(... WHERE category = ?和cursor.execute(sql, params))。sqlite3库会自动处理这些参数,对特殊字符进行转义,从而从根本上杜绝了SQL注入的风险。这是任何时候与数据库交互都必须遵守的黄金法则。 - 输入验证:在
_add_product中,我们首先检查了所有必需的参数是否存在,如果不存在则直接抛出异常,避免了后续的数据库操作错误。
- 防止SQL注入:我们绝对没有使用字符串拼接来构建SQL查询(例如
-
主循环:
main函数和主循环部分与我们之前的Server实现完全一致,再次证明了MCP模式的复用性。我们只需要专注于实现核心的业务逻辑(即工具的实现),而无需关心底层的通信细节。
6.4 升级AI研究助理以使用数据库
现在,我们的数据库Server已经准备就绪。下一步是让我们的AI研究助理(Host)能够连接并使用它。
6.4.1 更新Host主程序 (assistant_host/main.py)
我们需要对assistant_host/main.py做一些修改,主要是:
- 在
McpMultiplexer中添加一个新的StdioMcpConnection来启动和管理db_server。 - 在
AiResearchAssistant的answer方法中,增加对数据库相关问题的理解和路由逻辑。
以下是需要修改和添加的部分:
# assistant_host/main.py (部分修改)
# ... (StdioMcpConnection 和 McpMultiplexer 类保持不变) ...
class AiResearchAssistant:
# ... (__init__ 保持不变) ...
async def answer(self, question: str) -> str:
logging.info(f"Received question: '{question}'")
context_parts = []
tasks = []
# --- 原有的路由逻辑 ---
if "transformer" in question.lower():
tasks.append(self.mux.request("wiki", "project/executeTool",
{"name": "project/getSummary", "parameters": {"topic": "transformer model"}}))
# ... (其他原有逻辑保持不变) ...
# --- 新增:数据库查询路由逻辑 ---
if "find products" in question.lower() or "query products" in question.lower():
# 简单的关键词提取,真实应用中会使用更复杂的NLU
category = None
if "electronics" in question.lower():
category = "Electronics"
elif "books" in question.lower():
category = "Books"
params = {"name": "db/queryProducts", "parameters": {}}
if category:
params["parameters"]["category"] = category
tasks.append(self.mux.request("db", "project/executeTool", params))
# ... (任务执行和上下文构建逻辑需要微调以处理新格式) ...
if not tasks:
return "I'm not sure how to answer that. Please ask about products, transformers, etc."
logging.info(f"Gathering context from {len(tasks)} sources...")
responses = await asyncio.gather(*tasks, return_exceptions=True)
# 更新上下文构建逻辑
for i, res in enumerate(responses):
if isinstance(res, Exception):
context_parts.append(f"Error from source {i}: {res}")
continue
if 'result' in res:
tool_result = res['result'].get('result', {})
if 'summary' in tool_result:
context_parts.append(f"[Wikipedia Summary]:\n{tool_result['summary']}")
elif 'content' in res['result']:
content = base64.b64decode(res['result']['content']).decode('utf-8')
context_parts.append(f"[Local File Content]:\n{content}")
elif 'products' in tool_result:
# 新增:格式化数据库查询结果
products_str = json.dumps(tool_result['products'], indent=2)
context_parts.append(f"[Database Query Result]:\n{products_str}")
# ... (构建prompt和调用OpenAI的部分保持不变) ...
context_str = "\n\n---\n\n".join(context_parts)
prompt = f"Based on the following context from multiple sources, please provide a comprehensive answer to the user's question.\n\nContext:\n{context_str}\n\nQuestion: {question}\n\nAnswer:"
logging.info("Sending request to OpenAI...")
response = await openai.ChatCompletion.acreate(
model="gpt-4",
messages=[{"role": "system", "content": "You are a helpful research assistant."}, {"role": "user", "content": prompt}]
)
return response.choices[0].message.content
async def main():
# ... (api_key检查保持不变) ...
mux = McpMultiplexer()
# --- 更新Server连接配置 ---
fs_conn = StdioMcpConnection("fs", "python3 main.py", "../fs_server")
wiki_conn = StdioMcpConnection("wiki", "python3 main.py", "../wiki_server")
# 新增:数据库Server连接
db_conn = StdioMcpConnection("db", "python3 main.py", "../db_server")
mux.add_connection(fs_conn)
mux.add_connection(wiki_conn)
mux.add_connection(db_conn) # 添加新的连接
assistant = AiResearchAssistant(mux, api_key)
try:
await mux.start_all()
await asyncio.sleep(3) # 增加一点等待时间,因为有3个Server了
logging.info("All servers started. AI Research Assistant is ready.")
# ... (交互循环保持不变) ...
finally:
logging.info("Shutting down all servers...")
await mux.stop_all()
if __name__ == "__main__":
asyncio.run(main())
6.4.2 运行与测试
现在,一切准备就绪。启动我们的AI研究助理,它现在应该会同时启动3个Server子进程。
-
启动Host:
cd assistant_host python3 main.py -
观察日志:你会看到
fs,wiki, 和db三个Server都已成功启动的日志。 -
提出数据库相关问题:
查询问题:
Ask your research question (or type 'exit'): Can you find all products in the Electronics category?执行流程:
- Host的
answer方法检测到关键词find products和electronics。 - 它向名为
db的Server发送一个project/executeTool请求,工具名为db/queryProducts,参数为{"category": "Electronics"}。 db_server接收请求,执行参数化的SQL查询SELECT ... FROM products WHERE category = ?。- 查询结果(一个包含Laptop和Smartphone的列表)被格式化为JSON并返回给Host。
- Host将这个JSON结果作为上下文,连同用户的问题一起发送给OpenAI。
- OpenAI生成一个人类可读的答案。
预期回答:
🤖 Assistant: Certainly! I found the following products in the Electronics category: * **Laptop Pro**: Priced at $1299.99 with 50 units in stock. * **Smartphone X**: Priced at $899.50 with 120 units in stock. - Host的
6.5 总结:MCP作为能力的“粘合剂”
本章我们迈出了关键的一步,成功地将结构化的数据库资源通过MCP协议暴露给了AI。这个过程清晰地展示了MCP作为一种“能力粘合剂”的强大作用。
- 封装复杂性:数据库Server将SQL查询、数据库连接、事务管理和安全防护等复杂性完全封装起来,只对外提供简单明了的
db/queryProducts工具。AI(Host)无需成为数据库专家,就能使用数据库的能力。 - 标准化交互:Host与数据库Server的交互方式,和它与文件系统Server、维基百科Server的交互方式完全一样,都是通过标准的JSON-RPC消息。这使得Host端的代码可以轻松扩展,以支持任意数量和类型的后端服务。
- 安全性:通过在Server端强制使用参数化查询等最佳实践,我们确保了即使AI(或恶意用户)尝试构造有害的输入,也不会对后端的数据库造成威胁。
现在,你的AI应用不仅能读书看报(文件系统),还能查阅百科(Web API),更能清点库存、查询订单(数据库)。在下一章,我们将探索另一个高级主题:如何构建一个RAG(检索增强生成)系统,让AI能够基于海量文档进行更智能的问答。
更多推荐


所有评论(0)