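Chapter 8 Tool-Driven Design and the MCP Protocol: Weaving Heterogeneous Environments into a Uniform Interface
From 0 to 1: Building an Enterprise-Grade Harness Platform
This chapter covers the tool-layer architecture of the Harness platform: how the MCP (Model Context Protocol) turns heterogeneous external systems into a uniform tool interface that an Agent can call. Tools are the bridge between an Agent and the real world, and MCP is the standardized protocol proposed by Anthropic to connect LLMs with external data sources and compute resources. We build an enterprise-grade tool management subsystem along four dimensions: protocol principles, tool registration, runtime orchestration, and security isolation.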
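8.1 The Tool-Driven Paradigm: From "Calling APIs" to "Weaving Capabilities"
8.1.1 The Role of Tools in the Agent Architecture
In the CAR framework, Tools are a core component of the Runtime layer. Their influence reaches well beyond that layer, however: they determine what the Agent can do (the decision space of the Agency layer), what it is allowed to do (the policy boundary of the Control layer), and how it does it (the execution capability of the Runtime layer).
How tools cut across the CAR framework: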
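┌──────────────────────────────────────────────────────────────┐
│ Control layer                                                │
│ "Which tools may the Agent use, and how often?"              │
│                    ↓ policy constraints                      │
├──────────────────────────────────────────────────────────────┤
│ Agency layer                                                 │
│ "Which tools can the Agent pick and combine when deciding?"  │
│                    ↓ decision space                          │
├──────────────────────────────────────────────────────────────┤
│ Runtime layer                                                │
│ "Tool registration, discovery, invocation, result handling"  │
│                    ↓ execution capability                    │
├──────────────────────────────────────────────────────────────┤
│ External world                                               │
│ Databases, file systems, APIs, browsers, IoT devices, ...    │
└──────────────────────────────────────────────────────────────┘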
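8.1.2 Tool Weaving vs. API Calls
A traditional API call is hard-coded: "I know where you are, so I call you." Tool Weaving is a higher-level paradigm:
| Dimension | API call | Tool weaving |
|---|---|---|
| Discovery | Hard-coded endpoints | Dynamic registration and discovery |
| Interface | Different for every API | One protocol (MCP) |
| Composition | Manual orchestration | Automatic composition and chained calls |
| Fault tolerance | Handled by the caller | Framework-level retry and degradation |
| Security | Managed piecemeal | Centralized policy control |
| Lifecycle | Unmanaged | Full registration, deregistration, and version management |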
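The contrast in the table can be made concrete with a small sketch. It is not the platform's implementation: `ToolRegistry`, `ToolHandler`, and the retry count are hypothetical names and values chosen for illustration, and handlers are synchronous only to keep the example short. It shows three rows of the table in miniature: tools are discovered by name at run time, called through one interface, and retried by the framework rather than by the caller.

```typescript
// Hypothetical names throughout; a sketch of "weaving", not a real MCP client.
type ToolHandler = (args: Record<string, unknown>) => string;

class ToolRegistry {
  private handlers = new Map<string, ToolHandler>();

  // Lifecycle: tools are registered and unregistered at run time.
  register(name: string, handler: ToolHandler): void {
    this.handlers.set(name, handler);
  }

  unregister(name: string): boolean {
    return this.handlers.delete(name);
  }

  // Discovery: callers ask what exists instead of hard-coding endpoints.
  list(): string[] {
    return Array.from(this.handlers.keys()).sort();
  }

  // Fault tolerance: the retry loop lives in the framework, not in each caller.
  call(name: string, args: Record<string, unknown>, maxAttempts = 3): string {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`Tool ${name} not found`);
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return handler(args);
      } catch (err) {
        lastError = err;
      }
    }
    throw new Error(`Tool ${name} failed after ${maxAttempts} attempts: ${String(lastError)}`);
  }
}
```

A caller only needs `registry.list()` and `registry.call(name, args)`; adding or removing a backend does not change the calling code.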
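8.1.3 MCP Protocol Overview
MCP (Model Context Protocol) is an open protocol proposed by Anthropic in 2024 to standardize how LLMs interact with external systems. Its core design philosophy is:
Let the LLM, like an operating system, access any external resource through standardized interfaces.
MCP protocol architecture: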
┌────────────────────────────────────────────────────┐
│ LLM (Claude, GPT, etc.)                            │
│        ↓ tool calls                                │
├────────────────────────────────────────────────────┤
│ MCP Client (Agent Runtime)                         │
│        ↓ MCP protocol (JSON-RPC 2.0)               │
├────────────────────────────────────────────────────┤
│ MCP Servers                                        │
│ ┌────────┐ ┌──────────┐ ┌────────┐ ┌─────────┐     │
│ │ File   │ │ Database │ │ Search │ │ Custom  │     │
│ │ system │ │          │ │ engine │ │ service │     │
│ └────────┘ └──────────┘ └────────┘ └─────────┘     │
└────────────────────────────────────────────────────┘
The three core primitives of MCP:
1. Tools: functions the Agent can call
2. Resources: data the Agent can read
3. Prompts: predefined interaction templates
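Each primitive is exposed through a pair of JSON-RPC methods: one to list what a server offers and one to use an item. The sketch below maps the three primitives to the method names defined by the MCP specification; the helper `buildRequest` and the type names are hypothetical, introduced only for this illustration.

```typescript
type Primitive = 'tools' | 'resources' | 'prompts';
type Action = 'list' | 'use';

// Method names as defined by the MCP specification.
const PRIMITIVE_METHODS: Record<Primitive, Record<Action, string>> = {
  tools: { list: 'tools/list', use: 'tools/call' },
  resources: { list: 'resources/list', use: 'resources/read' },
  prompts: { list: 'prompts/list', use: 'prompts/get' },
};

interface PrimitiveRequest {
  jsonrpc: '2.0';
  id: number;
  method: string;
  params: Record<string, unknown>;
}

// Hypothetical helper: builds the JSON-RPC request for a primitive/action pair.
function buildRequest(
  id: number,
  primitive: Primitive,
  action: Action,
  params: Record<string, unknown> = {}
): PrimitiveRequest {
  return { jsonrpc: '2.0', id, method: PRIMITIVE_METHODS[primitive][action], params };
}

// Example: read a resource by URI (the URI is made up for the example).
const readRequest = buildRequest(1, 'resources', 'use', { uri: 'file:///README.md' });
```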
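8.2 MCP Protocol in Depth
8.2.1 Protocol Stack
MCP is built on JSON-RPC 2.0 and is independent of the transport. The 2024-11-05 specification defines two standard transports, stdio and HTTP with SSE; the WebSocket option shown here is a custom transport, which the protocol permits but does not standardize: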
┌──────────────────────────────────────────────┐
│ Application layer                            │
│   Tools / Resources / Prompts                │
├──────────────────────────────────────────────┤
│ Protocol layer                               │
│   JSON-RPC 2.0                               │
│   - Request / Response / Notification        │
│   - Error codes & handling                   │
├──────────────────────────────────────────────┤
│ Transport layer (pluggable)                  │
│ ┌─────────────────┐ ┌────────┐ ┌───────────┐ │
│ │ stdio           │ │ SSE    │ │ WebSocket │ │
│ │ (local process) │ │ (HTTP) │ │ (two-way) │ │
│ └─────────────────┘ └────────┘ └───────────┘ │
└──────────────────────────────────────────────┘
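On the stdio transport, each JSON-RPC message travels as one line of JSON terminated by a newline, so the receiver has to reassemble lines from arbitrary chunks. The following is a minimal sketch of that framing; `LineDecoder` and `encodeMessage` are hypothetical names, and the choice to silently skip non-JSON lines is an assumption made for the example.

```typescript
// Encodes one message as a single newline-terminated line.
// JSON.stringify never emits raw newlines, so the framing stays intact.
function encodeMessage(msg: unknown): string {
  return JSON.stringify(msg) + '\n';
}

// Reassembles complete lines from arbitrary chunks and parses each as JSON.
class LineDecoder {
  private buffer = '';

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line (or '' if the chunk ended on '\n').
    this.buffer = lines.pop() ?? '';
    const messages: unknown[] = [];
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      try {
        messages.push(JSON.parse(trimmed));
      } catch {
        // Not JSON (for example a stray log line): skipped in this sketch.
      }
    }
    return messages;
  }
}
```

A chunk boundary can fall anywhere, including in the middle of a message, which is why the decoder keeps the trailing partial line in its buffer until the next chunk arrives.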
8.2.2 MCP Message Format
/**
 * MCP protocol message definitions
 * Based on the JSON-RPC 2.0 specification
 */
// ===== Base types =====
interface JsonRpcRequest {
  jsonrpc: '2.0';
  id: string | number;
  method: string;
  params?: Record<string, any>;
}
interface JsonRpcResponse {
  jsonrpc: '2.0';
  id: string | number;
  result?: any;
  error?: JsonRpcError;
}
interface JsonRpcNotification {
  jsonrpc: '2.0';
  method: string;
  params?: Record<string, any>;
}
interface JsonRpcError {
  code: number;
  message: string;
  data?: any;
}
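// ===== MCP-specific types =====
// Loose alias so the listing type-checks; a stricter JSON Schema type can be substituted
type JSONSchema = Record<string, unknown>;
/**
 * Tool definition
 * The tool interface an MCP Server declares to the Client
 */
interface MCPToolDefinition {
  name: string;
  description: string;
  inputSchema: JSONSchema; // parameter definition in JSON Schema format
  // Harness extension fields (not part of standard MCP; specific to our platform)
  harness?: {
    category: string; // tool category
    riskLevel: 'low' | 'medium' | 'high';
    rateLimit?: {
      maxCalls: number;
      windowMs: number;
    };
    timeout?: number; // call timeout (milliseconds)
    idempotent?: boolean; // whether the call is idempotent
    requiresApproval?: boolean; // whether human approval is required
    costPerCall?: number; // estimated cost per call
  };
}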
/**
 * Tool call request
 */
interface MCPToolCallRequest {
  jsonrpc: '2.0';
  id: string;
  method: 'tools/call';
  params: {
    name: string;
    arguments: Record<string, any>;
  };
}
/**
 * Tool call response
 */
interface MCPToolCallResponse {
  jsonrpc: '2.0';
  id: string;
  result: {
    content: MCPContent[];
    isError?: boolean;
  };
}
/**
 * MCP content types
 * Supports multimodal results
 */
type MCPContent =
  | { type: 'text'; text: string }
  | { type: 'image'; data: string; mimeType: string }
  | { type: 'resource'; resource: MCPResource };
/**
 * Resource definition
 */
interface MCPResource {
  uri: string;
  name: string;
  description?: string;
  mimeType?: string;
}
/**
 * Resource template
 * Supports parameterized resource URIs
 */
interface MCPResourceTemplate {
  uriTemplate: string; // for example "file:///{path}"
  name: string;
  description?: string;
  mimeType?: string;
}
// ===== Protocol lifecycle messages =====
/**
 * Initialize request
 * Client → Server: sent when the connection is established
 */
interface MCPInitializeRequest {
  jsonrpc: '2.0';
  id: string;
  method: 'initialize';
  params: {
    protocolVersion: string;
    capabilities: {
      roots?: { listChanged?: boolean };
      sampling?: {};
    };
    clientInfo: {
      name: string;
      version: string;
    };
  };
}
/**
 * Initialize response
 * Server → Client: returns the server's capabilities
 */
interface MCPInitializeResponse {
  jsonrpc: '2.0';
  id: string;
  result: {
    protocolVersion: string;
    capabilities: {
      tools?: { listChanged?: boolean };
      resources?: { subscribe?: boolean; listChanged?: boolean };
      prompts?: { listChanged?: boolean };
      logging?: {};
    };
    serverInfo: {
      name: string;
      version: string;
    };
  };
}
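Because requests, responses, and notifications share one wire format, a receiver tells them apart by which fields are present: JSON-RPC 2.0 requests carry both `id` and `method`, responses carry `id` with `result` or `error`, and notifications carry `method` without `id`. The sketch below applies that rule; `classify`, `IncomingMessage`, and the sample messages are hypothetical and exist only to illustrate it.

```typescript
interface IncomingMessage {
  jsonrpc: '2.0';
  id?: string | number | null;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: { code: number; message: string };
}

type MessageKind = 'request' | 'response' | 'notification' | 'invalid';

// Classifies a message purely by field presence, per JSON-RPC 2.0.
function classify(msg: IncomingMessage): MessageKind {
  const hasId = msg.id !== undefined && msg.id !== null;
  const hasMethod = typeof msg.method === 'string';
  if (hasId && hasMethod) return 'request';
  if (hasId) return 'result' in msg || 'error' in msg ? 'response' : 'invalid';
  return hasMethod ? 'notification' : 'invalid';
}

// Sample messages (tool name and arguments are made up for the example).
const sampleCall: IncomingMessage = {
  jsonrpc: '2.0',
  id: 'req_1',
  method: 'tools/call',
  params: { name: 'search', arguments: { query: 'mcp' } },
};
const sampleResult: IncomingMessage = {
  jsonrpc: '2.0',
  id: 'req_1',
  result: { content: [{ type: 'text', text: '3 hits' }], isError: false },
};
const sampleProgress: IncomingMessage = {
  jsonrpc: '2.0',
  method: 'notifications/progress',
  params: { progress: 1, total: 3 },
};
```

Note that an `id` of `0` is valid in JSON-RPC, so the check tests for presence rather than truthiness.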
8.2.3 MCP Protocol Lifecycle
MCP connection lifecycle:
1. Initialize
   Client ──→ Server: initialize (protocol version, capabilities)
   Server ──→ Client: initialize response (server capabilities)
   Client ──→ Server: notifications/initialized
2. Discovery
   Client ──→ Server: tools/list
   Server ──→ Client: available tools
   Client ──→ Server: resources/list
   Server ──→ Client: available resources
   Client ──→ Server: prompts/list
   Server ──→ Client: available prompts
3. Operation
   Client ──→ Server: tools/call (tool name, arguments)
   Server ──→ Client: tool result (content, isError)
   Server ──→ Client: notifications (progress, logging)
4. Shutdown
   Client ──→ Server: closes the transport (MCP defines no dedicated shutdown message)
   Both sides release their resources
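The ordering above can be enforced on the client side with a small state machine. This is a sketch under simplifying assumptions, not part of any MCP SDK: `LifecycleGuard` and its phase names are hypothetical, and it ignores the detail that the specification allows pings before initialization completes.

```typescript
type Phase =
  | 'created'
  | 'awaiting-init-result'
  | 'init-result-received'
  | 'ready'
  | 'closed';

class LifecycleGuard {
  private phase: Phase = 'created';

  get current(): Phase {
    return this.phase;
  }

  /** Call before sending a client-to-server message; throws if it is out of order. */
  beforeSend(method: string): void {
    switch (this.phase) {
      case 'created':
        this.expect(method, 'initialize');
        this.phase = 'awaiting-init-result';
        return;
      case 'awaiting-init-result':
        throw new Error(`Cannot send ${method} before the initialize response arrives`);
      case 'init-result-received':
        this.expect(method, 'notifications/initialized');
        this.phase = 'ready';
        return;
      case 'ready':
        return; // discovery and operation messages are allowed
      case 'closed':
        throw new Error(`Cannot send ${method} on a closed connection`);
    }
  }

  /** Call when the server's initialize response has been received. */
  onInitializeResult(): void {
    if (this.phase !== 'awaiting-init-result') {
      throw new Error(`Unexpected initialize response in phase ${this.phase}`);
    }
    this.phase = 'init-result-received';
  }

  close(): void {
    this.phase = 'closed';
  }

  private expect(actual: string, expected: string): void {
    if (actual !== expected) {
      throw new Error(`Expected ${expected} but got ${actual} in phase ${this.phase}`);
    }
  }
}
```

Wrapping a transport's send path with `beforeSend` turns protocol-order mistakes, such as calling `tools/list` before the handshake, into immediate local errors instead of confusing server responses.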
8.3 MCP Server Manager: Registration, Discovery, and Lifecycle
8.3.1 TypeScript Implementation: MCP Server Manager
/**
 * MCP Server manager
 * Manages registration, discovery, connection, and lifecycle for all MCP Servers
 */
import { EventEmitter } from 'events';
// ===== MCP Server configuration =====
interface MCPServerConfig {
  id: string;
  name: string;
  description?: string;
  // Connection settings
  transport: 'stdio' | 'sse' | 'websocket';
  command?: string; // stdio: launch command
  args?: string[]; // stdio: command arguments
  url?: string; // sse/websocket: service URL
  // Harness extensions
  category: string;
  priority: number; // priority when several Servers expose a tool with the same name
  enabled: boolean;
  // Environment settings
  env?: Record<string, string>;
  // Security settings
  sandbox?: {
    networkAccess: boolean;
    fileSystemAccess: 'none' | 'read' | 'readwrite';
    allowedPaths?: string[];
  };
}
// ===== MCP Server connection state =====
interface MCPServerConnection {
  config: MCPServerConfig;
  status: 'disconnected' | 'connecting' | 'connected' | 'error';
  tools: MCPToolDefinition[];
  resources: MCPResource[];
  prompts: MCPPrompt[];
  lastError?: string;
  metrics: ServerMetrics;
  process?: any; // child process in stdio mode
  transport?: MCPTransport; // transport instance
}
interface ServerMetrics {
  totalCalls: number;
  successRate: number;
  avgLatencyMs: number;
  lastCallTime?: Date;
  errorCount: number;
}
interface MCPPrompt {
  name: string;
  description?: string;
  arguments?: { name: string; description?: string; required?: boolean }[];
}
// ===== Transport abstraction =====
interface MCPTransport {
  send(message: JsonRpcRequest | JsonRpcNotification): Promise<void>;
  onMessage(handler: (msg: JsonRpcResponse | JsonRpcNotification) => void): void;
  close(): Promise<void>;
  isConnected(): boolean;
}
// ===== MCP Server manager =====
class MCPServerManager extends EventEmitter {
  private servers: Map<string, MCPServerConnection> = new Map();
  private toolRegistry: Map<string, ToolRegistration[]> = new Map();
  private pendingRequests: Map<string, {
    resolve: (value: any) => void;
    reject: (error: any) => void;
    timeout: NodeJS.Timeout;
  }> = new Map();
  /**
   * Register an MCP Server
   */
  async registerServer(config: MCPServerConfig): Promise<void> {
    if (this.servers.has(config.id)) {
      throw new Error(`Server ${config.id} already registered`);
    }
    const connection: MCPServerConnection = {
      config,
      status: 'disconnected',
      tools: [],
      resources: [],
      prompts: [],
      metrics: {
        totalCalls: 0,
        successRate: 1.0,
        avgLatencyMs: 0,
        errorCount: 0,
      },
    };
    this.servers.set(config.id, connection);
    if (config.enabled) {
      await this.connectServer(config.id);
    }
    this.emit('serverRegistered', config.id);
  }
  /**
   * Connect to an MCP Server
   */
  async connectServer(serverId: string): Promise<void> {
    const connection = this.servers.get(serverId);
    if (!connection) throw new Error(`Server ${serverId} not found`);
    connection.status = 'connecting';
    this.emit('serverConnecting', serverId);
    try {
      // 1. Establish the transport connection
      const transport = await this.createTransport(connection.config);
      connection.transport = transport;
      // 2. Install the message handler
      transport.onMessage((msg) => this.handleMessage(serverId, msg));
      // 3. Send the initialize request
      //    (the result carries the server's capabilities; this listing does not inspect it)
      await this.sendRequest(serverId, 'initialize', {
        protocolVersion: '2024-11-05',
        capabilities: {
          roots: { listChanged: true },
          sampling: {},
        },
        clientInfo: {
          name: 'harness-platform',
          version: '1.0.0',
        },
      });
      // 4. Send the initialized notification
      await this.sendNotification(serverId, 'notifications/initialized', {});
      // 5. Discover tools, resources, and prompts
      await this.discoverCapabilities(serverId);
      connection.status = 'connected';
      this.emit('serverConnected', serverId);
    } catch (error: any) {
      connection.status = 'error';
      connection.lastError = error.message;
      this.emit('serverError', serverId, error);
      throw error;
    }
  }
  /**
   * Discover the Server's capabilities
   */
  private async discoverCapabilities(serverId: string): Promise<void> {
    const connection = this.servers.get(serverId)!;
    // Discover tools
    try {
      const toolsResult = await this.sendRequest(
        serverId, 'tools/list', {}
      );
      connection.tools = toolsResult.tools ?? [];
      // Add them to the global tool registry
      for (const tool of connection.tools) {
        this.registerTool(tool, serverId);
      }
    } catch (error) {
      // The Server may not support tools
      console.warn(`Server ${serverId} does not support tools`);
    }
    // Discover resources
    try {
      const resourcesResult = await this.sendRequest(
        serverId, 'resources/list', {}
      );
      connection.resources = resourcesResult.resources ?? [];
    } catch {
      // Ignored: the Server may not support resources
    }
    // Discover prompt templates
    try {
      const promptsResult = await this.sendRequest(
        serverId, 'prompts/list', {}
      );
      connection.prompts = promptsResult.prompts ?? [];
    } catch {
      // Ignored: the Server may not support prompts
    }
  }
  /**
   * Register a tool in the global registry
   */
  private registerTool(
    tool: MCPToolDefinition,
    serverId: string
  ): void {
    const existing = this.toolRegistry.get(tool.name) ?? [];
    existing.push({
      tool,
      serverId,
      priority: this.servers.get(serverId)!.config.priority,
    });
    // Sort by priority, highest first
    existing.sort((a, b) => b.priority - a.priority);
    this.toolRegistry.set(tool.name, existing);
  }
  /**
   * Call a tool
   * Routes automatically to the best available Server
   */
  async callTool(
    name: string,
    args: Record<string, any>,
    options?: ToolCallOptions
  ): Promise<ToolCallResult> {
    const registrations = this.toolRegistry.get(name);
    if (!registrations || registrations.length === 0) {
      throw new Error(`Tool ${name} not found`);
    }
    // Select the best Server
    const registration = this.selectServer(registrations, options);
    const connection = this.servers.get(registration.serverId)!;
    const startTime = Date.now();
    try {
      const result = await this.sendRequest(
        registration.serverId,
        'tools/call',
        { name, arguments: args },
        options?.timeout ?? registration.tool.harness?.timeout ?? 30000
      );
      const latency = Date.now() - startTime;
      this.updateMetrics(connection, true, latency);
      return {
        content: result.content,
        isError: result.isError ?? false,
        serverId: registration.serverId,
        latencyMs: latency,
      };
    } catch (error: any) {
      const latency = Date.now() - startTime;
      this.updateMetrics(connection, false, latency);
      // Try failing over to another Server, once
      if (registrations.length > 1 && options?.failover !== false) {
        const alternate = registrations.find(
          r => r.serverId !== registration.serverId
        );
        if (alternate) {
          return this.callTool(name, args, {
            ...options,
            preferredServerId: alternate.serverId,
            // Without this, two failing Servers would retry each other indefinitely
            failover: false,
          });
        }
      }
      throw new ToolCallError(
        `Tool ${name} call failed: ${error.message}`,
        { serverId: registration.serverId, latencyMs: latency }
      );
    }
  }
  /**
   * Get all available tools
   */
  getAvailableTools(): MCPToolDefinition[] {
    const tools: MCPToolDefinition[] = [];
    const seen = new Set<string>();
    for (const [name, registrations] of this.toolRegistry) {
      // Return the highest-priority registration
      const best = registrations[0];
      if (best && !seen.has(name)) {
        tools.push(best.tool);
        seen.add(name);
      }
    }
    return tools;
  }
  /**
   * Get tools grouped by category
   */
  getToolsByCategory(): Map<string, MCPToolDefinition[]> {
    const categories = new Map<string, MCPToolDefinition[]>();
    for (const tool of this.getAvailableTools()) {
      const category = tool.harness?.category ?? 'uncategorized';
      const existing = categories.get(category) ?? [];
      existing.push(tool);
      categories.set(category, existing);
    }
    return categories;
  }
  /**
   * Read a resource
   */
  async readResource(
    uri: string,
    serverId?: string
  ): Promise<MCPContent[]> {
    const targetServer = serverId ??
      this.findServerForResource(uri);
    if (!targetServer) {
      throw new Error(`No server found for resource: ${uri}`);
    }
    const result = await this.sendRequest(
      targetServer, 'resources/read', { uri }
    );
    // Note: in MCP, each `contents` entry carries a uri plus either text or blob
    return result.contents ?? [];
  }
  /**
   * Gracefully shut down all Servers
   */
  async shutdownAll(): Promise<void> {
    const promises: Promise<void>[] = [];
    for (const [serverId] of this.servers) {
      promises.push(this.disconnectServer(serverId));
    }
    await Promise.allSettled(promises);
    this.emit('allServersShutdown');
  }
  /**
   * Disconnect a single Server
   */
  async disconnectServer(serverId: string): Promise<void> {
    const connection = this.servers.get(serverId);
    if (!connection) return;
    // Remove its tool registrations
    for (const tool of connection.tools) {
      const registrations = this.toolRegistry.get(tool.name) ?? [];
      this.toolRegistry.set(
        tool.name,
        registrations.filter(r => r.serverId !== serverId)
      );
    }
    // Close the transport
    if (connection.transport) {
      await connection.transport.close();
    }
    // Terminate the process (stdio mode)
    if (connection.process) {
      connection.process.kill('SIGTERM');
    }
    connection.status = 'disconnected';
    this.emit('serverDisconnected', serverId);
  }
  // ===== Internal methods =====
  private async createTransport(
    config: MCPServerConfig
  ): Promise<MCPTransport> {
    switch (config.transport) {
      case 'stdio':
        return this.createStdioTransport(config);
      case 'sse':
        return this.createSSETransport(config);
      case 'websocket':
        return this.createWebSocketTransport(config);
      default:
        throw new Error(`Unsupported transport: ${config.transport}`);
    }
  }
  private async createStdioTransport(
    config: MCPServerConfig
  ): Promise<MCPTransport> {
    const { spawn } = await import('child_process');
    const proc = spawn(config.command!, config.args ?? [], {
      env: { ...process.env, ...config.env },
      stdio: ['pipe', 'pipe', 'pipe'],
    });
    // Keep a reference to the process
    const connection = this.servers.get(config.id)!;
    connection.process = proc;
    let messageHandler: ((msg: any) => void) | null = null;
    let buffer = '';
    // Messages are newline-delimited JSON; reassemble lines from stdout chunks
    proc.stdout.on('data', (data: Buffer) => {
      buffer += data.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';
      for (const line of lines) {
        if (line.trim()) {
          try {
            const msg = JSON.parse(line);
            messageHandler?.(msg);
          } catch {
            // Not a JSON line; ignore it
          }
        }
      }
    });
    proc.stderr.on('data', (data: Buffer) => {
      console.error(`[${config.name}] stderr:`, data.toString());
    });
    return {
      send: async (msg) => {
        const json = JSON.stringify(msg) + '\n';
        proc.stdin.write(json);
      },
      onMessage: (handler) => {
        messageHandler = handler;
      },
      close: async () => {
        proc.kill('SIGTERM');
      },
      // `killed` only records that a signal was sent, so also check that the process has not exited
      isConnected: () => !proc.killed && proc.exitCode === null,
    };
  }
  private async createSSETransport(
    config: MCPServerConfig
  ): Promise<MCPTransport> {
    // SSE transport implementation
    // Assumes a global EventSource (browsers; in Node.js a polyfill may be required)
    const eventSource = new EventSource(config.url! + '/sse');
    let messageHandler: ((msg: any) => void) | null = null;
    eventSource.onmessage = (event) => {
      try {
        const msg = JSON.parse(event.data);
        messageHandler?.(msg);
      } catch {
        // Ignore parse errors
      }
    };
    return {
      send: async (msg) => {
        // Simplification: the POST path is hard-coded here. In the MCP HTTP+SSE transport
        // the server announces the POST URI through an `endpoint` event.
        await fetch(config.url! + '/messages', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(msg),
        });
      },
      onMessage: (handler) => {
        messageHandler = handler;
      },
      close: async () => {
        eventSource.close();
      },
      isConnected: () => eventSource.readyState === EventSource.OPEN,
    };
  }
  private async createWebSocketTransport(
    config: MCPServerConfig
  ): Promise<MCPTransport> {
    const WebSocket = (await import('ws')).default;
    const ws = new WebSocket(config.url!);
    let messageHandler: ((msg: any) => void) | null = null;
    // Wait until the socket is open (or fails) before returning the transport
    await new Promise<void>((resolve, reject) => {
      ws.on('open', () => resolve());
      ws.on('error', (err) => reject(err));
    });
    ws.on('message', (data: any) => {
      try {
        const msg = JSON.parse(data.toString());
        messageHandler?.(msg);
      } catch {
        // Ignore parse errors
      }
    });
    return {
      send: async (msg) => {
        ws.send(JSON.stringify(msg));
      },
      onMessage: (handler) => {
        messageHandler = handler;
      },
      close: async () => {
        ws.close();
      },
      isConnected: () => ws.readyState === WebSocket.OPEN,
    };
  }
  private async sendRequest(
    serverId: string,
    method: string,
    params: any,
    timeout = 30000
  ): Promise<any> {
    const connection = this.servers.get(serverId);
    if (!connection?.transport) {
      throw new Error(`Server ${serverId} not connected`);
    }
    const id = `req_${Date.now()}_${Math.random().toString(36).slice(2)}`;
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pendingRequests.delete(id);
        reject(new Error(`Request ${method} timed out`));
      }, timeout);
      this.pendingRequests.set(id, { resolve, reject, timeout: timer });
      connection.transport!
        .send({ jsonrpc: '2.0', id, method, params })
        .catch((err) => {
          // A failed send must not leave the request pending until the timeout fires
          clearTimeout(timer);
          this.pendingRequests.delete(id);
          reject(err);
        });
    });
  }
  private async sendNotification(
    serverId: string,
    method: string,
    params: any
  ): Promise<void> {
    const connection = this.servers.get(serverId);
    if (!connection?.transport) return;
    await connection.transport.send({
      jsonrpc: '2.0',
      method,
      params,
    });
  }
  private handleMessage(
    serverId: string,
    msg: JsonRpcResponse | JsonRpcNotification
  ): void {
    // Handle responses (test for presence: an id of 0 is valid in JSON-RPC)
    if ('id' in msg && msg.id != null) {
      const pending = this.pendingRequests.get(msg.id as string);
      if (pending) {
        clearTimeout(pending.timeout);
        this.pendingRequests.delete(msg.id as string);
        if ('error' in msg && msg.error) {
          pending.reject(new Error(msg.error.message));
        } else {
          pending.resolve('result' in msg ? msg.result : undefined);
        }
      }
    }
    // Handle notifications
    if ('method' in msg) {
      this.emit('notification', serverId, msg);
    }
  }
  private selectServer(
    registrations: ToolRegistration[],
    options?: ToolCallOptions
  ): ToolRegistration {
    if (options?.preferredServerId) {
      const preferred = registrations.find(
        r => r.serverId === options.preferredServerId
      );
      if (preferred) return preferred;
    }
    // Among connected Servers, pick the one with the best metrics
    return registrations
      .filter(r => {
        const conn = this.servers.get(r.serverId);
        return conn?.status === 'connected';
      })
      .sort((a, b) => {
        const connA = this.servers.get(a.serverId)!;
        const connB = this.servers.get(b.serverId)!;
        // Prefer the higher success rate when the gap exceeds 5 percentage points
        const rateDiff = connB.metrics.successRate - connA.metrics.successRate;
        if (Math.abs(rateDiff) > 0.05) return rateDiff;
        // Otherwise prefer the lower latency
        return connA.metrics.avgLatencyMs - connB.metrics.avgLatencyMs;
      })[0] ?? registrations[0];
  }
  private updateMetrics(
    connection: MCPServerConnection,
    success: boolean,
    latencyMs: number
  ): void {
    const m = connection.metrics;
    m.totalCalls++;
    if (!success) m.errorCount++;
    // Exponentially weighted moving average of latency
    const alpha = 0.1;
    m.avgLatencyMs = m.avgLatencyMs * (1 - alpha) + latencyMs * alpha;
    m.successRate = (m.totalCalls - m.errorCount) / m.totalCalls;
    m.lastCallTime = new Date();
  }
  private findServerForResource(uri: string): string | null {
    for (const [serverId, connection] of this.servers) {
      // Prefix match only: passing a URI to String.match would treat it as a regular expression
      if (connection.resources.some(r => uri.startsWith(r.uri))) {
        return serverId;
      }
    }
    return null;
  }
}
// ===== Helper types =====
interface ToolRegistration {
  tool: MCPToolDefinition;
  serverId: string;
  priority: number;
}
interface ToolCallOptions {
  timeout?: number;
  failover?: boolean;
  preferredServerId?: string;
}
interface ToolCallResult {
  content: MCPContent[];
  isError: boolean;
  serverId: string;
  latencyMs: number;
}
class ToolCallError extends Error {
  constructor(
    message: string,
    public details: { serverId: string; latencyMs: number }
  ) {
    super(message);
    this.name = 'ToolCallError';
  }
}
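The routing rule used by the manager (prefer the higher success rate unless the gap is within 5 percentage points, then prefer the lower latency) and its latency smoothing are easier to reason about in isolation. The sketch below restates both as pure functions; `Candidate`, `pickServer`, and `ewma` are hypothetical names for this illustration, and the sample numbers are invented.

```typescript
interface Candidate {
  serverId: string;
  successRate: number; // fraction in [0, 1]
  avgLatencyMs: number;
}

// Success rate decides unless the two rates are within `tolerance`; then latency decides.
function pickServer(candidates: Candidate[], tolerance = 0.05): Candidate | undefined {
  return candidates.slice().sort((a, b) => {
    const rateDiff = b.successRate - a.successRate;
    if (Math.abs(rateDiff) > tolerance) return rateDiff;
    return a.avgLatencyMs - b.avgLatencyMs;
  })[0];
}

// Exponentially weighted moving average: each new sample contributes `alpha` of the result.
function ewma(previous: number, sample: number, alpha = 0.1): number {
  return previous * (1 - alpha) + sample * alpha;
}

// 0.99 vs 0.97 is inside the tolerance, so the faster server wins.
const closeRates = pickServer([
  { serverId: 'a', successRate: 0.99, avgLatencyMs: 200 },
  { serverId: 'b', successRate: 0.97, avgLatencyMs: 50 },
]);

// 0.99 vs 0.80 is outside the tolerance, so the more reliable server wins.
const farRates = pickServer([
  { serverId: 'a', successRate: 0.99, avgLatencyMs: 200 },
  { serverId: 'b', successRate: 0.80, avgLatencyMs: 50 },
]);
```

One caveat worth knowing: a comparator with a tolerance band is not transitive, so with three or more candidates the result can depend on input order. A production router might instead bucket success rates before comparing.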
8.3.2 Python Implementation: MCP Server Manager
"""
MCP Server manager - Python implementation
Manages registration, discovery, and lifecycle of MCP Servers
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Optional
import asyncio
import json
import subprocess
import time
from datetime import datetime
from enum import Enum


class TransportType(Enum):
    STDIO = "stdio"
    SSE = "sse"
    WEBSOCKET = "websocket"


class ServerStatus(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    ERROR = "error"


@dataclass
class MCPServerConfig:
    """MCP Server configuration"""
    id: str
    name: str
    transport: TransportType
    category: str = "general"
    priority: int = 0
    enabled: bool = True
    description: str = ""
    command: Optional[str] = None
    args: list[str] = field(default_factory=list)
    url: Optional[str] = None
    env: dict[str, str] = field(default_factory=dict)
    sandbox: dict = field(default_factory=dict)


@dataclass
class ToolDefinition:
    """Tool definition"""
    name: str
    description: str
    input_schema: dict
    server_id: str
    category: str = "general"
    risk_level: str = "low"
    timeout: int = 30000
    idempotent: bool = False


@dataclass
class ServerMetrics:
    """Server metrics"""
    total_calls: int = 0
    error_count: int = 0
    avg_latency_ms: float = 0.0
    last_call_time: Optional[datetime] = None

    @property
    def success_rate(self) -> float:
        if self.total_calls == 0:
            return 1.0
        return (self.total_calls - self.error_count) / self.total_calls


@dataclass
class ServerConnection:
    """Server connection"""
    config: MCPServerConfig
    status: ServerStatus = ServerStatus.DISCONNECTED
    tools: list[ToolDefinition] = field(default_factory=list)
    metrics: ServerMetrics = field(default_factory=ServerMetrics)
    # The process is created with asyncio.create_subprocess_exec, not subprocess.Popen
    process: Optional[asyncio.subprocess.Process] = None
    last_error: Optional[str] = None


class MCPServerManager:
    """MCP Server manager"""

    def __init__(self):
        self.servers: dict[str, ServerConnection] = {}
        self.tool_registry: dict[str, list[dict]] = {}
        self._request_counter = 0
        self._pending: dict[str, asyncio.Future] = {}

    async def register_server(self, config: MCPServerConfig) -> None:
        """Register an MCP Server"""
        if config.id in self.servers:
            raise ValueError(f"Server {config.id} already registered")
        self.servers[config.id] = ServerConnection(config=config)
        if config.enabled:
            await self.connect_server(config.id)

    async def connect_server(self, server_id: str) -> None:
        """Connect to an MCP Server"""
        conn = self.servers.get(server_id)
        if not conn:
            raise ValueError(f"Server {server_id} not found")
        conn.status = ServerStatus.CONNECTING
        try:
            if conn.config.transport == TransportType.STDIO:
                await self._connect_stdio(conn)
            elif conn.config.transport == TransportType.SSE:
                await self._connect_sse(conn)
            else:
                # Fail fast instead of waiting for the initialize request to time out
                raise NotImplementedError(
                    f"Transport {conn.config.transport.value} is not implemented"
                )
            # Initialize (the result carries the server's capabilities; unused in this listing)
            await self._send_request(
                server_id, "initialize", {
                    "protocolVersion": "2024-11-05",
                    "capabilities": {},
                    "clientInfo": {
                        "name": "harness-platform-python",
                        "version": "1.0.0",
                    },
                }
            )
            # Notify the server that initialization is complete
            await self._send_notification(
                server_id, "notifications/initialized", {}
            )
            # Discover capabilities
            await self._discover_capabilities(server_id)
            conn.status = ServerStatus.CONNECTED
        except Exception as e:
            conn.status = ServerStatus.ERROR
            conn.last_error = str(e)
            raise

    async def _connect_stdio(self, conn: ServerConnection) -> None:
        """Establish the stdio transport connection"""
        import os
        env = {**os.environ, **conn.config.env}
        # Note: stderr is piped but never drained in this listing
        process = await asyncio.create_subprocess_exec(
            conn.config.command,
            *conn.config.args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )
        conn.process = process
        # Start the read loop; keep a reference so the task is not garbage-collected
        conn._read_task = asyncio.create_task(self._stdio_read_loop(conn))

    async def _stdio_read_loop(self, conn: ServerConnection) -> None:
        """stdio read loop: one JSON-RPC message per line"""
        while conn.process and conn.process.returncode is None:
            try:
                line = await conn.process.stdout.readline()
                if not line:
                    break
                msg = json.loads(line.decode().strip())
                self._handle_message(conn.config.id, msg)
            except json.JSONDecodeError:
                continue
            except Exception:
                break

    async def _connect_sse(self, conn: ServerConnection) -> None:
        """Establish the SSE transport connection"""
        import aiohttp
        # Note: the session must be stored and closed on disconnect to avoid a leak
        session = aiohttp.ClientSession()
        # SSE connection implementation...

    async def _discover_capabilities(self, server_id: str) -> None:
        """Discover server capabilities"""
        conn = self.servers[server_id]
        # Discover tools
        try:
            result = await self._send_request(
                server_id, "tools/list", {}
            )
            for tool_data in result.get("tools", []):
                tool = ToolDefinition(
                    name=tool_data["name"],
                    description=tool_data.get("description", ""),
                    input_schema=tool_data.get("inputSchema", {}),
                    server_id=server_id,
                    category=tool_data.get("harness", {}).get(
                        "category", conn.config.category
                    ),
                )
                conn.tools.append(tool)
                self._register_tool(tool)
        except Exception:
            # Discovery is best-effort: the server may not support tools
            pass

    def _register_tool(self, tool: ToolDefinition) -> None:
        """Register a tool in the global registry"""
        if tool.name not in self.tool_registry:
            self.tool_registry[tool.name] = []
        self.tool_registry[tool.name].append({
            "tool": tool,
            "server_id": tool.server_id,
            "priority": self.servers[tool.server_id].config.priority,
        })
        # Sort by priority, highest first
        self.tool_registry[tool.name].sort(
            key=lambda r: r["priority"], reverse=True
        )

    async def call_tool(
        self,
        name: str,
        args: dict,
        timeout: int = 30000,
        failover: bool = True,
        preferred_server_id: Optional[str] = None,
    ) -> dict:
        """Call a tool"""
        registrations = self.tool_registry.get(name)
        if not registrations:
            raise ValueError(f"Tool {name} not found")
        # Select the best server, honoring an explicit preference (used by failover)
        reg = None
        if preferred_server_id is not None:
            reg = next(
                (r for r in registrations
                 if r["server_id"] == preferred_server_id),
                None
            )
        if reg is None:
            reg = self._select_server(registrations)
        conn = self.servers[reg["server_id"]]
        start_time = time.time()
        try:
            result = await self._send_request(
                reg["server_id"],
                "tools/call",
                {"name": name, "arguments": args},
                timeout=timeout,
            )
            latency = (time.time() - start_time) * 1000
            self._update_metrics(conn, True, latency)
            return {
                "content": result.get("content", []),
                "is_error": result.get("isError", False),
                "server_id": reg["server_id"],
                "latency_ms": latency,
            }
        except Exception as e:
            latency = (time.time() - start_time) * 1000
            self._update_metrics(conn, False, latency)
            # Try failing over to a different server, once
            if failover and len(registrations) > 1:
                alt = next(
                    (r for r in registrations
                     if r["server_id"] != reg["server_id"]),
                    None
                )
                if alt:
                    return await self.call_tool(
                        name, args, timeout, failover=False,
                        preferred_server_id=alt["server_id"],
                    )
            raise RuntimeError(
                f"Tool {name} call failed: {e}"
            ) from e

    def get_available_tools(self) -> list[ToolDefinition]:
        """Get all available tools"""
        tools = []
        seen = set()
        for name, registrations in self.tool_registry.items():
            if registrations and name not in seen:
                tools.append(registrations[0]["tool"])
                seen.add(name)
        return tools

    async def shutdown_all(self) -> None:
        """Shut down all servers"""
        for server_id in list(self.servers.keys()):
            await self.disconnect_server(server_id)

    async def disconnect_server(self, server_id: str) -> None:
        """Disconnect from a server"""
        conn = self.servers.get(server_id)
        if not conn:
            return
        # Remove its tool registrations
        for tool in conn.tools:
            if tool.name in self.tool_registry:
                self.tool_registry[tool.name] = [
                    r for r in self.tool_registry[tool.name]
                    if r["server_id"] != server_id
                ]
        # Clear the list so a later reconnect does not register duplicates
        conn.tools = []
        # Terminate the process if it is still running
        if conn.process and conn.process.returncode is None:
            conn.process.terminate()
            try:
                await asyncio.wait_for(
                    conn.process.wait(), timeout=5
                )
            except asyncio.TimeoutError:
                conn.process.kill()
        conn.status = ServerStatus.DISCONNECTED

    async def _send_request(
        self,
        server_id: str,
        method: str,
        params: dict,
        timeout: int = 30000,
    ) -> Any:
        """Send a JSON-RPC request (timeout in milliseconds)"""
        conn = self.servers.get(server_id)
        if not conn:
            raise ValueError(f"Server {server_id} not found")
        self._request_counter += 1
        request_id = f"req_{self._request_counter}"
        request = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params,
        }
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        # Send the request
        if (
            conn.config.transport == TransportType.STDIO
            and conn.process
            and conn.process.stdin
        ):
            data = json.dumps(request) + "\n"
            conn.process.stdin.write(data.encode())
            await conn.process.stdin.drain()
        else:
            # Nothing could be sent, so fail now rather than wait for the timeout
            self._pending.pop(request_id, None)
            raise ConnectionError(f"Server {server_id} has no usable transport")
        # Wait for the response
        try:
            result = await asyncio.wait_for(future, timeout=timeout / 1000)
            return result
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)
            raise TimeoutError(f"Request {method} timed out")

    async def _send_notification(
        self,
        server_id: str,
        method: str,
        params: dict,
    ) -> None:
        """Send a notification (no response expected)"""
        conn = self.servers.get(server_id)
        if not conn or not conn.process or not conn.process.stdin:
            return
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        data = json.dumps(notification) + "\n"
        conn.process.stdin.write(data.encode())
        await conn.process.stdin.drain()

    def _handle_message(self, server_id: str, msg: dict) -> None:
        """Handle an incoming message"""
        # Response
        if "id" in msg:
            request_id = msg["id"]
            future = self._pending.pop(request_id, None)
            if future and not future.done():
                if "error" in msg:
                    future.set_exception(
                        RuntimeError(msg["error"]["message"])
                    )
                else:
                    future.set_result(msg.get("result"))
        # Notification
        elif "method" in msg:
            pass  # handle the notification

    def _select_server(self, registrations: list[dict]) -> dict:
        """Select the best server"""
        def is_connected(r: dict) -> bool:
            # Python has no `?.` operator, so the lookup is checked explicitly
            conn = self.servers.get(r["server_id"])
            return conn is not None and conn.status == ServerStatus.CONNECTED

        connected = [r for r in registrations if is_connected(r)]
        if not connected:
            return registrations[0]

        # Rank by success rate, then by lower latency
        def score(r):
            conn = self.servers[r["server_id"]]
            return (conn.metrics.success_rate, -conn.metrics.avg_latency_ms)

        return max(connected, key=score)

    def _update_metrics(
        self,
        conn: ServerConnection,
        success: bool,
        latency_ms: float,
    ) -> None:
        """Update metrics"""
        m = conn.metrics
        m.total_calls += 1
        if not success:
            m.error_count += 1
        # Exponentially weighted moving average of latency
        alpha = 0.1
        m.avg_latency_ms = m.avg_latency_ms * (1 - alpha) + latency_ms * alpha
        m.last_call_time = datetime.now()
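8.4 Tool Orchestration Engine: Chained Calls and Parallel Execution
8.4.1 Tool Orchestration Patterns
Real-world tasks usually need several tools working together. The tool orchestration engine supports the following patterns:
1. Chain
   Tool A → Tool B → Tool C
   The output of each tool becomes the input of the next
2. Parallel
   ┌ Tool A ─┐
   ├ Tool B ─┤ → Merge Results
   └ Tool C ─┘
3. Conditional
   Tool A → [condition] → Tool B (if true)
                        → Tool C (if false)
4. Fan-out/Fan-in
   Tool A → [generate N subtasks]
   ┌ subtask 1 → Tool B ─┐
   ├ subtask 2 → Tool B ─┤ → Tool C (aggregate)
   └ subtask 3 → Tool B ─┘
5. Pipeline
   Data flows through several transformation stages
   Source → Transform → Filter → Enrich → Sink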
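Several of these patterns are ordinary function composition once each tool is modeled as a function. The sketch below shows chain, conditional, and fan-out/fan-in as combinators. It is a simplification under stated assumptions: tools are synchronous stand-ins (a real engine would await MCP calls and bound concurrency), and `Tool`, `chain`, `branch`, and `fanOut` are hypothetical names for this illustration.

```typescript
// A tool is modeled as a plain function from input to output.
type Tool<I, O> = (input: I) => O;

// Chain: the output of each tool is the input of the next.
function chain<T>(...tools: Tool<T, T>[]): Tool<T, T> {
  return (input) => tools.reduce((acc, tool) => tool(acc), input);
}

// Conditional: route the input to one of two tools.
function branch<I, O>(
  condition: (input: I) => boolean,
  ifTrue: Tool<I, O>,
  ifFalse: Tool<I, O>
): Tool<I, O> {
  return (input) => (condition(input) ? ifTrue(input) : ifFalse(input));
}

// Fan-out/fan-in: split into subtasks, run a worker on each, then aggregate.
function fanOut<I, P, R, O>(
  split: (input: I) => P[],
  worker: Tool<P, R>,
  merge: (results: R[]) => O
): Tool<I, O> {
  return (input) => merge(split(input).map(worker));
}

// Example tools (made up for the illustration).
const incrementThenDouble = chain<number>((x) => x + 1, (x) => x * 2);
const classifySign = branch<number, string>((x) => x >= 0, () => 'non-negative', () => 'negative');
const totalLength = fanOut<string, string, number, number>(
  (csv) => csv.split(','),
  (part) => part.length,
  (lengths) => lengths.reduce((sum, n) => sum + n, 0)
);
```

Because every combinator returns a `Tool`, the patterns nest freely: a fan-out can be one stage of a chain, and a branch can choose between two chains.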
8.4.2 TypeScript Implementation: Tool Orchestration Engine
/**
 * Tool orchestration engine
 * Supports chained, parallel, conditional, and other orchestration patterns
 */
// ===== Orchestration definition DSL =====
interface ToolWorkflow {
  name: string;
  description: string;
  steps: WorkflowStep[];
  errorHandling: 'stop' | 'skip' | 'fallback';
}
type WorkflowStep =
  | SequentialStep
  | ParallelStep
  | ConditionalStep
  | FanOutStep
  | LoopStep;
interface SequentialStep {
  type: 'sequential';
  id: string;
  toolName: string;
  inputMapping: InputMapping;
  outputMapping?: OutputMapping;
  timeout?: number;
}
interface ParallelStep {
  type: 'parallel';
  id: string;
  branches: {
    toolName: string;
    inputMapping: InputMapping;
  }[];
  mergeStrategy: 'first' | 'all' | 'majority';
  timeout?: number;
}
interface ConditionalStep {
  type: 'conditional';
  id: string;
  condition: string; // expression
  ifTrue: WorkflowStep;
  ifFalse?: WorkflowStep;
}
interface FanOutStep {
  type: 'fanout';
  id: string;
  toolName: string;
  splitExpression: string; // how to split the input
  maxConcurrency: number;
  mergeTool?: string; // aggregation tool
}
interface LoopStep {
  type: 'loop';
  id: string;
  condition: string;
  maxIterations: number;
  body: WorkflowStep;
}
// ===== Input/output mapping =====
type InputMapping = Record<string, string>;
// Key: tool parameter name
// Value: expression (such as "$step1.result.data" or "$input.query")
type OutputMapping = Record<string, string>;
// ===== Orchestration engine =====
class ToolOrchestrationEngine {
  private mcpManager: MCPServerManager;
  private workflowRegistry: Map<string, ToolWorkflow> = new Map();
  private executionHistory: WorkflowExecution[] = [];
  constructor(mcpManager: MCPServerManager) {
    this.mcpManager = mcpManager;
  }
  /**
   * Register a workflow
   */
  registerWorkflow(workflow: ToolWorkflow): void {
    this.workflowRegistry.set(workflow.name, workflow);
  }
  /**
   * Execute a workflow
   */
  async executeWorkflow(
    name: string,
    input: Record<string, any>,
    options?: ExecutionOptions
  ): Promise<WorkflowResult> {
    const workflow = this.workflowRegistry.get(name);
    if (!workflow) throw new Error(`Workflow ${name} not found`);
    const context: ExecutionContext = {
      input,
      stepResults: new Map(),
      variables: new Map(),
      startTime: Date.now(),
      logs: [],
    };
    try {
      for (const step of workflow.steps) {
        await this.executeStep(step, context, workflow.errorHandling);
      }
      const result: WorkflowResult = {
        success: true,
        output: this.collectOutput(context),
        executionTimeMs: Date.now() - context.startTime,
        stepsExecuted: context.stepResults.size,
        logs: context.logs,
      };
      this.executionHistory.push({
        workflow: name,
        timestamp: new Date(),
        result,
      });
      return result;
    } catch (error: any) {
      return {
        success: false,
        error: error.message,
        executionTimeMs: Date.now() - context.startTime,
        stepsExecuted: context.stepResults.size,
        logs: context.logs,
      };
    }
  }
/**
* 执行单个步骤
*/
private async executeStep(
step: WorkflowStep,
context: ExecutionContext,
errorHandling: string
): Promise<void> {
try {
switch (step.type) {
case 'sequential':
await this.executeSequential(step, context);
break;
case 'parallel':
await this.executeParallel(step, context);
break;
case 'conditional':
await this.executeConditional(step, context, errorHandling);
break;
case 'fanout':
await this.executeFanOut(step, context);
break;
case 'loop':
await this.executeLoop(step, context, errorHandling);
break;
}
} catch (error: any) {
context.logs.push({
stepId: step.id,
type: 'error',
message: error.message,
timestamp: new Date(),
});
switch (errorHandling) {
case 'stop':
throw error;
case 'skip':
context.logs.push({
stepId: step.id,
type: 'skip',
message: 'Step skipped',
timestamp: new Date(),
});
break;
case 'fallback':
// Record a fallback marker in place of the step result
context.stepResults.set(step.id, {
fallback: true,
error: error.message,
});
break;
}
}
}
/**
* Execute a sequential step (a single tool call)
*/
private async executeSequential(
step: SequentialStep,
context: ExecutionContext
): Promise<void> {
// Resolve the input mapping into concrete tool arguments
const args = this.resolveInputMapping(step.inputMapping, context);
context.logs.push({
stepId: step.id,
type: 'start',
message: `Calling tool ${step.toolName}`,
timestamp: new Date(),
});
// Call the tool
const result = await this.mcpManager.callTool(
step.toolName,
args,
{ timeout: step.timeout }
);
// Store the result
context.stepResults.set(step.id, result);
// Apply the output mapping
if (step.outputMapping) {
for (const [varName, expression] of Object.entries(step.outputMapping)) {
context.variables.set(varName, this.evaluate(expression, context));
}
}
context.logs.push({
stepId: step.id,
type: 'complete',
message: `Tool ${step.toolName} completed`,
timestamp: new Date(),
});
}
/**
* Execute a parallel step
*/
private async executeParallel(
step: ParallelStep,
context: ExecutionContext
): Promise<void> {
const promises = step.branches.map(async (branch) => {
const args = this.resolveInputMapping(branch.inputMapping, context);
return this.mcpManager.callTool(branch.toolName, args, {
timeout: step.timeout,
});
});
let results: ToolCallResult[];
switch (step.mergeStrategy) {
case 'first':
results = [await Promise.any(promises)];
break;
case 'all':
results = await Promise.all(promises);
break;
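case 'majority': {
// Keep the branches that succeeded and require a strict majority of them.
// The braces scope the const declaration to this case.
const settled = await Promise.allSettled(promises);
results = settled
.filter((r): r is PromiseFulfilledResult<ToolCallResult> =>
r.status === 'fulfilled'
)
.map(r => r.value);
if (results.length <= settled.length / 2) {
throw new Error('Fewer than a majority of branches succeeded');
}
break;
}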
}
context.stepResults.set(step.id, results!);
}
/**
* Execute a conditional branch
*/
private async executeConditional(
step: ConditionalStep,
context: ExecutionContext,
errorHandling: string
): Promise<void> {
const conditionResult = this.evaluate(step.condition, context);
if (conditionResult) {
await this.executeStep(step.ifTrue, context, errorHandling);
} else if (step.ifFalse) {
await this.executeStep(step.ifFalse, context, errorHandling);
}
}
/**
* Execute a fan-out step
*/
private async executeFanOut(
step: FanOutStep,
context: ExecutionContext
): Promise<void> {
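// Split the input into individual items
const items = this.evaluate(step.splitExpression, context);
if (!Array.isArray(items)) {
throw new Error(`Fan-out step ${step.id}: splitExpression did not yield an array`);
}
// Bound concurrency with a semaphore
const semaphore = new Semaphore(step.maxConcurrency);
const results = await Promise.all(
// Use the index supplied by map(): indexOf() is O(n) per item and
// returns the wrong position when the array contains duplicates.
items.map(async (item, index) => {
await semaphore.acquire();
try {
return await this.mcpManager.callTool(step.toolName, {
item,
index,
});
} finally {
semaphore.release();
}
})
);
// Aggregate the results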
if (step.mergeTool) {
const merged = await this.mcpManager.callTool(step.mergeTool, {
results: results.map(r => r.content),
});
context.stepResults.set(step.id, merged);
} else {
context.stepResults.set(step.id, results);
}
}
/**
* Execute a loop step
*/
private async executeLoop(
step: LoopStep,
context: ExecutionContext,
errorHandling: string
): Promise<void> {
const loopResults: any[] = [];
let iteration = 0;
while (iteration < step.maxIterations) {
const condition = this.evaluate(step.condition, {
...context,
variables: new Map([
...context.variables,
['iteration', iteration],
['loopResults', loopResults],
]),
});
if (!condition) break;
await this.executeStep(step.body, context, errorHandling);
loopResults.push(context.stepResults.get(step.body.id));
iteration++;
}
context.stepResults.set(step.id, loopResults);
}
/**
* Resolve an input mapping
*/
private resolveInputMapping(
mapping: InputMapping,
context: ExecutionContext
): Record<string, any> {
const args: Record<string, any> = {};
for (const [paramName, expression] of Object.entries(mapping)) {
args[paramName] = this.evaluate(expression, context);
}
return args;
}
/**
* Evaluate an expression.
* Supports simple path expressions and variable references.
*/
private evaluate(expression: string, context: ExecutionContext): any {
// $input.xxx → read from the workflow input
if (expression.startsWith('$input.')) {
const path = expression.slice('$input.'.length);
return this.getNestedValue(context.input, path);
}
// $var.xxx → read from the workflow variables. Checked before the generic
// step pattern so that "$var.result.x" is not treated as a step named "var".
if (expression.startsWith('$var.')) {
const varName = expression.slice('$var.'.length);
return context.variables.get(varName);
}
// $stepN.result.xxx → read from a step result
if (expression.startsWith('$')) {
const match = expression.match(/^\$([^.]+)\.result\.?(.*)$/);
if (match) {
const stepId = match[1];
const path = match[2];
const stepResult = context.stepResults.get(stepId);
if (path) {
return this.getNestedValue(stepResult, path);
}
return stepResult;
}
}
// Literal value: JSON if it parses, otherwise the raw string
try {
return JSON.parse(expression);
} catch {
return expression;
}
}
private getNestedValue(obj: any, path: string): any {
return path.split('.').reduce((current, key) => {
if (current === undefined || current === null) return undefined;
return current[key];
}, obj);
}
private collectOutput(context: ExecutionContext): any {
// Use the most recently stored step result as the workflow output
const lastResult = Array.from(context.stepResults.values()).pop();
return lastResult;
}
}
// ===== Semaphore =====
class Semaphore {
private count: number;
private queue: (() => void)[] = [];
constructor(private maxConcurrency: number) {
this.count = maxConcurrency;
}
async acquire(): Promise<void> {
if (this.count > 0) {
this.count--;
return;
}
return new Promise(resolve => {
this.queue.push(resolve);
});
}
release(): void {
if (this.queue.length > 0) {
const next = this.queue.shift()!;
next();
} else {
this.count++;
}
}
}
// ===== Supporting types =====
interface ExecutionContext {
input: Record<string, any>;
stepResults: Map<string, any>;
variables: Map<string, any>;
startTime: number;
logs: ExecutionLog[];
}
interface ExecutionLog {
stepId: string;
type: 'start' | 'complete' | 'error' | 'skip';
message: string;
timestamp: Date;
}
interface WorkflowResult {
success: boolean;
output?: any;
error?: string;
executionTimeMs: number;
stepsExecuted: number;
logs: ExecutionLog[];
}
interface WorkflowExecution {
workflow: string;
timestamp: Date;
result: WorkflowResult;
}
interface ExecutionOptions {
timeout?: number;
dryRun?: boolean;
}
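To make the mapping mechanism concrete, here is a minimal, self-contained sketch of how `$input.…` and `$<stepId>.result.…` expressions can thread data between two sequential steps. The `MiniStep` type, `runSequential`, and the injected `callTool` function are simplified, hypothetical stand-ins for illustration; they are not the chapter's `ToolOrchestrationEngine` or `MCPServerManager`.

```typescript
// Hypothetical, simplified shapes (not the chapter's full workflow types).
type Mapping = Record<string, string>;
interface MiniStep {
  id: string;
  toolName: string;
  inputMapping: Mapping;
}
type ToolCaller = (toolName: string, args: Record<string, unknown>) => Promise<unknown>;

// Walk a dotted path such as "data.hits" through nested objects.
function getPath(obj: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>((cur, key) => {
    if (cur === null || cur === undefined) return undefined;
    return (cur as Record<string, unknown>)[key];
  }, obj);
}

// Resolve "$input.a.b" or "$<stepId>.result.a.b"; anything else is a literal.
function resolveExpr(
  expr: string,
  input: Record<string, unknown>,
  stepResults: Map<string, unknown>,
): unknown {
  if (expr.startsWith('$input.')) {
    return getPath(input, expr.slice('$input.'.length));
  }
  const m = expr.match(/^\$([^.]+)\.result(?:\.(.+))?$/);
  if (m) {
    const result = stepResults.get(m[1]);
    return m[2] ? getPath(result, m[2]) : result;
  }
  try {
    return JSON.parse(expr); // "10" becomes the number 10
  } catch {
    return expr; // not valid JSON: keep the raw string
  }
}

// Run steps in order; each step sees the results of the steps before it.
async function runSequential(
  steps: MiniStep[],
  input: Record<string, unknown>,
  callTool: ToolCaller,
): Promise<Map<string, unknown>> {
  const stepResults = new Map<string, unknown>();
  for (const step of steps) {
    const args: Record<string, unknown> = {};
    for (const [param, expr] of Object.entries(step.inputMapping)) {
      args[param] = resolveExpr(expr, input, stepResults);
    }
    stepResults.set(step.id, await callTool(step.toolName, args));
  }
  return stepResults;
}
```

With a step `s1` that maps `query` to `$input.query` and a step `s2` that maps `items` to `$s1.result.data.hits`, the second tool receives whatever the first tool returned under `data.hits`, without either tool knowing about the other.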
8.5 Built-in MCP Server Implementations
8.5.1 File System MCP Server
/**
* File system MCP Server
* Provides file read/write access restricted to an allow-list of paths
*/
import * as fs from 'fs/promises';
import * as path from 'path';
class FileSystemMCPServer {
private allowedPaths: string[];
private maxFileSize: number;
constructor(options: {
allowedPaths: string[];
maxFileSize?: number;
}) {
this.allowedPaths = options.allowedPaths;
this.maxFileSize = options.maxFileSize ?? 10 * 1024 * 1024; // 10MB
}
/**
* Tool manifest
*/
getTools(): MCPToolDefinition[] {
return [
{
name: 'fs_read_file',
description: 'Read the contents of a file',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: 'File path' },
encoding: {
type: 'string',
enum: ['utf-8', 'base64'],
default: 'utf-8',
},
},
required: ['path'],
},
harness: {
category: 'filesystem',
riskLevel: 'low',
idempotent: true,
},
},
{
name: 'fs_write_file',
description: 'Write content to a file',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: 'File path' },
content: { type: 'string', description: 'File content' },
createDirs: {
type: 'boolean',
default: true,
description: 'Whether to create missing parent directories',
},
},
required: ['path', 'content'],
},
harness: {
category: 'filesystem',
riskLevel: 'medium',
requiresApproval: true,
},
},
{
name: 'fs_list_directory',
description: 'List the contents of a directory',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: 'Directory path' },
recursive: {
type: 'boolean',
default: false,
description: 'Whether to list recursively',
},
maxDepth: {
type: 'number',
default: 3,
description: 'Maximum recursion depth',
},
},
required: ['path'],
},
harness: {
category: 'filesystem',
riskLevel: 'low',
idempotent: true,
},
},
{
name: 'fs_search_files',
description: 'Search for files (glob pattern)',
inputSchema: {
type: 'object',
properties: {
pattern: {
type: 'string',
description: 'Glob pattern (e.g. **/*.ts)',
},
rootPath: {
type: 'string',
description: 'Root path of the search',
},
},
required: ['pattern'],
},
harness: {
category: 'filesystem',
riskLevel: 'low',
idempotent: true,
timeout: 30000,
},
},
{
name: 'fs_get_file_info',
description: 'Get information about a file or directory',
inputSchema: {
type: 'object',
properties: {
path: { type: 'string', description: 'Path' },
},
required: ['path'],
},
harness: {
category: 'filesystem',
riskLevel: 'low',
idempotent: true,
},
},
];
}
/**
* Dispatch a tool call
*/
async handleToolCall(
name: string,
args: Record<string, any>
): Promise<MCPContent[]> {
switch (name) {
case 'fs_read_file':
return this.readFile(args);
case 'fs_write_file':
return this.writeFile(args);
case 'fs_list_directory':
return this.listDirectory(args);
case 'fs_search_files':
return this.searchFiles(args);
case 'fs_get_file_info':
return this.getFileInfo(args);
default:
throw new Error(`Unknown tool: ${name}`);
}
}
private async readFile(args: {
path: string;
encoding?: string;
}): Promise<MCPContent[]> {
this.validatePath(args.path);
const stat = await fs.stat(args.path);
if (stat.size > this.maxFileSize) {
throw new Error(
`File too large: ${stat.size} bytes (max: ${this.maxFileSize})`
);
}
const content = await fs.readFile(args.path, {
encoding: (args.encoding ?? 'utf-8') as BufferEncoding,
});
return [{ type: 'text', text: content }];
}
private async writeFile(args: {
path: string;
content: string;
createDirs?: boolean;
}): Promise<MCPContent[]> {
this.validatePath(args.path);
if (args.createDirs !== false) {
const dir = path.dirname(args.path);
await fs.mkdir(dir, { recursive: true });
}
await fs.writeFile(args.path, args.content, 'utf-8');
return [{
type: 'text',
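// Report the UTF-8 byte count; String.length counts UTF-16 code units, not bytes
text: `File written: ${args.path} (${Buffer.byteLength(args.content, 'utf-8')} bytes)`,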
}];
}
private async listDirectory(args: {
path: string;
recursive?: boolean;
maxDepth?: number;
}): Promise<MCPContent[]> {
this.validatePath(args.path);
const entries = await this.listDirRecursive(
args.path,
args.recursive ?? false,
args.maxDepth ?? 3,
0
);
return [{
type: 'text',
text: JSON.stringify(entries, null, 2),
}];
}
private async listDirRecursive(
dirPath: string,
recursive: boolean,
maxDepth: number,
currentDepth: number
): Promise<any[]> {
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const results: any[] = [];
for (const entry of entries) {
const fullPath = path.join(dirPath, entry.name);
const item: any = {
name: entry.name,
type: entry.isDirectory() ? 'directory' : 'file',
};
if (entry.isFile()) {
const stat = await fs.stat(fullPath);
item.size = stat.size;
item.modified = stat.mtime.toISOString();
}
if (
recursive && entry.isDirectory() &&
currentDepth < maxDepth
) {
item.children = await this.listDirRecursive(
fullPath, recursive, maxDepth, currentDepth + 1
);
}
results.push(item);
}
return results;
}
private async searchFiles(args: {
pattern: string;
rootPath?: string;
}): Promise<MCPContent[]> {
const root = args.rootPath ?? this.allowedPaths[0];
this.validatePath(root);
const { glob } = await import('glob');
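const matches = await glob(args.pattern, {
cwd: root,
absolute: true,
nodir: true,
});
// A pattern such as "../**" can match outside the root, so re-check every hit
const files = matches.filter(file => {
try {
this.validatePath(file);
return true;
} catch {
return false;
}
});
return [{
type: 'text',
// Cap the response at 100 entries to keep the payload small
text: JSON.stringify(files.slice(0, 100), null, 2),
}];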
}
private async getFileInfo(args: {
path: string;
}): Promise<MCPContent[]> {
this.validatePath(args.path);
const stat = await fs.stat(args.path);
const info = {
path: args.path,
type: stat.isDirectory() ? 'directory' :
stat.isFile() ? 'file' : 'other',
size: stat.size,
created: stat.birthtime.toISOString(),
modified: stat.mtime.toISOString(),
accessed: stat.atime.toISOString(),
permissions: {
// Owner permission bits only; the process's effective access is not checked
readable: (stat.mode & 0o400) !== 0,
writable: (stat.mode & 0o200) !== 0,
executable: (stat.mode & 0o100) !== 0,
},
};
return [{ type: 'text', text: JSON.stringify(info, null, 2) }];
}
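private validatePath(filePath: string): void {
const resolved = path.resolve(filePath);
// Compare on path-segment boundaries: a plain startsWith() would accept
// "/data/app-secret" when only "/data/app" is allowed.
// Note: path.resolve() does not follow symlinks; resolve them with
// fs.realpath() first if the allowed directories may contain links.
const isAllowed = this.allowedPaths.some(allowed => {
const base = path.resolve(allowed);
const prefix = base.endsWith(path.sep) ? base : base + path.sep;
return resolved === base || resolved.startsWith(prefix);
});
if (!isAllowed) {
throw new Error(
`Path not allowed: ${filePath}. Allowed paths: ${this.allowedPaths.join(', ')}`
);
}
}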
}
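A path allow-list is only as strong as its containment test. A plain string-prefix comparison accepts `/data/app-secret` when `/data/app` is the allowed root. One way to compare on segment boundaries is `path.relative`, sketched below. The helper name `isPathInside` is an assumption for illustration, and the check is purely lexical: it does not resolve symlinks.

```typescript
import * as path from 'path';

/**
 * Returns true when `candidate` is `root` itself or lies somewhere beneath it.
 * Purely lexical: symlinks are NOT resolved (assumption: callers run
 * fs.realpath() beforehand when links may be present).
 */
function isPathInside(root: string, candidate: string): boolean {
  const rel = path.relative(path.resolve(root), path.resolve(candidate));
  if (rel === '') return true; // candidate is the root itself
  if (path.isAbsolute(rel)) return false; // e.g. a different drive on Windows
  // Escapes look like ".." or "../something"; a child named "..foo" is fine.
  return rel !== '..' && !rel.startsWith('..' + path.sep);
}
```

Because both arguments go through `path.resolve`, inputs such as `/data/app/../etc/passwd` are normalized before the comparison, so `..` segments cannot be used to step out of the root.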
8.5.2 Database Query MCP Server
/**
* Database query MCP Server
* Provides read-only database query access
*/
class DatabaseMCPServer {
private pools: Map<string, any> = new Map();
private queryTimeout: number;
private maxRows: number;
constructor(options: {
connections: DatabaseConnectionConfig[];
queryTimeout?: number;
maxRows?: number;
}) {
this.queryTimeout = options.queryTimeout ?? 30000;
this.maxRows = options.maxRows ?? 1000;
// Initialize one connection pool per configured connection
for (const conn of options.connections) {
this.pools.set(conn.name, this.createPool(conn));
}
}
getTools(): MCPToolDefinition[] {
return [
{
name: 'db_query',
description: 'Run a read-only SQL query (SELECT)',
inputSchema: {
type: 'object',
properties: {
connection: {
type: 'string',
description: 'Name of the database connection',
},
sql: {
type: 'string',
description: 'SQL query (SELECT only)',
},
params: {
type: 'array',
items: { type: 'string' },
description: 'Parameters for a parameterized query',
},
maxRows: {
type: 'number',
description: `Maximum number of rows to return (default and upper bound: ${this.maxRows})`,
},
},
required: ['connection', 'sql'],
},
harness: {
category: 'database',
riskLevel: 'low',
idempotent: true,
timeout: this.queryTimeout,
},
},
{
name: 'db_describe_table',
description: 'Get the column structure of a table',
inputSchema: {
type: 'object',
properties: {
connection: { type: 'string' },
tableName: { type: 'string' },
},
required: ['connection', 'tableName'],
},
harness: {
category: 'database',
riskLevel: 'low',
idempotent: true,
},
},
{
name: 'db_list_tables',
description: 'List all tables in the database',
inputSchema: {
type: 'object',
properties: {
connection: { type: 'string' },
},
required: ['connection'],
},
harness: {
category: 'database',
riskLevel: 'low',
idempotent: true,
},
},
];
}
async handleToolCall(
name: string,
args: Record<string, any>
): Promise<MCPContent[]> {
switch (name) {
case 'db_query':
return this.executeQuery(args);
case 'db_describe_table':
return this.describeTable(args);
case 'db_list_tables':
return this.listTables(args);
default:
throw new Error(`Unknown tool: ${name}`);
}
}
private async executeQuery(args: {
connection: string;
sql: string;
params?: any[];
maxRows?: number;
}): Promise<MCPContent[]> {
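// Safety check: allow a single SELECT statement only.
// This is a lexical heuristic, not a SQL parser; pair it with a read-only
// database role for real protection.
const trimmedSql = args.sql.trim().replace(/;+\s*$/, '');
const normalizedSql = trimmedSql.toUpperCase();
if (!normalizedSql.startsWith('SELECT')) {
throw new Error('Only SELECT queries are supported. Write operations require manual approval.');
}
// Conservative: also rejects a ";" that appears inside a string literal
if (trimmedSql.includes(';')) {
throw new Error('Multiple statements are not allowed');
}
// Reject dangerous keywords. Match whole words so that identifiers such as
// "updated_at" or "is_deleted" are not rejected by accident.
const forbidden = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'TRUNCATE'];
for (const keyword of forbidden) {
if (new RegExp(`\\b${keyword}\\b`).test(normalizedSql)) {
throw new Error(`The ${keyword} statement is not allowed`);
}
}
const pool = this.pools.get(args.connection);
if (!pool) {
throw new Error(`Database connection not found: ${args.connection}`);
}
const maxRows = Math.min(args.maxRows ?? this.maxRows, this.maxRows);
// Append a LIMIT when the query does not already contain one.
// The trailing semicolon was stripped above, so the result stays valid SQL.
let sql = trimmedSql;
if (!/\bLIMIT\b/.test(normalizedSql)) {
sql += ` LIMIT ${maxRows}`;
}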
const startTime = Date.now();
const result = await pool.query(sql, args.params ?? []);
const duration = Date.now() - startTime;
const response = {
rows: result.rows,
rowCount: result.rows.length,
truncated: result.rows.length >= maxRows,
durationMs: duration,
columns: result.fields?.map((f: any) => ({
name: f.name,
type: f.dataTypeID,
})),
};
return [{ type: 'text', text: JSON.stringify(response, null, 2) }];
}
private async describeTable(args: {
connection: string;
tableName: string;
}): Promise<MCPContent[]> {
const pool = this.pools.get(args.connection);
if (!pool) throw new Error(`Connection not found: ${args.connection}`);
const result = await pool.query(
`SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position`,
[args.tableName]
);
return [{ type: 'text', text: JSON.stringify(result.rows, null, 2) }];
}
private async listTables(args: {
connection: string;
}): Promise<MCPContent[]> {
const pool = this.pools.get(args.connection);
if (!pool) throw new Error(`Connection not found: ${args.connection}`);
const result = await pool.query(
`SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name`
);
return [{ type: 'text', text: JSON.stringify(result.rows, null, 2) }];
}
private createPool(config: DatabaseConnectionConfig): any {
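// Placeholder: a real implementation would create a driver-specific pool here.
// This stub returns empty results so the class can be exercised without a database.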
return {
query: async (sql: string, params: any[]) => ({
rows: [],
fields: [],
}),
};
}
}
interface DatabaseConnectionConfig {
name: string;
type: 'postgres' | 'mysql' | 'sqlite';
host?: string;
port?: number;
database: string;
username?: string;
password?: string;
ssl?: boolean;
}
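Filtering on the SQL text is a heuristic, and the `queryTimeout` option still has to be enforced somewhere. A complementary safeguard is to let the database itself refuse writes and cancel slow statements. The sketch below assumes PostgreSQL syntax and a hypothetical minimal `SqlClient` interface that represents a single checked-out connection; real driver APIs differ in detail.

```typescript
// Hypothetical minimal client interface: one dedicated connection,
// so that all statements run in the same session.
interface SqlClient {
  query(sql: string, params?: unknown[]): Promise<{ rows: unknown[] }>;
}

/**
 * Run one query inside a read-only transaction with a statement timeout.
 * The SQL statements used here are PostgreSQL syntax.
 */
async function runReadOnly(
  client: SqlClient,
  sql: string,
  params: unknown[] = [],
  timeoutMs = 30_000,
): Promise<unknown[]> {
  if (!Number.isInteger(timeoutMs) || timeoutMs <= 0) {
    throw new Error('timeoutMs must be a positive integer');
  }
  await client.query('BEGIN TRANSACTION READ ONLY');
  try {
    // SET does not accept bind parameters, hence the validated integer above.
    await client.query(`SET LOCAL statement_timeout = ${timeoutMs}`);
    const result = await client.query(sql, params);
    return result.rows;
  } finally {
    // Nothing to commit in a read-only transaction; always roll back.
    await client.query('ROLLBACK');
  }
}
```

With this wrapper, a write that slips past the keyword filter is rejected by the database rather than by string matching, and `SET LOCAL` keeps the timeout scoped to the transaction.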
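8.6 Tool Security and Sandboxing
8.6.1 Tool Security Model
Layers of tool security control:
┌──────────────────────────────────────────────┐
│ Layer 1: Review at registration              │
│   - Tool schema validation                   │
│   - Risk-level labeling                      │
│   - Declared permission requirements         │
├──────────────────────────────────────────────┤
│ Layer 2: Validation at call time             │
│   - Control-layer policy checks              │
│   - Rate limiting                            │
│   - Argument validation (JSON Schema)        │
├──────────────────────────────────────────────┤
│ Layer 3: Isolation at execution time         │
│   - Process-level sandbox (stdio mode)       │
│   - Network access control                   │
│   - File system access scope                 │
├──────────────────────────────────────────────┤
│ Layer 4: Result auditing                     │
│   - Output filtering (PII, secret detection) │
│   - Audit logging                            │
│   - Anomaly detection                        │
└──────────────────────────────────────────────┘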
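Layer 2 lists rate limiting as a call-time control. As an illustration, the following is a minimal token-bucket sketch keyed by tool name. The class name `ToolRateLimiter`, its constructor parameters, and the injectable clock are assumptions made for this example, not part of the Harness codebase.

```typescript
/**
 * Token-bucket rate limiter keyed by tool name.
 * `now` is injectable so the behavior can be tested without real timers.
 */
class ToolRateLimiter {
  private buckets = new Map<string, { tokens: number; updatedAt: number }>();

  constructor(
    private readonly capacity: number, // maximum burst size
    private readonly refillPerSecond: number, // sustained calls per second
    private readonly now: () => number = Date.now,
  ) {}

  /** Returns true and consumes one token when the call is allowed. */
  tryAcquire(toolName: string): boolean {
    const t = this.now();
    // A tool seen for the first time starts with a full bucket.
    const bucket = this.buckets.get(toolName) ?? { tokens: this.capacity, updatedAt: t };
    // Refill in proportion to the elapsed time, capped at capacity.
    const elapsedSec = (t - bucket.updatedAt) / 1000;
    bucket.tokens = Math.min(this.capacity, bucket.tokens + elapsedSec * this.refillPerSecond);
    bucket.updatedAt = t;
    this.buckets.set(toolName, bucket);
    if (bucket.tokens < 1) return false;
    bucket.tokens -= 1;
    return true;
  }
}
```

A token bucket permits short bursts up to `capacity` while holding the long-run rate at `refillPerSecond`, which tends to suit agents that issue several tool calls in quick succession and then pause.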
8.6.2 Tool Call Approval Flow
/**
* Tool call approval engine
* Enforces an approval workflow for high-risk tools
*/
class ToolApprovalEngine {
private pendingApprovals: Map<string, PendingApproval> = new Map();
private approvedPatterns: ApprovalPattern[] = [];
/**
* Decide whether a call needs approval
*/
requiresApproval(tool: MCPToolDefinition, args: Record<string, any>): boolean {
// 1. A matching, unexpired pre-approval rule skips the approval flow.
// This check has to run first: placed after the checks below, it could
// never change the outcome.
const now = new Date();
for (const pattern of this.approvedPatterns) {
if (pattern.expiresAt > now && this.matchesPattern(tool.name, args, pattern)) {
return false; // pre-approved
}
}
// 2. The tool declares that it requires approval
if (tool.harness?.requiresApproval) return true;
// 3. High-risk tool
if (tool.harness?.riskLevel === 'high') return true;
// 4. Sensitive parameter names or values
const sensitivePatterns = [
/password/i, /secret/i, /token/i, /key/i,
/delete/i, /drop/i, /destroy/i,
];
for (const [key, value] of Object.entries(args)) {
if (sensitivePatterns.some(p => p.test(key))) return true;
if (typeof value === 'string' && sensitivePatterns.some(p => p.test(value))) {
return true;
}
}
return false;
}
/**
* Create an approval request
*/
async requestApproval(
tool: MCPToolDefinition,
args: Record<string, any>,
context: ApprovalContext
): Promise<ApprovalRequest> {
const request: ApprovalRequest = {
id: `approval_${Date.now()}_${Math.random().toString(36).slice(2)}`,
toolName: tool.name,
toolDescription: tool.description,
arguments: args,
riskLevel: tool.harness?.riskLevel ?? 'medium',
requestedBy: context.agentId,
requestedAt: new Date(),
status: 'pending',
reason: this.generateApprovalReason(tool, args),
};
this.pendingApprovals.set(request.id, {
request,
resolve: null as any,
timeout: setTimeout(() => {
this.handleTimeout(request.id);
}, context.timeout ?? 300000), // default timeout: 5 minutes
});
// Notify the approvers
context.notifyApprovers(request);
return request;
}
/**
* Approve a pending request
*/
approve(approvalId: string, approver: string, notes?: string): void {
const pending = this.pendingApprovals.get(approvalId);
if (!pending) return;
clearTimeout(pending.timeout);
pending.request.status = 'approved';
pending.request.approvedBy = approver;
pending.request.approvedAt = new Date();
pending.request.notes = notes;
pending.resolve?.({ approved: true });
this.pendingApprovals.delete(approvalId);
}
/**
* Reject a pending request
*/
reject(
approvalId: string,
rejector: string,
reason: string
): void {
const pending = this.pendingApprovals.get(approvalId);
if (!pending) return;
clearTimeout(pending.timeout);
pending.request.status = 'rejected';
pending.request.rejectedBy = rejector;
pending.request.rejectedAt = new Date();
pending.request.rejectionReason = reason;
pending.resolve?.({ approved: false, reason });
this.pendingApprovals.delete(approvalId);
}
/**
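* Wait for the approval decision.
* Call this right after requestApproval(): a decision made before anyone is
* waiting removes the pending entry, so a later call reports it as not found.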
*/
waitForApproval(approvalId: string): Promise<ApprovalDecision> {
const pending = this.pendingApprovals.get(approvalId);
if (!pending) {
return Promise.resolve({ approved: false, reason: 'Approval request not found' });
}
return new Promise((resolve) => {
pending.resolve = resolve;
});
}
private generateApprovalReason(
tool: MCPToolDefinition,
args: Record<string, any>
): string {
const reasons: string[] = [];
if (tool.harness?.riskLevel === 'high') {
reasons.push('High-risk tool');
}
if (tool.harness?.requiresApproval) {
reasons.push('Tool declares that approval is required');
}
if (tool.name.includes('delete') || tool.name.includes('drop')) {
reasons.push('Involves a data deletion operation');
}
return reasons.join('; ') || 'Routine approval';
}
private matchesPattern(
toolName: string,
args: Record<string, any>,
pattern: ApprovalPattern
): boolean {
// Only the tool name is matched here; argumentConstraints is not enforced yet
return pattern.toolPattern.test(toolName);
}
private handleTimeout(approvalId: string): void {
const pending = this.pendingApprovals.get(approvalId);
if (!pending) return;
pending.request.status = 'timeout';
pending.resolve?.({ approved: false, reason: 'Approval timed out' });
this.pendingApprovals.delete(approvalId);
}
}
// ===== Type definitions =====
interface PendingApproval {
request: ApprovalRequest;
resolve: (decision: ApprovalDecision) => void;
timeout: NodeJS.Timeout;
}
interface ApprovalRequest {
id: string;
toolName: string;
toolDescription: string;
arguments: Record<string, any>;
riskLevel: string;
requestedBy: string;
requestedAt: Date;
status: 'pending' | 'approved' | 'rejected' | 'timeout';
reason: string;
approvedBy?: string;
approvedAt?: Date;
rejectedBy?: string;
rejectedAt?: Date;
rejectionReason?: string;
notes?: string;
}
interface ApprovalDecision {
approved: boolean;
reason?: string;
}
interface ApprovalContext {
agentId: string;
timeout?: number;
notifyApprovers: (request: ApprovalRequest) => void;
}
interface ApprovalPattern {
toolPattern: RegExp;
argumentConstraints?: Record<string, any>;
approvedBy: string;
expiresAt: Date;
}
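In the engine above, the resolver is attached only when `waitForApproval` is called, so a decision that arrives earlier has nothing to resolve. One way to remove that ordering dependency is to create the decision promise at the moment the request is registered and hand it to the caller. The `ApprovalGate` class below is a trimmed-down, hypothetical sketch of just that mechanism; it omits notifications, audit fields, and pre-approval rules.

```typescript
interface Decision {
  approved: boolean;
  reason?: string;
}

/** Hypothetical gate showing only the request/decide mechanics. */
class ApprovalGate {
  private pending = new Map<
    string,
    { settle: (d: Decision) => void; timer: ReturnType<typeof setTimeout> }
  >();
  private nextId = 0;

  /** Register a request; the returned promise exists from this moment on. */
  request(timeoutMs: number): { id: string; decision: Promise<Decision> } {
    const id = `approval_${++this.nextId}`;
    // The executor runs synchronously, so the entry is stored before we return.
    const decision = new Promise<Decision>(resolve => {
      const timer = setTimeout(
        () => this.decide(id, { approved: false, reason: 'Approval timed out' }),
        timeoutMs,
      );
      this.pending.set(id, { settle: resolve, timer });
    });
    return { id, decision };
  }

  /** Approve or reject; returns false if the id is unknown or already decided. */
  decide(id: string, d: Decision): boolean {
    const entry = this.pending.get(id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(id);
    entry.settle(d);
    return true;
  }
}
```

Because the caller already holds the promise, it no longer matters whether the approver responds before or after the agent starts awaiting; the first decision wins and later ones are ignored.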
8.7 Tool Metrics and Observability
8.7.1 Tool Call Tracing
/**
* Tool call tracer
* Records the full call chain of tool invocations
*/
class ToolCallTracer {
private traces: TraceRecord[] = [];
private activeSpans: Map<string, Span> = new Map();
/**
* Start a tracing span
*/
startSpan(
toolName: string,
args: Record<string, any>,
parentSpanId?: string
): Span {
const span: Span = {
id: `span_${Date.now()}_${Math.random().toString(36).slice(2)}`,
parentSpanId,
toolName,
args: this.sanitizeArgs(args),
startTime: Date.now(),
endTime: null,
status: 'active',
events: [],
};
this.activeSpans.set(span.id, span);
return span;
}
/**
* End a span
*/
endSpan(
spanId: string,
result?: any,
error?: string
): void {
const span = this.activeSpans.get(spanId);
if (!span) return;
span.endTime = Date.now();
span.status = error ? 'error' : 'success';
span.result = this.sanitizeResult(result);
span.error = error;
span.durationMs = span.endTime - span.startTime;
this.activeSpans.delete(spanId);
// A span without a parent is a root span: record it as a complete trace
if (!span.parentSpanId) {
this.traces.push({
id: span.id,
rootSpan: span,
timestamp: new Date(span.startTime),
});
}
}
/**
* Get tool call statistics
*/
getStats(toolName?: string): ToolCallStats {
const relevantTraces = toolName
? this.traces.filter(t => t.rootSpan.toolName === toolName)
: this.traces;
const durations = relevantTraces
// Compare against undefined so that 0 ms durations are still counted
.filter(t => t.rootSpan.durationMs !== undefined)
.map(t => t.rootSpan.durationMs!);
const errors = relevantTraces.filter(
t => t.rootSpan.status === 'error'
);
return {
totalCalls: relevantTraces.length,
successRate: relevantTraces.length > 0
? (relevantTraces.length - errors.length) / relevantTraces.length
: 1.0,
avgDurationMs: durations.length > 0
? durations.reduce((a, b) => a + b, 0) / durations.length
: 0,
p50DurationMs: this.percentile(durations, 50),
p95DurationMs: this.percentile(durations, 95),
p99DurationMs: this.percentile(durations, 99),
errorCount: errors.length,
topErrors: this.getTopErrors(errors),
};
}
private sanitizeArgs(args: Record<string, any>): Record<string, any> {
const sanitized: Record<string, any> = {};
const sensitiveKeys = ['password', 'secret', 'token', 'apiKey', 'authorization'];
for (const [key, value] of Object.entries(args)) {
if (sensitiveKeys.some(sk => key.toLowerCase().includes(sk.toLowerCase()))) {
sanitized[key] = '[REDACTED]';
} else if (typeof value === 'string' && value.length > 200) {
sanitized[key] = value.slice(0, 200) + '...[truncated]';
} else {
sanitized[key] = value;
}
}
return sanitized;
}
private sanitizeResult(result: any): any {
if (typeof result === 'string' && result.length > 500) {
return result.slice(0, 500) + '...[truncated]';
}
return result;
}
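private percentile(values: number[], p: number): number {
if (values.length === 0) return 0;
// Nearest-rank percentile. Sort a copy first: callers pass durations in
// arrival order, not sorted order.
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil(p / 100 * sorted.length) - 1;
return sorted[Math.max(0, index)];
}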
private getTopErrors(errors: TraceRecord[]): { message: string; count: number }[] {
const counts = new Map<string, number>();
for (const error of errors) {
const msg = error.rootSpan.error ?? 'unknown';
counts.set(msg, (counts.get(msg) ?? 0) + 1);
}
return Array.from(counts.entries())
.map(([message, count]) => ({ message, count }))
.sort((a, b) => b.count - a.count)
.slice(0, 5);
}
}
// ===== Type definitions =====
interface Span {
id: string;
parentSpanId?: string;
toolName: string;
args: Record<string, any>;
startTime: number;
endTime: number | null;
status: 'active' | 'success' | 'error';
durationMs?: number;
result?: any;
error?: string;
events: SpanEvent[];
}
interface SpanEvent {
name: string;
timestamp: number;
attributes?: Record<string, any>;
}
interface TraceRecord {
id: string;
rootSpan: Span;
timestamp: Date;
}
interface ToolCallStats {
totalCalls: number;
successRate: number;
avgDurationMs: number;
p50DurationMs: number;
p95DurationMs: number;
p99DurationMs: number;
errorCount: number;
topErrors: { message: string; count: number }[];
}
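`sanitizeArgs` inspects top-level keys only, so a secret nested inside an object or array would reach the trace unredacted. A recursive variant could look like the sketch below. The function name `redactDeep`, the key pattern, and the depth limit are illustrative assumptions, and the sketch expects JSON-like plain data (objects, arrays, primitives).

```typescript
// Key names treated as sensitive (an illustrative list, not exhaustive).
const SENSITIVE_KEY = /password|secret|token|api[-_]?key|authorization/i;

/**
 * Return a redacted deep copy of `value`; the input is not mutated.
 * `maxDepth` guards against very deep or cyclic structures.
 */
function redactDeep(value: unknown, maxDepth = 6): unknown {
  if (value === null || typeof value !== 'object') return value;
  if (maxDepth <= 0) return '[MAX_DEPTH]';
  if (Array.isArray(value)) {
    return value.map(v => redactDeep(v, maxDepth - 1));
  }
  const out: Record<string, unknown> = {};
  for (const [key, v] of Object.entries(value as Record<string, unknown>)) {
    // Redact by key name; otherwise keep descending into the value.
    out[key] = SENSITIVE_KEY.test(key) ? '[REDACTED]' : redactDeep(v, maxDepth - 1);
  }
  return out;
}
```

Redacting by key name will not catch secrets embedded in free-text values; value-level detection, as listed under Layer 4 of the security model, would have to be layered on top.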
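8.8 Chapter Summary
This chapter built the tool-driven architecture of the Harness platform. Its core components are:
| Component | Responsibility | Key design |
|---|---|---|
| MCP Server manager | Registration, discovery, connection management | Multiple transports (stdio/SSE/WS) + automatic failover |
| Tool orchestration engine | Chained, parallel, and conditional orchestration | DSL definitions + expression mapping + concurrency control |
| File system MCP Server | Safe file operations | Path allow-list + size limit + permission checks |
| Database MCP Server | Safe query access | SELECT only + dangerous statements blocked + row limit |
| Approval engine | Approval for high-risk tools | Multi-level approval + timeout handling + pre-approval patterns |
| Tracer | Call-chain observability | Span tree + sensitive-data redaction + percentile statistics |
Key insights from the tool layer:
- MCP is the POSIX of agents: just as an operating system abstracts hardware behind standard interfaces, MCP abstracts external systems behind a standard protocol
- Tool weaving ≠ API calls: weaving means dynamic discovery, automatic composition, and unified governance
- Security is layered: from registration review to execution isolation, every layer is part of defense in depth
- Observability is not optional: a tool call without tracing is a black box, and a black box cannot be governed
The next chapter turns to the semantic memory system: how an agent's "long-term memory" and "working memory" work together.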
References
- Anthropic. (2024). "Model Context Protocol Specification." https://modelcontextprotocol.io
- JSON-RPC 2.0 Specification. https://www.jsonrpc.org/specification
- Schluntz, E., & Zhang, B. (2024). "Building Effective Agents." Anthropic Research Blog.
- Richardson, C. (2018). "Microservices Patterns." Manning Publications. Reference for the Saga pattern.