LangChain大模型应用开发:工作流编排
大家好,博主又来给大家分享知识了,那么今天又给大家分享什么内容呢?今天我要给大家分享的内容是。
介绍
大家好,博主又来给大家分享知识了,那么今天又给大家分享什么内容呢?今天我要给大家分享的内容是LangChain工作流编排。那么什么是LangChain工作流编排呢?
简单来说,LangChain工作流编排就是将多个与自然语言处理相关的组件,像提示模板、大语言模型、各种实用工具等巧妙地组合在一起,形成一个有条理、可执行的流程。
LangChain提供了多种方式来实现这种编排,其中很有特色的就是链式调用和LCEL(LangChain表达式语言)。好了,那么我们直接进入正题。
LangChain Expression Language
LCEL(LangChain Expression Language)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(Chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。
LCEL从第一天起就被设计为支持将原型投入生产且无需更改代码,从最简单的"提示 + LLM"链到最复杂的链(我们已经看到有人成功地在生产中运行了包含数百步的LCEL链)。而使用LCEL的一些原因包含以下几个亮点:
一流的流式支持:当我们使用LCEL构建链时,我们将获得可能的最佳时间到第一个标记(直到输出的第一块内容出现所经过的时间)。对于某些链,这意味着我们直接从LLM流式传输标记到流式输出解析器,我们将以与LLM提供程序输出原始标记的速率相同的速度获得解析的增量输出块。
异步支持:使用LCEL构建的任何链都可以使用同步API(例如,在我们的Jupyter Notebook中进行原型设计)以及异步API(例如,在LangServe服务器中)进行调用。这使得可以在原型和生产中使用相同的代码和具有出色的性能,并且能够在同一服务器中处理许多并发请求。
优化的并行执行:每当我们的LCEL链具有可以并行执行的步骤时(例如,如果我们从多个检索器中获取文档),我们会自动执行,无论是在同步接口还是异步接口中,以获得可能的最小延迟。
重试和回退:为LCEL链的任何部分配置重试和回退。这是使我们的链在规模上更可靠的好方法。LangChain官方目前正在努力为重试/回退添加流式支持,这样我们就可以获得额外的可靠性而无需任何延迟成本。
访问中间结果:对于更复杂的链,访问中间步骤的结果通常非常有用,即使在生成最终输出之前。这可以用于让最终用户知道正在发生的事情,甚至只是用于调试我们的链。我们可以流式传输中间结果,并且在每个LangServe服务器上都可以使用。
输入和输出模式:输入和输出模式为每个LCEL链提供了从链的结构推断出的Pydantic和JSONSchema模式。这可用于验证输入和输出,并且是LangServe的一个组成部分。
Runnable Interface
为了尽可能简化创建自定义链的过程,LangChain实现了一个"Runnable"协议。许多LangChain组件都实现了”Runnable“协议,包括聊天模型、LLMs、输出解析器、检索器、提示模板等等。此外,还有一些有用的基本组件可用于处理可运行对象。这是一个标准接口,可以轻松定义自定义链,并以标准方式调用它们。标准接口包括:
- stream: 返回响应的数据块
- invoke: 对输入调用链
- batch: 对输入列表调用链
这些还有相应的异步方法,应该与asyncio一起使用await语法以实现并发:
- astream:异步返回响应的数据块
- ainvoke:异步对输入调用链
- abatch:异步对输入列表调用链
- astream_log:异步返回中间步骤,以及最终响应
- astream_events:beta流式传输链中发生的事件(在langchain-core的0.1.14中引入)
输入类型和输出类型因组件而异:
组件 | 输入类型 | 输出类型 |
提示 | 字典 | 提示值 |
聊天模型 | 单个字符串、聊天消息列表或提示值 | 聊天消息 |
LLM | 单个字符串、聊天消息列表或提示值 | 字符串 |
输出解析器 | LLM或聊天模型的输出 | 取决于解释器 |
检索器 | 单个字符串 | 文档列表 |
工具 | 单个字符串或字典,取决于工具 | 取决于工具 |
所有可运行对象都公开输入和输出模式以检查输入和输出:
- input_schema:从可运行对象结构自动生成的输入Pydantic模型。
- output_schema:从可运行对象结构自动生成的输出Pydantic模型。
流式运行对于使基于LLM的应用程序对最终用户具有响应性至关重要。重要的LangChain原语,如聊天模型、输出解析器、提示模板、检索器和代理都实现了LangChain Runnable接口。该接口提供了两种通用的流式内容方法:
- 同步stream和异步astream:流式传输链中的最终输出的默认实现。
- 异步astream_events和异步astream_log:这些方法提供了一种从链中流式传输中间步骤和最终输出的方式。让我们看看这两种方法,并尝试理解如何使用它们。
Stream
流。所有Runnable对象都实现了一个名为Stream的同步方法和一个名为astream的异步变体。这些方法旨在以块的形式流式传输最终输出,尽快返回每个块。只有在程序中的所有步骤都知道如何处理输入流时,才能进行流式传输;即,逐个处理输入块,并产生相应的输出块。
这种处理的复杂性可以有所不同,从简单的任务,如发出LLM生成的令牌,到更具挑战性的任务,如在整个JSON完成之前流式传输JSON结果的部分。开始探索流式传输的最佳方法是从LLM应用程序中最重要的组件开始——LLM本身!
LLM和聊天模型
大型语言模型及其聊天变体是基于LLM的应用程序的主要瓶颈。大型语言模型可能需要几秒钟才能对查询生成完整的响应。这比应用程序对最终用户具有响应性的约200-300毫秒的阈值要慢得多。使应用程序具有更高的响应性的关键策略是显示中间进度;即,逐个令牌流式传输模型的输出。我们将展示使用聊天模型进行流式传输的示例。从以下选项中选择一个:
同步Stream运行
让我们从同步stream API开始:
完整代码
# 从 langchain_openai库中导入ChatOpenAI类,用于与OpenAI聊天模型交互
from langchain_openai import ChatOpenAI
# 初始化一个ChatOpenAI实例,使用gpt-3.5-turbo模型并开启流式响应
model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
# 创建一个空列表chunks,用于存储模型流式输出的文本块
chunks = []
# 遍历模型对问题的流式输出
for chunk in model.stream("彩虹是什么颜色?"):
# 将每次流式输出的文本块添加到 chunks 列表中
chunks.append(chunk)
# 打印当前文本块内容,用|分隔且立即刷新输出
print(chunk.content, end="|", flush=True)
运行结果
|彩|虹|通|常|呈|现|红|橙|黄|绿|蓝|靛|紫|七|种|颜|色|,并|且|有|时|两|端|还|有|一|些|淡|淡|的|霞|光|色|。||
进程已结束,退出代码为 0
异步AStream运行
或者,我们的项目工程大多数都是再异步环境中运行,可以考虑使用异步astream API:
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
# 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
# 初始化一个空列表chunks,用于存储模型流式返回的文本块
chunks = []
# 异步迭代模型对问题的流式响应结果
async for chunk in model.astream("彩虹是什么颜色?"):
# 将每次流式返回的文本块添加到chunks列表中
chunks.append(chunk)
# 打印当前文本块的内容,以|作为分隔符,并立即刷新输出缓冲区
print(chunk.content, end="|", flush=True)
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数main,启动整个异步流程
asyncio.run(main())
运行结果
|彩|虹|通|常|由|七|种|颜|色|组|成|,|按|照|顺|序|分|别|是|红|、|橙|、|黄|、|绿|、|蓝|、|靛|(|蓝|紫|)|和|紫|。||
进程已结束,退出代码为 0
检查其中一个块
让我们检查其中一个块:
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
# 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
# 初始化一个空列表chunks,用于存储模型流式返回的文本块
chunks = []
# 异步迭代模型对问题的流式响应结果
async for chunk in model.astream("彩虹是什么颜色?"):
# 将每次流式返回的文本块添加到chunks列表中
chunks.append(chunk)
# 检查其中一个块
print(chunks[1])
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数main,启动整个异步流程
asyncio.run(main())
运行结果
content='彩' additional_kwargs={} response_metadata={} id='run-dc7251d4-74d7-4094-a38e-879f9373fcf2'
进程已结束,退出代码为 0
叠加消息块
我们得到了一个称为AIMessageChunk的东西。该块表示AIMessage的一部分。消息块是可叠加的,我们可以简单地将它们相加以获得到目前为止的响应状态!
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
# 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
# 初始化一个空列表chunks,用于存储模型流式返回的文本块
chunks = []
# 异步迭代模型对问题的流式响应结果
async for chunk in model.astream("彩虹是什么颜色?"):
# 将每次流式返回的文本块添加到chunks列表中
chunks.append(chunk)
# 初始化一个变量用于存储合并后的消息块
combined_chunk = chunks[0]
# 从索引2开始循环到索引4,将后续的消息块依次合并
for i in range(1, 5):
# 将消息快叠加
combined_chunk = combined_chunk + chunks[i]
# 打印叠加后的消息块
print(combined_chunk)
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数main,启动整个异步流程
asyncio.run(main())
运行结果
content='彩虹是由' additional_kwargs={} response_metadata={} id='run-ad0352b7-d26e-4465-8ab8-b41d6ecd20f5'
进程已结束,退出代码为 0
Chain
链。几乎所有的LLM应用程序都涉及不止一步的操作,而不仅仅是调用语言模型。让我们使用LangChain的LCEL(表达式语言)构建一个简单的链,该链结合了一个提示、模型和解析器,并验证流式传输是否正常工作。
我们将使用StrOutputParser来解析模型的输出。这是一个简单的解析器,从AIMessageChunk中提取content字段,给出模型返回的token。
LCEL是一种声明式的方式,通过将不同的LangChain原语链接在一起来指定一个"程序"。使用LCEL创建的链可以自动实现stream和astream,从而实现对最终输出的流式传输。事实上,使用LCEL创建的链实现了整个标准Runnable接口。
我们用代码来演示:
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于将输出解析为字符串的StrOutputParser类
from langchain_core.output_parsers import StrOutputParser
# 导入用于创建聊天提示模板的ChatPromptTemplate类
from langchain_core.prompts import ChatPromptTemplate
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI
# 初始化一个ChatOpenAI实例,使用gpt-3.5-turbo模型并开启流式响应
chat_model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
# 创建一个聊天提示模板,根据输入的topic生成关于该主题的笑话提示
chat_prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")
# 初始化一个输出解析器,将模型输出解析为字符串
parser = StrOutputParser()
# 将提示模板、聊天模型和解析器组合成一个处理链
chain = chat_prompt | chat_model | parser
# # 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
# 异步流式处理,输入主题并迭代获取输出块
async for chunk in chain.astream({"topic": "编程"}):
# 打印输出块,用|分隔并立即刷新输出
print(chunk, end="|", flush=True)
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数main,启动整个异步流程
asyncio.run(main())
运行结果
|程序|员|去|看|电|影|,|被|问|到|“|你|要|坐|第|几|排|?”|他|回|答|:“|我|要|坐|第|0|排|!”|因|为|在|编|程|中|,|数组|是|从|0|开始|计|数|的|。||
进程已结束,退出代码为 0
请注意,即使我们在上面的链条末尾使用了parser,我们仍然可以获得流式输出。 parser会对每个流式块进行操作。许多LCEL基元也支持这种转换式的流式传递,这在构建应用程序时非常方便。
自定义函数可以被设计为返回生成器,这样就能够操作流。
某些可运行实体,如提示模板和聊天模型,无法处理单个块,而是聚合所有先前的步骤。这些可运行实体可以中断流处理。
LangChain表达语言允许我们将链的构建与使用模式(例如同步/异步、批处理/流式等)分开。如果这与我们构建的内容无关,我们也可以依赖于标准的命令式编程方法,通过在每个组件上调用invoke、batch或stream,将结果分配给变量,然后根据需要在下游使用它们。
使用输入流
如果我们想要在输出生成时从中流式传输JSON,该怎么办呢?
如果我们依赖json.loads来解析部分JSON,那么解析将失败,因为部分JSON不会是有效的JSON。
当出现这种情况时,我们可能会束手无策,声称无法流式传输JSON。
事实证明,有一种方法可以做到这一点——解析器需要在输入流上操作,并尝试将部分JSON"自动完成"为有效状态。
让我们看看这样一个解析器的运行,以了解这意味着什么。
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于将输出解析为字符串的StrOutputParser类
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI
# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
# 初始化一个将输出解析为字符串的解析器实例
str_output_parser = StrOutputParser()
# 初始化一个将输出解析为JSON格式的解析器实例
json_output_parser = JsonOutputParser()
# 将聊天模型和JSON输出解析器组合成一个处理链
chain = chat_model | json_output_parser
# 定义一个异步函数,用于异步流式处理并输出结果
async def async_stream():
# 异步迭代处理链的异步流式输出,输入特定指令要求模型以JSON格式输出相关信息
async for text in chain.astream(
"以JSON格式输出中国、美国和俄罗斯的国家及其人口列表。"
'使用一个带有“countries”外部键的字典,其中包含国家列表。'
"每个国家都应该有键`name`和`population`"
):
# 打印输出的内容,并立即刷新输出缓冲区
print(text, flush=True)
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数async_stream,启动整个异步流程
asyncio.run(async_stream())
运行结果
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': '中国'}]}
{'countries': [{'name': '中国', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14'}]}
{'countries': [{'name': '中国', 'population': '14亿'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.45'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.45亿'}]}
进程已结束,退出代码为 0
Stream Events
事件流。到目前,我们已经详细了解了LangChain中stream和astream方法的工作原理,它们分别以同步和异步的方式实现了模型响应的流式输出。现在,让我们深入探讨与这些流式输出紧密相关的事件流机制。
在LangChain中,与流式输出相关的部分API目前处于beta测试阶段。例如,在处理流式响应过程中涉及的一些用于监听和处理事件流的API可能还不稳定,后续会根据开发者社区的反馈进行调整和完善。这种调整可能不仅限于细微的修改,在某些情况下可能会对API的使用方式或功能特性做出较大变动。
本示例主要演示了LangChain中特定的V2 API功能,这些功能在官方的版本演进中涉及到一些变化。为了确保示例能够正确运行,我们要求langchain-core的版本需大于等于0.2。这是因为在0.2及以上版本中,我们所使用到的与事件流处理、流式输出相关的功能和API才得以完善和稳定,能够支持本示例的逻辑和操作。
langchain-core版本查询pip命令:
pip show langchain_core
代码查询:
import langchain_core
print(langchain_core.__version__)
为了使astream_events API能够正常且高效地工作,我们需要注意以下几点:
- 异步操作的使用:在编写与astream_events API交互的代码时,务必在相关函数和方法中合理运用异步编程特性。具体来说,将涉及到调用该API或处理其返回结果的函数定义为async函数。
- 自定义函数与回调处理:如果我们定义了自定义函数或者可运行项(Runnable),请确保正确处理回调机制。具体而言,在函数内部,当涉及到需要响应astream_events API产生的事件时,要将回调函数准确地传递到相应的处理环节。这可能包括在函数的参数中明确接收回调函数,并在合适的时机调用该回调函数来处理事件数据。
- LLM流式传输的正确调用:当在没有LangChain表达式语言(LCEL)的环境中使用可运行项时,为了实现大语言模型(LLMs)的流式令牌传输,确保在调用LLM相关操作时,使用.astream()方法而非.ainvoke()方法。.astream()方法专门设计用于以流式方式获取LLM生成的结果,即模型会逐步返回生成的令牌,而不是一次性返回完整的结果。相比之下,.ainvoke()方法可能采用不同的调用机制,无法满足流式传输的需求。因此,正确选择 .astream()方法,能够确保按照预期的流式方式获取LLM的输出结果,从而与astream_events API协同工作,实现高效的事件流处理。
事件参考
下面是一个参考表,显示各种可运行对象可能发出的一些事件。
当流式传输正确实现时,对于可运行项的输入直到输入流完全消耗后才会知道。这意味着inputs通常仅包括end事件,而不包括start事件。
事件 | 名称 | 块 | 输入 | 输出 |
on_chat_model_start | [模型名称] | {"messages": [[SystemMessage, HumanMessage]]} | ||
on_chat_model_end | [模型名称] | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") | |
on_llm_start | [模型名称] | {"input": "hello"} | ||
on_llm_stream | [模型名称] | 'Hello' | ||
on_llm_end | [模型名称] | 'Hello human!' | ||
on_chain_start | format_docs | |||
on_chain_stream | format_docs | "hello world!, goodbye world!" | ||
on_chain_end | format_docs | [Document(...)] | "hello world!, goodbye world!" | |
on_tool_start | some_tool | {"x": 1, "y": "2"} | ||
on_tool_end | some_tool | {"x": 1, "y": "2"} | ||
on_retriever_start | [检索器名称] | {"query": "hello"} | ||
on_retriever_end | [检索器名称] | {"query": "hello"} | [Document(...),..] | |
on_prompt_start | [模板名称] | {"question": "hello"} | ||
on_prompt_end | [模板名称] | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage,...]) |
聊天模型事件
让我们首先看一下聊天模型产生的事件。
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI
# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
events = []
async for event in chat_model.astream_events("hello", version="v2"):
events.append(event)
# 打印存储了所有事件的events列表
print(events)
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数async_stream,启动整个异步流程
asyncio.run(main())
运行结果
[{'event': 'on_chat_model_start', 'data': {'input': 'hello'}, 'name': 'ChatOpenAI', 'tags': [], 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='Hello', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='!', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' How', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' can', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' I', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' assist', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' you', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' today', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='?', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': 'fp_0165350fbb'}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_end', 'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': 'fp_0165350fbb'}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'parent_ids': []}]
进程已结束,退出代码为 0
上述代码中我们看到,astream_events中有个参数version="v2",那么参数是什么意思呢?这是一个beta API,这个版本参数能够让我们将对代码的破坏性更改降至最低。简而言之,LangChain现在让我们感到烦恼,这样以后就不必再烦恼了。v2仅适用于langchain-core >= 0.2.0。
查看开始事件
让我们看一下一些开始事件和一些结束事件。
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI
# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
events = []
async for event in chat_model.astream_events("hello", version="v2"):
events.append(event)
# 打印start事件
print(events[0])
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数async_stream,启动整个异步流程
asyncio.run(main())
运行结果
{
'event': 'on_chat_model_start',
'data': {
'input': 'hello'
},
'name': 'ChatOpenAI',
'tags': [],
'run_id': '627050c5-c24e-4a24-9986-5ee2d66ed75e',
'metadata': {
'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo',
'ls_model_type': 'chat',
'ls_temperature': None
},
'parent_ids': []
}
进程已结束,退出代码为 0
查看结束事件
完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI
# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
events = []
async for event in chat_model.astream_events("hello", version="v2"):
events.append(event)
# 打印end事件
print(events[-1])
# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
# 运行异步函数async_stream,启动整个异步流程
asyncio.run(main())
运行结果
{
'event': 'on_chat_model_end',
'data': {
'output': AIMessageChunk(content = 'Hello! How can I assist you today?', additional_kwargs = {}, response_metadata = {
'finish_reason': 'stop',
'model_name': 'gpt-3.5-turbo-0125',
'system_fingerprint': 'fp_0165350fbb'
}, id = 'run-246301aa-503c-4c45-8ecd-beb6f64cbe71')
},
'run_id': '246301aa-503c-4c45-8ecd-beb6f64cbe71',
'name': 'ChatOpenAI',
'tags': [],
'metadata': {
'ls_provider': 'openai',
'ls_model_name': 'gpt-3.5-turbo',
'ls_model_type': 'chat',
'ls_temperature': None
},
'parent_ids': []
}
进程已结束,退出代码为 0
结束
好了,以上就是本次分享的全部内容了,不知道大家是否理解并掌握了以上的全部内容。本次内容偏理论的东西比较多,请大家慢慢研究并认真消化。
总的来说,在LangChain中,LCEL、Runnable、Stream、Chain、StreamEvents这几个概念各自承担着不同的角色,但它们之间又相互关联、协同工作,共同助力构建复杂且高效的语言处理流程。
- LCEL(LangChain Expression Language):是一种用于表达和组合可运行组件的语言,提供了一种简洁、直观的方式来定义和构建复杂的操作流程,让开发者可以用类似表达式的语法将不同组件连接起来。
- Runnable:是一个可运行的抽象对象,代表着一个可以接收输入并产生输出的操作或计算单元。LangChain里很多组件都实现了Runnable接口,如语言模型、提示模板等。
- Stream:即流式处理,允许在处理过程中逐步输出结果,而不是等整个任务完成后一次性返回,能有效提高响应速度和用户体验。
- Chain:表示将多个操作或组件按顺序连接起来形成的一个链条,前一个组件的输出会作为后一个组件的输入,实现复杂任务的分步处理。
- StreamEvents:与流式处理相关,用于定义和处理流式输出过程中产生的各种事件,如开始、中间结果、结束等。
对于今天的内容,大家都Get到了吗?博主还是那句话:请大家多去大胆的尝试和使用,成功总是在不断的失败中试验出来的,敢于尝试就已经成功了一半。那么本次分享就到这了。如果大家对博主分享的内容感兴趣或有帮助,请点赞和关注。大家的点赞和关注是博主持续分享的动力🤭,博主也希望让更多的人学习到新的知识。
更多推荐
所有评论(0)