Using Flink checkpoints
Last updated on January 17, 2025 am
🧙 Questions
Verify Flink real-time synchronization and test the checkpoint and savepoint logic.
☄️ Ideas
Download the dependencies
- flink-sql-connector-mysql-cdc-3.2.1.jar
- flink-connector-jdbc-3.1.2-1.18.jar
- mysql-connector-j-8.0.32.jar
Start MySQL locally
docker run \
--name ispong-mysql \
--privileged=true \
-d \
-p 30306:3306 \
-e MYSQL_ROOT_PASSWORD=root123 \
-e MYSQL_DATABASE=isxcode_db \
mysql:8.0
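The container needs a few seconds before it accepts connections. A minimal sketch of a wait loop, assuming the container name and root password from the command above; the function name wait_for_mysql and the default of 60 attempts are illustrative choices, not part of the original setup:

```shell
# Poll the MySQL server inside the container until it answers a ping.
# wait_for_mysql is a hypothetical helper name; the container name and
# password come from the docker run command above.
wait_for_mysql() {
  tries="${1:-60}"   # maximum number of attempts, one per second
  while [ "$tries" -gt 0 ]; do
    # -h127.0.0.1 forces a TCP connection instead of the local socket
    if docker exec ispong-mysql mysqladmin ping -h127.0.0.1 -uroot -proot123 --silent >/dev/null 2>&1; then
      echo "mysql is ready"
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  echo "mysql did not become ready" >&2
  return 1
}
```

Calling wait_for_mysql before creating the tables avoids connection errors right after the container starts.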
Create the tables
create table cdc_source
(
username varchar(200) null,
age int null
);
create table cdc_target
(
username varchar(200) null,
age int null
);
Write the Flink SQL
CREATE TABLE from_table(
username STRING,
age INT
) WITH (
'connector'='mysql-cdc',
'hostname' = 'localhost',
'port' = '30306',
'username' = 'root',
'password' = 'root123',
'database-name' = 'isxcode_db',
'table-name' = 'cdc_source',
'scan.incremental.snapshot.enabled'='false',
'server-time-zone'='UTC',
'scan.startup.mode'='latest-offset'
);
CREATE TABLE target_table(
username STRING,
age INT,
PRIMARY KEY(username) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:30306/isxcode_db',
'driver'='com.mysql.cj.jdbc.Driver',
'table-name'='cdc_target',
'username'='root',
'password'='root123');
INSERT INTO target_table select * from from_table;
-- Alternative: read the source table through the JDBC connector instead of mysql-cdc
CREATE TABLE from_table(
username STRING,
age INT
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:30306/isxcode_db',
'driver'='com.mysql.cj.jdbc.Driver',
'table-name'='cdc_source',
'username'='root',
'password'='root123',
'scan.fetch-size'= '1'
);
CREATE TABLE target_table(
username STRING,
age INT
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:30306/isxcode_db',
'driver'='com.mysql.cj.jdbc.Driver',
'table-name'='cdc_target',
'username'='root',
'password'='root123',
'sink.buffer-flush.max-rows'='1');
INSERT INTO target_table select * from from_table;
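Once the job is running, a quick way to check the synchronization is to insert a marker row into cdc_source and look for it in cdc_target. The sketch below assumes the local container and credentials used above; mysql_exec and sync_smoke_test are hypothetical helper names, and the 3-second default wait is a guess that may need tuning. With 'scan.startup.mode'='latest-offset', only changes made after the job starts are captured, so the insert has to happen while the job is running.

```shell
# Run one SQL statement in the isxcode_db database of the local container.
# mysql_exec and sync_smoke_test are hypothetical helper names.
mysql_exec() {
  docker exec ispong-mysql mysql -uroot -proot123 isxcode_db -N -e "$1"
}

# Insert a marker row into cdc_source, wait, then count it in cdc_target.
# Prints the number of matching rows found in the target table.
sync_smoke_test() {
  marker="smoke_$(date +%s)"
  mysql_exec "INSERT INTO cdc_source (username, age) VALUES ('$marker', 1);"
  sleep "${1:-3}"   # give the Flink job time to forward the change
  mysql_exec "SELECT COUNT(*) FROM cdc_target WHERE username = '$marker';"
}
```

A result of 0 after a generous wait suggests the job is not running or has failed; the problems listed next cover the errors seen during this test.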
Common problems
Problem 1
'scan.incremental.snapshot.chunk.key-column' is required for table without primary key when 'scan.incremental.snapshot.enabled' enabled.
Add this option to the source table:
'scan.incremental.snapshot.enabled'='false'
Problem 2
The main method caused an error: please declare primary key for sink table when query contains update/delete record.
Add this to the target table:
PRIMARY KEY(username) NOT ENFORCED
Problem 3
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
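-- Time zone mismatch: check the server time zone
show variables like '%time_zone%';
-- Either change the server time zone. A plain SET time_zone only affects the current session,
-- so use SET GLOBAL to make new connections (including the connector's) see it.
set global time_zone='+8:00';
-- Or set this option on the source table so that it matches the server:
'server-time-zone'='UTC'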
Problem 4
Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD or FLUSH_TABLES privilege(s) for this operation
-- Insufficient privileges
SHOW GRANTS for ispong;
GRANT SELECT,RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'ispong'@'%';
FLUSH PRIVILEGES;
SHOW GRANTS for ispong;
Problem 5
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'MASTER STATUS' at line 1
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:121)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1200)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:553)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496)
at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.determineSnapshotOffset(MySqlSnapshotChangeEventSource.java:276)
at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.determineSnapshotOffset(MySqlSnapshotChangeEventSource.java:46)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:113)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
... 8 more
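This connector version does not support MySQL 8.4 or later, where the SHOW MASTER STATUS statement was removed. Switch to MySQL 8.0.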
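Problem 6
Updates to the source table are not synchronized
The source table must have a primary key; otherwise rows are only inserted and never updated.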
Problem 7
Caused by: org.apache.flink.table.api.TableException: Column 'username' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.
at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processNotNullConstraint(ConstraintEnforcer.java:261)
at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:241)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:425)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:520)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:110)
at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.emitRecordsUnderCheckpointLock(DebeziumChangeFetcher.java:273)
at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:258)
at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:155)
at org.apache.flink.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:447)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Add the following setting to flinkConfig.
Records that would write null into a NOT NULL column are then dropped silently.
{
"table.exec.sink.not-null-enforcer": "DROP"
}
Test the checkpoint feature
{
"execution.checkpointing.interval": 100,
"execution.checkpointing.externalized-checkpoint-retention": "RETAIN_ON_CANCELLATION",
"state.checkpoints.dir":"file:///Users/ispong/flink",
"state.backend":"filesystem",
"restart-strategy.type":"disable"
}
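Option | Default | Description |
---|---|---|
execution.checkpointing.interval | (none) | Interval between checkpoints (a plain number is read as milliseconds); if unset, no checkpoints are taken |
execution.checkpointing.externalized-checkpoint-retention | NO_EXTERNALIZED_CHECKPOINTS | DELETE_ON_CANCELLATION: delete checkpoints when the job is cancelled; RETAIN_ON_CANCELLATION: keep checkpoints when the job is cancelled or fails; NO_EXTERNALIZED_CHECKPOINTS: externalized checkpoints are disabled |
state.checkpoints.dir | (none) | Checkpoint directory, for example s3://mybucket/flink-app/checkpoints, hdfs://namenode:port/flink/checkpoints, or file:///local; must be used together with state.backend |
state.backend | filesystem | filesystem: store state in the file system |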
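Table A is synchronized to table B. The age column in table B is nullable; while the synchronization is running, change the age column to NOT NULL.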
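One way to drive this scenario from the command line is sketched below. It assumes table B is cdc_target in the local container, that no existing row in cdc_target has a NULL age (MySQL rejects the ALTER otherwise), and that the aim is to make the sink write fail so that recovery from a checkpoint can be tested. Both function names are hypothetical.

```shell
# Change cdc_target.age from nullable to NOT NULL while the job is running.
# make_target_age_not_null is a hypothetical helper name.
make_target_age_not_null() {
  docker exec ispong-mysql mysql -uroot -proot123 isxcode_db \
    -e "ALTER TABLE cdc_target MODIFY age INT NOT NULL;"
}

# Insert a source row with a NULL age. After the ALTER above, the target
# table no longer accepts this row (assumption: MySQL strict mode is on,
# which is the default in 8.0).
insert_null_age_row() {
  docker exec ispong-mysql mysql -uroot -proot123 isxcode_db \
    -e "INSERT INTO cdc_source (username, age) VALUES ('null_age', NULL);"
}
```

Running make_target_age_not_null and then insert_null_age_row while the job is active should surface the failure in the Flink job log; because restart-strategy.type is set to disable, the job is expected to stop rather than retry.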
Inspect the binlog file
# Find the current binlog file first by running in MySQL: SHOW MASTER STATUS;
rm -f /tmp/output.sql
mysqlbinlog --base64-output=DECODE-ROWS --verbose --start-datetime="2024-12-27 6:49:04" /var/lib/mysql/binlog.000005 > /tmp/output.sql
cat /tmp/output.sql
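With --verbose, mysqlbinlog renders each row event as commented pseudo-SQL lines that begin with "### INSERT INTO", "### UPDATE" or "### DELETE FROM" followed by the quoted database and table name. Rather than reading the whole dump, the events for one table can be counted. A small sketch; count_row_events is a hypothetical helper name:

```shell
# Count decoded row events for one table in mysqlbinlog --verbose output.
# count_row_events is a hypothetical helper; arguments: file, database, table.
count_row_events() {
  file="$1"; db="$2"; table="$3"
  for kind in "INSERT INTO" "UPDATE" "DELETE FROM"; do
    # grep -c prints 0 and exits non-zero when nothing matches, hence || true
    n=$(grep -c -F "### $kind \`$db\`.\`$table\`" "$file" || true)
    echo "$kind: $n"
  done
}
```

For the dump produced above, the call would be count_row_events /tmp/output.sql isxcode_db cdc_source.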
Start the job by restoring from a checkpoint
{
"execution.checkpointing.interval": 100,
"execution.checkpointing.externalized-checkpoint-retention": "RETAIN_ON_CANCELLATION",
"state.checkpoints.dir":"file:///Users/ispong/flink",
"state.backend":"filesystem",
"restart-strategy.type":"disable"
}
.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("file:///Users/ispong/flink/fd5cddc30b1a2fef3b4d1dc5e54777a3/chk-1",true, RestoreMode.CLAIM))
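The path passed to forPath points at one chk-N directory under state.checkpoints.dir/<job-id>. Flink keeps only the most recent completed checkpoint by default, so with a 100 ms interval the number N moves on quickly and a hard-coded chk-1 may no longer exist. A minimal sketch for locating the newest completed checkpoint of one job, assuming a local file system and an absolute job directory; latest_checkpoint is a hypothetical helper name:

```shell
# Print a file:// URI for the newest completed checkpoint of one job.
# A checkpoint is treated as completed when chk-N/_metadata exists.
# latest_checkpoint is a hypothetical helper; the argument is the absolute
# path of the job directory, e.g. /Users/ispong/flink/<job-id>.
latest_checkpoint() {
  job_dir="$1"
  best=""
  best_n=-1
  for meta in "$job_dir"/chk-*/_metadata; do
    [ -f "$meta" ] || continue                        # glob matched nothing
    dir="${meta%/_metadata}"
    n="${dir##*/chk-}"
    case "$n" in ''|*[!0-9]*) continue ;; esac        # skip non-numeric suffixes
    if [ "$n" -gt "$best_n" ]; then
      best_n="$n"
      best="$dir"
    fi
  done
  [ -n "$best" ] || return 1
  echo "file://$best"
}
```

The printed URI can be used as the first argument of SavepointRestoreSettings.forPath in place of the fixed chk-1 path.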
🔗 Links
Using Flink checkpoints
https://ispong.isxcode.com/hadoop/flink/flink checkpoint使用/