spark 集成debezium
Last updated on January 17, 2025 am
🧙 Questions
☄️ Ideas
安装zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.7
安装kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.7
检测kafka是否可用
docker exec -it kafka /bin/bash
env # 查看节点信息
# 创建测试topic
kafka-topics.sh --create --bootstrap-server 172.17.0.1:9092 --topic ispong-topic --replication-factor 1 --partitions 1
# 创建消费者
kafka-console-consumer.sh --bootstrap-server 172.17.0.1:9092 --topic ispong-topic
# 创建生产者
kafka-console-producer.sh --topic ispong-topic --broker-list 172.17.0.1:9092
# 查询topic列表
kafka-topics.sh --list --bootstrap-server 172.17.0.1:9092
安装mysql
docker run \
--name isxcode-mysql \
--privileged=true \
--restart=always \
-d \
-p 30306:3306 \
-e MYSQL_ROOT_PASSWORD=ispong123 \
-e MYSQL_DATABASE=isxcode_db \
mysql:8.0
登录mysql测试
docker exec -it isxcode-mysql /bin/bash
mysql -h localhost -u root -pispong123 -P 3306
安装debezium
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link isxcode-mysql:mysql quay.io/debezium/connect:2.7
添加监听
topic-prefix: dbserver1
schema-topic: schemahistory.inventory
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "ispong123",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "isxcode_db",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.inventory"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "ispong123", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "isxcode_db", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'
验证监听是否加好
curl -H "Accept:application/json" localhost:8083/connectors/
查看监听信息
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
创建表
create table isxcode_db.users(
username varchar(200),
age int
);
推送内容
schemahistory.inventory
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"ts_sec" : 1723530898,
"file" : "binlog.000002",
"pos" : 391,
"server_id" : 1
},
"ts_ms" : 1723530898090,
"databaseName" : "isxcode_db",
"ddl" : "create table isxcode_db.users(\n username varchar(200),\n age int\n)",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"isxcode_db\".\"users\"",
"table" : {
"defaultCharsetName" : "utf8mb4",
"primaryKeyColumnNames" : [ ],
"columns" : [ {
"name" : "username",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8mb4",
"length" : 200,
"position" : 1,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"enumValues" : [ ]
}, {
"name" : "age",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 2,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"enumValues" : [ ]
} ],
"attributes" : [ ]
},
"comment" : null
} ]
}
dbserver1
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"t
ype":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scale"},{"type":"int32","optional":false,"field":"position"},{"type":"boolean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"},{"type":"string","optional":true,"field":"comment"},{"type":"string","optional":true,"field":"defaultValueExpression"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"enumValues"}],"optional":false,"name":"io.debezium.connector.schema.Column","version":1},"optional":false,"field":"columns"},{"type":"string","optional":true,"field":"comment"}],"optional":true,"name":"io.debezium.connector.schema.Table","version":1,"field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change","version":1},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue","version":1},"payload":{"source":{"version":"2.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1723530898000,"snapshot":"false","db":"isxcode_db","sequence":null,"ts_us":1723530898000000,"ts_ns":1723530898000000000,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":234,"row":0,"thread":null,"query":null},"ts_ms":1723530898090,"databaseName":"isxcode_db","schemaName":null,"ddl":"create table isxcode_db.users(\n username varchar(200),\n age 
int\n)","tableChanges":[{"type":"CREATE","id":"\"isxcode_db\".\"users\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":[],"columns":[{"name":"username","jdbcType":12,"nativeType":null,"typeName":"VARCHAR","typeExpression":"VARCHAR","charsetName":"utf8mb4","length":200,"scale":null,"position":1,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"defaultValueExpression":null,"enumValues":null},{"name":"age","jdbcType":4,"nativeType":null,"typeName":"INT","typeExpression":"INT","charsetName":null,"length":null,"scale":null,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"defaultValueExpression":null,"enumValues":null}],"comment":null}}]}}
插入数据
insert into isxcode_db.users values ('zhangsan',12);
{
"payload": {
"after": {
"age": 12,
"username": "zhangsan"
},
"before": null,
"op": "c",
"source": {
"connector": "mysql",
"db": "isxcode_db",
"file": "binlog.000002",
"gtid": null,
"name": "dbserver1",
"pos": 2321,
"query": null,
"row": 0,
"sequence": null,
"server_id": 1,
"snapshot": "false",
"table": "users",
"thread": 8,
"ts_ms": 1723531548000,
"ts_ns": 1723531548000000000,
"ts_us": 1723531548000000,
"version": "2.7.1.Final"
},
"transaction": null,
"ts_ms": 1723531548228,
"ts_ns": 1723531548228393700,
"ts_us": 1723531548228393
},
"schema": {
"fields": [
{
"field": "before",
"fields": [
{
"field": "username",
"optional": true,
"type": "string"
},
{
"field": "age",
"optional": true,
"type": "int32"
}
],
"name": "dbserver1.isxcode_db.users.Value",
"optional": true,
"type": "struct"
},
{
"field": "after",
"fields": [
{
"field": "username",
"optional": true,
"type": "string"
},
{
"field": "age",
"optional": true,
"type": "int32"
}
],
"name": "dbserver1.isxcode_db.users.Value",
"optional": true,
"type": "struct"
},
{
"field": "source",
"fields": [
{
"field": "version",
"optional": false,
"type": "string"
},
{
"field": "connector",
"optional": false,
"type": "string"
},
{
"field": "name",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": false,
"type": "int64"
},
{
"default": "false",
"field": "snapshot",
"name": "io.debezium.data.Enum",
"optional": true,
"parameters": {
"allowed": "true,last,false,incremental"
},
"type": "string",
"version": 1
},
{
"field": "db",
"optional": false,
"type": "string"
},
{
"field": "sequence",
"optional": true,
"type": "string"
},
{
"field": "ts_us",
"optional": true,
"type": "int64"
},
{
"field": "ts_ns",
"optional": true,
"type": "int64"
},
{
"field": "table",
"optional": true,
"type": "string"
},
{
"field": "server_id",
"optional": false,
"type": "int64"
},
{
"field": "gtid",
"optional": true,
"type": "string"
},
{
"field": "file",
"optional": false,
"type": "string"
},
{
"field": "pos",
"optional": false,
"type": "int64"
},
{
"field": "row",
"optional": false,
"type": "int32"
},
{
"field": "thread",
"optional": true,
"type": "int64"
},
{
"field": "query",
"optional": true,
"type": "string"
}
],
"name": "io.debezium.connector.mysql.Source",
"optional": false,
"type": "struct"
},
{
"field": "transaction",
"fields": [
{
"field": "id",
"optional": false,
"type": "string"
},
{
"field": "total_order",
"optional": false,
"type": "int64"
},
{
"field": "data_collection_order",
"optional": false,
"type": "int64"
}
],
"name": "event.block",
"optional": true,
"type": "struct",
"version": 1
},
{
"field": "op",
"optional": false,
"type": "string"
},
{
"field": "ts_ms",
"optional": true,
"type": "int64"
},
{
"field": "ts_us",
"optional": true,
"type": "int64"
},
{
"field": "ts_ns",
"optional": true,
"type": "int64"
}
],
"name": "dbserver1.isxcode_db.users.Envelope",
"optional": false,
"type": "struct",
"version": 2
}
}
删除数据
delete from isxcode_db.users where age =13;
{
"payload": {
"after": null,
"before": {
"age": 13,
"username": "zhangsan"
},
"op": "d"
}
}
更新数据
update isxcode_db.users set age = 14 where age = 13;
{
"payload": {
"after": {
"age": 14,
"username": "zhangsan"
},
"before": {
"age": 13,
"username": "zhangsan"
},
"op": "u"
}
}
🔗 Links
spark 集成debezium
https://ispong.isxcode.com/hadoop/spark/spark 集成debezium/