spark debezium real-time sync

Last updated on November 22, 2024

🧙 Questions

Use Debezium to capture changes in a database and stream them into Kafka in real time.

☄️ Ideas

Install ZooKeeper with Docker
docker run -it \
  --name isxcode-zookeeper \
  -p 2181:2181 \
  -p 2888:2888 \
  -p 3888:3888 \
  -v /Users/ispong/zookeeper/logs:/zookeeper/logs \
  -v /Users/ispong/zookeeper/data:/zookeeper/data \
  -v /Users/ispong/zookeeper/txns:/zookeeper/txns \
  -d debezium/zookeeper:3.0.0.Final
Install Kafka with Docker

Kafka version: 3.8.0

docker run -it \
  --name isxcode-kafka \
  -p 9092:9092 \
  -v /Users/ispong/kafka/logs:/kafka/logs \
  -v /Users/ispong/kafka/data:/kafka/data \
  --link isxcode-zookeeper:isxcode-zookeeper \
  -e ZOOKEEPER_CONNECT=isxcode-zookeeper:2181 \
  -d debezium/kafka:3.0.0.Final
Install debezium-connect with Docker

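GROUP_ID: the group ID; Connect workers that share the same ID form one cluster.
CONFIG_STORAGE_TOPIC: the topic in which Kafka Connect stores connector configurations.
OFFSET_STORAGE_TOPIC: the topic in which Kafka Connect stores offsets; for a source connector such as Debezium these record how far it has read from the source (e.g. the binlog position).
STATUS_STORAGE_TOPIC: the topic that records the running state of connectors and their tasks.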

docker run -it \
  --name isxcode-connect \
  -p 8083:8083 \
  --link isxcode-kafka:isxcode-kafka \
  -e GROUP_ID=1 \
  -e BOOTSTRAP_SERVERS=isxcode-kafka:9092 \
  -e CONFIG_STORAGE_TOPIC=ispong_connect_configs \
  -e OFFSET_STORAGE_TOPIC=ispong_connect_offsets \
  -e STATUS_STORAGE_TOPIC=ispong_connect_statuses \
  -d debezium/connect:3.0.0.Final
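Kafka Connect can take a while to start. The sketch below polls the Connect REST root until it answers, then checks that the three internal topics configured above exist. The helper names `wait_for` and `has_internal_topics` are hypothetical, and the host IP 192.168.115.105 is assumed to be the same one used for the UI below.

```shell
# Hypothetical helper: retry a command up to N times, one second apart.
# Returns 0 as soon as the command succeeds, 1 if it never does.
wait_for() {
  tries=$1
  shift
  until "$@"; do
    tries=$((tries - 1))
    [ "$tries" -le 0 ] && return 1
    sleep 1
  done
  return 0
}

# Hypothetical helper: read a topic list (one name per line, as printed by
# kafka-topics.sh --list) on stdin and succeed only if all three internal
# Connect topics from the docker run command are present.
has_internal_topics() {
  topics=$(cat)
  for t in ispong_connect_configs ispong_connect_offsets ispong_connect_statuses; do
    printf '%s\n' "$topics" | grep -qxF "$t" || return 1
  done
  return 0
}

# Usage (assumed host IP; the bootstrap address is the Kafka container's
# hostname or IP from /etc/hosts, see the check step further down):
#   wait_for 60 curl -sf http://192.168.115.105:8083/ >/dev/null && echo 'connect is up'
#   docker exec isxcode-kafka /kafka/bin/kafka-topics.sh \
#     --bootstrap-server 10.42.0.246:9092 --list | has_internal_topics && echo 'topics ok'
```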
Install debezium-ui with Docker
docker run -it \
  --name debezium-ui \
  -p 8080:8080 \
  -e KAFKA_CONNECT_URIS=http://192.168.115.105:8083 \
  -d \
  debezium/debezium-ui:2.1.2.Final

Open the UI
http://192.168.115.105:8080

Create a connector through the UI


Verify that it works
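# enter the Kafka container
docker exec -it isxcode-kafka bash
cd /kafka/bin
# look up the container's hostname and IP to use as the bootstrap server
cat /etc/hosts
# list the Kafka topics
./kafka-topics.sh --bootstrap-server b2bfddfe41ee:9092 --list
# consume the change events of one table
./kafka-console-consumer.sh --bootstrap-server 10.42.0.246:9092 --topic test2.ispong_db.ispong_test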
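Debezium's MySQL connector writes each table's change events to a topic named `<topic.prefix>.<database>.<table>`. That is why the consumer above reads `test2.ispong_db.ispong_test`; the UI connector presumably used the prefix `test2`. The hypothetical helper below builds that name, so the consumer command can be derived from the connector settings.

```shell
# Hypothetical helper: build the Debezium MySQL change-event topic name
# <topic.prefix>.<database>.<table>.
cdc_topic() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}

# Usage inside the Kafka container (bootstrap address taken from /etc/hosts):
#   /kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.42.0.246:9092 \
#     --topic "$(cdc_topic test2 ispong_db ispong_test)" --from-beginning
```

For the connector created through the REST API below, the prefix is `ispong-connector-prefix` instead.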

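You may need to wait a while for the data to show up, or set the starting point in the connector's config:
Find the binlog files with SQL: show binary logs
"binlog.filename": "binlog.000008",
Find the event positions within one binlog with SQL: show binlog events in 'binlog.000008'
"binlog.position": "46903"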
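Both values can be read with the `mysql` client in batch mode, where `-B -N` prints tab-separated rows without headers. The sketch below assumes that output format:

* `latest_binlog` takes the last row of `show binary logs`.
* `last_end_pos` takes the `End_log_pos` column (the fifth) of the last row of `show binlog events`. This is the offset just after the newest event, so starting there picks up only new changes.

The function names are hypothetical. The host, port and user are the ones from the connector config below.

```shell
# Hypothetical helper: newest binlog file from `show binary logs`
# (batch-mode rows: Log_name<TAB>File_size[<TAB>Encrypted]).
latest_binlog() {
  awk -F '\t' 'NF { name = $1 } END { print name }'
}

# Hypothetical helper: End_log_pos of the last event from
# `show binlog events in '<file>'`
# (rows: Log_name, Pos, Event_type, Server_id, End_log_pos, Info).
last_end_pos() {
  awk -F '\t' 'NF { pos = $5 } END { print pos }'
}

# Usage (each mysql call prompts for the password):
#   file=$(mysql -h 192.168.168.116 -P 30102 -u root -p -B -N -e 'show binary logs' | latest_binlog)
#   pos=$(mysql -h 192.168.168.116 -P 30102 -u root -p -B -N -e "show binlog events in '$file'" | last_end_pos)
#   echo "\"binlog.filename\": \"$file\", \"binlog.position\": \"$pos\""
```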


Create a connector via the REST API

Table names must include the schema (database) prefix, e.g. ispong_db.ispong_test.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.115.105:8083/connectors/ -d '{
  "name": "ispong-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.168.116",
    "database.port": "30102",
    "database.user": "root",
    "database.password": "ispong123",
    "database.server.id": "1",
    "topic.prefix": "ispong-connector-prefix",
    "database.include.list": "ispong_db",
    "table.include.list": "ispong_db.ispong_.*",
    "binlog.filename": "binlog.000008",
    "binlog.position": "4",
    "schema.history.internal.kafka.bootstrap.servers": "192.168.115.105:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}'
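`POST /connectors` returns an error (409 Conflict) if a connector with that name already exists. Kafka Connect also offers `PUT /connectors/<name>/config`, which creates the connector if it is missing and otherwise replaces its configuration. Its body is only the inner `config` object, without the `name`/`config` wrapper. The sketch below assumes the flat config is saved in a local file; the file name `ispong-connector.json` and the function name are hypothetical.

```shell
# Connect REST endpoint; override with CONNECT_URL=... if your host differs.
CONNECT_URL="${CONNECT_URL:-http://192.168.115.105:8083}"

# Hypothetical helper: create or update connector $1 from the flat config
# JSON in file $2 (the object that sits under "config" in the POST body).
upsert_connector() {
  curl -s -i -X PUT \
    -H 'Accept: application/json' \
    -H 'Content-Type: application/json' \
    --data @"$2" \
    "$CONNECT_URL/connectors/$1/config"
}

# Usage:
#   upsert_connector ispong-connector ispong-connector.json
```

Because PUT replaces the whole configuration, the file must always contain the complete config, not just the keys being changed.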

If records are too large, add the following settings:

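producer.override.max.request.size: maximum size of a producer request, which also caps a single record; here 8 MB (8388608 bytes)
producer.override.compression.type: compress messages with gzip
consumer.override.fetch.max.bytes: maximum number of bytes a consumer fetch may return
consumer.override.max.partition.fetch.bytes: maximum number of bytes fetched per partition
max.batch.size: limit the size of each batch Debezium processes
max.queue.size: limit the size of Debezium's in-memory queue

{
  "config": {
    "producer.override.max.request.size": "8388608",
    "producer.override.compression.type": "gzip",
    "consumer.override.fetch.max.bytes": "8388608",
    "consumer.override.max.partition.fetch.bytes": "8388608",
    "max.batch.size": "1024",
    "max.queue.size": "4096"
  }
}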
Check a connector's status via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/status' \
  -H 'Accept: application/json' \
  --insecure
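The status response has the form `{"name":...,"connector":{"state":"RUNNING",...},"tasks":[{"id":0,"state":"RUNNING",...}],"type":"source"}`. Both the connector and every task carry a `state`, and a connector can be RUNNING while one of its tasks is FAILED, so it is worth checking all of them. Below is a dependency-free sketch. It greps rather than parses the JSON, so it assumes no unescaped `"state"` key appears elsewhere in the response; use `jq` if it is available. The function names are hypothetical.

```shell
# Hypothetical helper: print every "state" value (connector first, then tasks)
# from a Kafka Connect status response read on stdin.
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Hypothetical helper: succeed only if at least one state was found
# and every state is RUNNING.
all_running() {
  states=$(connector_states)
  [ -n "$states" ] || return 1
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Usage:
#   curl -s 'http://192.168.115.105:8083/connectors/ispong-connector/status' | all_running \
#     && echo healthy || echo 'check the task trace'
```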
List all connectors with their status
curl 'http://192.168.115.105:8083/connectors/?expand=status' \
  -H 'Accept: application/json' \
  --insecure
View a connector's configuration via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/config' \
  -H 'Accept: application/json' \
  --insecure
Pause a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/pause' \
  -X 'PUT' \
  -H 'Content-Type: application/json' \
  --data-raw '{}'
Resume a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/resume' \
  -X 'PUT' \
  -H 'Content-Type: application/json' \
  --data-raw '{}'
Restart a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector/restart' \
  -H 'Content-Type: application/json' \
  --data-raw '{}'
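By default, a plain `POST .../restart` like the one above restarts only the connector instance, not its tasks. Since Kafka 3.0 (KIP-745), the endpoint also accepts the `includeTasks` and `onlyFailed` query parameters, so a failed task can be restarted without touching healthy ones. This sketch assumes the Connect image ships a Kafka 3.x runtime like the Kafka image above; the function name is hypothetical.

```shell
# Connect REST endpoint; override with CONNECT_URL=... if your host differs.
CONNECT_URL="${CONNECT_URL:-http://192.168.115.105:8083}"

# Hypothetical helper: restart connector $1 together with only its failed
# tasks (query parameters from KIP-745, Kafka 3.0+).
restart_failed() {
  curl -s -X POST \
    -H 'Accept: application/json' \
    "$CONNECT_URL/connectors/$1/restart?includeTasks=true&onlyFailed=true"
}

# Usage:
#   restart_failed ispong-connector
```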
Delete a connector via the API
curl 'http://192.168.115.105:8083/connectors/ispong-connector' \
  -X 'DELETE' \
  -H 'Accept: application/json, text/plain, */*' \
  --insecure

https://ispong.isxcode.com/hadoop/spark/spark debezium实时同步/
Author
ispong
Posted on
November 11, 2024
Licensed under