【Code With SOLO】用 SOLO 快速开发出 [ Apache NiFi 兼容的自定义 CDC Processor ]

1. 摘要:

最近项目中需要实现 PostgreSQL 到 MySQL 的实时数据同步(CDC)技术选型甲方点名要NIFI,我使用 SOLO 快速开发了一款 NiFi 自定义处理器 CaptureChangePostgreSQL​,基于 PostgreSQL 逻辑复制协议捕获 INSERT/UPDATE/DELETE 变更,输出为 JSON FlowFile,配合 JoltTransformJSON 字段映射和 PutDatabaseRecord 写入 MySQL,实现了完整的跨库实时数据同步链路。整个过程中从源码适配、编译打包、运行调试到多表路由,SOLO 全程参与,大幅缩短了开发周期。

2. 背景:

我是一名数据平台研发工程师。业务需求是将多个 PostgreSQL 表的变更实时同步到 MySQL,要求支持 INSERT、UPDATE、DELETE 全量操作,且能按表路由到不同的目标表。NiFi 官方提供了 MySQL CDC 处理器(CaptureChangeMySQL),但没有 PostgreSQL 版本。社区有一个基于 NiFi 1.14.0 的 PostgreSQL CDC 实现,但与 NiFi 2.9.0 存在大量 API 不兼容问题。如果从零开始研究 NiFi 插件开发体系、逻辑复制协议、NAR 打包机制,预计需要 1-2 周时间。于是我借助 SOLO 来加速整个开发过程。

3. 实践过程:

任务拆解:

  • 源码适配:将社区版 NiFi 1.14.0 的 PostgreSQL CDC 源码适配到 NiFi 2.9.0,修复 API 兼容性(ExpressionLanguageScope、CoreAttributes、ClassLoaderUtils 等)。

  • 编译打包:配置 Maven POM、依赖管理、NAR 打包,解决 opensaml 依赖下载、Maven 版本、develocity 扩展等问题。

  • 运行时修复:解决 PostgreSQL JDBC 复制连接的 oid type M not known​ 错误、MySQL 时间格式不兼容(timestamptz​ 带 +08​ 时区后缀)。

  • 功能增强:添加 FlowFile 属性(statement.type​、cdc.table​)支持多表路由和 PutDatabaseRecord 操作类型识别;支持 TRUNCATE 事件跳过;时间戳自动格式化。

  • 多表 CDC:设计 RouteOnAttribute + JoltTransformJSON + PutDatabaseRecord 的多表同步架构。

用了 SOLO 哪些能力:

  • 全栈代码生成与修改:自动修复 Java 源码中的包名、import 语句、API 兼容性问题,跨多个文件精准修改。

  • 编译环境搭建:自动安装 Java 21、Maven 3.9.15,克隆 NiFi 源码,配置 Maven settings.xml(阿里云镜像 + Shibboleth 仓库),一键编译打包 NAR。

  • 问题诊断与修复:面对 oid type M not known​、Data truncation #22007​、transfer relationship not specified​、DELETE not working​ 等运行时错误,SOLO 能快速定位根因并给出修复方案。

  • 文档生成:自动生成 README.md(编译指南 + 问题排查)和 DEVELOPMENT_GUIDE.md(AI 开发文档),方便后续维护和 AI 辅助开发。

关键 Prompt 还原:

  1. “基于 NiFi 2.9.0 创建一个 PostgreSQL CDC 处理器,支持逻辑复制捕获 INSERT/UPDATE/DELETE 变更”(生成基础骨架与编译打包)

  2. “编译报错 oid type M not known and not a number,怎么修复?”(运行时 JDBC 连接问题修复)

  3. “MySQL 报 Data truncation: #22007,时间格式不对”(时间戳格式化,在 Decode.java 中添加 formatTimestamp 方法)

  4. “DELETE 操作没有同步到 MySQL”(添加 statement.type FlowFile 属性 + PutDatabaseRecord 配置 Use statement.type Attribute)

  5. “支持多个表的 CDC,按表名路由到不同的 JoltTransformJSON 和 PutDatabaseRecord”(多表架构设计 + cdc.table 属性)

  6. “帮我编写一份能直接 AI 生成的开发文档”(生成 DEVELOPMENT_GUIDE.md)

中间踩过的坑:

  • oid type M not known:PostgreSQL JDBC 42.7.x 通过扩展查询协议建立复制连接时解析 oid 类型失败。修复方式是将 preferQueryMode=simple​ 从 Properties 传递改为直接写入 JDBC URL query string。

  • MySQL 时间格式:PostgreSQL timestamptz​ 输出 2024-08-02 08:54:32.148+08​,MySQL datetime​ 不接受时区后缀。最初尝试用 JoltTransformJSON 的 substring​ 和 replaceAll​ 处理,但 Jolt 函数对某些格式不生效。最终在源码 Decode.java 中添加 formatTimestamp()​ 方法,在 CDC 源头直接格式化。

  • DELETE 不生效:两个原因叠加——① statement.type​ 属性值是小写 delete​,但 PutDatabaseRecord 的 Java Enum valueOf()​ 大小写敏感,必须大写 DELETE​;② PutDatabaseRecord 的 Statement Type 设为 UPSERT​(不支持 DELETE),必须改为 Use statement.type Attribute​。

  • TRUNCATE 报错:PostgreSQL 逻辑复制不捕获 TRUNCATE 事件(或捕获后 extractCdcType 返回 null),但 session.create()​ 创建的 FlowFile 没有 transfer/remove 就 continue,违反 NiFi 的 FlowFile 生命周期要求。修复为 session.remove(flowFile)​ 后再 continue。

  • NAR 体积膨胀:最初 NAR 3.5 MB,原因是 nifi-utils​、postgresql​、jackson​ 都被打入 NAR。改为 provided​ scope 后 NAR 仅 46 KB,所有外部依赖跟随 NiFi 框架。

  • API 创建处理器 409 错误:外部系统调用 NiFi API 创建处理器时,type 字段写错了包名(org.apache.nifi.processors.cdc.postgresql​ vs 实际 org.apache.nifi.cdc.postgresql.processors​),导致 NAR 找不到处理器类。

  • Maven 编译 opensaml 下载失败:用户的 Maven settings.xml 中 huaweicloud​ 镜像的 mirrorOf=*​ 拦截了所有仓库请求,但华为云没有 opensaml。修复为 mirrorOf=*,!shibboleth​ 并添加 Shibboleth 仓库。

4. 成果展示:

最终实现了完整的 PostgreSQL CDC 到 MySQL 的实时同步方案:

CaptureChangePostgreSQL → RouteOnAttribute → JoltTransformJSON → PutDatabaseRecord


核心产出物:

  • ​nifi-cdc-postgresql-nar-2.9.0.nar​(46 KB,精简版)

  • ​README.md​(编译指南 + 8 个常见问题排查)

  • ​DEVELOPMENT_GUIDE.md​(AI 开发文档,可直接用于生成 NiFi 数据流配置)

源码目录结构:

nifi-cdc-postgresql-bundle/
├── pom.xml
├── nifi-cdc-postgresql-processors/
│   ├── pom.xml
│   └── src/main/java/org/apache/nifi/cdc/postgresql/
│       ├── processors/CaptureChangePostgreSQL.java
│       └── replication/
│           ├── CDCException.java
│           ├── Column.java
│           ├── ConnectionManager.java
│           ├── Decode.java
│           ├── Event.java
│           ├── PGEasyReplication.java
│           ├── Relation.java
│           ├── Snapshot.java
│           ├── Stream.java
│           └── TupleData.java
└── nifi-cdc-postgresql-nar/
    ├── pom.xml
    └── src/main/resources/
        ├── META-INF/LICENSE
        ├── META-INF/NOTICE
        └── DEVELOPMENT_GUIDE.md

5. 效果与总结:

  • 提效显著:从零开始适配 NiFi 2.9.0 的 PostgreSQL CDC 处理器,涉及 Java 源码修改、Maven 编译、运行时调试、功能增强、多表架构设计、文档编写,如果纯手工操作预计需要 1-2 周。借助 SOLO 交互式开发,整个过程仅用 约 2 天(含调试和文档),核心代码生成和编译打包在数分钟内完成。

  • SOLO 的角色:它是完美的"全栈开发伙伴"。在处理 Java 包名重构(9 个文件的 package + import 批量修改)、Maven POM 依赖管理(provided scope 精简 NAR)、运行时问题诊断(7 个不同类型的错误逐一排查修复)时,SOLO 的跨文件精准修改和上下文理解能力远超手动操作。

  • 可复用方法论:“先跑通,再修复,后增强”。先让 SOLO 把源码编译通过跑起来(哪怕有 bug),再逐一解决运行时错误(oid type、时间格式、DELETE 不生效),最后添加增强功能(多表路由、时间格式化、TRUNCATE 跳过)。这种"小步快跑"的 Prompt 方式能让 AI 输出质量极高,每次修改都可以独立验证。

  • 文档即代码:生成的 DEVELOPMENT_GUIDE.md 让后续新增表的 CDC 配置可以完全交给 AI 完成,只需提供源表和目标表的 DDL,AI 即可生成完整的 Jolt 规范、RouteOnAttribute 路由规则和 PutDatabaseRecord 配置。