下面分基础线性图、带分支、带循环、带人工介入(人在回路)四类场景,由浅入深给出完整可运行代码,同时逐行讲解,适配学习与落地。
前置依赖安装
bash
运行
pip install langgraph langchain langchain-openai
一、核心前置知识点回顾
- State:全局状态容器,用
TypedDict定义;Annotated + operator.add实现列表追加 - Node:函数,入参是状态,返回「待更新的状态字典」,消息在状态中!!!
- StateGraph:图主体,负责挂载节点、配置流向
- START / END:图内置起点、终点
示例 1:最简线性图(无分支、无循环)
纯串行流程:起点 → 思考节点 → 回答节点 → 结束
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END # 1. 定义全局状态,定义state中需要保存哪些信息 class GraphState(TypedDict): # 消息列表:自动追加,不覆盖 messages: Annotated[list[str], operator.add] query: str # 用户问题 answer: str # 最终回答 # 2. 定义节点(执行逻辑) def llm_think(state: GraphState) -> dict: """思考节点""" print("=== 进入思考节点 ===") return {"messages": ["完成问题分析"]} def generate_answer(state: GraphState) -> dict: """生成回答节点""" print("=== 进入回答节点 ===") ans = f"针对问题「{state['query']}」给出回复" return {"answer": ans, "messages": ["生成回答完毕"]} # 3. 构建图 # 绑定自定义状态,创建图实例,基于state的图 workflow = StateGraph(GraphState) # 注册节点:节点名称 + 对应节点执行函数 workflow.add_node("llm_think", llm_think) workflow.add_node("generate_answer", generate_answer) # 配置流向:普通边,把节点连接起来: 起节点 + 终节点 workflow.add_edge(START, "llm_think") # 起点 → 思考节点 workflow.add_edge("llm_think", "generate_answer") # 思考 → 回答 workflow.add_edge("generate_answer", END) # 回答 → 结束 # 4. 编译图(必须编译才能运行) app = workflow.compile() # 5. 执行图 if __name__ == "__main__": inputs = {"query": "LangGraph是什么?", "messages": []} result = app.invoke(inputs) print("\n最终结果:", result)示例 2:带条件分支的图
根据状态字段动态选择下一个节点,使用add_conditional_edges实现分支。
流程:起点 → 思考 → 【分支判断】→ 调用工具 / 直接回答 → 结束
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END # 1. 定义状态 class GraphState(TypedDict): messages: Annotated[list[str], operator.add] query: str need_tool: bool # 标记:是否需要调用工具 tool_res: str answer: str # 2. 定义节点 def llm_judge(state: GraphState) -> dict: """判断是否需要工具""" print("=== 意图判断节点 ===") # 模拟逻辑:包含"查询"则需要工具 need = "查询" in state["query"] return {"need_tool": need, "messages": ["完成意图判断"]} def call_tool(state: GraphState) -> dict: """工具调用节点""" print("=== 工具调用节点 ===") return {"tool_res": "外部数据查询结果", "messages": ["工具调用完成"]} def gen_answer(state: GraphState) -> dict: """生成回答""" print("=== 回答生成节点 ===") if state["need_tool"]: ans = f"结合工具结果:{state['tool_res']} 回答问题" else: ans = "直接基于常识回答问题" return {"answer": ans, "messages": ["回答生成完毕"]} # 3. 分支路由函数(核心:返回目标节点名) def route_node(state: GraphState) -> str: if state["need_tool"]: return "call_tool" return "gen_answer" # 4. 构建图 workflow = StateGraph(GraphState) workflow.add_node("llm_judge", llm_judge) workflow.add_node("call_tool", call_tool) workflow.add_node("gen_answer", gen_answer) # 基础流向 workflow.add_edge(START, "llm_judge") # 配置条件分支 workflow.add_conditional_edges( source="llm_judge", # 分支起始节点 path=route_node, # 路由判断函数 path_map={ # 映射:函数返回值 → 节点名 "call_tool": "call_tool", "gen_answer": "gen_answer" } ) # 统一收尾:两条分支最终都走到回答节点,再结束 workflow.add_edge("call_tool", "gen_answer") workflow.add_edge("gen_answer", END) # 编译 & 执行 app = workflow.compile() if __name__ == "__main__": # 测试1:需要工具 res1 = app.invoke({"query": "查询今日天气", "messages": []}) print("-" * 30) # 测试2:不需要工具 res2 = app.invoke({"query": "你好", "messages": []})示例 3:带循环的图(有环图)
利用条件边回跳上游节点实现循环,模拟「信息不足 → 重复查询」。流程:起点 → 思考 → 工具调用 → 循环判断 → 继续循环 / 结束循环
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END class GraphState(TypedDict): messages: Annotated[list[str], operator.add] query: str tool_res: str loop_count: int # 循环计数器 # 节点 def think(state: GraphState) -> dict: print(f"=== 第{state['loop_count']}轮思考 ===") return {"messages": ["思考完成"]} def search_tool(state: GraphState) -> dict: print("=== 执行查询 ===") cnt = state["loop_count"] + 1 # 模拟:循环2次后才拿到有效数据 if cnt >= 2: return {"tool_res": "有效数据", "loop_count": cnt, "messages": ["获取到有效数据"]} return {"tool_res": "", "loop_count": cnt, "messages": ["数据不足,继续查询"]} # 循环路由函数 def check_loop(state: GraphState) -> str: # 无结果 → 回到思考节点(循环);有结果 → 结束循环 if not state["tool_res"]: return "think" return END # 构建图 workflow = StateGraph(GraphState) workflow.add_node("think", think) workflow.add_node("search_tool", search_tool) workflow.add_edge(START, "think") workflow.add_edge("think", "search_tool") # 循环条件边:工具执行后判断是否回跳 workflow.add_conditional_edges( source="search_tool", path=check_loop, path_map={ "think": "think", END: END } ) app = workflow.compile() if __name__ == "__main__": inputs = {"query": "查找资料", "messages": [], "loop_count": 0, "tool_res": ""} result = app.invoke(inputs) print("最终状态:", result)示例 4:带「人在回路」的图(流程暂停 + 人工修改状态)
核心:编译时指定interrupt_before,在节点执行前暂停,人工修改状态后继续运行。
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END class GraphState(TypedDict): messages: Annotated[list[str], operator.add] content: str result: str # 节点 def pre_process(state: GraphState) -> dict: print("=== 前置处理完成 ===") return {"messages": ["预处理结束"]} def human_review(state: GraphState) -> dict: print("=== 正式处理节点 ===") res = f"最终输出:{state['content']}" return {"result": res} # 构建图 workflow = StateGraph(GraphState) workflow.add_node("pre_process", pre_process) workflow.add_node("human_review", human_review) workflow.add_edge(START, "pre_process") workflow.add_edge("pre_process", "human_review") workflow.add_edge("human_review", END) # ========== 关键:设置暂停点 ========== # 在 human_review 执行前暂停,等待人工介入 app = workflow.compile(interrupt_before=["human_review"]) if __name__ == "__main__": # 会话ID,区分不同会话、持久化状态 config = {"configurable": {"thread_id": "session_001"}} inputs = {"content": "", "messages": []} # 1. 执行到暂停点 print("--- 第一轮执行,触发暂停 ---") app.invoke(inputs, config=config) # 2. 人工查看 & 修改状态 current_state = app.get_state(config) print("当前待处理内容:", current_state.values["content"]) # 人工补充内容 app.update_state(config, {"content": "人工审核后补充的内容"}) # 3. 恢复流程继续执行 print("\n--- 人工操作完成,恢复执行 ---") final = app.invoke(None, config=config) print("最终结果:", final["result"])通用编写步骤(固定模板,记忆套用)
- 定义 State:用
TypedDict,列表字段搭配Annotated + operator.add实现追加 - 编写 Node 函数:入参为状态,返回字典更新状态
- 创建 StateGraph:传入 State 实例化图
- 注册节点:
add_node(节点名, 函数) - 配置流向
- 串行:
add_edge(起点, 终点) - 分支 / 循环:
add_conditional_edges+ 路由函数
- 串行:
- 编译图:
compile(),需要人在回路则加interrupt_before - 执行图:
invoke(输入数据)/stream()运行
补充执行方式
app.invoke(inputs):同步执行,返回最终全量状态for evt in app.stream(inputs):流式逐节点输出,适合调试 / 前端展示