饮墨

子安饮墨馀三斗,留与卿儿作赋来

用 LangGraph 编排 AI Agent 工作流:3 个核心模式 + 生产落地实战

痛点

你在生产环境跑 AI Agent,早晚会遇到这些问题:

  1. 单轮对话不够用 — Agent 需要多步推理、条件分支、循环重试,简单的 Chain 搞不定
  2. 状态管理混乱 — 多步骤之间传参靠全局变量,出了 bug 无法重放
  3. 失败恢复困难 — Agent 跑到第 5 步挂了,只能从头来,浪费 Token 和时间

LangChain 团队推出的 LangGraph 正是为了解决这类问题——用有向图建模 Agent 工作流,自带状态持久化和断点续跑能力。

方案

LangGraph 的核心思想:把 Agent 工作流抽象为状态机图(StateGraph)

  • 节点(Node) = 一个处理步骤(调用 LLM、执行工具、数据校验等)
  • 边(Edge) = 步骤之间的转移规则(包括条件分支)
  • 状态(State) = 贯穿整个图的数据结构,每个节点读写同一份 State

相比传统的 LangChain Agent,LangGraph 提供了:

特性 LangChain Agent LangGraph
流程控制 隐式(ReAct 循环) 显式(图定义)
状态管理 TypedDict 全局状态
断点/重放 不支持 原生支持
并行执行 不支持 支持 Fan-out/Fan-in
人工审批 需额外开发 interrupt() 原生支持

实操步骤

第 1 步:安装与基础环境

pip install langgraph langchain-openai pydantic>=2.0
# 设置环境变量
export OPENAI_API_KEY="sk-xxx"

第 2 步:定义状态与基础图结构

以一个自动运维排障 Agent为例——接收告警,分析原因,执行修复,输出报告:

from typing import TypedDict, Literal, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI

# 定义全局状态
class OpsState(TypedDict):
    alert_info: str                    # 原始告警
    diagnosis: str                     # 诊断结果
    action_plan: str                   # 修复方案
    execution_result: str              # 执行结果
    messages: Annotated[list, add_messages]  # 对话历史
    retry_count: int                   # 重试计数

llm = ChatOpenAI(model="gpt-4o", temperature=0)

第 3 步:实现核心模式

模式 1 — 条件分支(Router)

def diagnose(state: OpsState) -> dict:
    """分析告警,判断问题类型"""
    prompt = f"分析以下运维告警,判断属于 cpu/memory/disk/network 哪类问题:\n{state['alert_info']}"
    resp = llm.invoke(prompt)
    return {"diagnosis": resp.content}

def route_by_diagnosis(state: OpsState) -> Literal["fix_cpu", "fix_memory", "fix_disk", "escalate"]:
    """条件路由:根据诊断结果走不同修复分支"""
    d = state["diagnosis"].lower()
    if "cpu" in d:
        return "fix_cpu"
    elif "memory" in d:
        return "fix_memory"
    elif "disk" in d:
        return "fix_disk"
    else:
        return "escalate"  # 无法自动处理,升级人工

模式 2 — 循环重试(Retry Loop)

def execute_fix(state: OpsState) -> dict:
    """执行修复命令并检查结果"""
    # 实际场景中这里调用 SSH/K8s API 执行命令
    result = run_remote_command(state["action_plan"])
    return {
        "execution_result": result,
        "retry_count": state.get("retry_count", 0) + 1
    }

def should_retry(state: OpsState) -> Literal["retry", "report"]:
    """检查是否需要重试"""
    if "success" in state["execution_result"].lower():
        return "report"
    if state["retry_count"] >= 3:
        return "report"  # 超过重试上限,生成报告
    return "retry"

模式 3 — 人工审批中断(Human-in-the-Loop)

from langgraph.types import interrupt

def human_approval(state: OpsState) -> dict:
    """高危操作前暂停,等待人工确认"""
    decision = interrupt({
        "question": f"即将执行高危操作:{state['action_plan']},是否批准?",
        "options": ["approve", "reject", "modify"]
    })
    if decision == "reject":
        return {"execution_result": "人工拒绝执行"}
    return state

第 4 步:组装完整图并运行

from langgraph.checkpoint.memory import MemorySaver

# 构建图
graph = StateGraph(OpsState)

# 添加节点
graph.add_node("diagnose", diagnose)
graph.add_node("fix_cpu", fix_cpu_handler)
graph.add_node("fix_memory", fix_memory_handler)
graph.add_node("fix_disk", fix_disk_handler)
graph.add_node("escalate", escalate_handler)
graph.add_node("human_approval", human_approval)
graph.add_node("execute", execute_fix)
graph.add_node("report", generate_report)

# 定义边
graph.add_edge(START, "diagnose")
graph.add_conditional_edges("diagnose", route_by_diagnosis)
graph.add_edge("fix_cpu", "human_approval")
graph.add_edge("fix_memory", "human_approval")
graph.add_edge("fix_disk", "human_approval")
graph.add_edge("human_approval", "execute")
graph.add_conditional_edges("execute", should_retry, {
    "retry": "execute",
    "report": "report"
})
graph.add_edge("report", END)
graph.add_edge("escalate", END)

# 编译(带持久化)
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)

# 运行
config = {"configurable": {"thread_id": "alert-001"}}
result = app.invoke(
    {"alert_info": "Node cpu-worker-3 CPU 使用率持续 >95% 超过 10 分钟"},
    config=config
)
print(result["execution_result"])

避坑

坑 1:State 字段未初始化导致 KeyError

LangGraph 的 State 是 TypedDict,不会自动填充默认值。每个节点访问字段前要用 state.get("field", default) 防御:

# ❌ 直接访问可能报错
count = state["retry_count"]

# ✅ 安全访问
count = state.get("retry_count", 0)

坑 2:条件边返回值与节点名不匹配

add_conditional_edges 的路由函数返回值必须是已注册节点的名字,拼写错误不会报编译错误,只会在运行时抛出 ValueError。建议用 Literal 类型约束:

def route(state) -> Literal["node_a", "node_b"]:  # IDE 可检查拼写
    ...

坑 3:MemorySaver 仅适合开发环境

MemorySaver 是内存 checkpointer,重启进程数据丢失。生产环境用持久化方案:

# 生产推荐:PostgreSQL checkpointer
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
checkpointer = AsyncPostgresSaver.from_conn_string("postgresql://...")

总结

  • LangGraph 适合需要多步推理、条件分支、失败重试、人工审批的 Agent 场景
  • 三个核心模式:Router(条件路由)+ Retry Loop(循环重试)+ Human-in-the-Loop(人工中断),覆盖 90% 的生产 Agent 编排需求
  • 状态持久化 + 断点续跑是 LangGraph 区别于普通 Chain 的核心优势,对运维自动化场景尤其关键
  • 生产部署务必替换 MemorySaver 为 PostgreSQL/Redis checkpointer,并为图添加超时和最大重试限制
您还没有登录,请登录后发表评论。