文章目录

计算机网络基础

为什么要了解网络?

在 MCP Server 项目中,服务器需要与外部客户端(如 Claude Desktop、Claude Code、浏览器等)进行通信。这个通信过程本质上就是网络通信。项目提供了三种传输模式:

  • Stdio:通过标准输入输出通信(不涉及网络)
  • SSE:基于 HTTP 协议的长连接推送
  • HTTP Stream:基于 HTTP 协议的请求-响应 + 服务器推送

后两种模式需要你理解网络基础知识。

OSI模型的简化理解

OSI 七层模型是网络通信的理论框架,但对于本项目,你只需要理解三层:

通俗理解

  • 网络层决定数据包"送到哪台机器",靠 IP 地址寻址(就像快递的省市区地址)
  • 传输层决定数据"怎么送",靠 TCP 保证可靠到达(就像快递的签收确认)
  • 应用层决定数据"说的是什么",靠 HTTP/MCP 这样的协议约定格式(就像收件人拆开信封读信)

在代码中,你不需要手动操作网络层和传输层,cpp-httplib 库已经封装好了这些细节。你只需要关注应用层:定义什么样的 HTTP 路由,处理什么样的 JSON 请求,返回什么样的 JSON 响应。

IP地址与端口

IP 地址

IP 地址是网络中每台计算机的唯一标识。本项目使用的地址:

127.0.0.1

这是回环地址(Loopback Address),俗称 localhost。它永远指向本机

为什么项目用 127.0.0.1?因为 MCP Server 和客户端通常运行在同一台机器上:

// SseTransport.h 第51行 - SSE 传输的构造函数默认值
explicit SSE(int port = 8080, std::string host = "127.0.0.1");

// HttpStreamTransport.hpp 第42行 - HTTP Stream 传输的构造函数默认值
explicit HttpStream(int port = 8080, std::string host = "127.0.0.1");
端口

一台计算机可以同时运行多个网络程序。端口就是用来区分不同程序的。它是一个 0~65535 的整数。

127.0.0.1:8080 的含义是:

  • 127.0.0.1:本机
  • 8080:这台机器上的第 8080 号端口
  • 合起来:本机上的 8080 端口

本项目默认使用 8080 端口。为什么不是 80?因为 80 端口是 HTTP 的默认端口,在 Linux/Mac 上监听 1024 以下的端口需要 root 权限。8080 是 HTTP 开发中最常用的替代端口。

// SseTransport.cpp 第117行 - 实际监听
server_->listen(host_.c_str(), port_);
// 展开就是:server_->listen("127.0.0.1", 8080);
端口冲突

如果 8080 端口已经被其他程序占用(比如另一个开发服务器),listen() 会失败:

// SseTransport.cpp 第119-122行
if (!server_->listen(host_.c_str(), port_)) {
    LOG(ERROR) << "Failed to start SSE server on " << host_ << ":" << port_ << std::endl;
    server_running_.store(false);
}

这就是为什么 Start() 方法返回 bool 值——它告诉调用者启动是否成功。

TCP vs UDP

特性 TCP UDP
连接方式 需要建立连接(三次握手) 不需要连接,直接发送
可靠性 保证数据不丢失、不乱序 不保证,可能丢包
速度 较慢 较快
适用场景 网页、文件传输、API 调用 视频直播、在线游戏
本项目使用

本项目使用 TCP。HTTP 协议底层就是 TCP。为什么?

  • MCP 协议中的 JSON-RPC 消息必须完整、有序地到达
  • cpp-httplib 库底层使用 TCP socket
  • SSE 和 HTTP Stream 都建立在 HTTP 之上,HTTP 建立在 TCP 之上

在 CMakeLists.txt 中可以看到链接了 socket 库:

# CMakeLists.txt 第46-48行
if(WIN32)
    # Windows Sockets library is required on Windows
    target_link_libraries(${PROJECT_NAME} PRIVATE ws2_32)

ws2_32 是 Windows 上的 socket 库,cpp-httplib 通过它使用 TCP。

客户端-服务器模型

本项目遵循经典的客户端-服务器(Client-Server)模型

在 MCP 的语境中,这是双向的:

客户端 → 服务器(Request / Notification):

{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"weather","arguments":{"city":"Beijing"}}}

服务器 → 客户端(Response):

{"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"Beijing: 25°C, sunny"}]}}

服务器 → 客户端(Notification,服务器主动推送):

{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}

在项目代码中,服务器的核心循环就在 Server::Connect() 中:

// Server.cpp 第127-154行
while (!isStopping_) {
    auto [length, json_string] = transport->Read();   // 等待客户端请求
    // ...
    json request = json::parse(json_string);           // 解析 JSON
    json response = HandleRequest(request);            // 处理请求
    transport_->Write(response.dump());                // 返回响应
}

这是一个典型的请求-响应循环:读取 → 处理 → 写入 → 读取 → …

阻塞I/O vs 非阻塞I/O

阻塞I/O(Blocking I/O)

当一个线程执行 Read() 时,如果还没有数据到达,这个线程会被操作系统挂起,CPU 会去执行其他线程。直到数据到达后,操作系统再唤醒这个线程。

就像你在餐厅点餐:

  • 阻塞:你站在柜台前,菜没做好就一直等,不做别的事
  • 非阻塞:你拿了号,去做别的事,等叫号了再回来

本项目中的所有 Read() 调用都是阻塞的:

// StdioTransport.cpp 第31-45行
std::pair<size_t, std::string> Stdio::Read() {
    std::string json_data;
    int c;
    while ((c = std::getc(stdin)) != EOF && c != '\n') {
        json_data += static_cast<char>(c);
    }
    return {json_data.length(), json_data};
}

std::getc(stdin) 是阻塞的——标准输入没有数据时,函数不会返回,整个线程停下来等待。

// SseTransport.cpp 第55-79行
std::pair<size_t, std::string> SSE::Read() {
    std::unique_lock<std::mutex> lock(incoming_mutex_);
    incoming_cv_.wait(lock, [this]() {
        return !incoming_messages_.empty() || !server_running_.load();
    });
    // ...
}

incoming_cv_.wait() 也是阻塞的——队列为空时,线程在条件变量上等待,CPU 不消耗。

为什么使用阻塞I/O?
  1. 简单:代码逻辑是线性的,读 → 处理 → 写,清晰易懂
  2. 适合请求量不大的场景:MCP Server 通常只有一个客户端(如 Claude Desktop),不需要处理几千个并发连接
  3. 配合多线程使用:阻塞的只是当前线程,其他线程(如 Writer 线程、Watcher 线程)不受影响

项目在信号处理中也利用了阻塞的性质:

// main.cpp 第43-46行 - 关于 SIGINT 对阻塞 read() 的影响
// 对于 stdio 传输,SIGINT 会中断阻塞的 read() 系统调用使 Connect 循环退出。
// 对于 HTTP/SSE 传输,Connect 循环会在下次迭代检测到 isStopping_ 后退出。

同步I/O vs 异步I/O

本项目既使用了同步I/O,也提供了异步I/O接口。

同步I/O

调用 Read() 后,当前线程阻塞,直到数据到达才返回:

auto [length, json_string] = transport->Read();
// 这行代码执行完时,数据一定已经拿到了
异步I/O

调用 ReadAsync() 后,立即返回一个 future 对象。你可以做其他事情,之后再通过 .get() 获取结果(此时如果数据还没到,.get() 会阻塞):

// Server.cpp 第181-182行
auto future = transport_->ReadAsync();
auto [length, json_string] = future.get();  // 这里才真正等待

项目中的异步实现非常简单——就是把同步版本包了一层:

// StdioTransport.cpp 第47-56行
std::future<std::pair<size_t, std::string>> Stdio::ReadAsync() {
    return std::async(std::launch::async, []() {
        std::string json_data;
        int c;
        while ((c = std::getc(stdin)) != EOF && c != '\n') {
            json_data += static_cast<char>(c);
        }
        return std::make_pair(json_data.length(), json_data);
    });
}

std::async(std::launch::async, ...) 会在另一个线程中执行阻塞的 getc,调用者的线程可以继续执行。

关键区别
概念 含义 项目中的对应
阻塞 vs 非阻塞 描述了函数调用的行为 Read() 会阻塞调用线程
同步 vs 异步 描述了获取结果的方式 ReadAsync() 返回 future,延迟获取结果

注意:项目的当前实现中,异步只是把阻塞操作放到了另一个线程。真正的"非阻塞异步I/O"(如 Linux 的 epoll、Windows 的 IOCP)项目中没有使用,因为 MCP Server 的连接数很少,阻塞+多线程已经完全够用。


HTTP协议深入

本项目使用 HTTP 作为 SSE 和 HTTP Stream 两种传输方式的底层协议。理解 HTTP 协议是理解这两个 Transport 的前提。

HTTP请求格式

一个 HTTP 请求由四部分组成:

方法 路径 HTTP/1.1          ← 请求行
Host: 127.0.0.1:8080        ← 请求头部(Headers)
Content-Type: application/json
Mcp-Session-Id: abc123

{"method":"tools/call",...}  ← 请求正文(Body,可选)
HTTP方法

项目用到了四种方法:

方法 用途 项目中的路由
GET 获取资源/建立长连接 /sse(SSE), /mcp(HTTP Stream 的 SSE 通道)
POST 提交数据 /messages(SSE), /mcp(HTTP Stream 请求)
DELETE 删除资源/结束会话 /mcp(HTTP Stream 会话终止)
OPTIONS 预检请求 所有路径(CORS 预检)
项目中的路由注册
// SseTransport.cpp 第150-165行
void SSE::SetupRoutes() {
    server_->Options("/.*", [this](...) { HandleOptionsRequest(req, res); });
    server_->Get("/health", [](...) { res.set_content(...); });
    server_->Post("/messages", [this](...) { HandlePostMessage(req, res); });
    server_->Get("/sse", [this](...) { HandleSSEConnection(req, res); });
}
// HttpStreamTransport.cpp 第163-183行
void HttpStream::SetupRoutes() {
    server_->Options("/.*", [](...) { HandleOptionsRequest(req, res); });
    server_->Get("/health", [](...) { res.set_content(...); });
    server_->Post("/mcp", [this](...) { HandlePostMessage(req, res); });
    server_->Get("/mcp", [this](...) { HandleGetSSE(req, res); });
    server_->Delete("/mcp", [this](...) { HandleDeleteSession(req, res); });
}

注意:

  • /.* 是一个正则表达式,匹配所有路径(用于 OPTIONS)
  • 同一个路径 /mcp 同时注册了 GET、POST、DELETE 三个方法的不同处理函数
HTTP头部(Headers)

头部是键值对,用来传递元信息。项目中最关键的头部:

头部名称 作用 示例值
Content-Type 告诉对方正文的格式 application/json, text/event-stream
Accept 告诉对方自己能接受什么格式 application/json, text/event-stream
Access-Control-Allow-Origin CORS:允许哪些来源访问 *(允许所有)
Mcp-Session-Id MCP 自定义:会话标识 18f3a2b1c-4d5e6f7a
Cache-Control 缓存策略 no-cache
Connection 连接管理 keep-alive
项目中的头部读取
// HttpStreamTransport.cpp 第189行
auto content_type = req.get_header_value("Content-Type");

// HttpStreamTransport.cpp 第197行
auto accept = req.get_header_value("Accept");

// HttpStreamTransport.cpp 第416-418行
auto client_session = req.get_header_value("Mcp-Session-Id");

get_header_value() 是 cpp-httplib 提供的便捷方法(见 httplib.h 第657行),如果头部不存在则返回空字符串。

HTTP正文(Body)

正文是请求/响应的实际数据。在项目中,正文永远是 JSON 字符串

// SseTransport.cpp 第286行 - 读取 POST 请求的正文
std::string message = req.body;

// HttpStreamTransport.cpp 第204行
std::string message = req.body;

HTTP响应格式

响应也由四部分组成:

HTTP/1.1 200 OK              ← 状态行
Content-Type: application/json
Mcp-Session-Id: abc123        ← 响应头部
Access-Control-Allow-Origin: *

{"jsonrpc":"2.0","id":1,...}  ← 响应正文
状态码

项目中使用的状态码:

状态码 含义 项目中的使用场景
200 OK,请求成功 正常返回响应
202 Accepted,已接受但尚未处理 通知消息(不需要响应)
400 Bad Request,请求格式错误 空消息体、JSON 解析失败
404 Not Found,资源不存在 会话 ID 无效
406 Not Acceptable,不接受此格式 Accept 头部不满足要求
415 Unsupported Media Type Content-Type 不是 application/json
503 Service Unavailable SSE 连接未建立
504 Gateway Timeout 请求处理超时

项目中的使用示例:

// HttpStreamTransport.cpp 第191-193行
if (content_type.find("application/json") == std::string::npos) {
    res.status = 415;  // Unsupported Media Type
    res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");
    return;
}
// HttpStreamTransport.cpp 第291行 - 请求超时
res.status = 504;  // Gateway Timeout
res.set_content("{\"error\":\"Request timed out\"}", "application/json");
// SseTransport.cpp 第281行 - SSE连接不存在时拒绝POST
if (!client_connected_.load()) {
    res.status = 503;  // Service Unavailable
    res.set_content("{\"error\":\"No SSE connection\"}", "application/json");
    return;
}
响应体设置
// 最简单的响应
res.set_content("{\"status\":\"ok\"}", "application/json");
// 第一个参数:响应正文
// 第二个参数:Content-Type

// 带自定义头部的响应
res.status = 200;
res.set_content(response_data, "application/json");
res.set_header("Mcp-Session-Id", session_id_);

Content-Type 协商

Content-Type 决定了数据的格式。客户端和服务器需要通过它来"协商"使用什么格式交换数据。

在 MCP Server 中,有两个关键的 Content-Type:

application/json

标准的 JSON 格式,用于普通的请求和响应

Content-Type: application/json

{"jsonrpc":"2.0","id":1,"result":{"tools":[...]}}

项目在 HTTP Stream 中强制要求 POST 请求必须是 JSON:

// HttpStreamTransport.cpp 第189-194行
auto content_type = req.get_header_value("Content-Type");
if (content_type.find("application/json") == std::string::npos) {
    res.status = 415;
    res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");
    return;
}

同时还检查 Accept 头部,确保客户端能接受 JSON 响应:

// HttpStreamTransport.cpp 第197-202行
auto accept = req.get_header_value("Accept");
if (!accept.empty() && accept.find("application/json") == std::string::npos) {
    res.status = 406;
    res.set_content("{\"error\":\"Not Acceptable. Must accept application/json\"}", "application/json");
    return;
}
text/event-stream

SSE 专用的 MIME 类型,用于服务器向客户端的实时推送

// SseTransport.cpp 第181行
res.set_header("Content-Type", "text/event-stream");

为什么需要区分 application/jsontext/event-stream?因为浏览器/客户端需要知道如何解析数据:

  • application/json → 直接 JSON.parse(body) 就行
  • text/event-stream → 需要按 SSE 协议解析(按行读取,解析 event:data: 字段)

CORS 跨域资源共享

什么是跨域?

浏览器的同源策略(Same-Origin Policy)规定:一个网页只能请求同源(协议+域名+端口都相同)的服务器。

例如,https://claude.ai 的网页想请求 http://localhost:8080 的数据,浏览器会阻止。因为:

  • 协议不同:https vs http
  • 域名不同:claude.ai vs localhost
  • 端口不同:443 vs 8080
CORS 如何解决?

CORS 通过在 HTTP 响应中添加特定的头部来告诉浏览器:“我允许来自其他来源的请求”。

本项目的 CORS 配置:

// SseTransport.cpp 第309-315行
void SSE::SetCORSHeaders(httplib::Response& res) {
    res.set_header("Access-Control-Allow-Origin", "*");        // 允许所有来源
    res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");  // 允许的方法
    res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization, x-api-key");  // 允许的请求头
    res.set_header("Access-Control-Expose-Headers", "Content-Type, Authorization, x-api-key"); // 允许客户端读取的响应头
    res.set_header("Access-Control-Max-Age", "86400");        // 预检结果缓存 24 小时
}

HTTP Stream 版本的 CORS 配置多了一个重要头部:

// HttpStreamTransport.cpp 第432-438行
void HttpStream::SetCORSHeaders(httplib::Response& res) {
    res.set_header("Access-Control-Allow-Origin", "*");
    res.set_header("Access-Control-Allow-Methods", "GET, POST, DELETE, OPTIONS");  // 多了 DELETE
    res.set_header("Access-Control-Allow-Headers", "Content-Type, Authorization, Mcp-Session-Id");  // 多了 Mcp-Session-Id
    res.set_header("Access-Control-Expose-Headers", "Content-Type, Mcp-Session-Id");
    res.set_header("Access-Control-Max-Age", "86400");
}

关键区别:

  • Allow-Methods 多了 DELETE(HTTP Stream 需要删除会话)
  • Allow-HeadersExpose-Headers 中多了 Mcp-Session-Id(HTTP Stream 的会话管理需要)
为什么 SSE 的 Allow-Headers 没有 Mcp-Session-Id?

SSE 模式下,会话 ID 是通过 URL 查询参数传递的(/messages?session_id=xxx),而不是通过 HTTP 头部。

// SseTransport.cpp 第209行
std::string event_endpoint = "event: endpoint\ndata: /messages?session_id=" + sessionId + "\n\n";

而 HTTP Stream 模式下,会话 ID 通过 Mcp-Session-Id 头部传递:

// HttpStreamTransport.cpp 第416行
auto client_session = req.get_header_value("Mcp-Session-Id");

预检请求 OPTIONS

对于某些"非简单"的跨域请求(如带自定义头部的 POST、DELETE 等),浏览器会在发送实际请求之前,先发送一个 OPTIONS 请求来询问服务器:“我可以发这个请求吗?”

流程如下:

项目中对 OPTIONS 的处理非常简单——直接返回 200 和 CORS 头部:

// SseTransport.cpp 第304-307行
void SSE::HandleOptionsRequest(const httplib::Request& req, httplib::Response& res) {
    SetCORSHeaders(res);
    res.status = 200;
}

不需要处理任何正文,因为 OPTIONS 请求只关心响应头部中的 CORS 信息。

路由注册使用了正则表达式来匹配所有路径:

// SseTransport.cpp 第151-153行
server_->Options("/.*", [this](const httplib::Request& req, httplib::Response& res) {
    HandleOptionsRequest(req, res);
});

/.* 匹配所有路径,这样无论客户端 OPTIONS 哪个路径,服务器都能正确响应。

自定义头部 Mcp-Session-Id

Mcp-Session-Id 是本项目使用的一个自定义 HTTP 头部,用于 HTTP Stream 模式下管理会话。

标准的 HTTP 头部由 IANA 注册管理,但应用程序可以自定义任何以 X- 开头(传统习惯)的头部。Mcp-Session-Id 虽然没有 X- 前缀,但它是 MCP 规范约定的自定义头部。

在 HTTP Stream 中,每个客户端在初始化(initialize 方法)时获得一个唯一的会话 ID:

// HttpStreamTransport.cpp 第224-228行
if (is_initialize) {
    // Generate session ID on initialize
    session_id_ = vx::utils::SessionBuilder::GenerateUniqueSessionID();
    session_initialized_ = true;
    client_connected_.store(true);
    LOG(INFO) << "Session initialized: " << session_id_ << std::endl;
}

之后,客户端的每个请求都必须带上这个会话 ID:

// HttpStreamTransport.cpp 第231-233行
} else if (session_initialized_) {
    if (!ValidateSession(req, res)) {
        return;
    }
}

验证逻辑:

// HttpStreamTransport.cpp 第416-425行
bool HttpStream::ValidateSession(const httplib::Request& req, httplib::Response& res) const {
    auto client_session = req.get_header_value("Mcp-Session-Id");
    if (client_session.empty() || client_session != session_id_) {
        LOG(ERROR) << "Invalid session ID: " << client_session << " (expected: " << session_id_ << ")" << std::endl;
        res.status = 404;
        res.set_content("{\"error\":\"Invalid or missing session ID\"}", "application/json");
        return false;
    }
    return true;
}

服务器在响应中也返回这个会话 ID,让客户端知道会话已建立:

// HttpStreamTransport.cpp 第305-306行
res.set_header("Mcp-Session-Id", session_id_);

cpp-httplib库详解

本项目使用 cpp-httplibinclude/httplib.h)作为 HTTP 服务器库。它是一个**仅头文件(header-only)**的 C++ 库——不需要编译链接,只需 #include "httplib.h" 就可以使用。

为什么选 cpp-httplib?

特性 说明
Header-only 不需要额外的编译步骤
跨平台 Windows / Linux / macOS
支持 HTTPS 通过 OpenSSL(本项目未使用)
轻量级 约 8000 行代码,无额外依赖
支持流式传输 set_content_provider 是实现 SSE 的核心

Server 创建与路由注册

创建服务器
// SseTransport.cpp 第47行 - SSE Transport 的构造函数
SSE::SSE(const int port, std::string host)
    : host_(std::move(host)), port_(port),
      server_(std::make_unique<httplib::Server>())  // ← 创建 HTTP 服务器
{
    SetupRoutes();
}

httplib::Server 是 cpp-httplib 的核心类。std::make_unique 说明每个 Transport 对象拥有一个独占的 Server 实例。

路由注册

cpp-httplib 使用流畅的 API 注册路由:

// 语法:server_->Method(path, handler);
server_->Get("/sse", handler);       // 处理 GET 请求
server_->Post("/messages", handler); // 处理 POST 请求
server_->Delete("/mcp", handler);    // 处理 DELETE 请求
server_->Options("/.*", handler);    // 处理 OPTIONS 请求(支持正则)

handler 是一个 lambda 函数(或任何可调用对象),签名为:

void(const httplib::Request& req, httplib::Response& res)

请求处理

httplib::Request 对象包含了客户端发来的所有信息。

常用属性和方法
// httplib.h 第627-682行 - Request 结构体定义
struct Request {
    std::string method;    // "GET", "POST", "DELETE", etc.
    std::string path;      // "/sse", "/messages", "/mcp"
    std::string body;      // 请求正文
    Headers headers;       // 所有 HTTP 头部的 map

    std::string remote_addr;  // 客户端 IP 地址
    int remote_port;          // 客户端端口

    std::string get_header_value(const std::string& key) const;  // 获取指定头部
};

项目中的实际使用:

// 读取请求方法、路径、版本、地址
// SseTransport.cpp 第170-178行
LOG(DEBUG) << "Request method: " << req.method << std::endl;
LOG(DEBUG) << "Request path: " << req.path << std::endl;
LOG(DEBUG) << "Request version: " << req.version << std::endl;
LOG(DEBUG) << "Request remote address: " << req.remote_addr << std::endl;

// 读取请求正文
// SseTransport.cpp 第286行
std::string message = req.body;

// 读取特定头部
// HttpStreamTransport.cpp 第189行
auto content_type = req.get_header_value("Content-Type");
// HttpStreamTransport.cpp 第416行
auto client_session = req.get_header_value("Mcp-Session-Id");

// 遍历所有头部(调试用)
// SseTransport.cpp 第171-173行
for (const auto &header: req.headers) {
    LOG(DEBUG) << " - " << header.first << ": " << header.second << std::endl;
}

响应设置

httplib::Response 对象用于构建返回给客户端的响应。

常用方法和属性
// httplib.h 第684-731行 - Response 结构体定义
struct Response {
    int status;       // HTTP 状态码
    std::string body; // 响应正文

    void set_header(const std::string& key, const std::string& val);
    void set_content(const std::string& s, const std::string& content_type);
    void set_content_provider(const std::string& content_type, ContentProviderWithoutLength provider);
};

项目中的实际使用:

最简单的情况

// HttpStreamTransport.cpp 第169行
res.set_content("{\"status\":\"ok\"}", "application/json");

设置状态码 + 错误消息

// HttpStreamTransport.cpp 第191-193行
res.status = 415;
res.set_content("{\"error\":\"Unsupported Media Type. Expected application/json\"}", "application/json");

设置多个头部(SSE 连接的建立)

// SseTransport.cpp 第180-183行
SetCORSHeaders(res);
res.set_header("Content-Type", "text/event-stream");
res.set_header("Cache-Control", "no-cache");
res.set_header("Connection", "keep-alive");

set_content_provider — 流式传输的核心

这是理解 SSE 实现最关键的部分。

普通响应 vs 流式响应

普通响应set_content):

res.set_content("done", "text/plain");
// 1. 设置 Content-Type
// 2. 把整个 body 写入响应
// 3. 连接结束

流式响应set_content_provider):

res.set_content_provider("text/plain", [](size_t offset, DataSink& sink) -> bool {
    sink.write("chunk1", 6);
    sleep(1);
    sink.write("chunk2", 6);
    return false;  // false 表示结束
});
// cpp-httplib 会反复调用这个 lambda 函数
// 每次调用时,通过 sink.write() 发送一批数据
// 返回 true 表示"还有数据,继续调我"
// 返回 false 表示"数据发完了,可以关闭连接了"

关键区别:

  • set_content一次性写入全部数据,连接随即关闭
  • set_content_provider:cpp-httplib 反复调用你的函数,你每次写入一小块。这实现了流式传输(streaming)
ContentProvider 的函数签名
// httplib.h 第578-579行
using ContentProviderWithoutLength =
    std::function<bool(size_t offset, DataSink &sink)>;

参数:

  • offset:当前已写入的字节数(cpp-httplib 传入,你可以忽略)
  • sink:数据输出接口,通过它写入数据
  • 返回值:true = 继续调用,false = 结束流

在 SSE 传输中,set_content_provider 创建了一个持久连接——只要你的回调函数一直返回 true,连接就一直保持。这正是 SSE 需要的"长连接"。

DataSink 接口

DataSink 是你向客户端发送数据的"水槽"。

// httplib.h 第542-573行
class DataSink {
public:
    std::function<bool(const char *data, size_t data_len)> write;    // 写入数据
    std::function<bool()> is_writable;                               // 检查连接是否可写
    std::function<void()> done;                                      // 标记完成
    std::ostream os;                                                 // 流式输出(包装了 write)
};

项目中使用 sink.write() 发送数据:

// SseTransport.cpp 第210行 - 发送 SSE 事件
sink.write(event_endpoint.data(), event_endpoint.size());

// SseTransport.cpp 第222行 - 发送心跳
sink.write(ping, std::strlen(ping));

// SseTransport.cpp 第259行 - 发送消息
sink.write(sse_msg.data(), sse_msg.size());

项目中使用 sink.is_writable() 检测连接断开:

// SseTransport.cpp 第245-248行
#ifdef CPPHTTPLIB_HAS_SINK_IS_WRITABLE
if (!sink.is_writable()) {
    LOG(DEBUG) << "Sink no longer writable; client likely disconnected" << std::endl;
    return terminate();
}
#endif

sink.write() 的返回值:

  • true:写入成功
  • false:写入失败(通常是客户端断开连接)

这就是断连检测的机制——当 sink.write() 返回 false,说明客户端已经离开,content_provider 应该返回 false 来终止流。

项目中的实际使用:每个 Transport 的 Start() 方法

三种 Transport 的 Start() 方法展示了不同的启动模式。

StdioTransport
// StdioTransport.h 第46行
bool Start() override { return true; }

Stdio 不需要启动任何网络服务,Start() 只是返回 true

SseTransport
// SseTransport.cpp 第109-127行
bool SSE::Start() {
    if (server_running_.load()) {
        return false;  // 防止重复启动
    }
    server_running_.store(true);

    server_thread_ = std::thread([this]() {
        LOG(INFO) << "Starting SSE server on " << host_ << ":" << port_ << std::endl;
        if (!server_->listen(host_.c_str(), port_)) {  // ← 阻塞调用
            LOG(ERROR) << "Failed to start SSE server..." << std::endl;
            server_running_.store(false);
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 等待启动完成
    return server_running_.load();
}

关键点:

  1. 在一个独立线程中调用 server_->listen(),因为它是阻塞的
  2. 等待 100ms 让服务器有时间启动
  3. 返回实际的运行状态
HttpStreamTransport
// HttpStreamTransport.cpp 第41-59行
bool HttpStream::Start() {
    if (server_running_.load()) {
        return false;
    }
    server_running_.store(true);

    server_thread_ = std::thread([this]() {
        LOG(INFO) << "Starting HttpStream server on " << host_ << ":" << port_ << std::endl;
        if (!server_->listen(host_.c_str(), port_)) {  // ← 阻塞调用
            LOG(ERROR) << "Failed to start HttpStream server..." << std::endl;
            server_running_.store(false);
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return server_running_.load();
}

与 SSE 几乎完全相同——都是在新线程中启动 HTTP 服务器。

Stop() 方法的共同模式
// SseTransport.cpp 第129-148行
void SSE::Stop() {
    server_->stop();           // 停止 cpp-httplib 服务器
    if (server_thread_.joinable()) {
        server_thread_.join(); // 等待服务器线程退出
    }
    // 唤醒所有等待中的线程
    incoming_cv_.notify_all();
    outgoing_cv_.notify_all();
}

SSE(Server-Sent Events)协议

SSE 的本质

SSE (Server-Sent Events) 是一种让服务器主动向客户端推送数据的 Web 技术。它的本质是:

HTTP 长连接 + text/event-stream 格式 + 单向推送

在本项目中,SSE Transport 对应文件 src/transport/SseTransport.cpp

SSE协议、SSE流、SSE传输模式辨析

这三个词只差几个字,但指的是三个不同层级的概念。理解它们的区别是理解整个项目通信架构的关键。

概念层级总览

通俗类比

层级 概念 类比
SSE 协议 数据格式规范 “电报的编码规则”——规定了电文怎么开头、怎么分段
SSE 流 协议的具体实例 “一条正在通话的电话线”——用这套规则传输数据的实际连接
SSE 传输模式 完整的通信方案 “整个邮局系统”——既有收件窗口(POST),又有投递线路(SSE 流)
第1层:SSE 协议 —— 只定义了"怎么说"

SSE 协议(Server-Sent Events)是 W3C/WHATWG 定义的 Web 标准。它的核心规定只有三条:

1. 服务器响应的 Content-Type 必须是 text/event-stream
2. 数据按 "field: value\n" 的文本格式组织,双换行 \n\n 分隔每条消息
3. 方向是单向的:服务器 → 客户端

协议本身不规定客户端怎么发请求,也不规定连接怎么建立。它只是一个数据格式规范

SSE 协议规定的消息格式:

event: message          ← 事件类型(可选,默认 "message")
data: {"key": "value"}  ← 数据内容(必需)
id: 42                  ← 事件序号(可选,用于断线重连)
                        ← 空行表示一条消息结束

: 冒号开头的行是注释   ← 用于心跳保活

SSE 协议不关心的事情

  • 客户端怎么把请求发给服务器? → 不管
  • 请求和响应怎么匹配? → 不管
  • 连接断了怎么重连? → 只规定了 id: 字段和 Last-Event-Id 头部,具体逻辑由客户端实现
第2层:SSE 流 —— SSE 协议的实例

SSE 流(SSE Stream)是一条具体的、使用 SSE 协议的 TCP 长连接

在项目中,SSE 流是这样创建的:

// SseTransport.cpp 第180-183行
res.set_header("Content-Type", "text/event-stream");  // ← 声明使用 SSE 协议
res.set_header("Cache-Control", "no-cache");
res.set_header("Connection", "keep-alive");            // ← 长连接

res.set_content_provider(                              // ← 流式传输,连接不关闭
    "text/event-stream",
    [this](size_t offset, httplib::DataSink& sink) -> bool {
        // 只要这个函数返回 true,连接就一直保持
        // 每次通过 sink.write() 发送的数据都按 SSE 协议格式
    }
);

关键认知:一条 SSE 流 = 一条 TCP 连接。它本质上是一根"水管",水管里流的水遵循 SSE 协议格式。

第3层:SSE 传输模式 —— 用双通道拼出双向通信

SSE 传输模式(项目中的 vx::transport::SSE 类)是一个完整的通信方案

回忆上一层的结论:一条 SSE 流只能服务器→客户端单向推送。那客户端怎么发请求?答案很简单:再开一条短连接专门发请求

所以 SSE 传输模式 = POST 短连接 + SSE 长连接

POST /messages  →  把客户端请求扔进 incoming_messages_ 队列 → 立刻返回 200(连接关闭)
GET  /sse       →  一直连着,从 outgoing_messages_ 队列取响应 → 包装成 SSE 格式推送

注意:POST 返回的 200 只表示"消息投递成功",不代表"请求已处理完毕"。真正的处理结果要等 Server 处理完后,通过 SSE 流推送回来。

三种层级的关系总结

图中用 emoji 标注了三个层级的对应关系:

  • 🟠 标注的节点:属于 SSE 传输模式的 POST 通道(短连接)
  • 🟢 标注的节点:属于 SSE 流 + SSE 协议(长连接 + SSE 格式推送)
  • 未标注的节点:属于 Server 公共逻辑(与传输模式无关)
连接生命周期对比

关键结论

  • POST /messages:每次请求都是一条新连接,发完即关(就像寄信,每封信独立投递)
  • GET /sse:建立后一直保持,所有响应都通过这一条连接推送(就像一条专属快递传送带)
  • SSE 传输模式 = 无数条短命的 POST 连接 + 一条长寿的 SSE 长连接

SSE 协议的建立

客户端发起一个普通的 HTTP GET 请求,服务器返回特殊的响应头:

// SseTransport.cpp 第168-186行
void SSE::HandleSSEConnection(const httplib::Request& req, httplib::Response& res) {
    // 设置 CORS 头部
    SetCORSHeaders(res);

    // SSE 的关键头部
    res.set_header("Content-Type", "text/event-stream");  // ← 告诉浏览器这是 SSE
    res.set_header("Cache-Control", "no-cache");          // ← 禁止缓存
    res.set_header("Connection", "keep-alive");           // ← 保持连接

    client_connected_.store(true);
    sse_active_.store(true);

    // 注册内容提供者(流式传输)
    res.set_content_provider(
        "text/event-stream",
        [this](size_t offset, httplib::DataSink& sink) -> bool {
            // ... 推送逻辑
        }
    );
}

三个关键头部缺一不可:

  • Content-Type: text/event-stream — 告诉客户端按 SSE 格式解析
  • Cache-Control: no-cache — 代理服务器不应缓存 SSE 流
  • Connection: keep-alive — HTTP 连接保持打开

SSE 事件格式

SSE 数据流的格式非常简单,就是纯文本,每条消息由两个换行符分隔:

event: endpoint\ndata: /messages?session_id=abc123\n\n

格式规则:

  • event: — 事件类型(可选,默认为 “message”)
  • data: — 数据内容(可以有多个 data: 行,组成多行数据)
  • id: — 事件 ID(可选,用于断线重连)
  • : 开头 — 注释行(用于心跳保活)
  • \n\n — 双换行符表示一条消息结束

项目中的事件构建:

// SseTransport.cpp 第209行 - 发送 endpoint 事件
std::string event_endpoint = "event: endpoint\ndata: /messages?session_id=" + sessionId + "\n\n";

// SseTransport.cpp 第256行 - 发送消息事件
std::string sse_msg = "data: " + message + "\n\n";

注意 SSE 模式中,消息事件的格式是没有 event: 行的(默认为 message 类型),只有 data: 行。

而在 HTTP Stream 中,消息明确声明了事件类型:

// HttpStreamTransport.cpp 第368行
std::string sse_msg = "event: message\ndata: " + message + "\n\n";

Keep-Alive 心跳

SSE 是长连接,如果长时间没有数据传输,中间的网络设备(路由器、代理、负载均衡器)可能会关闭这个"闲置"的连接。

心跳机制就是定期发送一条无意义的数据来"告诉"沿途设备"我还活着"。

在 SSE 协议中,以冒号 : 开头的行是注释行,客户端会忽略它们,非常适合做心跳:

: ping\n\n

项目中的心跳实现:

// SseTransport.cpp 第193-227行
const auto ping_interval = std::chrono::seconds(15);  // 每 15 秒一次

// ...

if (clock::now() - last_ping >= ping_interval) {
    // SSE comment line as keep-alive
    const char* ping = ": ping\n\n";
    // If write fails, client disconnected
    if (!sink.write(ping, std::strlen(ping))) {
        LOG(ERROR) << "Keep-alive write failed; client likely disconnected" << std::endl;
        return terminate();  // 连接断开,退出
    }
    last_ping = clock::now();
}

心跳间隔 15 秒是一个合理的值——大多数网络设备的空闲超时通常在 30 秒到 2 分钟之间,15 秒可以安全地在超时前刷新连接状态。

断连检测与重连

服务器端检测断连
// SseTransport.cpp 第196-201行
auto terminate = [this]() -> bool {
    sse_active_.store(false);
    client_connected_.store(false);
    outgoing_cv_.notify_all();   // 唤醒可能阻塞的发送线程
    incoming_cv_.notify_all();   // 唤醒可能阻塞的读取线程
    return false;                // 返回 false 表示 content_provider 结束
};

三种检测方式:

  1. 写入失败
// SseTransport.cpp 第222行
if (!sink.write(ping, std::strlen(ping))) {
    return terminate();
}
  1. is_writable() 检查
// SseTransport.cpp 第245-248行
if (!sink.is_writable()) {
    return terminate();
}
  1. 异常捕获
// SseTransport.cpp 第266-272行
} catch (const std::exception& ex) {
    LOG(ERROR) << "Exception in SSE content provider: " << ex.what() << std::endl;
    return terminate();
} catch (...) {
    LOG(ERROR) << "Unknown exception in SSE content provider" << std::endl;
    return terminate();
}
客户端重连

SSE 协议内置了重连支持。如果连接断开,浏览器(使用 EventSource API)会自动重连。服务器可以通过 id: 字段告诉客户端最后一个事件 ID,客户端重连时会在请求中带上 Last-Event-Id 头部,服务器可以从中断的地方继续推送。

本项目目前没有实现 id:Last-Event-Id 的重连机制,断连后客户端需要重新建立完整的会话。

多客户端并发推送

SSE Transport 的 write() 方法通过消息队列和条件变量实现线程安全的消息推送:

// SseTransport.cpp 第81-94行
void SSE::Write(const std::string& json_data) {
    if (!client_connected_.load()) {
        return;  // 没有客户端连接,忽略
    }

    {
        std::lock_guard<std::mutex> lock(outgoing_mutex_);
        outgoing_messages_.push(json_data);  // 加入队列
    }
    outgoing_cv_.notify_one();  // 唤醒 content_provider 线程
}

content_provider 中的处理:

// SseTransport.cpp 第230-263行
std::unique_lock<std::mutex> lock(outgoing_mutex_);
outgoing_cv_.wait_for(lock, std::chrono::milliseconds(200), [this]() {
    return !outgoing_messages_.empty() || !sse_active_.load();
});

if (!outgoing_messages_.empty()) {
    std::string message = outgoing_messages_.front();
    outgoing_messages_.pop();
    lock.unlock();

    std::string sse_msg = "data: " + message + "\n\n";
    sink.write(sse_msg.data(), sse_msg.size());
}

这段代码做了三件事:

  1. 等待队列中出现消息(最多等 200ms)
  2. 取出消息
  3. 包装成 SSE 格式并发送

注意:当前 SSE 实现只支持单个客户端(只有一个 client_connected_ 标志和一个 outgoing_messages_ 队列)。如果需要支持多个同时连接,需要维护每个连接的独立队列。

项目代码逐行分析:SseTransport.cpp

让我们从头到尾分析 SSE Transport 的关键代码。

构造函数(第 47-49 行)
SSE::SSE(const int port, std::string host)
    : host_(std::move(host)), port_(port),
      server_(std::make_unique<httplib::Server>()) {
    SetupRoutes();  // 注册 HTTP 路由
}

创建一个 cpp-httplib 服务器实例,然后立即注册路由。

析构函数(第 51-53 行)
SSE::~SSE() {
    SSE::Stop();  // 确保析构时停止服务器
}
SetupRoutes(第 150-166 行)
void SSE::SetupRoutes() {
    server_->Options("/.*", ...);   // CORS 预检
    server_->Get("/health", ...);   // 健康检查
    server_->Post("/messages", ...); // 接收客户端消息
    server_->Get("/sse", ...);      // SSE 长连接
}

注册四个路由。注意注册顺序无关紧要,cpp-httplib 内部会维护一个路由表。

Read()(第 55-79 行)
std::pair<size_t, std::string> SSE::Read() {
    std::unique_lock<std::mutex> lock(incoming_mutex_);
    incoming_cv_.wait(lock, [this]() {
        return !incoming_messages_.empty() || !server_running_.load();
    });
    // 取出队列中的消息
    if (!incoming_messages_.empty()) {
        std::string message = incoming_messages_.front();
        incoming_messages_.pop();
        return {message.length(), message};
    }
    return {0, ""};
}

这是 ITransport 接口的实现。它阻塞等待来自 incoming_messages_ 队列的消息。消息如何进入队列?通过 HandlePostMessage()

HandlePostMessage(第 277-302 行)
void SSE::HandlePostMessage(const httplib::Request& req, httplib::Response& res) {
    SetCORSHeaders(res);

    if (!client_connected_.load()) {
        res.status = 503;  // SSE 连接不存在
        return;
    }

    std::string message = req.body;
    if (message.empty()) {
        res.status = 400;  // 空消息
        return;
    }

    // 将消息放入 incoming 队列
    {
        std::lock_guard<std::mutex> lock(incoming_mutex_);
        incoming_messages_.push(message);
    }
    incoming_cv_.notify_one();  // 唤醒 Read() 中等待的线程

    res.status = 200;
}

客户端通过 POST 发送消息,服务器将其放入队列,然后阻塞在 Read() 上的主线程被唤醒,取出消息并处理。

Write()(第 81-94 行)
void SSE::Write(const std::string& json_data) {
    if (!client_connected_.load()) {
        return;  // 忽略——没有客户端
    }
    {
        std::lock_guard<std::mutex> lock(outgoing_mutex_);
        outgoing_messages_.push(json_data);  // 加入发送队列
    }
    outgoing_cv_.notify_one();  // 唤醒 content_provider 线程
}

服务器处理完请求后调用 Write() 返回响应。消息进入 outgoing_messages_ 队列,由 SSE content_provider 取出并以 SSE 格式发送。

数据流总结

HTTP

客户端
POST /messages

HandlePostMessage

incoming_messages_

incoming_cv_.notify_one()

Read() 被唤醒

Server 处理请求

Write() 被调用

outgoing_messages_.push()

outgoing_cv_.notify_one()

SSE content_provider 被唤醒

sink.write('data: ' + msg + '

')

客户端收到 SSE 事件

SSE 通信中消息格式的实际例子

客户端通过 POST 发送请求

POST /messages HTTP/1.1
Content-Type: application/json

{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"weather","arguments":{"city":"Tokyo"}}}

服务器通过 SSE 长连接返回响应

data: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"Tokyo: 20°C, cloudy"}]}}

服务器主动推送通知

data: {"jsonrpc":"2.0","method":"notifications/tools/list_changed"}

HTTP Stream / Streamable HTTP

与 SSE 的区别

HTTP Stream Transport 是本项目中对 MCP 规范中 “Streamable HTTP” 传输的实现。它与 SSE 的核心区别:

特性 SSE HTTP Stream
通信方向 单向(服务器→客户端推送) 双向(客户端请求 + 服务器推送)
请求发送 POST /messages POST /mcp(同一个端点)
响应接收 GET /sse 长连接 GET /mcp 长连接 + POST 直接返回
会话管理 URL 参数 ?session_id= HTTP 头部 Mcp-Session-Id
断开方式 关闭 SSE 连接 DELETE /mcp
请求-响应匹配 不匹配(客户端自己关联) 基于 jsonrpc.id 匹配

在 SSE 模式中,请求和响应走不同的通道

  • 请求走 POST /messages
  • 响应走 GET /sse 长连接

在 HTTP Stream 模式中,所有通信都通过 /mcp 这一个端点:

  • 请求通过 POST /mcp 发送
  • 响应可以直接从 POST 响应返回,也可以通过 GET /mcp 的 SSE 流推送服务器通知

会话管理

HTTP Stream 的最大特点是基于会话。每个客户端首先发送 initialize 请求建立会话,之后的所有请求都要携带会话 ID。

会话建立initialize 方法触发):

// HttpStreamTransport.cpp 第220-228行
bool is_initialize = parsed.contains("method") && parsed["method"] == "initialize";

if (is_initialize) {
    session_id_ = vx::utils::SessionBuilder::GenerateUniqueSessionID();
    session_initialized_ = true;
    client_connected_.store(true);
    LOG(INFO) << "Session initialized: " << session_id_ << std::endl;
}

会话 ID 的生成(时间戳 + 随机数):

// SessionBuilder.h 第37-49行
static std::string GenerateUniqueSessionID() {
    auto now = std::chrono::high_resolution_clock::now();
    auto timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
        now.time_since_epoch()).count();

    static std::random_device rd;
    static std::mt19937 gen(rd());
    static std::uniform_int_distribution<uint32_t> dis;

    std::stringstream ss;
    ss << std::hex << timestamp << "-" << dis(gen);
    return ss.str();
}

生成的 ID 格式如 18f3a2b1c-4d5e6f7a(微秒级时间戳 + 随机数,十六进制)。

会话验证

// HttpStreamTransport.cpp 第416-425行
bool HttpStream::ValidateSession(const httplib::Request& req, httplib::Response& res) const {
    auto client_session = req.get_header_value("Mcp-Session-Id");
    if (client_session.empty() || client_session != session_id_) {
        res.status = 404;
        res.set_content("{\"error\":\"Invalid or missing session ID\"}", "application/json");
        return false;
    }
    return true;
}

会话终止(客户端主动关闭):

// HttpStreamTransport.cpp 第389-414行
void HttpStream::HandleDeleteSession(const httplib::Request& req, httplib::Response& res) {
    SetCORSHeaders(res);

    if (!session_initialized_) {
        res.status = 404;
        return;
    }

    if (!ValidateSession(req, res)) {
        return;
    }

    session_initialized_ = false;
    client_connected_.store(false);
    sse_stream_active_.store(false);

    incoming_cv_.notify_all();  // 唤醒等待中的线程
    sse_cv_.notify_all();

    res.status = 200;
    res.set_content("{\"status\":\"session terminated\"}", "application/json");
}

会话的生命周期:

请求-响应匹配(基于 jsonrpc.id)

在 HTTP Stream 中,请求的响应不再通过 SSE 流返回,而是直接从 POST 请求的响应中返回。这是通过 promise/future 机制实现的。

PendingRequest 结构
// HttpStreamTransport.hpp 第97-99行
struct PendingRequest {
    std::promise<std::string> promise;  // 用于传递响应的 promise
};
std::unordered_map<std::string, std::shared_ptr<PendingRequest>> pending_requests_;

pending_requests_ 是一个 map,key 是请求的 id,value 是一个 promise

完整流程

步骤 1:收到 POST 请求,创建 pending request

// HttpStreamTransport.cpp 第257-275行
// 提取请求 ID
std::string id_str;
if (parsed["id"].is_number()) {
    id_str = std::to_string(parsed["id"].get<int>());
} else {
    id_str = parsed["id"].get<std::string>();
}

// 创建 PendingRequest(包含 promise/future 对)
auto pending = std::make_shared<PendingRequest>();
std::future<std::string> response_future = pending->promise.get_future();

{
    std::lock_guard<std::mutex> lock(pending_mutex_);
    pending_requests_[id_str] = pending;  // 注册到 map
}

// 将消息放入 incoming 队列
{
    std::lock_guard<std::mutex> lock(incoming_mutex_);
    incoming_messages_.push(message);
}
incoming_cv_.notify_one();

步骤 2:Server 处理请求,调用 Write() 返回响应

// HttpStreamTransport.cpp 第113-149行
void HttpStream::Write(const std::string& json_data) {
    auto parsed = nlohmann::json::parse(json_data);

    // 检查是否是请求的响应(有 "id" 和 "result"/"error")
    if (parsed.contains("id") && (parsed.contains("result") || parsed.contains("error"))) {
        std::string id_str = /* 提取 ID */;

        std::lock_guard<std::mutex> lock(pending_mutex_);
        auto it = pending_requests_.find(id_str);
        if (it != pending_requests_.end()) {
            // 找到了!通过 promise 传递响应
            it->second->promise.set_value(json_data);
            pending_requests_.erase(it);  // 清理
            return;  // 不需要走 SSE 推送
        }
    }

    // 不是请求响应,是服务器通知 → 放入 SSE 队列
    if (sse_stream_active_.load()) {
        std::lock_guard<std::mutex> lock(sse_mutex_);
        sse_notifications_.push(json_data);
        sse_cv_.notify_one();
    }
}

步骤 3:POST 处理函数等待 promise

// HttpStreamTransport.cpp 第283-307行
// 等待 Server 处理并调用 Write() 返回结果
auto status = response_future.wait_for(std::chrono::seconds(30));
if (status == std::future_status::timeout) {
    // 超时:清理并返回 504
    {
        std::lock_guard<std::mutex> lock(pending_mutex_);
        pending_requests_.erase(id_str);
    }
    res.status = 504;
    res.set_content("{\"error\":\"Request timed out\"}", "application/json");
    return;
}

std::string response_data = response_future.get();

// 将响应直接返回给客户端
res.status = 200;
res.set_content(response_data, "application/json");
res.set_header("Mcp-Session-Id", session_id_);
数据流总结

通知(Notifications)

在 HTTP Stream 中,通知消息(没有 id 字段的 JSON-RPC 消息)不走 promise/future 匹配,而是直接确认并返回 202

// HttpStreamTransport.cpp 第237-253行
bool is_notification = !parsed.contains("id");

if (is_notification) {
    // 将通知放入队列供 Server 处理
    {
        std::lock_guard<std::mutex> lock(incoming_mutex_);
        incoming_messages_.push(message);
    }
    incoming_cv_.notify_one();

    // 通知只返回 202 Accepted
    res.status = 202;
    res.set_header("Mcp-Session-Id", session_id_);
    return;
}

服务器的主动通知(如 tools/list_changed)通过 GET /mcp 的 SSE 流推送:

// HttpStreamTransport.cpp 第385-387行(HandleGetSSE 中的内容提供者)
if (!sse_notifications_.empty()) {
    std::string message = sse_notifications_.front();
    sse_notifications_.pop();
    std::string sse_msg = "event: message\ndata: " + message + "\n\n";
    sink.write(sse_msg.data(), sse_msg.size());
}

项目代码逐行分析:HttpStreamTransport.cpp

构造函数(第 32-35 行)
HttpStream::HttpStream(int port, std::string host)
    : port_(port), host_(std::move(host)),	
      server_(std::make_unique<httplib::Server>()) {
    SetupRoutes();
}

与 SSE Transport 构造函数几乎相同。

SetupRoutes(第 163-183 行)
void HttpStream::SetupRoutes() {
    server_->Options("/.*", ...);     // CORS 预检
    server_->Get("/health", ...);     // 健康检查
    server_->Post("/mcp", ...);       // 处理客户端请求
    server_->Get("/mcp", ...);        // SSE 流(接收服务器通知)
    server_->Delete("/mcp", ...);     // 终止会话
}

对比 SSE 的路由:SSE 用 /messages/sse 两个端点,HTTP Stream 统一用 /mcp 一个端点。

Read()(第 93-111 行)
std::pair<size_t, std::string> HttpStream::Read() {
    std::unique_lock<std::mutex> lock(incoming_mutex_);
    incoming_cv_.wait(lock, [this]() {
        return !incoming_messages_.empty() || !server_running_.load();
    });
    // 取出消息...
}

与 SSE 的 Read() 逻辑完全相同——都是从 incoming_messages_ 队列中取消息。

Write()(第 113-149 行)

这是 HTTP Stream 最复杂的部分。与前两个 Transport 不同,Write() 需要区分请求响应服务器通知

void HttpStream::Write(const std::string& json_data) {
    if (!client_connected_.load()) return;

    auto parsed = nlohmann::json::parse(json_data);

    // 情况1:请求的响应 → 通过 promise 返回
    if (parsed.contains("id") && (parsed.contains("result") || parsed.contains("error"))) {
        // 匹配 pending_requests_,通过 promise.set_value() 传递
    }

    // 情况2:服务器通知 → 通过 SSE 流推送
    if (sse_stream_active_.load()) {
        sse_notifications_.push(json_data);
        sse_cv_.notify_one();
    }
}
Stop()(第 61-91 行)
void HttpStream::Stop() {
    server_running_.store(false);
    server_->stop();
    incoming_cv_.notify_all();
    sse_cv_.notify_all();

    // 清理所有 pending requests,防止内存泄漏
    {
        std::lock_guard<std::mutex> lock(pending_mutex_);
        for (auto& [id, pending] : pending_requests_) {
            try {
                pending->promise.set_value("");  // 唤醒等待中的线程
            } catch (...) {}
        }
        pending_requests_.clear();
    }
}

三种传输模式完整对比

总览

维度 Stdio SSE HTTP Stream
C++ 类 vx::transport::Stdio vx::transport::SSE vx::transport::HttpStream
源文件 StdioTransport.cpp SseTransport.cpp HttpStreamTransport.cpp
传输介质 stdin/stdout HTTP (TCP port) HTTP (TCP port)
默认端口 8080 8080
通信方向 双向(请求-响应) 半双向(POST请求 + SSE推送) 全双向(POST + SSE 通知)
连接数量 1对1(单客户端) 1对1(当前实现单客户端) 1对1(当前实现单会话)
启动方式 ./mcp_server ./mcp_server -s ./mcp_server -t
GetName() "stdio" "sse" "httpstream"
GetVersion() "0.2" "0.4" "0.1"
Start() 空操作(返回 true) 在独立线程中启动 HTTP 服务器 在独立线程中启动 HTTP 服务器

架构对比

数据流对比

Stdio 的数据流

最简单直接。没有队列,没有线程间通信。一切都在主线程中顺序执行。

SSE 的数据流

两个队列将网络 I/O 与业务逻辑分离:

  • incoming_messages_:HTTP 处理线程写入 → 主线程 Read() 读取
  • outgoing_messages_:主线程 Write() 写入 → content_provider 线程读取并通过 SSE 发送
HTTP Stream 的数据流

三步走:

  1. 消息进入 incoming_messages_,同时创建 PendingRequest
  2. Server 处理完后调用 Write()Write() 识别是否匹配到 pending_requests_
  3. POST 处理函数从 future 获取结果并返回

各自的优缺点

Stdio

:::color1
优点

  • 零网络开销(不经过网络协议栈)
  • 零依赖(不需要 HTTP 库)
  • 代码最简单(StdioTransport.cpp 只有 68 行)
  • 天然单客户端(stdin/stdout 只有一个来源/目标)
  • 无需端口管理

:::

:::color4
缺点

  • 只能本地使用(无法远程访问)
  • 客户端必须是能启动子进程的程序(如 Claude Desktop)
  • 无法在浏览器中使用
  • 不支持多客户端

:::

SSE

:::color1
优点

  • 浏览器原生支持(EventSource API)
  • 适合服务器主动推送
  • 实现相对简单
  • HTTP 协议通用性好(可通过代理、防火墙)

:::

:::color4
缺点

  • 请求和响应走不同通道,实现复杂
  • 当前实现只支持单客户端
  • POST 请求和 SSE 连接之间的会话管理通过 URL 参数,不够优雅
  • SSE 只支持文本数据(在 MCP 场景中足够,因为所有消息都是 JSON)
  • 没有内置的请求-响应匹配,客户端需要自行关联

:::

HTTP Stream

:::color1
优点

  • 请求响应走同一通道(POST 直接返回响应),逻辑更清晰
  • 基于 HTTP 头部的会话管理更规范
  • 支持显式的会话终止(DELETE)
  • Content-Type 协商更严格
  • 更符合 RESTful 风格

:::

:::color4
缺点

  • 实现最复杂(HttpStreamTransport.cpp 约 440 行)
  • 当前实现只支持单会话
  • 使用 promise/future 增加了实现复杂度
  • 超时处理增加了边界情况

:::

何时使用哪种传输

在 MCP 规范中:

  • Stdio 是基础传输,所有 MCP 服务器都应该支持
  • SSE 是早期规范中的 Web 传输方式
  • Streamable HTTP(本项目中的 HTTP Stream)是较新的 MCP 规范推荐传输方式,正在逐步取代 SSE

ITransport 接口的统一

三种 Transport 都实现了 ITransport 接口:

// ITransport.h - 第33-48行
class ITransport {
public:
    virtual bool Start() = 0;
    virtual void Stop() = 0;
    virtual bool IsRunning() = 0;

    virtual std::pair<size_t, std::string> Read() = 0;
    virtual void Write(const std::string& json_data) = 0;

    virtual std::future<std::pair<size_t, std::string>> ReadAsync() = 0;
    virtual std::future<void> WriteAsync(const std::string& json_data) = 0;

    virtual std::string GetName() = 0;
    virtual std::string GetVersion() = 0;
    virtual int GetPort() = 0;
};

这正是面向接口编程的体现。Server 类不需要知道它使用的是哪种传输方式,只需要调用 transport->Read()transport->Write()

// Server.cpp 第107-159行 - Server::Connect()
bool Server::Connect(const std::shared_ptr<ITransport>& transport) {
    transport_ = transport;
    transport_->Start();
    while (!isStopping_) {
        auto [length, json_string] = transport->Read();
        json response = HandleRequest(request);
        transport_->Write(response.dump());
    }
    transport_->Stop();
}

无论是 Stdio、SSE 还是 HTTP Stream,这段代码的逻辑都不需要改变。

主函数中的传输选择

// main.cpp 第120-126行
if (use_sse_server->count() > 0) {
    transport = std::make_shared<vx::transport::SSE>();
} else if (use_httpstream_server->count() > 0) {
    transport = std::make_shared<vx::transport::HttpStream>();
} else {
    transport = std::make_shared<vx::transport::Stdio>();
}

然后用统一的接口使用:

// main.cpp 第339行
server->Connect(transport);

这就是策略模式在项目中的实际应用。


Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐