在现代Web开发中,实时通信已经成为许多应用的核心需求。从聊天应用到股票市场更新,从游戏服务器到AI模型通信,各种技术应运而生以满足这些需求。最近,Model Context Protocol (MCP) 引入了一种新的传输机制 —— Streamable HTTP,它为服务器到客户端的实时通信提供了更优雅的解决方案。本文将深入探讨Streamable HTTP相较于Server-Sent Events (SSE)的优势,并通过实际代码示例展示其实现。

实时通信技术的演进

在深入Streamable HTTP之前,我们先简要回顾一下Web实时通信技术的发展历程:

长轮询 (Long Polling)

长轮询是早期使用的一种"黑客"方式,用于在浏览器中通过HTTP实现服务器-客户端消息传递。客户端发送HTTP请求,服务器保持连接开放直到有新数据可用。一旦服务器发送响应,客户端立即发起新的请求。这种方法虽然简单,但效率低下且可能导致消息丢失。

// 长轮询的JavaScript客户端实现
function longPoll() {
    fetch('http://example.com/poll')
        .then(response => response.json())
        .then(data => {
            console.log("接收到数据:", data);
            longPoll(); // 立即建立新的长轮询请求
        })
        .catch(error => {
            // 10秒后重试
            setTimeout(longPoll, 10000);
        });
}
longPoll(); // 启动长轮询

WebSockets

WebSockets提供了一种全双工通信机制,允许在单个长连接上双向传输数据。这项技术克服了HTTP请求-响应周期的开销,非常适合低延迟、高频率更新的场景。

// WebSocket的JavaScript客户端实现
const socket = new WebSocket('ws://example.com');

socket.onopen = function(event) {
  console.log('连接已建立');
  // 向服务器发送消息
  socket.send('你好,服务器!');
};

socket.onmessage = function(event) {
  console.log('来自服务器的消息:', event.data);
};

尽管WebSocket API基础使用简单,但在生产环境中处理连接断开、重连和心跳检测等问题是相当复杂的。

Server-Sent Events (SSE)

Server-Sent Events提供了一种标准方式,通过HTTP将服务器更新推送到客户端。与WebSockets不同,SSE专为单向通信(从服务器到客户端)设计,适用于新闻推送、体育比分更新等场景。

// SSE的JavaScript客户端实现
const evtSource = new EventSource("https://example.com/events");

// 处理消息事件
evtSource.onmessage = event => {
    console.log('收到消息: ' + event.data);
};

SSE的优点是自动重连及使用标准HTTP协议,但它存在一些局限性,如:

  • 需要维护长期连接
  • 浏览器限制每个域名最多6个并发连接
  • 在企业环境中可能受到代理和防火墙的限制

Streamable HTTP:新一代实时通信方案

Streamable HTTP是MCP协议在2025年3月引入的一种新传输机制,旨在取代之前的HTTP+SSE传输模式。它的设计理念是在保留SSE优点的同时克服其限制,特别是提供更好的可扩展性和企业环境兼容性。

Streamable HTTP的工作原理

Streamable HTTP的核心思想是提供一个统一的HTTP端点,同时支持POST和GET方法:

  1. POST方法:用于客户端向服务器发送请求和接收响应
  2. GET方法(可选):用于建立SSE流,接收服务器实时推送的消息

与传统HTTP+SSE不同,Streamable HTTP不要求维护单独的初始化连接和消息端点,简化了协议设计并提高了可靠性。

Streamable HTTP相比SSE的五大优势

1. 简化的通信模型

传统的HTTP+SSE方法需要两个不同的端点:一个用于建立连接,另一个用于发送消息。而Streamable HTTP提供了一个统一的端点,简化了客户端和服务器之间的交互。

传统SSE实现(两个端点):

// 服务器端(传统SSE)
router.get("/connect", async (req, res) => {
    const transport = new SSEServerTransport(POST_ENDPOINT, res);
    transports[transport.sessionId] = transport;
    await server.connect(transport);
});

router.post(POST_ENDPOINT, async (req, res) => {
    const sessionId = req.query.sessionId;
    if (!transports[sessionId]) {
        res.status(400).send({ message: "无效的会话ID" });
        return;
    }
    await transports[sessionId].handlePostMessage(req, res, req.body);
});

Streamable HTTP实现(单一端点):

// 服务器端(Streamable HTTP)
app.post("/mcp", async (req, res) => {
    const sessionId = req.headers['mcp-session-id'];
    
    if (sessionId && transports[sessionId]) {
        // 使用现有传输
        await transports[sessionId].handleRequest(req, res, req.body);
        return;
    }
    
    if (!sessionId && isInitializeRequest(req.body)) {
        // 创建新传输
        const transport = new StreamableHTTPServerTransport({
            sessionIdGenerator: () => randomUUID(),
        });
        await server.connect(transport);
        await transport.handleRequest(req, res, req.body);
        
        const newSessionId = transport.sessionId;
        if (newSessionId) {
            transports[newSessionId] = transport;
        }
        return;
    }
    
    res.status(400).json({ error: "无效的请求" });
});

app.get("/mcp", async (req, res) => {
    const sessionId = req.headers['mcp-session-id'];
    if (!sessionId || !transports[sessionId]) {
        res.status(400).json({ error: "无效的会话ID" });
        return;
    }
    
    await transports[sessionId].handleRequest(req, res);
});

2. 支持无状态模式

Streamable HTTP的一个重要创新是支持完全无状态操作。通过设置sessionIdGenerator: () => undefined,服务器可以在不维护会话状态的情况下处理请求,非常适合无服务器环境。

// 无状态模式配置
const transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: () => undefined, // 启用无状态模式
});

这种无状态模式特别适合:

  • 部署在AWS Lambda、Azure Functions等无服务器环境
  • 短暂交互而非长期连接的场景
  • 需要最小化服务器内存使用的应用

3. 更好的可伸缩性

由于Streamable HTTP可以在无状态模式下运行,它非常适合容器化和自动扩展场景。服务器不需要维护长期连接,可以根据请求动态分配资源,显著提高可伸缩性。

这解决了SSE的一个主要问题:当有大量客户端时,每个客户端都需要维持一个长连接,可能导致服务器资源耗尽。使用Streamable HTTP的无状态模式,服务器只在处理请求时分配资源,处理完成后即可释放。

4. 提高的可靠性

Streamable HTTP的简化设计减少了出错机会:

  • 会话管理:在有状态模式下,会话ID通过HTTP头而非查询参数传递,减少安全风险
  • 重连处理:客户端可以在会话有效期内随时重连,无需复杂的重连逻辑
  • 错误恢复:简化的协议使错误处理和恢复更加直观

5. 更好的企业环境兼容性

在企业环境中,代理服务器和防火墙常常会阻止非标准HTTP连接。Streamable HTTP使用标准HTTP通信,大大减少了这类问题:

  • 使用标准HTTP POST和GET,无需特殊配置
  • 不依赖长连接,减少代理超时问题
  • 会话ID通过HTTP头传递,更符合企业安全要求

实现Streamable HTTP的最佳实践

以下是一个实现Streamable HTTP服务器的完整示例,包含了所有最佳实践:

import express from 'express';
import { Request, Response } from 'express';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { randomUUID } from 'crypto';

// 创建MCP服务器
const server = new Server({
  name: "streamable-http-demo",
  version: "1.0.0"
}, {
  capabilities: {
    tools: {},
    logging: {}
  }
});

// 创建Express应用
const app = express();
app.use(express.json());

// 会话存储
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

// 单一MCP端点
const MCP_ENDPOINT = "/mcp";

// 处理POST请求
app.post(MCP_ENDPOINT, async (req: Request, res: Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;

  try {
    // 1. 重用现有会话
    if (sessionId && transports[sessionId]) {
      await transports[sessionId].handleRequest(req, res, req.body);
      return;
    }

    // 2. 创建新会话(初始化请求)
    if (!sessionId && isInitializeRequest(req.body)) {
      const transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: () => randomUUID(),
        // 无状态模式: sessionIdGenerator: () => undefined
      });

      await server.connect(transport);
      await transport.handleRequest(req, res, req.body);

      // 存储新会话
      const newSessionId = transport.sessionId;
      if (newSessionId) {
        transports[newSessionId] = transport;
        console.log(`新会话创建: ${newSessionId}`);
      }
      return;
    }

    // 3. 处理错误情况
    res.status(400).json(createErrorResponse("无效的会话ID或请求方法"));
  } catch (error) {
    console.error('处理请求出错:', error);
    res.status(500).json(createErrorResponse("内部服务器错误"));
  }
});

// 处理GET请求(SSE流)
app.get(MCP_ENDPOINT, async (req: Request, res: Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;

  if (!sessionId || !transports[sessionId]) {
    res.status(400).json(createErrorResponse("无效的会话ID"));
    return;
  }

  try {
    const transport = transports[sessionId];
    await transport.handleRequest(req, res);
    console.log(`SSE流已为会话 ${sessionId} 建立`);
  } catch (error) {
    console.error('建立SSE流出错:', error);
    if (!res.headersSent) {
      res.status(500).json(createErrorResponse("建立SSE流出错"));
    }
  }
});

// 辅助函数
function isInitializeRequest(body: any): boolean {
  return body && body.method === 'initialize';
}

function createErrorResponse(message: string): any {
  return {
    jsonrpc: '2.0',
    error: {
      code: -32000,
      message: message,
    },
    id: null,
  };
}

// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`服务器运行在 http://localhost:${PORT}`);
});

在客户端,使用Streamable HTTP也非常直观:

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { URL } from 'url';

async function main() {
  // 1. 创建客户端
  const client = new Client({ 
    name: "streamable-http-client", 
    version: "1.0.0" 
  });
  
  // 2. 创建传输实例
  const transport = new StreamableHTTPClientTransport(new URL('http://localhost:3000/mcp'));
  
  // 3. 连接到服务器
  await client.connect(transport);
  console.log('已连接到服务器');
  
  // 4. 设置通知处理
  transport.onmessage = (message) => {
    console.log('接收到消息:', message);
  };
  
  // 5. 调用工具示例
  const result = await client.callTool({
    name: 'example-tool',
    arguments: { param: 'value' },
  });
  
  console.log('工具调用结果:', result);
}

main().catch(console.error);

处理Streamable HTTP的常见挑战

尽管Streamable HTTP提供了许多优势,但在实际应用中仍需注意一些挑战:

1. 会话管理

在有状态模式下,需要妥善管理会话资源,避免内存泄漏。一个好的实践是设置会话超时机制:

// 会话超时管理
function setupSessionTimeout(sessionId, timeoutMs = 30 * 60 * 1000) {
  const timeoutId = setTimeout(() => {
    if (transports[sessionId]) {
      console.log(`会话 ${sessionId} 已超时,正在清理`);
      delete transports[sessionId];
    }
  }, timeoutMs);
  
  // 存储超时ID以便可以取消
  sessionTimeouts[sessionId] = timeoutId;
}

// 在活动时重置超时
function resetSessionTimeout(sessionId) {
  if (sessionTimeouts[sessionId]) {
    clearTimeout(sessionTimeouts[sessionId]);
    setupSessionTimeout(sessionId);
  }
}

2. 断线重连策略

客户端应实现断线重连策略,特别是在移动网络等不稳定环境中:

class MCPClient {
  // ...
  
  async connectWithRetry(url, maxRetries = 5, delayMs = 1000) {
    let retries = 0;
    
    while (retries < maxRetries) {
      try {
        await this.connect(url);
        console.log('连接成功');
        return;
      } catch (error) {
        retries++;
        console.log(`连接失败,第 ${retries} 次重试...`);
        await new Promise(resolve => setTimeout(resolve, delayMs));
        // 指数退避
        delayMs *= 1.5;
      }
    }
    
    throw new Error('连接失败,已达到最大重试次数');
  }
}

3. 处理未送达事件

当客户端断线重连时,可能会错过服务器发送的事件。一个解决方案是使用事件ID和断点续传:

// 服务器端
app.post("/mcp", async (req, res) => {
  // ...
  
  // 包含最后事件ID
  const lastEventId = req.headers['last-event-id'];
  if (lastEventId && sessionId) {
    // 发送错过的事件
    await sendMissedEvents(transport, lastEventId);
  }
});

// 客户端
let lastEventId = null;

transport.onmessage = (message) => {
  if (message.id) {
    lastEventId = message.id;
    localStorage.setItem('lastEventId', lastEventId);
  }
};

// 重连时包含最后事件ID
async function reconnect() {
  lastEventId = localStorage.getItem('lastEventId');
  const headers = {};
  if (lastEventId) {
    headers['last-event-id'] = lastEventId;
  }
  
  transport = new StreamableHTTPClientTransport(url, { headers });
  await client.connect(transport);
}

结论

Streamable HTTP代表了Web实时通信的一个重要进步,特别适合需要在各种环境中可靠工作的企业应用。与传统SSE相比,它提供了更简化的通信模型、可选的无状态模式、更好的可伸缩性、提高的可靠性以及更好的企业环境兼容性。

随着越来越多的服务采用MCP协议,Streamable HTTP有望成为构建实时通信应用的首选方法,特别是在AI工具集成和企业应用领域。

如果你正在考虑在项目中实现实时通信,Streamable HTTP绝对值得考虑,尤其是当你的应用需要在各种网络环境中可靠运行,或者部署在无服务器环境中时。

参考资料

  1. Model Context Protocol官方规范:https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
  2. MCP TypeScript SDK:https://github.com/modelcontextprotocol/typescript-sdk
  3. RxDB博客 - WebSockets vs SSE vs 长轮询:https://rxdb.info/articles/websockets-sse-polling-webrtc-webtransport.html
  4. 使用SSE代替WebSockets的场景:https://blog.csdn.net/adcwa/article/details/146942148
  5. MCP服务器实现示例:https://medium.com/@itsuki.enjoy/mcp-server-and-client-with-sse-the-new-streamable-http-d860850d9d9d
Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐