roman_日积跬步-终至千里

roman_日积跬步-终至千里

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 -->
  <version>2.4.0</version>
</dependency>

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar<FLINK_HOME>/lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

# 赋权
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

# 刷新权限
mysql> FLUSH PRIVILEGES;

注意:

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:



MySQLSource.builder()
    .startupOptions(StartupOptions.earliest()) // 从最早位点启动
    .startupOptions(StartupOptions.latest()) // 从最晚位点启动
    .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
    .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
    .startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
    ...
    .build()




CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动
    'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
    'scan.startup.mode' = 'specific-offset', -- 从特定位点启动
 
    'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名
    'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置
    'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合

    'scan.startup.mode' = 'timestamp', -- 从特定位点启动
    'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳
    ...
)

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial' 
-- 
CREATE TABLE tjy_sql1  
(  
  `id` int,  
  `name` string,  
  `face` string  
 ,PRIMARY KEY(id) NOT ENFORCED  
) WITH (  
        'connector' = 'mysql-cdc',  
        'hostname' = 'xxx',  
        'port' = '3306',  
        'username' = 'middle_test',  
        'password' = '123456',  
        'database-name' = 'middle_test',  
        'table-name' = 'tjy_fortest1'  
       -- ,'scan.incremental.snapshot.enabled' = 'false'  
       --  initial: 默认值,全表同步,然后进行增量同步;
       --  'scan.startup.mode'= 'initial'  
       -- 'debezium.snapshot.mode' = 'initial'      );  
  
  
 CREATE TABLE tjy_sql1_sink  
 (  
  `id` int,  
  `name` string,  
  `face` string  
  ,PRIMARY KEY(id) NOT ENFORCED  
 ) WITH (  
           'connector' = 'mysql-x',  
           'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  
           'username' = 'middle_test',  
           'password' = '123456',  
           'table-name' = 'flink_type',  
           'table-name' = 'tjy_fortest2'  
       );  
  
  
insert into tjy_sql1_sink select * from tjy_sql1;

 

FAQ

可能涉及到的问题

【Flink CDC(一)】实现mysql整表与增量读取-LMLPHP

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

02-27 06:50