《RAG 智能客服项目技术博客》,基于 Streamlit、FastAPI、LangChain 与 Chroma 的本地知识库问答系统实践
一、项目背景
随着大语言模型的发展,越来越多的应用开始使用 AI 来完成智能问答、客服咨询、文档检索和知识库助手等任务。普通的大模型虽然具备较强的自然语言理解和生成能力,但在实际业务场景中仍然存在几个明显问题:
1. 无法直接理解企业或个人的私有文档内容
例如商品尺码规则、公司制度、项目文档、客服 FAQ 等内容通常不在模型训练数据中。
2. 知识可能过时
大模型训练完成后,其知识并不会自动同步最新业务资料。
3. 容易产生幻觉
当模型不知道答案时,可能会生成看似合理但实际错误的内容。
4. 缺少业务可控性
对于智能客服、商品推荐、企业知识问答等场景,回答必须尽量基于已有资料,而不是完全依赖模型自由发挥。
为了解决这些问题,本项目采用了 RAG(Retrieval-Augmented Generation,检索增强生成) 技术方案。RAG 的核心思想是:
▎ 在大模型回答问题之前,先从本地知识库中检索相关资料,再把检索到的资料作为上下文交给大模型生成答案。
这样既可以利用大模型的语言理解能力,又能让回答尽量基于本地知识库内容,从而提升回答的准确性和可控性。
本项目实现的是一个 RAG 智能客服助手,支持本地文档上传、向量化存储、知识库问答、多轮对话、流式输出和 API 服务化调用。
---
二、项目概述
本项目是一个基于 Python 开发的 RAG 智能客服系统,主要功能包括:
- 支持上传本地知识库文件;
- 支持 TXT、MD、JSON、PDF、Word、Excel 等多种文件格式;
- 使用 LangChain 对文档进行加载、切片和链式编排;
- 使用 DashScope 的 Embedding 模型将文本转换为向量;
- 使用 Chroma 作为本地向量数据库;
- 使用通义千问聊天模型生成回答;
- 使用 Streamlit 搭建 Web 聊天界面;
- 使用 FastAPI 提供 HTTP API 服务;
- 支持普通聊天模式和知识库问答模式;
- 支持多会话历史记录保存;
- 支持流式输出 AI 回复。
项目整体可以看作一个较完整的本地知识库问答系统,适合作为学习 RAG 技术、LangChain 编排、大模型应用开发和智能客服项目实践的基础案例。
---
三、项目技术栈
本项目涉及的核心技术栈如下。

四、项目目录结构
本项目目录中既有早期知识库上传 Demo,也有较完整的 RAG 智能客服版本。其中最核心的实现位于 online_chroma 文件夹下。
RAG项目/
├── config_data.py # 早期 Demo 配置文件
├── knowledgebase.py # 早期知识库上传逻辑
├── 文件上传.py # 早期 Streamlit 文件上传页面
├── md.text # 早期 MD5 去重记录文件
├── data/
│ └── 尺码推荐.txt # 示例知识库数据
├── chroma_db/ # 根目录下的 Chroma 向量库
└── online_chroma/
├── app_qa.py # Streamlit 智能客服页面
├── api.py # FastAPI 接口服务
├── rag.py # RAG 核心问答逻辑
├── vector_stores.py # 向量库和文档加载管理
├── file_history.py # 会话历史管理
├── config_data.py # 完整版项目配置
├── requirements.txt # 项目依赖
├── .env.example # API Key 配置示例
└── README.md # 项目说明文档

---
五、系统整体架构
从功能层次来看,本项目可以分为五层:
用户层
├── Streamlit Web 页面
└── FastAPI HTTP 接口
应用服务层
└── RAGService
RAG 编排层
├── Prompt 构造
├── Retriever 检索
├── Message History 历史对话
└── LLM 调用
知识库层
├── 文件加载
├── 文档切片
├── Embedding 向量化
└── Chroma 向量存储
本地存储层
├── chroma_db
└── chat_history
系统的核心流程可以概括为:
1. 用户上传知识库文件;
2. 系统解析文件内容;
3. 将文本切分成多个文档片段;
4. 使用 Embedding 模型将文档片段转换为向量;
5. 将向量和元数据存入 Chroma;
6. 用户提问时,系统先从 Chroma 中检索相关文档;
7. 将检索结果、历史对话和用户问题组合成 Prompt;
8. 调用大语言模型生成回答;
9. 将回答流式展示到页面;
10. 将本轮对话保存到本地历史文件中。
---
六、RAG 技术原理
6.1 什么是 RAG
RAG 的全称是 Retrieval-Augmented Generation,中文通常翻译为 检索增强生成。
它由两个核心部分组成:
1. Retrieval:检索
从外部知识库中找到和用户问题相关的内容。
2. Generation:生成
将检索到的内容提供给大模型,让模型基于这些内容生成答案。
普通大模型回答问题时,主要依赖自身参数中已有的知识。而 RAG 会在回答前额外加入一个“查资料”的过程。
可以简单理解为:
普通大模型:
用户问题 -> 大模型 -> 答案
RAG:
用户问题 -> 检索知识库 -> 找到相关资料 -> 大模型结合资料回答 -> 答案
6.2 RAG 为什么适合智能客服
智能客服系统通常有大量业务规则,例如:
- 商品尺码规则;
- 售后政策;
- 退换货流程;
- 用户使用手册;
- 售后政策;
- 退换货流程;
- 用户使用手册;
- 企业内部规范;
- 常见问题 FAQ。
这些知识并不是大模型天然知道的,而且会经常更新。RAG 可以把这些资料放入本地知识库中,用户提问时实时检索相关内容,再让大模型进行组织和表达。
这样做的好处是:
- 回答更贴合业务资料;
- 可以随时更新知识库;
- 减少大模型胡编乱造;
- 不需要重新训练模型;
- 适合中小型项目快速落地。
---
七、项目配置文件解析
项目的配置文件是:
online_chroma/config_data.py
核心代码如下:
import os
from dotenv import load_dotenv
load_dotenv()
MD5_FILE_PATH = "./md.text"
collections_name = "rag"
persist_directory = "./chroma_db"
chunk_size = 1000
chunk_overlap = 100
separators = ["\n\n", "\n", "!", "。", ",", ",", "."]
max_split_char_number = 1000
similarity_threshold = 2
embedding_model_name = "text-embedding-v4"
chat_model_name = "qwen3-max"
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
7.1 配置说明
1. 向量库集合名称
collections_name = "rag"
这个配置表示 Chroma 中的 collection 名称。可以理解为向量数据库中的一个表或集合。
2. 向量库持久化目录
persist_directory = "./chroma_db"
Chroma 会把向量数据存储到本地 chroma_db 文件夹中。这样即使程序关闭,下次启动后仍然可以继续使用已有知识库。
3. 文档切片大小
chunk_size = 1000
chunk_overlap = 100
chunk_size 表示每个文档片段的最大长度。
chunk_overlap 表示相邻片段之间保留 100 个字符的重叠部分。
文档切片非常重要。如果切片太大,检索不够精准;如果切片太小,语义可能不完整。本项目设置为 1000 字符,并保留 100 字符重叠,是一种比较常见的基础配置。
4. 切片分隔符
separators = ["\n\n", "\n", "!", "。", ",", ",", "."]
该配置表示切片时优先按照自然段、换行和标点符号进行分割,尽量避免把一句话从中间切断。
5. 检索数量
similarity_threshold = 2
虽然变量名叫 similarity_threshold,但在代码中它实际作为 Retriever 的 k 参数使用,也就是每次返回最相似的 2 个文档片段。
6. 模型配置
embedding_model_name = "text-embedding-v4"
chat_model_name = "qwen3-max"
本项目使用:
- text-embedding-v4 作为文本向量模型;
- qwen3-max 作为聊天生成模型。
---
八、知识库构建流程
知识库构建是 RAG 系统的第一步,对应的核心文件是:
online_chroma/vector_stores.py
该文件中定义了 VectorStoreService 类,负责文档加载、文本切片、向量化和写入 Chroma。
---
8.1 初始化向量库
核心代码如下:
class VectorStoreService(object):
def __init__(self, embedding):
self.embedding = embedding
self.vector_store = Chroma(
collection_name=config.collections_name,
embedding_function=self.embedding,
persist_directory=config.persist_directory
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap,
separators=config.separators,
)
这段代码完成了两个重要初始化:
1. 初始化 Chroma 向量库;
2. 初始化文本切片器。
其中:
embedding_function=self.embedding
表示 Chroma 在添加文档时,会使用传入的 Embedding 模型将文本转换为向量。
persist_directory=config.persist_directory
表示向量库会持久化保存到本地目录中。

8.2 获取检索器
def get_retriever(self):
return self.vector_store.as_retriever(
search_kwargs={"k": config.similarity_threshold}
)
这段代码将 Chroma 向量库转换为 LangChain 的 Retriever。
Retriever 的作用是:
▎ 输入一个用户问题,返回与问题最相似的若干个文档片段。
这里的 k=2,表示每次检索返回最相关的 2 个片段。
---
8.3 添加文档
def add_documents(self, documents):
if not documents:
return 0
split_docs = self.text_splitter.split_documents(documents)
self.vector_store.add_documents(split_docs)
return len(split_docs)
这段代码是知识库入库的关键逻辑。
流程如下:
1. 判断文档是否为空;
2. 使用 text_splitter 对文档进行切片;
3. 将切片后的文档写入 Chroma;
4. 返回添加的文档片段数量。
其中:
split_docs = self.text_splitter.split_documents(documents)
会将原始大文档拆成多个小片段。
self.vector_store.add_documents(split_docs)
会完成两个动作:
- 调用 Embedding 模型生成向量;
- 将向量和文档内容写入 Chroma。
---
8.4 支持多种文件格式
项目支持 TXT、MD、JSON、PDF、Word、Excel 等文件格式,核心判断逻辑如下:
def _load_file(self, file_path):
_, ext = os.path.splitext(file_path)
ext = ext.lower()
if ext == ".pdf":
return self._load_pdf_langchain(file_path)
elif ext == ".docx":
return self._load_docx_langchain(file_path)
elif ext in [".xlsx", ".xls"]:
return self._load_excel_langchain(file_path)
elif ext in [".txt", ".md", ".json"]:
return self._load_text_file(file_path)
else:
raise ValueError(f"不支持的文件格式: {ext}")
这段代码根据文件扩展名选择不同的加载方式。
例如:
- .pdf 文件使用 PDF Loader;
- .docx 文件使用 Word Loader;
- .xlsx 文件使用 Excel Loader;
- .txt、.md、.json 文件按普通文本读取。
这使得系统的知识库来源更加灵活,不局限于纯文本文件。
【重点说明】
在真实业务系统中,知识库文件格式通常比较复杂,不同部门可能提供 Word、PDF、Excel 等格式。本项目在文件解析上做了兼容处理,能够支持更丰富的文档来源。
---
8.5 PDF 文件加载
项目优先使用 LangChain 的 PDF Loader,如果依赖不存在或解析失败,则尝试使用备用方案。
def _load_pdf_langchain(self, file_path):
try:
from langchain_community.document_loaders import PyPDFLoader
loader = PyPDFLoader(file_path)
docs = loader.load()
return docs
except ImportError:
try:
from langchain_community.document_loaders import PDFMinerLoader
loader = PDFMinerLoader(file_path)
return loader.load()
except ImportError:
return self._load_pdf_fallback(file_path)
except Exception as e:
return self._load_pdf_fallback(file_path)
这种写法体现了一个比较实用的工程思路:
▎ 优先使用标准组件,如果失败,再使用备用方案,提升系统兼容性。
---
8.6 Word 文件加载
def _load_docx_langchain(self, file_path):
try:
from langchain_community.document_loaders import Docx2txtLoader
loader = Docx2txtLoader(file_path)
return loader.load()
except ImportError:
try:
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
loader = UnstructuredWordDocumentLoader(file_path)
return loader.load()
except ImportError:
return self._load_docx_fallback(file_path)
Word 文件同样采用多级 fallback 方案:
1. 优先使用 Docx2txtLoader;
2. 如果不可用,尝试 UnstructuredWordDocumentLoader;
3. 如果仍不可用,再使用 python-docx 自己提取段落文本。
---
8.7 Excel 文件加载
def _load_excel_fallback(self, file_path):
try:
import pandas as pd
df = pd.read_excel(file_path)
text = df.to_string(index=False)
return [Document(page_content=text.strip(), metadata={"source": file_path})]
except ImportError:
raise ValueError(
"无法解析Excel文件。请安装:\n"
"pip install pandas openpyxl xlrd"
)
except Exception as e:
raise ValueError(f"Excel文档解析失败: {str(e)}")
Excel 文件最终会被转换成字符串,然后包装成 LangChain 的 Document 对象。
这说明在 RAG 系统中,不管原始数据是什么格式,最终都需要统一转换成类似这样的结构:
Document(
page_content="文本内容",
metadata={"source": "文件路径"}
)
---
九、RAG 核心问答链路
项目最核心的文件是:
online_chroma/rag.py
该文件定义了 RAGService 类,负责初始化模型、构建 RAG 链路、普通聊天链路、文档入库以及知识库管理。
---
9.1 初始化 RAGService
核心代码如下:
class RAGService(object):
def __init__(self, api_key=None):
self.api_key = api_key or config.DASHSCOPE_API_KEY
if self.api_key:
self.vector_service = VectorStoreService(
embedding=DashScopeEmbeddings(
model=config.embedding_model_name,
dashscope_api_key=self.api_key
)
)
self.chat_model = ChatTongyi(
model=config.chat_model_name,
api_key=self.api_key
)
else:
self.vector_service = None
self.chat_model = None
self.rag_chain = self._get_rag_chain()
self.chat_chain = self._get_chat_chain()
这段代码主要完成四件事:
1. 读取 API Key;
2. 初始化 Embedding 模型;
3. 初始化聊天模型;
4. 构建 RAG Chain 和普通 Chat Chain。
其中:
DashScopeEmbeddings(model=config.embedding_model_name)
用于将文本转换为向量。
ChatTongyi(model=config.chat_model_name)
用于调用通义千问聊天模型生成回答。

9.2 RAG Chain 构建
RAG 链路的核心代码如下:
def _get_rag_chain(self):
if not self.api_key or not self.vector_service:
return None
retriever = self.vector_service.get_retriever()
def format_document(docs: list[Document]):
if not docs:
return "无相关参考资料"
formatted_str = ""
for doc in docs:
formatted_str += f"文档片段:{doc.page_content}\n文档元数据:{doc.metadata}\n\n"
return formatted_str
def temp1(value: dict) -> str:
return value["input"]
def temp2(value):
new_value = {}
new_value["input"] = value["input"]["input"]
new_value["context"] = value["context"]
new_value["history"] = value["input"]["history"]
return new_value
chain = (
{
"input": RunnablePassthrough(),
"context": RunnableLambda(temp1) | retriever | format_document
} | RunnableLambda(temp2) | self.prompt_template | print_prompt | self.chat_model | StrOutputParser()
)
conversation_chain = RunnableWithMessageHistory(
chain,
get_history,
input_messages_key="input",
history_messages_key="history"
)
return conversation_chain
这段代码是整个项目最重要的部分。
---
9.3 链路拆解
RAG Chain 可以拆解成以下流程:
用户问题
↓
RunnablePassthrough 保留原始输入
↓
RunnableLambda 提取 input
↓
Retriever 检索知识库
↓
format_document 格式化检索结果
↓
组合 input、context、history
↓
PromptTemplate 构造提示词
↓
ChatTongyi 调用大模型
↓
StrOutputParser 转成字符串
↓
RunnableWithMessageHistory 保存历史
其中最关键的是这段:
{
"input": RunnablePassthrough(),
"context": RunnableLambda(temp1) | retriever | format_document
}
它表示同一个用户输入会被分成两路:
1. 一路作为原始问题继续向后传递;
2. 另一路进入 Retriever,从知识库中检索相关资料。
然后再把检索到的资料作为 context 放入 Prompt 中。
---
9.4 检索结果格式化
def format_document(docs: list[Document]):
if not docs:
return "无相关参考资料"
formatted_str = ""
for doc in docs:
formatted_str += f"文档片段:{doc.page_content}\n文档元数据:{doc.metadata}\n\n"
return formatted_str
这段代码把检索到的 Document 列表转换成字符串。
最终格式类似:
文档片段: 用户体重 180 斤时建议选择 XXL 尺码
文档元数据: {'source': '尺码推荐.txt'}
文档片段: 身高 175cm 到 180cm 的用户推荐 XL 或 XXL
文档元数据: {'source': '尺码推荐.txt'}
这些内容会被放进 Prompt,供大模型参考。
---
9.5 Prompt 模板设计
项目中的 RAG Prompt 如下:
@property
def prompt_template(self):
return ChatPromptTemplate.from_messages(
[
("system", "以我提供的资料为主简洁和专业的回答用户问题.参考资料:{context}"),
("system", "并且以我提供的用户对话为历史记录,如下:"),
MessagesPlaceholder("history"),
("user", "请回答用户提问{input}")
]
)
这个 Prompt 包含三部分信息:
1. 参考资料 context
来自 Chroma 检索结果。
2. 历史对话 history
来自本地会话历史文件。
3. 当前用户问题 input
用户本轮输入的问题。
Prompt 的作用是告诉模型:
▎ 回答时优先参考我提供的资料,同时结合历史对话,回答用户当前的问题。
【重点说明】
Prompt 是 RAG 系统中非常关键的一环。检索结果再准确,如果 Prompt 没有明确告诉模型如何使用资料,模型仍然可能忽略知识库内容。
---
9.6 普通聊天链路
除了 RAG 模式,项目还支持普通聊天模式。
def _get_chat_chain(self):
if not self.api_key or not self.chat_model:
return None
chat_prompt = ChatPromptTemplate.from_messages(
[
("system", "你是一个智能助手,请根据历史对话和用户提问提供友好、专业的回答。"),
MessagesPlaceholder("history"),
("user", "{input}")
]
)
chain = chat_prompt | self.chat_model | StrOutputParser()
conversation_chain = RunnableWithMessageHistory(
chain,
get_history,
input_messages_key="input",
history_messages_key="history"
)
return conversation_chain
普通聊天模式不检索知识库,只使用历史对话和当前用户输入。
两种模式对比如下:
9.7 同步问答与流式问答
项目提供了两个聊天方法:
def chat(self, prompt, session_config, use_rag=True):
if not self.api_key:
return "请先配置 API Key"
if use_rag and self.rag_chain:
return self.rag_chain.stream({"input": prompt}, session_config)
elif self.chat_chain:
return self.chat_chain.stream({"input": prompt}, session_config)
else:
return "模型初始化失败,请检查 API Key"
chat() 方法使用 .stream(),返回流式输出结果。
def chat_sync(self, prompt, session_config, use_rag=True):
if not self.api_key:
return "请先配置 API Key"
if use_rag and self.rag_chain:
return self.rag_chain.invoke({"input": prompt}, session_config)
elif self.chat_chain:
return self.chat_chain.invoke({"input": prompt}, session_config)
else:
return "模型初始化失败,请检查 API Key"
chat_sync() 方法使用 .invoke(),一次性返回完整回答。
这两个方法分别服务于不同场景:
- Streamlit 页面更适合流式输出;
- API 同步接口更适合一次性返回完整结果。
---
十、会话历史管理
本项目支持多轮对话,并且会将历史消息保存到本地文件中。对应文件是:
online_chroma/file_history.py
---
10.1 获取历史对象
def get_history(session_id):
return FileChatMessageHistory(session_id, "./chat_history")
每个用户会话都有一个 session_id,系统会根据这个 ID 创建或读取对应的历史文件。
---
10.2 FileChatMessageHistory 类
class FileChatMessageHistory(BaseChatMessageHistory):
def __init__(self, session_id, storage_path):
self.session_id = session_id
self.storage_path = storage_path
self.file_path = os.path.join(storage_path, session_id)
os.makedirs(self.storage_path, exist_ok=True)
初始化时,系统会将会话历史保存路径设置为:
./chat_history/{session_id}
例如:
./chat_history/user_001
这样每个会话都可以单独保存自己的聊天记录。
---
10.3 添加消息
def add_message(self, message: Sequence[BaseMessage]) -> None:
all_messages = list(self.messages)
all_messages.append(message)
new_messages = [message_to_dict(m) for m in all_messages]
with open(self.file_path, 'w', encoding='utf-8') as f:
json.dump(new_messages, f, ensure_ascii=False)
这段代码会:
1. 读取原有历史消息;
2. 追加新消息;
3. 将消息转换为字典;
4. 写回 JSON 文件。
LangChain 中的消息对象不能直接写入 JSON,因此需要使用:
message_to_dict()
将消息转换为可序列化结构。
---
10.4 读取消息
@property
def messages(self) -> list[BaseMessage]:
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
messages_data = json.load(f)
return messages_from_dict(messages_data)
except FileNotFoundError:
return []
读取时使用:
messages_from_dict()
将 JSON 数据还原为 LangChain 的消息对象。
【重点说明】
这个模块解决了多轮对话中的“记忆”问题。用户刷新页面或切换会话后,系统仍然可以恢复之前的聊天记录。
---
十一、Streamlit 前端页面实现
项目的 Web 页面入口是:
online_chroma/app_qa.py
该文件使用 Streamlit 实现了智能客服界面。
---
11.1 页面初始化
st.set_page_config(page_title="智能客服", page_icon="🤖", layout="wide")
st.title("🤖 智能客服")
st.divider()
这段代码设置了页面标题、图标和布局。
---
11.2 Session State 状态管理
Streamlit 每次交互都会重新执行脚本,因此需要使用 st.session_state 保存状态。
if "api_key" not in st.session_state:
st.session_state["api_key"] = config.DASHSCOPE_API_KEY
if "message" not in st.session_state:
st.session_state["message"] = [{"role": "assistant", "content": "你好!有什么可以帮助你的吗?"}]
if "rag" not in st.session_state:
st.session_state["rag"] = None
if "session_id" not in st.session_state:
st.session_state["session_id"] = "user_001"
if "collection_stats" not in st.session_state:
st.session_state["collection_stats"] = None
if "use_rag" not in st.session_state:
st.session_state["use_rag"] = True
这些状态分别表示:

11.3 初始化 RAG 服务
def init_rag_service():
if st.session_state["api_key"] and not st.session_state["rag"]:
st.session_state["rag"] = RAGService(api_key=st.session_state["api_key"])
return st.session_state["rag"]
该函数用于根据 API Key 初始化 RAGService。
如果用户没有输入 API Key,RAG 服务无法正常调用 Embedding 和聊天模型。
---
11.4 侧边栏设置
项目在侧边栏中提供了多个功能模块:
with st.sidebar:
st.header("⚙️ 设置")
1. API Key 输入
api_key_input = st.text_input(
"DashScope API Key",
value=st.session_state["api_key"],
type="password",
placeholder="请输入您的 DashScope API Key"
)
用户可以在页面中输入自己的 DashScope API Key。
2. 知识库文件上传
uploaded_files = st.file_uploader(
"📁 选择要上传的文件",
type=["txt", "md", "json", "pdf", "docx", "xlsx", "xls"],
accept_multiple_files=True,
help="支持:txt, md, json, pdf, docx, xlsx, xls",
key="file_uploader"
)
这里支持多文件上传,并限制了可上传的文件类型。

---
11.5 上传文件到知识库
当用户点击“上传到知识库”按钮后,系统会执行以下逻辑:
if st.button("上传到知识库"):
if not st.session_state["api_key"]:
st.error("请先配置 API Key!")
elif not uploaded_files:
st.warning("请先选择要上传的文件!")
else:
rag = init_rag_service()
if rag:
with st.spinner("正在处理文件..."):
temp_dir = "./temp_uploads"
os.makedirs(temp_dir, exist_ok=True)
上传流程包括:
1. 检查 API Key;
2. 检查是否选择文件;
3. 初始化 RAG 服务;
4. 创建临时上传目录;
5. 保存上传文件;
6. 调用 rag.add_files(file_paths);
7. 删除临时文件;
8. 显示上传结果。
核心调用如下:
result = rag.add_files(file_paths)
这会进入 RAGService.add_files(),再进入 VectorStoreService.add_file(),最终完成文件解析、切片、向量化和入库。
---
11.6 知识库统计
if st.session_state["collection_stats"]:
stats = st.session_state["collection_stats"]
if stats["success"]:
st.metric("文档片段数", stats["document_count"])
这里会显示当前知识库中已有多少个文档片段。
这对于用户判断知识库是否成功构建非常有帮助。
---
11.7 历史会话管理
项目支持展示历史会话:
def get_history_sessions():
history_dir = "./chat_history"
sessions = []
if os.path.exists(history_dir):
for filename in os.listdir(history_dir):
file_path = os.path.join(history_dir, filename)
if os.path.isfile(file_path):
...
sessions.append({
'id': filename,
'preview': preview,
'mtime': os.path.getmtime(file_path)
})
sessions.sort(key=lambda x: x['mtime'], reverse=True)
return sessions
这段代码会扫描 chat_history 目录,把已有会话按照修改时间排序展示出来。
用户可以:
- 查看历史会话;
- 切换到指定会话;
- 创建新会话;
- 清空当前对话。
---
11.8 知识库模式开关
rag_mode = st.toggle(
"📚 知识库模式",
value=st.session_state["use_rag"],
help="开启时使用知识库回答,关闭时使用普通聊天"
)
st.session_state["use_rag"] = rag_mode
这个开关用于控制当前对话是否使用 RAG。
如果开启:
rag.chat(prompt, session_config, use_rag=True)
如果关闭:
rag.chat(prompt, session_config, use_rag=False)
这样用户可以很方便地对比知识库模式和普通聊天模式的区别。
---
11.9 流式输出回答
主区域聊天逻辑如下:
prompt = st.chat_input("请输入你的问题...")
if prompt:
...
res = rag.chat(prompt, session_config, use_rag=st.session_state["use_rag"])
def capture(generator, cache_list):
for chunk in generator:
cache_list.append(chunk)
yield chunk
with st.chat_message("assistant"):
st.write_stream(capture(res, ai_res_list))
st.session_state["message"].append(
{"role": "assistant", "content": "".join(ai_res_list)}
)
这段代码实现了流式输出效果。
其中:
st.write_stream()
会让 AI 回复像打字一样逐步显示,提高交互体验。
capture() 函数一边把流式内容显示出来,一边保存到 ai_res_list 中,最后再合并为完整回答,写入当前页面的消息列表。

十二、FastAPI 接口服务
除了 Web 页面,本项目还提供了 API 服务,方便被其他系统调用。
对应文件是:
online_chroma/api.py
---
12.1 FastAPI 应用初始化
app = FastAPI(
title="智能客服 API",
description="基于 RAG 的智能客服服务",
version="1.0.0"
)
这段代码定义了 FastAPI 应用的标题、描述和版本。
---
12.2 跨域配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
这段代码允许跨域请求,方便前端页面或其他系统调用 API。
---
12.3 请求数据模型
class ChatRequest(BaseModel):
message: str
session_id: str = "user_001"
use_rag: bool = True
api_key: str = None
该模型定义了聊天接口需要接收的字段:

12.4 响应数据模型
class ChatResponse(BaseModel):
response: str
session_id: str
use_rag: bool
接口返回内容包括:
- 模型回答;
- 当前会话 ID;
- 是否使用 RAG 模式。
---
12.5 同步聊天接口
@app.post("/chat", response_model=ChatResponse, summary="同步聊天接口")
async def chat(request: ChatRequest):
try:
api_key = request.api_key or config.DASHSCOPE_API_KEY
if not api_key:
raise HTTPException(status_code=400, detail="API Key 未配置")
if request.session_id not in rag_services:
rag_services[request.session_id] = RAGService(api_key=api_key)
service = rag_services[request.session_id]
service.api_key = api_key
session_config = {
"configurable": {
"session_id": request.session_id
}
}
response = service.chat_sync(
request.message,
session_config,
use_rag=request.use_rag
)
return ChatResponse(
response=response,
session_id=request.session_id,
use_rag=request.use_rag
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
这个接口的处理流程是:
1. 获取 API Key;
2. 检查 API Key 是否存在;
3. 根据 session_id 获取或创建 RAGService;
4. 构造 LangChain 所需的 session_config;
5. 调用 chat_sync() 获取完整回答;
6. 返回结构化响应。
---
12.6 健康检查接口
@app.get("/health", summary="健康检查")
async def health_check():
return {"status": "healthy", "service": "RAG Chat Service"}
该接口用于判断服务是否正常运行。
访问:
GET /health
返回:
{
"status": "healthy",
"service": "RAG Chat Service"
}
---
12.7 删除会话接口
@app.delete("/session/{session_id}", summary="删除会话")
async def delete_session(session_id: str):
if session_id in rag_services:
del rag_services[session_id]
return {"message": f"会话 {session_id} 已删除"}
else:
raise HTTPException(status_code=404, detail="会话不存在")
该接口用于删除内存中的 RAGService 实例。
需要注意的是,这里删除的是内存中的服务实例,不一定会删除本地 chat_history 文件。
---
十三、早期知识库 Demo 说明
项目根目录下还有两个早期版本文件:
knowledgebase.py
文件上传.py
它们可以看作完整 RAG 系统之前的知识库上传 Demo。
---
13.1 knowledgebase.py
该文件主要实现了:
- 字符串 MD5 计算;
- 内容重复检测;
- 文本切片;
- 写入 Chroma 向量库。
核心代码如下:
def get_string_md5(input_str: str, encoding="utf-8"):
str_bytes = input_str.encode(encoding=encoding)
md5_obj = hashlib.md5()
md5_obj.update(str_bytes)
md5_hex = md5_obj.hexdigest()
return md5_hex
这段代码用于计算文本内容的 MD5 值。
上传文本时,通过 MD5 判断内容是否已经存在:
md5_hex = get_string_md5(data)
if check_md5(md5_hex):
return "[跳过]内容已存在知识库中"
这是一种简单的去重方案。
---
13.2 文件上传.py
该文件是一个简单的 Streamlit 上传页面:
uploaded_file = st.file_uploader(
"上传文件",
type=["csv", "xlsx", "txt"],
accept_multiple_files=False
)
用户上传文件后,系统读取文本内容并调用:
reslut = st.session_state["service"].upload_by_str(text, file_name)
将内容写入知识库。
这个早期版本功能比较简单,但它体现了项目的演进过程:
早期 Demo:
文件上传 -> 文本切片 -> 向量库
完整版本:
多格式上传 -> 文档解析 -> 文本切片 -> 向量库 -> RAG 问答 -> 会话历史 -> Web/API 服务
---
十四、完整问答流程示例
以“尺码推荐”为例,完整流程如下。
14.1 知识库准备
假设知识库中有一个文件:
data/尺码推荐.txt
其中包含类似规则:
体重 180 斤的用户,建议选择 XXL 尺码。
身高 175cm 至 180cm 的用户,可以优先考虑 XL 或 XXL。
如果喜欢宽松版型,可以选择大一码。
用户通过 Streamlit 页面上传该文件。
系统执行:
文件上传
↓
临时保存
↓
文件解析
↓
文本切片
↓
Embedding 向量化
↓
写入 Chroma
14.2 用户提问
用户输入:
我体重180斤,应该穿什么尺码?
14.3 系统检索
系统将用户问题转换为向量,在 Chroma 中检索相关片段。
可能检索到:
体重 180 斤的用户,建议选择 XXL 尺码。
14.4 Prompt 构造
系统构造类似这样的 Prompt:
以我提供的资料为主简洁和专业的回答用户问题。
参考资料:
文档片段:体重 180 斤的用户,建议选择 XXL 尺码。
文档元数据:{'source': '尺码推荐.txt'}
并且以我提供的用户对话为历史记录,如下:
历史对话:...
请回答用户提问:
我体重180斤,应该穿什么尺码?
14.5 模型回答
模型最终可能回答:
根据知识库中的尺码推荐规则,你的体重是 180 斤,建议选择 XXL 尺码。如果你更喜欢宽松穿着,可以优先选择 XXL;如果衣服版型偏宽松,也可以结合身高再进一步判断。
---
十五、项目亮点
15.1 功能完整
本项目不仅实现了最基础的 RAG 问答,还包含:
- 知识库上传;
- 多格式文件解析;
- 本地向量库;
- 多轮对话;
- 流式输出;
- Web 页面;
- API 服务。
这使它比简单的 RAG Demo 更接近真实应用。
---
15.2 架构清晰
项目将不同功能拆分到了不同文件中:

这种结构便于后续维护和扩展。
---
15.3 支持两种使用方式
项目既支持 Streamlit 页面,也支持 FastAPI 接口:
- Streamlit 适合演示和交互;
- FastAPI 适合对外提供服务能力。
这让系统既可以作为本地工具使用,也可以集成到其他系统中。
---
15.4 支持多会话
通过 session_id 和本地 JSON 文件,系统实现了多会话历史管理。
用户可以创建不同会话,每个会话保留独立上下文。
---
15.5 支持流式输出
通过 LangChain 的 .stream() 和 Streamlit 的 st.write_stream(),系统可以实时展示 AI 生成内容,用户体验更好。
---
十六、项目可优化方向
虽然本项目已经完成了一个基础但较完整的 RAG 智能客服系统,但仍有一些可以继续优化的方向。
---
16.1 Prompt 可以进一步增强
当前 Prompt 是:
("system", "以我提供的资料为主简洁和专业的回答用户问题.参考资料:{context}")
可以优化为:
你是一个专业的智能客服助手。请严格基于参考资料回答用户问题。
如果参考资料中没有相关信息,请明确说明“知识库中没有找到相关内容”,不要编造答案。
回答时尽量简洁、准确、结构清晰。
这样可以进一步减少模型幻觉。
---
16.2 检索参数命名可以更准确
当前配置中:
similarity_threshold = 2
但它实际表示返回 Top K 文档数量,而不是相似度阈值。
建议改成:
retrieval_top_k = 2
这样更符合语义。
---
16.3 流式 API 可以使用 StreamingResponse
当前 FastAPI 的 /chat/stream 接口直接返回 generator:
return generate()
更规范的写法是使用:
from fastapi.responses import StreamingResponse
然后返回:
return StreamingResponse(generate(), media_type="text/event-stream")
这样更符合 FastAPI 流式响应规范。
---
16.4 知识库可以增加来源引用
当前回答没有明确展示引用来源。后续可以让模型回答时附带:
参考来源:尺码推荐.txt
这样可以提高回答可信度。
---
16.5 可以增加文件去重机制
早期 Demo 中有 MD5 去重逻辑,但完整版本中上传文件时主要按当前上传批次的文件名去重。
后续可以结合 MD5,对已入库文件做更完整的去重,避免重复文档影响检索结果。
---
16.6 可以增加用户权限和安全控制
当前 FastAPI 允许所有跨域请求:
allow_origins=["*"]
在生产环境中应限制允许访问的域名,并增加身份认证机制。
---
16.7 可以增加前端知识库管理功能
后续可以增加:
- 查看已上传文件列表;
- 删除指定文件;
- 按文件重建向量;
- 查看某条检索结果;
- 导出知识库统计信息。
这样系统会更接近生产级知识库管理平台。
---
十七、项目运行方式
17.1 安装依赖
进入 online_chroma 目录后执行:
pip install -r requirements.txt
依赖文件如下:
streamlit>=1.32.0
fastapi>=0.104.0
uvicorn>=0.24.0
langchain>=0.1.10
langchain-community>=0.0.25
langchain-core>=0.1.0
dashscope>=1.14.0
chromadb>=0.4.22
python-dotenv>=1.0.0
pydantic>=2.5.0
click>=8.1.0
jieba>=0.42.1
tiktoken>=0.5.0
requests>=2.31.0
numpy>=1.24.0
---
17.2 配置 API Key
在项目目录下创建 .env 文件:
DASHSCOPE_API_KEY=你的阿里云百炼API Key
程序会通过 dotenv 自动读取该配置。
---
17.3 启动 Streamlit 页面
streamlit run app_qa.py
访问:
http://localhost:8501
---
17.4 启动 FastAPI 服务
python api.py
接口文档地址:
http://localhost:8000/docs
---
十八、核心代码总结
本项目最关键的代码可以归纳为三部分。
---
18.1 文档入库
split_docs = self.text_splitter.split_documents documents)
self.vector_store.add_documents(split_docs)
这部分负责将原始文档切片并写入向量库。
准确写法应为:
split_docs = self.text_splitter.split_documents(documents)
self.vector_store.add_documents(split_docs)
---
18.2 检索增强生成
chain = (
{
"input": RunnablePassthrough(),
"context": RunnableLambda(temp1) | retriever | format_document
} | RunnableLambda(temp2) | self.prompt_template | print_prompt | self.chat_model | StrOutputParser()
)
这部分是 RAG 的核心。它实现了:
用户问题 -> 检索知识库 -> 构造 Prompt -> 调用大模型 -> 输出答案
---
18.3 会话历史绑定
conversation_chain = RunnableWithMessageHistory(
chain,
get_history,
input_messages_key="input",
history_messages_key="history"
)
这部分让 RAG 链路具备多轮对话记忆能力。
只要传入:
session_config = {
"configurable": {
"session_id": "user_001"
}
}
LangChain 就会自动根据 session_id 读取和写入历史消息。
二十、总结
本项目实现了一个基于 RAG 技术的智能客服问答系统。它通过 LangChain 完成 RAG 链路编排,使用 DashScope Embedding 模型进行文本向量化,使用 Chroma 作为本地向量数据库,并结合通义千问聊天模型生成自然语言回答。
从整体功能来看,系统已经具备一个基础智能客服应用的主要能力:
- 可以上传知识库文件;
- 可以自动完成文本解析、切片和向量化;
- 可以基于知识库回答用户问题;
- 可以支持多轮对话;
- 可以保存历史会话;
- 可以通过 Web 页面使用;
- 可以通过 API 接口调用。
对于学习大模型应用开发的人来说,这个项目覆盖了 RAG 系统开发中的核心流程,包括文档加载、文本切片、向量存储、相似度检索、Prompt 构造、大模型调用、流式输出和会话管理。
从工程实践角度看,这个项目虽然还可以继续增强,例如优化 Prompt、增加引用来源、完善流式 API、增加权限控制和知识库管理功能,但它已经形成了一个比较完整的 RAG 应用闭环。
因此,本项目不仅适合作为 RAG 技术学习案例,也可以作为后续开发企业知识库问答、智能客服、商品推荐助手、文档问答助手等系统的基础框架。
---
附录:博客中可重点强调的技术点
1. RAG 的核心价值
RAG 不是让大模型凭空回答,而是让模型先查资料,再根据资料回答。这可以明显提升回答的准确性和业务可控性。
---
2. 文本切片的重要性
文档不能直接整篇放入向量库。合理切片可以提升检索精度,也能避免上下文过长。
---
3. 向量数据库的作用
Chroma 存储的不是普通文本索引,而是文本对应的语义向量。用户问题也会被转成向量,然后通过向量相似度找到相关内容。
---
4. Prompt 决定模型如何使用知识库
RAG 系统不仅要能检索,还要通过 Prompt 告诉模型如何使用检索结果。Prompt 设计不好,模型可能忽略资料或产生幻觉。
---
5. RunnableWithMessageHistory 实现多轮对话
LangChain 的 RunnableWithMessageHistory 可以将链路与历史消息绑定,让系统支持多轮上下文。
---
6. Streamlit 适合快速做 AI 应用原型
通过 Streamlit,可以用较少代码快速构建一个可交互的 AI 问答界面,适合项目演示和原型开发。
---
7. FastAPI 让 RAG 能力服务化
通过 FastAPI,可以把 RAG 问答能力封装成 HTTP 接口,方便其他系统集成调用。
---
附录:可直接放入 Word 的项目流程图
用户上传文件
↓
临时保存文件
↓
根据文件类型解析内容
↓
转换为 Document 对象
↓
RecursiveCharacterTextSplitter 文本切片
↓
DashScope Embedding 向量化
↓
写入 Chroma 向量数据库
↓
用户输入问题
↓
Retriever 检索相关文档片段
↓
组合 context + history + input
↓
ChatTongyi 生成回答
↓
Streamlit 流式展示
↓
保存会话历史

更多推荐
所有评论(0)