本章是整条流水线的「大脑」,分两层讲:

  • 客户端层 AiClient:把大模型的鉴权、调用、回调接收、异步、文件抽取、JSON 清洗全部收口,对上只暴露几个干净方法。
  • 编排层 AiPdfExtractionServiceImpl:把「切分(第3章)+ OCR识别JSON(第4章)+ 提示词(第5章)+ 调用」编排成「PDF → 结构化 JSON」,并解决并行、重试、合并、失败隔离。

下文实现以 GLM(zai-sdk)为例,换其它模型只需替换客户端实现,编排层不动。

6.1 AiClient:统一「JSON 输出 + 关闭思考」

6.1.1 懒加载客户端

SDK 客户端用 double-checked locking 懒加载,保证全局单例、线程安全,且只有真正调用时才初始化(启动期不依赖密钥就绪):

private volatile ai.z.openapi.ZhipuAiClient sdkClient;   // volatile 保证可见性

private ai.z.openapi.ZhipuAiClient client() {
    if (sdkClient == null) {
        synchronized (this) {
            if (sdkClient == null) {
                if (StringUtils.isEmpty(properties.getApiKey())) {
                    throw new ServiceException("智谱AI调用失败:未配置 apiKey");
                }
                int sec = Math.max(60, properties.getTimeout() / 1000);   // 下限 60s
                // 请求/连接/读/写 四个超时统一用大值:大表输出长,读超时最容易踩
                sdkClient = ai.z.openapi.ZhipuAiClient.builder()
                        .ofZHIPU()
                        .apiKey(properties.getApiKey())
                        .networkConfig(sec, sec, sec, sec, TimeUnit.SECONDS)
                        .build();
                log.info("智谱AI SDK 客户端已初始化,model={}, timeout={}s",
                        properties.getModel(), sec);
            }
        }
    }
    return sdkClient;
}

细节:

  • volatile 不能省:没有它,另一个线程可能读到「构造一半」的对象引用。
  • 超时下限 60stimeout 配小了也至少给 60 秒,避免大表一调就超时。
  • 四个超时全放大:连接快但慢是大模型调用的典型特征(要等它生成完),四项统一放大最省心。

    6.1.2 请求参数 buildParams

    每次调用都构造「系统 + 用户」两条消息,并固定几个关键开关:

    private ChatCompletionCreateParams buildParams(String systemPrompt, String userContent, boolean stream) {
        ChatMessage system = ChatMessage.builder()
                .role(ChatMessageRole.SYSTEM.value()).content(systemPrompt).build();
        ChatMessage user = ChatMessage.builder()
                .role(ChatMessageRole.USER.value()).content(userContent).build();
        return ChatCompletionCreateParams.builder()
                .model(properties.getModel())
                .messages(Arrays.asList(system, user))
                .stream(stream)
                .temperature((float) properties.getTemperature())                 // 默认 0.1
                .maxTokens(properties.getMaxTokens() > 0 ? properties.getMaxTokens() : null)
                .responseFormat(new ResponseFormat("json_object"))                // 强制 JSON
                .thinking(new ChatThinking(ChatThinkingType.DISABLED.value()))    // 关闭思考
                .build();
    }
参数 取值 为什么
temperature 0.1(低温) 结构化抽取要稳定可复现,不要创造性
maxTokens 16384 大表 JSON 很长,给小了会被截断(finish_reason=length
responseFormat json_object 让模型直接产出 JSON,少一层文本清洗
thinking DISABLED 抽取是「照搬」不是「推理」,关掉思考省 token、更快
model glm-5.1 要更准切 glm-5.1,新模型理解能力强

6.1.3 二种调用形态

AiClient 对外提供二个入口,对应不同使用场景:

chatForJson / chatForJsonResult 同步 + 回调接收 批量抽取编排器 runChunk 用,带重试
submitAsync 异步提交 单表「调AI」按钮,立即返回 taskId,不阻塞页面
queryAsync 异步查询 定时任务轮询,识别 PROCESSING/SUCCESS/FAIL

6.1.4 回调接口接收输出(替代旧的阻塞流式)

早期用 RxJava Flowable.blockingForEach 阻塞式流式读取,缺点是独占当前线程、与抽取线程池并行时不友好。现已改为注册回调接收模型输出:SDK 收到内容(增量片段或一次性结果)时回调,由回调累积内容、记录 finish_reason 与 token 用量,doChat 在回调结束后拿到完整 JSON。

// 回调处理器:累积内容 + 记录用量/截断(接口名以实际 SDK 回调为准)
StringBuilder sb = new StringBuilder();
Usage[] usageHolder = {null};
String[] finish = {null};

client().chat().createChatCompletion(request, new ChatStreamCallback() {
    @Override public void onMessage(ModelData md) {            // 收到内容片段/结果
        if (md.getUsage() != null) usageHolder[0] = md.getUsage();   // 用量通常在末尾给
        if (md.getChoices() == null || md.getChoices().isEmpty()) return;
        Choice c = md.getChoices().get(0);
        if (c.getDelta() != null && c.getDelta().getContent() != null) {
            sb.append(c.getDelta().getContent());             // 累积,不再 System.out.print
        }
        if (c.getFinishReason() != null) finish[0] = c.getFinishReason();
    }
    @Override public void onError(Throwable t) {              // 出错统一转业务异常
        throw new ServiceException("智谱AI回调异常:" + t.getMessage());
    }
    @Override public void onComplete() { /* 收尾,可选 */ }
});

// 截断检测:JSON 不完整时尽早失败,触发上层重试或提示调大 max-tokens
if ("length".equalsIgnoreCase(finish[0]))
    throw new ServiceException("AI输出超长被截断(finish_reason=length),请调大 max-tokens、或对该长表启用分块");

String content = stripJsonFence(sb.toString());              // 完整内容交给下游

回调式相比阻塞流式的好处:

  • 不占用调用线程:回调由 SDK 的 IO 线程驱动,抽取线程池(6.3 节)能跑满并发。
  • 结构一致:无论增量片段还是一次性结果,都在 onMessage 里统一累积,doChat 逻辑不变。
  • 错误集中onError 统一转 ServiceException,被 6.1.3 的重试循环接住。

ChatStreamCallback / onMessage / onError / onComplete 为示意名,请替换为你实际使用的回调接口与方法签名。finish_reason == length 的截断判断务必保留——它是「JSON 被切一半」最常见的根因。

6.1.5 异步提交与查询(给前端用)

页面点「调AI」不能干等,于是走异步任务:提交后立即拿 taskId 返回,结果由定时任务轮询(详见第 7 章)。

public String submitAsync(String systemPrompt, String userContent, String label) {
    ChatCompletionCreateParams request = buildParams(systemPrompt, userContent, false);
    ChatCompletionResponse response = client().chat().asyncChatCompletion(request);
    if (response == null)      throw new ServiceException("智谱AI异步提交返回为空");
    if (!response.isSuccess()) throw new ServiceException("智谱AI异步提交失败:code="
            + response.getCode() + ", msg=" + response.getMsg());
    String taskId = response.getData() == null ? null : response.getData().getId();
    if (StringUtils.isEmpty(taskId)) throw new ServiceException("智谱AI异步提交未返回任务ID");
    return taskId;
}

查询 queryAsync 把 SDK 的任务状态映射成自己的 AsyncChatResultPROCESSING/SUCCESS/FAIL),并在「成功」分支里做完整校验:

public AsyncChatResult queryAsync(String taskId) {
    QueryModelResultResponse response = client().chat().retrieveAsyncResult(
            AsyncResultRetrieveParams.builder().taskId(taskId).build());
    // …response 空/失败校验略…
    ModelData data = response.getData();
    TaskStatus status = data == null ? null : data.getTaskStatus();

    AsyncChatResult result = new AsyncChatResult();
    if (status == null)                 { result.setStatus("PROCESSING"); return result; } // 状态未知按处理中
    result.setStatus(status.name());
    if (status == TaskStatus.PROCESSING) return result;                                    // 等下次轮询
    if (status == TaskStatus.FAIL)      { result.setError("智谱返回任务失败(FAIL)"); return result; }

    // SUCCESS:记 token 用量
    if (data.getUsage() != null) { /* set prompt/completion/total tokens */ }
    if (data.getChoices() == null || data.getChoices().isEmpty()) {       // 成功却无内容 → 判失败
        result.setStatus("FAIL"); result.setError("任务成功但无 choices"); return result;
    }
    Choice choice = data.getChoices().get(0);
    if ("length".equalsIgnoreCase(choice.getFinishReason())) {            // 截断 → 判失败
        result.setStatus("FAIL"); result.setError("AI输出超长被截断(finish_reason=length)…"); return result;
    }
    String text = choice.getMessage() == null ? null : String.valueOf(choice.getMessage().getContent());
    if (StringUtils.isEmpty(text)) { result.setStatus("FAIL"); result.setError("content 为空"); return result; }
    result.setContent(stripJsonFence(text));
    return result;
}

关键点:「成功」不等于「拿到可用 JSON」。无 choices、被截断、content 为空,都要在查询时降级为 FAIL,否则脏数据会流到入库环节。

6.1.7 去代码块包裹 stripJsonFence

即使要求了 json_object,模型偶尔仍会用 ```json … ``` 把结果包起来。统一剥掉首尾围栏再交给 JSON.parse,否则会解析失败:

private String stripJsonFence(String content) {
    if (StringUtils.isEmpty(content)) return content;
    String text = content.trim();
    if (text.startsWith("```")) {
        int nl = text.indexOf('\n');
        if (nl > 0) text = text.substring(nl + 1);        // 去掉首行 ```json
        if (text.endsWith("```")) text = text.substring(0, text.length() - 3);
    }
    return text.trim();
}

6.2 抽取总编排 AiPdfExtractionServiceImpl

这是「PDF → 结构化 JSON」的纯抽取编排器(不落库)。主流程 doExtract

slice 切表  →  对每段调百度OCR识别成 JSON  →  构建任务(buildChunkTasks)
            →  线程池并行调 AI(runChunk)  →  按表合并(mergeSection)  →  AiExtractionResult
private AiExtractionResult doExtract(byte[] pdfBytes, String fileName, String[] titleKeywords) {
    long start = System.currentTimeMillis();

    List<PdfSection> sections = pdfTableSlicer.slice(pdfBytes);          // ① 按表切分(第3章)
    if (titleKeywords != null && titleKeywords.length > 0)               // 仅抽指定标题
        sections = sections.stream().filter(s -> matchAnyKeyword(s.getTitle(), titleKeywords)).toList();
    if (sections.isEmpty()) throw new ServiceException("未在 PDF 中识别到任何「表N」表格");

    List<ChunkTask> tasks = properties.isUseFile()
            ? buildFileTasks(sections, pdfBytes)                         // ②a 文件抽取(可选)
            : buildChunkTasks(sections, ocrJsonOf(sections));           // ②b 主线:OCR JSON 投喂

    List<ChunkResult> results = runInParallel(tasks);                    // ③ 并行(6.3)

    List<TableExtraction> tables = new ArrayList<>();                    // ④ 按表合并(6.4)
    for (int si = 0; si < sections.size(); si++)
        tables.add(mergeSection(sections.get(si), si, results));

    AiExtractionResult result = new AiExtractionResult();
    result.setFileName(fileName);
    result.setTables(tables);
    result.setTableCount(tables.size());
    result.setSuccessCount((int) tables.stream().filter(t -> SUCCESS.equals(t.getStatus())).count());
    result.setElapsedMs(System.currentTimeMillis() - start);
    return result;
}

analyze()(只切分、不调 AI,用于前端快速预览有哪些表)、extract()(全量)、extractByTitles()(按标题)都收敛到 doExtract

6.2.1 系统提示词 SYSTEM_PROMPT

所有分块共用同一套铁律(7 条,详见第 5 章),核心是「逐字保留、字段对齐、合并单元格向下填充、跨页折行合并、只输出 JSON」。它定义为常量,与按表的字段定义(buildSchemaBlock)拼在一起构成完整提示。

6.2.2 构建任务 buildChunkTasks

主线输入是该表的百度 OCR 识别结果 JSON,按表大小分两种:

  1. 整表一次投喂(默认):把整表 OCR JSON + 字段定义拼进提示词,一次调用,保留完整表头上下文,准确率最高。
  2. 超大表分块兜底:仅当整表文本超过 chunkCharThreshold 时,才按 chunkPageSize 把内容拆块并行、再合并。每个非首块附带「表头参考」,避免分块后丢失列名:
    if (begin > 0) {
        text = "【表头参考(仅用于确定列名,不要把本段当作数据行)】\n" + headerContext
             + "\n【待抽取数据】\n" + text;
    }

默认 chunkCharThreshold = Integer.MAX_VALUE,即「整表一次请求、不分块」;分块只是超大表的安全阀。

每个任务封装成内部类 ChunkTask(sectionIndex, title, text)——sectionIndex 记录它属于哪张表,合并时按它归位。

6.2.3 文件抽取任务 buildFileTasks

useFile 路径:每段先用 PdfSplitter 导出独立 PDF(第3章),上传给模型解析(6.1.6),再让模型基于解析内容输出 JSON。临时 PDF 用完即删(Files.deleteIfExists);某段准备失败也会生成一个「空内容任务」,让 runChunk 统一记为 Failed,不中断其它表。

6.3 并行调度与失败隔离 runChunk

所有表的所有分块汇总成一个 List<ChunkTask>,用固定线程池(concurrency,默认 4)统一并行:

int poolSize = Math.max(1, Math.min(properties.getConcurrency(), tasks.size()));
ExecutorService pool = Executors.newFixedThreadPool(poolSize);
try {
    List<CompletableFuture<ChunkResult>> futures = tasks.stream()
            .map(t -> CompletableFuture.supplyAsync(() -> runChunk(t), pool))
            .toList();
    return futures.stream().map(CompletableFuture::join).toList();
} finally {
    shutdown(pool);   // shutdown + awaitTermination(timeout×3+5s),超时则 shutdownNow
}

runChunk 内部再套一层重试,且失败不抛出,而是返回带 error 的 ChunkResult,保证单块失败不拖垮整批:

private ChunkResult runChunk(ChunkTask task) {
    int attempts = Math.max(1, properties.getMaxRetries() + 1);
    Exception last = null;
    for (int i = 0; i < attempts; i++) {
        try {
            String json = aiClient.chatForJson(SYSTEM_PROMPT, task.text, task.title);
            if (StringUtils.isBlank(json)) throw new ServiceException("AI 返回内容为空");
            Object data = JSON.parse(json);                       // 解析校验
            if (data == null) throw new ServiceException("AI 返回无法解析为 JSON:" + abbreviate(json));
            return new ChunkResult(task.sectionIndex, data, null);// 成功
        } catch (Exception e) {
            last = e;
            log.warn("表格[{}]第{}/{}次抽取失败", task.title, i + 1, attempts, e);
        }
    }
    return new ChunkResult(task.sectionIndex, null, describe(last)); // 失败也返回,不抛
}
注意这里有两层重试chatForJson 内部对「网络/接口错误」重试(6.1.3),runChunk 对「内容不合法(空、非JSON)」再重试一次。前者管「没调通」,后者管「调通了但产出不可用」。

6.4 结果合并 mergeData

一张表可能被拆成多块,多块结果要按类型智能合并。mergeSection 先把同一 sectionIndex 的结果归拢,再交 mergeData

private TableExtraction mergeSection(PdfSection section, int sectionIndex, List<ChunkResult> results) {
    List<Object> datas = new ArrayList<>();
    List<String> errors = new ArrayList<>();
    for (ChunkResult r : results) {
        if (r.sectionIndex != sectionIndex) continue;     // 只收本表的块
        if (r.data != null) datas.add(r.data); else if (r.error != null) errors.add(r.error);
    }
    TableExtraction te = new TableExtraction();
    te.setTitle(section.getTitle()); te.setStartPage(section.getStartPage()); te.setEndPage(section.getEndPage());
    if (datas.isEmpty()) {                                 // 全失败
        te.setStatus(FAILED); te.setError(String.join("; ", errors)); return te;
    }
    te.setData(mergeData(datas));                          // 有成功 → 合并
    te.setStatus(SUCCESS);
    if (!errors.isEmpty()) te.setError("部分分块失败:" + String.join("; ", errors)); // 部分失败也保留
    return te;
}

mergeData 按数据形态分四类处理:

private Object mergeData(List<Object> datas) {
    if (datas.size() == 1) return datas.get(0);           // 不分块的常见情况,直接返回

    if (datas.stream().allMatch(this::isRowsObject)) {    // ① 明细表 {columns, rows}
        JSONObject merged = new JSONObject(); JSONArray rows = new JSONArray(); JSONArray columns = null;
        for (Object d : datas) {
            JSONObject o = (JSONObject) d;
            if (columns == null && o.getJSONArray("columns") != null) columns = o.getJSONArray("columns");
            if (o.getJSONArray("rows") != null) rows.addAll(o.getJSONArray("rows"));  // 拼接所有行
        }
        if (columns != null) merged.put("columns", columns);
        merged.put("rows", rows); return merged;
    }
    if (datas.stream().allMatch(d -> d instanceof JSONArray)) {        // ② 纯数组:直接拼
        JSONArray merged = new JSONArray(); datas.forEach(d -> merged.addAll((JSONArray) d)); return merged;
    }
    if (datas.stream().allMatch(d -> d instanceof JSONObject)) {       // ③ 键值对象:逐键合并
        JSONObject merged = new JSONObject();
        for (Object d : datas) for (String k : ((JSONObject) d).keySet()) {
            Object exist = merged.get(k);
            if (exist == null || (exist instanceof String && ((String) exist).isEmpty()))
                merged.put(k, ((JSONObject) d).get(k));   // 已有非空值不覆盖
        }
        return merged;
    }
    return datas;                                          // ④ 混合类型:原样返回
}
形态 判定 合并策略
明细表 每块都是 {rows:[...]} 拼接所有 rowscolumns 取首个非空
数组 每块都是 JSONArray 直接 addAll
键值对象 每块都是 JSONObject 逐键合并,已有非空值不覆盖(防分块重复字段互相冲掉)
混合 以上都不满足 原样返回各分块,交由人工/装配器处理

6.5 小结

  • 客户端层 AiClient:鉴权、回调接收、异步、文件抽取、JSON 清洗全收口,上层只调方法名;模型可替换。关键防线是两处截断检测(同步回调 + 异步查询)和双层重试
  • 编排层 AiPdfExtractionServiceImpl:切表 → OCR JSON → 并行抽 → 合并,产出纯粹的 AiExtractionResult,不碰数据库。
  • 失败隔离贯穿始终:单块失败返回错误而非抛出、单表全失败只标 Failed、部分失败保留成功数据,便于精准重试。

下一章讲怎么把这套抽取能力接进真实业务、异步化、并最终落库。

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐