news 2026/6/30 11:26:52

PDF-OCR文件识别篇(六):AI 客户端封装与结构化抽取

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PDF-OCR文件识别篇(六):AI 客户端封装与结构化抽取

本章是整条流水线的「大脑」,分两层讲:

  • 客户端层AiClient:把大模型的鉴权、调用、回调接收、异步、文件抽取、JSON 清洗全部收口,对上只暴露几个干净方法。
  • 编排层AiPdfExtractionServiceImpl:把「切分(第3章)+ OCR识别JSON(第4章)+ 提示词(第5章)+ 调用」编排成「PDF → 结构化 JSON」,并解决并行、重试、合并、失败隔离。

下文实现以 GLM(zai-sdk)为例,换其它模型只需替换客户端实现,编排层不动。

6.1 AiClient:统一「JSON 输出 + 关闭思考」

6.1.1 懒加载客户端

SDK 客户端用double-checked locking懒加载,保证全局单例、线程安全,且只有真正调用时才初始化(启动期不依赖密钥就绪):

private volatile ai.z.openapi.ZhipuAiClient sdkClient; // volatile 保证可见性 private ai.z.openapi.ZhipuAiClient client() { if (sdkClient == null) { synchronized (this) { if (sdkClient == null) { if (StringUtils.isEmpty(properties.getApiKey())) { throw new ServiceException("智谱AI调用失败:未配置 apiKey"); } int sec = Math.max(60, properties.getTimeout() / 1000); // 下限 60s // 请求/连接/读/写 四个超时统一用大值:大表输出长,读超时最容易踩 sdkClient = ai.z.openapi.ZhipuAiClient.builder() .ofZHIPU() .apiKey(properties.getApiKey()) .networkConfig(sec, sec, sec, sec, TimeUnit.SECONDS) .build(); log.info("智谱AI SDK 客户端已初始化,model={}, timeout={}s", properties.getModel(), sec); } } } return sdkClient; }

细节:

  • volatile不能省:没有它,另一个线程可能读到「构造一半」的对象引用。
  • 超时下限 60stimeout配小了也至少给 60 秒,避免大表一调就超时。
  • 四个超时全放大:连接快但慢是大模型调用的典型特征(要等它生成完),四项统一放大最省心。

    6.1.2 请求参数buildParams

    每次调用都构造「系统 + 用户」两条消息,并固定几个关键开关:

    private ChatCompletionCreateParams buildParams(String systemPrompt, String userContent, boolean stream) { ChatMessage system = ChatMessage.builder() .role(ChatMessageRole.SYSTEM.value()).content(systemPrompt).build(); ChatMessage user = ChatMessage.builder() .role(ChatMessageRole.USER.value()).content(userContent).build(); return ChatCompletionCreateParams.builder() .model(properties.getModel()) .messages(Arrays.asList(system, user)) .stream(stream) .temperature((float) properties.getTemperature()) // 默认 0.1 .maxTokens(properties.getMaxTokens() > 0 ? properties.getMaxTokens() : null) .responseFormat(new ResponseFormat("json_object")) // 强制 JSON .thinking(new ChatThinking(ChatThinkingType.DISABLED.value())) // 关闭思考 .build(); }
参数取值为什么
temperature0.1(低温)结构化抽取要稳定可复现,不要创造性
maxTokens16384大表 JSON 很长,给小了会被截断(finish_reason=length
responseFormatjson_object让模型直接产出 JSON,少一层文本清洗
thinkingDISABLED抽取是「照搬」不是「推理」,关掉思考省 token、更快
modelglm-5.1要更准切glm-5.1,新模型理解能力强

6.1.3 二种调用形态

AiClient对外提供二个入口,对应不同使用场景:

chatForJson/chatForJsonResult同步 + 回调接收批量抽取编排器runChunk用,带重试
submitAsync异步提交单表「调AI」按钮,立即返回taskId,不阻塞页面
queryAsync异步查询定时任务轮询,识别PROCESSING/SUCCESS/FAIL

6.1.4 回调接口接收输出(替代旧的阻塞流式)

早期用 RxJavaFlowable.blockingForEach阻塞式流式读取,缺点是独占当前线程、与抽取线程池并行时不友好。现已改为注册回调接收模型输出:SDK 收到内容(增量片段或一次性结果)时回调,由回调累积内容、记录finish_reason与 token 用量,doChat在回调结束后拿到完整 JSON。

// 回调处理器:累积内容 + 记录用量/截断(接口名以实际 SDK 回调为准) StringBuilder sb = new StringBuilder(); Usage[] usageHolder = {null}; String[] finish = {null}; client().chat().createChatCompletion(request, new ChatStreamCallback() { @Override public void onMessage(ModelData md) { // 收到内容片段/结果 if (md.getUsage() != null) usageHolder[0] = md.getUsage(); // 用量通常在末尾给 if (md.getChoices() == null || md.getChoices().isEmpty()) return; Choice c = md.getChoices().get(0); if (c.getDelta() != null && c.getDelta().getContent() != null) { sb.append(c.getDelta().getContent()); // 累积,不再 System.out.print } if (c.getFinishReason() != null) finish[0] = c.getFinishReason(); } @Override public void onError(Throwable t) { // 出错统一转业务异常 throw new ServiceException("智谱AI回调异常:" + t.getMessage()); } @Override public void onComplete() { /* 收尾,可选 */ } }); // 截断检测:JSON 不完整时尽早失败,触发上层重试或提示调大 max-tokens if ("length".equalsIgnoreCase(finish[0])) throw new ServiceException("AI输出超长被截断(finish_reason=length),请调大 max-tokens、或对该长表启用分块"); String content = stripJsonFence(sb.toString()); // 完整内容交给下游

回调式相比阻塞流式的好处:

  • 不占用调用线程:回调由 SDK 的 IO 线程驱动,抽取线程池(6.3 节)能跑满并发。
  • 结构一致:无论增量片段还是一次性结果,都在onMessage里统一累积,doChat逻辑不变。
  • 错误集中onError统一转ServiceException,被 6.1.3 的重试循环接住。

ChatStreamCallback/onMessage / onError / onComplete为示意名,请替换为你实际使用的回调接口与方法签名。finish_reason == length的截断判断务必保留——它是「JSON 被切一半」最常见的根因。

6.1.5 异步提交与查询(给前端用)

页面点「调AI」不能干等,于是走异步任务:提交后立即拿taskId返回,结果由定时任务轮询(详见第 7 章)。

public String submitAsync(String systemPrompt, String userContent, String label) { ChatCompletionCreateParams request = buildParams(systemPrompt, userContent, false); ChatCompletionResponse response = client().chat().asyncChatCompletion(request); if (response == null) throw new ServiceException("智谱AI异步提交返回为空"); if (!response.isSuccess()) throw new ServiceException("智谱AI异步提交失败:code=" + response.getCode() + ", msg=" + response.getMsg()); String taskId = response.getData() == null ? null : response.getData().getId(); if (StringUtils.isEmpty(taskId)) throw new ServiceException("智谱AI异步提交未返回任务ID"); return taskId; }

查询queryAsync把 SDK 的任务状态映射成自己的AsyncChatResultPROCESSING/SUCCESS/FAIL),并在「成功」分支里做完整校验:

public AsyncChatResult queryAsync(String taskId) { QueryModelResultResponse response = client().chat().retrieveAsyncResult( AsyncResultRetrieveParams.builder().taskId(taskId).build()); // …response 空/失败校验略… ModelData data = response.getData(); TaskStatus status = data == null ? null : data.getTaskStatus(); AsyncChatResult result = new AsyncChatResult(); if (status == null) { result.setStatus("PROCESSING"); return result; } // 状态未知按处理中 result.setStatus(status.name()); if (status == TaskStatus.PROCESSING) return result; // 等下次轮询 if (status == TaskStatus.FAIL) { result.setError("智谱返回任务失败(FAIL)"); return result; } // SUCCESS:记 token 用量 if (data.getUsage() != null) { /* set prompt/completion/total tokens */ } if (data.getChoices() == null || data.getChoices().isEmpty()) { // 成功却无内容 → 判失败 result.setStatus("FAIL"); result.setError("任务成功但无 choices"); return result; } Choice choice = data.getChoices().get(0); if ("length".equalsIgnoreCase(choice.getFinishReason())) { // 截断 → 判失败 result.setStatus("FAIL"); result.setError("AI输出超长被截断(finish_reason=length)…"); return result; } String text = choice.getMessage() == null ? null : String.valueOf(choice.getMessage().getContent()); if (StringUtils.isEmpty(text)) { result.setStatus("FAIL"); result.setError("content 为空"); return result; } result.setContent(stripJsonFence(text)); return result; }

关键点:「成功」不等于「拿到可用 JSON」。无 choices、被截断、content 为空,都要在查询时降级为FAIL,否则脏数据会流到入库环节。

6.1.7 去代码块包裹stripJsonFence

即使要求了json_object,模型偶尔仍会用```json … ```把结果包起来。统一剥掉首尾围栏再交给JSON.parse,否则会解析失败:

private String stripJsonFence(String content) { if (StringUtils.isEmpty(content)) return content; String text = content.trim(); if (text.startsWith("```")) { int nl = text.indexOf('\n'); if (nl > 0) text = text.substring(nl + 1); // 去掉首行 ```json if (text.endsWith("```")) text = text.substring(0, text.length() - 3); } return text.trim(); }

6.2 抽取总编排AiPdfExtractionServiceImpl

这是「PDF → 结构化 JSON」的纯抽取编排器(不落库)。主流程doExtract

slice 切表 → 对每段调百度OCR识别成 JSON → 构建任务(buildChunkTasks) → 线程池并行调 AI(runChunk) → 按表合并(mergeSection) → AiExtractionResult
private AiExtractionResult doExtract(byte[] pdfBytes, String fileName, String[] titleKeywords) { long start = System.currentTimeMillis(); List<PdfSection> sections = pdfTableSlicer.slice(pdfBytes); // ① 按表切分(第3章) if (titleKeywords != null && titleKeywords.length > 0) // 仅抽指定标题 sections = sections.stream().filter(s -> matchAnyKeyword(s.getTitle(), titleKeywords)).toList(); if (sections.isEmpty()) throw new ServiceException("未在 PDF 中识别到任何「表N」表格"); List<ChunkTask> tasks = properties.isUseFile() ? buildFileTasks(sections, pdfBytes) // ②a 文件抽取(可选) : buildChunkTasks(sections, ocrJsonOf(sections)); // ②b 主线:OCR JSON 投喂 List<ChunkResult> results = runInParallel(tasks); // ③ 并行(6.3) List<TableExtraction> tables = new ArrayList<>(); // ④ 按表合并(6.4) for (int si = 0; si < sections.size(); si++) tables.add(mergeSection(sections.get(si), si, results)); AiExtractionResult result = new AiExtractionResult(); result.setFileName(fileName); result.setTables(tables); result.setTableCount(tables.size()); result.setSuccessCount((int) tables.stream().filter(t -> SUCCESS.equals(t.getStatus())).count()); result.setElapsedMs(System.currentTimeMillis() - start); return result; }

analyze()(只切分、不调 AI,用于前端快速预览有哪些表)、extract()(全量)、extractByTitles()(按标题)都收敛到doExtract

6.2.1 系统提示词SYSTEM_PROMPT

所有分块共用同一套铁律(7 条,详见第 5 章),核心是「逐字保留、字段对齐、合并单元格向下填充、跨页折行合并、只输出 JSON」。它定义为常量,与按表的字段定义(buildSchemaBlock)拼在一起构成完整提示。

6.2.2 构建任务buildChunkTasks

主线输入是该表的百度 OCR 识别结果 JSON,按表大小分两种:

  1. 整表一次投喂(默认):把整表 OCR JSON + 字段定义拼进提示词,一次调用,保留完整表头上下文,准确率最高。
  2. 超大表分块兜底:仅当整表文本超过chunkCharThreshold时,才按chunkPageSize把内容拆块并行、再合并。每个非首块附带「表头参考」,避免分块后丢失列名:
    if (begin > 0) { text = "【表头参考(仅用于确定列名,不要把本段当作数据行)】\n" + headerContext + "\n【待抽取数据】\n" + text; }

默认chunkCharThreshold = Integer.MAX_VALUE,即「整表一次请求、不分块」;分块只是超大表的安全阀。

每个任务封装成内部类ChunkTask(sectionIndex, title, text)——sectionIndex记录它属于哪张表,合并时按它归位。

6.2.3 文件抽取任务buildFileTasks

useFile路径:每段先用PdfSplitter导出独立 PDF(第3章),上传给模型解析(6.1.6),再让模型基于解析内容输出 JSON。临时 PDF 用完即删(Files.deleteIfExists);某段准备失败也会生成一个「空内容任务」,让runChunk统一记为Failed,不中断其它表。

6.3 并行调度与失败隔离runChunk

所有表的所有分块汇总成一个List<ChunkTask>,用固定线程池(concurrency,默认 4)统一并行:

int poolSize = Math.max(1, Math.min(properties.getConcurrency(), tasks.size())); ExecutorService pool = Executors.newFixedThreadPool(poolSize); try { List<CompletableFuture<ChunkResult>> futures = tasks.stream() .map(t -> CompletableFuture.supplyAsync(() -> runChunk(t), pool)) .toList(); return futures.stream().map(CompletableFuture::join).toList(); } finally { shutdown(pool); // shutdown + awaitTermination(timeout×3+5s),超时则 shutdownNow }

runChunk内部再套一层重试,且失败不抛出,而是返回带errorChunkResult,保证单块失败不拖垮整批:

private ChunkResult runChunk(ChunkTask task) { int attempts = Math.max(1, properties.getMaxRetries() + 1); Exception last = null; for (int i = 0; i < attempts; i++) { try { String json = aiClient.chatForJson(SYSTEM_PROMPT, task.text, task.title); if (StringUtils.isBlank(json)) throw new ServiceException("AI 返回内容为空"); Object data = JSON.parse(json); // 解析校验 if (data == null) throw new ServiceException("AI 返回无法解析为 JSON:" + abbreviate(json)); return new ChunkResult(task.sectionIndex, data, null);// 成功 } catch (Exception e) { last = e; log.warn("表格[{}]第{}/{}次抽取失败", task.title, i + 1, attempts, e); } } return new ChunkResult(task.sectionIndex, null, describe(last)); // 失败也返回,不抛 }
注意这里有两层重试chatForJson内部对「网络/接口错误」重试(6.1.3),runChunk对「内容不合法(空、非JSON)」再重试一次。前者管「没调通」,后者管「调通了但产出不可用」。

6.4 结果合并mergeData

一张表可能被拆成多块,多块结果要按类型智能合并。mergeSection先把同一sectionIndex的结果归拢,再交mergeData

private TableExtraction mergeSection(PdfSection section, int sectionIndex, List<ChunkResult> results) { List<Object> datas = new ArrayList<>(); List<String> errors = new ArrayList<>(); for (ChunkResult r : results) { if (r.sectionIndex != sectionIndex) continue; // 只收本表的块 if (r.data != null) datas.add(r.data); else if (r.error != null) errors.add(r.error); } TableExtraction te = new TableExtraction(); te.setTitle(section.getTitle()); te.setStartPage(section.getStartPage()); te.setEndPage(section.getEndPage()); if (datas.isEmpty()) { // 全失败 te.setStatus(FAILED); te.setError(String.join("; ", errors)); return te; } te.setData(mergeData(datas)); // 有成功 → 合并 te.setStatus(SUCCESS); if (!errors.isEmpty()) te.setError("部分分块失败:" + String.join("; ", errors)); // 部分失败也保留 return te; }

mergeData按数据形态分四类处理:

private Object mergeData(List<Object> datas) { if (datas.size() == 1) return datas.get(0); // 不分块的常见情况,直接返回 if (datas.stream().allMatch(this::isRowsObject)) { // ① 明细表 {columns, rows} JSONObject merged = new JSONObject(); JSONArray rows = new JSONArray(); JSONArray columns = null; for (Object d : datas) { JSONObject o = (JSONObject) d; if (columns == null && o.getJSONArray("columns") != null) columns = o.getJSONArray("columns"); if (o.getJSONArray("rows") != null) rows.addAll(o.getJSONArray("rows")); // 拼接所有行 } if (columns != null) merged.put("columns", columns); merged.put("rows", rows); return merged; } if (datas.stream().allMatch(d -> d instanceof JSONArray)) { // ② 纯数组:直接拼 JSONArray merged = new JSONArray(); datas.forEach(d -> merged.addAll((JSONArray) d)); return merged; } if (datas.stream().allMatch(d -> d instanceof JSONObject)) { // ③ 键值对象:逐键合并 JSONObject merged = new JSONObject(); for (Object d : datas) for (String k : ((JSONObject) d).keySet()) { Object exist = merged.get(k); if (exist == null || (exist instanceof String && ((String) exist).isEmpty())) merged.put(k, ((JSONObject) d).get(k)); // 已有非空值不覆盖 } return merged; } return datas; // ④ 混合类型:原样返回 }
形态判定合并策略
明细表每块都是{rows:[...]}拼接所有rowscolumns取首个非空
数组每块都是JSONArray直接addAll
键值对象每块都是JSONObject逐键合并,已有非空值不覆盖(防分块重复字段互相冲掉)
混合以上都不满足原样返回各分块,交由人工/装配器处理

6.5 小结

  • 客户端层AiClient:鉴权、回调接收、异步、文件抽取、JSON 清洗全收口,上层只调方法名;模型可替换。关键防线是两处截断检测(同步回调 + 异步查询)和双层重试
  • 编排层AiPdfExtractionServiceImpl:切表 → OCR JSON → 并行抽 → 合并,产出纯粹的AiExtractionResult,不碰数据库。
  • 失败隔离贯穿始终:单块失败返回错误而非抛出、单表全失败只标Failed、部分失败保留成功数据,便于精准重试。

下一章讲怎么把这套抽取能力接进真实业务、异步化、并最终落库。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/30 11:26:07

Sesame-TK:蚂蚁森林自动化助手终极指南

Sesame-TK&#xff1a;蚂蚁森林自动化助手终极指南 【免费下载链接】Sesame-TK 芝麻粒TK版 项目地址: https://gitcode.com/gh_mirrors/se/Sesame-TK Sesame-TK是一款专为支付宝蚂蚁森林设计的智能自动化工具&#xff0c;通过模块化架构实现一键收取能量、好友互动、道具…

作者头像 李华
网站建设 2026/6/30 11:25:48

盐城装修付款避坑需要注意哪些核心事项

对于盐城本地准备装修的业主来说&#xff0c;付款环节是最容易产生权益纠纷的核心节点&#xff0c;盐城装修付款避坑是绝大多数业主开工前必须梳理清楚的关键问题。装修行业长期存在的预付款比例不合理、付款节点不清晰、付款条件不绑定验收等问题&#xff0c;往往会让业主在施…

作者头像 李华
网站建设 2026/6/30 11:24:18

PG 日报|PG 排序性能优化,新增 UUID 聚合函数

&#x1f514; 关注【IvorySQL开源数据库社区】即可获取 PostgreSQL 一手干货与最新动态⚙️ PostgreSQL技术文章 &#x1f9e9; 在满足欧盟数据主权要求的同时加快创新步伐2026年6月&#xff0c;欧盟委员会发布European Tech Sovereignty一揽子政策&#xff0c;将数据主权提升…

作者头像 李华
网站建设 2026/6/30 11:24:17

从零到一:在uni-app项目中优雅集成Pinia状态管理

1. 为什么要在uni-app中使用Pinia&#xff1f; 第一次接触uni-app的状态管理时&#xff0c;你可能会有这样的疑问&#xff1a;既然uni-app已经内置了Vuex&#xff0c;为什么还要用Pinia&#xff1f;我刚开始也有同样的困惑&#xff0c;直到在实际项目中踩了几个坑才明白两者的区…

作者头像 李华
网站建设 2026/6/30 11:23:31

AMD Ryzen处理器性能调优终极指南:免费开源调试工具完全掌握

AMD Ryzen处理器性能调优终极指南&#xff1a;免费开源调试工具完全掌握 【免费下载链接】SMUDebugTool A dedicated tool to help write/read various parameters of Ryzen-based systems, such as manual overclock, SMU, PCI, CPUID, MSR and Power Table. 项目地址: http…

作者头像 李华
网站建设 2026/6/30 11:21:04

深耕本地化家教服务:昆明金廷教育的办学优势与价值探析

当下社会&#xff0c;家庭教育早已成为学校教育的重要补充&#xff0c;是青少年成长过程中不可或缺的关键环节。随着学业竞争日趋多元&#xff0c;学生个性化学习问题愈发突出&#xff0c;不少家长面临孩子偏科严重、学习效率低下、学习心态浮躁等各类教育难题&#xff0c;亟需…

作者头像 李华