使用MTC整理资料还是非常棒的,结合traeIde闭环了

LangGraph4j 从入门到精通

版本基准: LangGraph4j 1.8.11 | 最低 Java: 17 | 许可证: MIT
前置要求: 已掌握 LangChain / LangChain4j 基础开发
仓库地址: https://github.com/langgraph4j/langgraph4j


目录


第一部分:概述与快速入门

1.1 LangGraph4j 是什么

LangGraph4j 是 Python 版 LangGraph 的 Java 移植版,用于在 Java 生态中构建有状态的、多步骤的、多 Agent 的 LLM 应用

核心定位

将 LLM 应用建模为有向图(Directed Graph)——节点(Node)负责执行逻辑,边(Edge)负责控制流转。这种图结构天然适合表达 Agent 的循环推理、工具调用、多步决策等复杂行为。

与 Python LangGraph 对比

维度 Python LangGraph Java LangGraph4j
语言 Python 3.9+ Java 17+
异步模型 asyncio CompletableFuture + java-async-generator
状态表示 TypedDict / Pydantic AgentState(Map 包装器)
LLM 集成 LangChain Python LangChain4j / Spring AI(双框架支持)
流式输出 Python async generator AsyncGenerator(java-async-generator 库)
检查点存储 SQLite / Postgres / Redis Memory / JDBC / Postgres / Oracle / Redis
Studio LangGraph Studio(SaaS + 本地) 内置 Studio(Jetty / SpringBoot / Quarkus)
可观测性 回调机制 Hooks + OpenTelemetry
部署形态 LangGraph Platform(独立服务) Java 库(嵌入到你的应用中)

核心价值

  1. 循环支持:突破传统 Chain 的线性限制,支持 Agent 的循环推理
  2. 状态持久化:内置检查点机制,支持中断恢复和时间旅行
  3. 多 Agent 编排:原生支持 Supervisor、层级、并行等协作模式
  4. 框架无关:同时支持 LangChain4j 和 Spring AI,不绑定特定 LLM 框架
  5. 生产就绪:多种数据库检查点、OpenTelemetry 集成、Studio 调试

1.2 项目概况与模块结构

项目信息

项目 信息
GitHub https://github.com/langgraph4j/langgraph4j
最新版本 1.8.11(2026-03-30)
最低 Java 17
许可证 MIT
提交数 3,300+ commits
核心维护者 bsorrentino

模块结构

langgraph4j/
├── langgraph4j-bom/                  # BOM 依赖管理(统一版本)
│
├── langgraph4j-core/                 # 🔧 核心引擎
│                                     #   StateGraph, AgentState, Node, Edge,
│                                     #   Checkpoint, Channel, Hooks
│
├── langgraph4j-opentelemetry/        # 📊 OpenTelemetry 集成
│                                     #   OTELWrapCallTraceHook
│
├── langgraph4j-postgres-saver/       # 💾 PostgreSQL 检查点
├── langgraph4j-mysql-saver/          # 💾 MySQL 检查点
├── langgraph4j-oracle-saver/         # 💾 Oracle 检查点
├── langgraph4j-redis-saver/          # 💾 Redis 检查点
│
├── langchain4j/                      # 🔗 LangChain4j 集成
│   ├── langchain4j-core/             #   核心集成(序列化器、工具服务)
│   └── langchain4j-agent/            #   AgentExecutor(ReACT Agent)
│
├── spring-ai/                        # 🔗 Spring AI 集成
│   ├── spring-ai-core/               #   核心集成
│   └── spring-ai-agent/              #   AgentExecutor(Spring AI 版)
│
├── studio/                           # 🖥️ Studio Web UI
│   ├── base/                         #   基础接口
│   ├── jetty/                        #   Jetty 服务器
│   ├── quarkus/                      #   Quarkus 服务器
│   └── springboot/                   #   Spring Boot 服务器
│
├── how-tos/                          # 📚 教程(Jupyter Notebook)
├── samples/                          # 📚 示例代码
└── generator/                        # 🔧 代码生成器

Maven 依赖引入

推荐使用 BOM 管理版本

<properties>
    <langgraph4j.version>1.8.11</langgraph4j.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.bsc.langgraph4j</groupId>
            <artifactId>langgraph4j-bom</artifactId>
            <version>${langgraph4j.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<!-- 核心依赖(必需) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-core</artifactId>
</dependency>

<!-- LangChain4j 集成(可选,二选一) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-langchain4j</artifactId>
</dependency>

<!-- Spring AI 集成(可选,二选一) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-spring-ai</artifactId>
</dependency>

<!-- PostgreSQL 检查点(可选) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-postgres-saver</artifactId>
</dependency>

<!-- OpenTelemetry(可选) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-opentelemetry</artifactId>
</dependency>

<!-- Studio Spring Boot(可选) -->
<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-studio-springboot</artifactId>
</dependency>

1.3 快速开始:第一个 StateGraph

以下是一个最简单的 LangGraph4j 示例——两步顺序处理:

import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.*;
import java.util.*;

public class HelloWorld {

    // 1. 定义状态 Schema(声明状态通道及其 Reducer 行为)
    static final Map<String, Channel<?>> SCHEMA = Map.of(
        "input",    Channel.DEFAULT,                      // 覆盖模式
        "output",   Channel.DEFAULT,                      // 覆盖模式
        "steps",    Channels.appender(ArrayList::new)     // 追加模式
    );

    // 2. 定义节点:第一步 — 处理输入
    static Map<String, Object> step1(Map<String, Object> state) {
        var input = (String) state.get("input");
        return Map.of(
            "output", "处理结果: " + input.toUpperCase(),
            "steps",  "step1 完成"
        );
    }

    // 3. 定义节点:第二步 — 格式化输出
    static Map<String, Object> step2(Map<String, Object> state) {
        var output = (String) state.get("output");
        return Map.of(
            "output", "【" + output + "】",
            "steps",  "step2 完成"
        );
    }

    public static void main(String[] args) throws Exception {
        // 4. 构建图
        var graph = new StateGraph<>(SCHEMA)
            .addNode("step1", step1)
            .addNode("step2", step2)
            .addEdge(START, "step1")    // 入口 → step1
            .addEdge("step1", "step2")  // step1 → step2
            .addEdge("step2", END);     // step2 → 结束

        // 5. 编译并执行
        var workflow = graph.compile();

        var result = workflow.invoke(Map.of("input", "Hello LangGraph4j"));

        System.out.println("最终输出: " + result.get("output"));
        System.out.println("执行步骤: " + result.get("steps"));
    }
}

输出

最终输出: 【处理结果: HELLO LANGGRAPH4J】
执行步骤: [step1 完成, step2 完成]

关键理解:节点函数只返回需要更新的状态字段,框架会根据 Schema 中定义的 Reducer 策略自动合并到完整状态中。


1.4 核心概念速览

概念 一句话解释 类比
StateGraph 图的定义容器,添加节点和边后编译为可运行图 类定义
AgentState 图的共享状态,本质是 Map<String, Object> 的包装器 共享上下文
Channel / Reducer 定义状态字段如何被更新(覆盖 / 追加 / 自定义) 状态更新策略
NodeAction 节点的执行逻辑,接收状态、返回状态更新 方法体
EdgeAction 边的路由逻辑,根据状态决定下一个节点 if-else 分支
CompiledGraph 编译后的不可变图实例,支持 invoke / stream 调用 实例化的对象
CheckpointSaver 持久化检查点,支持状态恢复和时间旅行 存档/读档
Hooks 横切关注点机制(Before/After/WrapCall),用于日志、重试、追踪 AOP 切面
Subgraph 将一个图作为节点嵌入到另一个图中 方法调用

1.5 学习路径

为已掌握 LangChain / LangChain4j 的开发者定制的学习路径:

入门(1-2天)
├── 1. 理解核心概念:StateGraph → AgentState → Channel → Node → Edge
├── 2. 编写第一个 StateGraph(Hello World)
└── 3. 使用 MessagesState 构建简单对话 Agent

进阶(3-5天)
├── 4. 条件边与动态路由
├── 5. AgentExecutor(开箱即用的 ReACT Agent)
├── 6. 状态持久化(PostgresSaver)
└── 7. Studio 调试

高级(5-7天)
├── 8. 多 Agent Supervisor 模式
├── 9. 子图与嵌套工作流
├── 10. Hooks 机制(自定义重试、日志、指标)
├── 11. 并行执行 Fork-Join
└── 12. Human-in-the-Loop 中断机制

精通(持续)
├── 13. OpenTelemetry 分布式追踪
├── 14. 生产部署(Spring Boot + K8s)
├── 15. Adaptive RAG 等复杂模式
└── 16. 性能调优与成本控制

推荐学习资源


第二部分:核心概念深入

2.1 StateGraph 详解

StateGraph 是整个框架的核心容器类,用于定义图的结构。

构建方式

// 方式一:传入 Schema + State 工厂
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new);

// 方式二:仅传入 Schema(使用默认 AgentState)
var graph = new StateGraph<>(SCHEMA);

// 方式三:使用便捷构建器(基于 MessagesState)
var graph = new MessagesStateGraph<ChatMessage>();

核心 API

// 添加节点
StateGraph<S> addNode(String id, NodeAction<S> action)
StateGraph<S> addNode(String id, AsyncNodeAction<S> action)

// 添加固定边
StateGraph<S> addEdge(String from, String to)

// 添加条件边
StateGraph<S> addConditionalEdges(String from, EdgeAction<S> edgeAction, EdgeMappings mappings)

// 添加条件入口点
StateGraph<S> addConditionalEntryPoint(EdgeAction<S> edgeAction, EdgeMappings mappings)

// 编译
CompiledGraph<S> compile()
CompiledGraph<S> compile(CompileConfig config)

// Hooks 注册
StateGraph<S> addBeforeCallNodeHook(BeforeCallNodeHook<S> hook)
StateGraph<S> addAfterCallNodeHook(AfterCallNodeHook<S> hook)
StateGraph<S> addWrapCallNodeHook(WrapCallNodeHook<S> hook)
StateGraph<S> addWrapCallNodeHook(String nodeId, WrapCallNodeHook<S> hook)  // 指定节点

// 图可视化(编译前)
String getGraph(GraphRepresentation.Type type, String title, boolean xdgOpen)

编译过程

compile() 执行以下操作:

  1. 结构检查:验证无孤立节点、边引用的节点存在
  2. 子图合并:将 StateGraph 类型的子图节点合并到父图中
  3. 生成执行计划:确定节点执行顺序和并行分组
  4. 返回不可变实例CompiledGraph 是线程安全的

2.2 AgentState 与状态 Schema

AgentState 基类

AgentState 是所有状态的基类,本质是 Map<String, Object> 的包装器:

public class AgentState {
    public AgentState(Map<String, Object> initData) { ... }

    // 获取状态值
    public <T> Optional<T> value(String key) { ... }

    // 获取原始数据 Map
    public Map<String, Object> data() { ... }

    // 获取最后一条消息(MessagesState 专用)
    public <M> Optional<M> lastMessage() { ... }
}

自定义状态类

public class ChatState extends AgentState {

    public static final String MESSAGES_KEY = "messages";
    public static final String NEXT_KEY = "next";
    public static final String CONTEXT_KEY = "context";

    // 定义 Schema:每个字段对应一个 Channel
    public static final Map<String, Channel<?>> SCHEMA = Map.of(
        MESSAGES_KEY, Channels.appender(ArrayList::new),  // 消息列表:追加模式
        NEXT_KEY,     Channel.DEFAULT,                     // 下一步:覆盖模式
        CONTEXT_KEY,  Channel.DEFAULT                      // 上下文:覆盖模式
    );

    public ChatState(Map<String, Object> initData) {
        super(initData);
    }

    // 类型安全的访问方法
    public List<ChatMessage> messages() {
        return this.<List<ChatMessage>>value(MESSAGES_KEY).orElse(List.of());
    }

    public Optional<String> next() {
        return this.value(NEXT_KEY);
    }

    public Optional<Map<String, Object>> context() {
        return this.value(CONTEXT_KEY);
    }
}

预置状态类

MessagesState<M>:专为对话场景设计,内置 messages 字段(Appender 通道):

// 使用预置 MessagesState
var graph = new MessagesStateGraph<ChatMessage>()
    .addNode("agent", agentNode)
    .addEdge(START, "agent")
    .addEdge("agent", END)
    .compile();

2.3 Channel 与 Reducer

Channel 定义状态字段被更新时的合并策略(即 Reducer 模式)。

三种核心 Reducer

① 覆盖模式(Default)

新值直接替换旧值:

"currentResult", Channel.DEFAULT          // 无默认值
"count",         Channel.of(0)            // 默认值为 0
"config",        Channel.of(Map.of())     // 默认值为空 Map
// 节点返回 {"count": 5} → 状态中 count 变为 5
// 下一个节点返回 {"count": 10} → 状态中 count 变为 10(覆盖)

② 追加模式(Appender)

新值追加到列表中:

"messages", Channels.appender(ArrayList::new)
"steps",    Channels.appender(ArrayList::new)
// 节点返回 {"steps": "step1"} → steps = ["step1"]
// 下一个节点返回 {"steps": "step2"} → steps = ["step1", "step2"]

③ 消息追加模式(MessageChannel.Appender)

专为聊天消息设计,支持按 ID 更新已有消息:

"messages", MessageChannel.Appender.of(ArrayList::new)

特殊行为:

  • 新消息(无 ID 或 ID 不存在)→ 追加
  • 已有消息(按 ID 匹配)→ 更新(支持人工编辑消息后恢复)

自定义 Reducer

// 自定义 Reducer:字符串拼接
var concatenatingChannel = new Channel<String>() {
    @Override
    public String merge(String existing, String newValue) {
        return existing == null ? newValue : existing + " | " + newValue;
    }
};

var SCHEMA = Map.of(
    "log", concatenatingChannel
);

Schema 设计原则

原则 说明
每个状态字段必须有对应的 Channel 否则编译时抛出异常
覆盖模式适合"当前值"语义 next(下一步路由)、result(当前结果)
追加模式适合"历史记录"语义 messages(对话历史)、steps(执行步骤)
避免在追加通道中存储大对象 会随执行步数线性增长

2.4 节点 NodeAction

节点是图的执行单元,接收当前状态,返回状态更新。

定义方式

① 实现 NodeAction 接口

public class MyNode implements NodeAction<MyState> {
    @Override
    public Map<String, Object> apply(MyState state) {
        var input = state.value("input").orElse("");
        return Map.of("output", "处理: " + input);
    }
}

graph.addNode("myNode", new MyNode());

② Lambda 简写

graph.addNode("myNode", (NodeAction<MyState>) state -> {
    return Map.of("output", "处理: " + state.value("input").orElse(""));
});

③ 异步节点

// 实现 AsyncNodeAction
public class AsyncNode implements AsyncNodeAction<MyState> {
    @Override
    public CompletableFuture<Map<String, Object>> apply(MyState state) {
        return CompletableFuture.supplyAsync(() -> {
            // 耗时操作(如 LLM 调用)
            return Map.of("output", "异步处理完成");
        });
    }
}

// 使用 node_async 工具方法
graph.addNode("asyncNode", node_async(state ->
    CompletableFuture.supplyAsync(() -> Map.of("output", "done"))
));

节点函数签名

// 同步
Map<String, Object> apply(AgentState state)

// 异步
CompletableFuture<Map<String, Object>> apply(AgentState state)

注意:节点函数只返回需要更新的字段,不需要返回完整状态。框架根据 Schema 中的 Reducer 策略自动合并。

节点返回值规则

返回值 效果
Map.of("key", value) 更新指定字段
Map.of() 空更新(不改变状态)
null 空更新(不改变状态)
抛出异常 图执行终止(除非被 WrapCall Hook 捕获)

2.5 边 EdgeAction

边定义节点间的控制流。

固定边

无条件从 A 跳转到 B:

graph.addEdge("nodeA", "nodeB");
graph.addEdge(START, "firstNode");   // 入口
graph.addEdge("lastNode", END);      // 结束

条件边

基于当前状态动态决定下一个节点:

// 定义路由函数
EdgeAction<MyState> router = state -> {
    var lastMsg = state.lastMessage().orElseThrow();
    if (lastMsg.hasToolExecutionRequests()) {
        return "executeTools";  // 有工具调用 → 执行工具
    }
    return END;                 // 无工具调用 → 结束
};

// 注册条件边
graph.addConditionalEdges("agent", router,
    EdgeMappings.builder()
        .to("executeTools")    // 路由函数返回 "executeTools" → 跳转到此节点
        .toEnd()               // 路由函数返回 END → 结束
        .build()
);

条件入口点

根据输入动态选择起始节点:

graph.addConditionalEntryPoint(
    edge_async(state -> {
        var mode = (String) state.value("mode").orElse("default");
        return "search".equals(mode) ? "searchNode" : "defaultNode";
    }),
    EdgeMappings.builder()
        .to("searchNode")
        .to("defaultNode")
        .build()
);

EdgeMappings 构建器

EdgeMappings.builder()
    .to("nodeA")          // 字符串路由值 → 节点
    .to("nodeB")
    .toEnd()              // 特殊:路由到 END
    .toEnd("FINISH")      // 特殊:字符串 "FINISH" 路由到 END
    .build()

2.6 CompiledGraph

编译后的不可变图实例,是实际运行的对象。

创建

// 无配置编译
var workflow = graph.compile();

// 带配置编译
var config = CompileConfig.builder()
    .checkpointSaver(new PostgresSaver.builder()...build())
    .releaseThread(false)
    .build();
var workflow = graph.compile(config);

调用方式

同步调用 invoke

// 基本调用
var result = workflow.invoke(Map.of("input", "Hello"));

// 带 RunnableConfig(用于检查点)
var runnableConfig = RunnableConfig.builder()
    .threadId("thread-123")  // 线程 ID,关联检查点
    .build();
var result = workflow.invoke(Map.of("input", "Hello"), runnableConfig);

流式调用 stream

// 流式执行,逐步获取每个节点的输出
for (var step : workflow.stream(Map.of("input", "Hello"))) {
    System.out.println("节点: " + step.node());
    System.out.println("状态: " + step.state());
}

NodeOutput 包含:

  • node() — 执行的节点名称
  • state() — 该节点执行后的状态快照

RunnableConfig

var config = RunnableConfig.builder()
    .threadId("thread-123")                              // 线程 ID(检查点关联)
    .addParallelNodeExecutor("nodeA", executor)          // 并行节点执行器
    .build();

// 检查是否在 Studio 中运行
boolean inStudio = RunnableConfig.isRunningInStudio();

线程管理

releaseThread 配置项控制线程生命周期:

行为 适用场景
false(默认) 执行完后保持线程,可通过检查点恢复 需要中断恢复、时间旅行
true 执行完后释放线程资源 一次性执行、无需恢复

2.7 Hooks 机制

Hooks 是 LangGraph4j 的核心横切关注点机制,用于实现日志、重试、追踪、指标收集等。

三种 Hook 类型

类型 执行时机 接口 典型用途
BeforeCall 节点/边执行前 BeforeCallNodeHook / BeforeCallEdgeHook 参数校验、日志、状态预处理
AfterCall 节点/边执行后 AfterCallNodeHook / AfterCallEdgeHook 结果处理、指标收集
WrapCall 包裹整个执行 WrapCallNodeHook / WrapCallEdgeHook 重试、超时、完整追踪

注册方式

// 全局 Hook(应用于所有节点/边)
graph.addBeforeCallNodeHook((nodeId, state, config) -> {
    log.info("节点 {} 即将执行", nodeId);
});
graph.addAfterCallNodeHook((nodeId, state, config, result) -> {
    log.info("节点 {} 执行完成", nodeId);
});

// 指定节点的 Hook
graph.addWrapCallNodeHook("unstableNode", (nodeId, state, config, action) -> {
    return retryAsync(action, 3, Duration.ofSeconds(1));
});

执行顺序

  • BeforeCall / AfterCall: LIFO(后注册先执行)—— 类似 Servlet Filter
  • WrapCall: FIFO(先注册先包裹)—— 最外层最先执行

WrapCall 实现重试

graph.addWrapCallNodeHook("llmNode", (nodeId, state, config, action) -> {
    int maxRetries = 3;
    CompletableFuture<Map<String, Object>> result = null;
    for (int i = 0; i < maxRetries; i++) {
        try {
            result = action.call();
            return result;  // 成功则返回
        } catch (Exception e) {
            log.warn("节点 {} 第 {} 次重试,原因: {}", nodeId, i + 1, e.getMessage());
            if (i == maxRetries - 1) throw e;
        }
    }
    return result;
});

WrapCall 实现耗时统计

graph.addWrapCallNodeHook((nodeId, state, config, action) -> {
    var start = System.currentTimeMillis();
    return action.call()
        .whenComplete((res, err) -> {
            var duration = System.currentTimeMillis() - start;
            if (err == null) {
                log.info("节点 {} 执行成功,耗时 {}ms", nodeId, duration);
            } else {
                log.error("节点 {} 执行失败,耗时 {}ms", nodeId, duration, err);
            }
        });
});

2.8 中断机制 Human-in-the-Loop

v1.8.6+ 大幅增强了中断支持,允许在图执行的任意位置暂停,等待人工输入后恢复。

核心能力

  • NodeHooks 支持可中断动作和流式节点
  • CompiledGraph 集成中断元数据到 CompletableFuture 链
  • Studio 支持管理中断和审批工作流
  • 子图 中断支持(编译子图和状态子图自动支持)

使用方式

中断通过 CompileConfigInterruptBefore / InterruptAfter 配置:

var config = CompileConfig.builder()
    .checkpointSaver(saver)  // 中断必须配合检查点
    .interruptBefore(List.of("humanReview"))  // 进入节点前暂停
    .interruptAfter(List.of("generate"))      // 节点执行后暂停
    .build();

var workflow = graph.compile(config);

恢复执行

// 获取当前状态(包含中断信息)
var state = workflow.getState(runnableConfig);

// 人工审核后,使用更新后的状态恢复
workflow.invoke(null, runnableConfig);  // 传入 null 表示从断点恢复

注意:中断机制必须配合 CheckpointSaver 使用,否则无法保存和恢复状态。


2.9 取消支持

v1.8.5+ 引入图执行取消能力:

var result = workflow.invoke(inputs, config);

if (result.isCancelled()) {
    // 处理取消逻辑
    log.info("图执行被取消");
}

GraphResult.Status 枚举值:

  • COMPLETED — 正常完成
  • CANCELLED — 被取消
  • ERROR — 执行出错

第三部分:状态持久化

3.1 为什么需要检查点

检查点(Checkpoint)是 LangGraph4j 的核心能力之一,解决以下问题:

场景 无检查点 有检查点
Agent 循环推理中断 丢失所有进度 从最后一步恢复
Human-in-the-Loop 无法暂停等待人工输入 暂停 → 审核 → 恢复
长时间运行的任务 进程崩溃后从头开始 自动恢复到最后检查点
调试 只能看到最终结果 时间旅行到任意历史状态
多轮对话 invoke 之间无记忆 通过 threadId 关联历史

3.2 MemorySaver

内存存储,适用于开发和测试

var saver = new MemorySaver();

var config = CompileConfig.builder()
    .checkpointSaver(saver)
    .build();

var workflow = graph.compile(config);

特点

  • 零配置,开箱即用
  • 进程重启后数据丢失
  • 不适合生产环境

3.3 PostgresSaver(生产推荐)

依赖

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-postgres-saver</artifactId>
    <version>1.8.11</version>
</dependency>

配置

var saver = PostgresSaver.builder()
    .host("localhost")
    .port(5432)
    .user("admin")
    .password("password")
    .database("langgraph4j")
    .stateSerializer(stateSerializer)  // 状态序列化器
    .createTables(true)                // 自动建表
    .dropTablesFirst(false)            // 是否先删表(生产环境设为 false)
    .build();

推荐版本:PostgreSQL 16.4+

特性

  • 内置内存缓存,减少数据库往返
  • 自动 Schema 初始化
  • clearCheckpointsCache() 方法清除指定线程的缓存

3.4 其他 Saver:MySQL / Oracle / Redis

MysqlSaver

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-mysql-saver</artifactId>
</dependency>
var saver = MysqlSaver.builder()
    .host("localhost").port(3306)
    .user("root").password("pwd").database("langgraph4j")
    .stateSerializer(serializer)
    .build();

:warning: 已知问题:MysqlSaver 存在数据不一致 bug(#347#348),建议优先使用 PostgresSaver。

OracleSaver

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-oracle-saver</artifactId>
</dependency>
var saver = OracleSaver.builder()
    .host("localhost").port(1521)
    .user("admin").password("pwd").database("langgraph4j")
    .stateSerializer(serializer)
    .build();

RedisSaver

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-redis-saver</artifactId>
</dependency>
var saver = RedisSaver.builder()
    .host("localhost").port(6379)
    .stateSerializer(serializer)
    .build();

适用场景:高性能、低延迟、缓存层。


3.5 CompileConfig 配置

var config = CompileConfig.builder()
    .checkpointSaver(saver)       // 检查点存储(必需才能持久化)
    .releaseThread(false)         // false=保持线程(可恢复),true=执行完释放
    .interruptBefore(List.of("humanReview"))  // 中断:进入节点前暂停
    .interruptAfter(List.of("generate"))      // 中断:节点执行后暂停
    .build();
配置项 默认值 说明
checkpointSaver null(不持久化) 检查点存储实现
releaseThread false 执行完后是否释放线程
interruptBefore 空列表 进入指定节点前暂停
interruptAfter 空列表 指定节点执行后暂停

3.6 检查点操作 API

获取状态历史

// 获取完整状态历史(按时间倒序)
var history = workflow.getStateHistory(runnableConfig);
for (var snapshot : history) {
    System.out.println("检查点 ID: " + snapshot.config().checkpointId());
    System.out.println("节点: " + snapshot.next());
    System.out.println("状态: " + snapshot.values());
}

获取最新检查点

var lastSnapshot = workflow.lastStateOf(runnableConfig);

时间旅行:从历史状态恢复

// 获取特定检查点
var pastState = workflow.getState(runnableConfig, checkpointId);

// 从该检查点重新执行
var result = workflow.invoke(null, pastState.config());

释放线程资源

saver.release(runnableConfig);

重要:如果 releaseThread=false,长时间不释放线程可能导致资源泄漏。生产环境中应在合适的时机调用 release()


3.7 状态序列化

状态持久化要求所有对象可序列化。LangGraph4j 提供了 ObjectStreamStateSerializer

var serializer = new ObjectStreamStateSerializer<MyState>(MyState::new);

自定义序列化器

对于不可序列化的对象(如 LangChain4j 的 ChatMessage),需注册自定义序列化器:

var serializer = new ObjectStreamStateSerializer<MessageState>(MessageState::new);
serializer.mapper()
    .register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer())
    .register(ChatMessage.class, new ChatMessageSerializer());

3.8 已知问题

Issue 描述 解决方案
#347 MysqlSaver 数据不一致 优先使用 PostgresSaver
#348 MysqlSaver loadedCheckpoints 排序 bug 使用 clearCheckpointsCache()
#321 Studio 中 thread_id 复用导致主键冲突 每次执行使用唯一 thread_id
#234 请求增强 Checkpoint 添加创建日期 待官方修复
#157 请求增强 StateSnapshot 添加 parentConfig 待官方修复

第四部分:子图与嵌套工作流

4.1 三种子图方式对比

方式 适用场景 中断支持 流式支持 状态 Schema
编译子图 (CompiledGraph) 父子图共享 Schema :white_check_mark: 自动 :white_check_mark: 自动 必须共享至少一个 key
节点动作包装 (NodeAction) 父子图 Schema 不同 :warning: 手动 :warning: 手动 可完全不同
状态子图 (StateGraph) 最紧密集成 :white_check_mark: 自动 :white_check_mark: 自动 编译时合并

4.2 编译子图(CompiledGraph)

将编译后的子图直接作为节点添加到父图中:

// 定义子图
var childGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
    .addNode("childStep1", childAction1)
    .addNode("childStep2", childAction2)
    .addEdge(START, "childStep1")
    .addEdge("childStep1", "childStep2")
    .addEdge("childStep2", END);

// 编译子图
var compiledChild = childGraph.compile();

// 添加到父图
var parentGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
    .addNode("parentStep", parentAction)
    .addNode("subgraph", compiledChild)  // 子图作为节点
    .addEdge(START, "parentStep")
    .addEdge("parentStep", "subgraph")
    .addEdge("subgraph", END);

要求:父子图至少共享一个 Schema key 才能通信。


4.3 节点动作包装(NodeAction)

在节点函数内部手动调用子图,适用于父子图 Schema 不同的情况:

parentGraph.addNode("subgraph", state -> {
    // 转换状态
    var childInput = Map.of(
        "childMessages", state.value("parentMessages").orElse(List.of())
    );

    // 手动调用子图
    var childResult = compiledChild.invoke(childInput);

    // 转换回父图状态
    return Map.of(
        "parentMessages", childResult.get("childMessages")
    );
});

特点

  • 需要手动处理状态转换
  • 中断和流式支持需自行实现
  • 灵活性最高

4.4 状态子图(StateGraph)

将未编译的 StateGraph 直接作为节点添加,编译时合并到父图中:

// 定义子图(不编译)
var childGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
    .addNode("childA", childActionA)
    .addNode("childB", childActionB)
    .addEdge(START, "childA")
    .addEdge("childA", "childB")
    .addEdge("childB", END);

// 直接添加到父图(编译时合并)
var parentGraph = new StateGraph<>(SharedState.SCHEMA, SharedState::new)
    .addNode("subgraph", childGraph)  // 传入 StateGraph 而非 CompiledGraph
    .addEdge(START, "subgraph")
    .addEdge("subgraph", END);

var workflow = parentGraph.compile();

编译时合并行为

  • 子图节点 ID 自动重命名避免冲突:subgraph:childAsubgraph:childB
  • 通过 SubGraphNode.formatId("subgraph", "childA") 获取合并后的真实 ID
  • 中断配置使用重命名后的 ID
  • 流式输出自动支持

4.5 图可视化

LangGraph4j 支持两种可视化格式:

编译前可视化(StateGraph 级别)

// PlantUML 格式
var puml = graph.getGraph(GraphRepresentation.Type.PLANTUML, "我的图", false);

// Mermaid.js 格式
var mermaid = graph.getGraph(GraphRepresentation.Type.MERMAID, "我的图", false);

编译后可视化(CompiledGraph 级别)

// 编译后显示合并后的完整结构(含子图展开)
var mermaid = workflow.getGraph(GraphRepresentation.Type.MERMAID, "我的工作流", false);

提示:将 Mermaid 输出粘贴到 Mermaid Live Editor 即可查看可视化图。


第五部分:多 Agent 编排模式

5.1 模式总览

模式 描述 适用场景
Supervisor 主管 Agent 路由任务到专业 Agent 最常用,适合大多数多 Agent 场景
层级 多层 Supervisor,团队管理 Agent 数量多,需要分组管理
并行 (Fork-Join) 多个 Agent 同时执行 独立任务并行加速
顺序 Agent 按固定顺序执行 流水线处理

5.2 Supervisor 模式

架构

用户请求 → Supervisor Agent → [Researcher | Coder | ...] → Supervisor → FINISH

Supervisor 使用 LLM 理解用户意图,决定将任务路由到哪个专业 Agent。专业 Agent 执行完毕后,结果返回 Supervisor,由 Supervisor 决定继续分配还是结束。

完整代码示例

import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.jdk.*;
import java.util.*;

// 1. 定义状态
class SupervisorState extends AgentState {
    public static final Map<String, Channel<?>> SCHEMA = Map.of(
        "messages", Channels.appender(ArrayList::new),
        "next",     Channel.DEFAULT
    );

    public SupervisorState(Map<String, Object> initData) { super(initData); }

    public Optional<String> next() { return value("next"); }
}

// 2. 定义 Supervisor Agent(使用 LLM 做路由决策)
class SupervisorAgent implements NodeAction<SupervisorState> {

    // 路由结果(LLM 结构化输出)
    static class Router {
        @Description("要路由到的 Worker 名称。如果所有工作已完成,返回 FINISH")
        String next;
    }

    // LLM Service 接口
    interface Service {
        @SystemMessage("""
            你是一个主管,负责管理以下 Worker 之间的对话:{{members}}。
            给定用户请求,决定下一步应该由哪个 Worker 执行。
            每个 Worker 执行任务后会返回结果和状态。
            当所有工作完成时,返回 FINISH。
            只返回 Worker 名称或 FINISH,不要有其他内容。
            """)
        Router evaluate(@V("members") String members, @UserMessage String userMessage);
    }

    private final Service service;
    private final String[] members;

    SupervisorAgent(Service service, String[] members) {
        this.service = service;
        this.members = members;
    }

    @Override
    public Map<String, Object> apply(SupervisorState state) {
        var userMessage = state.<List<String>>value("messages")
            .orElse(List.of()).get(0);
        var result = service.evaluate(
            String.join(", ", members),
            userMessage
        );
        return Map.of("next", result.next);
    }
}

// 3. 定义专业 Agent
class ResearcherAgent implements NodeAction<SupervisorState> {
    @Override
    public Map<String, Object> apply(SupervisorState state) {
        // 调用 LLM 执行研究任务
        return Map.of("messages", "研究结果:...");
    }
}

class CoderAgent implements NodeAction<SupervisorState> {
    @Override
    public Map<String, Object> apply(SupervisorState state) {
        // 调用 LLM 执行编码任务
        return Map.of("messages", "代码实现:...");
    }
}

// 4. 构建 Supervisor 图
public class SupervisorExample {
    public static void main(String[] args) throws Exception {
        var supervisor = new SupervisorAgent(supervisorService, new String[]{"researcher", "coder"});
        var researcher = new ResearcherAgent();
        var coder = new CoderAgent();

        var graph = new StateGraph<>(SupervisorState.SCHEMA, SupervisorState::new)
            .addNode("supervisor", supervisor)
            .addNode("researcher", researcher)
            .addNode("coder", coder)
            .addEdge(START, "supervisor")
            .addConditionalEdges("supervisor",
                state -> state.next().orElse(END),
                EdgeMappings.builder()
                    .to("researcher")
                    .to("coder")
                    .toEnd("FINISH")  // "FINISH" 字符串路由到 END
                    .build())
            .addEdge("researcher", "supervisor")  // 执行完返回主管
            .addEdge("coder", "supervisor");      // 执行完返回主管

        var workflow = graph.compile();

        var result = workflow.invoke(Map.of(
            "messages", "帮我研究 LangGraph4j 并写一个示例代码"
        ));
    }
}

Prompt 设计要点

  1. 清晰列出所有 Worker 的职责描述,否则 LLM 路由准确率显著下降
  2. 明确 FINISH 条件,避免无限循环
  3. 限制输出格式,只返回 Worker 名称或 FINISH

模型分层策略

Agent 角色 推荐模型 理由
Supervisor GPT-4o / Claude-3.5 需要强推理和规划能力
执行型 Agent GPT-4o-mini / Claude-3-Haiku 成本低、速度快
本地部署 Llama 3.1 / Qwen 2.5 数据安全、零 API 成本

:warning: 成本警告:多 Agent 系统的 Token 消耗约为单 Agent 的 15 倍,必须设置最大轮次熔断机制。


5.3 层级模式

通过子图实现多级 Supervisor:

// 团队 1:研究团队
var researchTeam = new StateGraph<>(TeamState.SCHEMA, TeamState::new)
    .addNode("researchSupervisor", researchSupervisor)
    .addNode("webSearcher", webSearcher)
    .addNode("dbQuerier", dbQuerier)
    .addEdge(START, "researchSupervisor")
    .addConditionalEdges("researchSupervisor", researchRouter, ...)
    .addEdge("webSearcher", "researchSupervisor")
    .addEdge("dbQuerier", "researchSupervisor");

// 团队 2:开发团队
var devTeam = new StateGraph<>(TeamState.SCHEMA, TeamState::new)
    .addNode("devSupervisor", devSupervisor)
    .addNode("coder", coder)
    .addNode("tester", tester)
    .addEdge(START, "devSupervisor")
    .addConditionalEdges("devSupervisor", devRouter, ...)
    .addEdge("coder", "devSupervisor")
    .addEdge("tester", "devSupervisor");

// 顶层主管
var topGraph = new StateGraph<>(TopState.SCHEMA, TopState::new)
    .addNode("topSupervisor", topSupervisor)
    .addNode("researchTeam", researchTeam.compile())  // 子图作为节点
    .addNode("devTeam", devTeam.compile())
    .addEdge(START, "topSupervisor")
    .addConditionalEdges("topSupervisor", topRouter,
        EdgeMappings.builder()
            .to("researchTeam")
            .to("devTeam")
            .toEnd("FINISH")
            .build())
    .addEdge("researchTeam", "topSupervisor")
    .addEdge("devTeam", "topSupervisor");

5.4 并行执行 Fork-Join

限制说明

:warning: LangGraph4j 的并行执行有严格限制

  1. 仅支持 Fork-Join 模型(一个节点分叉到多个并行节点,然后汇聚)
  2. 仅允许 一层并行步骤(并行节点内部不能再有并行分支)
  3. 并行分支中 不允许条件边

基本用法

var workflow = new MessagesStateGraph<String>()
    .addNode("A", makeNode("A"))       // 分叉点
    .addNode("A1", makeNode("A1"))     // 并行分支 1
    .addNode("A2", makeNode("A2"))     // 并行分支 2
    .addNode("A3", makeNode("A3"))     // 并行分支 3
    .addNode("B", makeNode("B"))       // 汇聚点
    // Fork:A → A1, A2, A3 并行执行
    .addEdge("A", "A1")
    .addEdge("A", "A2")
    .addEdge("A", "A3")
    // Join:A1, A2, A3 → B
    .addEdge("A1", "B")
    .addEdge("A2", "B")
    .addEdge("A3", "B")
    .addEdge(START, "A")
    .addEdge("B", END)
    .compile();

配置并行执行器

var config = RunnableConfig.builder()
    .addParallelNodeExecutor("A1", ForkJoinPool.commonPool())
    .addParallelNodeExecutor("A2", Executors.newFixedThreadPool(4))
    .addParallelNodeExecutor("A3", ForkJoinPool.commonPool())
    .build();

workflow.stream(inputs, config);

编译子图作为并行节点

并行节点可以是编译后的子图,实现更复杂的并行子工作流:

var subWorkflow = new StateGraph<>(...)
    .addNode("s1", step1)
    .addNode("s2", step2)
    .addEdge(START, "s1")
    .addEdge("s1", "s2")
    .addEdge("s2", END)
    .compile();

var mainGraph = new StateGraph<>(...)
    .addNode("parallel1", subWorkflow)  // 子图作为并行节点
    .addNode("parallel2", anotherSubWorkflow)
    .addNode("fork", forkNode)
    .addNode("join", joinNode)
    .addEdge("fork", "parallel1")
    .addEdge("fork", "parallel2")
    .addEdge("parallel1", "join")
    .addEdge("parallel2", "join")
    .compile();

5.5 顺序模式

通过固定边串联多个 Agent:

var graph = new StateGraph<>(SCHEMA)
    .addNode("researcher", researcher)
    .addNode("writer", writer)
    .addNode("reviewer", reviewer)
    .addNode("publisher", publisher)
    .addEdge(START, "researcher")
    .addEdge("researcher", "writer")    // 研究 → 写作
    .addEdge("writer", "reviewer")      // 写作 → 审稿
    .addEdge("reviewer", "publisher")   // 审稿 → 发布
    .addEdge("publisher", END);

5.6 AgentExecutor — 开箱即用的 ReACT Agent

LangGraph4j 提供了内置的 AgentExecutor,封装了完整的 ReACT 循环(思考 → 工具调用 → 观察 → 再思考)。

LangChain4j 版

// 依赖
// org.bsc.langgraph4j:langchain4j-agent

// 定义工具
public class MyTools {
    @Tool("获取指定城市的天气")
    String getWeather(@P("城市名称") String city) {
        return weatherService.getForecast(city);
    }

    @Tool("计算两个数的和")
    double sum(double a, double b) {
        return a + b;
    }
}

// 构建 AgentExecutor
var agentExecutor = AgentExecutor.builder()
    .chatModel(chatModel)                    // LangChain4j ChatModel
    .toolsFromObject(new MyTools())          // 从对象提取工具
    .tool(specification, executor)           // 动态添加工具
    .build();

var workflow = agentExecutor.compile();

// 同步执行
var result = workflow.invoke(Map.of(
    "messages", UserMessage.from("北京今天天气如何?234 + 45 等于多少?")
));

// 流式执行
for (var step : workflow.stream(Map.of(
    "messages", UserMessage.from("Hello!")
))) {
    System.out.println(step);
}

Spring AI 版

// 依赖
// org.bsc.langgraph4j:spring-ai-agent

var graph = AgentExecutor.builder()
    .chatModel(chatModel)          // Spring AI ChatModel
    .tools(toolCallbacks)          // Spring AI ToolCallback 列表
    .build();

var workflow = graph.compile();

var iterator = workflow.stream(Map.of(
    "messages", new UserMessage("query")
));

AgentExecutorEx(Spring AI 增强版)

支持 Skills、Approval Action、ConversationContextPolicy:

var agent = AgentExecutorEx.builder()
    .chatModel(chatModel, true)     // true = 启用流式
    .toolsFromObject(new TestTools())
    .defaultSystem("始终使用可用的技能来协助用户")
    .skills(resourceLoader.getResource("classpath:skills"))  // 技能文件
    .conversationContextPolicy(new MessageWindowConversationContextPolicy(10))  // 消息窗口
    .build()
    .compile(compileConfig);

ConversationContextPolicy

v1.8.6+ 引入,用于在 LLM 调用前过滤消息(如滑动窗口),控制发送给 LLM 的上下文长度:

// LangChain4j 版
var agentExecutor = AgentExecutor.builder()
    .chatModel(chatModel)
    .conversationContextPolicy(new MessageWindowConversationContextPolicy(10))
    .build();

// 只保留最近 10 条消息发送给 LLM,避免超出上下文窗口

5.7 Adaptive RAG 实战

Adaptive RAG 是一种根据查询复杂度动态选择检索策略的模式。官方示例位于 langgraph4j-examples/langchain4j/adaptive-rag

架构

START
  ↓
[分析查询] ──→ [web_search] ──→ [grade_documents]
  ↓                                ↓
[vectorstore_retrieve] ──→ [grade_documents]
                                   ↓
                    [transform_query] ←── (not_useful)
                                   ↓
                              [generate] ──→ (useful) → END

核心节点

节点 功能 路由条件
web_search Tavily Web 搜索 查询需要实时信息
retrieve Chroma 向量数据库检索 查询可从知识库回答
grade_documents 评估检索文档的相关性 useful / not_useful / not_supported
transform_query 重写查询以改善检索 文档不相关时
generate 基于检索结果生成回答 文档相关时

依赖

  • Chroma 向量数据库(Docker 部署)
  • Tavily Web Search API
  • LangChain4j 集成模块

第六部分:LLM 框架集成

6.1 与 LangChain4j 集成

模块依赖

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langchain4j-core-jdk</artifactId>
    <version>1.8.11</version>
</dependency>

工具状态共享(InvocationParameters)

通过 InvocationParameters 在图状态和工具之间传递数据:

// 工具端:通过 InvocationParameters 接收状态
@Tool("执行测试")
String execTest(String message, InvocationParameters context) {
    Object value = context.get("sharedKey");  // 从状态中读取
    return "结果: " + message;
}

// 图端:通过 InvocationContext 传递状态
var toolService = LC4jToolService.builder()
    .toolsFromObject(new MyTools())
    .build();

Command callResult = toolService.execute(
    List.of(toolExecutionRequest),
    InvocationContext.builder()
        .invocationParameters(InvocationParameters.from(state))
        .build(),
    "messages"
).join();

6.2 与 Spring AI 集成

模块依赖

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-spring-ai</artifactId>
    <version>1.8.11</version>
</dependency>

工具状态共享(ToolContext)

通过 Spring AI 的 ToolContext 在工具中访问和修改图状态:

@Tool(description = "执行测试工具")
String execTest(@ToolParam String message, ToolContext context) {
    // 读取图状态
    Map<String, Object> state = context.getContext();
    Object value = state.get("sharedKey");

    // 更新图状态
    return SpringAIToolResponseBuilder.of(context)
        .update(Map.of("arg0", message))
        .buildAndReturn("结果: " + message);
}

6.3 流式输出

LangGraph4j 内建一等公民的流式支持,基于 java-async-generator 库。

图级流式

for (var step : workflow.stream(inputs, runnableConfig)) {
    // step 是 NodeOutput,包含节点名和状态
    System.out.println("节点: " + step.node());
    System.out.println("状态: " + step.state());
}

LLM Token 级流式(LangChain4j)

var generator = StreamingChatGenerator.<AiMessage, MyState>builder()
    .mapResult(response -> {
        if (response.finishReason() == FinishReason.TOOL_EXECUTION) {
            return Map.of("agent_outcome", new AgentOutcome(action, null));
        }
        return Map.of("agent_outcome", new AgentOutcome(null, finish));
    })
    .startingNode("agent")
    .startingState(state)
    .build();

// 将 generator 嵌入状态,图的 .stream() 会自动展开
streamingChatLanguageModel.generate(messages, tools, generator.handler());
return Map.of("agent_outcome", generator);

LLM Token 级流式(Spring AI)

Flux<ChatResponse> flux = chatClient.prompt()
    .messages(state.messages())
    .stream()
    .chatResponse();

var generator = StreamingChatGenerator.<MyState>builder()
    .startingNode("agent")
    .startingState(state)
    .mapResult(response -> Map.of("messages", response.getResult().getOutput()))
    .build(flux);

return Map.of("messages", generator);

StreamingOutputEnd(v1.8.5+)

用于显式标记流式输出结束:

if (output instanceof StreamingOutputEnd) {
    // 流式输出完成
}

StreamingOutput.isEnd() 已废弃,替换为 isStreamingEnd()


6.4 模型选择策略

Agent 角色 推荐模型 Token 成本 延迟
Supervisor / 路由 GPT-4o / Claude-3.5-Sonnet
复杂推理 Agent GPT-4o / Claude-3.5-Sonnet
简单执行 Agent GPT-4o-mini / Claude-3-Haiku
嵌入模型 text-embedding-3-small 极低
本地部署 Llama 3.1 8B / Qwen 2.5 7B 取决于硬件

成本控制建议

  1. Supervisor 必须用强模型(路由准确率直接决定系统质量)
  2. 执行型 Agent 优先用轻量模型
  3. 设置 ConversationContextPolicy 限制上下文长度
  4. 实现语义缓存减少重复推理
  5. 设置全局 Token 预算和熔断机制

第七部分:调试与可观测性

7.1 LangGraph4j Studio

Studio 是 LangGraph4j 内置的可视化调试工具,以 Web UI 形式提供交互式调试。

功能特性

  • :bar_chart: 图结构可视化(Mermaid 图)
  • :play_button: 运行工作流并实时显示执行步骤
  • :memo: 查看/编辑每个步骤的状态数据
  • :pause_button: 管理中断(Human-in-the-Loop 审批)
  • :counterclockwise_arrows_button: 恢复执行(从检查点恢复)
  • :satellite_antenna: 基于 HTTP Streaming 的实时通信

三种服务器实现

Jetty(独立运行)

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-studio-jetty</artifactId>
    <version>1.8.11</version>
</dependency>
var instance = LangGraphStudioServer.Instance.builder()
    .title("我的 Agent")
    .graph(workflow)
    .addInputStringArg("messages", true, v -> new UserMessage(Objects.toString(v)))
    .compileConfig(CompileConfig.builder()
        .checkpointSaver(new MemorySaver())
        .build())
    .build();

LangGraphStudioServer4Jetty.builder()
    .port(8080)
    .instance("default", instance)
    .build()
    .start()
    .join();

Spring Boot

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-studio-springboot</artifactId>
    <version>1.8.11</version>
</dependency>
@Configuration
public class StudioConfig extends LangGraphStudioConfig {

    @Override
    public Map<String, LangGraphStudioServer.Instance> instanceMap() {
        return Map.of("myAgent", LangGraphStudioServer.Instance.builder()
            .title("我的 Agent")
            .graph(workflow)
            .addInputStringArg("messages", true,
                v -> new UserMessage(Objects.toString(v)))
            .compileConfig(CompileConfig.builder()
                .checkpointSaver(new MemorySaver())
                .releaseThread(true)
                .build())
            .build());
    }
}

Quarkus

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-studio-quarkus</artifactId>
    <version>1.8.11</version>
</dependency>

配置方式类似 Spring Boot。

访问地址

http://localhost:<port>/?instance=<instance_id>

检测 Studio 环境

boolean inStudio = RunnableConfig.isRunningInStudio();
// 基于 STUDIO_METADATA_KEY 判断

7.2 日志配置

LangGraph4j 使用 SLF4J 日志门面,支持 Logback / Log4j2 等实现。

logging.properties 示例

# 全局日志级别
.level=INFO

# LangGraph4j 日志级别
org.bsc.langgraph4j.level=FINE

# 节点执行日志
org.bsc.langgraph4j.CompiledGraph.level=FINE

# 检查点日志
org.bsc.langgraph4j.checkpoint.level=FINE

7.3 OpenTelemetry 集成

依赖

<dependency>
    <groupId>org.bsc.langgraph4j</groupId>
    <artifactId>langgraph4j-opentelemetry</artifactId>
    <version>1.8.11</version>
</dependency>

核心组件

组件 功能
OTELWrapCallTraceHook 为每个节点和边调用创建 Span,添加 config/state 属性
OTELWrapCallTraceSetParentHook 创建父 Span,将节点/边 Span 归组到工作流范围内

集成方式

var otelHook = new OTELWrapCallTraceHook<MyState>(stateSerializer);

var parentHook = OTELWrapCallTraceSetParentHook.<MyState>builder()
    .scope("MyWorkflow")     // 工作流范围名
    .groupName("stream")     // Span 组名
    .build();

var workflow = new StateGraph<>(MyState.SCHEMA, stateSerializer)
    // otelHook 先添加(FIFO,内层执行)
    .addWrapCallNodeHook(otelHook)
    .addWrapCallEdgeHook(otelHook)
    // parentHook 后添加(FIFO,外层执行,包裹所有节点/边)
    .addWrapCallNodeHook(parentHook)
    .addWrapCallEdgeHook(parentHook)
    .addNode("node1", action1)
    .addNode("node2", action2)
    .compile();

本地测试

模块内置了 Docker Compose 配置:

# Jaeger(UI 端口 16686)
docker compose -f src/docker/docker-compose-jaeger.yml up

# Grafana + Loki + OTel Collector
docker compose -f src/docker/docker-compose.yml up

7.4 自定义 Hook 实现指标收集

// 全局耗时统计 Hook
graph.addWrapCallNodeHook((nodeId, state, config, action) -> {
    var start = System.nanoTime();
    return action.call()
        .whenComplete((result, error) -> {
            var durationMs = (System.nanoTime() - start) / 1_000_000;
            // 记录到 Micrometer
            meterRegistry.timer("langgraph4j.node.duration",
                "node", nodeId,
                "status", error == null ? "success" : "error"
            ).record(durationMs, TimeUnit.MILLISECONDS);

            // 记录到日志
            log.info("节点 {} 执行{},耗时 {}ms",
                nodeId,
                error == null ? "成功" : "失败: " + error.getMessage(),
                durationMs);
        });
});

7.5 图可视化调试

PlantUML 输出

// 编译前
var puml = graph.getGraph(GraphRepresentation.Type.PLANTUML, "调试图", false);
System.out.println(puml);
// 粘贴到 https://www.plantuml.com/plantuml 查看

Mermaid.js 输出

// 编译后(含子图展开)
var mermaid = workflow.getGraph(GraphRepresentation.Type.MERMAID, "工作流", false);
System.out.println(mermaid);
// 粘贴到 https://mermaid.live 查看

第八部分:生产部署与运维

8.1 部署架构概述

关键认知:LangGraph4j 是一个 Java 库,不是独立服务。这与 Python 版 LangGraph(有 LangGraph Platform/Server)有本质区别。

部署 LangGraph4j 应用 = 部署一个标准的 Java 应用(Spring Boot / Quarkus / 普通 JAR)。

┌─────────────────────────────────────────┐
│         标准 Java 应用                   │
│  ┌─────────────────────────────────┐    │
│  │  Spring Boot / Quarkus          │    │
│  │  ┌───────────────────────────┐  │    │
│  │  │  LangGraph4j (库)         │  │    │
│  │  │  StateGraph + CompiledGraph│  │    │
│  │  └───────────────────────────┘  │    │
│  │  ┌─────────────┐ ┌───────────┐  │    │
│  │  │ LangChain4j │ │ Spring AI │  │    │
│  │  └─────────────┘ └───────────┘  │    │
│  └─────────────────────────────────┘    │
│  ┌──────────┐ ┌──────────┐ ┌────────┐  │
│  │ REST API │ │ WebSocket│ │ gRPC   │  │
│  └──────────┘ └──────────┘ └────────┘  │
└─────────────────────────────────────────┘

8.2 Spring Boot 集成

依赖注入

Graph 节点可以作为 Spring Bean,通过标准 DI 注入服务:

@Configuration
public class ChatModelConfiguration {

    @Bean
    @Profile("ollama")
    public ChatModel ollamaModel() {
        return OllamaChatModel.builder()
            .ollamaApi(new OllamaApi("http://localhost:11434"))
            .defaultOptions(OllamaOptions.builder()
                .model("qwen2.5:7b")
                .temperature(0.1)
                .build())
            .build();
    }

    @Bean
    @Profile("openai")
    public ChatModel openAiModel() {
        return OpenAiChatModel.builder()
            .apiKey(System.getenv("OPENAI_API_KEY"))
            .modelName("gpt-4o")
            .build();
    }
}

Web API 暴露

LangGraph4j 的 Graph 编译后是纯 Java 对象,需要自行编写 Controller 暴露端点:

@RestController
@RequestMapping("/api/agent")
public class AgentController {

    private final CompiledGraph<MyState> workflow;

    public AgentController(CompiledGraph<MyState> workflow) {
        this.workflow = workflow;
    }

    @PostMapping("/invoke")
    public Map<String, Object> invoke(@RequestBody Map<String, Object> request) {
        return workflow.invoke(request);
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<NodeOutput> stream(@RequestParam String input) {
        return Flux.fromIterable(
            workflow.stream(Map.of("input", input))
        );
    }
}

环境变量管理

# application.properties
OPENAI_API_KEY=${OPENAI_API_KEY}
TAVILY_API_KEY=${TAVILY_API_KEY}
OLLAMA_BASE_URL=${OLLAMA_BASE_URL:http://localhost:11434}
# .env 文件
OPENAI_API_KEY=sk-xxx
TAVILY_API_KEY=tvly-xxx

8.3 Docker 容器化

FROM eclipse-temurin:17-jdk-alpine AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN ./mvnw clean package -DskipTests

FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-XX:+UseZGC", "-Xmx512m", "-jar", "app.jar"]
# 构建并运行
docker build -t langgraph4j-app .
docker run -p 8080:8080 \
  -e OPENAI_API_KEY=sk-xxx \
  -e TAVILY_API_KEY=tvly-xxx \
  langgraph4j-app

8.4 Kubernetes 部署

Pod 配置

apiVersion: v1
kind: Pod
metadata:
  name: langgraph4j-app
spec:
  containers:
  - name: app
    image: your-registry/langgraph4j-app:1.8.11
    ports:
      - containerPort: 8080
    env:
      - name: OPENAI_API_KEY
        valueFrom:
          secretKeyRef:
            name: agent-secrets
            key: openai-api-key
      - name: DATABASE_URL
        valueFrom:
          secretKeyRef:
            name: agent-secrets
            key: database-url
    resources:
      requests:
        memory: "512Mi"
        cpu: "500m"
      limits:
        memory: "2Gi"
        cpu: "2000m"
    livenessProbe:
      httpGet:
        path: /actuator/health
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
    readinessProbe:
      httpGet:
        path: /actuator/health/readiness
        port: 8080
      initialDelaySeconds: 10
      periodSeconds: 5

HPA 弹性伸缩

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langgraph4j-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langgraph4j-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

8.5 持久化选型对比

存储后端 模块 性能 可靠性 运维复杂度 推荐场景
PostgreSQL langgraph4j-postgres-saver :star: 生产首选
MySQL langgraph4j-mysql-saver 已有 MySQL 基础设施
Oracle langgraph4j-oracle-saver Oracle 环境集成
Redis langgraph4j-redis-saver 极高 高性能/缓存层
Memory langgraph4j-core 极高 仅开发测试

建议:生产环境优先使用 PostgreSQL(相对稳定,MysqlSaver 有已知 bug)。


8.6 连接池调优

数据库连接池(HikariCP)

# application.yml
spring:
  datasource:
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000

HTTP 客户端连接池

LangChain4j 和 Spring AI 的 HTTP 客户端底层使用 OkHttp/Retrofit,通过其配置调整:

# LangChain4j 连接超时
langchain4j.open-ai.timeout=60s

8.7 安全设计

层面 措施
API Key 管理 Kubernetes Secrets / HashiCorp Vault,绝不硬编码
传输加密 TLS 1.3、mTLS(Istio)
速率限制 Bucket4j 或网关层限流
访问控制 Spring Security + OAuth2 / JWT
Prompt 注入防护 输入/输出 Guardrails
审计日志 全量记录 Agent 操作和 LLM 调用

第九部分:性能优化

9.1 异步执行

节点异步化

// 使用 node_async 包装异步节点
graph.addNode("llmNode", node_async(state ->
    CompletableFuture.supplyAsync(() -> {
        // 非阻塞 LLM 调用
        var response = chatModel.generate(state.messages());
        return Map.of("messages", response);
    })
));

边异步化

graph.addConditionalEdges("agent",
    edge_async(state -> {
        // 异步路由决策
        return CompletableFuture.supplyAsync(() ->
            state.lastMessage().isPresent() ? "tools" : END
        );
    }),
    EdgeMappings.builder()
        .to("tools")
        .toEnd()
        .build()
);

9.2 并行节点优化

ForkJoinPool 配置

// 为并行节点指定专用线程池
var config = RunnableConfig.builder()
    .addParallelNodeExecutor("nodeA", Executors.newFixedThreadPool(4))
    .addParallelNodeExecutor("nodeB", ForkJoinPool.commonPool())
    .build();

注意事项

  • 并行节点数量不宜过多(建议 ≤ 5),避免线程竞争
  • I/O 密集型节点(LLM 调用、数据库查询)适合并行
  • CPU 密集型节点并行收益有限

9.3 Token 成本控制

模型分层

// Supervisor 使用强模型
var supervisorModel = OpenAiChatModel.builder()
    .apiKey(apiKey).modelName("gpt-4o").build();

// 执行 Agent 使用轻量模型
var executorModel = OpenAiChatModel.builder()
    .apiKey(apiKey).modelName("gpt-4o-mini").build();

ConversationContextPolicy

// 限制发送给 LLM 的消息数量
var agent = AgentExecutor.builder()
    .chatModel(chatModel)
    .conversationContextPolicy(new MessageWindowConversationContextPolicy(10))
    .build();

预算熔断

// 通过 Hook 实现 Token 预算控制
AtomicLong totalTokens = new AtomicLong(0);
long MAX_TOKENS = 100_000;

graph.addAfterCallNodeHook((nodeId, state, config, result) -> {
    // 累计 Token 使用量
    // 如果超出预算,抛出异常终止执行
    if (totalTokens.addAndGet(estimateTokens(result)) > MAX_TOKENS) {
        throw new RuntimeException("Token 预算超限");
    }
});

9.4 检查点缓存策略

PostgresSaver 内置内存缓存,减少数据库往返:

// 手动清除指定线程的缓存
saver.clearCheckpointsCache(runnableConfig);

对于高频访问的场景,可以考虑在 PostgresSaver 前加一层 Redis 缓存。


第十部分:常见问题与陷阱

10.1 状态管理陷阱

Schema 不匹配

问题:节点返回了 Schema 中未定义的 key。

// Schema 只定义了 "messages" 和 "next"
var SCHEMA = Map.of("messages", Channels.appender(ArrayList::new), "next", Channel.DEFAULT);

// 节点返回了 "result"(未定义)→ 编译时或运行时异常
return Map.of("result", "done");  // ❌ 错误

解决:确保所有节点返回的 key 都在 Schema 中定义。

序列化失败

问题:状态中包含不可序列化的对象(如 LangChain4j 的 ChatMessage)。

解决:注册自定义序列化器:

var serializer = new ObjectStreamStateSerializer<>(MyState::new);
serializer.mapper()
    .register(ChatMessage.class, new ChatMessageSerializer())
    .register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer());

状态丢失

问题:不配置 CheckpointSaver 时,invoke() 调用之间没有记忆。

解决:配置 CompileConfig.builder().checkpointSaver(saver).build()


10.2 并行执行限制

:warning: 这是 LangGraph4j 最容易踩的坑之一

限制 说明
仅 Fork-Join 必须是一个分叉点对应一个汇聚点
单层并行 并行节点内部不能再有并行分支
无条件边 并行分支中不能使用 addConditionalEdges
无嵌套并行 Issue #332 讨论了进一步改进需求

设计建议:在设计工作流前,先画出图结构,确认是否满足这些限制。


10.3 检查点问题

MysqlSaver 数据不一致

  • Issue #347: MysqlSaver 存在数据不一致问题
  • Issue #348: loadedCheckpoints 排序 bug
  • 解决方案: 使用 clearCheckpointsCache() 清理缓存,或改用 PostgresSaver

thread_id 冲突

  • Issue #321: Studio 中复用 thread_id 会触发主键冲突
  • 解决方案: 每次执行使用唯一的 thread_id

检查点 TTL

LangGraph4j 不内置检查点 TTL 策略。需要在数据库层面配置:

-- PostgreSQL: 定期清理过期检查点
DELETE FROM checkpoints WHERE created_at < NOW() - INTERVAL '7 days';

10.4 流式输出问题

版本间输出不一致

  • Issue #350: 1.8.4 和 1.8.5/1.8.6 版本间 stream 输出结果不同
  • 解决方案: 锁定版本,升级前充分测试

流式中断图

  • Issue #118: 在流式节点输出时中断图执行仍是一个待解决问题

10.5 模型兼容性

本地模型函数调用失败

  • Issue #275: 本地部署 DeepSeek V3 时函数调用失败
  • 原因: 部分开源模型对 Function Calling / Tool Use 的支持不完善
  • 解决方案:使用经过验证的模型(Llama 3.1、Qwen 2.5),或升级模型版本

10.6 社区与支持

资源 说明
GitHub Issues 问题反馈(当前 20 个 Open Issues)
GitHub Discussions 技术讨论
示例仓库 官方示例代码
官方文档 API 文档和教程

注意:LangGraph4j 核心维护者基本为 bsorrentino 一人(1.5k Stars),社区规模有限。遇到问题时建议:

  1. 先查阅 Issues 和源码
  2. 参考 Python LangGraph 文档(概念通用)
  3. 参考 how-tos/ 目录的 Jupyter Notebook

附录

A. 模块依赖速查表

模块 Maven Artifact 用途
核心引擎 langgraph4j-core StateGraph, AgentState, Channel, Hooks
BOM langgraph4j-bom 统一版本管理
PostgreSQL langgraph4j-postgres-saver PostgreSQL 检查点
MySQL langgraph4j-mysql-saver MySQL 检查点
Oracle langgraph4j-oracle-saver Oracle 检查点
Redis langgraph4j-redis-saver Redis 检查点
OpenTelemetry langgraph4j-opentelemetry 分布式追踪
LangChain4j 核心 langchain4j-core-jdk LangChain4j 集成
LangChain4j Agent langchain4j-agent AgentExecutor (LangChain4j)
Spring AI 核心 langgraph4j-spring-ai Spring AI 集成
Spring AI Agent spring-ai-agent AgentExecutor (Spring AI)
Studio Jetty langgraph4j-studio-jetty Studio (Jetty)
Studio SpringBoot langgraph4j-studio-springboot Studio (Spring Boot)
Studio Quarkus langgraph4j-studio-quarkus Studio (Quarkus)

B. 核心 API 速查表

StateGraph

new StateGraph<>(SCHEMA)
new StateGraph<>(SCHEMA, StateFactory)
new MessagesStateGraph<M>()

.addNode(String id, NodeAction<S>)
.addEdge(String from, String to)
.addConditionalEdges(String from, EdgeAction<S>, EdgeMappings)
.addConditionalEntryPoint(EdgeAction<S>, EdgeMappings)
.compile()
.compile(CompileConfig)

Channel

Channel.DEFAULT                    // 覆盖(无默认值)
Channel.of(defaultValue)           // 覆盖(有默认值)
Channels.appender(Supplier<List>)  // 追加
MessageChannel.Appender.of(Supplier<List>)  // 消息追加

EdgeMappings

EdgeMappings.builder()
    .to("nodeA")
    .to("nodeB")
    .toEnd()          // 路由到 END
    .toEnd("FINISH")  // 字符串 "FINISH" 路由到 END
    .build()

CompiledGraph

workflow.invoke(Map<String, Object> inputs)
workflow.invoke(Map<String, Object> inputs, RunnableConfig)
workflow.stream(Map<String, Object> inputs)
workflow.stream(Map<String, Object> inputs, RunnableConfig)
workflow.getStateHistory(RunnableConfig)
workflow.lastStateOf(RunnableConfig)
workflow.getState(RunnableConfig, String checkpointId)

CompileConfig

CompileConfig.builder()
    .checkpointSaver(saver)
    .releaseThread(boolean)
    .interruptBefore(List<String>)
    .interruptAfter(List<String>)
    .build()

RunnableConfig

RunnableConfig.builder()
    .threadId(String)
    .addParallelNodeExecutor(String nodeId, Executor)
    .build()

C. 关键资源链接

LangChain4j + LangGraph 实现 Agent 集群 — 全技术细节整理

版本: v1.0 | 更新日期: 2026-04-07
适用范围: 基于 LangChain4j(Java)与 LangGraph(Python/LangGraph4j)构建多 Agent 协作系统的完整技术参考


目录


一、架构总览

1.1 系统全景图

┌─────────────────────────────────────────────────────────────────┐
│                      客户端 / API Gateway                        │
│            (认证、限流、路由、TLS 终止、负载均衡)                  │
└──────────────────────┬──────────────────┬───────────────────────┘
                       │                  │
        ┌──────────────▼──────┐  ┌────────▼────────────────┐
        │  Java Agent 集群    │  │  Python Agent 集群       │
        │  ┌───────────────┐  │  │  ┌───────────────────┐  │
        │  │ LangGraph4j   │  │  │  │ LangGraph         │  │
        │  │ (图编排引擎)   │  │  │  │ (图编排引擎)       │  │
        │  ├───────────────┤  │  │  ├───────────────────┤  │
        │  │ LangChain4j   │  │  │  │ LangChain         │  │
        │  │ (Agent框架)    │  │  │  │ (Agent框架)       │  │
        │  ├───────────────┤  │  │  ├───────────────────┤  │
        │  │ Spring Boot   │  │  │  │ FastAPI           │  │
        │  │ (应用框架)     │  │  │  │ (应用框架)         │  │
        │  └───────────────┘  │  │  └───────────────────┘  │
        └──────────┬──────────┘  └──────────┬──────────────┘
                   │                        │
      ┌────────────▼────────────────────────▼──────────────┐
      │              通信层                                  │
      │  gRPC / REST / Kafka / RabbitMQ / Redis Pub/Sub    │
      └────────────────────┬───────────────────────────────┘
                           │
      ┌────────────────────▼───────────────────────────────┐
      │              共享状态层                              │
      │  Redis (热数据/会话/分布式锁)                        │
      │  PostgreSQL (检查点/审计日志)                        │
      └────────────────────┬───────────────────────────────┘
                           │
      ┌────────────────────▼───────────────────────────────┐
      │              基础设施层                              │
      │  Kubernetes / Istio / Prometheus / Jaeger / Vault  │
      └────────────────────────────────────────────────────┘

1.2 核心设计原则

原则 说明
模块化 每个 Agent 独立开发、测试、部署、扩缩容
专业化 每个 Agent 专注特定领域(搜索、计算、翻译等)
可观测 全链路追踪、指标监控、日志关联
弹性 重试、熔断、降级、限流
安全 mTLS、RBAC、API Key 管理、数据加密
渐进式 从单 Agent 开始,按需引入多 Agent 协作

二、LangChain4j 核心技术细节

2.1 AiServices 架构

2.1.1 核心理念

AiServices 是 LangChain4j 的高层抽象,采用声明式接口 + 动态代理模式(类似 Spring Data JPA / Retrofit)。开发者定义接口,LangChain4j 在运行时通过反射生成实现类。

2.1.2 Agent 定义

// 最简定义
interface Assistant {
    String chat(String userMessage);
}

// 完整定义
interface Assistant {
    @SystemMessage("你是一个专业的技术顾问")
    @UserMessage("请回答以下问题:{{question}}")
    String answer(@V("question") String question, @MemoryId String memoryId);
}

2.1.3 关键注解

注解 用途 示例
@SystemMessage 系统提示词模板 @SystemMessage("你是...")
@UserMessage 用户消息模板 @UserMessage("{{var}}")
@V("name") 模板变量绑定 @V("topic") String topic
@MemoryId 记忆 ID(多用户隔离) @MemoryId String sessionId
@Tool 工具方法声明 @Tool("描述") String method()
@P 工具参数描述 @P("城市名") String city

2.1.4 构建方式

Assistant assistant = AiServices.builder(Assistant.class)
    .chatLanguageModel(model)
    .chatMemory(MessageWindowChatMemory.withMaxMessages(10))
    .tools(new WeatherTools(), new CalculatorTools())
    .contentRetriever(embeddingStoreContentRetriever)
    .systemMessageProvider(chatMemoryId -> "自定义系统提示词: " + chatMemoryId)
    .build();

2.1.5 返回类型

类型 说明
String 原始文本
boolean / Enum 结构化简单类型
POJO 自动从 JSON 解析(支持嵌套)
List<T> / Set<T> 集合类型
Result<T> 包装类型,含 TokenUsage、Sources、ToolExecution
TokenStream 流式返回
Flux<String> Reactor 流式返回(需 langchain4j-reactor

2.2 多 Agent 编排(langchain4j-agentic 模块)

注意: 该模块目前标记为实验性(Experimental)

2.2.1 Agent 定义(@Agent 注解)

public interface CreativeWriter {
    @UserMessage("你是创意写手,根据主题 {{topic}} 生成不超过3句话的故事")
    @Agent("根据给定主题生成故事")
    String generateStory(@V("topic") String topic);
}

2.2.2 AgenticScope(Agent 间共享作用域)

AgenticScope 是 Agent 间共享数据的集合:

  • 存储共享变量(Agent 写入结果,其他 Agent 读取)
  • 自动记录所有 Agent 的调用序列和响应
  • 支持 Agent 间协作通信
CreativeWriter writer = AgenticServices.agentBuilder(CreativeWriter.class)
    .chatModel(model)
    .outputKey("story")  // 结果写入 AgenticScope 的 "story" 变量
    .build();

2.2.3 工作流模式

① 顺序工作流(Sequential)

UntypedAgent workflow = AgenticServices.sequenceBuilder()
    .subAgents(creativeWriter, audienceEditor, styleEditor)
    .outputKey("story")
    .build();

String story = (String) workflow.invoke(Map.of(
    "topic", "dragons and wizards",
    "style", "fantasy",
    "audience", "young adults"
));

② 循环工作流(Loop)

UntypedAgent reviewLoop = AgenticServices.loopBuilder()
    .subAgents(styleScorer, styleEditor)
    .maxIterations(5)
    .exitCondition(scope -> scope.readState("score", 0.0) >= 0.8)
    .build();

③ 并行工作流(Parallel)

UntypedAgent planner = AgenticServices.parallelBuilder()
    .subAgents(foodExpert, movieExpert)
    .outputKey("plan")
    .build();

2.2.4 Agent 委托(AI Service 作为工具)

// 将 AI Service 作为工具暴露给路由 Agent
interface RouterAgent {
    @UserMessage("分析请求并转发给合适的专家...")
    String askToExpert(String request);
}

interface MedicalExpert {
    @UserMessage("你是医学专家...")
    @Tool("医学专家")
    String medicalRequest(String request);
}

RouterAgent router = AiServices.builder(RouterAgent.class)
    .chatModel(model)
    .tools(medicalExpert, legalExpert)  // AI Service 作为工具
    .build();

2.3 LLM 提供商集成

2.3.1 支持的提供商(25+)

分类 提供商 模块名
商业 API OpenAI langchain4j-open-ai
Azure OpenAI langchain4j-azure-open-ai
Anthropic (Claude) langchain4j-anthropic
Google Gemini langchain4j-google-ai-gemini
Google Vertex AI langchain4j-vertex-ai-gemini
Amazon Bedrock langchain4j-bedrock
Mistral AI langchain4j-mistral-ai
IBM watsonx langchain4j-watsonx
Oracle OCI langchain4j-oci-genai
国产模型 阿里通义千问 langchain4j-dashscope
智谱 AI langchain4j-zhipu-ai
百度千帆 langchain4j-qianfan
ChatGLM langchain4j-chatglm
本地部署 Ollama langchain4j-ollama
LocalAI langchain4j-local-ai
Jlama (Java 原生) langchain4j-jlama
Xinference langchain4j-xinference
通用 OpenAI-Compatible 任何兼容 OpenAI API 的服务

2.3.2 OpenAI-Compatible 模式

// 连接任何 OpenAI 兼容服务(Ollama、vLLM、LocalAI 等)
ChatModel model = OpenAiChatModel.builder()
    .baseUrl("http://localhost:11434/v1")  // Ollama
    .apiKey("ollama")
    .modelName("llama3.1")
    .build();

2.3.3 多模型策略

Agent 角色 推荐模型 理由
Supervisor/路由 GPT-4o / Claude-3.5 需要强推理和规划能力
执行型 Agent GPT-4o-mini / Claude-3-Haiku 成本低、速度快
嵌入模型 text-embedding-3-small 性价比高
本地部署 Llama 3.1 / Qwen 2.5 数据安全、无网络依赖

2.4 内存管理

2.4.1 内存类型对比

类型 类名 策略 适用场景
消息窗口 MessageWindowChatMemory 保留最近 N 条消息 原型开发、短对话
Token 窗口 TokenWindowChatMemory 保留最近 N 个 Token 生产环境、精确控制
持久化 自定义 ChatMemoryStore 外部存储 多实例、会话恢复

2.4.2 使用示例

// 消息窗口
ChatMemory memory = MessageWindowChatMemory.withMaxMessages(10);

// Token 窗口
ChatMemory memory = TokenWindowChatMemory.builder()
    .maxTokens(2000)
    .tokenCountEstimator(new OpenAiTokenCountEstimator())
    .build();

// 持久化内存
ChatMemory memory = MessageWindowChatMemory.builder()
    .id("session-123")
    .maxMessages(10)
    .chatMemoryStore(new PersistentChatMemoryStore())
    .build();

2.4.3 关键细节

  • SystemMessage 一旦添加始终保留,只能持有一条
  • 工具消息:如果 AiMessage(含 ToolExecutionRequest)被驱逐,关联的 ToolExecutionResultMessage 也会自动清除
  • 多用户隔离:通过 @MemoryId + ChatMemoryProvider 实现
interface Assistant {
    String chat(@MemoryId String memoryId, @UserMessage String message);
}

2.5 工具执行机制

2.5.1 @Tool 注解(高层 API)

class WeatherTools {
    @Tool("返回指定城市的天气预报")
    String getWeather(
        @P("城市名称") String city,
        TemperatureUnit unit
    ) {
        return weatherService.getForecast(city, unit);
    }

    @Tool("计算两个数的和")
    double sum(double a, double b) {
        return a + b;
    }
}

工具方法特性

  • 支持静态/非静态方法,任何可见性
  • 参数支持:原始类型、String、POJO(含嵌套)、enum、List/Set、Map
  • 返回类型:void → “Success”,String → 原样,其他 → JSON
  • 参数默认必需,@P(required = false) 设为可选

2.5.2 工具执行流程

LLM 返回 AiMessage(含 toolExecutionRequests)
    ↓
检查 AiMessage.hasToolExecutionRequests()
    ↓
遍历每个 ToolExecutionRequest(id、name、arguments JSON)
    ↓
在独立线程中并发执行(使用 Executor)
    ↓
生成 ToolExecutionResultMessage
    ↓
将结果发回 LLM 继续推理

2.5.3 ToolProvider(动态工具)

ToolProvider 接口允许运行时动态决定暴露哪些工具,适用于需要条件性工具可用性的场景。


2.6 RAG 集成

2.6.1 三种 RAG 模式

模式 说明 适用场景
Easy RAG 内置嵌入模型(bge-small-en-v1.5,24MB,JVM 内运行) 快速原型
Naive RAG 自定义嵌入模型和向量数据库 标准场景
Advanced RAG 查询转换、多源检索、重排序、分数过滤 生产级

2.6.2 向量数据库支持(30+)

类型 支持
内存 InMemoryEmbeddingStore
关系型 PostgreSQL (pgvector)、Hibernate
搜索引擎 Elasticsearch、Azure AI Search
专用向量库 Pinecone、Weaviate、Milvus、Chroma、Qdrant
云服务 Amazon Neptune、Azure Cosmos DB

2.6.3 文档加载器

FileSystemDocumentLoaderClassPathDocumentLoaderUrlDocumentLoaderAmazonS3DocumentLoaderAzureBlobStorageDocumentLoaderGitHubDocumentLoaderSeleniumDocumentLoaderPlaywrightDocumentLoader 等。

2.6.4 文档解析器

TextDocumentParserApachePdfBoxDocumentParserApachePoiDocumentParser(MS Office)、ApacheTikaDocumentParser(通用格式自动检测)、MarkdownDocumentParserYamlDocumentParser


2.7 流式输出

2.7.1 StreamingChatModel

StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
    .apiKey(apiKey)
    .modelName(GPT_4_O_MINI)
    .build();

model.chat("Tell me a joke", new StreamingChatResponseHandler() {
    @Override
    public void onPartialResponse(String partialResponse) {
        System.out.print(partialResponse);  // 逐 token 输出
    }

    @Override
    public void onCompleteResponse(ChatResponse completeResponse) {
        // 完整响应,含 AiMessage 和 TokenUsage
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
    }
});

2.7.2 AI Services 流式

interface Assistant {
    TokenStream chat(String message);
}

TokenStream stream = assistant.chat("Hello");
stream.onPartialResponse(partial -> {
    // 推送到 WebSocket/SSE
}).onCompleteResponse(response -> {
    // 完成
}).onError(error -> {
    // 错误
}).start();

2.7.3 Reactor 集成

// 需引入 langchain4j-reactor 模块
interface Assistant {
    Flux<String> chat(String userMessage);
}

2.8 Spring Boot 集成

2.8.1 依赖配置

<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-open-ai-spring-boot-starter</artifactId>
    <version>1.9.1-beta17</version>
</dependency>
<dependency>
    <groupId>dev.langchain4j</groupId>
    <artifactId>langchain4j-spring-boot-starter</artifactId>
    <version>1.9.1-beta17</version>
</dependency>

2.8.2 配置属性

langchain4j.open-ai.chat-model.api-key=${OPENAI_API_KEY}
langchain4j.open-ai.chat-model.model-name=gpt-4o
langchain4j.open-ai.chat-model.log-requests=true
langchain4j.open-ai.chat-model.log-responses=true
langchain4j.open-ai.chat-model.temperature=0.7
langchain4j.open-ai.chat-model.max-tokens=1000

2.8.3 声明式 AI Service

@AiService  // 自动扫描并注册为 Spring Bean
interface Assistant {
    @SystemMessage("你是一个礼貌的助手")
    String chat(String userMessage);
}

2.8.4 自动装配组件

Spring Boot Starter 自动装配(如果存在于应用上下文中):

  • ChatModel / StreamingChatModel
  • ChatMemory / ChatMemoryProvider
  • ContentRetriever / RetrievalAugmentor
  • @Component / @Service 类中所有 @Tool 注解的方法

2.8.5 多模型显式装配

@AiService(wiringMode = EXPLICIT, chatModel = "openAiChatModel")
interface OpenAiAssistant { String chat(String msg); }

@AiService(wiringMode = EXPLICIT, chatModel = "ollamaChatModel")
interface OllamaAssistant { String chat(String msg); }

2.8.6 工具自动发现

@Component
public class BookingTools {
    @Tool
    public Booking getBookingDetails(String bookingNumber, String customerName) {
        return bookingService.getBookingDetails(bookingNumber, customerName);
    }
}

要求: Java 17+、Spring Boot 3.2+


2.9 错误处理与容错

2.9.1 异常体系

异常类 触发场景 可重试
UnsupportedFeatureException 模型不支持的功能
ToolExecutionException 工具调用失败 视情况
InputGuardrailException 输入违反安全策略
OutputGuardrailException 输出违反安全策略
HttpException HTTP 非 2xx
RetriableException 暂时性错误
NonRetriableException 不可恢复错误

2.9.2 重试策略(RetryUtils)

RetryPolicy DEFAULT = retryPolicyBuilder()
    .maxRetries(3)
    .delayMillis(1000)
    .jitterScale(0.2)
    .backoffExp(2.0)
    .build();

延迟计算: delay = delayMillis × backoffExp^attempt + random × delayMillis × jitterScale

重试次数 基础延迟 实际延迟范围
1 1000ms 800-1200ms
2 2000ms 1600-2400ms
3 4000ms 3200-4800ms

2.9.3 防护栏(Guardrails)

// 输入防护
if (!result.isValid()) {
    throw new InputGuardrailException(result.toString());
}

// 输出防护(支持重试)
if (!result.isValid()) {
    if (result.isRetry()) {
        return processWithRetry(input, output, retryCount + 1);
    }
    throw new OutputGuardrailException(result.toString());
}

2.9.4 Quarkus 容错集成

@AiService
@Retry(maxRetries = 3)
@Fallback(fallbackMethod = "fallbackChat")
@Timeout(30000)
@CircuitBreaker
public interface Agent { String chat(String msg); }

2.10 可观测性

2.10.1 ChatModelListener(核心扩展点)

public interface ChatModelListener {
    default void onRequest(ChatModelRequestContext ctx) {}
    default void onResponse(ChatModelResponseContext ctx) {}
    default void onError(ChatModelErrorContext ctx) {}
}

2.10.2 Spring Boot 自动注入

@Configuration
class ObservabilityConfig {
    @Bean
    ChatModelListener chatModelListener() {
        return new ChatModelListener() {
            @Override
            public void onRequest(ChatModelRequestContext ctx) {
                ctx.attributes().put("traceId", UUID.randomUUID().toString());
                ctx.attributes().put("startTime", System.currentTimeMillis());
            }
            @Override
            public void onResponse(ChatModelResponseContext ctx) {
                long duration = System.currentTimeMillis() - (long) ctx.attributes().get("startTime");
                TokenUsage usage = ctx.chatResponse().tokenUsage();
                // 记录指标...
            }
        };
    }
}

2.10.3 Micrometer 指标集成

public class MetricsListener implements ChatModelListener {
    @Override
    public void onResponse(ChatModelResponseContext ctx) {
        meterRegistry.counter("llm.token.usage",
            "model", modelName, "type", "input"
        ).increment(tokenUsage.inputTokenCount());

        meterRegistry.timer("llm.request.duration")
            .record(duration, TimeUnit.MILLISECONDS);
    }
}

2.10.4 OpenTelemetry 分布式追踪

@Override
public void onRequest(ChatModelRequestContext ctx) {
    Span span = tracer.spanBuilder("llm_inference")
        .setAttribute("llm.model", ctx.chatRequest().modelName())
        .startSpan();
    ctx.attributes().put("span", span);
}

2.10.5 第三方平台集成

  • Arize Phoenix — OpenInference instrumentation
  • MLflow — OpenTelemetry OTLP 端点
  • Prometheus + Grafana — Micrometer + Spring Boot Actuator

三、LangGraph 核心技术细节

3.1 StateGraph 核心概念

3.1.1 执行模型

LangGraph 将 Agent 工作流建模为有向图,底层算法灵感来自 Google Pregel 系统:

  • 图执行以离散的"超级步(super-step)"推进
  • 每个超级步对所有活跃节点进行一次迭代
  • 节点收到消息后变为 active,执行完毕后若无新消息则投票 halt
  • 当所有节点 inactive 且无消息在途时,图执行终止

3.1.2 核心三要素

概念 说明
State 共享数据结构(TypedDict / Pydantic),代表应用当前快照
Nodes Python 函数,接收状态、返回状态更新
Edges 决定下一步执行哪个节点(固定/条件分支)

3.1.3 StateGraph 定义

from langgraph.graph import StateGraph, START, END

builder = StateGraph(MyState)
builder.add_node("node_a", node_a_function)
builder.add_node("node_b", node_b_function)
builder.add_edge(START, "node_a")
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", END)

graph = builder.compile()  # 必须编译后才能使用

3.1.4 节点函数签名

def my_node(state: MyState, config: RunnableConfig, runtime: Runtime):
    # state: 当前图状态
    # config: thread_id、tracing tags 等
    # runtime: store、stream_writer 等
    return {"key": "updated_value"}  # 返回状态更新(非完整状态)

3.1.5 边类型

普通边:固定转换

graph.add_edge("node_a", "node_b")

条件边:动态路由

def router(state: MyState) -> Literal["node_b", "node_c"]:
    if state["condition"]:
        return "node_b"
    return "node_c"

graph.add_conditional_edges("node_a", router)

3.1.6 Command 原语

将控制流和状态更新合二为一:

from langgraph.types import Command

def agent(state) -> Command[Literal["agent", "another_agent"]]:
    goto = get_next_agent(...)
    return Command(
        goto=goto,
        update={"my_state_key": "value"}
    )

# 跨图导航(子图 → 父图)
Command(goto="bob", update={...}, graph=Command.PARENT)

3.1.7 Send API(Map-Reduce)

from langgraph.constants import Send

def route(state):
    return [Send("process_item", {"item": item}) for item in state["items"]]

3.2 多 Agent 模式

3.2.1 模式对比

模式 通信方式 适用场景
Network(网络) 多对多,任意 Agent 可调用任意 Agent 无明确层级、无特定调用顺序
Supervisor(主管) 所有 Agent 与单一主管通信 集中控制、任务分配明确
Supervisor + Tool 子 Agent 作为工具暴露给主管 简化路由逻辑
Hierarchical(层级) 主管的主管,多层嵌套 Agent 数量多、分组管理

3.2.2 主管模式示例

def supervisor(state: MessagesState) -> Command[Literal["agent_1", "agent_2", END]]:
    response = model.invoke(...)  # LLM 决定路由
    return Command(goto=response["next_agent"])

def agent_1(state: MessagesState) -> Command[Literal["supervisor"]]:
    response = model.invoke(...)
    return Command(goto="supervisor", update={"messages": [response]})

builder = StateGraph(MessagesState)
builder.add_node(supervisor)
builder.add_node(agent_1)
builder.add_node(agent_2)
builder.add_edge(START, "supervisor")

3.2.3 主管 + 工具调用模式

from langgraph.prebuilt import InjectedState, create_react_agent

def agent_1(state: Annotated[dict, InjectedState]):
    response = model.invoke(...)
    return response.content  # 自动转为 ToolMessage

tools = [agent_1, agent_2]
supervisor = create_react_agent(model, tools)  # 预构建 ReAct Agent

3.2.4 层级模式

# 团队 1
team_1_graph = team_1_builder.compile()

# 团队 2
team_2_graph = team_2_builder.compile()

# 顶层主管
builder = StateGraph(MessagesState)
builder.add_node(top_level_supervisor)
builder.add_node("team_1_graph", team_1_graph)  # 子图作为节点
builder.add_node("team_2_graph", team_2_graph)
builder.add_edge(START, "top_level_supervisor")
builder.add_edge("team_1_graph", top_level_supervisor)
builder.add_edge("team_2_graph", top_level_supervisor)

3.3 状态管理

3.3.1 状态 Schema

from typing_extensions import TypedDict

class State(TypedDict):
    foo: int
    bar: list[str]

多 Schema 支持

class InputState(TypedDict):
    user_input: str

class OutputState(TypedDict):
    graph_output: str

class OverallState(TypedDict):
    foo: str
    user_input: str
    graph_output: str

builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)

3.3.2 Reducers(归约器)

from typing import Annotated
from operator import add

class State(TypedDict):
    foo: int                              # 默认:覆盖
    bar: Annotated[list[str], add]        # 自定义:追加

3.3.3 消息状态

from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

class GraphState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

add_messages 特殊能力:

  • 新消息 → 追加
  • 已有消息(按 ID)→ 更新
  • 支持 dict 自动反序列化为 Message 对象

3.4 人工介入(Human-in-the-Loop)

3.4.1 静态断点

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # 进入节点前暂停
    interrupt_after=["node_b"]          # 节点执行后暂停
)

3.4.2 动态中断(推荐)

from langgraph.types import interrupt

def human_review_node(state):
    human_input = interrupt("请审核以下内容并决定是否继续")
    return {"approved": human_input}

3.4.3 断点恢复

# 获取状态
state = graph.get_state(config)

# 使用 Command 恢复
graph.invoke(Command(resume="approved"), config)

3.4.4 典型场景

  1. 审核工具调用
  2. 验证 LLM 输出
  3. LLM 主动请求人工输入

3.5 持久化与检查点

3.5.1 Checkpointer 实现

Checkpointer 说明
MemorySaver 内存存储,开发/测试
SqliteSaver SQLite,轻量级生产
PostgresSaver PostgreSQL,生产推荐
AsyncPostgresSaver 异步 PostgreSQL
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

checkpointer = PostgresSaver.from_conn_string("postgresql://...")

3.5.2 线程与检查点

config = {"configurable": {"thread_id": "1"}}
graph.invoke(input_data, config)

# 获取状态历史
history = list(graph.get_state_history(config))

# 从特定检查点恢复
config = {"configurable": {"thread_id": "1", "checkpoint_id": "xxx"}}
graph.invoke(None, config=config)

# 状态分叉
graph.update_state(config, values={"foo": 2}, as_node="node_a")

3.5.3 Memory Store(跨线程记忆)

from langgraph.store.memory import InMemoryStore

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "dims": 1536,
        "fields": ["food_preference", "$"]
    }
)
memories = store.search(namespace, query="What does the user like to eat?")

3.6 流式输出

3.6.1 stream_mode

模式 说明
"values" 每步后发出完整状态
"updates" 每步后发出节点名和状态增量
"messages" 逐 token 流式输出 LLM 消息
"custom" 通过 StreamWriter 发出自定义数据
"debug" 详细调试事件
# LLM token 流
for message_chunk, metadata in graph.stream(inputs, stream_mode="messages"):
    print(message_chunk.content, end="|", flush=True)

# 自定义流
def my_node(state, writer: StreamWriter):
    writer({"progress": "50%"})
    return {"result": "done"}

# 多模式组合
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(f"Mode: {mode}, Chunk: {chunk}")

3.7 子图与嵌套工作流

3.7.1 共享状态 Schema

# 子图
subgraph = subgraph_builder.compile()

# 父图
builder = StateGraph(State)
builder.add_node("subgraph_node", subgraph)  # 子图作为节点
builder.add_edge(START, "subgraph_node")

3.7.2 不同状态 Schema

def call_subgraph(state: MessagesState):
    response = subgraph.invoke({"subgraph_messages": state["messages"]})
    return {"messages": response["subgraph_messages"]}

3.7.3 跨图导航

def some_node_inside_subgraph(state):
    return Command(
        goto="bob",           # 父图中的另一个节点
        update={"key": "val"},
        graph=Command.PARENT  # 导航到父图
    )

3.8 错误处理与容错

3.8.1 设计哲学

节点失败时图默认停止,不猜测如何处理失败。失败是图生命周期的一等公民

3.8.2 Retry Policy

from langgraph.pregel import RetryPolicy

retry_policy = RetryPolicy(max_attempts=3)
graph.add_node("fetch_data", fetch_data, retry=retry_policy)

3.8.3 节点内错误处理

def my_node(state):
    try:
        result = unstable_api_call()
        return {"data": result}
    except TimeoutError:
        return {"data": None, "error": "timeout"}

3.8.4 NodeInterrupt(条件性暂停)

from langgraph.errors import NodeInterrupt

def risky_operation(state):
    if is_uncertain(state):
        raise NodeInterrupt("需要人工确认")
    return perform_operation(state)

3.8.5 图级错误路由(Saga 模式)

  1. 正常执行前向步骤
  2. 失败时路由到专用处理节点
  3. 执行补偿/回滚/状态更新
  4. 可选重新路由到恢复路径

3.9 部署方案

3.9.1 部署选项

选项 说明
Self-Hosted Lite 免费(最多 100 万节点执行)
Cloud SaaS 全托管,Plus/Enterprise
Hybrid SaaS 控制面 + 自托管数据面
Self-Hosted Enterprise 完全自行管理

3.9.2 本地开发

langgraph dev  # 启动本地开发服务器 http://127.0.0.1:2024

3.9.3 配置文件

通过 langgraph.json 配置图、环境、依赖等。


3.10 LangGraph Studio 调试

  • 可视化图结构:直观展示节点和边
  • 交互式测试:从 UI 直接运行图
  • 状态编辑与调试:修改状态后重新运行
  • 断点调试:选择 Interrupts 下拉菜单设置断点
  • Thread 管理:查看和管理线程
  • 长期记忆管理:查看和管理跨线程记忆
  • LangSmith 集成:将节点输入/输出添加到数据集

四、LangChain4j + LangGraph 集成方案

4.1 集成现状分析

结论:LangChain4j(Java)与 LangGraph(Python)之间不存在官方的跨语言直接集成。

两者虽然共享"LangChain"品牌,但分属不同语言生态,独立演进。

推荐替代方案LangGraph4j(Java 生态的 LangGraph 实现)

4.2 方案一:纯 Java 方案(LangGraph4j)

4.2.1 项目概况

  • GitHub: langgraph4j/langgraph4j
  • 最新版本: 1.8.11
  • 最低 Java: 17
  • 许可证: MIT
  • 与 LangChain4j 和 Spring AI 无缝集成

4.2.2 模块结构

langgraph4j/
├── langgraph4j-core/              # 核心组件
├── langgraph4j-opentelemetry/     # OpenTelemetry 集成
├── langgraph4j-mysql-saver/       # MySQL 检查点
├── langgraph4j-postgres-saver/    # PostgreSQL 检查点
├── langgraph4j-oracle-saver/      # Oracle 检查点
├── langgraph4j-redis-saver/       # Redis 检查点
├── langchain4j/                   # LangChain4j 集成
│   ├── langchain4j-core/
│   └── langchain4j-agent/
├── spring-ai/                     # Spring AI 集成
└── studio/                        # Web UI

4.2.3 Maven 依赖

<properties>
    <langgraph4j.version>1.8.11</langgraph4j.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.bsc.langgraph4j</groupId>
            <artifactId>langgraph4j-bom</artifactId>
            <version>${langgraph4j.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.bsc.langgraph4j</groupId>
        <artifactId>langgraph4j-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.bsc.langgraph4j</groupId>
        <artifactId>langchain4j-core-jdk</artifactId>
    </dependency>
</dependencies>

4.2.4 核心概念映射

Python LangGraph LangGraph4j
StateGraph StateGraph<S>
TypedDict state AgentState (Map)
add_messages reducer Channel.Reducer
NodeAction NodeAction<S>
EdgeAction EdgeAction<S>
CompiledGraph CompiledGraph<S>
MemorySaver MemorySaver
PostgresSaver PostgresSaver

4.2.5 优势

  • :white_check_mark: 全 Java 栈,无跨语言复杂性
  • :white_check_mark: 与 LangChain4j 深度集成
  • :white_check_mark: 支持 Spring Boot / Quarkus
  • :white_check_mark: 原生 OpenTelemetry 支持
  • :white_check_mark: 多种检查点后端(MySQL、PostgreSQL、Oracle、Redis)

4.3 方案二:gRPC 桥接

适用于必须同时使用 Java Agent 和 Python Agent 的场景。

4.3.1 Protocol Buffers 定义

syntax = "proto3";

package agentcluster;

service AgentService {
    rpc ProcessTask(TaskRequest) returns (TaskResponse);
    rpc StreamTask(TaskRequest) returns (stream TaskChunk);
    rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
}

message TaskRequest {
    string task_id = 1;
    string agent_type = 2;
    string payload = 3;
    map<string, string> context = 4;
    int32 priority = 5;
}

message TaskResponse {
    string task_id = 1;
    string status = 2;
    string result = 3;
    map<string, string> metadata = 4;
}

message TaskChunk {
    string task_id = 1;
    string chunk_id = 2;
    string content = 3;
    bool is_final = 4;
}

4.3.2 Java 端

public class JavaAgentGrpcService extends AgentServiceGrpc.AgentServiceImplBase {
    private final Assistant assistant;

    @Override
    public void processTask(TaskRequest request, StreamObserver<TaskResponse> observer) {
        String result = assistant.chat(request.getPayload());
        observer.onNext(TaskResponse.newBuilder()
            .setTaskId(request.getTaskId())
            .setStatus("SUCCESS")
            .setResult(result)
            .build());
        observer.onCompleted();
    }
}

4.3.3 Python 端

class PythonAgentServicer(agentcluster_pb2_grpc.AgentServiceServicer):
    async def ProcessTask(self, request, context):
        result = await graph.ainvoke({"messages": [request.payload]})
        return agentcluster_pb2.TaskResponse(
            task_id=request.task_id,
            status="SUCCESS",
            result=result["messages"][-1].content
        )

4.4 方案三:REST API 桥接

适用于简单集成场景。

@RestController
@RequestMapping("/api/agents")
public class AgentController {
    @PostMapping("/process")
    public ResponseEntity<AgentResponse> process(@RequestBody AgentRequest request) {
        return ResponseEntity.ok(agentService.process(request));
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamProcess(@RequestParam String query) {
        return agentService.streamChat(query);
    }
}

4.5 方案四:消息队列驱动

适用于异步解耦场景。

// Kafka 生产者
@Service
public class AgentEventPublisher {
    @Autowired
    private KafkaTemplate<String, AgentEvent> kafkaTemplate;

    public void publishTaskCompletion(String taskId, AgentResult result) {
        kafkaTemplate.send("agent-events", taskId, new AgentEvent("TASK_COMPLETED", taskId, result));
    }
}

// Kafka 消费者
@KafkaListener(topics = "agent-events", groupId = "java-agent-cluster")
public void handleAgentEvent(AgentEvent event) {
    switch (event.getType()) {
        case "TASK_COMPLETED" -> processCompletedTask(event);
        case "TASK_FAILED" -> handleFailedTask(event);
    }
}

4.6 通信方案对比

方案 延迟 吞吐量 复杂度 流式支持 适用场景
纯 Java (LangGraph4j) 极低 极高 :white_check_mark: Java 团队首选
gRPC 低 (~1-5ms) :white_check_mark: 高频调用、跨语言
REST/HTTP 中 (~5-50ms) SSE 简单集成
Kafka 中高 极高 :cross_mark: 异步、事件溯源
RabbitMQ 中 (~1-10ms) :cross_mark: 任务路由
Redis Pub/Sub 极低 (<1ms) :cross_mark: 实时状态同步

五、跨语言通信设计

5.1 消息协议设计

5.1.1 基于 A2A 协议

Google 的 A2A(Agent-to-Agent)协议(50+ 技术合作伙伴支持)是跨框架 Agent 互操作的标准。

{
    "message_id": "uuid-v4",
    "conversation_id": "session-uuid",
    "source_agent": "java-research-agent",
    "target_agent": "python-analysis-agent",
    "message_type": "TASK_REQUEST",
    "timestamp": "2026-04-07T10:00:00Z",
    "content": {
        "task": "分析以下数据...",
        "parameters": {
            "model": "gpt-4",
            "temperature": 0.7
        }
    },
    "context": {
        "parent_message_id": "prev-uuid",
        "trace_id": "trace-uuid",
        "priority": 1
    },
    "metadata": {
        "content_type": "text/plain",
        "encoding": "utf-8",
        "ttl_seconds": 300
    }
}

5.1.2 消息类型

类型 说明
TASK_REQUEST 任务请求
TASK_RESPONSE 任务响应
TASK_STREAM_CHUNK 流式任务块
TASK_CANCEL 任务取消
HEARTBEAT 心跳
CAPABILITY_QUERY 能力查询
CAPABILITY_RESPONSE 能力响应
STATE_SYNC 状态同步
BROADCAST 广播消息
ERROR 错误消息

5.2 共享状态管理

5.2.1 Redis Key 设计

agent:cluster:{cluster_id}:state          → Hash    # 全局集群状态
agent:session:{session_id}:messages       → List    # 会话消息历史
agent:session:{session_id}:context        → Hash    # 会话上下文
agent:task:{task_id}:status               → String  # 任务状态
agent:task:{task_id}:result               → String  # 任务结果
agent:checkpoint:{graph_id}:{thread_id}   → Hash    # LangGraph 检查点
agent:locks:{resource_id}                 → String  # 分布式锁

5.2.2 Java 端

public class SharedStateManager {
    private final RedisTemplate<String, Object> redisTemplate;

    public void updateAgentState(String sessionId, String agentId, Map<String, Object> updates) {
        String key = "agent:session:" + sessionId + ":context";
        redisTemplate.opsForHash().putAll(key, updates);
        redisTemplate.convertAndSend("agent:state:updates",
            new StateUpdateEvent(sessionId, agentId, updates));
    }
}

5.2.3 Python 端

class SharedStateManager:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    async def update_agent_state(self, session_id, agent_id, updates):
        key = f"agent:session:{session_id}:context"
        await self.redis.hset(key, mapping={k: json.dumps(v) for k, v in updates.items()})
        await self.redis.publish("agent:state:updates", json.dumps({...}))

5.3 事件驱动架构

5.3.1 消息中间件选型

中间件 吞吐量 延迟 持久化 复杂度 适用场景
Kafka 极高(百万/秒) 中 (5-50ms) 事件溯源、审计
RabbitMQ 高(万/秒) 低 (1-10ms) 可配置 任务路由、工作队列
Redis Pub/Sub 高(万/秒) 极低 (<1ms) 实时通知

5.3.2 Kafka Topic 设计

agent-events              # 全局 Agent 事件
agent-tasks               # 任务分发
agent-state-updates       # 状态变更
agent-errors              # 错误事件
agent-audit               # 审计日志

六、生产部署架构

6.1 容器化

6.1.1 Java Agent Dockerfile

FROM eclipse-temurin:21-jdk-alpine AS build
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN ./mvnw clean package -DskipTests

FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY --from=build /app/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-XX:+UseZGC", "-Xmx512m", "-jar", "app.jar"]

6.1.2 Python Agent Dockerfile

FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "agent_server:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 Kubernetes 编排

6.2.1 Java Agent StatefulSet

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: java-agent-service
spec:
  serviceName: agent-cluster
  replicas: 3
  template:
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values: ["java-agent"]
              topologyKey: kubernetes.io/hostname
      containers:
        - name: agent
          image: your-registry/java-agent:1.8.11
          ports:
            - containerPort: 8080
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
            initialDelaySeconds: 20
            periodSeconds: 5

6.3 服务网格(Istio)

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: agent-routing
spec:
  hosts:
    - agent-cluster
  http:
    - match:
        - headers:
            x-agent-type:
              exact: research
      route:
        - destination:
            host: java-research-agent
            port:
              number: 8080
    - match:
        - headers:
            x-agent-type:
              exact: analysis
      route:
        - destination:
            host: python-analysis-agent
            port:
              number: 8000

Istio 提供的能力

  • 流量管理(按权重路由、灰度发布)
  • 可观测性(分布式追踪、指标)
  • 安全(mTLS 双向认证、RBAC)
  • 弹性(重试、超时、熔断)

6.4 弹性伸缩

6.4.1 HPA 自动扩缩容

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: java-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: java-agent-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: agent_queue_depth
        target:
          type: AverageValue
          averageValue: "50"

七、安全设计

7.1 API Key 管理

# Kubernetes Secret
apiVersion: v1
kind: Secret
metadata:
  name: agent-secrets
type: Opaque
stringData:
  openai-api-key: sk-xxx
  anthropic-api-key: sk-ant-xxx
  redis-url: redis://:password@redis-cluster:6379

推荐实践

  • 使用 Kubernetes Secrets 或 HashiCorp Vault 管理
  • 绝不硬编码在代码或配置文件中
  • 定期轮换 API Key
  • 不同环境使用不同 Key

7.2 速率限制

// Bucket4j 速率限制
@Bean
public Bucket agentBucket() {
    return Bucket.builder()
        .addLimit(Bandwidth.classic(100, Refill.intervally(10, Duration.ofSeconds(1))))
        .build();
}

7.3 访问控制

@Configuration
@EnableWebSecurity
public class ApiSecurityConfig {
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .securityMatcher("/api/agents/**")
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/agents/health").permitAll()
                .requestMatchers("/api/agents/**").hasRole("AGENT_CLIENT")
            )
            .oauth2ResourceServer(oauth2 -> oauth2.jwt(Customizer.withDefaults()));
        return http.build();
    }
}

7.4 数据安全

层面 措施
传输加密 TLS 1.3、mTLS(Istio)
存储加密 数据库透明加密(TDE)、Redis TLS
敏感数据脱敏 PII 检测和脱敏
审计日志 全量记录 Agent 操作
Prompt 注入防护 输入/输出 Guardrails

八、性能优化

8.1 模型层优化

策略 说明 预期效果
模型分层 Supervisor 用强模型,执行 Agent 用轻量模型 节省 60-80% Token 成本
Prompt 压缩 精简系统提示词、减少上下文 减少 20-40% Token
缓存 语义缓存相似查询 减少 30-50% API 调用
批处理 合并多个工具调用 减少网络往返
本地模型 Ollama 部署小模型处理简单任务 零 API 成本

8.2 架构层优化

策略 说明
连接池 HTTP/gRPC 连接池复用
异步非阻塞 使用 CompletableFuture / async-await
并行执行 无依赖的 Agent 并行运行
预加载 模型预热、向量库预加载
限流降级 超出容量时优雅降级

8.3 Token 成本控制

根据 Anthropic 测试数据,多 Agent 系统可能消耗约 15 倍 Token

控制策略

  1. 设置每个 Agent 的 maxTokens 上限
  2. 监控 Token 使用量,设置预算告警
  3. 对简单任务使用本地小模型
  4. 实现语义缓存减少重复推理
  5. 设置全局 Token 预算和熔断机制

九、技术选型决策树

团队主要技术栈是 Java 吗?
├── 是 → 是否需要 Python 特有库?
│   ├── 否 → ✅ 纯 Java 方案(LangGraph4j + LangChain4j)
│   └── 是 → 是否需要实时双向通信?
│       ├── 是 → gRPC 桥接
│       └── 否 → REST API 桥接
└── 否 → 使用 Python LangGraph + LangChain

Agent 数量规模?
├── 1-3 个 → 顺序/并行工作流(langchain4j-agentic)
├── 4-10 个 → Supervisor 模式
└── 10+ 个 → 层级模式(Hierarchical)

部署环境?
├── 云原生 → Kubernetes + Istio
├── 私有化 → Docker Compose / K3s
└── 边缘 → Ollama 本地模型 + 轻量容器

状态持久化需求?
├── 开发/测试 → MemorySaver
├── 轻量生产 → SQLite / Redis
└── 企业级 → PostgreSQL / MySQL

可观测性需求?
├── 基础 → SLF4J 日志
├── 标准 → ChatModelListener + Micrometer
└── 企业级 → OpenTelemetry + Jaeger + Prometheus + Grafana

十、最佳实践与建议

10.1 架构设计

  1. 优先纯 Java 方案:如果业务允许,LangGraph4j + LangChain4j 可避免所有跨语言复杂性
  2. 渐进式演进:从单 Agent 开始,按需引入多 Agent 协作,避免过度工程化
  3. 采用 A2A 协议:跨语言场景遵循 Google A2A 协议规范
  4. Redis 作为状态桥梁:跨语言共享状态的最佳中间层

10.2 开发实践

  1. 声明式 Agent 定义:使用 @AiService / @Agent 注解,保持代码简洁
  2. 工具自动发现:利用 Spring Boot Starter 自动装配 @Tool 方法
  3. 多用户内存隔离:始终使用 @MemoryId + ChatMemoryProvider
  4. 输入/输出防护栏:部署前务必配置 Guardrails

10.3 运维实践

  1. 全链路可观测:ChatModelListener + OpenTelemetry + Prometheus + Grafana
  2. 检查点持久化:生产环境使用 PostgreSQL/MySQL 检查点,支持故障恢复
  3. 模型分层:Supervisor 用 GPT-4o/Claude,执行型用 GPT-4o-mini/本地模型
  4. Token 预算控制:设置全局预算、单 Agent 限制、熔断机制

10.4 安全实践

  1. API Key 外部管理:Kubernetes Secrets / Vault,绝不硬编码
  2. 速率限制:Bucket4j 或 Istio RateLimit
  3. mTLS 加密:服务间通信启用双向 TLS
  4. 审计日志:全量记录 Agent 操作和 LLM 调用

附录:关键参考资源

1 个赞