介绍

大家好,博主又来给大家分享知识了,那么今天又给大家分享什么内容呢?今天我要给大家分享的内容是LangChain工作流编排。那么什么是LangChain工作流编排呢?

简单来说,LangChain工作流编排就是将多个与自然语言处理相关的组件,像提示模板、大语言模型、各种实用工具等巧妙地组合在一起,形成一个有条理、可执行的流程。

LangChain提供了多种方式来实现这种编排,其中很有特色的就是链式调用和LCEL(LangChain表达式语言)。好了,那么我们直接进入正题。

LangChain Expression Language

LCEL(LangChain Expression Language)是一种强大的工作流编排工具,可以从基本组件构建复杂任务链条(Chain),并支持诸如流式处理、并行处理和日志记录等开箱即用的功能。

LCEL从第一天起就被设计为支持将原型投入生产且无需更改代码,从最简单的"提示 + LLM"链到最复杂的链(我们已经看到有人成功地在生产中运行了包含数百步的LCEL链)。而使用LCEL的一些原因包含以下几个亮点:

一流的流式支持:当我们使用LCEL构建链时,我们将获得可能的最佳时间到第一个标记(直到输出的第一块内容出现所经过的时间)。对于某些链,这意味着我们直接从LLM流式传输标记到流式输出解析器,我们将以与LLM提供程序输出原始标记的速率相同的速度获得解析的增量输出块。

异步支持:使用LCEL构建的任何链都可以使用同步API(例如,在我们的Jupyter Notebook中进行原型设计)以及异步API(例如,在LangServe服务器中)进行调用。这使得可以在原型和生产中使用相同的代码和具有出色的性能,并且能够在同一服务器中处理许多并发请求。

优化的并行执行:每当我们的LCEL链具有可以并行执行的步骤时(例如,如果我们从多个检索器中获取文档),我们会自动执行,无论是在同步接口还是异步接口中,以获得可能的最小延迟。

重试和回退:LCEL链的任何部分配置重试和回退。这是使我们的链在规模上更可靠的好方法。LangChain官方目前正在努力为重试/回退添加流式支持,这样我们就可以获得额外的可靠性而无需任何延迟成本。

访问中间结果:对于更复杂的链,访问中间步骤的结果通常非常有用,即使在生成最终输出之前。这可以用于让最终用户知道正在发生的事情,甚至只是用于调试我们的链。我们可以流式传输中间结果,并且在每个LangServe服务器上都可以使用。

输入和输出模式:输入和输出模式为每个LCEL链提供了从链的结构推断出的PydanticJSONSchema模式。这可用于验证输入和输出,并且是LangServe的一个组成部分。

Runnable Interface

为了尽可能简化创建自定义链的过程,LangChain实现了一个"Runnable"协议。许多LangChain组件都实现了”Runnable“协议,包括聊天模型、LLMs、输出解析器、检索器、提示模板等等。此外,还有一些有用的基本组件可用于处理可运行对象。这是一个标准接口,可以轻松定义自定义链,并以标准方式调用它们。标准接口包括:

  • stream: 返回响应的数据块
  • invoke: 对输入调用链
  • batch: 对输入列表调用链

这些还有相应的异步方法,应该与asyncio一起使用await语法以实现并发:

  • astream:异步返回响应的数据块
  • ainvoke:异步对输入调用链
  • abatch:异步对输入列表调用链
  • astream_log:异步返回中间步骤,以及最终响应
  • astream_events:beta流式传输链中发生的事件(在langchain-core的0.1.14中引入)

输入类型输出类型因组件而异:

组件 输入类型 输出类型
提示 字典 提示值
聊天模型 单个字符串、聊天消息列表或提示值 聊天消息
LLM 单个字符串、聊天消息列表或提示值 字符串
输出解析器 LLM或聊天模型的输出 取决于解释器
检索器 单个字符串 文档列表
工具 单个字符串或字典,取决于工具 取决于工具

 所有可运行对象都公开输入和输出模式以检查输入和输出:

  • input_schema:从可运行对象结构自动生成的输入Pydantic模型。
  • output_schema:从可运行对象结构自动生成的输出Pydantic模型。

流式运行对于使基于LLM的应用程序对最终用户具有响应性至关重要。重要的LangChain原语,如聊天模型、输出解析器、提示模板、检索器和代理都实现了LangChain Runnable接口。该接口提供了两种通用的流式内容方法:

  1. 同步stream和异步astream:流式传输链中的最终输出的默认实现。
  2. 异步astream_events和异步astream_log:这些方法提供了一种从链中流式传输中间步骤和最终输出的方式。让我们看看这两种方法,并尝试理解如何使用它们。

Stream

。所有Runnable对象都实现了一个名为Stream的同步方法和一个名为astream的异步变体。这些方法旨在以块的形式流式传输最终输出,尽快返回每个块。只有在程序中的所有步骤都知道如何处理输入流时,才能进行流式传输;即,逐个处理输入块,并产生相应的输出块。

这种处理的复杂性可以有所不同,从简单的任务,如发出LLM生成的令牌,到更具挑战性的任务,如在整个JSON完成之前流式传输JSON结果的部分。开始探索流式传输的最佳方法是从LLM应用程序中最重要的组件开始——LLM本身!

LLM和聊天模型

大型语言模型及其聊天变体是基于LLM的应用程序的主要瓶颈。大型语言模型可能需要几秒钟才能对查询生成完整的响应。这比应用程序对最终用户具有响应性的约200-300毫秒的阈值要慢得多。使应用程序具有更高的响应性的关键策略是显示中间进度;即,逐个令牌流式传输模型的输出。我们将展示使用聊天模型进行流式传输的示例。从以下选项中选择一个:

同步Stream运行

让我们从同步stream API开始:

完整代码
# 从 langchain_openai库中导入ChatOpenAI类,用于与OpenAI聊天模型交互
from langchain_openai import ChatOpenAI

# 初始化一个ChatOpenAI实例,使用gpt-3.5-turbo模型并开启流式响应
model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)

# 创建一个空列表chunks,用于存储模型流式输出的文本块
chunks = []

# 遍历模型对问题的流式输出
for chunk in model.stream("彩虹是什么颜色?"):
    # 将每次流式输出的文本块添加到 chunks 列表中
    chunks.append(chunk)
    # 打印当前文本块内容,用|分隔且立即刷新输出
    print(chunk.content, end="|", flush=True)
运行结果
|彩|虹|通|常|呈|现|红|橙|黄|绿|蓝|靛|紫|七|种|颜|色|,并|且|有|时|两|端|还|有|一|些|淡|淡|的|霞|光|色|。||
进程已结束,退出代码为 0

异步AStream运行

或者,我们的项目工程大多数都是再异步环境中运行,可以考虑使用异步astream API

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    # 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
    model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
    # 初始化一个空列表chunks,用于存储模型流式返回的文本块
    chunks = []
    # 异步迭代模型对问题的流式响应结果
    async for chunk in model.astream("彩虹是什么颜色?"):
        # 将每次流式返回的文本块添加到chunks列表中
        chunks.append(chunk)
        # 打印当前文本块的内容,以|作为分隔符,并立即刷新输出缓冲区
        print(chunk.content, end="|", flush=True)

# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数main,启动整个异步流程
    asyncio.run(main())
运行结果
|彩|虹|通|常|由|七|种|颜|色|组|成|,|按|照|顺|序|分|别|是|红|、|橙|、|黄|、|绿|、|蓝|、|靛|(|蓝|紫|)|和|紫|。||
进程已结束,退出代码为 0

检查其中一个块

让我们检查其中一个块:

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    # 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
    model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
    # 初始化一个空列表chunks,用于存储模型流式返回的文本块
    chunks = []
    # 异步迭代模型对问题的流式响应结果
    async for chunk in model.astream("彩虹是什么颜色?"):
        # 将每次流式返回的文本块添加到chunks列表中
        chunks.append(chunk)
    # 检查其中一个块
    print(chunks[1])

# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数main,启动整个异步流程
    asyncio.run(main())
运行结果
content='彩' additional_kwargs={} response_metadata={} id='run-dc7251d4-74d7-4094-a38e-879f9373fcf2'

进程已结束,退出代码为 0

叠加消息块

我们得到了一个称为AIMessageChunk的东西。该块表示AIMessage的一部分。消息块是可叠加的,我们可以简单地将它们相加以获得到目前为止的响应状态!

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 从langchain_openai模块导入ChatOpenAI类,该类用于与OpenAI的聊天模型进行交互
from langchain_openai import ChatOpenAI


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    # 创建一个ChatOpenAI实例,指定使用gpt-3.5-turbo模型,并开启流式响应
    model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)
    # 初始化一个空列表chunks,用于存储模型流式返回的文本块
    chunks = []
    # 异步迭代模型对问题的流式响应结果
    async for chunk in model.astream("彩虹是什么颜色?"):
        # 将每次流式返回的文本块添加到chunks列表中
        chunks.append(chunk)

    # 初始化一个变量用于存储合并后的消息块
    combined_chunk = chunks[0]
    # 从索引2开始循环到索引4,将后续的消息块依次合并
    for i in range(1, 5):
        # 将消息快叠加
        combined_chunk = combined_chunk + chunks[i]

    # 打印叠加后的消息块
    print(combined_chunk)

# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数main,启动整个异步流程
    asyncio.run(main())
 运行结果
content='彩虹是由' additional_kwargs={} response_metadata={} id='run-ad0352b7-d26e-4465-8ab8-b41d6ecd20f5'

进程已结束,退出代码为 0

Chain

。几乎所有的LLM应用程序都涉及不止一步的操作,而不仅仅是调用语言模型。让我们使用LangChainLCEL(表达式语言)构建一个简单的链,该链结合了一个提示、模型和解析器,并验证流式传输是否正常工作。

我们将使用StrOutputParser来解析模型的输出。这是一个简单的解析器,从AIMessageChunk中提取content字段,给出模型返回的token

LCEL是一种声明式的方式,通过将不同的LangChain原语链接在一起来指定一个"程序"。使用LCEL创建的链可以自动实现streamastream,从而实现对最终输出的流式传输。事实上,使用LCEL创建的链实现了整个标准Runnable接口。

我们用代码来演示:

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于将输出解析为字符串的StrOutputParser类
from langchain_core.output_parsers import StrOutputParser
# 导入用于创建聊天提示模板的ChatPromptTemplate类
from langchain_core.prompts import ChatPromptTemplate
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI

# 初始化一个ChatOpenAI实例,使用gpt-3.5-turbo模型并开启流式响应
chat_model = ChatOpenAI(model_name="gpt-3.5-turbo", streaming=True)

# 创建一个聊天提示模板,根据输入的topic生成关于该主题的笑话提示
chat_prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")

# 初始化一个输出解析器,将模型输出解析为字符串
parser = StrOutputParser()

# 将提示模板、聊天模型和解析器组合成一个处理链
chain = chat_prompt | chat_model | parser


# # 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    # 异步流式处理,输入主题并迭代获取输出块
    async for chunk in chain.astream({"topic": "编程"}):
        # 打印输出块,用|分隔并立即刷新输出
        print(chunk, end="|", flush=True)


# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数main,启动整个异步流程
    asyncio.run(main())

运行结果

|程序|员|去|看|电|影|,|被|问|到|“|你|要|坐|第|几|排|?”|他|回|答|:“|我|要|坐|第|0|排|!”|因|为|在|编|程|中|,|数组|是|从|0|开始|计|数|的|。||
进程已结束,退出代码为 0

请注意,即使我们在上面的链条末尾使用了parser,我们仍然可以获得流式输出。 parser会对每个流式块进行操作。许多LCEL基元也支持这种转换式的流式传递,这在构建应用程序时非常方便。

自定义函数可以被设计为返回生成器,这样就能够操作流。

某些可运行实体,如提示模板和聊天模型,无法处理单个块,而是聚合所有先前的步骤。这些可运行实体可以中断流处理。

LangChain表达语言允许我们将链的构建与使用模式(例如同步/异步、批处理/流式等)分开。如果这与我们构建的内容无关,我们也可以依赖于标准的命令式编程方法,通过在每个组件上调用invokebatchstream,将结果分配给变量,然后根据需要在下游使用它们。

使用输入流

如果我们想要在输出生成时从中流式传输JSON,该怎么办呢?

如果我们依赖json.loads来解析部分JSON,那么解析将失败,因为部分JSON不会是有效的JSON

当出现这种情况时,我们可能会束手无策,声称无法流式传输JSON

事实证明,有一种方法可以做到这一点——解析器需要在输入流上操作,并尝试将部分JSON"自动完成"为有效状态。

让我们看看这样一个解析器的运行,以了解这意味着什么。

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于将输出解析为字符串的StrOutputParser类
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI

# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")

# 初始化一个将输出解析为字符串的解析器实例
str_output_parser = StrOutputParser()

# 初始化一个将输出解析为JSON格式的解析器实例
json_output_parser = JsonOutputParser()

# 将聊天模型和JSON输出解析器组合成一个处理链
chain = chat_model | json_output_parser


# 定义一个异步函数,用于异步流式处理并输出结果
async def async_stream():
    # 异步迭代处理链的异步流式输出,输入特定指令要求模型以JSON格式输出相关信息
    async for text in chain.astream(
            "以JSON格式输出中国、美国和俄罗斯的国家及其人口列表。"
            '使用一个带有“countries”外部键的字典,其中包含国家列表。'
            "每个国家都应该有键`name`和`population`"
    ):
        # 打印输出的内容,并立即刷新输出缓冲区
        print(text, flush=True)


# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数async_stream,启动整个异步流程
    asyncio.run(async_stream())
运行结果
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': '中国'}]}
{'countries': [{'name': '中国', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14'}]}
{'countries': [{'name': '中国', 'population': '14亿'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': ''}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.45'}]}
{'countries': [{'name': '中国', 'population': '14亿'}, {'name': '美国', 'population': '3.3亿'}, {'name': '俄罗斯', 'population': '1.45亿'}]}

进程已结束,退出代码为 0

Stream Events

事件流。到目前,我们已经详细了解了LangChainstreamastream方法的工作原理,它们分别以同步和异步的方式实现了模型响应的流式输出。现在,让我们深入探讨与这些流式输出紧密相关的事件流机制。

LangChain中,与流式输出相关的部分API目前处于beta测试阶段。例如,在处理流式响应过程中涉及的一些用于监听和处理事件流的API可能还不稳定,后续会根据开发者社区的反馈进行调整和完善。这种调整可能不仅限于细微的修改,在某些情况下可能会对API的使用方式或功能特性做出较大变动。

本示例主要演示了LangChain中特定的V2 API功能,这些功能在官方的版本演进中涉及到一些变化。为了确保示例能够正确运行,我们要求langchain-core的版本需大于等于0.2。这是因为在0.2及以上版本中,我们所使用到的与事件流处理、流式输出相关的功能和API才得以完善和稳定,能够支持本示例的逻辑和操作。

langchain-core版本查询pip命令:

pip show langchain_core

代码查询:

import langchain_core

print(langchain_core.__version__)

为了使astream_events API能够正常且高效地工作,我们需要注意以下几点:

  • 异步操作的使用:在编写与astream_events API交互的代码时,务必在相关函数和方法中合理运用异步编程特性。具体来说,将涉及到调用该API或处理其返回结果的函数定义为async函数。
  • 自定义函数与回调处理:如果我们定义了自定义函数或者可运行项(Runnable),请确保正确处理回调机制。具体而言,在函数内部,当涉及到需要响应astream_events API产生的事件时,要将回调函数准确地传递到相应的处理环节。这可能包括在函数的参数中明确接收回调函数,并在合适的时机调用该回调函数来处理事件数据。
  • LLM流式传输的正确调用:当在没有LangChain表达式语言(LCEL)的环境中使用可运行项时,为了实现大语言模型(LLMs)的流式令牌传输,确保在调用LLM相关操作时,使用.astream()方法而非.ainvoke()方法。.astream()方法专门设计用于以流式方式获取LLM生成的结果,即模型会逐步返回生成的令牌,而不是一次性返回完整的结果。相比之下,.ainvoke()方法可能采用不同的调用机制,无法满足流式传输的需求。因此,正确选择 .astream()方法,能够确保按照预期的流式方式获取LLM的输出结果,从而与astream_events API协同工作,实现高效的事件流处理。

事件参考

下面是一个参考表,显示各种可运行对象可能发出的一些事件。

当流式传输正确实现时,对于可运行项的输入直到输入流完全消耗后才会知道。这意味着inputs通常仅包括end事件,而不包括start事件。

事件 名称 输入 输出
on_chat_model_start [模型名称] {"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_end [模型名称] {"messages": [[SystemMessage, HumanMessage]]} AIMessageChunk(content="hello world")
on_llm_start [模型名称] {"input": "hello"}
on_llm_stream [模型名称] 'Hello'
on_llm_end [模型名称] 'Hello human!'
on_chain_start format_docs
on_chain_stream format_docs "hello world!, goodbye world!"
on_chain_end format_docs [Document(...)] "hello world!, goodbye world!"
on_tool_start some_tool {"x": 1, "y": "2"}
on_tool_end some_tool {"x": 1, "y": "2"}
on_retriever_start [检索器名称] {"query": "hello"}
on_retriever_end [检索器名称] {"query": "hello"} [Document(...),..]
on_prompt_start [模板名称] {"question": "hello"}
on_prompt_end [模板名称] {"question": "hello"} ChatPromptValue(messages: [SystemMessage,...])

聊天模型事件

让我们首先看一下聊天模型产生的事件。

完整代码

# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI

# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    events = []
    async for event in chat_model.astream_events("hello", version="v2"):
        events.append(event)
    # 打印存储了所有事件的events列表
    print(events)


# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数async_stream,启动整个异步流程
    asyncio.run(main())
运行结果

[{'event': 'on_chat_model_start', 'data': {'input': 'hello'}, 'name': 'ChatOpenAI', 'tags': [], 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='Hello', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='!', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' How', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' can', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' I', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' assist', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' you', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content=' today', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='?', additional_kwargs={}, response_metadata={}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_stream', 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': 'fp_0165350fbb'}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'parent_ids': []}, {'event': 'on_chat_model_end', 'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': 'fp_0165350fbb'}, id='run-6aeed14f-c6ac-4aad-a864-552c5da2bdc5')}, 'run_id': '6aeed14f-c6ac-4aad-a864-552c5da2bdc5', 'name': 'ChatOpenAI', 'tags': [], 'metadata': {'ls_provider': 'openai', 'ls_model_name': 'gpt-3.5-turbo', 'ls_model_type': 'chat', 'ls_temperature': None}, 'parent_ids': []}]

进程已结束,退出代码为 0

上述代码中我们看到,astream_events中有个参数version="v2",那么参数是什么意思呢?这是一个beta API,这个版本参数能够让我们将对代码的破坏性更改降至最低。简而言之,LangChain现在让我们感到烦恼,这样以后就不必再烦恼了。v2仅适用于langchain-core >= 0.2.0

查看开始事件

让我们看一下一些开始事件和一些结束事件。

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI

# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    events = []
    async for event in chat_model.astream_events("hello", version="v2"):
        events.append(event)
    # 打印start事件
    print(events[0])


# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数async_stream,启动整个异步流程
    asyncio.run(main())
运行结果
{
	'event': 'on_chat_model_start',
	'data': {
		'input': 'hello'
	},
	'name': 'ChatOpenAI',
	'tags': [],
	'run_id': '627050c5-c24e-4a24-9986-5ee2d66ed75e',
	'metadata': {
		'ls_provider': 'openai',
		'ls_model_name': 'gpt-3.5-turbo',
		'ls_model_type': 'chat',
		'ls_temperature': None
	},
	'parent_ids': []
}

进程已结束,退出代码为 0

查看结束事件

完整代码
# 导入asyncio库,用于支持Python的异步编程功能
import asyncio
# 导入用于与OpenAI聊天模型交互的ChatOpenAI类
from langchain_openai import ChatOpenAI

# 创建一个ChatOpenAI实例,使用gpt-3.5-turbo模型
chat_model = ChatOpenAI(model="gpt-3.5-turbo")


# 定义一个异步函数main,该函数将包含主要的业务逻辑
async def main():
    events = []
    async for event in chat_model.astream_events("hello", version="v2"):
        events.append(event)
    # 打印end事件
    print(events[-1])


# 判断当前脚本是否作为主程序运行
if __name__ == "__main__":
    # 运行异步函数async_stream,启动整个异步流程
    asyncio.run(main())
运行结果
{
	'event': 'on_chat_model_end',
	'data': {
		'output': AIMessageChunk(content = 'Hello! How can I assist you today?', additional_kwargs = {}, response_metadata = {
			'finish_reason': 'stop',
			'model_name': 'gpt-3.5-turbo-0125',
			'system_fingerprint': 'fp_0165350fbb'
		}, id = 'run-246301aa-503c-4c45-8ecd-beb6f64cbe71')
	},
	'run_id': '246301aa-503c-4c45-8ecd-beb6f64cbe71',
	'name': 'ChatOpenAI',
	'tags': [],
	'metadata': {
		'ls_provider': 'openai',
		'ls_model_name': 'gpt-3.5-turbo',
		'ls_model_type': 'chat',
		'ls_temperature': None
	},
	'parent_ids': []
}

进程已结束,退出代码为 0

结束

好了,以上就是本次分享的全部内容了,不知道大家是否理解并掌握了以上的全部内容。本次内容偏理论的东西比较多,请大家慢慢研究并认真消化。

总的来说,在LangChain中,LCELRunnableStreamChainStreamEvents这几个概念各自承担着不同的角色,但它们之间又相互关联、协同工作,共同助力构建复杂且高效的语言处理流程。

  • LCEL(LangChain Expression Language):是一种用于表达和组合可运行组件的语言,提供了一种简洁、直观的方式来定义和构建复杂的操作流程,让开发者可以用类似表达式的语法将不同组件连接起来。
  • Runnable:是一个可运行的抽象对象,代表着一个可以接收输入并产生输出的操作或计算单元。LangChain里很多组件都实现了Runnable接口,如语言模型、提示模板等。
  • Stream:即流式处理,允许在处理过程中逐步输出结果,而不是等整个任务完成后一次性返回,能有效提高响应速度和用户体验。
  • Chain:表示将多个操作或组件按顺序连接起来形成的一个链条,前一个组件的输出会作为后一个组件的输入,实现复杂任务的分步处理。
  • StreamEvents:与流式处理相关,用于定义和处理流式输出过程中产生的各种事件,如开始、中间结果、结束等。

对于今天的内容,大家都Get到了吗?博主还是那句话:请大家多去大胆的尝试和使用,成功总是在不断的失败中试验出来的,敢于尝试就已经成功了一半。那么本次分享就到这了。如果大家对博主分享的内容感兴趣或有帮助,请点赞和关注。大家的点赞和关注是博主持续分享的动力🤭,博主也希望让更多的人学习到新的知识。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐