Using Flink checkpoints

Last updated on January 17, 2025 am

🧙 Questions

Verify Flink real-time synchronization and test the checkpoint and savepoint logic.

☄️ Ideas

Download the dependencies
Start MySQL locally
docker run \
  --name ispong-mysql \
  --privileged=true \
  -d \
  -p 30306:3306 \
  -e MYSQL_ROOT_PASSWORD=root123 \
  -e MYSQL_DATABASE=isxcode_db \
  mysql:8.0
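
The container needs a little time before MySQL accepts connections, so running the CREATE TABLE statements right after `docker run` can fail. A minimal sketch of a retry helper, assuming bash and assuming `mysqladmin` is available inside the `mysql:8.0` image. `wait_until` is a hypothetical helper name introduced here, not part of Docker or MySQL.

```shell
#!/usr/bin/env bash
# wait_until <tries> <command...>
# Run <command> once per second until it succeeds or <tries> attempts are used up.
wait_until() {
  local tries="$1" i=0
  shift
  while [ "$i" -lt "$tries" ]; do
    # Success of the probe command ends the wait immediately.
    "$@" >/dev/null 2>&1 && return 0
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# Usage (assumption: the container name and password from the docker run above):
# wait_until 60 docker exec ispong-mysql mysqladmin ping -uroot -proot123 --silent
```

The probe command is passed in as arguments, so the same helper can wait for any other readiness check.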
Create the tables
create table cdc_source
(
    username varchar(200) null,
    age      int          null
);

create table cdc_target
(
    username varchar(200) null,
    age      int          null
);
Write the Flink SQL
CREATE TABLE from_table(
    username STRING,
    age INT
) WITH (
    'connector'='mysql-cdc',
    'hostname' = 'localhost',
    'port' = '30306',
    'username' = 'root',
    'password' = 'root123',
    'database-name' = 'isxcode_db',
    'table-name' = 'cdc_source',
    'scan.incremental.snapshot.enabled'='false',
    'server-time-zone'='UTC',
    'scan.startup.mode'='latest-offset'
);

CREATE TABLE target_table(
    username STRING,
    age INT,
    PRIMARY KEY(username) NOT ENFORCED
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:30306/isxcode_db',
    'driver'='com.mysql.cj.jdbc.Driver',
    'table-name'='cdc_target',
    'username'='root',
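    'password'='root123'
);

INSERT INTO target_table select * from from_table;

The same pipeline with a plain JDBC source instead of mysql-cdc (a bounded scan of the table, so no change capture):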
CREATE TABLE from_table(
    username STRING,
    age INT
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:30306/isxcode_db',
    'driver'='com.mysql.cj.jdbc.Driver',
    'table-name'='cdc_source',
    'username'='root',
    'password'='root123',
    'scan.fetch-size'='1'
);

CREATE TABLE target_table(
    username STRING,
    age INT
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://localhost:30306/isxcode_db',
    'driver'='com.mysql.cj.jdbc.Driver',
    'table-name'='cdc_target',
    'username'='root',
    'password'='root123',
    'sink.buffer-flush.max-rows'='1'
);

INSERT INTO target_table select * from from_table;

Common problems

Problem 1
'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.
Add the following to the WITH options of the source table:
'scan.incremental.snapshot.enabled'='false'
Problem 2
The main method caused an error: please declare primary key for sink table when query contains update/delete record.
Declare a primary key on the target (sink) table:
PRIMARY KEY(username) NOT ENFORCED
Problem 3
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
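-- Time zone mismatch: check the server setting, then change it
show variables like '%time_zone%';
-- note: this changes the current session only; SET GLOBAL time_zone='+8:00' affects new connections
set time_zone='+8:00';

-- Or declare the server's time zone in the source table options
'server-time-zone'='UTC'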
Problem 4
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD or FLUSH_TABLES privilege(s) for this operation
-- Insufficient privileges: grant the CDC user the privileges it needs
SHOW GRANTS for ispong;
GRANT SELECT,RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'ispong'@'%';
FLUSH PRIVILEGES;
SHOW GRANTS for ispong;
Problem 5
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'MASTER STATUS' at line 1
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:121)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1200)
	at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:553)
	at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496)
	at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.determineSnapshotOffset(MySqlSnapshotChangeEventSource.java:276)
	at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.determineSnapshotOffset(MySqlSnapshotChangeEventSource.java:46)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:113)
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
	... 8 more
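MySQL 8.4 and later are not supported by the connector version used here: the SHOW MASTER STATUS statement it issues was removed in MySQL 8.4. Switch to MySQL 8.0.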
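Problem 6
Updates on the source table do not reach the target table
The source table must have a primary key; otherwise changes are only appended as new rows and existing rows are never updated.
Problem 7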
Caused by: org.apache.flink.table.api.TableException: Column 'username' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.
	at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processNotNullConstraint(ConstraintEnforcer.java:261)
	at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:241)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
	at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:273)
	at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:258)
	at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:155)
	at org.apache.flink.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:447)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)

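Add the following setting to flinkConfig.
Records that carry a null value for a NOT NULL column are then dropped silently instead of failing the job.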

{
  "table.exec.sink.not-null-enforcer": "DROP"
}

Test the checkpoint feature

{
  "execution.checkpointing.interval": 100,
  "execution.checkpointing.externalized-checkpoint-retention": "RETAIN_ON_CANCELLATION",
  "state.checkpoints.dir": "file:///Users/ispong/flink",
  "state.backend": "filesystem",
  "restart-strategy.type": "disable"
}
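| Option | Default | Description |
| --- | --- | --- |
| execution.checkpointing.interval | (none) | How often a checkpoint is triggered. If it is not set, no checkpoints are taken. A bare number is interpreted as milliseconds. |
| execution.checkpointing.externalized-checkpoint-retention | NO_EXTERNALIZED_CHECKPOINTS | DELETE_ON_CANCELLATION: the checkpoint is deleted when the job is cancelled. RETAIN_ON_CANCELLATION: the checkpoint is kept when the job is cancelled or fails. NO_EXTERNALIZED_CHECKPOINTS: externalized checkpoints are disabled. |
| state.checkpoints.dir | (none) | Where checkpoints are written, for example s3://mybucket/flink-app/checkpoints, hdfs://namenode:port/flink/checkpoints or file:///local. Used together with state.backend. |
| state.backend | hashmap (recent 1.x releases) | filesystem stores checkpoints on the file system. It is the legacy name for hashmap, which keeps working state on the JVM heap and writes checkpoints to state.checkpoints.dir. |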
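
Option keys are plain strings, so a key with accidental whitespace inside the quotes (for example `"state.backend "` with a trailing space) would most likely not be recognized as the intended option. This assumes the platform passes flinkConfig keys to Flink unchanged. A minimal sketch of a check for such keys, assuming the config is stored as a JSON file with one key per line; `find_padded_keys` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# find_padded_keys <json-file>
# Print (with line numbers) every line whose JSON key starts or ends with whitespace.
# Returns non-zero when no such key is found, following grep's exit status.
find_padded_keys() {
  grep -nE '"([[:space:]]+[^"]*|[^"]*[[:space:]]+)"[[:space:]]*:' "$1"
}

# Usage (hypothetical file name):
# find_padded_keys flink-config.json && echo "fix the keys listed above"
```

This is a line-based heuristic rather than a JSON parser, so it is only meant for small hand-written config files like the one above.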

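Sync table A to table B, where the age column in B is nullable, and change the age column to NOT NULL while the sync is running.
Inspect the binlog file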

# SHOW MASTER STATUS;
rm /tmp/output.sql
mysqlbinlog --base64-output=DECODE-ROWS --verbose --start-datetime="2024-12-27 6:49:04" /var/lib/mysql/binlog.000005 > /tmp/output.sql
cat /tmp/output.sql
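
With `--verbose`, mysqlbinlog prints each row event as commented pseudo-SQL such as "### UPDATE `isxcode_db`.`cdc_source`", so the decoded file can be summarized instead of read line by line. A minimal sketch, assuming bash and the decoded output from the command above; `count_row_events` is a hypothetical helper name, and the table name is used inside a regular expression, so it should not contain special characters.

```shell
#!/usr/bin/env bash
# count_row_events <decoded-binlog-file> <table>
# Count the INSERT / UPDATE / DELETE row events recorded for one table.
count_row_events() {
  local file="$1" table="$2" op
  for op in "INSERT INTO" "UPDATE" "DELETE FROM"; do
    # Row events appear as: ### <op> `database`.`table`
    printf '%s %s\n' "${op%% *}" \
      "$(grep -c "^### $op \`[^\`]*\`\.\`$table\`" "$file")"
  done
}

# Usage (file and table from the steps above):
# count_row_events /tmp/output.sql cdc_source
```

Comparing these counts with the rows that arrived in cdc_target is a quick way to see whether any change was lost around a restart.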

Restart the job from the retained checkpoint: use the same configuration and pass the checkpoint path as the restore path

{
  "execution.checkpointing.interval": 100,
  "execution.checkpointing.externalized-checkpoint-retention": "RETAIN_ON_CANCELLATION",
  "state.checkpoints.dir": "file:///Users/ispong/flink",
  "state.backend": "filesystem",
  "restart-strategy.type": "disable"
}
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("file:///Users/ispong/flink/fd5cddc30b1a2fef3b4d1dc5e54777a3/chk-1",true, RestoreMode.CLAIM))
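
The restore path above points at one specific `chk-N` directory below `<state.checkpoints.dir>/<job-id>`. A minimal sketch for finding the newest completed checkpoint of a job, assuming bash and assuming that a checkpoint directory is complete once it contains a `_metadata` file; `latest_checkpoint` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# latest_checkpoint <state.checkpoints.dir>/<job-id>
# Print the chk-N directory with the highest N that contains a _metadata file.
latest_checkpoint() {
  local job_dir="$1" best="" best_n=-1 dir n
  for dir in "$job_dir"/chk-*; do
    [ -f "$dir/_metadata" ] || continue         # skip incomplete dirs and an unmatched glob
    n="${dir##*/chk-}"                          # numeric suffix, e.g. 12
    case "$n" in ''|*[!0-9]*) continue ;; esac  # ignore non-numeric names
    if [ "$n" -gt "$best_n" ]; then
      best_n="$n"
      best="$dir"
    fi
  done
  [ -n "$best" ] || return 1                    # nothing restorable found
  printf '%s\n' "$best"
}

# Usage (job directory taken from the restore path above):
# latest_checkpoint /Users/ispong/flink/fd5cddc30b1a2fef3b4d1dc5e54777a3
```

The comparison is numeric, so chk-10 is correctly preferred over chk-2, which a plain alphabetical listing would get wrong.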

https://ispong.isxcode.com/hadoop/flink/flink checkpoint使用/
Author
ispong
Posted on
December 24, 2024