痛点
你在生产环境跑 AI Agent,早晚会遇到这些问题:
- 单轮对话不够用 — Agent 需要多步推理、条件分支、循环重试,简单的 Chain 搞不定
- 状态管理混乱 — 多步骤之间传参靠全局变量,出了 bug 无法重放
- 失败恢复困难 — Agent 跑到第 5 步挂了,只能从头来,浪费 Token 和时间
LangChain 团队推出的 LangGraph 正是为了解决这类问题——用有向图建模 Agent 工作流,自带状态持久化和断点续跑能力。
方案
LangGraph 的核心思想:把 Agent 工作流抽象为状态机图(StateGraph)。
- 节点(Node) = 一个处理步骤(调用 LLM、执行工具、数据校验等)
- 边(Edge) = 步骤之间的转移规则(包括条件分支)
- 状态(State) = 贯穿整个图的数据结构,每个节点读写同一份 State
相比传统的 LangChain Agent,LangGraph 提供了:
| 特性 | LangChain Agent | LangGraph |
|---|---|---|
| 流程控制 | 隐式(ReAct 循环) | 显式(图定义) |
| 状态管理 | 无 | TypedDict 全局状态 |
| 断点/重放 | 不支持 | 原生支持 |
| 并行执行 | 不支持 | 支持 Fan-out/Fan-in |
| 人工审批 | 需额外开发 | interrupt() 原生支持 |
实操步骤
第 1 步:安装与基础环境
pip install langgraph langchain-openai pydantic>=2.0
# 设置环境变量
export OPENAI_API_KEY="sk-xxx"
第 2 步:定义状态与基础图结构
以一个自动运维排障 Agent为例——接收告警,分析原因,执行修复,输出报告:
from typing import TypedDict, Literal, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
# 定义全局状态
class OpsState(TypedDict):
alert_info: str # 原始告警
diagnosis: str # 诊断结果
action_plan: str # 修复方案
execution_result: str # 执行结果
messages: Annotated[list, add_messages] # 对话历史
retry_count: int # 重试计数
llm = ChatOpenAI(model="gpt-4o", temperature=0)
第 3 步:实现核心模式
模式 1 — 条件分支(Router)
def diagnose(state: OpsState) -> dict:
"""分析告警,判断问题类型"""
prompt = f"分析以下运维告警,判断属于 cpu/memory/disk/network 哪类问题:\n{state['alert_info']}"
resp = llm.invoke(prompt)
return {"diagnosis": resp.content}
def route_by_diagnosis(state: OpsState) -> Literal["fix_cpu", "fix_memory", "fix_disk", "escalate"]:
"""条件路由:根据诊断结果走不同修复分支"""
d = state["diagnosis"].lower()
if "cpu" in d:
return "fix_cpu"
elif "memory" in d:
return "fix_memory"
elif "disk" in d:
return "fix_disk"
else:
return "escalate" # 无法自动处理,升级人工
模式 2 — 循环重试(Retry Loop)
def execute_fix(state: OpsState) -> dict:
"""执行修复命令并检查结果"""
# 实际场景中这里调用 SSH/K8s API 执行命令
result = run_remote_command(state["action_plan"])
return {
"execution_result": result,
"retry_count": state.get("retry_count", 0) + 1
}
def should_retry(state: OpsState) -> Literal["retry", "report"]:
"""检查是否需要重试"""
if "success" in state["execution_result"].lower():
return "report"
if state["retry_count"] >= 3:
return "report" # 超过重试上限,生成报告
return "retry"
模式 3 — 人工审批中断(Human-in-the-Loop)
from langgraph.types import interrupt
def human_approval(state: OpsState) -> dict:
"""高危操作前暂停,等待人工确认"""
decision = interrupt({
"question": f"即将执行高危操作:{state['action_plan']},是否批准?",
"options": ["approve", "reject", "modify"]
})
if decision == "reject":
return {"execution_result": "人工拒绝执行"}
return state
第 4 步:组装完整图并运行
from langgraph.checkpoint.memory import MemorySaver
# 构建图
graph = StateGraph(OpsState)
# 添加节点
graph.add_node("diagnose", diagnose)
graph.add_node("fix_cpu", fix_cpu_handler)
graph.add_node("fix_memory", fix_memory_handler)
graph.add_node("fix_disk", fix_disk_handler)
graph.add_node("escalate", escalate_handler)
graph.add_node("human_approval", human_approval)
graph.add_node("execute", execute_fix)
graph.add_node("report", generate_report)
# 定义边
graph.add_edge(START, "diagnose")
graph.add_conditional_edges("diagnose", route_by_diagnosis)
graph.add_edge("fix_cpu", "human_approval")
graph.add_edge("fix_memory", "human_approval")
graph.add_edge("fix_disk", "human_approval")
graph.add_edge("human_approval", "execute")
graph.add_conditional_edges("execute", should_retry, {
"retry": "execute",
"report": "report"
})
graph.add_edge("report", END)
graph.add_edge("escalate", END)
# 编译(带持久化)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
# 运行
config = {"configurable": {"thread_id": "alert-001"}}
result = app.invoke(
{"alert_info": "Node cpu-worker-3 CPU 使用率持续 >95% 超过 10 分钟"},
config=config
)
print(result["execution_result"])
避坑
坑 1:State 字段未初始化导致 KeyError
LangGraph 的 State 是 TypedDict,不会自动填充默认值。每个节点访问字段前要用 state.get("field", default) 防御:
# ❌ 直接访问可能报错
count = state["retry_count"]
# ✅ 安全访问
count = state.get("retry_count", 0)
坑 2:条件边返回值与节点名不匹配
add_conditional_edges 的路由函数返回值必须是已注册节点的名字,拼写错误不会报编译错误,只会在运行时抛出 ValueError。建议用 Literal 类型约束:
def route(state) -> Literal["node_a", "node_b"]: # IDE 可检查拼写
...
坑 3:MemorySaver 仅适合开发环境
MemorySaver 是内存 checkpointer,重启进程数据丢失。生产环境用持久化方案:
# 生产推荐:PostgreSQL checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
checkpointer = AsyncPostgresSaver.from_conn_string("postgresql://...")
总结
- LangGraph 适合需要多步推理、条件分支、失败重试、人工审批的 Agent 场景
- 三个核心模式:Router(条件路由)+ Retry Loop(循环重试)+ Human-in-the-Loop(人工中断),覆盖 90% 的生产 Agent 编排需求
- 状态持久化 + 断点续跑是 LangGraph 区别于普通 Chain 的核心优势,对运维自动化场景尤其关键
- 生产部署务必替换 MemorySaver 为 PostgreSQL/Redis checkpointer,并为图添加超时和最大重试限制