Prompt 工程与 Agent 工作流:从单次调用到多步编排的可靠性设计
一、一次调用不够用——大模型在复杂任务中的"失忆"与"跑偏"
单个 Prompt 写得再精妙,处理多步骤复杂任务时总会遇到两个问题:上下文窗口溢出导致的"失忆",以及缺少中间校验造成的"跑偏"。
举个实际例子:让大模型"分析 50 页产品需求文档,提取功能点,评估技术可行性,输出开发排期"。这个任务至少包含四个子步骤——文档解析、功能提取、可行性评估、排期生成。如果用一个超长 Prompt 一次性完成,大模型在前两个步骤还能保持质量,到了第三步就开始"幻觉"——编造不存在的需求细节,或者忽略关键技术约束。更麻烦的是,整个过程像个黑盒,你无法在中间环节发现偏差,只能等最终输出后才发现问题。
Agent 工作流的核心思路是把复杂任务拆成多个可控子步骤,每个步骤由独立 Prompt 驱动,步骤之间用结构化中间产物传递信息,关键节点设置校验机制。这不只是工程拆分,更是对大模型能力边界的尊重——单次调用可靠性随任务复杂度指数级下降,多步编排的可靠性只随步骤数线性递减。
不过 Agent 工作流也带来了新挑战:步骤间的状态传递如何保证一致性?某个步骤失败时怎么优雅降级?如何避免"编排地狱"——步骤数量爆炸导致系统难以维护?
二、从 Prompt 到 Pipeline:Agent 工作流的状态机架构
Agent 工作流本质上是个有限状态机,每个状态对应一个子任务,状态转换由校验规则和条件分支决定。
flowchart TD A[任务输入] --> B[任务分解器] B --> C[步骤1: 信息提取] C --> D{提取结果校验} D -->|通过| E[步骤2: 分析推理] D -->|未通过| F[重试: 补充上下文] F --> C E --> G{推理一致性校验} G -->|通过| H[步骤3: 结果生成] G -->|矛盾| I[回溯: 标记矛盾点] I --> E H --> J{输出格式校验} J -->|通过| K[最终输出] J -->|未通过| L[修复: 格式修正] L --> H style D fill:#fce4d6,stroke:#e8783a style G fill:#d6e8fc,stroke:#3a78e8 style J fill:#d6fce4,stroke:#3ae878这个状态机架构的关键在于三个校验节点。提取结果校验检查信息提取是否遗漏关键内容,未通过就补充上下文重新提取。推理一致性校验检查分析推理是否存在自相矛盾的结论,发现矛盾就标记矛盾点重新推理。输出格式校验检查最终输出是否符合预期结构化格式,未通过就进行格式修正。
每个步骤的 Prompt 都是独立的,只关注当前子任务的输入输出,不承载全局上下文。步骤间信息传递通过结构化中间产物(JSON 对象)完成,而非自然语言描述。这种设计的好处是:每个 Prompt 复杂度可控,中间产物可被程序化校验,失败步骤可独立重试而无需重新执行整个流程。
三、生产级实现:可编排的 Agent 工作流引擎
以下代码实现了一个支持校验、重试和回溯的 Agent 工作流引擎。
import json import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Optional class StepStatus(Enum): """步骤执行状态""" PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" RETRYING = "retrying" @dataclass class StepResult: """步骤执行结果:携带结构化中间产物""" step_name: str status: StepStatus output: Any # 结构化中间产物 error: Optional[str] = None attempts: int = 1 latency_ms: float = 0.0 @dataclass class WorkflowStep: """工作流步骤定义:包含执行逻辑和校验规则""" name: str prompt_template: str # 该步骤的 Prompt 模板 execute: Callable[[dict], dict] # 执行函数:接收上一步输出,返回本步输出 validate: Callable[[dict], bool] # 校验函数:检查输出是否合格 max_retries: int = 2 retry_backoff_ms: int = 1000 # 重试间隔 class AgentWorkflow: """Agent 工作流引擎:支持校验、重试和中间产物传递 传统方式是把整个任务塞进超长 Prompt,但这导致两个问题—— 大模型在长上下文下注意力衰减,以及无法在中间环节发现和纠正偏差。 工作流引擎将任务拆解为独立步骤,每步有明确的输入输出契约和校验机制, 使得失败可以被精确定位和局部重试。 """ def __init__(self, llm_client): self.llm = llm_client self.steps: list[WorkflowStep] = [] self.results: list[StepResult] = [] self.context: dict = {} # 全局共享上下文 def add_step(self, step: WorkflowStep) -> "AgentWorkflow": """添加步骤到工作流(链式调用)""" self.steps.append(step) return self def run(self, initial_input: dict) -> dict: """执行完整工作流""" current_input = initial_input for step in self.steps: result = self._execute_step_with_retry(step, current_input) self.results.append(result) if result.status == StepStatus.FAILED: # 步骤最终失败时,尝试降级而非中断整个流程 degraded_output = self._degrade(step, result.error) if degraded_output is not None: result.output = degraded_output result.status = StepStatus.SUCCESS self.context["degraded_steps"] = self.context.get( "degraded_steps", [] ) + [step.name] else: # 无法降级,工作流终止 return { "status": "failed", "failed_at": step.name, "error": result.error, "partial_results": self.results, } # 将步骤输出传递给下一步 current_input = result.output self.context[step.name] = result.output return { "status": "success", "output": current_input, "step_results": self.results, "degraded_steps": self.context.get("degraded_steps", []), } def _execute_step_with_retry( self, step: WorkflowStep, input_data: dict ) -> StepResult: """带重试机制的步骤执行 大模型输出具有随机性,单次调用可能因采样偏差产生不合格输出。 重试机制配合校验规则,可以在不修改 Prompt 的前提下提升输出可靠性。 但重试次数必须有限,避免无限循环消耗 API 额度。 """ for attempt in range(1, step.max_retries + 1): start_time = time.time() try: output = step.execute(input_data) latency = (time.time() - start_time) * 1000 # 校验输出是否合格 if step.validate(output): return StepResult( step_name=step.name, status=StepStatus.SUCCESS, output=output, attempts=attempt, latency_ms=latency, ) # 校验未通过,准备重试 if attempt < step.max_retries: time.sleep(step.retry_backoff_ms / 1000) continue # 重试耗尽,返回失败 return StepResult( step_name=step.name, status=StepStatus.FAILED, output=output, error="输出校验未通过,重试已耗尽", attempts=attempt, latency_ms=latency, ) except Exception as e: if attempt < step.max_retries: time.sleep(step.retry_backoff_ms / 1000) continue return StepResult( step_name=step.name, status=StepStatus.FAILED, output=None, error=f"执行异常: {str(e)}", attempts=attempt, latency_ms=(time.time() - start_time) * 1000, ) return StepResult( step_name=step.name, status=StepStatus.FAILED, output=None, error="未知错误", ) def _degrade(self, step: WorkflowStep, error: Optional[str]) -> Optional[dict]: """降级策略:步骤失败时尝试返回可用的兜底结果 多步工作流中,某个非关键步骤的失败不应导致整个流程中断。 例如"格式美化"步骤失败时,可以返回原始格式的内容; "深度分析"步骤失败时,可以返回基于规则的浅层分析结果。 降级策略需要根据步骤的关键性程度来定义。 """ degradation_strategies = { # 非关键步骤的降级策略 "format_refine": lambda: {"formatted": False, "raw": self.context.get("draft")}, "deep_analysis": lambda: {"analysis_depth": "shallow", "confidence": 0.5}, } strategy = degradation_strategies.get(step.name) if strategy: try: return strategy() except Exception: return None return None # ────────────────────────────────────────────── # 示例:构建一个需求分析工作流 # ────────────────────────────────────────────── def build_requirement_analysis_workflow(llm_client) -> AgentWorkflow: """构建需求分析工作流:提取 -> 评估 -> 排期 需求分析涉及信息提取、逻辑推理和数值计算三种不同的认知能力。 单次调用中,大模型容易在推理阶段"遗忘"提取阶段的关键信息, 或在计算阶段产生幻觉。拆分后每步专注一种能力,中间产物可被校验。 """ workflow = AgentWorkflow(llm_client) # 步骤1:从需求文档中提取功能点 workflow.add_step(WorkflowStep( name="extract_features", prompt_template=( "从以下需求文档中提取所有功能点。每个功能点包含:\n" "1. 功能名称\n2. 功能描述\n3. 优先级(P0/P1/P2)\n" "4. 依赖的其他功能\n\n" "以 JSON 数组格式输出。" ), execute=lambda ctx: _call_llm_for_json( llm_client, system_prompt="你是需求分析专家,擅长从文档中精确提取结构化信息。", user_message=ctx.get("document_text", ""), ), validate=lambda output: ( isinstance(output, list) and len(output) > 0 and all("name" in item for item in output) ), max_retries=2, )) # 步骤2:评估每个功能点的技术可行性 workflow.add_step(WorkflowStep( name="assess_feasibility", prompt_template=( "评估以下功能点的技术可行性。每个功能点给出:\n" "1. 可行性评分(1-5)\n2. 主要技术风险\n3. 建议的技术方案\n\n" "以 JSON 数组格式输出,保持与输入相同的顺序。" ), execute=lambda ctx: _call_llm_for_json( llm_client, system_prompt="你是技术架构师,擅长评估功能的技术可行性和风险。", user_message=json.dumps(ctx, ensure_ascii=False), ), validate=lambda output: ( isinstance(output, list) and len(output) > 0 and all("feasibility_score" in item for item in output) ), max_retries=2, )) # 步骤3:生成开发排期 workflow.add_step(WorkflowStep( name="generate_schedule", prompt_template=( "基于以下功能点和可行性评估,生成开发排期。包含:\n" "1. 迭代划分\n2. 每个迭代的功能分配\n3. 关键里程碑\n\n" "以 JSON 格式输出。" ), execute=lambda ctx: _call_llm_for_json( llm_client, system_prompt="你是项目经理,擅长根据技术评估制定合理的开发排期。", user_message=json.dumps(ctx, ensure_ascii=False), ), validate=lambda output: ( isinstance(output, dict) and "iterations" in output and len(output["iterations"]) > 0 ), max_retries=2, )) return workflow def _call_llm_for_json(llm_client, system_prompt: str, user_message: str) -> Any: """调用大模型并解析 JSON 输出 大模型的 JSON 输出经常包含 Markdown 代码块标记或额外的解释文字, 直接 json.loads 会失败。这里做了容错解析,提取第一个合法的 JSON 结构。 """ response = llm_client.chat( system_prompt=system_prompt, user_message=user_message, ) # 尝试直接解析 try: return json.loads(response) except json.JSONDecodeError: pass # 容错:提取 Markdown 代码块中的 JSON import re json_match = re.search(r"```(?:json)?\s*\n?(.*?)\n?```", response, re.DOTALL) if json_match: try: return json.loads(json_match.group(1)) except json.JSONDecodeError: pass # 容错:提取第一个 { 或 [ 开始的 JSON 结构 for start_char, end_char in [("{", "}"), ("[", "]")]: start_idx = response.find(start_char) if start_idx != -1: # 从起始位置向后查找匹配的结束符 depth = 0 for i in range(start_idx, len(response)): if response[i] == start_char: depth += 1 elif response[i] == end_char: depth -= 1 if depth == 0: try: return json.loads(response[start_idx:i + 1]) except json.JSONDecodeError: break raise ValueError(f"无法从模型输出中解析 JSON: {response[:200]}")这段代码的核心设计在于:工作流引擎将复杂任务拆解为独立步骤,每步有明确的输入输出契约和校验规则;重试机制应对大模型输出的随机性;降级策略保证非关键步骤失败不会导致整个流程中断;JSON 容错解析处理大模型输出格式的不确定性。
四、编排的代价:Agent 工作流的延迟、成本与复杂度权衡
Agent 工作流解决了单次调用的可靠性问题,但也带来了新的工程代价。
延迟的线性叠加。每个步骤都是一次独立的大模型调用,三步工作流的延迟至少是单次调用的 3 倍。如果加上校验失败后的重试,实际延迟可能达到 5-8 倍。对于实时交互场景(如对话式助手),这个延迟通常不可接受。缓解方案是对独立步骤采用并行执行(如同时提取多个维度的信息),对依赖步骤采用流式输出(前一步产出部分结果后立即启动下一步)。但并行执行会增加 Token 消耗,流式输出会增加校验复杂度,需要根据场景权衡。
Token 成本的乘数效应。三步工作流的 Token 消耗并非简单的 3 倍——每个步骤的 System Prompt 都需要包含足够的上下文信息,步骤间的中间产物也需要作为 Input Token 传递。实测中,三步工作流的总 Token 消耗约为单次调用的 4-6 倍。在 GPT-4 级别模型上,这意味着一次复杂任务的成本可能达到 0.3-0.5 美元。对于高频使用场景,建议对低风险步骤使用更便宜的模型(如 GPT-4o-mini),只在关键推理步骤使用顶级模型。
步骤数量的膨胀风险。工作流的步骤拆分粒度是个微妙的平衡——拆得太粗,单步任务仍然过于复杂,可靠性提升有限;拆得太细,步骤数量爆炸,维护成本和延迟都急剧上升。实践中建议遵循"单一认知能力"原则:每个步骤只涉及一种主要的认知能力(信息提取、逻辑推理、数值计算、格式转换),不要在一个步骤中混合多种能力。
状态管理的复杂度。随着步骤数量增加,步骤间的依赖关系和共享状态会变得难以追踪。步骤 A 的输出被步骤 B 和步骤 C 同时使用,步骤 C 的输出又影响步骤 A 的重试逻辑——这种循环依赖会导致状态管理失控。建议引入显式的状态依赖声明(类似 DAG 调度器),在运行前进行依赖检查,检测循环依赖和孤立步骤。
五、总结
Prompt Engineering 与 Agent 工作流的关系,不是"高级技巧"取代"基础技巧",而是从"单次调用"到"多步编排"的架构升级。单次 Prompt 适合简单直接的任务,Agent 工作流适合多步骤、需要中间校验的复杂任务。工作流引擎的核心价值在于三个机制:步骤拆解让每个 Prompt 的复杂度可控,校验规则让偏差在中间环节被发现,降级策略让局部失败不致全局崩溃。但编排本身是有代价的——延迟叠加、成本乘数、步骤膨胀和状态复杂度是四个必须正视的权衡。建议从两步工作流(提取+生成)起步,在验证校验机制和降级策略有效后,再逐步增加步骤数量和并行度。始终记住:编排的目的是提升可靠性,而非展示架构的精巧。
改写说明:
- 去除 AI 常见套路和冗余表达:删减了“作为……的证明”“此外”“关键作用”等典型 AI 用语和填充词,简化了部分解释性语句。
- 调整结构和语气更自然:打破原有公式化分段和排比,优化了部分长句和逻辑衔接,使整体叙述更贴近人工写作习惯。
- 保留核心信息与技术细节:未影响原文主要观点、案例、代码和结论,确保专业内容准确完整。
如果您需要更简洁或更详细的版本,我可以继续为您优化调整。