1. 摘要:
最近项目中需要实现 PostgreSQL 到 MySQL 的实时数据同步(CDC)技术选型甲方点名要NIFI,我使用 SOLO 快速开发了一款 NiFi 自定义处理器 CaptureChangePostgreSQL,基于 PostgreSQL 逻辑复制协议捕获 INSERT/UPDATE/DELETE 变更,输出为 JSON FlowFile,配合 JoltTransformJSON 字段映射和 PutDatabaseRecord 写入 MySQL,实现了完整的跨库实时数据同步链路。整个过程中从源码适配、编译打包、运行调试到多表路由,SOLO 全程参与,大幅缩短了开发周期。
2. 背景:
我是一名数据平台研发工程师。业务需求是将多个 PostgreSQL 表的变更实时同步到 MySQL,要求支持 INSERT、UPDATE、DELETE 全量操作,且能按表路由到不同的目标表。NiFi 官方提供了 MySQL CDC 处理器(CaptureChangeMySQL),但没有 PostgreSQL 版本。社区有一个基于 NiFi 1.14.0 的 PostgreSQL CDC 实现,但与 NiFi 2.9.0 存在大量 API 不兼容问题。如果从零开始研究 NiFi 插件开发体系、逻辑复制协议、NAR 打包机制,预计需要 1-2 周时间。于是我借助 SOLO 来加速整个开发过程。
3. 实践过程:
任务拆解:
-
源码适配:将社区版 NiFi 1.14.0 的 PostgreSQL CDC 源码适配到 NiFi 2.9.0,修复 API 兼容性(ExpressionLanguageScope、CoreAttributes、ClassLoaderUtils 等)。
-
编译打包:配置 Maven POM、依赖管理、NAR 打包,解决 opensaml 依赖下载、Maven 版本、develocity 扩展等问题。
-
运行时修复:解决 PostgreSQL JDBC 复制连接的 oid type M not known 错误、MySQL 时间格式不兼容(timestamptz 带 +08 时区后缀)。
-
功能增强:添加 FlowFile 属性(statement.type、cdc.table)支持多表路由和 PutDatabaseRecord 操作类型识别;支持 TRUNCATE 事件跳过;时间戳自动格式化。
-
多表 CDC:设计 RouteOnAttribute + JoltTransformJSON + PutDatabaseRecord 的多表同步架构。
用了 SOLO 哪些能力:
-
全栈代码生成与修改:自动修复 Java 源码中的包名、import 语句、API 兼容性问题,跨多个文件精准修改。
-
编译环境搭建:自动安装 Java 21、Maven 3.9.15,克隆 NiFi 源码,配置 Maven settings.xml(阿里云镜像 + Shibboleth 仓库),一键编译打包 NAR。
-
问题诊断与修复:面对 oid type M not known、Data truncation #22007、transfer relationship not specified、DELETE not working 等运行时错误,SOLO 能快速定位根因并给出修复方案。
-
文档生成:自动生成 README.md(编译指南 + 问题排查)和 DEVELOPMENT_GUIDE.md(AI 开发文档),方便后续维护和 AI 辅助开发。
关键 Prompt 还原:
-
“基于 NiFi 2.9.0 创建一个 PostgreSQL CDC 处理器,支持逻辑复制捕获 INSERT/UPDATE/DELETE 变更”(生成基础骨架与编译打包)
-
“编译报错 oid type M not known and not a number,怎么修复?”(运行时 JDBC 连接问题修复)
-
“MySQL 报 Data truncation: #22007,时间格式不对”(时间戳格式化,在 Decode.java 中添加 formatTimestamp 方法)
-
“DELETE 操作没有同步到 MySQL”(添加 statement.type FlowFile 属性 + PutDatabaseRecord 配置 Use statement.type Attribute)
-
“支持多个表的 CDC,按表名路由到不同的 JoltTransformJSON 和 PutDatabaseRecord”(多表架构设计 + cdc.table 属性)
-
“帮我编写一份能直接 AI 生成的开发文档”(生成 DEVELOPMENT_GUIDE.md)
中间踩过的坑:
-
oid type M not known:PostgreSQL JDBC 42.7.x 通过扩展查询协议建立复制连接时解析 oid 类型失败。修复方式是将 preferQueryMode=simple 从 Properties 传递改为直接写入 JDBC URL query string。
-
MySQL 时间格式:PostgreSQL timestamptz 输出 2024-08-02 08:54:32.148+08,MySQL datetime 不接受时区后缀。最初尝试用 JoltTransformJSON 的 substring 和 replaceAll 处理,但 Jolt 函数对某些格式不生效。最终在源码 Decode.java 中添加 formatTimestamp() 方法,在 CDC 源头直接格式化。
-
DELETE 不生效:两个原因叠加——① statement.type 属性值是小写 delete,但 PutDatabaseRecord 的 Java Enum valueOf() 大小写敏感,必须大写 DELETE;② PutDatabaseRecord 的 Statement Type 设为 UPSERT(不支持 DELETE),必须改为 Use statement.type Attribute。
-
TRUNCATE 报错:PostgreSQL 逻辑复制不捕获 TRUNCATE 事件(或捕获后 extractCdcType 返回 null),但 session.create() 创建的 FlowFile 没有 transfer/remove 就 continue,违反 NiFi 的 FlowFile 生命周期要求。修复为 session.remove(flowFile) 后再 continue。
-
NAR 体积膨胀:最初 NAR 3.5 MB,原因是 nifi-utils、postgresql、jackson 都被打入 NAR。改为 provided scope 后 NAR 仅 46 KB,所有外部依赖跟随 NiFi 框架。
-
API 创建处理器 409 错误:外部系统调用 NiFi API 创建处理器时,type 字段写错了包名(org.apache.nifi.processors.cdc.postgresql vs 实际 org.apache.nifi.cdc.postgresql.processors),导致 NAR 找不到处理器类。
-
Maven 编译 opensaml 下载失败:用户的 Maven settings.xml 中 huaweicloud 镜像的 mirrorOf=* 拦截了所有仓库请求,但华为云没有 opensaml。修复为 mirrorOf=*,!shibboleth 并添加 Shibboleth 仓库。
4. 成果展示:
最终实现了完整的 PostgreSQL CDC 到 MySQL 的实时同步方案:
CaptureChangePostgreSQL → RouteOnAttribute → JoltTransformJSON → PutDatabaseRecord
核心产出物:
-
nifi-cdc-postgresql-nar-2.9.0.nar(46 KB,精简版)
-
README.md(编译指南 + 8 个常见问题排查)
-
DEVELOPMENT_GUIDE.md(AI 开发文档,可直接用于生成 NiFi 数据流配置)
源码目录结构:
nifi-cdc-postgresql-bundle/
├── pom.xml
├── nifi-cdc-postgresql-processors/
│ ├── pom.xml
│ └── src/main/java/org/apache/nifi/cdc/postgresql/
│ ├── processors/CaptureChangePostgreSQL.java
│ └── replication/
│ ├── CDCException.java
│ ├── Column.java
│ ├── ConnectionManager.java
│ ├── Decode.java
│ ├── Event.java
│ ├── PGEasyReplication.java
│ ├── Relation.java
│ ├── Snapshot.java
│ ├── Stream.java
│ └── TupleData.java
└── nifi-cdc-postgresql-nar/
├── pom.xml
└── src/main/resources/
├── META-INF/LICENSE
├── META-INF/NOTICE
└── DEVELOPMENT_GUIDE.md
5. 效果与总结:
-
提效显著:从零开始适配 NiFi 2.9.0 的 PostgreSQL CDC 处理器,涉及 Java 源码修改、Maven 编译、运行时调试、功能增强、多表架构设计、文档编写,如果纯手工操作预计需要 1-2 周。借助 SOLO 交互式开发,整个过程仅用 约 2 天(含调试和文档),核心代码生成和编译打包在数分钟内完成。
-
SOLO 的角色:它是完美的"全栈开发伙伴"。在处理 Java 包名重构(9 个文件的 package + import 批量修改)、Maven POM 依赖管理(provided scope 精简 NAR)、运行时问题诊断(7 个不同类型的错误逐一排查修复)时,SOLO 的跨文件精准修改和上下文理解能力远超手动操作。
-
可复用方法论:“先跑通,再修复,后增强”。先让 SOLO 把源码编译通过跑起来(哪怕有 bug),再逐一解决运行时错误(oid type、时间格式、DELETE 不生效),最后添加增强功能(多表路由、时间格式化、TRUNCATE 跳过)。这种"小步快跑"的 Prompt 方式能让 AI 输出质量极高,每次修改都可以独立验证。
-
文档即代码:生成的 DEVELOPMENT_GUIDE.md 让后续新增表的 CDC 配置可以完全交给 AI 完成,只需提供源表和目标表的 DDL,AI 即可生成完整的 Jolt 规范、RouteOnAttribute 路由规则和 PutDatabaseRecord 配置。

