Apache Flink 2.1版本在8月中旬正式发布,标志着实时数据处理引擎向统一Data + AI平台的里程碑式演进。

其中很重要的一个能力是关于Flink在实时AI能力上的突破:

  • 新增AI模型DDL,支持通过Flink SQL与Table API创建和修改AI模型,实现AI模型的灵活管理。

  • 扩展ML_PREDICT表值函数,支持通过Flink SQL实时调用AI模型,为构建端到端实时AI工作流奠定基础。

我们今天来看一下这个功能,如何使用呢?

首先,Flink允许我们使用CREATE MODEL语法创建一个模型:

CREATE MODEL `compliance_model`
INPUT (text STRING)
OUTPUT (response STRING)
WITH(
'provider'='openai',
'endpoint'='https://api.openai.com/v1/llm/v1/chat',
'api-key'='abcdefg',
'system_prompt' = '你是电商合规审核员,请判断商品标题是否含有酒精、烟草等敏感内容,仅返回JSON:{"risk":0.0~1.0,"reason":"原因"}',
'model'='gpt-4o'
);

然后,通过ML_PREDICT函数进行实时推理:

SELECT * FROM ML_PREDICT(
  INPUT => TABLE input_table,
  MODEL => MODEL my_model,
  ARGS => DESCRIPTOR(text),
  CONFIG => MAP['async', 'true']
);

其中INPUT代表我们的输入数据,一般是类似下面这样的source table:

CREATE TABLE product_source (
  id   STRING,
  title STRING,
  ts   TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic'     = 'product_source',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json'
);

然后我们定义结果表:

CREATE TABLE risk_sink (
  id      STRING,
  title   STRING,
  risk    DOUBLE,
  reason  STRING
) WITH (
  'connector' = 'kafka',
  'topic'     = 'risk_sink',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'    = 'json'
);

然后我们就可以通过一条SQL启动作业:

INSERT INTO risk_sink
SELECT
  id,
  title,
  CAST(JSON_VALUE(response,'$.risk') AS DOUBLE)  AS risk,
  JSON_VALUE(response,'$.reason')                AS reason
FROM (
  SELECT
    id,
    title,
    ML_PREDICT(compliance_model, title) AS response
  FROM product_source
) t;

假如我们的输入数据为:

kafka-console-producer.sh --broker-list localhost:9092 --topic product_source
>{"id":"1","title":"葡萄汁无酒精"}
>{"id":"2","title":"天然香草提取物"}   # LLM 会识别其含酒精
>{"id":"3","title":"茅台飞天53度"}     # 高风险

输出结果数据:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic risk_sink --from-beginning

>{"id":"2","title":"天然香草提取物","risk":0.92,"reason":"香草提取物通常含酒精"}
>{"id":"3","title":"茅台飞天53度","risk":0.99,"reason":"明确含酒精饮品"}

根据官方的文档,Flink对大模型的调用支持异步访问,并且默认打开。

在资源规划上,可以参考Little定律进行资源规划:

  • L:队列槽位(对应max-concurrent-operations)

  • λ:请求速率(对应预期的QPS)

  • W:平均延迟(对应模型的响应时间)

例如:对于目标100QPS和1.2秒的99百分位延迟,我们需要120个最大并发请求(max-concurrent-operations)。此外,考虑到队列长度和平均行大小,我们需要更多关注 TaskManager 中的内存设置。适当的调优可能显著提升运行AI函数的吞吐量和稳定性。

此外,Flink 2.1的ML框架已经原生支持「Embedding→向量存储→向量检索→LLM」的RAG链路,我们后面再单独分享。

好啦,本次分享就到这里。

图片

最后,欢迎加入我们的知识星球小圈子:
 《300万字!全网最全大数据学习面试社区等你来》

   如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

图片

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐