手搭一个大模型-part2-构建一个RAG
LLM 的底层设计机制决定了其生成的回答本质上是基于概率而非既定事实。由于 LLM 缺乏自行校准生成内容的能力,这项工作通常由人类来完成,导致只要生成的内容符合其语言逻辑,即使与事实大相径庭,LLM 也会毫不犹豫地输出。在人们看来,这就像是一本正经地胡说八道。幻觉问题难以从根源上解决,因为这种概率性也是 LLM 能展现出创新性的关键所在,但我们可以通过一些方法提高模型回答的准确性。其中RAG就是解
手搭一个RAG
RAG的介绍
LLM 的底层设计机制决定了其生成的回答本质上是基于概率而非既定事实。由于 LLM 缺乏自行校准生成内容的能力,这项工作通常由人类来完成,导致只要生成的内容符合其语言逻辑,即使与事实大相径庭,LLM 也会毫不犹豫地输出。在人们看来,这就像是一本正经地胡说八道。幻觉问题难以从根源上解决,因为这种概率性也是 LLM 能展现出创新性的关键所在,但我们可以通过一些方法提高模型回答的准确性。
其中RAG就是解决这个’幻觉’问题的方法之一。
RAG全称为Retrieval-Augmented Generation。
- R: Retrieval, 即检索器,专门负责根据用户输入的问题,从我们给定的知识库中检索到数据
- A: Augmented, 数据增强,即对检索到的数据进行增强处理。例如,我检索了很多相关的文章,但我只需要与我的问题最相关的部分,通过数据增强可以提取出这些关键内容。
- G: Generation,生成器,通常代指LLM。在获得增强处理后的数据后,生成器将这些数据与用户的问题结合起来,生成更为准确和有用的回答。
以下是RAG的结构图:
- 用户输入问题Query
- Query经过检索器,从向量数据库中找到相关数据
- 对这些数据进行增强(这里没体现)
- 将查询到的数据和用户的查询合并发送给LLM
- 最后LLM会根据这些数据进行回答
RAG搭建思路
从上面的结构来看,一个最简单的RAG可以分成2部分:
- 检索器Retrieval的实现
- 生成器对象
接着我们一个个部分来实现
检索器
在检索器部分中,我们需要将用户的query,转换成向量,同时通过余弦相似度在向量数据库中找到和这个问题最相关的数据,并返回对应的内容给生成器。
在这个过程中,我们需要做两件事情:
- 定义一个能把输入的query转变为向量的Embedding类
- 构建一个可用来查询的向量数据库
接下来我们将一步步的构建这个系统
Embedding层的构建
首先,我们要明确这个Embedding层的作用到底是什么。看上面描述的场景,用户传入进来的是一个文本,最后得到的是一个向量。所以我们需要一个方法,能够将用户的查询变成向量。同时我们后面还要计算余弦相似度,因此我们还可以定义一个专门计算余弦相似度的方法
因此,我们可以编写我们Embedding的基类,BaseEmbedding
class BaseEmbedding(object):
def __init__(self, path: str = '', is_api: bool = True) -> None:
"""
:param path:
:param is_api: 这个Embedding模型是否需要API
"""
self.path = path
self.is_api = is_api
def get_embedding(self, text: str, model: str = '') -> List[float]:
pass
@classmethod
def cosine_similarity(cls, vec1:List[float], vec2: List[float]) -> float:
"""
:param vec1:向量1
:param vec2:向量2
:return: 余弦相似度
"""
dot_product = np.dot(vec1, vec2)
magnitude = np.linalg.norm(vec1) * np.linalg.norm(vec2)
if not magnitude:
# 防止vec1或vec2其中一个为0,导致分母为0
return 0
else:
return dot_product / magnitude
这就是一个最基础的Embedding类,里面有将文本转向量的功能,也有计算余弦相似度的功能。
接着,我们就可以通过类的继承关系来扩展这个功能。这是我们使用的是OpenAI训练的text-Embedding-3-large模型,这个模型可以将输入的矩阵自动分词,然后转化为词向量,经过编码层的特征提取后,最后转化为一个3072维度的向量。
代码如下:
class OpenAIEmbedding(BaseEmbedding):
def __init__(self, path: str = '', is_api: bool = True) -> None:
super().__init__(path, is_api)
if self.is_api:
self.client = OpenAI()
# 配置环境
self.client.api_key = os.getenv('OPENAI_API_KEY')
self.client.base_url = os.getenv('OPENAI_BASE_URL')
# 定义openai的Embedding模型
def get_embedding(self, text: str, model: str = 'text-embedding-3-large') -> List[float]:
"""
:param text: 需要被Embedding的句子
:param model: 调用的模型名字
:return:
"""
if self.is_api:
# 将文本的换行符去掉
text = text.replace('\n', '')
return self.client.embeddings.create(input=[text], model=model).data[0].embedding
else:
raise NotImplementedError
构建完后我们可以测试一下:
if __name__ == '__main__':
Embedding = OpenAIEmbedding()
inner = '人生何时起头'
inner2 = '人生如逆旅,不进则退'
inner_embedding = Embedding.get_embedding(inner, 'text-embedding-3-large')
inner_embedding2 = Embedding.get_embedding(inner2, 'text-embedding-3-large')
print(inner_embedding)
print(len(inner_embedding))
print(inner_embedding2)
print(len(inner_embedding2))
print(BaseEmbedding.cosine_similarity(inner_embedding, inner_embedding2))
可以看到这两句话最终被映射到了一个3072的维度,并且计算了余弦相似度获得了对应的结果。大概只有0.42的相似率
向量数据库构建
接着我们构建我们的向量数据库,一个向量数据库,要实现的功能有四个:
- 将句子转为的向量,如果是一个新的向量数据库,可以通过这一功能新建这个向量数据库
- 将向量做查询,即相似度索引查询功能
- 持久化功能
- 加载已经持久化的本地向量文件(可以理解加载数据库的数据)
接着,我们来实现这个向量数据库的代码构建
import os
from typing import Dict, List, Optional, Tuple, Union
import json
from Embedding import OpenAIEmbedding, BaseEmbedding
import numpy as np
from tqdm import tqdm
class VectorStore(object):
def __init__(self, doucment: List[str]):
self.doucment = doucment
def get_vector(self, EmbeddingModel: BaseEmbedding) -> List[List[float]]:
"""
将文本对象转化为向量存储起来,作为知识库
:param EmbeddingModel: 将文本转为词向量
:return:向量知识库
"""
self.vectors = []
for doc in tqdm(self.doucment):
self.vectors.append(EmbeddingModel.get_embedding(doc))
return self.vectors
def get_similarity(self, vec1: List[float], vec2: List[float]) -> float:
return BaseEmbedding.cosine_similarity(vec1, vec2)
def persist(self, path: '') -> None:
"""
:param path:持久化存储路径
:return:
"""
# 判断路径是否存在,不存在就添加这个文件夹
if not os.path.exists(path):
os.makedirs(path)
# 将文档保存
with open(f'{path}/document.json', 'w', encoding='utf-8') as f:
json.dump(self.doucment, f, ensure_ascii=False)
# 判断向量数据库是否有知识,有就存储起来
if self.vectors:
with open(f'{path}/vectors.json', 'w', encoding='utf-8') as f:
json.dump(self.vectors, f)
def load_vector(self, path: str = 'storage') -> None:
"""
加载已经向量化的路径和文档
:param path: 加载向量的路径
:return:
"""
with open(f'{path}/vectors.json', 'r', encoding='utf-8') as f:
self.vectors = json.load(f)
with open(f'{path}/document.json', 'r', encoding='utf-8') as f:
self.doucment = json.load(f)
def query(self, query: str, EmbeddingModel: BaseEmbedding, k: int = 1) -> list[str]:
"""
1. 将query转化位词向量
2.
:param query:查询的问题
:param EmbeddingModel: 负责向量化的模型
:return:
"""
query_vector = EmbeddingModel.get_embedding(query)
# 对向量数据库中的数据一一求相似度
result = np.array([self.get_similarity(query_vector, vector) for vector in self.vectors])
"""
np.array(self.doucment)[result.argsort()[-k:][::-1]].tolist()
1. np.array(self.document) 将文本段变成块
2. result.argsort() 将相似度计算的结果从小到大排序,如result 是 [0.1, 0.4, 0.2],则 result.argsort() 返回 [0, 2, 1],
表示第0个元素最小,接着是第2个元素,最后是第1个元素。
3. [-k:] 表示去倒数第k个数,如k=2,则取[2,1]
4. [::-1] 反转列表, [2,1]变[1,2]
5. 最后转化成list返回
作用:
返回相似度最高的 k 个文档,按相似度从高到低排序。
"""
return np.array(self.doucment)[result.argsort()[-k:][::-1]].tolist()
生成器
所谓的生成器部分也就是将经过检索后得到的文档和用户的查询结合到一起后得到的prompt发送给大模型LLM,让它返回答案,下面是实现代码,这里只实现了调用GPT模型的接口代码
首先,我们先定义一个提示词模板
PROMPT_TEMPLATE = dict(
RAG_PROMPT_TEMPALTE="""使用以上下文来回答用户的问题。如果你不知道答案,就说你不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:"""
)
然后,我们定义一个LLM的基类
class BaseModel(object):
def __init__(self, path: str = ''):
self.path = path
def chat(self, prompt: str, history: List[dict], content: str):
pass
def load_model(self):
pass
接着,构建我们的OpenAI大模型的类
class OpenAIChatModel(BaseModel):
def __init__(self, path: str = '', model: str = 'gpt-3.5-turbo-0125'):
"""
:param path:
:param model: 传入gpt模型
"""
super().__init__(path)
self.model = model
def chat(self, prompt: str, history: List[dict], content: str):
self.client = OpenAI()
self.client.api_key = os.getenv('OPENAI_API_KEY')
self.client.base_url = os.getenv('OPENAI_BASE_URL')
print(self.client.api_key, self.client.base_url)
history.append(
{'role': 'user', 'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)}
)
response = self.client.chat.completions.create(
model=self.model,
messages=history,
max_tokens=150,
temperature=0.1
)
return response.choices[0].message.content
# gpt是调用API,不用再本地加载了
这样依赖,我们的LLM模块也弄好了。
整合
在最后整合之前,我们还需要定义一个工具文件,里面实现了读取文件,将文件分块等功能。用来辅助搭建整个RAG系统
工具类
我们的工具类主要有2个作用:
- 读取文档
- 对文档进行分块
可能有的人有疑问,为什么要对文档分块呢?
实际上我们text2Embedding的输入是有限性的。或者你也可以这样理解,就以OpenAI提供的text-Embedding-3-large模型来说,最后输出的维度是3072。假如我输入给它的文本长度是5000,但是他最后的维度只有3072,我们是不是可以理解为他还给我的特征降维了,原本Embedding的目的就是为了将索引映射到一个更好的维度,以至于让模型更好的学习到文本的特征关系。这样一来就有点本末倒置了。所以一般的text2Embedding都会有长度限制
下面是代码:
import os
from typing import Dict, List, Optional, Tuple, Union
import PyPDF2
import markdown
import html2text
import json
from tqdm import tqdm
import tiktoken
from bs4 import BeautifulSoup
import re
# 加载词表,用来计算token值,方便分割
enc = tiktoken.get_encoding("cl100k_base")
class ReadFiles:
"""
读取文件的类
"""
def __init__(self, path: str) -> None:
self._path = path
self.file_list = self.get_files()
def get_files(self):
"""
只记录满足我们读取条件的文件
:return:
"""
# args:dir_path,目标文件夹路径
file_list = []
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 通过后缀名判断文件类型是否满足要求
if filename.endswith(".md"):
# 如果满足要求,将其绝对路径加入到结果列表
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".txt"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150):
docs = []
# 读取文件内容
for file in self.file_list:
# 读取文档
content = self.read_file_content(file)
# 切分文档
chunk_content = self.get_chunk(
content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
# 最后的文档变成了一块一块的['chunk1', 'chunk2',...]
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
chunk_text = []
curr_len = 0
curr_chunk = ''
token_len = max_token_len - cover_content
lines = text.splitlines() # 假设以换行符分割文本为行
for line in lines:
line = line.replace(' ', '')
line_len = len(enc.encode(line))
if line_len > max_token_len:
# 如果单行长度就超过限制,则将其分割成多个块
num_chunks = (line_len + token_len - 1) // token_len
end = 0
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 避免跨单词分割
while not line[start:end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
# 处理最后一个块
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
# 如果拼接起来后长度还不够的话,那就继续拼接
if curr_len + line_len <= token_len:
curr_chunk += line
curr_chunk += '\n'
curr_len += line_len
curr_len += 1
else:
chunk_text.append(curr_chunk)
curr_chunk = curr_chunk[-cover_content:] + line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
# 根据文件扩展名选择读取方法
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
@classmethod
def read_pdf(cls, file_path: str):
# 读取PDF文件
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page_num in range(len(reader.pages)):
text += reader.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path: str):
# 读取Markdown文件
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
# 使用BeautifulSoup从HTML中提取纯文本
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_text(cls, file_path: str):
# 读取文本文件
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
class Documents:
"""
获取已分好类的json格式文档
"""
def __init__(self, path: str = '') -> None:
self.path = path
def get_content(self):
with open(self.path, mode='r', encoding='utf-8') as f:
content = json.load(f)
return content
接下来开始搭建我们整个RAG的模型
代码:
from utils.Embedding import OpenAIEmbedding
from utils.LLM import OpenAIChatModel
from utils.VectorBase import VectorStore
from utils.Utlis import ReadFiles
import os
# 使用中转站或者openai官方的key
os.environ['OPENAI_API_KEY'] = ''
os.environ['OPENAI_BASE_URL'] = ''
# 读取data目录下的文本
docs = ReadFiles(r'./data').get_content(max_token_len=600, cover_content=150)
vector = VectorStore(docs)
embedding = OpenAIEmbedding()
vector.get_vector(EmbeddingModel=embedding)
# 持久化这个文件
vector.persist(path='./storage')
question = 'CRF是怎么构建损失函数的'
content = vector.query(question, embedding)
chat = OpenAIChatModel()
print(chat.chat(prompt=question,history=[], content=content))
Vectoring …: 100%|██████████| 37/37 [01:34<00:00, 2.56s/it]
CRF构建损失函数的过程是通过对比模型输出的标签序列和真实的标签序列,计算它们之间的差异,然后将这个差异作为损失函数的值。在CRF中,通常使用负对数似然作为损失函数,通过最小化这个损失函数来优化模型参数。希望这个回答对你有帮助。
结果如上
当然我们可以直接加载我们已经存储好的向量数据库
from utils.Embedding import OpenAIEmbedding
from utils.LLM import OpenAIChatModel
from utils.VectorBase import VectorStore
vector = VectorStore(None)
vector.load_vector('./storage')
question = 'CRF是怎么构建损失函数的'
embedding = OpenAIEmbedding()
content = vector.query(question, EmbeddingModel=embedding, k=1)[0]
chat = OpenAIChatModel()
print(f"查找到的内容为:{content}")
print('LLM生成的结果为:\n',chat.chat(question, [], content=content))
结果如下:
查找到的内容为:
PersonTO,OTO,B−OrganizationTB−Organization,I−Organization前边我们讲到,CRF能够帮助我们以一种全局的方式建模,在所有可能的路径中选择效果最优,分
数最高的那条路径。那么我们应该怎么去建模这个策略呢,下面我们来具体谈谈。
图5CRF解码过程图
图5展示了CRF的工作图,现在我们有一串输入(这里的是文本串对应的发射分数,每个字词都对应着一个发射分数向量,也就是前边提到的标签向量,该向量的维度就是
标签数量),期待解码出相应的标签序列,形式化为对应的条件概率公式
如下:
在第2节我们提到,CRF的解码策略在所有可能的路径中,找出得出概率最大,效果最优的一条路
径,那这个标签序列就是模型的输出,假设标签数量是,文本长度是,显然会有条路
径,若用代表第条路径的分数,那我们可以这样去算一个标签序列出现的概率:
现在我们有一条真实的路径,即我们期待CRF解码出来的序列就是这一条。那它的分数可以表
示为,它出现的概率就是
LLM生成的结果:
CRF构建损失函数的过程是通过计算标签序列的概率来实现的。具体来说,CRF会对所有可能的路径进行评分,并选择得分最高的路径作为模型的输出。在计算标签序列的概率时,可以使用条件概率公式进行计算。具体的计算过程可以参考上下文中提到的公式和图示。
可以看出我们是能找到对应的内容的,但是由于我们对文档的划分过于粗暴,所以可以看起来不那么的相关,而且有很多的冗余内容。这些就是我们后期可以优化的地方。
扩展优化的思路
我们可以对RAG三个部分进行优化改进
Retrieval
对于向量数据库的生成
向量数据库的质量很大一部分取决于我们对文本划分的准度,所以主要优化的是对文本的划分方法
- 使用先进的自然语言处理算法进行文本划分。例如,可以采用基于深度学习的句子分割算法,如BERT、RoBERTa等。
- 利用情感分析、主题建模等技术对文本进行语义划分,从而提高划分的准确性和上下文理解。
- 采用暴力划分,使用大型语言模型如GPT-4,对传入的文档按语义进行划分。这样可以保证划分的准确性,并且能够根据上下文进行更智能的分段。
对于检索数据来源
我们不要把眼光局限在向量数据库,我们还可以考虑多方面来源,比如:
- 引入图数据库,如Neo4j,可以通过关系图谱的形式来存储和检索信息。这种方式有助于理解数据之间的关系和联结,从而提供更深层次的检索结果。
- 结合SQL数据库的优势,使用大模型根据用户的查询需求自动生成SQL语句,查询结构化数据。
- 结合Web搜索引擎,通过发送查询请求到Web,实时获取最新的信息。
改进向量相似计算
- 引入更为复杂和准确的向量相似计算算法,如欧几里得距离、曼哈顿距离等。
- 采用深度学习中的对比学习方法(Contrastive Learning),进一步优化向量相似计算的准确性。
- 使用预训练的向量模型(如Sentence-BERT)生成高质量的文本向量,确保相似性计算的准确性和鲁棒性。
Augmented
智能过滤和选择
- 相关性评分:使用更高级的相关性评分算法(如BM25、TF-IDF、BERT-based re-ranking)对检索到的数据进行评分,选择与用户问题最相关的部分。
- 上下文理解:利用上下文感知模型(如BERT、RoBERTa)理解检索到的数据内容,从中提取与用户问题高度相关的段落或句子。
内容摘要和提取
- 自动摘要生成:使用预训练的摘要生成模型(如BART、T5)对长篇文章生成简明的摘要,确保保留关键信息。
- 关键词提取:利用关键词提取技术(如TextRank、YAKE)识别并提取与用户问题相关的关键词和句子。
Generator
这个模块像优化是最简单的,直接换一个指标更好的LLM就行了,或者根据特定的任务使用微调过后的LLM。
参考
更多推荐
所有评论(0)