前言:

        我用的Kimi的API服务这里只讲流式传输的的改造基础部分的详细教程看前一篇文章Kimi模型调用联动前端简单交互页面-CSDN博客文章浏览阅读71次。Flask 是一个轻量级的 Python Web 框架,适用于快速构建 Web 应用和 API。它提供了基本的 Web 应用功能,比如路由、请求处理、模板渲染等,并且易于扩展。Flask 通常用于构建 RESTful API、Web 应用等。轻量、易上手、灵活,可以根据需要选择添加扩展。https://blog.csdn.net/m0_75149710/article/details/143933044?fromshare=blogdetail&sharetype=blogdetail&sharerId=143933044&sharerefer=PC&sharesource=m0_75149710&sharefrom=from_link

1.Python程序

1. 导入所需的包

from flask import Flask, Response, request
from flask_cors import CORS
from openai import OpenAI
  • Flask:用于创建 Web 应用的框架。它帮助我们轻松地处理 HTTP 请求和路由。
  • Response:用于构造 HTTP 响应。它允许我们通过流式传输的方式发送数据给客户端。
  • request:从客户端接收请求的数据,这里用来提取用户发送的消息。
  • CORS:允许跨域资源共享,用于解决前后端分离架构中的跨域问题。
  • OpenAI:这是 OpenAI 的官方 Python 客户端库,帮助我们与 OpenAI 的 API 进行交互。

2. 初始化 Flask 和 OpenAI 客户端 

app = Flask(__name__)
CORS(app)
client = OpenAI(
    api_key="API_KEY",  # 替换为你的 API 密钥
    base_url="https://api.moonshot.cn/v1",
)
  • app = Flask(name):创建一个 Flask 应用实例。
  • CORS(app):启用跨域请求支持,允许不同域的客户端(如前端应用)访问此后端 API。
  • client = OpenAI(...):初始化 OpenAI 客户端,传入 API 密钥和基础 URL(用来指向 OpenAI 服务端)。API 密钥用于身份验证。

3. 定义流式响应函数 

def stream_response(stream):
    """
    生成器函数,用于从 OpenAI API 流式传输响应
    :param stream: OpenAI API 响应流
    :return: 逐个返回 API 中的内容块
    """
    str001 = ''
    for chunk in stream:
        delta = chunk.choices[0].delta  # 从响应中提取 delta 内容

        if hasattr(delta, 'content') and delta.content:  # 检查 delta 是否有 'content' 属性
            print(f"Streaming content: {delta.content}")  # 打印内容块到控制台
            str001 += delta.content  # 追加内容到 str001
            yield delta.content  # 实时返回每个内容块
    print(f"Final str001: {str001}")  # 打印最终内容到控制台
  • stream_response(stream):这是一个生成器函数,用来处理 OpenAI 的流式响应。生成器会逐步接收流数据,并逐个返回每个内容块。每当接收到一个新的响应块时,内容会被打印到控制台并添加到一个字符串中。
  • chunk.choices[0].delta:OpenAI 的 API 响应包含多个部分,这里提取 delta 部分,这是流式数据的增量内容。
  • yield delta.content:生成器返回每次接收到的内容块,客户端会实时接收到这些数据。

4. 定义 Flask 路由 

 

@app.route("/stream-chat", methods=["POST"])
def stream_chat():
    user_message = request.json.get("message", "")  # 从请求中获取用户消息

    # 向 OpenAI API 请求流式响应
    stream = client.chat.completions.create(
        model="moonshot-v1-8k",  # 使用的 AI 模型
        messages=[  # 消息内容,包括系统消息和用户消息
            {"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手。"},
            {"role": "user", "content": user_message}
        ],
        temperature=0.3,  # 控制响应的随机性
        stream=True  # 开启流式响应
    )

    # 将流式响应发送回客户端
    return Response(stream_response(stream), content_type="text/event-stream")
  • @app.route("/stream-chat", methods=["POST"]):定义了一个 POST 路由,当客户端向 /stream-chat 发送 POST 请求时,调用 stream_chat() 函数处理请求。
  • request.json.get("message", ""):从请求的 JSON 数据中提取用户发送的消息(message)。如果没有提供消息,则默认值为空字符串。
  • client.chat.completions.create(...):通过 OpenAI API 请求流式生成的聊天回复,model="moonshot-v1-8k" 是使用的模型,messages 中包含了系统消息和用户消息,stream=True 表示请求流式响应。
  • Response(stream_response(stream), content_type="text/event-stream"):调用 stream_response 函数来获取流式数据,并将其作为响应返回给客户端。content_type="text/event-stream" 设置了响应类型为事件流格式,这是一种用于流式数据的标准格式。

5. 启动 Flask 应用 

if __name__ == "__main__":
    app.run(debug=True)

 app.run(debug=True):启动 Flask 应用,在开发模式下运行,启用调试模式,可以自动重新加载代码并输出详细错误信息

6.工作原理

  1. 用户请求:客户端(如前端网页)通过 POST 请求向服务器的 /stream-chat 路由发送数据,数据包含用户的消息。
  2. 流式响应:服务器接收到消息后,向 OpenAI API 请求流式响应。OpenAI 会逐步生成响应内容,并通过流式方式传输回服务器。
  3. 实时返回给客户端:服务器通过 stream_response 函数将 OpenAI 生成的每个内容块实时传输给客户端,客户端可以立即显示响应,而不是等待完整的响应完成。
  4. 最终响应:当所有内容生成完毕时,服务器会结束流式传输。

7.完成程序

from flask import Flask, Response, request
from flask_cors import CORS
from openai import OpenAI

# 初始化 Flask 应用和 OpenAI 客户端
app = Flask(__name__)
CORS(app)  # 启用跨域支持
client = OpenAI(
    api_key="自己的API Key",  # 替换为你的 API 密钥
    base_url="https://api.moonshot.cn/v1",  # 设置 OpenAI API 的基础 URL
)

def stream_response(stream):
    """
    生成器函数,用于从 OpenAI API 流式传输响应
    :param stream: OpenAI API 响应流
    :return: 逐个返回 API 中的内容块
    """
    str001 = ''
    for chunk in stream:
        delta = chunk.choices[0].delta  # 从响应中提取 delta 内容

        if hasattr(delta, 'content') and delta.content:  # 检查 delta 是否包含 'content' 属性
            print(f"Streaming content: {delta.content}")  # 打印内容块到控制台
            str001 += delta.content  # 将内容追加到 str001 字符串中
            yield delta.content  # 返回每个内容块
    print(f"Final str001: {str001}")  # 打印最终内容到控制台

@app.route("/stream-chat", methods=["POST"])
def stream_chat():
    user_message = request.json.get("message", "")  # 从请求中获取用户消息

    # 向 OpenAI API 请求流式响应
    stream = client.chat.completions.create(
        model="moonshot-v1-8k",  # 使用的 AI 模型
        messages=[  # 消息内容,包括系统消息和用户消息
            {"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手。"},
            {"role": "user", "content": user_message}
        ],
        temperature=0.3,  # 控制响应的随机性
        stream=True  # 启用流式响应
    )

    # 将流式响应返回给客户端
    return Response(stream_response(stream), content_type="text/event-stream")

if __name__ == "__main__":
    app.run(debug=True)  # 启动 Flask 应用,开启调试模式

 2.前端页面

1.完成页面地址

AI接口调用之流式: 流式传输前端页面icon-default.png?t=O83Ahttps://gitee.com/fallen-grazing-and-red-dust/flow-of-ai-interface-calls.git

2.JS流式接收详解部分

1.创建updateMessageContent函数用来动态追加后续接收到的内容

// 动态更新消息内容的函数
function updateMessageContent(messageElement, content) {
  messageElement.innerText += content; // 动态追加内容,不换行
  chatDisplay.scrollTop = chatDisplay.scrollHeight; // 自动滚动到底部
}

2.流式数据处理的核心部分

 

function fetchServerResponse(userMessage, messageElement) {
  fetch("http://localhost:5000/stream-chat", {
    method: "POST", // 使用 POST 方法发送请求
    headers: {
      "Content-Type": "application/json", // 设置请求头为 JSON 格式
    },
    body: JSON.stringify({ message: userMessage }), // 将用户消息作为请求体发送
  })
    .then((response) => {
      if (!response.ok) {
        throw new Error("服务器错误"); // 如果响应状态不正常,则抛出错误
      }
      const reader = response.body.getReader(); // 获取响应的流读取器
      const decoder = new TextDecoder("utf-8"); // 用于解码字节流为字符串

      // 逐块读取响应流
      function readChunk() {
        return reader.read().then(({ done, value }) => {
          if (done) return; // 如果读取完成,则退出
          const chunk = decoder.decode(value); // 解码当前块
          updateMessageContent(messageElement, chunk); // 更新消息内容
          return readChunk(); // 递归读取下一块
        });
      }

      return readChunk(); // 开始读取流
    })
    .catch((error) => {
      console.error("错误:", error); // 在控制台打印错误信息
      updateMessageContent(messageElement, "无法连接到服务器。请稍后再试。"); // 显示错误提示
    });
}

3. 关键步骤详解

(1) 发送请求
fetch("http://localhost:5000/stream-chat", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
  },
  body: JSON.stringify({ message: userMessage }),
})

 

  • fetch 是 JavaScript 的内置函数,用于发送 HTTP 请求。
  • 请求 URL 是 http://localhost:5000/stream-chat,需要与服务器的路由一致。
  • 请求方法是 POST,请求体包含用户发送的消息:{ message: userMessage }
  • 服务器响应的是流式数据。
(2) 检查响应状态 
if (!response.ok) {
  throw new Error("服务器错误");
}
  • response.ok 检查服务器返回的状态码是否在 200-299 之间。
  • 如果服务器返回了错误状态码(如 404 或 500),会抛出错误,并进入 catch 块。
 (3) 获取响应流读取器
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
  • response.body.getReader():获取响应的字节流读取器。流式数据以字节块(Uint8Array)的形式返回。
  • TextDecoder:将字节解码为 UTF-8 字符串,适用于处理文本流。
 (4) 逐块读取数据
function readChunk() {
  return reader.read().then(({ done, value }) => {
    if (done) return; // 如果流结束,则退出
    const chunk = decoder.decode(value); // 解码当前块
    updateMessageContent(messageElement, chunk); // 动态更新消息内容
    return readChunk(); // 递归读取下一块
  });
}

 

  • reader.read():读取流中的下一个块。
  • 返回一个 Promise,解析后的值是一个对象:
    • value:当前块的数据,类型为 Uint8Array
    • done:布尔值,表示流是否已结束。
  • 解码数据块
    • 使用 decoder.decode(value) 将字节数组解码为可读的字符串。
  • 更新消息内容
    • 调用 updateMessageContent(messageElement, chunk) 动态更新聊天框内容,将接收到的数据逐块追加。
  • 递归读取
    • 通过递归调用 readChunk(),实现持续读取直到流结束。
 (5) 处理错误
 
.catch((error) => {
  console.error("错误:", error);
  updateMessageContent(messageElement, "无法连接到服务器。请稍后再试。");
});
  • 如果网络或服务器出错,会捕获异常。
  • 显示错误提示“无法连接到服务器”。

4. 流程总结 

  • 用户点击发送按钮。
  • 消息通过 fetch 发送到服务器。
  • 服务器返回流式数据,客户端逐块接收:
    • 使用 getReader() 获取流读取器。
    • 使用 read() 循环读取数据块。
    • 解码块并动态更新聊天框内容。
  • 如果出现错误,则显示提示信息。

 5. 效果演示

流式传输视频展示

 

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐