AI模型调用之流式传输
我用的Kimi的API服务这里只讲流式传输的的改造基础部分的详细教程看前一篇文章Kimi模型调用联动前端简单交互页面-CSDN博客文章浏览阅读71次。Flask 是一个轻量级的 Python Web 框架,适用于快速构建 Web 应用和 API。它提供了基本的 Web 应用功能,比如路由、请求处理、模板渲染等,并且易于扩展。Flask 通常用于构建 RESTful API、Web 应用等。轻量、易
前言:
我用的Kimi的API服务这里只讲流式传输的的改造基础部分的详细教程看前一篇文章Kimi模型调用联动前端简单交互页面-CSDN博客文章浏览阅读71次。Flask 是一个轻量级的 Python Web 框架,适用于快速构建 Web 应用和 API。它提供了基本的 Web 应用功能,比如路由、请求处理、模板渲染等,并且易于扩展。Flask 通常用于构建 RESTful API、Web 应用等。轻量、易上手、灵活,可以根据需要选择添加扩展。https://blog.csdn.net/m0_75149710/article/details/143933044?fromshare=blogdetail&sharetype=blogdetail&sharerId=143933044&sharerefer=PC&sharesource=m0_75149710&sharefrom=from_link
1.Python程序
1. 导入所需的包
from flask import Flask, Response, request
from flask_cors import CORS
from openai import OpenAI
- Flask:用于创建 Web 应用的框架。它帮助我们轻松地处理 HTTP 请求和路由。
- Response:用于构造 HTTP 响应。它允许我们通过流式传输的方式发送数据给客户端。
- request:从客户端接收请求的数据,这里用来提取用户发送的消息。
- CORS:允许跨域资源共享,用于解决前后端分离架构中的跨域问题。
- OpenAI:这是 OpenAI 的官方 Python 客户端库,帮助我们与 OpenAI 的 API 进行交互。
2. 初始化 Flask 和 OpenAI 客户端
app = Flask(__name__)
CORS(app)
client = OpenAI(
api_key="API_KEY", # 替换为你的 API 密钥
base_url="https://api.moonshot.cn/v1",
)
- app = Flask(name):创建一个 Flask 应用实例。
- CORS(app):启用跨域请求支持,允许不同域的客户端(如前端应用)访问此后端 API。
- client = OpenAI(...):初始化 OpenAI 客户端,传入 API 密钥和基础 URL(用来指向 OpenAI 服务端)。API 密钥用于身份验证。
3. 定义流式响应函数
def stream_response(stream):
"""
生成器函数,用于从 OpenAI API 流式传输响应
:param stream: OpenAI API 响应流
:return: 逐个返回 API 中的内容块
"""
str001 = ''
for chunk in stream:
delta = chunk.choices[0].delta # 从响应中提取 delta 内容
if hasattr(delta, 'content') and delta.content: # 检查 delta 是否有 'content' 属性
print(f"Streaming content: {delta.content}") # 打印内容块到控制台
str001 += delta.content # 追加内容到 str001
yield delta.content # 实时返回每个内容块
print(f"Final str001: {str001}") # 打印最终内容到控制台
- stream_response(stream):这是一个生成器函数,用来处理 OpenAI 的流式响应。生成器会逐步接收流数据,并逐个返回每个内容块。每当接收到一个新的响应块时,内容会被打印到控制台并添加到一个字符串中。
chunk.choices[0].delta
:OpenAI 的 API 响应包含多个部分,这里提取delta
部分,这是流式数据的增量内容。- yield delta.content:生成器返回每次接收到的内容块,客户端会实时接收到这些数据。
4. 定义 Flask 路由
@app.route("/stream-chat", methods=["POST"])
def stream_chat():
user_message = request.json.get("message", "") # 从请求中获取用户消息
# 向 OpenAI API 请求流式响应
stream = client.chat.completions.create(
model="moonshot-v1-8k", # 使用的 AI 模型
messages=[ # 消息内容,包括系统消息和用户消息
{"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手。"},
{"role": "user", "content": user_message}
],
temperature=0.3, # 控制响应的随机性
stream=True # 开启流式响应
)
# 将流式响应发送回客户端
return Response(stream_response(stream), content_type="text/event-stream")
- @app.route("/stream-chat", methods=["POST"]):定义了一个 POST 路由,当客户端向
/stream-chat
发送 POST 请求时,调用stream_chat()
函数处理请求。 - request.json.get("message", ""):从请求的 JSON 数据中提取用户发送的消息(
message
)。如果没有提供消息,则默认值为空字符串。 - client.chat.completions.create(...):通过 OpenAI API 请求流式生成的聊天回复,
model="moonshot-v1-8k"
是使用的模型,messages
中包含了系统消息和用户消息,stream=True
表示请求流式响应。 - Response(stream_response(stream), content_type="text/event-stream"):调用
stream_response
函数来获取流式数据,并将其作为响应返回给客户端。content_type="text/event-stream"
设置了响应类型为事件流格式,这是一种用于流式数据的标准格式。
5. 启动 Flask 应用
if __name__ == "__main__":
app.run(debug=True)
app.run(debug=True):启动 Flask 应用,在开发模式下运行,启用调试模式,可以自动重新加载代码并输出详细错误信息
6.工作原理
- 用户请求:客户端(如前端网页)通过 POST 请求向服务器的
/stream-chat
路由发送数据,数据包含用户的消息。 - 流式响应:服务器接收到消息后,向 OpenAI API 请求流式响应。OpenAI 会逐步生成响应内容,并通过流式方式传输回服务器。
- 实时返回给客户端:服务器通过
stream_response
函数将 OpenAI 生成的每个内容块实时传输给客户端,客户端可以立即显示响应,而不是等待完整的响应完成。 - 最终响应:当所有内容生成完毕时,服务器会结束流式传输。
7.完成程序
from flask import Flask, Response, request
from flask_cors import CORS
from openai import OpenAI
# 初始化 Flask 应用和 OpenAI 客户端
app = Flask(__name__)
CORS(app) # 启用跨域支持
client = OpenAI(
api_key="自己的API Key", # 替换为你的 API 密钥
base_url="https://api.moonshot.cn/v1", # 设置 OpenAI API 的基础 URL
)
def stream_response(stream):
"""
生成器函数,用于从 OpenAI API 流式传输响应
:param stream: OpenAI API 响应流
:return: 逐个返回 API 中的内容块
"""
str001 = ''
for chunk in stream:
delta = chunk.choices[0].delta # 从响应中提取 delta 内容
if hasattr(delta, 'content') and delta.content: # 检查 delta 是否包含 'content' 属性
print(f"Streaming content: {delta.content}") # 打印内容块到控制台
str001 += delta.content # 将内容追加到 str001 字符串中
yield delta.content # 返回每个内容块
print(f"Final str001: {str001}") # 打印最终内容到控制台
@app.route("/stream-chat", methods=["POST"])
def stream_chat():
user_message = request.json.get("message", "") # 从请求中获取用户消息
# 向 OpenAI API 请求流式响应
stream = client.chat.completions.create(
model="moonshot-v1-8k", # 使用的 AI 模型
messages=[ # 消息内容,包括系统消息和用户消息
{"role": "system", "content": "你是 Kimi,由 Moonshot AI 提供的人工智能助手。"},
{"role": "user", "content": user_message}
],
temperature=0.3, # 控制响应的随机性
stream=True # 启用流式响应
)
# 将流式响应返回给客户端
return Response(stream_response(stream), content_type="text/event-stream")
if __name__ == "__main__":
app.run(debug=True) # 启动 Flask 应用,开启调试模式
2.前端页面
1.完成页面地址
AI接口调用之流式: 流式传输前端页面https://gitee.com/fallen-grazing-and-red-dust/flow-of-ai-interface-calls.git
2.JS流式接收详解部分
1.创建updateMessageContent函数用来动态追加后续接收到的内容
// 动态更新消息内容的函数
function updateMessageContent(messageElement, content) {
messageElement.innerText += content; // 动态追加内容,不换行
chatDisplay.scrollTop = chatDisplay.scrollHeight; // 自动滚动到底部
}
2.流式数据处理的核心部分
function fetchServerResponse(userMessage, messageElement) {
fetch("http://localhost:5000/stream-chat", {
method: "POST", // 使用 POST 方法发送请求
headers: {
"Content-Type": "application/json", // 设置请求头为 JSON 格式
},
body: JSON.stringify({ message: userMessage }), // 将用户消息作为请求体发送
})
.then((response) => {
if (!response.ok) {
throw new Error("服务器错误"); // 如果响应状态不正常,则抛出错误
}
const reader = response.body.getReader(); // 获取响应的流读取器
const decoder = new TextDecoder("utf-8"); // 用于解码字节流为字符串
// 逐块读取响应流
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) return; // 如果读取完成,则退出
const chunk = decoder.decode(value); // 解码当前块
updateMessageContent(messageElement, chunk); // 更新消息内容
return readChunk(); // 递归读取下一块
});
}
return readChunk(); // 开始读取流
})
.catch((error) => {
console.error("错误:", error); // 在控制台打印错误信息
updateMessageContent(messageElement, "无法连接到服务器。请稍后再试。"); // 显示错误提示
});
}
3. 关键步骤详解
(1) 发送请求
fetch("http://localhost:5000/stream-chat", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ message: userMessage }),
})
fetch
是 JavaScript 的内置函数,用于发送 HTTP 请求。- 请求 URL 是
http://localhost:5000/stream-chat
,需要与服务器的路由一致。 - 请求方法是
POST
,请求体包含用户发送的消息:{ message: userMessage }
。 - 服务器响应的是流式数据。
(2) 检查响应状态
if (!response.ok) {
throw new Error("服务器错误");
}
response.ok
检查服务器返回的状态码是否在 200-299 之间。- 如果服务器返回了错误状态码(如 404 或 500),会抛出错误,并进入
catch
块。
(3) 获取响应流读取器
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
response.body.getReader()
:获取响应的字节流读取器。流式数据以字节块(Uint8Array
)的形式返回。TextDecoder
:将字节解码为 UTF-8 字符串,适用于处理文本流。
(4) 逐块读取数据
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) return; // 如果流结束,则退出
const chunk = decoder.decode(value); // 解码当前块
updateMessageContent(messageElement, chunk); // 动态更新消息内容
return readChunk(); // 递归读取下一块
});
}
reader.read()
:读取流中的下一个块。- 返回一个
Promise
,解析后的值是一个对象:value
:当前块的数据,类型为Uint8Array
。done
:布尔值,表示流是否已结束。
- 解码数据块:
- 使用
decoder.decode(value)
将字节数组解码为可读的字符串。
- 使用
- 更新消息内容:
- 调用
updateMessageContent(messageElement, chunk)
动态更新聊天框内容,将接收到的数据逐块追加。
- 调用
- 递归读取:
- 通过递归调用
readChunk()
,实现持续读取直到流结束。
- 通过递归调用
(5) 处理错误
.catch((error) => {
console.error("错误:", error);
updateMessageContent(messageElement, "无法连接到服务器。请稍后再试。");
});
- 如果网络或服务器出错,会捕获异常。
- 显示错误提示“无法连接到服务器”。
4. 流程总结
- 用户点击发送按钮。
- 消息通过
fetch
发送到服务器。 - 服务器返回流式数据,客户端逐块接收:
- 使用
getReader()
获取流读取器。 - 使用
read()
循环读取数据块。 - 解码块并动态更新聊天框内容。
- 使用
- 如果出现错误,则显示提示信息。
5. 效果演示
流式传输视频展示
更多推荐
所有评论(0)