林子雨编著《AI编程与智能体开发》(访问教材官网)
8.3 LangGraph快速入门
8.3.2 环境准备
为了保证依赖管理清晰,实际开发中通常建议使用独立的Python虚拟环境,以免与其他实验冲突。推荐命令如下:
python -m venv langgraph-env # 在当前目录下创建一个名字叫langgraph-env的独立 Python 环境,用来隔离项目依赖,不污染系统全局 Python
langgraph-env\Scripts\activate
pip install -U langgraph
8.3.3 第一个LangGraph应用
状态在整个工作流中流转,节点读取并更新状态,边则决定状态应流向哪个节点。三者各司其职、紧密配合。以下代码演示完整的 LangGraph构建过程:
# state_node_edge.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
# 1. 定义 State:三个字段,各司其职
class State(TypedDict):
messages: list[str] # 用户输入的消息列表
intent: str | None # 由第一个节点识别出的意图
response: str | None # 由第二个节点生成的回复
# 2. 定义 Node:每个节点只做一件事,返回需要更新的字段
def classify_intent(state: State) -> dict:
"""读取最后一条消息,用关键词匹配识别意图"""
text = state["messages"][-1] if state["messages"] else ""
if any(kw in text for kw in ["计算", "多少", "分数", "成绩"]):
intent = "calculation"
elif any(kw in text for kw in ["规定", "政策", "要求"]):
intent = "policy"
else:
intent = "knowledge"
return {"intent": intent} # 只更新 intent 字段,其余字段保持不变
def generate_response(state: State) -> dict:
"""读取识别出的意图,生成对应回复"""
intent = state.get("intent", "knowledge")
question = state["messages"][-1] if state["messages"] else ""
if intent == "calculation":
response = f"这是一道计算题,让我帮你算一下:{question}"
elif intent == "policy":
response = f"关于规定的问题,我来查一下:{question}"
else:
response = f"关于您的问题「{question}」,这是我的解答..."
return {"response": response}
# 3. 构建工作流:先注册节点,再连接边
workflow = StateGraph(State)
workflow.add_node("classify", classify_intent)
workflow.add_node("respond", generate_response)
# 4. 定义边:固定顺序,classify → respond → END
workflow.add_edge("__start__", "classify")
workflow.add_edge("classify", "respond")
workflow.add_edge("respond", END)
# 5. 编译为可执行应用
app = workflow.compile()
# 6. 运行测试
result = app.invoke({"messages": ["期末考试成绩怎么计算?"], "intent": None, "response": None})
print(f"意图: {result['intent']}") # 输出: 意图: calculation
print(f"回复: {result['response']}") # 输出: 回复: 这是一道计算题,让我帮你算一下:期末考试成绩怎么计算?
8.4 LangGraph的条件边与条件节点
8.4.1 条件边的工作原理
路由函数必须是纯函数(Pure Function),即只根据输入状态决定输出,不产生副作用。下面是智学助手最基础的两路分支的例子:
# two_edges.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
class AssistantState(TypedDict):
question: str
intent: str | None
answer: str | None
# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
return {"answer": f"关于「{state['question']}」,这是我的解答..."}
# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
return {"answer": f"让我帮您计算:{state['question']}"}
# 识别意图的节点
def classify_node(state: AssistantState) -> dict:
q = state["question"]
if any(kw in q for kw in ["计算", "多少", "分数"]):
return {"intent": "calculation"}
return {"intent": "knowledge"}
# 路由函数:根据意图决定走哪个节点
def route_by_intent(state: AssistantState) -> str:
intent = state.get("intent", "knowledge")
if intent == "calculation":
return "calc_node"
return "knowledge_node"
workflow = StateGraph(AssistantState)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)
workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
"classify", # 从 classify 节点出发
route_by_intent, # 路由函数
{
"knowledge_node": "knowledge", # 返回 "knowledge_node" → 执行 knowledge 节点
"calc_node": "calc", # 返回 "calc_node" → 执行 calc 节点
}
)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)
app = workflow.compile()
result = app.invoke({"question": "期末成绩怎么计算?", "intent": None, "answer": None})
print(result["answer"]) # 输出: 让我帮您计算:期末成绩怎么计算?
8.4.2 多条件分支
在上一节代码基础上,这里新增第三条分支:
# three_edges.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
class AssistantState(TypedDict):
question: str
intent: str | None
sentiment: str | None # 新增:用户情绪
answer: str | None
# 新增节点:情绪安抚
def empathy_node(state: AssistantState) -> dict:
return {"answer": "非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。"}
# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
return {"answer": f"关于「{state['question']}」,这是我的解答..."}
# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
return {"answer": f"让我帮您计算:{state['question']}"}
# 识别意图和情绪
def classify_node(state: AssistantState) -> dict:
q = state["question"]
if any(kw in q for kw in ["计算", "多少", "分数"]):
return {"intent": "calculation", "sentiment": "neutral"}
elif any(kw in q for kw in ["投诉", "不满", "太差了"]):
return {"intent": "complaint", "sentiment": "negative"}
return {"intent": "knowledge", "sentiment": "neutral"}
# 路由函数:同时看意图和情绪
def route_by_intent(state: AssistantState) -> str:
intent = state.get("intent", "knowledge")
sentiment = state.get("sentiment", "neutral")
if intent == "complaint" and sentiment == "negative":
return "empathy" # 投诉且情绪负面,先安抚
if intent == "calculation":
return "calc"
return "knowledge"
workflow = StateGraph(AssistantState)
workflow.add_node("empathy", empathy_node)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)
workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
"classify",
route_by_intent,
{
"empathy": "empathy",
"calc": "calc",
"knowledge": "knowledge",
}
)
workflow.add_edge("empathy", END)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)
app = workflow.compile()
result = app.invoke({"question": "我投诉酒店房间卫生太差了?", "intent": None, "answer": None})
print(result["answer"]) # 输出: 非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。
8.4.3 LLM驱动的路由
在上一节代码基础上,这里把意图识别方式修改为由LLM判断意图:
# llm_intent.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langchain_ollama import ChatOllama
from langchain_core.messages import SystemMessage, HumanMessage
import os
class AssistantState(TypedDict):
question: str
intent: str | None
sentiment: str | None # 新增:用户情绪
answer: str | None
llm = ChatOllama(
model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
temperature=0
)
# 节点:情绪安抚
def empathy_node(state: AssistantState) -> dict:
return {"answer": "非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。"}
# 节点:处理知识问答
def knowledge_node(state: AssistantState) -> dict:
return {"answer": f"关于「{state['question']}」,这是我的解答..."}
# 节点:处理计算问题
def calc_node(state: AssistantState) -> dict:
return {"answer": f"让我帮您计算:{state['question']}"}
# classify_node 改为调用 LLM 判断意图
def classify_node(state: AssistantState) -> dict:
response = llm.invoke([
SystemMessage(content="""分析学生的问题,返回以下意图之一(只返回单词,不要解释):
knowledge(知识问答)、calculation(计算问题)、empathy(投诉反馈)"""),
HumanMessage(content=state["question"])
])
intent = response.content.strip().lower() # llm.invoke() 返回 AIMessage,.content 取文本
# 防御:LLM 返回意外值时回退到默认
valid_intents = {"knowledge", "calculation", "empathy"}
return {"intent": intent if intent in valid_intents else "knowledge"}
# 路由函数,只读取state["intent"],因为LLM 的判断结果已经写进了状态
def route_by_intent(state: AssistantState) -> str:
intent = state.get("intent", "knowledge")
return intent # 直接返回意图字符串,映射表用相同的 key
workflow = StateGraph(AssistantState)
workflow.add_node("empathy", empathy_node)
workflow.add_node("classify", classify_node)
workflow.add_node("knowledge", knowledge_node)
workflow.add_node("calc", calc_node)
workflow.add_edge("__start__", "classify")
workflow.add_conditional_edges(
"classify",
route_by_intent,
{
"empathy": "empathy",
"calc": "calc",
"knowledge": "knowledge",
}
)
workflow.add_edge("empathy", END)
workflow.add_edge("knowledge", END)
workflow.add_edge("calc", END)
app = workflow.compile()
result = app.invoke({"question": "这个西瓜怎么这么难吃", "intent": None, "answer": None})
print(result["answer"]) # 输出: 非常抱歉给您带来了不好的体验。我已记录您的反馈,会尽快跟进处理。
8.4.5 条件边的特殊用法:循环
条件边不仅可以向前分支,还可以指回已有节点,形成循环。智学助手的“反复追问”场景就需要这种结构——如果学生的问题描述不清楚,就循环要求补充,直到信息足够才继续处理。
# condition_edge_cycle.py
from typing import TypedDict
from langgraph.graph import StateGraph, END
class ClarifyState(TypedDict):
question: str
clarify_count: int # 记录已追问次数,防止无限循环
is_clear: bool # 问题是否已清晰
answer: str | None
def check_clarity(state: ClarifyState) -> dict:
"""判断问题是否足够清晰(这里用关键词判断,实际可用LLM来判断)"""
q = state["question"]
is_clear = len(q) > 10 and not q.endswith("?") # 简化判断:长度足够且不是模糊问法
return {"is_clear": is_clear, "clarify_count": state.get("clarify_count", 0)}
def ask_clarify(state: ClarifyState) -> dict:
"""追问节点:提示用户补充信息"""
count = state.get("clarify_count", 0) + 1
print(f"[第{count}次追问] 您的问题还不够具体,请补充更多细节。")
return {"clarify_count": count}
def answer_node(state: ClarifyState) -> dict:
if state.get("clarify_count", 0) >= 3:
return {"answer": "非常抱歉,我已经无法进一步追问了,请重新描述您的问题。"}
return {"answer": f"关于「{state['question']}」,这是我的解答..."}
def should_clarify(state: ClarifyState) -> str:
if state["is_clear"] or state.get("clarify_count", 0) >= 3:
return "answer" # 问题清晰,或追问次数达到上限,继续处理
return "clarify" # 问题不清晰,再次追问
workflow = StateGraph(ClarifyState)
workflow.add_node("check", check_clarity)
workflow.add_node("clarify", ask_clarify)
workflow.add_node("answer", answer_node)
workflow.add_edge("__start__", "check")
workflow.add_conditional_edges("check", should_clarify, {
"clarify": "clarify",
"answer": "answer",
})
workflow.add_edge("clarify", "check") # 追问后回到检查节点,形成循环
workflow.add_edge("answer", END)
app = workflow.compile()
result = app.invoke({"question": "这门课程为什么不及格?", "clarify_count": 0, "is_clear": False, "answer": None})
print(result["answer"])
8.5 LangGraph的工具调用
下面是一个完整代码示例:
# langgraph_toolnode_demo.py
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode, tools_condition
# 1. 用 LangChain 的 @tool 定义工具
@tool
def query_course_notice(topic: str) -> str:
"""根据主题查询课程通知。topic 可以是:实验报告、课程答疑、期末复习。"""
notices = {
"实验报告": "实验报告需在第 10 周周五 18:00 前提交到课程平台。",
"课程答疑": "课程答疑安排在每周三 19:00-20:00,地点为线上会议室。",
"期末复习": "期末复习资料将在第 16 周发布,以课程平台通知为准。",
}
return notices.get(topic, "未查询到该主题的课程通知。")
@tool
def calculate_final_score(homework: float, experiment: float, exam: float) -> str:
"""按照课程规则计算总评成绩:作业 30%,实验 30%,期末 40%。"""
score = homework * 0.3 + experiment * 0.3 + exam * 0.4
return f"总评成绩为 {score:.1f} 分。"
tools = [query_course_notice, calculate_final_score]
# 2. 把工具绑定给大模型
llm = ChatOllama(
model="gemma4:e4b",
temperature=0,
base_url="http://127.0.0.1:11434",
)
llm_with_tools = llm.bind_tools(tools)
# 3. 定义 agent 节点:模型决定是否调用工具
def agent_node(state: MessagesState):
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
# 4. 用 ToolNode 包装工具
tool_node = ToolNode(tools)
# 5. 构建 LangGraph
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
# agent 运行后:
# 如果模型返回 tool_calls,则进入 tools 节点;
# 如果模型没有返回 tool_calls,则结束。
builder.add_conditional_edges(
"agent",
tools_condition,
{
"tools": "tools",
END: END,
},
)
# 工具执行完后,把 ToolMessage 写回 messages,再回到 agent
builder.add_edge("tools", "agent")
graph = builder.compile()
if __name__ == "__main__":
result = graph.invoke(
{
"messages": [
HumanMessage(content="请查询实验报告什么时候提交。")
]
}
)
for message in result["messages"]:
print(message.type, ":", message.content)
8.6 LangGraph的状态和持久化
8.6.2 使用MessagesState保存对话历史
下面的示例构建一个最小聊天图。图中只有一个chatbot节点,该节点读取当前messages,调用本地Ollama模型生成回复,再把模型回复写回messages。由于编译时传入了InMemorySaver,同一个 thread_id下的后续调用可以读取前一次调用保存的消息历史。具体代码如下:
# langgraph_conversation_memory.py
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, MessagesState, StateGraph
import os
load_dotenv()
llm = ChatOllama(
model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
temperature=0.2,
)
def chatbot(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
memory = InMemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "student-001"}}
result1 = graph.invoke(
{"messages": [{"role": "user", "content": "我叫小林,正在学习LangGraph。"}]},
config=config,
)
print(result1["messages"][-1].content)
result2 = graph.invoke(
{"messages": [{"role": "user", "content": "我正在学习什么?"}]},
config=config,
)
print(result2["messages"][-1].content)
8.6.3 使用自定义状态保存会话信息
下面的示例在状态中同时保存messages和course两个字段。remember_course节点从用户输入中提取课程名称,answer节点再利用状态中的课程名称生成回答。这个例子说明,LangGraph的状态不仅可以保存对话历史,也可以保存业务字段。具体代码如下:
# langgraph_custom_state_memory.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
class CourseState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
course: str
def remember_course(state: CourseState):
last_message = state["messages"][-1].content
if "数据库" in last_message:
return {"course": "数据库系统"}
if "人工智能" in last_message:
return {"course": "人工智能导论"}
return {}
def answer(state: CourseState):
course = state.get("course", "当前课程")
return {
"messages": [
{
"role": "assistant",
"content": f"我会结合《{course}》这门课来回答你的问题。"
}
]
}
builder = StateGraph(CourseState)
builder.add_node("remember_course", remember_course)
builder.add_node("answer", answer)
builder.add_edge(START, "remember_course")
builder.add_edge("remember_course", "answer")
builder.add_edge("answer", END)
graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "course-thread-001"}}
result = graph.invoke(
{
"messages": [{"role": "user", "content": "我正在学习数据库,请帮我复习索引。"}],
"course": "",
},
config=config,
)
print(result["messages"][-1].content)
print("当前课程:", result["course"])
8.6.4 使用SQLite实现检查点持久化
使用 SQLite 检查点前,需要安装额外依赖:
pip install langgraph-checkpoint-sqlite
下面的示例与 8.5.2节基本相同,只是把InMemorySaver换成了SqliteSaver。第一次运行程序时,状态会写入langgraph_state.db;再次运行程序时,只要使用相同数据库文件和相同 thread_id,就可以继续读取该线程的历史状态。具体代码如下:
# langgraph_sqlite_checkpoint.py
from dotenv import load_dotenv
from langchain_ollama import ChatOllama
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import START, END, MessagesState, StateGraph
import os
load_dotenv()
llm = ChatOllama(
model=os.getenv("OLLAMA_MODEL", "gemma4:e4b"),
base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
temperature=0.2,
)
def chatbot(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": [response]}
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
config = {"configurable": {"thread_id": "student-001"}}
with SqliteSaver.from_conn_string("langgraph_state.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
result = graph.invoke(
{"messages": [{"role": "user", "content": "请记住:我的实验题目是课程问答助手。"}]},
config=config,
)
print(result["messages"][-1].content)
result = graph.invoke(
{"messages": [{"role": "user", "content": "我的实验题目是什么?"}]},
config=config,
)
print(result["messages"][-1].content)
8.7 LangGraph编程实践
8.7.1 课程助教智能体案例(基础版)
# course-agent.py
from operator import add
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, interrupt
class AssistantState(TypedDict):
question: str #学生问题
category: str # 问题分类,routine / complex
retrieved_docs: list[str] # 检索到的知识库文档
draft_answer: str # 草稿答案
need_human_review: bool # 是否需要人工审核
review_result: dict # 教师审核结果
final_answer: str # 最终答案
logs: Annotated[list[str], add] #工作流日志(自动累加)
RULES = {
"作业": [
"作业须在截止时间前提交到课程平台。",
"逾期提交按照课程说明扣分。"
],
"调课": [
"调课通知以课程群和教学平台公告为准。"
],
"补考": [
"补考申请由学院教学办公室统一处理。",
"需在规定时间内提交材料。"
],
}
def classify_question(state: AssistantState):
q = state["question"]
if any(k in q for k in ["补考", "申诉", "成绩", "特殊情况"]):
category = "complex"
else:
category = "routine"
return {
"category": category,
"logs": [f"classify:{category}"]
}
def retrieve_knowledge(state: AssistantState):
q = state["question"]
docs = []
for keyword, values in RULES.items():
if keyword in q:
docs.extend(values)
return {
"retrieved_docs": docs,
"logs": [f"retrieve:{len(docs)} docs"]
}
def draft_answer(state: AssistantState):
docs = state.get("retrieved_docs", [])
if docs:
draft = ";".join(docs)
else:
draft = "当前规则库中未找到直接答案,请教师进一步确认。"
need_review = (state["category"] == "complex") or (len(docs) == 0)
return {
"draft_answer": draft,
"need_human_review": need_review,
"logs": [f"draft:review={need_review}"]
}
def route_after_draft(state: AssistantState) -> Literal["teacher_review", "finalize_answer"]:
if state["need_human_review"]:
return "teacher_review"
return "finalize_answer"
def teacher_review(state: AssistantState):
payload = {
"instruction": "请教师审核该复杂问题的答复草稿",
"question": state["question"],
"draft": state["draft_answer"],
"docs": state["retrieved_docs"],
}
review = interrupt(payload)
return {
"review_result": review,
"logs": ["review:completed"]
}
def finalize_answer(state: AssistantState):
review = state.get("review_result", {})
if review and review.get("edited_answer"):
answer = review["edited_answer"]
else:
answer = state["draft_answer"]
return {
"final_answer": answer,
"logs": [f"final:{answer[:18]}"]
}
builder = StateGraph(AssistantState)
builder.add_node("classify_question", classify_question)
builder.add_node("retrieve_knowledge", retrieve_knowledge)
builder.add_node("draft_answer", draft_answer)
builder.add_node("teacher_review", teacher_review)
builder.add_node("finalize_answer", finalize_answer)
builder.add_edge(START, "classify_question")
builder.add_edge("classify_question", "retrieve_knowledge")
builder.add_edge("retrieve_knowledge", "draft_answer")
builder.add_conditional_edges("draft_answer", route_after_draft)
builder.add_edge("teacher_review", "finalize_answer")
builder.add_edge("finalize_answer", END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "course-demo-001"}}
initial = graph.invoke(
{"question": "老师,我想申请补考,流程是什么?", "logs": []},
config=config,
)
print(initial["__interrupt__"])
final_state = graph.invoke(
Command(
resume={
"approved": True,
"edited_answer": "请在教务系统提交补考申请,并于三日内补充证明材料。"
}
),
config=config,
)
print(final_state["final_answer"])
8.7.2 课程助教智能体案例(高级版)
首先,安装 Ollama,并确保本机可以通过命令行正常调用。如果 Ollama 服务尚未启动,可以在Windows系统的cmd命令行界面中执行以下命令启动Ollama:
ollama serve
然后,下载回答模型和嵌入模型。若机器显存或内存较充足,可以直接使用 gemma4;若是普通教学用笔记本,也可以改用较小规格的 gemma4 变体,例如 gemma4:e4b。嵌入模型的作用是把文本转化成向量,这里使用nomic-embed-text。具体安装命令如下:
ollama pull gemma4:e4b # 下载回答模型
ollama pull nomic-embed-text # 下载嵌入模型
模型下载完成后,可用下面的命令检查本地模型列表:
ollama list
建议在独立虚拟环境中完成安装,以免与其他实验冲突。推荐命令如下:
python -m venv langgraph-env # 在当前目录下创建一个名字叫langgraph-env的独立 Python 环境,用来隔离项目依赖,不污染系统全局 Python
langgraph-env\Scripts\activate
pip install -U langgraph langchain langchain-ollama langchain-chroma chromadb langchain-text-splitters
接下来要把课程资料切分成多个文档片段,再用嵌入模型将它们写入 Chroma(一种轻量级向量数据库)。具体代码如下(可以从教材官网下载build_vector_db_ollama.py):
# build_vector_db_ollama.py
from pathlib import Path
import shutil
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_ollama import OllamaEmbeddings
from langchain_chroma import Chroma
BASE_DIR = Path(__file__).resolve().parent
SOURCE_FILE = BASE_DIR / 'course_docs_sample.txt'
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
EMBED_MODEL = 'nomic-embed-text'
def load_documents() -> list[Document]:
text = SOURCE_FILE.read_text(encoding='utf-8')
splitter = RecursiveCharacterTextSplitter(
chunk_size=120,
chunk_overlap=20,
separators=['\n', '。', ';', ',']
)
chunks = splitter.split_text(text)
return [
Document(page_content=chunk, metadata={'source': SOURCE_FILE.name, 'chunk_id': index})
for index, chunk in enumerate(chunks)
]
def main() -> None:
if not SOURCE_FILE.exists():
raise FileNotFoundError(f'未找到课程资料文件: {SOURCE_FILE}')
if PERSIST_DIR.exists():
shutil.rmtree(PERSIST_DIR)
documents = load_documents()
embeddings = OllamaEmbeddings(model=EMBED_MODEL)
Chroma.from_documents(
documents=documents,
embedding=embeddings,
collection_name=COLLECTION_NAME,
persist_directory=str(PERSIST_DIR),
)
print(f'已写入 {len(documents)} 个文档片段到 {PERSIST_DIR}')
print(f'嵌入模型: {EMBED_MODEL}')
if __name__ == '__main__':
main()
完成建库后,需要把原来基础版案例中的 retrieve_knowledge 和 draft_answer 节点改造为“向量检索 + Gemma4 生成”。具体代码如下( 可以到教材官网下载course_agent_ollama_rag.py):
# course_agent_ollama_rag.py
from operator import add
from pathlib import Path
from typing import Annotated
from typing_extensions import TypedDict
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
BASE_DIR = Path(__file__).resolve().parent
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
CHAT_MODEL = 'gemma4:e4b'
EMBED_MODEL = 'nomic-embed-text'
class RagState(TypedDict):
question: str
retrieved_docs: list[str]
draft_answer: str
final_answer: str
logs: Annotated[list[str], add]
if not PERSIST_DIR.exists():
raise FileNotFoundError(f'未找到向量库目录: {PERSIST_DIR}。请先运行 build_vector_db_ollama.py。')
llm = ChatOllama(model=CHAT_MODEL, temperature=0)
embeddings = OllamaEmbeddings(model=EMBED_MODEL)
vector_store = Chroma(
collection_name=COLLECTION_NAME,
persist_directory=str(PERSIST_DIR),
embedding_function=embeddings,
)
def retrieve_knowledge(state: RagState):
docs = vector_store.similarity_search(state['question'], k=3)
contents = [doc.page_content for doc in docs]
return {'retrieved_docs': contents, 'logs': [f'retrieve:{len(contents)} docs']}
SYSTEM_PROMPT = '''
你是《AI编程与智能体开发》课程助教。
请严格依据给定材料回答问题。
如果材料不足,请明确说“知识库中未找到充分依据”,不要编造。
回答应尽量简洁、准确、适合本科生阅读。
'''.strip()
def generate_answer(state: RagState):
if not state['retrieved_docs']:
return {
'draft_answer': '知识库中未找到相关材料,请补充课程资料或转人工处理。',
'logs': ['draft:no_docs']
}
context = '\n\n'.join(
f'[材料{index + 1}] {text}'
for index, text in enumerate(state['retrieved_docs'])
)
prompt = f'''{SYSTEM_PROMPT}
学生问题:
{state['question']}
参考材料:
{context}
请给出最终回答:'''
answer = llm.invoke(prompt).content
return {'draft_answer': answer, 'logs': ['draft:done']}
def finalize_answer(state: RagState):
return {'final_answer': state['draft_answer'], 'logs': [f"final:{state['draft_answer'][:18]}"]}
builder = StateGraph(RagState)
builder.add_node('retrieve_knowledge', retrieve_knowledge)
builder.add_node('generate_answer', generate_answer)
builder.add_node('finalize_answer', finalize_answer)
builder.add_edge(START, 'retrieve_knowledge')
builder.add_edge('retrieve_knowledge', 'generate_answer')
builder.add_edge('generate_answer', 'finalize_answer')
builder.add_edge('finalize_answer', END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
def main() -> None:
config = {'configurable': {'thread_id': 'local-rag-demo-001'}}
result = graph.invoke({'question': '补考申请流程是什么?', 'logs': []}, config=config)
print('检索结果:')
for index, doc in enumerate(result['retrieved_docs'], start=1):
print(f'{index}. {doc}')
print('\n最终回答:')
print(result['final_answer'])
if __name__ == '__main__':
main()
实现思路并不复杂:在状态中增加 need_human_review 和 review_result 两个字段;在 generate_answer 节点中加入审核判定逻辑;然后新增 route_after_generate 和 teacher_review 两个节点,其中 teacher_review 通过 interrupt(payload) 暂停工作流,把问题、检索材料和草稿答案交给教师;教师审核后,再用 Command(resume=...) 在相同 thread_id 下恢复执行。具体代码如下( 可以从教材官网下载course_agent_ollama_rag_review.py):
# course_agent_ollama_rag_review.py
from operator import add
from pathlib import Path
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langchain_chroma import Chroma
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt
BASE_DIR = Path(__file__).resolve().parent
PERSIST_DIR = BASE_DIR / 'chroma_db'
COLLECTION_NAME = 'course_rules'
CHAT_MODEL = 'gemma4:e4b'
EMBED_MODEL = 'nomic-embed-text'
SENSITIVE_KEYWORDS = ['补考', '成绩', '申诉', '特殊情况']
class ReviewRagState(TypedDict):
question: str
retrieved_docs: list[str]
draft_answer: str
need_human_review: bool
review_result: dict
final_answer: str
logs: Annotated[list[str], add]
if not PERSIST_DIR.exists():
raise FileNotFoundError(f'未找到向量库目录: {PERSIST_DIR}。请先运行 build_vector_db_ollama.py。')
llm = ChatOllama(model=CHAT_MODEL, temperature=0)
embeddings = OllamaEmbeddings(model=EMBED_MODEL)
vector_store = Chroma(
collection_name=COLLECTION_NAME,
persist_directory=str(PERSIST_DIR),
embedding_function=embeddings,
)
def retrieve_knowledge(state: ReviewRagState):
docs = vector_store.similarity_search(state['question'], k=3)
contents = [doc.page_content for doc in docs]
return {'retrieved_docs': contents, 'logs': [f'retrieve:{len(contents)} docs']}
SYSTEM_PROMPT = '''
你是《AI编程与智能体开发》课程助教。
请严格依据给定材料回答问题。
如果材料不足,请明确说“知识库中未找到充分依据”,不要编造。
回答应尽量简洁、准确、适合本科生阅读。
'''.strip()
def generate_answer(state: ReviewRagState):
if not state['retrieved_docs']:
return {
'draft_answer': '知识库中未找到相关材料,请补充课程资料或转人工处理。',
'need_human_review': True,
'logs': ['draft:no_docs']
}
context = '\n\n'.join(
f'[材料{index + 1}] {text}'
for index, text in enumerate(state['retrieved_docs'])
)
prompt = f'''{SYSTEM_PROMPT}
学生问题:
{state['question']}
参考材料:
{context}
请给出最终回答:'''
answer = llm.invoke(prompt).content
need_review = any(keyword in state['question'] for keyword in SENSITIVE_KEYWORDS)
if '知识库中未找到充分依据' in answer:
need_review = True
return {
'draft_answer': answer,
'need_human_review': need_review,
'logs': [f'draft:review={need_review}']
}
def route_after_generate(state: ReviewRagState) -> Literal['teacher_review', 'finalize_answer']:
if state['need_human_review']:
return 'teacher_review'
return 'finalize_answer'
def teacher_review(state: ReviewRagState):
payload = {
'instruction': '请教师审核本地RAG课程助教的答复草稿',
'question': state['question'],
'draft_answer': state['draft_answer'],
'retrieved_docs': state['retrieved_docs'],
}
review = interrupt(payload)
return {'review_result': review, 'logs': ['review:completed']}
def finalize_answer(state: ReviewRagState):
review = state.get('review_result', {})
if review and review.get('edited_answer'):
answer = review['edited_answer']
else:
answer = state['draft_answer']
return {'final_answer': answer, 'logs': [f'final:{answer[:18]}']}
builder = StateGraph(ReviewRagState)
builder.add_node('retrieve_knowledge', retrieve_knowledge)
builder.add_node('generate_answer', generate_answer)
builder.add_node('teacher_review', teacher_review)
builder.add_node('finalize_answer', finalize_answer)
builder.add_edge(START, 'retrieve_knowledge')
builder.add_edge('retrieve_knowledge', 'generate_answer')
builder.add_conditional_edges('generate_answer', route_after_generate)
builder.add_edge('teacher_review', 'finalize_answer')
builder.add_edge('finalize_answer', END)
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
def main() -> None:
config = {'configurable': {'thread_id': 'local-rag-review-demo-001'}}
initial = graph.invoke({'question': '补考申请流程是什么?', 'logs': []}, config=config)
print('第一次调用返回:')
if '__interrupt__' in initial:
print(initial['__interrupt__'])
else:
print(initial['final_answer'])
return
final_state = graph.invoke(
Command(
resume={
'approved': True,
'edited_answer': '请先在教务系统提交补考申请,再按学院要求在规定时间内补充证明材料。'
}
),
config=config,
)
print('\n恢复执行后的最终回答:')
print(final_state['final_answer'])
if __name__ == '__main__':
main()
