MCP 协议深度解析:Java 实现 JSON-RPC 传输与工具注册
·
一、MCP 解决了什么问题?
在 AI Agent 开发中,一个核心痛点是:LLM 如何安全、标准化地调用外部工具?
每个框架都有自己的工具注册方式 —— LangChain 用 @tool 装饰器,Spring AI 用 @Tool 注解,Semantic Kernel 用 KernelFunction。当你需要让 Claude Desktop、Cursor、以及自研 Agent 共享同一套工具时,适配成本极高。
MCP (Model Context Protocol) 就是 Anthropic 为这个问题给出的答案:一套基于 JSON-RPC 2.0 的标准化协议,定义了 AI 应用与工具服务器之间的通信规范。
| 问题 | MCP 之前的做法 | MCP 的做法 |
|---|---|---|
| 工具注册 | 每个框架自己定义 DSL | 统一的 tools/list RPC |
| 工具调用 | 框架特有的函数签名 | 统一的 tools/call RPC |
| 传输层 | HTTP/gRPC/in-process 各自实现 | stdio / HTTP SSE 两种标准 |
| 资源暴露 | 无标准,各自实现 | resources/list + resources/read |
| 提示词模板 | 各框架独立 | prompts/list + prompts/get |
二、MCP 协议的核心概念
2.1 架构角色
┌─────────────┐ JSON-RPC 2.0 ┌─────────────────┐
│ MCP Client │ ◄──────────────────► │ MCP Server │
│ (AI 应用) │ stdio / HTTP │ (工具提供方) │
└─────────────┘ └─────────────────┘
- Client(Host):AI 应用,如 Claude Desktop、自研 Agent
- Server:工具提供方,暴露 Tools / Resources / Prompts 三类能力
- Transport:stdio(子进程通信)或 HTTP SSE(网络通信)
2.2 生命周期
初始化阶段: 运行阶段: 关闭阶段:
initialize ──────► tools/list ──────► shutdown
│ tools/call │
▼ resources/read ▼
initialized prompts/get closed
Client 必须先发 initialize 请求协商协议版本和能力,Server 响应后才进入就绪状态。
三、Java 实现:从零构建 MCP Server
下面我们写一个完整的、可运行的 MCP Server,暴露一个天气查询工具。不依赖任何 MCP SDK,从 JSON-RPC 到 stdio 传输全部手写。
3.1 项目结构
mcp-server-java/
├── pom.xml
└── src/main/java/com/example/mcp/
├── McpServer.java // 主入口
├── protocol/
│ ├── JsonRpcMessage.java // JSON-RPC 消息模型
│ ├── Request.java
│ ├── Response.java
│ └── ErrorCodes.java
├── transport/
│ └── StdioTransport.java // stdio 传输层
├── handler/
│ └── MessageHandler.java // 消息路由
└── tools/
└── WeatherTool.java // 业务工具
3.2 JSON-RPC 2.0 消息模型
// protocol/JsonRpcMessage.java
package com.example.mcp.protocol;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class JsonRpcMessage {
@JsonProperty("jsonrpc")
private final String jsonrpc = "2.0";
private String id; // 请求ID,通知类消息可为 null
private String method; // 方法名,如 "tools/list"
private JsonNode params; // 方法参数
private JsonNode result; // 成功响应
private JsonNode error; // 错误响应
// 工厂方法:创建请求
public static JsonRpcMessage request(String id, String method, JsonNode params) {
JsonRpcMessage msg = new JsonRpcMessage();
msg.id = id;
msg.method = method;
msg.params = params;
return msg;
}
// 工厂方法:创建成功响应
public static JsonRpcMessage success(String id, JsonNode result) {
JsonRpcMessage msg = new JsonRpcMessage();
msg.id = id;
msg.result = result;
return msg;
}
// 工厂方法:创建错误响应
public static JsonRpcMessage error(String id, int code, String message) {
JsonRpcMessage msg = new JsonRpcMessage();
msg.id = id;
msg.error = new com.fasterxml.jackson.databind.node.ObjectNode(
com.fasterxml.jackson.databind.node.JsonNodeFactory.instance
).put("code", code).put("message", message);
return msg;
}
// getters...
public String getJsonrpc() { return jsonrpc; }
public String getId() { return id; }
public void setId(String id) { this.id = id; }
public String getMethod() { return method; }
public void setMethod(String method) { this.method = method; }
public JsonNode getParams() { return params; }
public void setParams(JsonNode params) { this.params = params; }
public JsonNode getResult() { return result; }
public void setResult(JsonNode result) { this.result = result; }
public JsonNode getError() { return error; }
public void setError(JsonNode error) { this.error = error; }
}
// protocol/ErrorCodes.java
package com.example.mcp.protocol;
public final class ErrorCodes {
// JSON-RPC 标准错误码
public static final int PARSE_ERROR = -32700;
public static final int INVALID_REQUEST = -32600;
public static final int METHOD_NOT_FOUND = -32601;
public static final int INVALID_PARAMS = -32602;
public static final int INTERNAL_ERROR = -32603;
// MCP 自定义错误码(-32000 到 -32099 为服务器自定义范围)
public static final int SERVER_NOT_INITIALIZED = -32002;
public static final int UNKNOWN_CAPABILITY = -32003;
private ErrorCodes() {}
}
3.3 stdio 传输层
这是最关键的部分 —— MCP 的 stdio 模式要求 Server 通过标准输入输出与 Client 通信,不能输出任何日志到 stdout,所有日志必须走 stderr。
// transport/StdioTransport.java
package com.example.mcp.transport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.mcp.protocol.JsonRpcMessage;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
public class StdioTransport implements AutoCloseable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String DELIMITER = "\n"; // MCP 用换行符分隔消息
private final BufferedWriter writer;
private final BufferedReader reader;
private final Thread readThread;
private final BlockingQueue<JsonRpcMessage> inboundQueue;
private volatile boolean running = true;
public StdioTransport() {
// stdin/stdout 用于 JSON-RPC 通信
this.reader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
this.writer = new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
this.inboundQueue = new LinkedBlockingQueue<>();
// 独立线程读 stdin,避免阻塞主线程
this.readThread = new Thread(this::readLoop, "mcp-stdio-reader");
this.readThread.setDaemon(true);
}
public void start() {
readThread.start();
// 将日志输出到 stderr,不污染 stdout 的 JSON-RPC 通道
System.err.println("[MCP Server] stdio transport started");
}
/** 发送 JSON-RPC 消息到 stdout */
public synchronized void send(JsonRpcMessage message) throws IOException {
String json = MAPPER.writeValueAsString(message);
writer.write(json);
writer.write(DELIMITER);
writer.flush();
}
/** 从入站队列取消息(阻塞) */
public JsonRpcMessage receive() throws InterruptedException {
return inboundQueue.take();
}
/** 读取 stdin 循环 */
private void readLoop() {
try {
String line;
while (running && (line = reader.readLine()) != null) {
if (line.isEmpty()) continue;
try {
JsonRpcMessage msg = MAPPER.readValue(line, JsonRpcMessage.class);
inboundQueue.put(msg);
} catch (Exception e) {
System.err.println("[MCP Server] parse error: " + e.getMessage());
}
}
} catch (IOException e) {
if (running) {
System.err.println("[MCP Server] read error: " + e.getMessage());
}
}
}
@Override
public void close() {
running = false;
try { reader.close(); } catch (IOException ignored) {}
try { writer.close(); } catch (IOException ignored) {}
}
}
3.4 消息处理与路由
// handler/MessageHandler.java
package com.example.mcp.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.example.mcp.protocol.ErrorCodes;
import com.example.mcp.protocol.JsonRpcMessage;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
public class MessageHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final Map<String, BiFunction<String, JsonNode, JsonRpcMessage>> handlers = new HashMap<>();
private boolean initialized = false;
private String clientName;
public MessageHandler() {
// 注册 MCP 协议方法处理器
register("initialize", this::handleInitialize);
register("tools/list", this::handleToolsList);
register("tools/call", this::handleToolsCall);
register("notifications/initialized", this::handleInitialized);
}
private void register(String method, BiFunction<String, JsonNode, JsonRpcMessage> handler) {
handlers.put(method, handler);
}
/** 处理所有入站消息 */
public JsonRpcMessage handle(JsonRpcMessage request) {
String method = request.getMethod();
String id = request.getId();
// 通知类消息不需要响应
if (method != null && method.startsWith("notifications/")) {
handlers.getOrDefault(method, this::unknownMethod)
.apply(id, request.getParams());
return null;
}
// 未初始化时只允许 initialize
if (!initialized && !"initialize".equals(method)) {
return JsonRpcMessage.error(id, ErrorCodes.SERVER_NOT_INITIALIZED,
"Server not initialized. Send initialize first.");
}
BiFunction<String, JsonNode, JsonRpcMessage> handler =
handlers.getOrDefault(method, this::unknownMethod);
return handler.apply(id, request.getParams());
}
// ─── MCP 协议方法实现 ───
private JsonRpcMessage handleInitialize(String id, JsonNode params) {
String protocolVersion = params.path("protocolVersion").asText("2024-11-05");
clientName = params.path("clientInfo").path("name").asText("unknown");
ObjectNode result = MAPPER.createObjectNode();
result.put("protocolVersion", protocolVersion);
// 申明 Server 能力
ObjectNode capabilities = result.putObject("capabilities");
ObjectNode toolsCap = capabilities.putObject("tools");
// listChanged: Server 支持通知 Client 工具列表变化
toolsCap.put("listChanged", false);
ObjectNode serverInfo = result.putObject("serverInfo");
serverInfo.put("name", "mcp-server-java");
serverInfo.put("version", "1.0.0");
return JsonRpcMessage.success(id, result);
}
private JsonRpcMessage handleInitialized(String id, JsonNode params) {
initialized = true;
System.err.println("[MCP Server] Initialized by client: " + clientName);
return null; // 通知类,不响应
}
private JsonRpcMessage handleToolsList(String id, JsonNode params) {
ArrayNode tools = MAPPER.createArrayNode();
// 注册天气查询工具
ObjectNode weatherTool = tools.addObject();
weatherTool.put("name", "get_weather");
weatherTool.put("description", "查询指定城市的实时天气信息");
ObjectNode weatherSchema = weatherTool.putObject("inputSchema");
weatherSchema.put("type", "object");
ObjectNode weatherProps = weatherSchema.putObject("properties");
ObjectNode cityProp = weatherProps.putObject("city");
cityProp.put("type", "string");
cityProp.put("description", "城市名称,如北京、上海、Tokyo");
ArrayNode required = weatherSchema.putArray("required");
required.add("city");
// 可以在这里注册更多工具...
ObjectNode result = MAPPER.createObjectNode();
result.set("tools", tools);
return JsonRpcMessage.success(id, result);
}
private JsonRpcMessage handleToolsCall(String id, JsonNode params) {
String toolName = params.path("name").asText();
JsonNode arguments = params.path("arguments");
try {
String toolResult = dispatchToolCall(toolName, arguments);
ObjectNode result = MAPPER.createObjectNode();
ArrayNode content = result.putArray("content");
ObjectNode textContent = content.addObject();
textContent.put("type", "text");
textContent.put("text", toolResult);
result.put("isError", false);
return JsonRpcMessage.success(id, result);
} catch (Exception e) {
ObjectNode result = MAPPER.createObjectNode();
ArrayNode content = result.putArray("content");
ObjectNode textContent = content.addObject();
textContent.put("type", "text");
textContent.put("text", "工具执行失败: " + e.getMessage());
result.put("isError", true);
return JsonRpcMessage.success(id, result);
}
}
/** 工具调用分发 */
private String dispatchToolCall(String toolName, JsonNode arguments) {
switch (toolName) {
case "get_weather":
String city = arguments.path("city").asText("未知城市");
return WeatherTool.query(city);
default:
throw new IllegalArgumentException("未知工具: " + toolName);
}
}
private JsonRpcMessage unknownMethod(String id, JsonNode params) {
return JsonRpcMessage.error(id, ErrorCodes.METHOD_NOT_FOUND,
"Method not found");
}
}
3.5 天气工具实现
// tools/WeatherTool.java
package com.example.mcp.tools;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
/**
* 模拟天气查询工具。
* 生产环境中应对接真实天气 API(如 OpenWeatherMap、和风天气)。
*/
public class WeatherTool {
private static final Map<String, String> CONDITIONS = Map.of(
"北京", "晴 28°C 湿度45% 北风3级",
"上海", "多云 32°C 湿度70% 东南风2级",
"深圳", "雷阵雨 29°C 湿度85% 西南风4级",
"杭州", "阴 25°C 湿度60% 东北风2级",
"成都", "小雨 22°C 湿度80% 无持续风向"
);
public static String query(String city) {
// 生产环境:HTTP 调用天气 API
// RestTemplate rest = new RestTemplate();
// String resp = rest.getForObject(API_URL + "?city=" + city, String.class);
String result = CONDITIONS.getOrDefault(city, "未知城市,暂无数据");
// 模拟网络延迟
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return city + " 当前天气:" + result;
}
}
3.6 主入口:事件循环
// McpServer.java
package com.example.mcp;
import com.example.mcp.handler.MessageHandler;
import com.example.mcp.protocol.JsonRpcMessage;
import com.example.mcp.transport.StdioTransport;
public class McpServer {
public static void main(String[] args) {
StdioTransport transport = new StdioTransport();
MessageHandler handler = new MessageHandler();
transport.start();
System.err.println("[MCP Server] Waiting for client connection...");
try {
// 事件循环:收消息 → 处理 → 发响应
while (true) {
JsonRpcMessage request = transport.receive();
JsonRpcMessage response = handler.handle(request);
if (response != null) {
transport.send(response);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("[MCP Server] Interrupted, shutting down");
} catch (Exception e) {
System.err.println("[MCP Server] Fatal error: " + e.getMessage());
e.printStackTrace(System.err);
} finally {
transport.close();
}
}
}
3.7 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>mcp-server-java</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<jackson.version>2.16.1</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.mcp.McpServer</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.8 Claude Desktop 配置
要让 Claude Desktop 使用我们的 MCP Server,编辑 claude_desktop_config.json:
{
"mcpServers": {
"weather-java": {
"command": "java",
"args": ["-jar", "/path/to/mcp-server-java-1.0.0.jar"]
}
}
}
四、工程化要点总结
| 要点 | 实现方式 | 为什么重要 |
|---|---|---|
| 传输隔离 | stdout 只用于 JSON-RPC,日志全走 stderr | 防止日志污染协议通道导致解析失败 |
| 独立读取线程 | stdin 用 daemon 线程异步读 | 避免 I/O 阻塞事件循环 |
| 状态机 | initialized 标志控制生命周期 |
防止未初始化的非法调用 |
| 错误友好 | 工具调用失败返回 isError: true 而非抛异常 |
LLM 可以看到错误信息并自我纠正 |
| 阻塞队列 | LinkedBlockingQueue 解耦读写 |
生产者-消费者模式,天然线程安全 |
更多推荐

所有评论(0)