第8章 智能体开发框架LangGraph-代码和提示词-林子雨编著《AI编程与智能体开发》

大数据学习路线图

林子雨编著《AI编程与智能体开发》(访问教材官网

8.3 LangGraph快速入门

8.3.2 环境准备

为了保证依赖管理清晰,实际开发中通常建议使用独立的Python虚拟环境,以免与其他实验冲突。推荐命令如下:

python -m venv langgraph-env  # 在当前目录下创建一个名字叫langgraph-env的独立 Python 环境,用来隔离项目依赖,不污染系统全局 Python
langgraph-env\Scripts\activate
pip install -U langgraph

8.3.3 第一个LangGraph应用

状态在整个工作流中流转,节点读取并更新状态,边则决定状态应流向哪个节点。三者各司其职、紧密配合。以下代码演示完整的 LangGraph构建过程:

# state_node_edge.py
from typing import TypedDict
from langgraph.graph import StateGraph, END

# 1. 定义 State:三个字段,各司其职
class State(TypedDict):
    messages: list[str]   # 用户输入的消息列表
    intent: str | None    # 由第一个节点识别出的意图
    response: str | None  # 由第二个节点生成的回复

# 2. 定义 Node:每个节点只做一件事,返回需要更新的字段
def classify_intent(state: State) -> dict:
    """读取最后一条消息,用关键词匹配识别意图"""
    text = state["messages"][-1] if state["messages"] else ""
    if any(kw in text for kw in ["计算", "多少", "分数", "成绩"]):
        intent = "calculation"
    elif any(kw in text for kw in ["规定", "政策", "要求"]):
        intent = "policy"
    else:
        intent = "knowledge"
    return {"intent": intent}   # 只更新 intent 字段,其余字段保持不变

def generate_response(state: State) -> dict:
    """读取识别出的意图,生成对应回复"""
    intent = state.get("intent", "knowledge")
    question = state["messages"][-1] if state["messages"] else ""
    if intent == "calculation":
        response = f"这是一道计算题,让我帮你算一下:{question}"
    elif intent == "policy":
        response = f"关于规定的问题,我来查一下:{question}"
    else:
        response = f"关于您的问题「{question}」,这是我的解答..."
    return {"response": response}

# 3. 构建工作流:先注册节点,再连接边
workflow = StateGraph(State)
workflow.add_node("classify", classify_intent)
workflow.add_node("respond", generate_response)

# 4. 定义边:固定顺序,classify → respond → END
workflow.add_edge("__start__", "classify")
workflow.add_edge("classify", "respond")
workflow.add_edge("respond", END)

# 5. 编译为可执行应用
app = workflow.compile()

# 6. 运行测试
result = app.invoke({"messages": ["期末考试成绩怎么计算?"], "intent": None, "response": None})
print(f"意图: {result['intent']}")    # 输出: 意图: calculation
print(f"回复: {result['response']}")  # 输出: 回复: 这是一道计算题,让我帮你算一下:期末考试成绩怎么计算?

8.4 LangGraph的条件边与条件节点

8.4.1 条件边的工作原理

路由函数必须是纯函数(Pure Function),即只根据输入状态决定输出,不产生副作用。下面是智学助手最基础的两路分支的例子:

# two_edges.py
from typing import TypedDict
from langgraph.graph import StateGraph, END

class AssistantState(TypedDict):
    question: str
    intent: str | None
    answer: str | None

# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
    return {"answer": f"关于「{state['question']}」,这是我的解答..."}

# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
    return {"answer": f"让我帮您计算:{state['question']}"}

# 识别意图的节点
def classify_node(state: AssistantState) -> dict:
    q = state["question"]
    if any(kw in q for kw in ["计算", "多少", "分数"]):
        return {"intent": "calculation"}
    return {"intent": "knowledge"}

# 路由函数:根据意图决定走哪个节点
def route_by_intent(state: AssistantState) -> str:
    intent = state.get("intent", "knowledge")
    if intent == "calculation":
        return "calc_node"
    return "knowledge_node"

workflow = StateGraph(AssistantState)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)

workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
    "classify",           # 从 classify 节点出发
    route_by_intent,      # 路由函数
    {
        "knowledge_node": "knowledge",   # 返回 "knowledge_node" → 执行 knowledge 节点
        "calc_node": "calc",             # 返回 "calc_node" → 执行 calc 节点
    }
)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)

app = workflow.compile()

result = app.invoke({"question": "期末成绩怎么计算?", "intent": None, "answer": None})
print(result["answer"])  # 输出: 让我帮您计算:期末成绩怎么计算?

8.4.2 多条件分支

在上一节代码基础上,这里新增第三条分支:

# three_edges.py
from typing import TypedDict
from langgraph.graph import StateGraph, END

class AssistantState(TypedDict):
    question: str
    intent: str | None
    sentiment: str | None   # 新增:用户情绪
    answer: str | None

# 新增节点:情绪安抚
def empathy_node(state: AssistantState) -> dict:
    return {"answer": "非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。"}

# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
    return {"answer": f"关于「{state['question']}」,这是我的解答..."}

# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
    return {"answer": f"让我帮您计算:{state['question']}"}

# 识别意图和情绪
def classify_node(state: AssistantState) -> dict:
    q = state["question"]
    if any(kw in q for kw in ["计算", "多少", "分数"]):
        return {"intent": "calculation", "sentiment": "neutral"}
    elif any(kw in q for kw in ["投诉", "不满", "太差了"]):
        return {"intent": "complaint", "sentiment": "negative"}
    return {"intent": "knowledge", "sentiment": "neutral"}

# 路由函数:同时看意图和情绪
def route_by_intent(state: AssistantState) -> str:
    intent = state.get("intent", "knowledge")
    sentiment = state.get("sentiment", "neutral")
    if intent == "complaint" and sentiment == "negative":
        return "empathy"        # 投诉且情绪负面,先安抚
    if intent == "calculation":
        return "calc"
    return "knowledge"

workflow = StateGraph(AssistantState)
workflow.add_node("empathy", empathy_node)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)
workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
    "classify",
    route_by_intent,
    {
        "empathy": "empathy",
        "calc": "calc",
        "knowledge": "knowledge",
    }
)
workflow.add_edge("empathy", END)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)
app = workflow.compile()
result = app.invoke({"question": "我投诉酒店房间卫生太差了?", "intent": None, "answer": None})
print(result["answer"])  # 输出: 非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。

8.4.3 LLM驱动的路由

在上一节代码基础上,这里把意图识别方式修改为由LLM判断意图:

# llm_intent.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langchain_ollama import ChatOllama
from langchain_core.messages import SystemMessage, HumanMessage
import os

class AssistantState(TypedDict):
    question: str
    intent: str | None
    sentiment: str | None   # 新增:用户情绪
    answer: str | None  

llm = ChatOllama(
    model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
    base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
    temperature=0
)

# 节点:情绪安抚
def empathy_node(state: AssistantState) -> dict:
    return {"answer": "非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。"}
# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
    return {"answer": f"关于「{state['question']}」,这是我的解答..."}
# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
    return {"answer": f"让我帮您计算:{state['question']}"}

# classify_node 改为调用 LLM 判断意图
def classify_node(state: AssistantState) -> dict:
    response = llm.invoke([
        SystemMessage(content="""分析学生的问题,返回以下意图之一(只返回单词,不要解释):
knowledge(知识问答)、calculation(计算问题)、empathy(投诉反馈)"""),
        HumanMessage(content=state["question"])
    ])
    intent = response.content.strip().lower()   # llm.invoke() 返回 AIMessage,.content 取文本
    # 防御:LLM 返回意外值时回退到默认
    valid_intents = {"knowledge", "calculation", "empathy"}
    return {"intent": intent if intent in valid_intents else "knowledge"}

# 路由函数,只读取state["intent"],因为LLM 的判断结果已经写进了状态
def route_by_intent(state: AssistantState) -> str:
    intent = state.get("intent", "knowledge")
    return intent   # 直接返回意图字符串,映射表用相同的 key

workflow = StateGraph(AssistantState)
workflow.add_node("empathy", empathy_node)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)
workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
    "classify",
    route_by_intent,
    {
        "empathy": "empathy",
        "calc": "calc",
        "knowledge": "knowledge",
    }
)
workflow.add_edge("empathy", END)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)
app = workflow.compile()
result = app.invoke({"question": "这个西瓜怎么这么难吃", "intent": None, "answer": None})
print(result["answer"])  # 输出: 非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。

8.4.5 条件边的特殊用法:循环

条件边不仅可以向前分支,还可以指回已有节点,形成循环。智学助手的“反复追问”场景就需要这种结构——如果学生的问题描述不清楚,就循环要求补充,直到信息足够才继续处理。

# condition_edge_cycle.py
from typing import TypedDict
from langgraph.graph import StateGraph, END

class ClarifyState(TypedDict):
    question: str
    clarify_count: int    # 记录已追问次数,防止无限循环
    is_clear: bool        # 问题是否已清晰
    answer: str | None

def check_clarity(state: ClarifyState) -> dict:
    """判断问题是否足够清晰(这里用关键词判断,实际可用LLM来判断)"""
    q = state["question"]
    is_clear = len(q) > 10 and not q.endswith("?")  # 简化判断:长度足够且不是模糊问法
    return {"is_clear": is_clear, "clarify_count": state.get("clarify_count", 0)}

def ask_clarify(state: ClarifyState) -> dict:
    """追问节点:提示用户补充信息"""
    count = state.get("clarify_count", 0) + 1
    print(f"[第{count}次追问] 您的问题还不够具体,请补充更多细节。")
    return {"clarify_count": count}

def answer_node(state: ClarifyState) -> dict:
    if state.get("clarify_count", 0) >= 3:
        return {"answer": "非常抱歉,我已经无法进一步追问了,请重新描述您的问题。"}
    return {"answer": f"关于「{state['question']}」,这是我的解答..."}

def should_clarify(state: ClarifyState) -> str:
    if state["is_clear"] or state.get("clarify_count", 0) >= 3:
        return "answer"   # 问题清晰,或追问次数达到上限,继续处理
    return "clarify"      # 问题不清晰,再次追问

workflow = StateGraph(ClarifyState)
workflow.add_node("check", check_clarity)
workflow.add_node("clarify", ask_clarify)
workflow.add_node("answer", answer_node)
workflow.add_edge("__start__", "check")
workflow.add_conditional_edges("check", should_clarify, {
    "clarify": "clarify",
    "answer": "answer",
})
workflow.add_edge("clarify", "check")   # 追问后回到检查节点,形成循环
workflow.add_edge("answer", END)
app = workflow.compile()
result = app.invoke({"question": "这门课程为什么不及格?", "clarify_count": 0, "is_clear": False, "answer": None})
print(result["answer"])

8.5 LangGraph的工具调用

下面是一个完整代码示例:

# langgraph_toolnode_demo.py

from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition

# 1. 用 LangChain 的 @tool 定义工具
@tool
def query_course_notice(topic: str) -> str:
    """根据主题查询课程通知。topic 可以是:实验报告、课程答疑、期末复习。"""
    notices = {
        "实验报告": "实验报告需在第 10 周周五 18:00 前提交到课程平台。",
        "课程答疑": "课程答疑安排在每周三 19:00-20:00,地点为线上会议室。",
        "期末复习": "期末复习资料将在第 16 周发布,以课程平台通知为准。",
    }
    return notices.get(topic, "未查询到该主题的课程通知。")

@tool
def calculate_final_score(homework: float, experiment: float, exam: float) -> str:
    """按照课程规则计算总评成绩:作业 30%,实验 30%,期末 40%。"""
    score = homework * 0.3 + experiment * 0.3 + exam * 0.4
    return f"总评成绩为 {score:.1f} 分。"

tools = [query_course_notice, calculate_final_score]

# 2. 把工具绑定给大模型
llm = ChatOllama(
        model="gemma4:e4b",
        temperature=0,
        base_url="http://127.0.0.1:11434",
    )

llm_with_tools = llm.bind_tools(tools)

# 3. 定义 agent 节点:模型决定是否调用工具
def agent_node(state: MessagesState):
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

# 4. 用 ToolNode 包装工具
tool_node = ToolNode(tools)

# 5. 构建 LangGraph
builder = StateGraph(MessagesState)

builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)

builder.add_edge(START, "agent")

# agent 运行后:
# 如果模型返回 tool_calls,则进入 tools 节点;
# 如果模型没有返回 tool_calls,则结束。
builder.add_conditional_edges(
    "agent",
    tools_condition,
    {
        "tools": "tools",
        END: END,
    },
)

# 工具执行完后,把 ToolMessage 写回 messages,再回到 agent
builder.add_edge("tools", "agent")

graph = builder.compile()

if __name__ == "__main__":
    result = graph.invoke(
        {
            "messages": [
                HumanMessage(content="请查询实验报告什么时候提交。")
            ]
        }
    )

    for message in result["messages"]:
        print(message.type, ":", message.content)

8.6 LangGraph的状态和持久化

8.6.2 使用MessagesState保存对话历史

下面的示例构建一个最小聊天图。图中只有一个chatbot节点,该节点读取当前messages,调用本地Ollama模型生成回复,再把模型回复写回messages。由于编译时传入了InMemorySaver,同一个 thread_id下的后续调用可以读取前一次调用保存的消息历史。具体代码如下:

# langgraph_conversation_memory.py
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, MessagesState, StateGraph
import os

load_dotenv()

llm = ChatOllama(
    model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
    base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
    temperature=0.2,
)

def chatbot(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "student-001"}}

result1 = graph.invoke(
    {"messages": [{"role": "user", "content": "我叫小林,正在学习LangGraph。"}]},
    config=config,
)
print(result1["messages"][-1].content)

result2 = graph.invoke(
    {"messages": [{"role": "user", "content": "我正在学习什么?"}]},
    config=config,
)
print(result2["messages"][-1].content)

8.6.3 使用自定义状态保存会话信息

下面的示例在状态中同时保存messages和course两个字段。remember_course节点从用户输入中提取课程名称,answer节点再利用状态中的课程名称生成回答。这个例子说明,LangGraph的状态不仅可以保存对话历史,也可以保存业务字段。具体代码如下:

# langgraph_custom_state_memory.py
from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages

class CourseState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    course: str

def remember_course(state: CourseState):
    last_message = state["messages"][-1].content
    if "数据库" in last_message:
        return {"course": "数据库系统"}
    if "人工智能" in last_message:
        return {"course": "人工智能导论"}
    return {}

def answer(state: CourseState):
    course = state.get("course", "当前课程")
    return {
        "messages": [
            {
                "role": "assistant",
                "content": f"我会结合《{course}》这门课来回答你的问题。"
            }
        ]
    }

builder = StateGraph(CourseState)
builder.add_node("remember_course", remember_course)
builder.add_node("answer", answer)
builder.add_edge(START, "remember_course")
builder.add_edge("remember_course", "answer")
builder.add_edge("answer", END)

graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "course-thread-001"}}

result = graph.invoke(
    {
        "messages": [{"role": "user", "content": "我正在学习数据库,请帮我复习索引。"}],
        "course": "",
    },
    config=config,
)
print(result["messages"][-1].content)
print("当前课程:", result["course"])

8.6.4 使用SQLite实现检查点持久化

使用 SQLite 检查点前,需要安装额外依赖:

pip install langgraph-checkpoint-sqlite

下面的示例与 8.5.2节基本相同,只是把InMemorySaver换成了SqliteSaver。第一次运行程序时,状态会写入langgraph_state.db;再次运行程序时,只要使用相同数据库文件和相同 thread_id,就可以继续读取该线程的历史状态。具体代码如下:

# langgraph_sqlite_checkpoint.py
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import START, END, MessagesState, StateGraph
import os

load_dotenv()

llm = ChatOllama(
    model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
    base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
    temperature=0.2,
)

def chatbot(state: MessagesState):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)

config = {"configurable": {"thread_id": "student-001"}}

with SqliteSaver.from_conn_string("langgraph_state.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    result = graph.invoke(
        {"messages": [{"role": "user", "content": "请记住:我的实验题目是课程问答助手。"}]},
        config=config,
    )
    print(result["messages"][-1].content)

    result = graph.invoke(
        {"messages": [{"role": "user", "content": "我的实验题目是什么?"}]},
        config=config,
    )
    print(result["messages"][-1].content)

8.7 LangGraph编程实践

8.7.1 课程助教智能体案例(基础版)

# course-agent.py
from operator import add
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, interrupt

class AssistantState(TypedDict):
    question: str  #学生问题
    category: str  # 问题分类,routine / complex
    retrieved_docs: list[str]  # 检索到的知识库文档
    draft_answer: str  # 草稿答案
    need_human_review: bool  # 是否需要人工审核
    review_result: dict  # 教师审核结果
    final_answer: str  # 最终答案
    logs: Annotated[list[str], add]  #工作流日志(自动累加)

RULES = {
    "作业": [
        "作业须在截止时间前提交到课程平台。",
        "逾期提交按照课程说明扣分。"
    ],
    "调课": [
        "调课通知以课程群和教学平台公告为准。"
    ],
    "补考": [
        "补考申请由学院教学办公室统一处理。",
        "需在规定时间内提交材料。"
    ],
}

def classify_question(state: AssistantState):
    q = state["question"]
    if any(k in q for k in ["补考", "申诉", "成绩", "特殊情况"]):
        category = "complex"
    else:
        category = "routine"
    return {
        "category": category,
        "logs": [f"classify:{category}"]
    }

def retrieve_knowledge(state: AssistantState):
    q = state["question"]
    docs = []
    for keyword, values in RULES.items():
        if keyword in q:
            docs.extend(values)
    return {
        "retrieved_docs": docs,
        "logs": [f"retrieve:{len(docs)} docs"]
    }

def draft_answer(state: AssistantState):
    docs = state.get("retrieved_docs", [])
    if docs:
        draft = ";".join(docs)
    else:
        draft = "当前规则库中未找到直接答案,请教师进一步确认。"

    need_review = (state["category"] == "complex") or (len(docs) == 0)
    return {
        "draft_answer": draft,
        "need_human_review": need_review,
        "logs": [f"draft:review={need_review}"]
    }

def route_after_draft(state: AssistantState) -> Literal["teacher_review", "finalize_answer"]:
    if state["need_human_review"]:
        return "teacher_review"
    return "finalize_answer"

def teacher_review(state: AssistantState):
    payload = {
        "instruction": "请教师审核该复杂问题的答复草稿",
        "question": state["question"],
        "draft": state["draft_answer"],
        "docs": state["retrieved_docs"],
    }
    review = interrupt(payload)
    return {
        "review_result": review,
        "logs": ["review:completed"]
    }

def finalize_answer(state: AssistantState):
    review = state.get("review_result", {})
    if review and review.get("edited_answer"):
        answer = review["edited_answer"]
    else:
        answer = state["draft_answer"]

    return {
        "final_answer": answer,
        "logs": [f"final:{answer[:18]}"]
    }

builder = StateGraph(AssistantState)
builder.add_node("classify_question", classify_question)
builder.add_node("retrieve_knowledge", retrieve_knowledge)
builder.add_node("draft_answer", draft_answer)
builder.add_node("teacher_review", teacher_review)
builder.add_node("finalize_answer", finalize_answer)

builder.add_edge(START, "classify_question")
builder.add_edge("classify_question", "retrieve_knowledge")
builder.add_edge("retrieve_knowledge", "draft_answer")
builder.add_conditional_edges("draft_answer", route_after_draft)
builder.add_edge("teacher_review", "finalize_answer")
builder.add_edge("finalize_answer", END)

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "course-demo-001"}}

initial = graph.invoke(
    {"question": "老师,我想申请补考,流程是什么?", "logs": []},
    config=config,
)
print(initial["__interrupt__"])

final_state = graph.invoke(
    Command(
        resume={
            "approved": True,
            "edited_answer": "请在教务系统提交补考申请,并于三日内补充证明材料。"
        }
    ),
    config=config,
)
print(final_state["final_answer"])

8.7.2 课程助教智能体案例(高级版)

首先,安装 Ollama,并确保本机可以通过命令行正常调用。如果 Ollama 服务尚未启动,可以在Windows系统的cmd命令行界面中执行以下命令启动Ollama:

ollama serve

然后,下载回答模型和嵌入模型。若机器显存或内存较充足,可以直接使用 gemma4;若是普通教学用笔记本,也可以改用较小规格的 gemma4 变体,例如 gemma4:e4b。嵌入模型的作用是把文本转化成向量,这里使用nomic-embed-text。具体安装命令如下:

ollama pull gemma4:e4b  # 下载回答模型
ollama pull nomic-embed-text  # 下载嵌入模型

模型下载完成后,可用下面的命令检查本地模型列表:

ollama list

建议在独立虚拟环境中完成安装,以免与其他实验冲突。推荐命令如下:

python -m venv langgraph-env  # 在当前目录下创建一个名字叫langgraph-env的独立 Python 环境,用来隔离项目依赖,不污染系统全局 Python
langgraph-env\Scripts\activate
pip install -U langgraph langchain langchain-ollama langchain-chroma chromadb langchain-text-splitters

接下来要把课程资料切分成多个文档片段,再用嵌入模型将它们写入 Chroma(一种轻量级向量数据库)。具体代码如下(可以从教材官网下载build_vector_db_ollama.py):

# build_vector_db_ollama.py
from pathlib import Path
import shutil
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings
from langchain_chroma import Chroma
BASE_DIR = Path(__file__).resolve().parent
SOURCE_FILE = BASE_DIR / 'course_docs_sample.txt'
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
EMBED_MODEL = 'nomic-embed-text'
def load_documents() -> list[Document]:
    text = SOURCE_FILE.read_text(encoding='utf-8')
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=120,
        chunk_overlap=20,
        separators=['\n', '。', ';', ',']
    )
    chunks = splitter.split_text(text)
    return [
        Document(page_content=chunk, metadata={'source': SOURCE_FILE.name, 'chunk_id': index})
        for index, chunk in enumerate(chunks)
    ]
def main() -> None:
    if not SOURCE_FILE.exists():
        raise FileNotFoundError(f'未找到课程资料文件: {SOURCE_FILE}')
    if PERSIST_DIR.exists():
        shutil.rmtree(PERSIST_DIR)
    documents = load_documents()
    embeddings = OllamaEmbeddings(model=EMBED_MODEL)
    Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        collection_name=COLLECTION_NAME,
        persist_directory=str(PERSIST_DIR),
    )
    print(f'已写入 {len(documents)} 个文档片段到 {PERSIST_DIR}')
    print(f'嵌入模型: {EMBED_MODEL}')
if __name__ == '__main__':
    main()

完成建库后,需要把原来基础版案例中的 retrieve_knowledge 和 draft_answer 节点改造为“向量检索 + Gemma4 生成”。具体代码如下( 可以到教材官网下载course_agent_ollama_rag.py):

# course_agent_ollama_rag.py
from operator import add
from pathlib import Path
from typing import Annotated
from typing_extensions import TypedDict
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
BASE_DIR = Path(__file__).resolve().parent
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
CHAT_MODEL = 'gemma4:e4b'
EMBED_MODEL = 'nomic-embed-text'
class RagState(TypedDict):
    question: str
    retrieved_docs: list[str]
    draft_answer: str
    final_answer: str
    logs: Annotated[list[str], add]
if not PERSIST_DIR.exists():
    raise FileNotFoundError(f'未找到向量库目录: {PERSIST_DIR}。请先运行 build_vector_db_ollama.py。')
llm = ChatOllama(model=CHAT_MODEL, temperature=0)
embeddings = OllamaEmbeddings(model=EMBED_MODEL)
vector_store = Chroma(
    collection_name=COLLECTION_NAME,
    persist_directory=str(PERSIST_DIR),
    embedding_function=embeddings,
)
def retrieve_knowledge(state: RagState):
    docs = vector_store.similarity_search(state['question'], k=3)
    contents = [doc.page_content for doc in docs]
    return {'retrieved_docs': contents, 'logs': [f'retrieve:{len(contents)} docs']}
SYSTEM_PROMPT = '''
你是《AI编程与智能体开发》课程助教。
请严格依据给定材料回答问题。
如果材料不足,请明确说“知识库中未找到充分依据”,不要编造。
回答应尽量简洁、准确、适合本科生阅读。
'''.strip()
def generate_answer(state: RagState):
    if not state['retrieved_docs']:
        return {
            'draft_answer': '知识库中未找到相关材料,请补充课程资料或转人工处理。',
            'logs': ['draft:no_docs']
        }
    context = '\n\n'.join(
        f'[材料{index + 1}] {text}'
        for index, text in enumerate(state['retrieved_docs'])
    )
    prompt = f'''{SYSTEM_PROMPT}
学生问题:
{state['question']}
参考材料:
{context}
请给出最终回答:'''
    answer = llm.invoke(prompt).content
    return {'draft_answer': answer, 'logs': ['draft:done']}
def finalize_answer(state: RagState):
    return {'final_answer': state['draft_answer'], 'logs': [f"final:{state['draft_answer'][:18]}"]}
builder = StateGraph(RagState)
builder.add_node('retrieve_knowledge', retrieve_knowledge)
builder.add_node('generate_answer', generate_answer)
builder.add_node('finalize_answer', finalize_answer)
builder.add_edge(START, 'retrieve_knowledge')
builder.add_edge('retrieve_knowledge', 'generate_answer')
builder.add_edge('generate_answer', 'finalize_answer')
builder.add_edge('finalize_answer', END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
def main() -> None:
    config = {'configurable': {'thread_id': 'local-rag-demo-001'}}
    result = graph.invoke({'question': '补考申请流程是什么?', 'logs': []}, config=config)
    print('检索结果:')
    for index, doc in enumerate(result['retrieved_docs'], start=1):
        print(f'{index}. {doc}')
    print('\n最终回答:')
    print(result['final_answer'])
if __name__ == '__main__':
    main()

实现思路并不复杂:在状态中增加 need_human_review 和 review_result 两个字段;在 generate_answer 节点中加入审核判定逻辑;然后新增 route_after_generate 和 teacher_review 两个节点,其中 teacher_review 通过 interrupt(payload) 暂停工作流,把问题、检索材料和草稿答案交给教师;教师审核后,再用 Command(resume=...) 在相同 thread_id 下恢复执行。具体代码如下( 可以从教材官网下载course_agent_ollama_rag_review.py):

# course_agent_ollama_rag_review.py
from operator import add
from pathlib import Path
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt
BASE_DIR = Path(__file__).resolve().parent
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
CHAT_MODEL = 'gemma4:e4b'
EMBED_MODEL = 'nomic-embed-text'
SENSITIVE_KEYWORDS = ['补考', '成绩', '申诉', '特殊情况']
class ReviewRagState(TypedDict):
    question: str
    retrieved_docs: list[str]
    draft_answer: str
    need_human_review: bool
    review_result: dict
    final_answer: str
    logs: Annotated[list[str], add]
if not PERSIST_DIR.exists():
    raise FileNotFoundError(f'未找到向量库目录: {PERSIST_DIR}。请先运行 build_vector_db_ollama.py。')
llm = ChatOllama(model=CHAT_MODEL, temperature=0)
embeddings = OllamaEmbeddings(model=EMBED_MODEL)
vector_store = Chroma(
    collection_name=COLLECTION_NAME,
    persist_directory=str(PERSIST_DIR),
    embedding_function=embeddings,
)
def retrieve_knowledge(state: ReviewRagState):
    docs = vector_store.similarity_search(state['question'], k=3)
    contents = [doc.page_content for doc in docs]
    return {'retrieved_docs': contents, 'logs': [f'retrieve:{len(contents)} docs']}
SYSTEM_PROMPT = '''
你是《AI编程与智能体开发》课程助教。
请严格依据给定材料回答问题。
如果材料不足,请明确说“知识库中未找到充分依据”,不要编造。
回答应尽量简洁、准确、适合本科生阅读。
'''.strip()
def generate_answer(state: ReviewRagState):
    if not state['retrieved_docs']:
        return {
            'draft_answer': '知识库中未找到相关材料,请补充课程资料或转人工处理。',
            'need_human_review': True,
            'logs': ['draft:no_docs']
        }
    context = '\n\n'.join(
        f'[材料{index + 1}] {text}'
        for index, text in enumerate(state['retrieved_docs'])
    )
    prompt = f'''{SYSTEM_PROMPT}
学生问题:
{state['question']}
参考材料:
{context}
请给出最终回答:'''
    answer = llm.invoke(prompt).content
    need_review = any(keyword in state['question'] for keyword in SENSITIVE_KEYWORDS)
    if '知识库中未找到充分依据' in answer:
        need_review = True
    return {
        'draft_answer': answer,
        'need_human_review': need_review,
        'logs': [f'draft:review={need_review}']
    }
def route_after_generate(state: ReviewRagState) -> Literal['teacher_review', 'finalize_answer']:
    if state['need_human_review']:
        return 'teacher_review'
    return 'finalize_answer'
def teacher_review(state: ReviewRagState):
    payload = {
        'instruction': '请教师审核本地RAG课程助教的答复草稿',
        'question': state['question'],
        'draft_answer': state['draft_answer'],
        'retrieved_docs': state['retrieved_docs'],
    }
    review = interrupt(payload)
    return {'review_result': review, 'logs': ['review:completed']}
def finalize_answer(state: ReviewRagState):
    review = state.get('review_result', {})
    if review and review.get('edited_answer'):
        answer = review['edited_answer']
    else:
        answer = state['draft_answer']
    return {'final_answer': answer, 'logs': [f'final:{answer[:18]}']}
builder = StateGraph(ReviewRagState)
builder.add_node('retrieve_knowledge', retrieve_knowledge)
builder.add_node('generate_answer', generate_answer)
builder.add_node('teacher_review', teacher_review)
builder.add_node('finalize_answer', finalize_answer)
builder.add_edge(START, 'retrieve_knowledge')
builder.add_edge('retrieve_knowledge', 'generate_answer')
builder.add_conditional_edges('generate_answer', route_after_generate)
builder.add_edge('teacher_review', 'finalize_answer')
builder.add_edge('finalize_answer', END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
def main() -> None:
    config = {'configurable': {'thread_id': 'local-rag-review-demo-001'}}
    initial = graph.invoke({'question': '补考申请流程是什么?', 'logs': []}, config=config)
    print('第一次调用返回:')
    if '__interrupt__' in initial:
        print(initial['__interrupt__'])
    else:
        print(initial['final_answer'])
        return
    final_state = graph.invoke(
        Command(
            resume={
                'approved': True,
                'edited_answer': '请先在教务系统提交补考申请,再按学院要求在规定时间内补充证明材料。'
            }
        ),
        config=config,
    )
    print('\n恢复执行后的最终回答:')
    print(final_state['final_answer'])
if __name__ == '__main__':
    main()