当我们提交任务之后,Flink 会通过 SPI 机制将 classpath 下注册的所有工厂类加载进来,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而对于 Format 来说,到底使用哪个 DeserializationFormatFactory,是根据 DDL 语句中的 Format 来决定的。通过将 Format 的值与工厂类的 factoryIdentifier() 方法的返回值进行匹配 来确定。


  • 工厂类(DeserializationFormatFactory):负责编译时根据 ‘format’ = ‘maxwell-json’创建对应的反序列化器。即 MaxwellJsonFormatFactory。

  • 反序列化类(DeserializationSchema):负责运行时的解析,根据固定格式将 CDC 数据转换成 Flink 系统能认识的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。

  • Service 注册文件:需要添加 Service 文件 META-INF/services/org.apache.flink.table.factories.Factory ,并在其中增加一行我们实现的 MaxwellJsonFormatFactory 类路径。

  • insert:取出 data 中的值,也就是我们通过DDL定义的字段对应的值,再将其标记为 RowKind.INSERT 类型数据,最后下发。

  • update:分别取出 data 和 old 的值,然后循环 old 中每个字段,字段值如果为空说明是未修改的字段,那就用 data 中对应位置字段的值替代;之后将 old 标记为 RowKind.UPDATE_BEFORE 也就意味着 Flink 引擎需要将之前对应的值撤回,data 标记为 RowKind.UPDATE_AFTER 正常下发。

  • delete:取出 data 中的值,标记为 RowKind.DELETE,代表需要撤回。


  • Kafka 数据源表



  • Kafka 数据结果表&数据源表


  • MySQL 表

参考链接:



Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据-LMLPHP  福利来了  Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据-LMLPHP

Apache Flink 极客挑战赛

Flink 源码 | 自定义 Format 消费 Maxwell CDC 数据-LMLPHP

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

09-08 02:53