Spark Debezium real-time sync
Last updated on November 22, 2024 pm
🧙 Questions
Use Debezium to capture database changes and sync them to Kafka in real time.
☄️ Ideas
Install ZooKeeper with Docker
docker run -it \
--name isxcode-zookeeper \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
-v /Users/ispong/zookeeper/logs:/zookeeper/logs \
-v /Users/ispong/zookeeper/data:/zookeeper/data \
-v /Users/ispong/zookeeper/txns:/zookeeper/txns \
-d debezium/zookeeper:3.0.0.Final
Install Kafka with Docker
Kafka version: 3.8.0
docker run -it \
--name isxcode-kafka \
-p 9092:9092 \
-v /Users/ispong/kafka/logs:/kafka/logs \
-v /Users/ispong/kafka/data:/kafka/data \
-e ZOOKEEPER_CONNECT=0.0.0.0:2181 \
-d debezium/kafka:3.0.0.Final
# To link the ZooKeeper container instead, add: --link isxcode-zookeeper:isxcode-zookeeper
# and set ZOOKEEPER_CONNECT=isxcode-zookeeper:2181
Install Debezium Connect with Docker
GROUP_ID: the group id; Connect workers sharing the same id form one cluster.
CONFIG_STORAGE_TOPIC: the topic where Kafka Connect stores connector configurations.
OFFSET_STORAGE_TOPIC: the topic where offsets are stored to track each connector's progress in Kafka.
STATUS_STORAGE_TOPIC: the topic that records the running state of connector tasks.
docker run -it \
--name isxcode-connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e BOOTSTRAP_SERVERS=0.0.0.0:9092 \
-e ZOOKEEPER_CONNECT=0.0.0.0:2181 \
-e CONFIG_STORAGE_TOPIC=ispong_connect_configs \
-e OFFSET_STORAGE_TOPIC=ispong_connect_offsets \
-e STATUS_STORAGE_TOPIC=ispong_connect_statuses \
-d debezium/connect:3.0.0.Final
# To link the Kafka container instead, add: --link isxcode-kafka:isxcode-kafka
Install Debezium UI with Docker
docker run -it \
--name debezium-ui \
-p 8080:8080 \
-e KAFKA_CONNECT_URIS=http://192.168.115.105:8083 \
-d \
debezium/debezium-ui:2.1.2.Final
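The four docker run commands above can also be written as one Docker Compose file, so the containers share a network and reach each other by service name instead of --link flags or 0.0.0.0 addresses. A minimal sketch, reusing the names, ports, environment variables, and image tags above:

```yaml
version: "3"
services:
  isxcode-zookeeper:
    image: debezium/zookeeper:3.0.0.Final
    ports: ["2181:2181", "2888:2888", "3888:3888"]
  isxcode-kafka:
    image: debezium/kafka:3.0.0.Final
    ports: ["9092:9092"]
    environment:
      ZOOKEEPER_CONNECT: isxcode-zookeeper:2181
    depends_on: [isxcode-zookeeper]
  isxcode-connect:
    image: debezium/connect:3.0.0.Final
    ports: ["8083:8083"]
    environment:
      GROUP_ID: 1
      BOOTSTRAP_SERVERS: isxcode-kafka:9092
      CONFIG_STORAGE_TOPIC: ispong_connect_configs
      OFFSET_STORAGE_TOPIC: ispong_connect_offsets
      STATUS_STORAGE_TOPIC: ispong_connect_statuses
    depends_on: [isxcode-kafka]
  debezium-ui:
    image: debezium/debezium-ui:2.1.2.Final
    ports: ["8080:8080"]
    environment:
      KAFKA_CONNECT_URIS: http://isxcode-connect:8083
    depends_on: [isxcode-connect]
```

With this layout KAFKA_CONNECT_URIS can point at the service name rather than a host IP.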
Create a connector via the UI
Verify that it works
# Enter the Kafka container
docker exec -it isxcode-kafka bash
cd /kafka/bin
# The container's hostname (usable as the broker address) is listed in /etc/hosts
cat /etc/hosts
# List Kafka topics
kafka-topics.sh --bootstrap-server b2bfddfe41ee:9092 --list
# Consume change events from a table's topic
kafka-console-consumer.sh --bootstrap-server 10.42.0.246:9092 --topic test2.ispong_db.ispong_test
Events may take a while to appear, so be patient, or set the starting point in the connector config.
Query the binlog files in SQL: SHOW BINARY LOGS
"binlog.filename": "binlog.000008",
Query the event positions inside a binlog: SHOW BINLOG EVENTS IN 'binlog.000008'
"binlog.position": "46903"
Create a connector via the REST API
Table names in the include/exclude lists must carry the schema prefix, e.g. ispong_db.ispong_test.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.115.105:8083/connectors/ -d '{
  "name": "ispong-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.168.116",
    "database.port": "30102",
    "database.user": "root",
    "database.password": "ispong123",
    "database.server.id": "1",
    "topic.prefix": "ispong-connector-prefix",
    "database.include.list": "ispong_db",
    "table.include.list": "ispong_.*",
    "table.exclude.list": "db.apaas_operate_log,db.apaas_data_log",
    "binlog.filename": "binlog.000008",
    "binlog.position": "4",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.115.105:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}'
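Once the connector is running, each row change arrives on the topic as a Debezium envelope containing the before/after row images and an op code (c=create, u=update, d=delete, r=snapshot read). A minimal sketch of pulling fields out of such an event from the shell; the event JSON below is hypothetical sample data, not captured output:

```shell
# A hypothetical (abridged) Debezium change-event payload:
EVENT='{"payload":{"before":null,"after":{"id":1,"username":"ispong"},"op":"c","ts_ms":1732200000000}}'

# Extract the operation code and a column from the "after" image
echo "$EVENT" | python3 -c '
import json, sys
payload = json.load(sys.stdin)["payload"]
print(payload["op"], payload["after"]["username"])
'
```

For an insert, "before" is null and "after" holds the new row; a delete reverses that.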
If records are too large, add the following configuration:
{
  "config": {
    // Add these settings to handle oversized messages:
    "producer.override.max.request.size": "8388608",          // max size of a single Kafka producer message, set to 8 MB
    "producer.override.compression.type": "gzip",             // enable message compression
    "consumer.override.fetch.max.bytes": "8388608",           // max size of a Kafka consumer fetch
    "consumer.override.max.partition.fetch.bytes": "8388608", // max message size fetched per partition
    "max.batch.size": "1024",                                 // limit the batch size
    "max.queue.size": "4096"                                  // limit the in-memory queue size
  }
}
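To apply these overrides to a running connector, Kafka Connect exposes PUT /connectors/{name}/config, which takes the full flattened config map (not wrapped in a "config" object). A sketch against the Connect host used above, abridged to the new settings; in practice the body must repeat the connector's entire existing config, and producer/consumer overrides take effect only if the worker's connector.client.config.override.policy permits them:

```shell
curl -i -X PUT -H "Content-Type: application/json" \
  http://192.168.115.105:8083/connectors/ispong-connector/config \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "producer.override.max.request.size": "8388608",
    "producer.override.compression.type": "gzip"
  }'
```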
Check connector status via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/status' \
-H 'Accept: application/json' \
--insecure
List connectors via the API
curl 'http://192.168.115.105:8083/connectors/?expand=status' \
-H 'Accept: application/json' \
--insecure
View connector configuration via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/config' \
-H 'Accept: application/json' \
--insecure
Pause a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/pause' \
-X 'PUT' \
-H 'Content-Type: application/json' \
--data-raw '{}'
Resume a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/resume' \
-X 'PUT' \
-H 'Content-Type: application/json' \
--data-raw '{}'
Restart a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/restart' \
-H 'Content-Type: application/json' \
--data-raw '{}'
Delete a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector' \
-X 'DELETE' \
-H 'Accept: application/json, text/plain, */*' \
--insecure
🔗 Links
Spark Debezium real-time sync
https://ispong.isxcode.com/hadoop/spark/spark debezium实时同步/