Skip to content

LangGraph — 有状态多步骤工作流

简介

LangGraph 是 LangChain 团队推出的有状态图执行框架,用于构建复杂的多步骤 Agent 工作流,支持循环、条件分支、人工介入(Human-in-the-loop)。

bash
pip install langgraph

核心概念

  • State:贯穿整个图的共享状态(TypedDict)
  • Node:处理状态的函数节点
  • Edge:节点间的连接,支持条件路由
  • Graph:由节点和边组成的有向图

快速开始:简单 Agent

python
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated, List
import operator

# 定义状态
class AgentState(TypedDict):
    messages: Annotated[List, operator.add]  # 消息列表,自动追加
    next_step: str

llm = ChatOpenAI(model="qwen-plus", ...)

# 定义节点函数
def call_llm(state: AgentState) -> AgentState:
    """调用 LLM 节点"""
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: AgentState) -> str:
    """条件边:决定下一步"""
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "use_tool"
    return END

# 构建图
workflow = StateGraph(AgentState)
workflow.add_node("llm", call_llm)
workflow.set_entry_point("llm")
workflow.add_conditional_edges("llm", should_continue, {
    "use_tool": "tool_node",
    END: END
})

app = workflow.compile()

# 运行
result = app.invoke({
    "messages": [HumanMessage(content="分析招商银行的投资价值")]
})
print(result["messages"][-1].content)

金融 Agent 完整示例

python
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from typing import TypedDict, Annotated, List
import operator

# 工具定义
@tool
def get_stock_info(symbol: str) -> dict:
    """获取股票基本信息"""
    return {"symbol": symbol, "pe_ratio": 8.5, "pb_ratio": 0.9, "dividend": "3.2%"}

@tool
def get_financial_report(company: str, year: int) -> dict:
    """获取财务报告摘要"""
    return {
        "revenue": "3420亿",
        "net_profit": "1466亿",
        "npl_ratio": "0.95%",
        "roe": "16.2%"
    }

@tool
def risk_assessment(data: str) -> str:
    """进行风险评估"""
    return "综合评估:低风险,建议关注"

tools = [get_stock_info, get_financial_report, risk_assessment]
tool_node = ToolNode(tools)

llm_with_tools = ChatOpenAI(model="qwen-plus", ...).bind_tools(tools)

# 状态定义
class AnalysisState(TypedDict):
    messages: Annotated[List, operator.add]
    company: str
    analysis_result: str

def analyst_node(state: AnalysisState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def should_use_tools(state: AnalysisState) -> str:
    last = state["messages"][-1]
    if hasattr(last, "tool_calls") and last.tool_calls:
        return "tools"
    return "end"

# 构建分析工作流
graph = StateGraph(AnalysisState)
graph.add_node("analyst", analyst_node)
graph.add_node("tools", tool_node)

graph.set_entry_point("analyst")
graph.add_conditional_edges("analyst", should_use_tools, {
    "tools": "tools",
    "end": END
})
graph.add_edge("tools", "analyst")  # 工具执行后回到分析节点

app = graph.compile()

result = app.invoke({
    "messages": [HumanMessage(content="分析招商银行(600036)的投资价值,获取股票信息和财务报告")],
    "company": "招商银行",
    "analysis_result": ""
})

持久化(Checkpointing)

python
from langgraph.checkpoint.sqlite import SqliteSaver

# 使用 SQLite 持久化状态(支持断点续传)
with SqliteSaver.from_conn_string("checkpoints.db") as memory:
    app = graph.compile(checkpointer=memory)
    
    config = {"configurable": {"thread_id": "analysis_001"}}
    
    # 第一次运行
    result = app.invoke(
        {"messages": [HumanMessage(content="分析招商银行")]},
        config=config
    )
    
    # 继续同一会话
    result = app.invoke(
        {"messages": [HumanMessage(content="再分析一下平安银行")]},
        config=config
    )

Human-in-the-Loop(人工审核)

python
from langgraph.graph import StateGraph, END
import json

def review_node(state):
    """需要人工审核的节点"""
    # 暂停执行,等待人工输入
    print("⚠️  需要人工审核以下内容:")
    print(json.dumps(state.get("draft_report", {}), ensure_ascii=False, indent=2))
    return state

# 在 compile 时设置中断点
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["review_node"]  # 在此节点前暂停
)

# 运行到中断点
result = app.invoke(initial_state, config=config)

# 人工审核后继续
human_feedback = {"approved": True, "comments": "报告内容准确"}
app.update_state(config, {"human_feedback": human_feedback})
final_result = app.invoke(None, config=config)  # 从断点继续

LangGraph vs LangChain

  • LangChain:适合线性流程,prompt → llm → output
  • LangGraph:适合有循环、条件分支、多 Agent 协作的复杂工作流
  • 金融 Agent 场景(多步骤审批、循环优化报告)推荐使用 LangGraph

本站内容由 褚成志 整理编写,仅供学习参考