LangGraph4j 从入门到精通
版本基准: LangGraph4j 1.8.11 | 最低 Java: 17 | 许可证: MIT
前置要求: 已掌握 LangChain / LangChain4j 基础开发
仓库地址: https://github.com/langgraph4j/langgraph4j
目录
- 第一部分:概述与快速入门
- 第二部分:核心概念深入
- 第三部分:状态持久化
- 第四部分:子图与嵌套工作流
- 第五部分:多 Agent 编排模式
- 第六部分:LLM 框架集成
- 第七部分:调试与可观测性
- 第八部分:生产部署与运维
- 第九部分:性能优化
- 第十部分:常见问题与陷阱
- 附录
第一部分:概述与快速入门
1.1 LangGraph4j 是什么
LangGraph4j 是 Python 版 LangGraph 的 Java 移植版,用于在 Java 生态中构建有状态的、多步骤的、多 Agent 的 LLM 应用。
核心定位
将 LLM 应用建模为有向图(Directed Graph)——节点(Node)负责执行逻辑,边(Edge)负责控制流转。这种图结构天然适合表达 Agent 的循环推理、工具调用、多步决策等复杂行为。
与 Python LangGraph 对比
| 维度 | Python LangGraph | Java LangGraph4j |
|---|---|---|
| 语言 | Python 3.9+ | Java 17+ |
| 异步模型 | asyncio | CompletableFuture + java-async-generator |
| 状态表示 | TypedDict / Pydantic | AgentState(Map 包装器) |
| LLM 集成 | LangChain Python | LangChain4j / Spring AI(双框架支持) |
| 流式输出 | Python async generator | AsyncGenerator(java-async-generator 库) |
| 检查点存储 | SQLite / Postgres / Redis | Memory / JDBC / Postgres / Oracle / Redis |
| Studio | LangGraph Studio(SaaS + 本地) | 内置 Studio(Jetty / SpringBoot / Quarkus) |
| 可观测性 | 回调机制 | Hooks + OpenTelemetry |
| 部署形态 | LangGraph Platform(独立服务) | Java 库(嵌入到你的应用中) |
核心价值
- 循环支持:突破传统 Chain 的线性限制,支持 Agent 的循环推理
- 状态持久化:内置检查点机制,支持中断恢复和时间旅行
- 多 Agent 编排:原生支持 Supervisor、层级、并行等协作模式
- 框架无关:同时支持 LangChain4j 和 Spring AI,不绑定特定 LLM 框架
- 生产就绪:多种数据库检查点、OpenTelemetry 集成、Studio 调试
1.2 项目概况与模块结构
项目信息
| 项目 | 信息 |
|---|---|
| GitHub | https://github.com/langgraph4j/langgraph4j |
| 最新版本 | 1.8.11(2026-03-30) |
| 最低 Java | 17 |
| 许可证 | MIT |
| 提交数 | 3,300+ commits |
| 核心维护者 | bsorrentino |
模块结构
langgraph4j/
├── langgraph4j-bom/ # BOM 依赖管理(统一版本)
│
├── langgraph4j-core/ # 🔧 核心引擎
│ # StateGraph, AgentState, Node, Edge,
│ # Checkpoint, Channel, Hooks
│
├── langgraph4j-opentelemetry/ # 📊 OpenTelemetry 集成
│ # OTELWrapCallTraceHook
│
├── langgraph4j-postgres-saver/ # 💾 PostgreSQL 检查点
├── langgraph4j-mysql-saver/ # 💾 MySQL 检查点
├── langgraph4j-oracle-saver/ # 💾 Oracle 检查点
├── langgraph4j-redis-saver/ # 💾 Redis 检查点
│
├── langchain4j/ # 🔗 LangChain4j 集成
│ ├── langchain4j-core/ # 核心集成(序列化器、工具服务)
│ └── langchain4j-agent/ # AgentExecutor(ReACT Agent)
│
├── spring-ai/ # 🔗 Spring AI 集成
│ ├── spring-ai-core/ # 核心集成
│ └── spring-ai-agent/ # AgentExecutor(Spring AI 版)
│
├── studio/ # 🖥️ Studio Web UI
│ ├── base/ # 基础接口
│ ├── jetty/ # Jetty 服务器
│ ├── quarkus/ # Quarkus 服务器
│ └── springboot/ # Spring Boot 服务器
│
├── how-tos/ # 📚 教程(Jupyter Notebook)
├── samples/ # 📚 示例代码
└── generator/ # 🔧 代码生成器
Maven 依赖引入
推荐使用 BOM 管理版本:
<properties>
<langgraph4j.version>1.8.11</langgraph4j.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-bom</artifactId>
<version>${langgraph4j.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 核心依赖(必需) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-core</artifactId>
</dependency>
<!-- LangChain4j 集成(可选,二选一) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-langchain4j</artifactId>
</dependency>
<!-- Spring AI 集成(可选,二选一) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-spring-ai</artifactId>
</dependency>
<!-- PostgreSQL 检查点(可选) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-postgres-saver</artifactId>
</dependency>
<!-- OpenTelemetry(可选) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-opentelemetry</artifactId>
</dependency>
<!-- Studio Spring Boot(可选) -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-studio-springboot</artifactId>
</dependency>
1.3 快速开始:第一个 StateGraph
以下是一个最简单的 LangGraph4j 示例——两步顺序处理:
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.*;
import java.util.*;
public class HelloWorld {
// 1. 定义状态 Schema(声明状态通道及其 Reducer 行为)
static final Map<String, Channel<?>> SCHEMA = Map.of(
"input", Channel.DEFAULT, // 覆盖模式
"output", Channel.DEFAULT, // 覆盖模式
"steps", Channels.appender(ArrayList::new) // 追加模式
);
// 2. 定义节点:第一步 — 处理输入
static Map<String, Object> step1(Map<String, Object> state) {
var input = (String) state.get("input");
return Map.of(
"output", "处理结果: " + input.toUpperCase(),
"steps", "step1 完成"
);
}
// 3. 定义节点:第二步 — 格式化输出
static Map<String, Object> step2(Map<String, Object> state) {
var output = (String) state.get("output");
return Map.of(
"output", "【" + output + "】",
"steps", "step2 完成"
);
}
public static void main(String[] args) throws Exception {
// 4. 构建图
var graph = new StateGraph<>(SCHEMA)
.addNode("step1", step1)
.addNode("step2", step2)
.addEdge(START, "step1") // 入口 → step1
.addEdge("step1", "step2") // step1 → step2
.addEdge("step2", END); // step2 → 结束
// 5. 编译并执行
var workflow = graph.compile();
var result = workflow.invoke(Map.of("input", "Hello LangGraph4j"));
System.out.println("最终输出: " + result.get("output"));
System.out.println("执行步骤: " + result.get("steps"));
}
}
输出:
最终输出: 【处理结果: HELLO LANGGRAPH4J】
执行步骤: [step1 完成, step2 完成]
关键理解:节点函数只返回需要更新的状态字段,框架会根据 Schema 中定义的 Reducer 策略自动合并到完整状态中。
1.4 核心概念速览
| 概念 | 一句话解释 | 类比 |
|---|---|---|
| StateGraph | 图的定义容器,添加节点和边后编译为可运行图 | 类定义 |
| AgentState | 图的共享状态,本质是 Map<String, Object> 的包装器 |
共享上下文 |
| Channel / Reducer | 定义状态字段如何被更新(覆盖 / 追加 / 自定义) | 状态更新策略 |
| NodeAction | 节点的执行逻辑,接收状态、返回状态更新 | 方法体 |
| EdgeAction | 边的路由逻辑,根据状态决定下一个节点 | if-else 分支 |
| CompiledGraph | 编译后的不可变图实例,支持 invoke / stream 调用 | 实例化的对象 |
| CheckpointSaver | 持久化检查点,支持状态恢复和时间旅行 | 存档/读档 |
| Hooks | 横切关注点机制(Before/After/WrapCall),用于日志、重试、追踪 | AOP 切面 |
| Subgraph | 将一个图作为节点嵌入到另一个图中 | 方法调用 |
1.5 学习路径
为已掌握 LangChain / LangChain4j 的开发者定制的学习路径:
入门(1-2天)
├── 1. 理解核心概念:StateGraph → AgentState → Channel → Node → Edge
├── 2. 编写第一个 StateGraph(Hello World)
└── 3. 使用 MessagesState 构建简单对话 Agent
进阶(3-5天)
├── 4. 条件边与动态路由
├── 5. AgentExecutor(开箱即用的 ReACT Agent)
├── 6. 状态持久化(PostgresSaver)
└── 7. Studio 调试
高级(5-7天)
├── 8. 多 Agent Supervisor 模式
├── 9. 子图与嵌套工作流
├── 10. Hooks 机制(自定义重试、日志、指标)
├── 11. 并行执行 Fork-Join
└── 12. Human-in-the-Loop 中断机制
精通(持续)
├── 13. OpenTelemetry 分布式追踪
├── 14. 生产部署(Spring Boot + K8s)
├── 15. Adaptive RAG 等复杂模式
└── 16. 性能调优与成本控制
推荐学习资源:
- 官方仓库
how-tos/目录(Jupyter Notebook 教程) - 示例仓库:https://github.com/langgraph4j/langgraph4j-examples
- 官方文档站:Overview - LangGraph4j
第二部分:核心概念深入
2.1 StateGraph 详解
StateGraph 是整个框架的核心容器类,用于定义图的结构。
构建方式
// 方式一:传入 Schema + State 工厂
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new);
// 方式二:仅传入 Schema(使用默认 AgentState)
var graph = new StateGraph<>(SCHEMA);
// 方式三:使用便捷构建器(基于 MessagesState)
var graph = new MessagesStateGraph<ChatMessage>();
核心 API
// 添加节点
StateGraph<S> addNode(String id, NodeAction<S> action)
StateGraph<S> addNode(String id, AsyncNodeAction<S> action)
// 添加固定边
StateGraph<S> addEdge(String from, String to)
// 添加条件边
StateGraph<S> addConditionalEdges(String from, EdgeAction<S> edgeAction, EdgeMappings mappings)
// 添加条件入口点
StateGraph<S> addConditionalEntryPoint(EdgeAction<S> edgeAction, EdgeMappings mappings)
// 编译
CompiledGraph<S> compile()
CompiledGraph<S> compile(CompileConfig config)
// Hooks 注册
StateGraph<S> addBeforeCallNodeHook(BeforeCallNodeHook<S> hook)
StateGraph<S> addAfterCallNodeHook(AfterCallNodeHook<S> hook)
StateGraph<S> addWrapCallNodeHook(WrapCallNodeHook<S> hook)
StateGraph<S> addWrapCallNodeHook(String nodeId, WrapCallNodeHook<S> hook) // 指定节点
// 图可视化(编译前)
String getGraph(GraphRepresentation.Type type, String title, boolean xdgOpen)
编译过程
compile() 执行以下操作:
- 结构检查:验证无孤立节点、边引用的节点存在
- 子图合并:将 StateGraph 类型的子图节点合并到父图中
- 生成执行计划:确定节点执行顺序和并行分组
- 返回不可变实例:
CompiledGraph是线程安全的
2.2 AgentState 与状态 Schema
AgentState 基类
AgentState 是所有状态的基类,本质是 Map<String, Object> 的包装器:
public class AgentState {
public AgentState(Map<String, Object> initData) { ... }
// 获取状态值
public <T> Optional<T> value(String key) { ... }
// 获取原始数据 Map
public Map<String, Object> data() { ... }
// 获取最后一条消息(MessagesState 专用)
public <M> Optional<M> lastMessage() { ... }
}
自定义状态类
public class ChatState extends AgentState {
public static final String MESSAGES_KEY = "messages";
public static final String NEXT_KEY = "next";
public static final String CONTEXT_KEY = "context";
// 定义 Schema:每个字段对应一个 Channel
public static final Map<String, Channel<?>> SCHEMA = Map.of(
MESSAGES_KEY, Channels.appender(ArrayList::new), // 消息列表:追加模式
NEXT_KEY, Channel.DEFAULT, // 下一步:覆盖模式
CONTEXT_KEY, Channel.DEFAULT // 上下文:覆盖模式
);
public ChatState(Map<String, Object> initData) {
super(initData);
}
// 类型安全的访问方法
public List<ChatMessage> messages() {
return this.<List<ChatMessage>>value(MESSAGES_KEY).orElse(List.of());
}
public Optional<String> next() {
return this.value(NEXT_KEY);
}
public Optional<Map<String, Object>> context() {
return this.value(CONTEXT_KEY);
}
}
预置状态类
MessagesState<M>:专为对话场景设计,内置 messages 字段(Appender 通道):
// 使用预置 MessagesState
var graph = new MessagesStateGraph<ChatMessage>()
.addNode("agent", agentNode)
.addEdge(START, "agent")
.addEdge("agent", END)
.compile();
2.3 Channel 与 Reducer
Channel 定义状态字段被更新时的合并策略(即 Reducer 模式)。
三种核心 Reducer
① 覆盖模式(Default)
新值直接替换旧值:
"currentResult", Channel.DEFAULT // 无默认值
"count", Channel.of(0) // 默认值为 0
"config", Channel.of(Map.of()) // 默认值为空 Map
// 节点返回 {"count": 5} → 状态中 count 变为 5
// 下一个节点返回 {"count": 10} → 状态中 count 变为 10(覆盖)
② 追加模式(Appender)
新值追加到列表中:
"messages", Channels.appender(ArrayList::new)
"steps", Channels.appender(ArrayList::new)
// 节点返回 {"steps": "step1"} → steps = ["step1"]
// 下一个节点返回 {"steps": "step2"} → steps = ["step1", "step2"]
③ 消息追加模式(MessageChannel.Appender)
专为聊天消息设计,支持按 ID 更新已有消息:
"messages", MessageChannel.Appender.of(ArrayList::new)
特殊行为:
- 新消息(无 ID 或 ID 不存在)→ 追加
- 已有消息(按 ID 匹配)→ 更新(支持人工编辑消息后恢复)
自定义 Reducer
// 自定义 Reducer:字符串拼接
var concatenatingChannel = new Channel<String>() {
@Override
public String merge(String existing, String newValue) {
return existing == null ? newValue : existing + " | " + newValue;
}
};
var SCHEMA = Map.of(
"log", concatenatingChannel
);
Schema 设计原则
| 原则 | 说明 |
|---|---|
| 每个状态字段必须有对应的 Channel | 否则编译时抛出异常 |
| 覆盖模式适合"当前值"语义 | 如 next(下一步路由)、result(当前结果) |
| 追加模式适合"历史记录"语义 | 如 messages(对话历史)、steps(执行步骤) |
| 避免在追加通道中存储大对象 | 会随执行步数线性增长 |
2.4 节点 NodeAction
节点是图的执行单元,接收当前状态,返回状态更新。
定义方式
① 实现 NodeAction 接口
public class MyNode implements NodeAction<MyState> {
@Override
public Map<String, Object> apply(MyState state) {
var input = state.value("input").orElse("");
return Map.of("output", "处理: " + input);
}
}
graph.addNode("myNode", new MyNode());
② Lambda 简写
graph.addNode("myNode", (NodeAction<MyState>) state -> {
return Map.of("output", "处理: " + state.value("input").orElse(""));
});
③ 异步节点
// 实现 AsyncNodeAction
public class AsyncNode implements AsyncNodeAction<MyState> {
@Override
public CompletableFuture<Map<String, Object>> apply(MyState state) {
return CompletableFuture.supplyAsync(() -> {
// 耗时操作(如 LLM 调用)
return Map.of("output", "异步处理完成");
});
}
}
// 使用 node_async 工具方法
graph.addNode("asyncNode", node_async(state ->
CompletableFuture.supplyAsync(() -> Map.of("output", "done"))
));
节点函数签名
// 同步
Map<String, Object> apply(AgentState state)
// 异步
CompletableFuture<Map<String, Object>> apply(AgentState state)
注意:节点函数只返回需要更新的字段,不需要返回完整状态。框架根据 Schema 中的 Reducer 策略自动合并。
节点返回值规则
| 返回值 | 效果 |
|---|---|
Map.of("key", value) |
更新指定字段 |
Map.of() |
空更新(不改变状态) |
null |
空更新(不改变状态) |
| 抛出异常 | 图执行终止(除非被 WrapCall Hook 捕获) |
2.5 边 EdgeAction
边定义节点间的控制流。
固定边
无条件从 A 跳转到 B:
graph.addEdge("nodeA", "nodeB");
graph.addEdge(START, "firstNode"); // 入口
graph.addEdge("lastNode", END); // 结束
条件边
基于当前状态动态决定下一个节点:
// 定义路由函数
EdgeAction<MyState> router = state -> {
var lastMsg = state.lastMessage().orElseThrow();
if (lastMsg.hasToolExecutionRequests()) {
return "executeTools"; // 有工具调用 → 执行工具
}
return END; // 无工具调用 → 结束
};
// 注册条件边
graph.addConditionalEdges("agent", router,
EdgeMappings.builder()
.to("executeTools") // 路由函数返回 "executeTools" → 跳转到此节点
.toEnd() // 路由函数返回 END → 结束
.build()
);
条件入口点
根据输入动态选择起始节点:
graph.addConditionalEntryPoint(
edge_async(state -> {
var mode = (String) state.value("mode").orElse("default");
return "search".equals(mode) ? "searchNode" : "defaultNode";
}),
EdgeMappings.builder()
.to("searchNode")
.to("defaultNode")
.build()
);
EdgeMappings 构建器
EdgeMappings.builder()
.to("nodeA") // 字符串路由值 → 节点
.to("nodeB")
.toEnd() // 特殊:路由到 END
.toEnd("FINISH") // 特殊:字符串 "FINISH" 路由到 END
.build()
2.6 CompiledGraph
编译后的不可变图实例,是实际运行的对象。
创建
// 无配置编译
var workflow = graph.compile();
// 带配置编译
var config = CompileConfig.builder()
.checkpointSaver(new PostgresSaver.builder()...build())
.releaseThread(false)
.build();
var workflow = graph.compile(config);
调用方式
同步调用 invoke
// 基本调用
var result = workflow.invoke(Map.of("input", "Hello"));
// 带 RunnableConfig(用于检查点)
var runnableConfig = RunnableConfig.builder()
.threadId("thread-123") // 线程 ID,关联检查点
.build();
var result = workflow.invoke(Map.of("input", "Hello"), runnableConfig);
流式调用 stream
// 流式执行,逐步获取每个节点的输出
for (var step : workflow.stream(Map.of("input", "Hello"))) {
System.out.println("节点: " + step.node());
System.out.println("状态: " + step.state());
}
NodeOutput 包含:
node()— 执行的节点名称state()— 该节点执行后的状态快照
RunnableConfig
var config = RunnableConfig.builder()
.threadId("thread-123") // 线程 ID(检查点关联)
.addParallelNodeExecutor("nodeA", executor) // 并行节点执行器
.build();
// 检查是否在 Studio 中运行
boolean inStudio = RunnableConfig.isRunningInStudio();
线程管理
releaseThread 配置项控制线程生命周期:
| 值 | 行为 | 适用场景 |
|---|---|---|
false(默认) |
执行完后保持线程,可通过检查点恢复 | 需要中断恢复、时间旅行 |
true |
执行完后释放线程资源 | 一次性执行、无需恢复 |
2.7 Hooks 机制
Hooks 是 LangGraph4j 的核心横切关注点机制,用于实现日志、重试、追踪、指标收集等。
三种 Hook 类型
| 类型 | 执行时机 | 接口 | 典型用途 |
|---|---|---|---|
| BeforeCall | 节点/边执行前 | BeforeCallNodeHook / BeforeCallEdgeHook |
参数校验、日志、状态预处理 |
| AfterCall | 节点/边执行后 | AfterCallNodeHook / AfterCallEdgeHook |
结果处理、指标收集 |
| WrapCall | 包裹整个执行 | WrapCallNodeHook / WrapCallEdgeHook |
重试、超时、完整追踪 |
注册方式
// 全局 Hook(应用于所有节点/边)
graph.addBeforeCallNodeHook((nodeId, state, config) -> {
log.info("节点 {} 即将执行", nodeId);
});
graph.addAfterCallNodeHook((nodeId, state, config, result) -> {
log.info("节点 {} 执行完成", nodeId);
});
// 指定节点的 Hook
graph.addWrapCallNodeHook("unstableNode", (nodeId, state, config, action) -> {
return retryAsync(action, 3, Duration.ofSeconds(1));
});
执行顺序
- BeforeCall / AfterCall: LIFO(后注册先执行)—— 类似 Servlet Filter
- WrapCall: FIFO(先注册先包裹)—— 最外层最先执行
WrapCall 实现重试
graph.addWrapCallNodeHook("llmNode", (nodeId, state, config, action) -> {
int maxRetries = 3;
CompletableFuture<Map<String, Object>> result = null;
for (int i = 0; i < maxRetries; i++) {
try {
result = action.call();
return result; // 成功则返回
} catch (Exception e) {
log.warn("节点 {} 第 {} 次重试,原因: {}", nodeId, i + 1, e.getMessage());
if (i == maxRetries - 1) throw e;
}
}
return result;
});
WrapCall 实现耗时统计
graph.addWrapCallNodeHook((nodeId, state, config, action) -> {
var start = System.currentTimeMillis();
return action.call()
.whenComplete((res, err) -> {
var duration = System.currentTimeMillis() - start;
if (err == null) {
log.info("节点 {} 执行成功,耗时 {}ms", nodeId, duration);
} else {
log.error("节点 {} 执行失败,耗时 {}ms", nodeId, duration, err);
}
});
});
2.8 中断机制 Human-in-the-Loop
v1.8.6+ 大幅增强了中断支持,允许在图执行的任意位置暂停,等待人工输入后恢复。
核心能力
- NodeHooks 支持可中断动作和流式节点
- CompiledGraph 集成中断元数据到 CompletableFuture 链
- Studio 支持管理中断和审批工作流
- 子图 中断支持(编译子图和状态子图自动支持)
使用方式
中断通过 CompileConfig 和 InterruptBefore / InterruptAfter 配置:
var config = CompileConfig.builder()
.checkpointSaver(saver) // 中断必须配合检查点
.interruptBefore(List.of("humanReview")) // 进入节点前暂停
.interruptAfter(List.of("generate")) // 节点执行后暂停
.build();
var workflow = graph.compile(config);
恢复执行
// 获取当前状态(包含中断信息)
var state = workflow.getState(runnableConfig);
// 人工审核后,使用更新后的状态恢复
workflow.invoke(null, runnableConfig); // 传入 null 表示从断点恢复
注意:中断机制必须配合
CheckpointSaver使用,否则无法保存和恢复状态。
2.9 取消支持
v1.8.5+ 引入图执行取消能力:
var result = workflow.invoke(inputs, config);
if (result.isCancelled()) {
// 处理取消逻辑
log.info("图执行被取消");
}
GraphResult.Status 枚举值:
COMPLETED— 正常完成CANCELLED— 被取消ERROR— 执行出错
第三部分:状态持久化
3.1 为什么需要检查点
检查点(Checkpoint)是 LangGraph4j 的核心能力之一,解决以下问题:
| 场景 | 无检查点 | 有检查点 |
|---|---|---|
| Agent 循环推理中断 | 丢失所有进度 | 从最后一步恢复 |
| Human-in-the-Loop | 无法暂停等待人工输入 | 暂停 → 审核 → 恢复 |
| 长时间运行的任务 | 进程崩溃后从头开始 | 自动恢复到最后检查点 |
| 调试 | 只能看到最终结果 | 时间旅行到任意历史状态 |
| 多轮对话 | invoke 之间无记忆 | 通过 threadId 关联历史 |
3.2 MemorySaver
内存存储,适用于开发和测试:
var saver = new MemorySaver();
var config = CompileConfig.builder()
.checkpointSaver(saver)
.build();
var workflow = graph.compile(config);
特点:
- 零配置,开箱即用
- 进程重启后数据丢失
- 不适合生产环境
3.3 PostgresSaver(生产推荐)
依赖
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-postgres-saver</artifactId>
<version>1.8.11</version>
</dependency>
配置
var saver = PostgresSaver.builder()
.host("localhost")
.port(5432)
.user("admin")
.password("password")
.database("langgraph4j")
.stateSerializer(stateSerializer) // 状态序列化器
.createTables(true) // 自动建表
.dropTablesFirst(false) // 是否先删表(生产环境设为 false)
.build();
推荐版本:PostgreSQL 16.4+
特性:
- 内置内存缓存,减少数据库往返
- 自动 Schema 初始化
clearCheckpointsCache()方法清除指定线程的缓存
3.4 其他 Saver:MySQL / Oracle / Redis
MysqlSaver
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-mysql-saver</artifactId>
</dependency>
var saver = MysqlSaver.builder()
.host("localhost").port(3306)
.user("root").password("pwd").database("langgraph4j")
.stateSerializer(serializer)
.build();
已知问题:MysqlSaver 存在数据不一致 bug(#347、#348),建议优先使用 PostgresSaver。
OracleSaver
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-oracle-saver</artifactId>
</dependency>
var saver = OracleSaver.builder()
.host("localhost").port(1521)
.user("admin").password("pwd").database("langgraph4j")
.stateSerializer(serializer)
.build();
RedisSaver
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-redis-saver</artifactId>
</dependency>
var saver = RedisSaver.builder()
.host("localhost").port(6379)
.stateSerializer(serializer)
.build();
适用场景:高性能、低延迟、缓存层。
3.5 CompileConfig 配置
var config = CompileConfig.builder()
.checkpointSaver(saver) // 检查点存储(必需才能持久化)
.releaseThread(false) // false=保持线程(可恢复),true=执行完释放
.interruptBefore(List.of("humanReview")) // 中断:进入节点前暂停
.interruptAfter(List.of("generate")) // 中断:节点执行后暂停
.build();
| 配置项 | 默认值 | 说明 |
|---|---|---|
checkpointSaver |
null(不持久化) | 检查点存储实现 |
releaseThread |
false | 执行完后是否释放线程 |
interruptBefore |
空列表 | 进入指定节点前暂停 |
interruptAfter |
空列表 | 指定节点执行后暂停 |
3.6 检查点操作 API
获取状态历史
// 获取完整状态历史(按时间倒序)
var history = workflow.getStateHistory(runnableConfig);
for (var snapshot : history) {
System.out.println("检查点 ID: " + snapshot.config().checkpointId());
System.out.println("节点: " + snapshot.next());
System.out.println("状态: " + snapshot.values());
}
获取最新检查点
var lastSnapshot = workflow.lastStateOf(runnableConfig);
时间旅行:从历史状态恢复
// 获取特定检查点
var pastState = workflow.getState(runnableConfig, checkpointId);
// 从该检查点重新执行
var result = workflow.invoke(null, pastState.config());
释放线程资源
saver.release(runnableConfig);
重要:如果
releaseThread=false,长时间不释放线程可能导致资源泄漏。生产环境中应在合适的时机调用release()。
3.7 状态序列化
状态持久化要求所有对象可序列化。LangGraph4j 提供了 ObjectStreamStateSerializer:
var serializer = new ObjectStreamStateSerializer<MyState>(MyState::new);
自定义序列化器
对于不可序列化的对象(如 LangChain4j 的 ChatMessage),需注册自定义序列化器:
var serializer = new ObjectStreamStateSerializer<MessageState>(MessageState::new);
serializer.mapper()
.register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer())
.register(ChatMessage.class, new ChatMessageSerializer());
3.8 已知问题
| Issue | 描述 | 解决方案 |
|---|---|---|
| #347 | MysqlSaver 数据不一致 | 优先使用 PostgresSaver |
| #348 | MysqlSaver loadedCheckpoints 排序 bug | 使用 clearCheckpointsCache() |
| #321 | Studio 中 thread_id 复用导致主键冲突 | 每次执行使用唯一 thread_id |
| #234 | 请求增强 Checkpoint 添加创建日期 | 待官方修复 |
| #157 | 请求增强 StateSnapshot 添加 parentConfig | 待官方修复 |
第四部分:子图与嵌套工作流
4.1 三种子图方式对比
| 方式 | 适用场景 | 中断支持 | 流式支持 | 状态 Schema |
|---|---|---|---|---|
| 编译子图 (CompiledGraph) | 父子图共享 Schema | 必须共享至少一个 key | ||
| 节点动作包装 (NodeAction) | 父子图 Schema 不同 | 可完全不同 | ||
| 状态子图 (StateGraph) | 最紧密集成 | 编译时合并 |
4.2 编译子图(CompiledGraph)
将编译后的子图直接作为节点添加到父图中:
// 定义子图
var childGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
.addNode("childStep1", childAction1)
.addNode("childStep2", childAction2)
.addEdge(START, "childStep1")
.addEdge("childStep1", "childStep2")
.addEdge("childStep2", END);
// 编译子图
var compiledChild = childGraph.compile();
// 添加到父图
var parentGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
.addNode("parentStep", parentAction)
.addNode("subgraph", compiledChild) // 子图作为节点
.addEdge(START, "parentStep")
.addEdge("parentStep", "subgraph")
.addEdge("subgraph", END);
要求:父子图至少共享一个 Schema key 才能通信。
4.3 节点动作包装(NodeAction)
在节点函数内部手动调用子图,适用于父子图 Schema 不同的情况:
parentGraph.addNode("subgraph", state -> {
// 转换状态
var childInput = Map.of(
"childMessages", state.value("parentMessages").orElse(List.of())
);
// 手动调用子图
var childResult = compiledChild.invoke(childInput);
// 转换回父图状态
return Map.of(
"parentMessages", childResult.get("childMessages")
);
});
特点:
- 需要手动处理状态转换
- 中断和流式支持需自行实现
- 灵活性最高
4.4 状态子图(StateGraph)
将未编译的 StateGraph 直接作为节点添加,编译时合并到父图中:
// 定义子图(不编译)
var childGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
.addNode("childA", childActionA)
.addNode("childB", childActionB)
.addEdge(START, "childA")
.addEdge("childA", "childB")
.addEdge("childB", END);
// 直接添加到父图(编译时合并)
var parentGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
.addNode("subgraph", childGraph) // 传入 StateGraph 而非 CompiledGraph
.addEdge(START, "subgraph")
.addEdge("subgraph", END);
var workflow = parentGraph.compile();
编译时合并行为:
- 子图节点 ID 自动重命名避免冲突:
subgraph:childA、subgraph:childB - 通过
SubGraphNode.formatId("subgraph", "childA")获取合并后的真实 ID - 中断配置使用重命名后的 ID
- 流式输出自动支持
4.5 图可视化
LangGraph4j 支持两种可视化格式:
编译前可视化(StateGraph 级别)
// PlantUML 格式
var puml = graph.getGraph(GraphRepresentation.Type.PLANTUML, "我的图", false);
// Mermaid.js 格式
var mermaid = graph.getGraph(GraphRepresentation.Type.MERMAID, "我的图", false);
编译后可视化(CompiledGraph 级别)
// 编译后显示合并后的完整结构(含子图展开)
var mermaid = workflow.getGraph(GraphRepresentation.Type.MERMAID, "我的工作流", false);
提示:将 Mermaid 输出粘贴到 Mermaid Live Editor 即可查看可视化图。
第五部分:多 Agent 编排模式
5.1 模式总览
| 模式 | 描述 | 适用场景 |
|---|---|---|
| Supervisor | 主管 Agent 路由任务到专业 Agent | 最常用,适合大多数多 Agent 场景 |
| 层级 | 多层 Supervisor,团队管理 | Agent 数量多,需要分组管理 |
| 并行 (Fork-Join) | 多个 Agent 同时执行 | 独立任务并行加速 |
| 顺序 | Agent 按固定顺序执行 | 流水线处理 |
5.2 Supervisor 模式
架构
用户请求 → Supervisor Agent → [Researcher | Coder | ...] → Supervisor → FINISH
Supervisor 使用 LLM 理解用户意图,决定将任务路由到哪个专业 Agent。专业 Agent 执行完毕后,结果返回 Supervisor,由 Supervisor 决定继续分配还是结束。
完整代码示例
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.jdk.*;
import java.util.*;
// 1. 定义状态
class SupervisorState extends AgentState {
public static final Map<String, Channel<?>> SCHEMA = Map.of(
"messages", Channels.appender(ArrayList::new),
"next", Channel.DEFAULT
);
public SupervisorState(Map<String, Object> initData) { super(initData); }
public Optional<String> next() { return value("next"); }
}
// 2. 定义 Supervisor Agent(使用 LLM 做路由决策)
class SupervisorAgent implements NodeAction<SupervisorState> {
// 路由结果(LLM 结构化输出)
static class Router {
@Description("要路由到的 Worker 名称。如果所有工作已完成,返回 FINISH")
String next;
}
// LLM Service 接口
interface Service {
@SystemMessage("""
你是一个主管,负责管理以下 Worker 之间的对话:{{members}}。
给定用户请求,决定下一步应该由哪个 Worker 执行。
每个 Worker 执行任务后会返回结果和状态。
当所有工作完成时,返回 FINISH。
只返回 Worker 名称或 FINISH,不要有其他内容。
""")
Router evaluate(@V("members") String members, @UserMessage String userMessage);
}
private final Service service;
private final String[] members;
SupervisorAgent(Service service, String[] members) {
this.service = service;
this.members = members;
}
@Override
public Map<String, Object> apply(SupervisorState state) {
var userMessage = state.<List<String>>value("messages")
.orElse(List.of()).get(0);
var result = service.evaluate(
String.join(", ", members),
userMessage
);
return Map.of("next", result.next);
}
}
// 3. 定义专业 Agent
class ResearcherAgent implements NodeAction<SupervisorState> {
@Override
public Map<String, Object> apply(SupervisorState state) {
// 调用 LLM 执行研究任务
return Map.of("messages", "研究结果:...");
}
}
class CoderAgent implements NodeAction<SupervisorState> {
@Override
public Map<String, Object> apply(SupervisorState state) {
// 调用 LLM 执行编码任务
return Map.of("messages", "代码实现:...");
}
}
// 4. 构建 Supervisor 图
public class SupervisorExample {
public static void main(String[] args) throws Exception {
var supervisor = new SupervisorAgent(supervisorService, new String[]{"researcher", "coder"});
var researcher = new ResearcherAgent();
var coder = new CoderAgent();
var graph = new StateGraph<>(SupervisorState.SCHEMA, SupervisorState::new)
.addNode("supervisor", supervisor)
.addNode("researcher", researcher)
.addNode("coder", coder)
.addEdge(START, "supervisor")
.addConditionalEdges("supervisor",
state -> state.next().orElse(END),
EdgeMappings.builder()
.to("researcher")
.to("coder")
.toEnd("FINISH") // "FINISH" 字符串路由到 END
.build())
.addEdge("researcher", "supervisor") // 执行完返回主管
.addEdge("coder", "supervisor"); // 执行完返回主管
var workflow = graph.compile();
var result = workflow.invoke(Map.of(
"messages", "帮我研究 LangGraph4j 并写一个示例代码"
));
}
}
Prompt 设计要点
- 清晰列出所有 Worker 的职责描述,否则 LLM 路由准确率显著下降
- 明确 FINISH 条件,避免无限循环
- 限制输出格式,只返回 Worker 名称或 FINISH
模型分层策略
| Agent 角色 | 推荐模型 | 理由 |
|---|---|---|
| Supervisor | GPT-4o / Claude-3.5 | 需要强推理和规划能力 |
| 执行型 Agent | GPT-4o-mini / Claude-3-Haiku | 成本低、速度快 |
| 本地部署 | Llama 3.1 / Qwen 2.5 | 数据安全、零 API 成本 |
成本警告:多 Agent 系统的 Token 消耗约为单 Agent 的 15 倍,必须设置最大轮次熔断机制。
5.3 层级模式
通过子图实现多级 Supervisor:
// 团队 1:研究团队
var researchTeam = new StateGraph<>(TeamState.SCHEMA, TeamState::new)
.addNode("researchSupervisor", researchSupervisor)
.addNode("webSearcher", webSearcher)
.addNode("dbQuerier", dbQuerier)
.addEdge(START, "researchSupervisor")
.addConditionalEdges("researchSupervisor", researchRouter, ...)
.addEdge("webSearcher", "researchSupervisor")
.addEdge("dbQuerier", "researchSupervisor");
// 团队 2:开发团队
var devTeam = new StateGraph<>(TeamState.SCHEMA, TeamState::new)
.addNode("devSupervisor", devSupervisor)
.addNode("coder", coder)
.addNode("tester", tester)
.addEdge(START, "devSupervisor")
.addConditionalEdges("devSupervisor", devRouter, ...)
.addEdge("coder", "devSupervisor")
.addEdge("tester", "devSupervisor");
// 顶层主管
var topGraph = new StateGraph<>(TopState.SCHEMA, TopState::new)
.addNode("topSupervisor", topSupervisor)
.addNode("researchTeam", researchTeam.compile()) // 子图作为节点
.addNode("devTeam", devTeam.compile())
.addEdge(START, "topSupervisor")
.addConditionalEdges("topSupervisor", topRouter,
EdgeMappings.builder()
.to("researchTeam")
.to("devTeam")
.toEnd("FINISH")
.build())
.addEdge("researchTeam", "topSupervisor")
.addEdge("devTeam", "topSupervisor");
5.4 并行执行 Fork-Join
限制说明
LangGraph4j 的并行执行有严格限制:
- 仅支持 Fork-Join 模型(一个节点分叉到多个并行节点,然后汇聚)
- 仅允许 一层并行步骤(并行节点内部不能再有并行分支)
- 并行分支中 不允许条件边
基本用法
var workflow = new MessagesStateGraph<String>()
.addNode("A", makeNode("A")) // 分叉点
.addNode("A1", makeNode("A1")) // 并行分支 1
.addNode("A2", makeNode("A2")) // 并行分支 2
.addNode("A3", makeNode("A3")) // 并行分支 3
.addNode("B", makeNode("B")) // 汇聚点
// Fork:A → A1, A2, A3 并行执行
.addEdge("A", "A1")
.addEdge("A", "A2")
.addEdge("A", "A3")
// Join:A1, A2, A3 → B
.addEdge("A1", "B")
.addEdge("A2", "B")
.addEdge("A3", "B")
.addEdge(START, "A")
.addEdge("B", END)
.compile();
配置并行执行器
var config = RunnableConfig.builder()
.addParallelNodeExecutor("A1", ForkJoinPool.commonPool())
.addParallelNodeExecutor("A2", Executors.newFixedThreadPool(4))
.addParallelNodeExecutor("A3", ForkJoinPool.commonPool())
.build();
workflow.stream(inputs, config);
编译子图作为并行节点
并行节点可以是编译后的子图,实现更复杂的并行子工作流:
var subWorkflow = new StateGraph<>(...)
.addNode("s1", step1)
.addNode("s2", step2)
.addEdge(START, "s1")
.addEdge("s1", "s2")
.addEdge("s2", END)
.compile();
var mainGraph = new StateGraph<>(...)
.addNode("parallel1", subWorkflow) // 子图作为并行节点
.addNode("parallel2", anotherSubWorkflow)
.addNode("fork", forkNode)
.addNode("join", joinNode)
.addEdge("fork", "parallel1")
.addEdge("fork", "parallel2")
.addEdge("parallel1", "join")
.addEdge("parallel2", "join")
.compile();
5.5 顺序模式
通过固定边串联多个 Agent:
var graph = new StateGraph<>(SCHEMA)
.addNode("researcher", researcher)
.addNode("writer", writer)
.addNode("reviewer", reviewer)
.addNode("publisher", publisher)
.addEdge(START, "researcher")
.addEdge("researcher", "writer") // 研究 → 写作
.addEdge("writer", "reviewer") // 写作 → 审稿
.addEdge("reviewer", "publisher") // 审稿 → 发布
.addEdge("publisher", END);
5.6 AgentExecutor — 开箱即用的 ReACT Agent
LangGraph4j 提供了内置的 AgentExecutor,封装了完整的 ReACT 循环(思考 → 工具调用 → 观察 → 再思考)。
LangChain4j 版
// 依赖
// org.bsc.langgraph4j:langchain4j-agent
// 定义工具
public class MyTools {
@Tool("获取指定城市的天气")
String getWeather(@P("城市名称") String city) {
return weatherService.getForecast(city);
}
@Tool("计算两个数的和")
double sum(double a, double b) {
return a + b;
}
}
// 构建 AgentExecutor
var agentExecutor = AgentExecutor.builder()
.chatModel(chatModel) // LangChain4j ChatModel
.toolsFromObject(new MyTools()) // 从对象提取工具
.tool(specification, executor) // 动态添加工具
.build();
var workflow = agentExecutor.compile();
// 同步执行
var result = workflow.invoke(Map.of(
"messages", UserMessage.from("北京今天天气如何?234 + 45 等于多少?")
));
// 流式执行
for (var step : workflow.stream(Map.of(
"messages", UserMessage.from("Hello!")
))) {
System.out.println(step);
}
Spring AI 版
// 依赖
// org.bsc.langgraph4j:spring-ai-agent
var graph = AgentExecutor.builder()
.chatModel(chatModel) // Spring AI ChatModel
.tools(toolCallbacks) // Spring AI ToolCallback 列表
.build();
var workflow = graph.compile();
var iterator = workflow.stream(Map.of(
"messages", new UserMessage("query")
));
AgentExecutorEx(Spring AI 增强版)
支持 Skills、Approval Action、ConversationContextPolicy:
var agent = AgentExecutorEx.builder()
.chatModel(chatModel, true) // true = 启用流式
.toolsFromObject(new TestTools())
.defaultSystem("始终使用可用的技能来协助用户")
.skills(resourceLoader.getResource("classpath:skills")) // 技能文件
.conversationContextPolicy(new MessageWindowConversationContextPolicy(10)) // 消息窗口
.build()
.compile(compileConfig);
ConversationContextPolicy
v1.8.6+ 引入,用于在 LLM 调用前过滤消息(如滑动窗口),控制发送给 LLM 的上下文长度:
// LangChain4j 版
var agentExecutor = AgentExecutor.builder()
.chatModel(chatModel)
.conversationContextPolicy(new MessageWindowConversationContextPolicy(10))
.build();
// 只保留最近 10 条消息发送给 LLM,避免超出上下文窗口
5.7 Adaptive RAG 实战
Adaptive RAG 是一种根据查询复杂度动态选择检索策略的模式。官方示例位于 langgraph4j-examples/langchain4j/adaptive-rag。
架构
START
↓
[分析查询] ──→ [web_search] ──→ [grade_documents]
↓ ↓
[vectorstore_retrieve] ──→ [grade_documents]
↓
[transform_query] ←── (not_useful)
↓
[generate] ──→ (useful) → END
核心节点
| 节点 | 功能 | 路由条件 |
|---|---|---|
web_search |
Tavily Web 搜索 | 查询需要实时信息 |
retrieve |
Chroma 向量数据库检索 | 查询可从知识库回答 |
grade_documents |
评估检索文档的相关性 | useful / not_useful / not_supported |
transform_query |
重写查询以改善检索 | 文档不相关时 |
generate |
基于检索结果生成回答 | 文档相关时 |
依赖
- Chroma 向量数据库(Docker 部署)
- Tavily Web Search API
- LangChain4j 集成模块
第六部分:LLM 框架集成
6.1 与 LangChain4j 集成
模块依赖
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langchain4j-core-jdk</artifactId>
<version>1.8.11</version>
</dependency>
工具状态共享(InvocationParameters)
通过 InvocationParameters 在图状态和工具之间传递数据:
// 工具端:通过 InvocationParameters 接收状态
@Tool("执行测试")
String execTest(String message, InvocationParameters context) {
Object value = context.get("sharedKey"); // 从状态中读取
return "结果: " + message;
}
// 图端:通过 InvocationContext 传递状态
var toolService = LC4jToolService.builder()
.toolsFromObject(new MyTools())
.build();
Command callResult = toolService.execute(
List.of(toolExecutionRequest),
InvocationContext.builder()
.invocationParameters(InvocationParameters.from(state))
.build(),
"messages"
).join();
6.2 与 Spring AI 集成
模块依赖
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-spring-ai</artifactId>
<version>1.8.11</version>
</dependency>
工具状态共享(ToolContext)
通过 Spring AI 的 ToolContext 在工具中访问和修改图状态:
@Tool(description = "执行测试工具")
String execTest(@ToolParam String message, ToolContext context) {
// 读取图状态
Map<String, Object> state = context.getContext();
Object value = state.get("sharedKey");
// 更新图状态
return SpringAIToolResponseBuilder.of(context)
.update(Map.of("arg0", message))
.buildAndReturn("结果: " + message);
}
6.3 流式输出
LangGraph4j 内建一等公民的流式支持,基于 java-async-generator 库。
图级流式
for (var step : workflow.stream(inputs, runnableConfig)) {
// step 是 NodeOutput,包含节点名和状态
System.out.println("节点: " + step.node());
System.out.println("状态: " + step.state());
}
LLM Token 级流式(LangChain4j)
var generator = StreamingChatGenerator.<AiMessage, MyState>builder()
.mapResult(response -> {
if (response.finishReason() == FinishReason.TOOL_EXECUTION) {
return Map.of("agent_outcome", new AgentOutcome(action, null));
}
return Map.of("agent_outcome", new AgentOutcome(null, finish));
})
.startingNode("agent")
.startingState(state)
.build();
// 将 generator 嵌入状态,图的 .stream() 会自动展开
streamingChatLanguageModel.generate(messages, tools, generator.handler());
return Map.of("agent_outcome", generator);
LLM Token 级流式(Spring AI)
Flux<ChatResponse> flux = chatClient.prompt()
.messages(state.messages())
.stream()
.chatResponse();
var generator = StreamingChatGenerator.<MyState>builder()
.startingNode("agent")
.startingState(state)
.mapResult(response -> Map.of("messages", response.getResult().getOutput()))
.build(flux);
return Map.of("messages", generator);
StreamingOutputEnd(v1.8.5+)
用于显式标记流式输出结束:
if (output instanceof StreamingOutputEnd) {
// 流式输出完成
}
StreamingOutput.isEnd()已废弃,替换为isStreamingEnd()。
6.4 模型选择策略
| Agent 角色 | 推荐模型 | Token 成本 | 延迟 |
|---|---|---|---|
| Supervisor / 路由 | GPT-4o / Claude-3.5-Sonnet | 高 | 中 |
| 复杂推理 Agent | GPT-4o / Claude-3.5-Sonnet | 高 | 中 |
| 简单执行 Agent | GPT-4o-mini / Claude-3-Haiku | 低 | 低 |
| 嵌入模型 | text-embedding-3-small | 极低 | 低 |
| 本地部署 | Llama 3.1 8B / Qwen 2.5 7B | 零 | 取决于硬件 |
成本控制建议:
- Supervisor 必须用强模型(路由准确率直接决定系统质量)
- 执行型 Agent 优先用轻量模型
- 设置
ConversationContextPolicy限制上下文长度 - 实现语义缓存减少重复推理
- 设置全局 Token 预算和熔断机制
第七部分:调试与可观测性
7.1 LangGraph4j Studio
Studio 是 LangGraph4j 内置的可视化调试工具,以 Web UI 形式提供交互式调试。
功能特性
图结构可视化(Mermaid 图)
运行工作流并实时显示执行步骤
查看/编辑每个步骤的状态数据
管理中断(Human-in-the-Loop 审批)
恢复执行(从检查点恢复)
基于 HTTP Streaming 的实时通信
三种服务器实现
Jetty(独立运行)
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-studio-jetty</artifactId>
<version>1.8.11</version>
</dependency>
var instance = LangGraphStudioServer.Instance.builder()
.title("我的 Agent")
.graph(workflow)
.addInputStringArg("messages", true, v -> new UserMessage(Objects.toString(v)))
.compileConfig(CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.build())
.build();
LangGraphStudioServer4Jetty.builder()
.port(8080)
.instance("default", instance)
.build()
.start()
.join();
Spring Boot
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-studio-springboot</artifactId>
<version>1.8.11</version>
</dependency>
@Configuration
public class StudioConfig extends LangGraphStudioConfig {
@Override
public Map<String, LangGraphStudioServer.Instance> instanceMap() {
return Map.of("myAgent", LangGraphStudioServer.Instance.builder()
.title("我的 Agent")
.graph(workflow)
.addInputStringArg("messages", true,
v -> new UserMessage(Objects.toString(v)))
.compileConfig(CompileConfig.builder()
.checkpointSaver(new MemorySaver())
.releaseThread(true)
.build())
.build());
}
}
Quarkus
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-studio-quarkus</artifactId>
<version>1.8.11</version>
</dependency>
配置方式类似 Spring Boot。
访问地址
http://localhost:<port>/?instance=<instance_id>
检测 Studio 环境
boolean inStudio = RunnableConfig.isRunningInStudio();
// 基于 STUDIO_METADATA_KEY 判断
7.2 日志配置
LangGraph4j 使用 SLF4J 日志门面,支持 Logback / Log4j2 等实现。
logging.properties 示例
# 全局日志级别
.level=INFO
# LangGraph4j 日志级别
org.bsc.langgraph4j.level=FINE
# 节点执行日志
org.bsc.langgraph4j.CompiledGraph.level=FINE
# 检查点日志
org.bsc.langgraph4j.checkpoint.level=FINE
7.3 OpenTelemetry 集成
依赖
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-opentelemetry</artifactId>
<version>1.8.11</version>
</dependency>
核心组件
| 组件 | 功能 |
|---|---|
OTELWrapCallTraceHook |
为每个节点和边调用创建 Span,添加 config/state 属性 |
OTELWrapCallTraceSetParentHook |
创建父 Span,将节点/边 Span 归组到工作流范围内 |
集成方式
var otelHook = new OTELWrapCallTraceHook<MyState>(stateSerializer);
var parentHook = OTELWrapCallTraceSetParentHook.<MyState>builder()
.scope("MyWorkflow") // 工作流范围名
.groupName("stream") // Span 组名
.build();
var workflow = new StateGraph<>(MyState.SCHEMA, stateSerializer)
// otelHook 先添加(FIFO,内层执行)
.addWrapCallNodeHook(otelHook)
.addWrapCallEdgeHook(otelHook)
// parentHook 后添加(FIFO,外层执行,包裹所有节点/边)
.addWrapCallNodeHook(parentHook)
.addWrapCallEdgeHook(parentHook)
.addNode("node1", action1)
.addNode("node2", action2)
.compile();
本地测试
模块内置了 Docker Compose 配置:
# Jaeger(UI 端口 16686)
docker compose -f src/docker/docker-compose-jaeger.yml up
# Grafana + Loki + OTel Collector
docker compose -f src/docker/docker-compose.yml up
7.4 自定义 Hook 实现指标收集
// 全局耗时统计 Hook
graph.addWrapCallNodeHook((nodeId, state, config, action) -> {
var start = System.nanoTime();
return action.call()
.whenComplete((result, error) -> {
var durationMs = (System.nanoTime() - start) / 1_000_000;
// 记录到 Micrometer
meterRegistry.timer("langgraph4j.node.duration",
"node", nodeId,
"status", error == null ? "success" : "error"
).record(durationMs, TimeUnit.MILLISECONDS);
// 记录到日志
log.info("节点 {} 执行{},耗时 {}ms",
nodeId,
error == null ? "成功" : "失败: " + error.getMessage(),
durationMs);
});
});
7.5 图可视化调试
PlantUML 输出
// 编译前
var puml = graph.getGraph(GraphRepresentation.Type.PLANTUML, "调试图", false);
System.out.println(puml);
// 粘贴到 https://www.plantuml.com/plantuml 查看
Mermaid.js 输出
// 编译后(含子图展开)
var mermaid = workflow.getGraph(GraphRepresentation.Type.MERMAID, "工作流", false);
System.out.println(mermaid);
// 粘贴到 https://mermaid.live 查看
第八部分:生产部署与运维
8.1 部署架构概述
关键认知:LangGraph4j 是一个 Java 库,不是独立服务。这与 Python 版 LangGraph(有 LangGraph Platform/Server)有本质区别。
部署 LangGraph4j 应用 = 部署一个标准的 Java 应用(Spring Boot / Quarkus / 普通 JAR)。
┌─────────────────────────────────────────┐
│ 标准 Java 应用 │
│ ┌─────────────────────────────────┐ │
│ │ Spring Boot / Quarkus │ │
│ │ ┌───────────────────────────┐ │ │
│ │ │ LangGraph4j (库) │ │ │
│ │ │ StateGraph + CompiledGraph│ │ │
│ │ └───────────────────────────┘ │ │
│ │ ┌─────────────┐ ┌───────────┐ │ │
│ │ │ LangChain4j │ │ Spring AI │ │ │
│ │ └─────────────┘ └───────────┘ │ │
│ └─────────────────────────────────┘ │
│ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ REST API │ │ WebSocket│ │ gRPC │ │
│ └──────────┘ └──────────┘ └────────┘ │
└─────────────────────────────────────────┘
8.2 Spring Boot 集成
依赖注入
Graph 节点可以作为 Spring Bean,通过标准 DI 注入服务:
@Configuration
public class ChatModelConfiguration {
@Bean
@Profile("ollama")
public ChatModel ollamaModel() {
return OllamaChatModel.builder()
.ollamaApi(new OllamaApi("http://localhost:11434"))
.defaultOptions(OllamaOptions.builder()
.model("qwen2.5:7b")
.temperature(0.1)
.build())
.build();
}
@Bean
@Profile("openai")
public ChatModel openAiModel() {
return OpenAiChatModel.builder()
.apiKey(System.getenv("OPENAI_API_KEY"))
.modelName("gpt-4o")
.build();
}
}
Web API 暴露
LangGraph4j 的 Graph 编译后是纯 Java 对象,需要自行编写 Controller 暴露端点:
@RestController
@RequestMapping("/api/agent")
public class AgentController {
private final CompiledGraph<MyState> workflow;
public AgentController(CompiledGraph<MyState> workflow) {
this.workflow = workflow;
}
@PostMapping("/invoke")
public Map<String, Object> invoke(@RequestBody Map<String, Object> request) {
return workflow.invoke(request);
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<NodeOutput> stream(@RequestParam String input) {
return Flux.fromIterable(
workflow.stream(Map.of("input", input))
);
}
}
环境变量管理
# application.properties
OPENAI_API_KEY=${OPENAI_API_KEY}
TAVILY_API_KEY=${TAVILY_API_KEY}
OLLAMA_BASE_URL=${OLLAMA_BASE_URL:http://localhost:11434}
# .env 文件
OPENAI_API_KEY=sk-xxx
TAVILY_API_KEY=tvly-xxx
8.3 Docker 容器化
FROM eclipse-temurin:17-jdk-alpine AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN ./mvnw clean package -DskipTests
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-XX:+UseZGC", "-Xmx512m", "-jar", "app.jar"]
# 构建并运行
docker build -t langgraph4j-app .
docker run -p 8080:8080 \
-e OPENAI_API_KEY=sk-xxx \
-e TAVILY_API_KEY=tvly-xxx \
langgraph4j-app
8.4 Kubernetes 部署
Pod 配置
apiVersion: v1
kind: Pod
metadata:
name: langgraph4j-app
spec:
containers:
- name: app
image: your-registry/langgraph4j-app:1.8.11
ports:
- containerPort: 8080
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: agent-secrets
key: openai-api-key
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: agent-secrets
key: database-url
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /actuator/health/readiness
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
HPA 弹性伸缩
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: langgraph4j-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: langgraph4j-app
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
8.5 持久化选型对比
| 存储后端 | 模块 | 性能 | 可靠性 | 运维复杂度 | 推荐场景 |
|---|---|---|---|---|---|
| PostgreSQL | langgraph4j-postgres-saver |
高 | 高 | 中 | |
| MySQL | langgraph4j-mysql-saver |
高 | 中 | 中 | 已有 MySQL 基础设施 |
| Oracle | langgraph4j-oracle-saver |
高 | 高 | 高 | Oracle 环境集成 |
| Redis | langgraph4j-redis-saver |
极高 | 中 | 低 | 高性能/缓存层 |
| Memory | langgraph4j-core |
极高 | 无 | 无 | 仅开发测试 |
建议:生产环境优先使用 PostgreSQL(相对稳定,MysqlSaver 有已知 bug)。
8.6 连接池调优
数据库连接池(HikariCP)
# application.yml
spring:
datasource:
hikari:
maximum-pool-size: 20
minimum-idle: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
HTTP 客户端连接池
LangChain4j 和 Spring AI 的 HTTP 客户端底层使用 OkHttp/Retrofit,通过其配置调整:
# LangChain4j 连接超时
langchain4j.open-ai.timeout=60s
8.7 安全设计
| 层面 | 措施 |
|---|---|
| API Key 管理 | Kubernetes Secrets / HashiCorp Vault,绝不硬编码 |
| 传输加密 | TLS 1.3、mTLS(Istio) |
| 速率限制 | Bucket4j 或网关层限流 |
| 访问控制 | Spring Security + OAuth2 / JWT |
| Prompt 注入防护 | 输入/输出 Guardrails |
| 审计日志 | 全量记录 Agent 操作和 LLM 调用 |
第九部分:性能优化
9.1 异步执行
节点异步化
// 使用 node_async 包装异步节点
graph.addNode("llmNode", node_async(state ->
CompletableFuture.supplyAsync(() -> {
// 非阻塞 LLM 调用
var response = chatModel.generate(state.messages());
return Map.of("messages", response);
})
));
边异步化
graph.addConditionalEdges("agent",
edge_async(state -> {
// 异步路由决策
return CompletableFuture.supplyAsync(() ->
state.lastMessage().isPresent() ? "tools" : END
);
}),
EdgeMappings.builder()
.to("tools")
.toEnd()
.build()
);
9.2 并行节点优化
ForkJoinPool 配置
// 为并行节点指定专用线程池
var config = RunnableConfig.builder()
.addParallelNodeExecutor("nodeA", Executors.newFixedThreadPool(4))
.addParallelNodeExecutor("nodeB", ForkJoinPool.commonPool())
.build();
注意事项
- 并行节点数量不宜过多(建议 ≤ 5),避免线程竞争
- I/O 密集型节点(LLM 调用、数据库查询)适合并行
- CPU 密集型节点并行收益有限
9.3 Token 成本控制
模型分层
// Supervisor 使用强模型
var supervisorModel = OpenAiChatModel.builder()
.apiKey(apiKey).modelName("gpt-4o").build();
// 执行 Agent 使用轻量模型
var executorModel = OpenAiChatModel.builder()
.apiKey(apiKey).modelName("gpt-4o-mini").build();
ConversationContextPolicy
// 限制发送给 LLM 的消息数量
var agent = AgentExecutor.builder()
.chatModel(chatModel)
.conversationContextPolicy(new MessageWindowConversationContextPolicy(10))
.build();
预算熔断
// 通过 Hook 实现 Token 预算控制
AtomicLong totalTokens = new AtomicLong(0);
long MAX_TOKENS = 100_000;
graph.addAfterCallNodeHook((nodeId, state, config, result) -> {
// 累计 Token 使用量
// 如果超出预算,抛出异常终止执行
if (totalTokens.addAndGet(estimateTokens(result)) > MAX_TOKENS) {
throw new RuntimeException("Token 预算超限");
}
});
9.4 检查点缓存策略
PostgresSaver 内置内存缓存,减少数据库往返:
// 手动清除指定线程的缓存
saver.clearCheckpointsCache(runnableConfig);
对于高频访问的场景,可以考虑在 PostgresSaver 前加一层 Redis 缓存。
第十部分:常见问题与陷阱
10.1 状态管理陷阱
Schema 不匹配
问题:节点返回了 Schema 中未定义的 key。
// Schema 只定义了 "messages" 和 "next"
var SCHEMA = Map.of("messages", Channels.appender(ArrayList::new), "next", Channel.DEFAULT);
// 节点返回了 "result"(未定义)→ 编译时或运行时异常
return Map.of("result", "done"); // ❌ 错误
解决:确保所有节点返回的 key 都在 Schema 中定义。
序列化失败
问题:状态中包含不可序列化的对象(如 LangChain4j 的 ChatMessage)。
解决:注册自定义序列化器:
var serializer = new ObjectStreamStateSerializer<>(MyState::new);
serializer.mapper()
.register(ChatMessage.class, new ChatMessageSerializer())
.register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer());
状态丢失
问题:不配置 CheckpointSaver 时,invoke() 调用之间没有记忆。
解决:配置 CompileConfig.builder().checkpointSaver(saver).build()。
10.2 并行执行限制
这是 LangGraph4j 最容易踩的坑之一
| 限制 | 说明 |
|---|---|
| 仅 Fork-Join | 必须是一个分叉点对应一个汇聚点 |
| 单层并行 | 并行节点内部不能再有并行分支 |
| 无条件边 | 并行分支中不能使用 addConditionalEdges |
| 无嵌套并行 | Issue #332 讨论了进一步改进需求 |
设计建议:在设计工作流前,先画出图结构,确认是否满足这些限制。
10.3 检查点问题
MysqlSaver 数据不一致
- Issue #347:
MysqlSaver存在数据不一致问题 - Issue #348:
loadedCheckpoints排序 bug - 解决方案: 使用
clearCheckpointsCache()清理缓存,或改用 PostgresSaver
thread_id 冲突
- Issue #321: Studio 中复用 thread_id 会触发主键冲突
- 解决方案: 每次执行使用唯一的 thread_id
检查点 TTL
LangGraph4j 不内置检查点 TTL 策略。需要在数据库层面配置:
-- PostgreSQL: 定期清理过期检查点
DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '7 days';
10.4 流式输出问题
版本间输出不一致
- Issue #350: 1.8.4 和 1.8.5/1.8.6 版本间 stream 输出结果不同
- 解决方案: 锁定版本,升级前充分测试
流式中断图
- Issue #118: 在流式节点输出时中断图执行仍是一个待解决问题
10.5 模型兼容性
本地模型函数调用失败
- Issue #275: 本地部署 DeepSeek V3 时函数调用失败
- 原因: 部分开源模型对 Function Calling / Tool Use 的支持不完善
- 解决方案:使用经过验证的模型(Llama 3.1、Qwen 2.5),或升级模型版本
10.6 社区与支持
| 资源 | 说明 |
|---|---|
| GitHub Issues | 问题反馈(当前 20 个 Open Issues) |
| GitHub Discussions | 技术讨论 |
| 示例仓库 | 官方示例代码 |
| 官方文档 | API 文档和教程 |
注意:LangGraph4j 核心维护者基本为 bsorrentino 一人(1.5k Stars),社区规模有限。遇到问题时建议:
- 先查阅 Issues 和源码
- 参考 Python LangGraph 文档(概念通用)
- 参考
how-tos/目录的 Jupyter Notebook
附录
A. 模块依赖速查表
| 模块 | Maven Artifact | 用途 |
|---|---|---|
| 核心引擎 | langgraph4j-core |
StateGraph, AgentState, Channel, Hooks |
| BOM | langgraph4j-bom |
统一版本管理 |
| PostgreSQL | langgraph4j-postgres-saver |
PostgreSQL 检查点 |
| MySQL | langgraph4j-mysql-saver |
MySQL 检查点 |
| Oracle | langgraph4j-oracle-saver |
Oracle 检查点 |
| Redis | langgraph4j-redis-saver |
Redis 检查点 |
| OpenTelemetry | langgraph4j-opentelemetry |
分布式追踪 |
| LangChain4j 核心 | langchain4j-core-jdk |
LangChain4j 集成 |
| LangChain4j Agent | langchain4j-agent |
AgentExecutor (LangChain4j) |
| Spring AI 核心 | langgraph4j-spring-ai |
Spring AI 集成 |
| Spring AI Agent | spring-ai-agent |
AgentExecutor (Spring AI) |
| Studio Jetty | langgraph4j-studio-jetty |
Studio (Jetty) |
| Studio SpringBoot | langgraph4j-studio-springboot |
Studio (Spring Boot) |
| Studio Quarkus | langgraph4j-studio-quarkus |
Studio (Quarkus) |
B. 核心 API 速查表
StateGraph
new StateGraph<>(SCHEMA)
new StateGraph<>(SCHEMA, StateFactory)
new MessagesStateGraph<M>()
.addNode(String id, NodeAction<S>)
.addEdge(String from, String to)
.addConditionalEdges(String from, EdgeAction<S>, EdgeMappings)
.addConditionalEntryPoint(EdgeAction<S>, EdgeMappings)
.compile()
.compile(CompileConfig)
Channel
Channel.DEFAULT // 覆盖(无默认值)
Channel.of(defaultValue) // 覆盖(有默认值)
Channels.appender(Supplier<List>) // 追加
MessageChannel.Appender.of(Supplier<List>) // 消息追加
EdgeMappings
EdgeMappings.builder()
.to("nodeA")
.to("nodeB")
.toEnd() // 路由到 END
.toEnd("FINISH") // 字符串 "FINISH" 路由到 END
.build()
CompiledGraph
workflow.invoke(Map<String, Object> inputs)
workflow.invoke(Map<String, Object> inputs, RunnableConfig)
workflow.stream(Map<String, Object> inputs)
workflow.stream(Map<String, Object> inputs, RunnableConfig)
workflow.getStateHistory(RunnableConfig)
workflow.lastStateOf(RunnableConfig)
workflow.getState(RunnableConfig, String checkpointId)
CompileConfig
CompileConfig.builder()
.checkpointSaver(saver)
.releaseThread(boolean)
.interruptBefore(List<String>)
.interruptAfter(List<String>)
.build()
RunnableConfig
RunnableConfig.builder()
.threadId(String)
.addParallelNodeExecutor(String nodeId, Executor)
.build()
C. 关键资源链接
| 资源 | 地址 |
|---|---|
| GitHub 主仓库 | https://github.com/langgraph4j/langgraph4j |
| 示例仓库 | https://github.com/langgraph4j/langgraph4j-examples |
| 官方文档站 | Overview - LangGraph4j |
| JavaDoc | https://langgraph4j.github.io/langgraph4j/apidocs/ |
| CHANGELOG | https://github.com/langgraph4j/langgraph4j/blob/main/CHANGELOG.md |
| 视觉构建器 | https://github.com/langgraph4j/langgraph4j-builder |
| OpenTelemetry 博客 | https://bsorrentino.github.io/bsorrentino/ai/2026/02/04/LangGraph4j-Hooks-and-OpenTelemertry.html |
| Python LangGraph 文档 | https://langchain-ai.github.io/langgraph/ |
| LangChain4j 文档 | https://docs.langchain4j.dev |
| Spring AI 文档 | Introduction :: Spring AI Reference |