把 Agent 放进 Flink:一套可续跑、可恢复、可验证的运行时设计

基于 apache/flink-agents 运行时实现的机制拆解。本文聚焦执行挂起、恢复续跑、副作用一致性、事件时间与跨语言边界,讨论 Agent 真正进入 Flink 之后需要补上的运行时约束。

导读

将 Agent 接入 Flink 作业,让它像普通处理逻辑一样消费输入、更新状态并输出结果,看上去是一个直接的落地方案。但真正进入运行时后,首先暴露出来的通常不是功能缺失,而是线程占用、执行挂起、状态恢复、副作用重复以及事件时间漂移。

本文讨论的重点,不是 Agent 能否接入 Flink,而是接入之后如何确立并维持运行时边界。过去关于 Agent 的讨论多集中在提示词、工具调用和工作流编排等能力层;进入 Flink 之后,问题的重心会转到执行层。这里承接的已不再是一次模型调用,而是一段需要持续推进的执行过程:它可能等待外部系统,可能在中途挂起,可能在恢复后继续运行,还要参与状态推进并遵循事件时间语义。

因此,对于熟悉流处理,或正在尝试把 Agent 引入流式系统的工程师来说,核心问题不再是“Agent 能不能调起来”,而是这些边界如何成立:主线程是否会被长耗时动作阻塞,中断的执行如何接续,状态恢复后如何避免外部副作用重复,挂起任务如何影响 Watermark 的推进,以及引入 Python Agent 时跨语言边界应如何划分。

本文试图建立一套围绕这些运行时问题的判断框架。它不按功能罗列机制,而是沿着执行、恢复、副作用、事件时间和跨语言这条主线展开,说明这些问题为什么会一起出现,当前实现又如何处理它们,同时交代相应的成本与局限,帮助读者判断不同落地方案的边界与风险。

顺着这条主线往下读,后续内容会先从最小骨架出发,说明这里面对的已经不是普通函数调用,再解释普通算子接口为什么装不下这段执行过程;然后进入主线程让出、可续跑状态机 ActionTask、上下文注入和记忆分层,讨论执行主线如何被维持;最后再看外部副作用、事件时间和跨语言边界如何继续抬高系统复杂度,并在验证与结语中回到这套设计的收益与代价。前一层机制暴露出来的问题,通常会推动后一层机制出现。

分析基线与文章结构

本文的分析对象为 apache/flink-agents 公开仓库。讨论主要围绕 runtime 实现展开,并穿插 planapi 和示例模块以对齐上层使用方式。为避免后续代码演进导致行号漂移,文中的源码链接默认固定在 2026-04-22 的提交 785c5f97c96512a534f516fd8fb839de24fc1c1c

文章按同一条运行时问题链展开:先从最小执行骨架说明 Agent 为什么已经不是普通函数调用;再进入主线程让出、ActionTask 续跑、RunnerContext 装配和记忆分层,解释执行主线如何被保存和恢复;随后讨论外部副作用一致性、Watermark 与事件时间代价,以及 Python 跨语言执行边界;最后用测试验证和结语收束这套设计的收益、代价与残余边界。

这不是功能清单式介绍,而是一篇围绕执行、恢复、副作用、时间语义和跨语言边界展开的完整文章。每一章都回答前一层机制留下来的运行时问题。

第 1 章 从一个最小例子开始

本章通过一个极简的 Agent 执行流模型,展示引入大模型调用后的流处理逻辑为何不再是普通的函数调用,而是演变为一个微型运行时(Runtime)。

在典型的流处理场景中接入 Agent,表面上只是多了几步:接收用户输入、调用模型、执行工具,再把结果输出。但这些步骤往往不会在一次同步调用里结束。模型请求可能等几秒,工具调用可能先挂起再回来,一个动作还可能继续派生新的内部事件。

这样一来,算子就不能像普通函数那样从头跑到尾:主线程不能一直等,中途做到一半的进度要先记住,同一会话的上下文也要一起保存。问题因此变成:怎样让这条会被打断、之后还要继续的执行链稳定地往前推进。

最小运行时骨架

以下伪代码展示了该运行时的基本骨架,包含 EventActionTaskmemorydrive() 四个核心要素。

class MiniAgentOperator {
    Queue<Event> events;
    Queue<ActionTask> tasks;
    Map<String, Object> memory;

    void onInput(InputEvent input) {
        events.add(input);
        drive();
    }

    void drive() {
        while (true) {
            if (!tasks.isEmpty()) {
                ActionTask task = tasks.poll();
                ActionTaskResult result = task.invoke();
                if (!result.finished()) {
                    tasks.add(result.generatedTask());
                    return; // 挂起,释放当前线程
                }
                events.addAll(result.outputEvents());
                continue;
            }

            if (events.isEmpty()) {
                return;
            }

            Event event = events.poll();
            for (Action action : route(event)) {
                tasks.add(new ActionTask(action, event, memory));
            }
        }
    }
}

这段代码表明,外部输入转化为 Event 进入内部队列,路由生成若干 ActionTask。任务执行中会读写 memory 并可能挂起,整个过程由 drive() 持续推进。

产出物

状态与记忆

Agent Runtime 核心引擎

取出事件

生成任务

触发推进

触发推进

调度执行

产生

Unfinished (挂起)

Internal Event (回流)

Output Event (输出)

InputEvent

Event Queue

ActionTask Queue

Route Logic

Invoke Task

Drive Loop

Memory

Generated Events

Downstream

上述四个角色各自引入了不同的运行时边界。

Event 将输入与内部流转对象化。它允许一个动作派生出新的内部事件,使执行路径从简单的单次输入输出,延伸为一条内部流转链。

ActionTask 承载异步执行进度。由于模型请求或工具调用无法即时同步返回,ActionTask 负责保存中断点和上下文。当任务挂起时,系统能够明确记录当前停留在哪个动作,确保后续驱动时可以正确接续。

memory 管理跨步骤的会话上下文。在多步异步调用中,局部变量无法跨越线程挂起边界。memory 承担了状态的持久化职责,确保不同执行阶段能安全地读写同一份业务状态。

drive() 控制状态机的推进。它决定了事件与任务的调度顺序、线程的释放时机以及内部事件的回流策略,是保障执行语义和一致性的核心引擎。

这四个角色共同构成了 Agent 在 Flink 中的基本运行逻辑。后续复杂的工程机制,均是为保障该模型在分布式环境下的正确性而引入的代价与局限性处理。允许任务挂起,必然带来状态跨步保存的需求;引入状态持久化,必须结合 Flink 的 Checkpoint 机制解决恢复一致性;内部事件的流转与异步重试,则需要配合 Watermark 处理事件时间与外部副作用的去重。

Agent 在内部已经演变为一段持续推进的执行链。这种多步状态流转与线程挂起特性,使其无法直接适配 Flink 基础的同步 MapFunction,这也是在流式系统中接入 Agent 时需要解决的核心架构问题。

第 2 章 为什么 Agent 不能直接写成 MapFunction

本章回答两个问题:为什么 MapFunction 这类“一条输入一次回调”的接口装不下会被打断、之后还要继续的执行;为什么 Agent 需要先编译成可分发的 AgentPlan,再在运行时装配。

Agent 在外部表现为“处理单条输入,产出单条结果”,直观上与 Flink 的 MapFunction 类似。将 Agent 封装在 map() 中是一种常见的工程直觉。

然而,一旦引入 EventActionTaskmemory 以及主循环,处理一条输入就不再是“一次调用算完就返回”。它会拆成多步:可能等外部 I/O,可能中途挂起,可能派生新的内部事件,之后再继续推进。MapFunction 只有一次回调入口,很难表达“执行到哪一步了、挂起后怎么续上、恢复后从哪继续”这类运行时事实。

本章核心观点:MapFunction 描述的是“收到一条记录就同步算完并返回”,而 Agent 需要的是“同一会话上多步推进,必要时挂起,之后再继续”。

普通调用的局限性

假设输入流为文本流,Agent 包含一组 @Action 函数。如果让每条数据都通过 Agent 处理并输出,伪代码如下:

DataStream<Result> out = inputStream.map(new MapFunction<Input, Result>() {
    Agent agent = new ReActAgent(...);

    @Override
    public Result map(Input value) {
        return agent.process(value);
    }
});

如果 Agent 是纯同步、无状态、无外部副作用的处理器,这种实现是合理的。

但实际的 Agent 通常带有会话上下文,在处理单条输入时可能触发多步内部流转,调用大模型或工具,甚至在执行中途挂起。因此,基于单次回调的函数接口无法满足需求。

首先缺的是会话边界。把 Agent 放进 MapFunction 后,它只是这个算子实例里的一个本地对象,不会自动跟业务 key 对齐,也不天然落在 Flink 的 Checkpoint 可恢复范围内。同一用户的多条消息如果没有稳定地路由到同一个 key,上下文就会被打散;任务重启或迁移时,这个本地对象里的状态也不会被恢复。结果是:看起来在“调用 Agent”,实际上执行链还没真正进入 Flink 的状态边界。

普通函数接口与 Agent Runtime 的差异在于:

  • MapFunction 语义为“单条输入,单次回调”。
  • RichMapFunction 增加了生命周期与 keyed state,但核心语义仍为“单次回调”。
  • Agent Runtime 需要在同一会话内排队推进,能保存进度并在后续恢复继续,同时允许内部事件回流触发下一步。

两者架构差异如下:

真实运行时链:受控的执行流水线

建立会话边界

分发装配描述

实例化引擎

挂起与恢复调度

DataStream

KeyBy

AgentPlan

OperatorFactory

ActionExecutionOperator

直觉链:看起来可行,但缺少边界

无状态分发

阻塞调用

DataStream

MapFunction

Agent Process

如果把 Agent 只当成函数回调,执行过程就很容易跑到 Flink 的会话与状态边界之外:看起来结果算出来了,但状态不一定能按 key 对齐,也不一定能在失败恢复后延续。

KeyBy 与 RichFunction 的局限

使用 keyBy 配合 RichMapFunction 并维护 keyed state 也是一种思路,但仍有不足。

在框架实现中,KeyBy 提供了最低限度的会话边界。CompileUtils.connectToAgent(...) 中明确,当输入为 DataStream 时,会先通过 keyBy(keySelector) 进行转换 (CompileUtils.java#L44-L47)。这保证了同一会话数据路由至同一物理分区,使得状态与执行进度得以对齐。

KeyBy 只解决了“同一用户落到同一并行实例”的问题,不解决“这条执行链怎么推进”的问题。Agent 的执行往往不是一步到位:会触发后续动作、产生内部事件、等待外部返回。RichMapFunction 虽然能用 keyed state,但它仍然是“来一条数据就调用一次 map()”。遇到需要等待的步骤,你要么在 map() 里阻塞等待,要么把后续工作丢到异步路径并把推进逻辑自己管理;两种都会把恢复、调度和控制流问题推到框架之外。

运行时需要的不只是“能存状态”,还需要把同一 key 的执行顺序固定下来,并维护内部队列、推进入口和恢复入口。这些更像算子内部的一段调度循环,需要由 StreamOperator 承担,而不是靠一个函数回调勉强拼出来。

以如下输入流为例:

msg1 = {userId=42, text="你好"}
msg2 = {userId=42, text="继续"}

userId 为 key,能保证两条消息进入同一会话边界。但这只是“落点一致”,不是“推进一致”。例如 msg1 触发的动作挂起时,msg2 应该排队等待还是并发执行?如果并发,会话上下文会交错;如果排队,你就需要队列和推进规则。再加上恢复后从哪一步继续、内部事件和任务如何重新入队,这些都超出了函数回调接口能自然表达的范围。

普通函数接口表达“一次调用”,而 Agent Runtime 需要表达“多步推进”。会话边界、执行进度、内部事件和恢复入口都很难塞进一次回调里。

静态装配:从 AgentPlan 到 StreamOperator

直接将 Agent 对象传递给算子在分布式系统中面临以下问题:

  1. 跨语言执行:Flink 的主执行引擎基于 JVM。如果 Agent 位于 Python 生态,Java 的算子无法直接将其作为本地执行体。
  2. 序列化与资源分发:Agent 对象内部可能包含模型客户端、工具连接或线程池等不可序列化的资源。将包含这些资源的对象直接跨网络分发会导致任务提交和反序列化失败。

因此,系统传输的不应是实例化的 Agent 对象,而是静态配置描述。将 Agent 编译为包含动作定义、事件触发规则、资源需求和运行配置的 AgentPlan,再由运行时根据配置装配算子。伪代码如下:

String planJson = agent.compileToJson();

DataStream out = keyedStream.transform(
    "AgentOperator",
    new AgentOperatorFactory(planJson)
);

class AgentOperatorFactory {
    public Operator create() {
        AgentPlan plan = parse(planJson);
        return new ActionExecutionOperator(plan);
    }
}

在 Flink Agent 的源码实现中,CompileUtils 将输入流连接至 Agent,并在 KeyedStream 上挂载执行算子 (CompileUtils.java#L72-L83)。实现上通过 transform("action-execute-operator", ..., new ActionExecutionOperatorFactory(agentPlan, inputIsJava))AgentPlan 传递给工厂,最终在 TaskManager 端实例化 ActionExecutionOperator (ActionExecutionOperatorFactory.java#L33-L67)。

AgentPlan 是静态描述结构,包含 actionsactionsByEventresourceProviders 及相关配置 (AgentPlan.java#L94-L173)。它支持序列化传输,运行时根据该配置在 TaskManager 上重建执行组件、资源连接和上下文对象。

综上,落地路径需要是 KeyedStream -> AgentPlan -> OperatorFactory -> StreamOperatorKeyBy 先把会话边界固定住;AgentPlan 让运行时拿到一份可序列化、可分发的静态描述;StreamOperator 再把“多步推进、挂起与恢复”收进算子内部的调度循环里。
这条链路的代价很明确:开发和调试复杂度上升,你需要面对计划编译与算子装配,而不是只写一个 map()。把 Agent 放进算子只是第一步;下一章将讨论算子主线程为什么不能阻塞等待,以及执行如何在让出线程后继续推进。

第 3 章 主循环:事件、Mailbox 与执行闭环

本章问题:当一个 Action 涉及大模型推理、工具调用或网络请求时,Flink 算子的主线程为何不能阻塞等待?
后面讨论的可续跑、恢复和副作用控制,都建立在一个前提上:算子主线程不能被长时间卡住。

把 Agent 放进 StreamOperator,只解决了“这段逻辑放在哪里跑”的问题,没有解决“跑到一半卡住怎么办”。只要执行链里有模型请求、工具调用或网络等待,主线程就不能靠一个本地 while 循环一直跑到结束。本章要回答的是:主线程怎样先让出来,外部结果回来后又怎样把同一条执行链接上。

考虑以下场景:某个 Action 发起大模型请求并等待 5 秒。如果将完整的执行链置于主线程,主线程将被阻塞 5 秒。在普通服务中这属于慢调用,但在 Flink 算子中,主线程阻塞会导致输入处理、Mailbox 调度、Checkpoint Barrier 推进全部停滞,直接破坏流式算子的可用性。

本章围绕 ActionExecutionOperator.java 的三个入口:processElementprocessEventprocessActionTaskForKey,说明三件事:同一 Key 的多步执行怎样保持顺序,不同 Key 为什么还能继续推进,以及长耗时等待为什么不能把 Checkpoint Barrier 一起堵住。

这一章最核心的约束是:外部 I/O 在等结果时,算子主线程必须还能继续工作。

阻塞执行的局限性

在简化的单机逻辑中,处理外部输入通常包含以下步骤:包装内部事件、路由对应 Action、执行 Action 并产出新事件,循环直至处理完成。伪代码如下:

public void processElement(Input input) {
    Queue<Event> eventQueue = new LinkedList<>();
    eventQueue.add(new InputEvent(input));

    while (!eventQueue.isEmpty()) {
        Event event = eventQueue.poll();

        if (isOutputEvent(event)) {
            collectToDownstream(event);
            continue;
        }

        List<Action> actions = plan.getActionsTriggeredBy(event);
        for (Action action : actions) {
            List<Event> newEvents = action.execute(event);
            eventQueue.addAll(newEvents);
        }
    }
}

这种写法集中了所有控制流,在单机脚本或普通服务中较为常见。但它隐含了一个前提:每个动作都能在短时间内执行完毕,不会长时间占据线程。

Flink 的 processElement 运行在算子的主执行线程上。该线程除了处理业务逻辑,还负责接收输入数据、推进状态、响应 Mailbox 调度以及处理 Checkpoint Barrier 等控制流。如果主线程被长耗时动作阻塞,会导致整个算子实例停滞。

如果 action.execute() 发起一次耗时 5 秒的大模型请求,继续使用上述的阻塞 while 循环会导致主线程被占据 5 秒。其后果包括:

  1. 数据处理停滞:不同 Key 的后续数据无法进入,相互独立的会话处理被阻塞。
  2. 控制流中断:Barrier 无法推进,导致 Checkpoint 超时,任务丧失可恢复性。

因此,主线程在等待期间让出执行权是 Flink 算子正常运行的必要条件。系统必须保证在外部 I/O 等待时,算子主线程能将控制权交还给运行时。Flink 提供的 Mailbox 机制正是解决这一问题的基础。

Mailbox 与分段调度

要避免主线程被卡住,就不能要求它一次把整条长执行链跑完。运行时需要把这条链拆成多个短片段,每次只推进一步;当前实现里,负责接回这些片段的是 Mailbox

Mailbox 可以把“后面再做的事”重新塞回主线程。processElement 收到输入后,不再原地等待整条执行链结束,而是先把事件和任务保存下来,再把后续推进动作投递给 Mailbox。这样主线程可以先返回,去处理 Barrier 或其他 Key;等外部结果回来,再由 Mailbox 把同一个 Key 的后续步骤接回去。

调度流程如下所示:

外部大模型/网络 Flink Mailbox ActionExecutionOperator Flink 主线程 外部大模型/网络 Flink Mailbox ActionExecutionOperator Flink 主线程 第一阶段:事件进入与任务触发 第二阶段:主线程保持存活,处理其他事 第三阶段:外部回调激活 Mailbox processElement (Event A) 加入 Key A Event Queue 触发 Route,生成 Task A1 加入 Key A Task Queue drive() 取出 Task A1 调度 发起异步调用 (挂起) 让出执行权 (yield) 继续处理 Checkpoint 或其他 Key 异步结果返回 (Future 兑现) 放入 Mailbox 信件 消费信件,唤醒 Operator drive() 再次取出 Task A1 (续跑) 生成 OutputEvent / Task A2 结束或再次让出

在运行时实现中,processEvent 把动作装配成任务后,并不会原地阻塞把后续步骤全部跑完,而是把推进逻辑交给 Mailbox,再由 processActionTaskForKey 接着处理。变化的关键在于:系统不再要求“一次把整条请求处理完”,而是改成“每次只推进一个不会长时间阻塞主线程的片段”。

系统同时引入了 EventActionTask 两个概念,其职责区分如下:

  • Event 负责描述系统事件与业务流转(即触发哪个动作)。
  • ActionTask 负责描述任务当前的执行进度与挂起状态(即执行到了哪一步、如何恢复)。

两者分工不同,不能合并。Event 解决“接下来该触发什么”,ActionTask 解决“当前这一步做到哪里了、下次从哪接着做”。把这两层拆开后,主线程才能在长执行存在时继续保持可用。

会话内的串行约束

主线程不再阻塞后,不同 Key 之间可以继续推进,但同一会话内部的新问题也跟着出现了。

假设用户 A 连续发送 Input1Input2Input1 触发的动作挂起并让出主线程后,如果系统立即开始处理 Input2,两者的执行将在同一会话上下文中交错。对状态(如 Memory)的修改顺序、新事件的产生顺序都将变得不可控,进而破坏 Agent 强依赖上下文的执行语义。

因此,系统必须把约束说死:同一 Key 严格串行,不同 Key 才允许并发。只要某个 Key 还有没完成的任务,后续输入就必须先排队,等前面的任务链真正收尾后再进入。这件事靠状态后端里的任务队列和输入队列来完成。

这里至少要同时满足两件事:

  1. Mailbox 分段执行:将长执行拆分为片段,保证主线程不被阻塞。
  2. 同 Key 串行处理:保证同一会话的执行片段不交错,维护状态与语义的一致性。

算子内部的执行闭环

执行闭环主要体现在 ActionExecutionOperator 的几个核心方法中:

  • processElement:接收输入时,首先检查当前 Key 是否存在未完成任务。若有,则将新输入放入队列排队,避免并发冲突。
  • processEvent:根据事件匹配路由,组装出运行时任务(ActionTask),并将后续推进逻辑提交给 Mailbox,而非在当前调用中阻塞执行。
  • processActionTaskForKey:从当前 Key 的状态中取出任务并推进执行。若执行未结束,将更新后的任务状态写回,等待下一轮调度;若执行结束,则收集产生的新事件并触发下一轮处理流程。具体逻辑参考 ActionExecutionOperator.java#L448-L466ActionExecutionOperator.java#L547-L580。这部分代码实现了“取任务、推进、写回状态、决定下一步”的调度循环。

代价也很直接:系统要维护更多状态,调度路径更绕,排查问题时也更难看清到底卡在“等待外部结果”还是“没有被正确接回主线程”。

这一章解决的是主线程存活问题:外部 I/O 在等待时,主线程先让出来;结果回来后,再通过 Mailbox 接回后续步骤。EventActionTask 分别负责“该做什么”和“做到哪里了”,同 Key 串行则把会话内顺序固定住。但这样一来,“当前做到哪一步”就不能只放在线程栈里了;下一章将讨论这些任务状态如何保存、恢复,以及现场丢失后怎样退化重建。

第 4 章 可续跑执行:ActionTask 状态机

第三章解决的是“主线程先让出来”。但主线程让出来以后,还有一个更具体的问题:这条执行链停在了哪一步,下次怎么接着跑。Action 挂起后,系统必须把当前进度记下来,这样下次调度或故障恢复时才能继续。

本章分析 ActionTask.java 及其子类(JavaActionTaskPythonActionTaskPythonGeneratorActionTask),说明运行时怎样把“做到一半的 Action”变成一个能保存、能恢复、还能继续调度的对象。这里保存的不是线程栈或协程现场本身,而是“下一次应该从哪一步继续”的信息。

执行挂起与状态保存

以同步调用为例,用户代码直接调用模型:

public void executeAction(Event event, RunnerContext ctx) {
    action.invoke(event, ctx);
}

String prompt = buildPrompt(event);
String result = llm.chat(prompt);
ctx.sendEvent(new OutputEvent(result));

同步调用会阻塞主线程。把它改成回调或 Future,可以先返回,不再原地等结果。但如果“做到哪里了”只存在语言运行时的内存里,比如回调链、协程对象或 continuation,Flink 的状态机制就看不见它。这样一来,进程一旦重启,这些信息也就跟着丢了。

所以,主线程让出以后,剩下那半截执行不能只靠进程内对象撑着,必须转换成 Flink 能保存、恢复和再次调度的东西。

ActionTask:调度与恢复单元

系统用 ActionTask 表示“这个 Action 还没做完,之后还要继续跑”。一个 Action 不再要求一次执行到底,而是拆成多步。每次推进完一步后,通过 ActionTaskResult 告诉算子三件事:这一步是不是已经结束了,产生了哪些事件,下一步还要不要再放回队列。ActionTask.java#L33-L40

Event 负责触发 Action。processEvent(...) 接收 Event 后,调用 createActionTask(key, triggerAction, event) 创建执行单元存入 actionTasksKStateActionExecutionOperator.java#L381-L406

真正进入状态、参与后续调度和恢复的对象,就是 ActionTaskprocessActionTaskForKey(...) 会把它从状态里取出来,调用 invoke() 推进一步,再根据 ActionTaskResult 判断:这条任务链是不是结束了,要不要产出事件,还是把下一步任务重新放回状态里等下次继续。ActionExecutionOperator.java#L448-L580 ActionTask.java#L94-L118

ActionTask 里会记住当前 key、对应的 Action、触发它的 Event,以及继续推进需要的状态。Java 路径里,如果这一步还没结束,任务通常直接返回 this 作为下一次继续调度的对象;Python 路径里,第一次执行返回的 awaitable 会被包装成 PythonGeneratorActionTask,后面再继续推进。JavaActionTask.java#L84-L89 PythonActionTask.java#L55-L67

绑定了算子和状态对象的 runnerContextActionTask 中被声明为 transientActionTask.java#L50-L55 这也说明:真正写进 Flink 状态的,是任务如何继续调度的信息;至于语言运行时里的挂起现场,只能在进程内暂存,或者在恢复后重新搭起来。

分工关系如下:

执行结果

调度恢复层

业务流转层

产生新事件

产生下一步

循环入队

Event

Route to Action

ActionTask

Invoke

ActionTaskResult

Output Events

Next ActionTask

Event 负责业务流转,ActionTask 是恢复边界内的调度单元。

状态保存限制与 Durable Execution

Java continuation 和 Python coroutine 都绑着进程内的运行时上下文,不能当成稳定的跨进程状态来保存。就算勉强存下来,换了新的解释器实例或线程栈,也很难按原样接回去。

以多步调用为例:InputEvent 路由到 summarize Action,算子创建 ActionTask(key=A, event=InputEvent, action=summarize) 放入任务队列。ActionExecutionOperator.java#L381-L406 该 Action 顺序执行 durableExecute(loadProfile)durableExecute(callLlm(profile))。如果在第二次调用前发生故障,恢复时需解决两个问题:调度层确认是否继续执行该 ActionTask,以及调用层确认 loadProfile 是否完成及其结果。

Durable Execution

ActionTask

持有

ActionTask

Current State

Next State

Call Log

Call 1: API A
Status: DONE

Call 2: API B
Status: DONE

Call 3: API C
Status: PENDING

Next Calls...

durable execution 负责记住外部调用已经做到哪一步。每个 CallResult 会记录调用标识、参数摘要、状态和结果。CallResult.java#L45-L59 CallResult.java#L125-L185 恢复时,系统通过 currentCallIndex 对照这些记录:命中且已经结束的,直接复用;还是 pending 的,进入 reconcile 或重试;对不上的,说明后半段记录已经不可信,需要从那里往后清掉再重建。RunnerContextImpl.java#L470-L488 RunnerContextImpl.java#L518-L541 RunnerContextImpl.java#L658-L789

在 Python 路径里,第一次执行返回的 awaitable 会转换成 PythonGeneratorActionTask,后面靠它继续续跑。要是恢复后进程内的 pythonAwaitableRef 已经找不回来了,这条任务就只能退化成重新执行 PythonActionTaskPythonActionTask.java#L55-L67 PythonGeneratorActionTask.java#L45-L60

在 Java 路径里,ContinuationActionExecutor.executeAsync(...) 让原本同步的代码也能挂起。ContinuationActionExecutor.java#L112-L149 真正进入 Flink 状态的是 generatedActionTask;continuation、awaitable 引用和 durable 上下文则放在进程内的 transient map 里,供下一次调度时继续用。ActionExecutionOperator.java#L547-L580 恢复时,系统会先尝试从 transient map 里把 continuation 或 awaitable 引用拿回来;拿不回来,就只能新建 Java continuation,或者重新执行 Python Action。ActionExecutionOperator.java#L903-L918

执行恢复与退化机制

系统恢复时主要做两件事:

  1. 运行时将后续执行逻辑编码至 ActionTask,明确调度对象。
  2. 依赖 durable execution 记录已完成的外部调用结果,避免重复执行。

如果语言运行时里的 continuation 或 coroutine 已经随故障一起丢了,系统仍可以先靠 ActionTask 找回“哪条任务还没做完”,再靠 durable 记录跳过已经完成的外部调用。这条路径不是原样续上,而是退一步,用可重建的信息把执行重新拉起来。

Mailbox 负责让出线程,ActionTask 负责把“下一次从哪一步继续”变成可调度对象,因此第四章先把任务续跑这件事立住。但执行真正继续时,状态读写、durable 记录和跨语言续跑对象并不是直接挂在 ActionTask 上,而是要通过 RunnerContext 接入;这正是下一章要补上的接口层。等这层补齐后,后面才能继续讨论外部世界已经被改动时该怎样恢复。

第 5 章 RunnerContext:注入、装配与运行时接口

第 4 章说明了 ActionTask 负责把任务重新调度起来。但任务一旦真的开始执行,用户代码还要继续做很多事:读写状态、发送事件、调用外部系统。如果这些能力都直接以底层对象的形式暴露出去,用户代码就要自己处理很多本来不该自己管的细节,比如什么时候刷盘、什么时候把事件真正发出去、外部调用结果怎么记录,以及 Java 和 Python 的续跑对象怎么接回。

所以运行时需要一个统一入口,把这些底层能力先收住,再按一致的方式交给用户代码。RunnerContext 做的就是这件事。它不是为了少写几行代码,而是把状态读写、durable 记录和跨语言续跑这些运行时约束集中到一个地方,避免它们散到每个 Action 里。

在架构层面,用户代码通过 RunnerContext 操作底层资源,而无法直接接触状态对象或持久化账本:

底层运行时资源

统一入口

用户执行逻辑

操作接口

代理

代理

代理

隐式持有

ActionTask running

RunnerContext

Memory

Flink State / Cache

Durable Log

Call Results

Event Queue

Output Events

Continuation / Awaitable

进程内临时挂起栈

以下是其典型用法示例:

public void summarize(Event event, RunnerContext ctx) throws Exception {
    String userName = (String) ctx.getShortTermMemory().get("profile.name");
    String answer =
            ctx.durableExecute(
                    () -> callModel("user=" + userName + ", input=" + event.toString()));
    ctx.sendEvent(new OutputEvent(answer));
}

从用户视角看,这段代码只是读记忆、调外部能力、再发一个事件。但运行时内部做的事更多:记忆读写不是立刻直接刷到状态里,外部调用不是每次都真的重新执行,事件发送也不是一调用就立刻脱离当前任务边界。

RunnerContext 主要把三类事情收在一起。第一类是状态访问。它内部的 MemoryContext 持有感知记忆和短期记忆的缓存(CachedMemoryStore)及更新列表(RunnerContextImpl.java#L65-L94)。这样用户代码只管改业务字段,不用自己决定什么时候刷盘,也不用自己处理恢复时这些值该怎样接回。

第二类是外部调用。durableExecute() 在真正执行前,会先检查当前上下文里有没有已经命中的记录。命中了就直接复用,没命中才真的执行,并把结果写回持久化记录(RunnerContextImpl.java#L280-L305)。也就是说,用户代码看到的是一次普通调用,运行时看到的是一次带恢复语义的副作用。

第三类是跨语言差异。Java 和 Python 的续跑载体并不一样,但 RunnerContext 会把这种差异挡在实现层里,对外仍然给 Action 一套统一接口。这样,状态读写、外部调用恢复和跨语言续跑都从同一个入口进去,而不是让用户代码分别对接不同底层对象。

RunnerContext 也不是一个全局固定对象。它绑定着当前任务要访问的状态、持久化记录和语言级续跑载体,所以会随着 ActionTask 的切换而切换。ActionExecutionOperator 在调用任务前,会取出对应语言的上下文实例,再通过 switchActionContext 把 action 名称、memory context 和 key 切到当前任务上,同时挂载 continuation 或 awaitable 引用(ActionExecutionOperator.java#L877-L920)。

由于 Mailbox 路径保证主线程一次只推进一个任务,框架可以复用上下文实例,减少对象创建开销。复用的是同一个 RunnerContext 外壳,切换的是它背后指向的任务语义。这也解释了为什么记忆和 durable 记录能统一接到这里:用户写记忆时,数据先进入 CachedMemoryStore 缓存,任务结束时再由算子调用 persistMemory() 批量刷盘(RunnerContextImpl.java#L355-L358ActionExecutionOperator.java#L545-L580)。

引入统一入口的代价是,用户不能直接随意操作这些底层对象。但换来的好处也很直接:状态、恢复和跨语言逻辑都被收敛到了同一层。ActionTask 负责把挂起的任务重新调度起来,RunnerContext 负责让这次调度中的资源访问按框架规则发生。

这一章解决的是“接口从哪里进”的问题,还没解决“不同记忆该怎么分层存”的问题。短期记忆、感知缓存和长期记忆如果混成一类处理,会把完全不同的生命周期和访问代价搅在一起。下一章会继续讨论,为什么记忆在物理存储上必须分层。

第 6 章 Memory:分层记忆与压缩

第五章说明了记忆如何通过 RunnerContext 暴露给用户代码。那一章解决的是“从哪里访问”,这一章要回答的是“这些东西在底层到底怎么存”。在实现上,Memory 并不是一个统一的大对象。当前轮次的临时现场、跨轮会话状态和长期沉淀内容,在生命周期、访问方式和成本上都不一样,因此必须拆成感知记忆、短期记忆和长期记忆三层。

如果框架只给一个统一的 Memory,对运行时来说反而更难处理。当前消息和工具返回只在这一轮有用,会话状态要跨轮保留,而长期历史又往往要走外部检索。这三类数据混在一起后,你既说不清哪些该在本轮结束后清掉,哪些该继续保留,也说不清哪些应该留在 Flink state,哪些应该外置到向量库。实现一旦不分层,清理、恢复和访问成本都会互相牵连。

所以,分层不是概念上的分类,而是存储和运行时行为上的分类。系统需要明确区分:哪些内容紧贴当前执行链、哪些内容要跨轮保留并支持结构化更新、哪些内容体量太大只能外置并按需召回。

当前处理自动产生

运行时收尾自动清理

用户代码显式保存

用户代码显式沉淀

用户代码显式检索带回

长期记忆 Long-Term Memory

长周期沉淀

历史文档、摘要知识

外部 Vector Store

短期记忆 Short-Term Memory

跨轮会话期

Profile、上下文状态

Flink State + Cache

感知记忆 Sensory Memory

单次处理期

当前消息、工具返回

MemoryContext

RunnerContext

Event 输入

Clear

这三层记忆并不是彼此隔离的孤岛,但它们的流转方向是明确的。感知记忆主要跟着当前处理过程产生,并在当前输入对应的整轮执行结束后由运行时清理;用户代码可以显式把其中一部分写入短期记忆。短期记忆中的内容也可以进一步沉淀到长期记忆,而长期记忆在被检索命中后,召回结果会重新回到当前轮次参与后续处理。

感知记忆的单次处理生命周期

最靠近主执行路径的是感知记忆,它保存的是单次输入在内部流转期间的临时上下文。这包括当前轮次的 prompt、工具返回片段,以及还没有整理成最终输出的中间状态。它的特点不是“数据量小”,而是生命周期短:这一轮执行要能读到,等这一轮真正结束后又必须清掉。

如果把这些临时数据直接并进跨轮会话状态,本轮现场就会污染后续输入。恢复时也会更麻烦,因为系统很难再区分哪些是还没完成的处理中间态,哪些只是已经过期的临时信息。当前实现里,感知记忆通过 MemoryContext 单独管理,并在当前输入对应的一轮执行完成后显式清理。ActionExecutionOperator.java#L583-L586 RunnerContextImpl.java#L360-L362

短期记忆的树状打平与局部更新

短期记忆保存的是跨轮输入的会话级状态。它在物理上仍然属于当前 Flink 作业内的 keyed state,只是前面先经过一层进程内缓存;因此它可以跟随 checkpoint 恢复和迁移,但并不是要跨作业共享的那类记忆。它的典型访问方式不是整块替换,而是高频地改某个嵌套字段,比如只改 profile.name,不重写整个 profile

为了避免每次只改一个小字段都把整个对象反序列化再写回,系统把结构化对象打平到 MapState<String, MemoryItem> 里,并让父节点额外记录 subKeys 目录。MemoryObjectImpl.java#L205-L257 这样做不是单纯为了“扁平化好存”,而是为了同时支持两件事:按路径更新单个字段,以及从父节点快速找回它下面有哪些子项。否则只存叶子节点的话,像读取 profile 这样的对象就会退化成扫描整张表。

短期记忆在写入时也不会立刻落到后端,而是先进入 CachedMemoryStore。当前实现里,真正的 persistCache() 发生在 ActionTask 完成时;如果任务还没结束,相关 MemoryContext 会继续留在进程内,交给下一次续跑继续使用。RunnerContextImpl.java#L355-L358 这种设计的重点是把“高频小改动”和“真正写入状态后端”拆开:前者先走缓存,后者等到合适的运行时节点再统一提交。

长期记忆的外置与后台压缩

当上下文增长到多月对话、长文档或长期工作记录的规模后,继续把它们保存在 Flink state 里就不合适了。这类数据通常不是按路径精确读取,而是按语义检索召回。因此,长期记忆必须外置到专门的向量存储中,把大容量保存和语义检索从主执行路径里拆出去。

即使外置到了向量库,长期记忆也不能失去会话隔离。VectorStoreLongTermMemory 通过 jobId + key + memorySetName 做名字改写,把不同作业、不同会话、不同 memory set 的数据隔开。VectorStoreLongTermMemory.java#L246-L248 这也说明:长期记忆虽然技术上可以外置并长期保存,但当前设计默认仍然先按作业隔离;外置不等于默认跨作业共享,是否跨作业复用还需要额外的身份与隔离设计。

随着历史数据持续累积,检索命中的片段会越来越多,最后超过模型上下文窗口。为此,系统引入了压缩机制。CompactionFunctions.summarize(...) 会先取出一批旧记录,让模型生成摘要,再删除原记录、写回摘要,同时保留这些记录原本覆盖的时间范围和最近访问时间。CompactionFunctions.java#L86-L160

压缩的本质,是把“每次查询时反复喂给模型的大上下文成本”换成“后台偶尔做一次摘要”的成本。假设积累了 50 条长期记忆,每次检索都召回其中 20 条,不做处理的话,后续每次推理都要重复承担这 20 条的上下文负担。压缩后,系统删掉原始片段,保留摘要和时间范围,后面检索时只需带回更少的内容。这样,长期记忆既保住了历史信息,又不会持续把主路径上的检索和推理成本抬高。

但记忆一旦参与短期状态更新、输出事件生成,甚至进一步影响外部系统,问题就不再只是“存在哪里、怎样压缩”,而是恢复后这些已经生效的结果是否会被再次施加。下一章转向这一层更硬的边界:外部副作用一致性。

第 7 章 外部副作用一致性:ActionStateStore

第六章讨论的是记忆该怎么存;这一章讨论的是这些结果一旦已经生效,恢复后会不会再做一遍。最典型的情况是:某个 action 已经更新了记忆、发出了输出事件,但系统在下一次 Checkpoint 之前崩溃。恢复后,如果还要重新进入用户代码,外部效果就可能重复发生。ActionStateStore 解决的正是这个问题:它不关心 task 怎么续跑,而关心某个 action 是否已经整体完成,能不能直接跳过代码、复用结果。

ActionStateStoreActionTask、durable 的关系如图所示:

已完成 Action 级命中

未完成

单次调用命中

未命中

系统恢复/事件到达

查 ActionStateStore

跳过代码,直接回放结果与事件

进入 ActionTask 调度

查 Durable 账本

复用单次调用结果

真实执行外部调用

系统在推进某个 ActionTask 前,会先看对应的 ActionState。若该 action 已完整结束,就跳过用户代码,直接回放记忆更新和输出事件;若还没结束,就继续走 task 级调度,并利用 durable 机制复用 task 内已经确认过的外部调用。三层分工可以压成一句话:ActionTask 负责“接下来跑哪个 task”,durable 负责“task 内哪些调用不用重做”,ActionStateStore 负责“这个 action 能不能整段跳过”。

ActionStateStore 存的不是 ActionTask 本身,而是一条按“Key + 序号 + 触发事件 + Action”定位的 ActionState。action 未完成时,ActionState 里最重要的是 callResults,因为 durable 会在每次调用完成、写入 pending 槽位或截断不可信记录后立即持久化它们,恢复时据此从中间位置继续。action 完成后,记录重点会切换成 completedshortTermMemoryUpdatessensoryMemoryUpdatesoutputEvents;同时 markCompleted() 会清空 callResults,因为恢复路径已经从“继续 task 内调用”变成了“整段跳过并回放结果”。

这也解释了为什么 ActionStateStore 不能只存一个 completed=true。仅有完成标记只能告诉系统“不必再进用户代码”,却无法把该 action 已经产生的结果补回来。恢复时真正需要重新建立的是记忆更新和输出事件这些稳定事实,而不是进程内的 continuation 或 awaitable 现场。

副作用窗口最危险的地方在两次 Checkpoint 之间。只要 ActionState 的写入必须等下一次 Checkpoint 才生效,系统恢复时就可能不知道某个 action 已经做完。ActionStateStore 的价值就在这里:它让 action 级完成结果和中间 durable 调用记录更早写出去。以 KafkaActionStateStore 为例,它不是替代 Flink state,而是把 ActionState 作为外部日志持续写入;Checkpoint 仍然参与这套机制,用 recovery marker 重建状态,并在 checkpoint 完成后 prune 已稳定的旧记录。所以,这一层建立的是 action 级恢复语义,而不是 task 级续跑语义;目标就是尽可能避免恢复后再次进入已经完成的用户代码,从而缩小重复执行窗口,而不是承诺绝对的 external exactly-once。

不过,副作用窗口被压缩之后,运行时边界并没有结束。只要某条输入对应的执行尚未真正完成,时间语义仍可能被提前推进。下一章转向另一类同样不能靠重试掩盖的问题:Watermark 与事件时间边界。

第 8 章 事件时间代价:Watermark 与分段队列

普通流算子里,一条输入通常很快处理完,收到 Watermark 后直接往下游转发,通常不会出问题。但 Agent 算子不同:一条输入可能触发多步执行,中间还会因为模型请求或工具调用挂起。此时,“输入已经到达”和“结果已经产出”不再是同一个时刻。

例如,上游先后送来 A1B1,随后送来 Watermark(2)。如果 A1 触发长耗时调用并挂起,而 B1 很快完成,算子此时直接转发 Watermark(2),下游就会认为时间戳不大于 2 的结果已经到齐。等 A1 后面再产出结果,下游就可能把它当成迟到数据。Agent 算子里的 Watermark 因此不能只看“上游已经发来”,还要看对应时间边界内的输入是否真正处理完。

只用一个全局活跃计数器也不够。只要有未完成输入就阻塞所有 Watermark,会让老 Watermark 被新输入拖住。例如 WM1 到达后又来了 A2B2,全局计数器会把这些后来输入也算进去,导致 WM1 等待本不属于它的工作。真正要解决的是:每条未完成输入应该只挡住它之前的那条时间边界。

SegmentedQueue:底层的物理隔离

到达时间轴:业务看到的流

落入

落入

切出新段

放入新段

触发清空

满足放行条件

新输入排队不影响老 WM 放行

A1

B1

WM 1

A2

老段 A1 和 B1

被拦截的 WM 1

新段 A2

长耗时任务彻底完成

为了使每条 Watermark 只等待它之前的输入,系统引入了 SegmentedQueue。它不是按 task 记账,而是按“还没完成的输入记录”记账:新输入到来时,把当前 key 记到尾段;Watermark 到来时,先保存这个 Watermark,再切出一个新段;某条输入的整轮处理结束后,再从队头找到这个 key 最早所在的段并扣减一次。只要队头段清空,对应的 Watermark 就能放行。SegmentedQueue.java#L25-L87 这样一来,WM1 之后新来的 A2 会进入新段,不会继续拖住只该等待老段的 WM1

单纯切段还不够,因为同一个 key 可能同时出现在多个时间边界里。KeySegmentMap<Object, Integer> 记录的,是该段里每个 key 对应的未完成输入条数,而不是“这个 key 现在有没有任务”。KeySegment.java#L23-L58 比如 A1WM1 前到达、A2WM1 后到达,而 A1 还没完成,那么 key A 会同时出现在两个段里。此时如果 A1 先完成,系统只能对老段扣减一次,不能把新段里的 A2 也一起消掉。所以完成路径必须从队头开始找第一个包含该 key 的段,命中后立刻停止。SegmentedQueue.java#L48-L62

SegmentedQueue 记录的是运行时对齐结构,而不是业务状态本身,所以它没有直接进入 checkpoint。恢复后,算子也不会试图按故障前的形状精确还原整条分段队列,而是先根据 currentProcessingKeysOpStatependingInputEventsKState 找回仍然活跃的 key,再继续处理后续输入和新的 WatermarkActionExecutionOperator.java#L590-L601 也就是说,恢复真正依赖的是“哪些输入还没完成”这类稳定状态,分段队列本身只是围绕这些状态临时搭起来的运行时结构。

可以看一个小例子:故障前,队列里有 A1,后面拦着 WM1,接着又放进了 A2。这时机器崩溃了。因为 SegmentedQueue 没有进入 checkpoint,还没被放行的 WM1 会在崩溃中丢失。恢复后,系统不会把“老段 + WM1 + 新段”这个形状原样摆回去,而是根据稳定状态发现 A1A2 都还在处理中,把它们一并放进一个新的初始段里。
这两者的差别在于:故障前,A2 是在 WM1 之后到达的,所以 A2 没做完并不影响 WM1 放行;但恢复后,WM1 丢了,A1A2 被合并到了同一段,如果此时新来了一条 WM2,这条 WM2 就必须等 A1A2 全部完成才能放行。也就是说,恢复虽然没有破坏“必须等任务做完”的时间约束,但因为丢失了原有的精细分段边界,时间约束变粗了,可能导致恢复初期新的 Watermark 被不必要地拖慢。恢复的目标不是原样还原旧队列,而是以合并后的保守代价继续保证语义不出错。

这种做法的代价也很直接。只要老段里还有未完成输入,后续 Watermark 就必须继续等;而某条输入完成时,系统还要从队头开始找到它属于哪个段,时间开销会随积压段数上升。内存开销也不再只是单次调用里的局部变量,而是这些活跃段和段内按 key 保存的引用计数。

不过被拖慢的是事件时间推进,不是整个算子的控制流。Watermark 和 Checkpoint Barrier 仍是两条不同的控制线:老段没清空时,Watermark 会被拦住;但主线程在外部等待期间仍会通过 Mailbox 让出执行权,所以 Barrier 还能继续推进。

到这里,时间边界已经被单独兜住了。但运行时边界还没结束。只要执行体跨出 JVM 主引擎,前面建立的挂起、恢复和调度语义仍可能在跨语言交接处被破坏。下一章转向 Python 执行链路,讨论这条边界如何继续成立。

第 9 章 Python:跨语言执行链路

前一章处理的是时间边界:长耗时执行尚未完成时,Watermark 不能提前放行。这里还要处理语言边界:Agent 代码常写在 Python 里,而 Flink 的主调度运行在 JVM 里。问题不是把 Python 函数翻译成 Java 代码,而是让 Java 调度能够找到 Python 函数、启动 Python 环境,并在 Python 协程挂起后继续接管调度。

AgentPlan 只描述控制关系,不携带 Python 函数体。Python 侧构造 plan 时,不会编译 action 代码,而是把 action 包装成引用,保留 modulequalname 等定位信息。agent_plan.py#L215-L249 进入 AgentPlan 的只是“到哪里找这个 Python 函数”。

Java 侧反序列化 plan 时也只恢复引用。ActionJsonDeserializer 根据 func_type 创建 JavaFunctionPythonFunction;对于 Python 函数,只恢复 modulequalNameActionJsonDeserializer.java#L57-L67ActionJsonDeserializer.java#L98-L101 真正执行时,PythonFunction.call(...) 才通过 interpreter.invoke(...) 回到 Python 解释器。PythonFunction.java#L42-L49

Python: Interpreter / Event Loop 跨语言桥接 Pemja Java: ActionExecutionOperator Python: Interpreter / Event Loop 跨语言桥接 Pemja Java: ActionExecutionOperator 1. 控制平面:只传递引用,不翻译代码 2. 环境重建与初始化 3. 执行平面:协程挂起与引用交接 4. 回归调度主线 5. 下次 Mailbox 唤醒续跑 解析 AgentPlan 获取 module 和 qualname PythonEnvironmentManager 拉起独立进程与依赖 executePythonFunction 引用和参数 interpreter invoke 业务执行到 await 挂起 返回 awaitable 协程对象 包装为 python_awaitable_1 引用 生成 PythonGeneratorActionTask 持有该 awaitable 引用 任务入队,主线程让出 callPythonAwaitable 推进协程 推动协程继续执行

为了让这些引用在 TaskManager 上能被解析,PythonEnvironmentManager 会在远端节点准备 Python 环境。它根据提交时携带的依赖信息,配置解释器、PYTHONPATHPYTHONHOME 和工作目录。PythonEnvironmentManager.java#L35-L83

运行前,PythonActionExecutor.open() 会把桥接代码和 RunnerContext 注入解释器。PythonActionExecutor.java#L93-L111 因此,Python 代码访问 context.memory 时,实际会回到 Java 侧的 RunnerContextImpl 和 Flink 状态存储。

执行 async def action 时,第一次调用不一定拿到最终结果。executePythonFunction(...) 如果收到 awaitable,不会立刻把它跑完,而是先给它生成一个名字,例如 python_awaitable_1,再用 interpreter.set(...) 把这个对象挂到解释器命名空间里,防止对象被回收。PythonActionExecutor.java#L123-L149 所以后面真正被保住的是解释器里的 awaitable 对象,Java 侧保存的只是它的句柄。续跑时,callPythonAwaitable(...) 会先用 interpreter.get(...) 通过这个名字取回原对象,再继续推进。PythonActionExecutor.java#L176-L185 这层保活只在当前解释器进程还活着时成立;一旦发生 checkpoint 恢复,旧解释器里的对象已经不存在,就不能靠原句柄续跑,只能回到更上层的恢复逻辑。

后续推进由 PythonGeneratorActionTask 承接。它持有 pythonAwaitableRef,下次被 Java 调度到时,再调用 executor.callPythonAwaitable(...) 推动 Python 协程继续执行。PythonActionExecutor.java#L176-L185 所以主调度仍在 Java 的 ActionExecutionOperator 中,业务现场则留在 Python 解释器里。

这套机制保住了同一条调度主线,但代价也很直接:环境要重建,桥接调用有开销,解释器内对象需要保活,问题排查还要同时看 Java 和 Python。执行路径已经跨过线程、状态、副作用、时间和语言边界,不能只靠功能跑通来判断设计成立;下一章转向验证,检查这些约束是否真的被固定下来。

第 10 章 正确性:测试与验证

前几章已经引入主线程让出、任务恢复、副作用控制、事件时间处理和跨语言执行。机制齐全不等于语义正确,系统还要证明这些机制组合后不会互相破坏。

只测单个功能是否能跑通还不够。Mailbox 能调度、ActionTask 能恢复、ActionStateStore 能缓存结果,并不等于恢复重放、跨语言调用和 Watermark 推进都正确。常见错误包括:外部请求被重复调用、Watermark 提前穿透、owner subtask 恢复后任务被其他 subtask 接管。

测试要固定的是运行时约束:同 key 是否串行,恢复后是否回到正确 subtask,外部调用是否避免重做,Watermark 是否只在该等的输入完成后推进。测试分层的目的,是把这些约束写进可重复运行的代码里,而不是依赖人工观察一次结果。

验证分层结构如下:

脚本化验证链

防止人工验证退化

真实环境集成

验证接入面

验证外部依赖

受控运行时语义

固定核心约束

Harness 测试

单个算子边界

Flink 体系兼容测试

真实外部调用测试

tools e2e

这套分层借鉴了 Flink 官方测试基建的边界:内层用 Harness 固定最小运行单元,外层再验证真实入口、真实依赖和脚本化执行链。能在内存内精确验证的约束,不应放到慢速 e2e;只有真实入口或真实外部依赖才能暴露的问题,才放到外层。

KeyedOneInputStreamOperatorTestHarness 会把 Operator 放进带有生命周期和状态语义的受控环境。ActionExecutionOperatorTest.java#L76-L90 在这一层,测试验证的不再是普通 Java 类的返回值,而是算子在 Flink 调度模型里的真实推进。

Harness 层先检查执行主线:同 key 必须串行推进,恢复后的任务必须回到 owner subtask。ActionExecutionOperatorTest.java#L205-L230 只有这条主线成立,后面的恢复和重放才有意义。

恢复语义测试防止“内部状态恢复了,外部调用却又执行一遍”。测试先执行真实外部路径并写入状态,再在新的 Harness 中复用状态重放,断言计数器保持为 1,证明恢复阶段跳过了已完成的外部调用。ActionExecutionOperatorTest.java#L728-L785

异常路径也必须进入恢复测试。若系统只缓存成功结果,恢复时又重新执行失败调用,错误分支就会失真;测试要求缓存的异常能原样重放,且不会重新调用 supplier。ActionExecutionOperatorTest.java#L823-L867

时间边界测试检查 Watermark 是否提前穿透。测试交替送入数据和 Watermark,检查输出队列是否单调推进,确保当前执行没有完成之前,时间进度不会提前越过去。ActionExecutionOperatorTest.java#L568-L609

Harness 终究还是内存内的受控环境。走出单个算子之后,系统还要验证接入面:用户从真实 DataStreamTable 入口进入后,数据能否穿过 Agent Runtime,再回到 Flink 的普通下游处理。FlinkIntegrationTest 覆盖 DataStream -> DataStreamTable -> TableDataStream -> Table 三条链路,用来发现 API 接入、类型转换和计划编译问题。FlinkIntegrationTest.java#L68-L181

接入面测试不承担所有语义验证。它只证明运行时可以放进宿主系统,并且输出还能继续组合;恢复、重放和时间边界仍由 Harness 用例断言。

真实外部调用则由 EmbeddingIntegrationTestVectorStoreIntegrationTest 覆盖。它们会拉起模型服务和向量库后端,用来发现配置错误、网络不通或鉴权失败。EmbeddingIntegrationTest.java#L50-L97 VectorStoreIntegrationTest.java#L50-L112

这些测试最后由脚本收束成一条可重复的验证链。e2e.sh#L99-L164 脚本不只是执行测试,还把构建产物检查、Python 环境、临时目录、Flink 版本和跨语言资源检查放到同一个入口,减少人工验证留下的差异。

这套分层方法可以迁移到其他后端系统,但迁移的不是测试名称,而是测试边界:组件 Harness 固定最小执行单元,Task Harness 固定调度边界,本地多实例环境验证快照恢复和归属迁移,e2e runner 统一部署、执行、证据采集和清理。

断言也要跟着迁移。不变量比最终输出更有价值:调用计数不增加,可以发现外部副作用重做;状态归属可计算,可以发现恢复后落错实例;时间边界不穿透,可以发现异步结果破坏事件时间。

分层验证把主线程让出、任务恢复、副作用控制和时间推进这些约束写进测试。代价是测试体系变重:需要 Harness 构造、状态重放场景和真实环境集成测试。机制构建和验证完成后,最后需要回到整体设计,判断它解决了什么,又引入了哪些代价。

第 11 章 结语:这套设计解决了什么,还留下了什么

将 Agent 放进流式系统,难点不只是模型能力或提示词,而是执行链会变长、会挂起、会恢复,还会触碰外部系统。本文讨论的核心问题,是如何让这条执行链在 Flink 里继续可调度、可恢复、可验证。

这些机制解决的是同一个问题的不同侧面。主线程让出保证算子不会被长调用卡死;ActionTask 记录任务下次从哪里继续;RunnerContext 和记忆分层把状态访问收回到运行时接口里;ActionStateStore 缩小外部调用重复执行的窗口;SegmentedQueue 防止 Watermark 提前推进;跨语言链路让 Python 代码仍然服从 Java 主调度。

这套设计同时留下了三类代价。

首先是能力边界。当前 AgentPlan 偏静态,运行中不能随意热更新完整的 Agent 能力图谱,对高度动态的任务编排支持有限。

其次是一致性边界。外部大模型或服务不受 Flink checkpoint 直接控制,系统无法承诺严格的 external exactly-once。当前实现只能通过 ActionStateStore 更早记录结果,尽量减少恢复后的重复执行窗口。

最后是运行时代价。为了保证事件时间语义,长耗时任务积压时,Watermark 仍然会被阻塞。跨语言执行还会带来环境装配、桥接调用、对象保活和调试成本。

因此,这套设计没有消除调度、恢复、副作用和时间阻塞这些问题,而是把它们放到可以管理的位置。评估其他流式 Agent 方案时,也应先看四件事:主执行线程会不会被长调用卡死;恢复后是否会重复修改外部系统;Watermark 是否会提前推进;跨语言调用是否绕开了原有调度。

模块和接口会变化,但这些运行时问题不会消失。越早明确边界和代价,越容易判断一个流式 Agent 方案是在解决问题,还是把问题藏到了恢复、时间或外部副作用里。

附录 A 运行时源码地图

这一附录不试图把 runtime/ 目录完整展开,而是回答一个更具体的问题:正文读完之后,如果想回到源码核对主链路,应该先看哪里,再看哪里。

正文已经把 Flink Agent 的问题链立住了:执行为什么不能直接阻塞,恢复为什么不能只靠 Checkpoint,副作用为什么要额外补一层一致性,事件时间和 Python 为什么会把边界继续推硬。附录 A 不重复这条论证,只负责把这些判断重新压回源码入口。

换句话说,这一篇不是“目录说明书”,而是一张阅读导航图。目标不是覆盖每一个子目录,而是让你在回到实现时,知道主链路在哪里,核心模块各自守哪条边界,以及推荐的下潜顺序是什么。

运行时主链路

如果只抓一条主线,runtime/ 的中心可以压成下面这段话:CompileUtilsDataStream 接到 ActionExecutionOperatorActionExecutionOperator 把输入事件推进成 action 执行闭环;action 不直接整段跑完,而是拆成可续跑的 ActionTaskRunnerContext 再把 memory、durable execution、resource 和指标装配进每次执行。

  • 接入入口:CompileUtilsActionExecutionOperatorFactory
  • 执行核心:ActionExecutionOperator
  • 任务切分:ActionTask
  • 执行上下文:RunnerContextImpl

如果你只想确认“Flink Agent 在代码里到底怎么跑起来”,先沿着这四个点读,基本就能把主循环、调度和恢复壳看清楚。

核心模块导航

operator/

这里是运行时主引擎。最重要的不是“它是一个算子”这个事实,而是它把事件推进、action 触发、任务续跑、状态刷回和输出下发收进了同一条执行链。正文里关于主循环、第 3 章关于 Mailbox、第 4 章关于 ActionTask 的分析,最后都要回到这一层核对。

context/

这里负责把用户 action 能碰到的运行时能力统一成 RunnerContext。它不是单纯的工具集合,而是把 memory、resource、durable execution 和指标放进同一入口的装配层。第 5 章和第 6 章的很多判断,最终都要在这里和 memory/ 一起落回实现。

actionstate/

这里是外部副作用一致性的关键补丁。只靠 ActionTask,系统只能知道“下次继续谁”;只靠 durable execution,系统只能复用任务内部某些调用结果;而 actionstate/ 额外记录的是 action 级完成状态、内存更新和输出事件,用来在恢复后跳过已完成动作并回放结果。第 7 章的主问题,主要就落在这里。

memory/

这里对应的是分层记忆背后的物理结构。短期记忆为什么要打平成树状目录,长期记忆为什么会走向外置和压缩,不是抽象口号,而是这里的数据结构和访问方式决定的。第 6 章读完后回头看这一层,会比先看 API 更容易抓住重点。

python/env/

这一层回答的是跨语言执行链怎么接回 JVM 主运行时。env/ 负责准备 Python 运行环境,python/ 负责 action 执行桥接、事件包装和 awaitable 续跑。第 9 章真正要核对的,不是“Python 能不能调起来”,而是跨语言之后执行边界怎样重新接回主链路。

operator/queue/

这里的重点不是普通队列,而是 SegmentedQueue 如何把未完成 key 和 Watermark 推进绑在一起。第 8 章讲的不是一个局部优化,而是事件时间边界在 Flink Agent 场景下为什么会变硬;这部分源码的阅读入口主要就在这里。

补充模块

除了上面几块主模块,eventlog/metrics/feedback/logger/common/queue/utils/message/ 也都提供了必要能力,但它们对理解全书主线不是第一优先级。

  • eventlog/ 偏调试和审计。
  • metrics/ 偏运行时观测。
  • feedback/logger/ 更接近扩展基础设施。
  • common/queue/utils/ 偏工程工具箱。
  • message/ 定义通用消息载体。

这些目录适合在主链路已经看清以后,再按具体问题回头补读。

推荐阅读顺序

如果你希望把正文里的判断重新压回源码,推荐按下面顺序看:

  1. 先看 CompileUtilsActionExecutionOperatorFactory,确认 Flink Agent 是怎样从 DataStream 接到运行时算子的。
  2. 再看 ActionExecutionOperator,抓住事件进入后如何路由、执行、续跑和产出输出。
  3. 接着看 ActionTask,把“半截执行为什么必须变成可调度对象”这一点和第 4 章对上。
  4. 然后看 RunnerContextImpl,确认 memory、durable execution 和资源装配到底是怎样进入 action 的。
  5. 再回看 memory/,把第 6 章的分层记忆与物理结构对齐。
  6. 再看 actionstate/,确认副作用一致性是怎样在 action 级被补齐的。
  7. 如果关心事件时间,再看 operator/queue/SegmentedQueue
  8. 如果关心跨语言,再看 env/python/

这个顺序本质上沿着和正文相同的主线推进:先看执行,再看恢复,再看副作用与时间,最后再看跨语言和补充能力。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐