spark 集成debezium

Last updated on January 17, 2025 am

🧙 Questions

☄️ Ideas

安装zookeeper

# Start ZooKeeper (Debezium image 2.7) in the foreground; --rm removes the container when it stops.
docker run -it --rm \
  --name zookeeper \
  -p 2181:2181 \
  -p 2888:2888 \
  -p 3888:3888 \
  quay.io/debezium/zookeeper:2.7

安装kafka

# Start Kafka (Debezium image 2.7) in the foreground; --rm removes the container when it stops.
# --link exposes the ZooKeeper container to this one under the alias "zookeeper".
docker run -it --rm \
  --name kafka \
  -p 9092:9092 \
  --link zookeeper:zookeeper \
  quay.io/debezium/kafka:2.7

检测kafka是否可用

# Open a shell inside the running kafka container; the commands below are run in that shell.
docker exec -it kafka /bin/bash
# Print the container's environment variables (shows how the broker container was configured).
env
# NOTE(review): 172.17.0.1 is assumed to be the Docker bridge gateway (the host), through which
# the published port 9092 is reachable — adjust the address for your environment.
# Create a test topic with one partition and replication factor 1 (there is only one broker).
kafka-topics.sh --create --bootstrap-server 172.17.0.1:9092 --topic ispong-topic --replication-factor 1 --partitions 1
# Start a console consumer on the test topic. It runs in the foreground, so start the producer
# from a second `docker exec` shell.
kafka-console-consumer.sh --bootstrap-server 172.17.0.1:9092 --topic ispong-topic
# Start a console producer on the test topic (--bootstrap-server replaces the deprecated --broker-list).
kafka-console-producer.sh --bootstrap-server 172.17.0.1:9092 --topic ispong-topic
# List all topics.
kafka-topics.sh --list --bootstrap-server 172.17.0.1:9092

安装mysql

# Start MySQL 8.0 in the background; container port 3306 is published on host port 30306.
# The root password and the isxcode_db database are passed in through environment variables.
# NOTE(review): --privileged is probably not required for MySQL; consider dropping it.
docker run \
  --detach \
  --name isxcode-mysql \
  --restart=always \
  --privileged=true \
  --publish 30306:3306 \
  --env MYSQL_ROOT_PASSWORD=ispong123 \
  --env MYSQL_DATABASE=isxcode_db \
  mysql:8.0

登录mysql测试

# Open a shell in the MySQL container, then connect as root with the mysql command-line client.
docker exec -it isxcode-mysql /bin/bash
mysql --host=localhost --user=root --password=ispong123 --port=3306

安装debezium

# Start Kafka Connect with the Debezium connectors (image 2.7); its REST API is published on port 8083.
# GROUP_ID and the three *_STORAGE_TOPIC variables name the Connect group and the topics it uses
# for connector configs, offsets and statuses.
# The links expose the Kafka container as "kafka" and the MySQL container as "mysql"
# (the connector config uses "mysql" as database.hostname).
docker run -it --rm \
  --name connect \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  --link kafka:kafka \
  --link isxcode-mysql:mysql \
  quay.io/debezium/connect:2.7

添加监听

topic-prefix: dbserver1
schema-topic: schemahistory.inventory

{
  "name": "inventory-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysql",  
    "database.port": "3306",
    "database.user": "root",
    "database.password": "ispong123",
    "database.server.id": "184054",  
    "topic.prefix": "dbserver1",  
    "database.include.list": "isxcode_db",  
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}
# Register the Debezium MySQL connector through the Kafka Connect REST API.
# It captures changes in isxcode_db, writes them to topics prefixed "dbserver1",
# and records schema history in the "schemahistory.inventory" topic.
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors/ \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "ispong123",
      "database.server.id": "184054",
      "topic.prefix": "dbserver1",
      "database.include.list": "isxcode_db",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schemahistory.inventory"
    }
  }'

验证监听是否加好

# List the connectors registered with Kafka Connect.
curl --header "Accept:application/json" localhost:8083/connectors/

查看监听信息

# Fetch the details of inventory-connector; --include also prints the HTTP response headers.
curl --include --request GET --header "Accept:application/json" localhost:8083/connectors/inventory-connector

创建表

-- Table watched by the connector. It must be named users: the insert/update/delete statements
-- and the captured change events in this walkthrough all refer to isxcode_db.users.
create table isxcode_db.users(
  username varchar(200),
  age int
);

推送内容

schemahistory.inventory
{
  "source" : {
    "server" : "dbserver1"
  },
  "position" : {
    "ts_sec" : 1723530898,
    "file" : "binlog.000002",
    "pos" : 391,
    "server_id" : 1
  },
  "ts_ms" : 1723530898090,
  "databaseName" : "isxcode_db",
  "ddl" : "create table isxcode_db.users(\n  username varchar(200),\n  age int\n)",
  "tableChanges" : [ {
    "type" : "CREATE",
    "id" : "\"isxcode_db\".\"users\"",
    "table" : {
      "defaultCharsetName" : "utf8mb4",
      "primaryKeyColumnNames" : [ ],
      "columns" : [ {
        "name" : "username",
        "jdbcType" : 12,
        "typeName" : "VARCHAR",
        "typeExpression" : "VARCHAR",
        "charsetName" : "utf8mb4",
        "length" : 200,
        "position" : 1,
        "optional" : true,
        "autoIncremented" : false,
        "generated" : false,
        "comment" : null,
        "hasDefaultValue" : true,
        "enumValues" : [ ]
      }, {
        "name" : "age",
        "jdbcType" : 4,
        "typeName" : "INT",
        "typeExpression" : "INT",
        "charsetName" : null,
        "position" : 2,
        "optional" : true,
        "autoIncremented" : false,
        "generated" : false,
        "comment" : null,
        "hasDefaultValue" : true,
        "enumValues" : [ ]
      } ],
      "attributes" : [ ]
    },
    "comment" : null
  } ]
}
dbserver1
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"field":"databaseName"},{"type":"string","optional":true,"field":"schemaName"},{"type":"string","optional":true,"field":"ddl"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"type"},{"type":"string","optional":false,"field":"id"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"defaultCharsetName"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"primaryKeyColumnNames"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"jdbcType"},{"type":"int32","optional":true,"field":"nativeType"},{"type":"string","optional":false,"field":"typeName"},{"type":"string","optional":true,"field":"typeExpression"},{"t
ype":"string","optional":true,"field":"charsetName"},{"type":"int32","optional":true,"field":"length"},{"type":"int32","optional":true,"field":"scale"},{"type":"int32","optional":false,"field":"position"},{"type":"boolean","optional":true,"field":"optional"},{"type":"boolean","optional":true,"field":"autoIncremented"},{"type":"boolean","optional":true,"field":"generated"},{"type":"string","optional":true,"field":"comment"},{"type":"string","optional":true,"field":"defaultValueExpression"},{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"enumValues"}],"optional":false,"name":"io.debezium.connector.schema.Column","version":1},"optional":false,"field":"columns"},{"type":"string","optional":true,"field":"comment"}],"optional":true,"name":"io.debezium.connector.schema.Table","version":1,"field":"table"}],"optional":false,"name":"io.debezium.connector.schema.Change","version":1},"optional":false,"field":"tableChanges"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue","version":1},"payload":{"source":{"version":"2.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1723530898000,"snapshot":"false","db":"isxcode_db","sequence":null,"ts_us":1723530898000000,"ts_ns":1723530898000000000,"table":"users","server_id":1,"gtid":null,"file":"binlog.000002","pos":234,"row":0,"thread":null,"query":null},"ts_ms":1723530898090,"databaseName":"isxcode_db","schemaName":null,"ddl":"create table isxcode_db.users(\n  username varchar(200),\n  age 
int\n)","tableChanges":[{"type":"CREATE","id":"\"isxcode_db\".\"users\"","table":{"defaultCharsetName":"utf8mb4","primaryKeyColumnNames":[],"columns":[{"name":"username","jdbcType":12,"nativeType":null,"typeName":"VARCHAR","typeExpression":"VARCHAR","charsetName":"utf8mb4","length":200,"scale":null,"position":1,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"defaultValueExpression":null,"enumValues":null},{"name":"age","jdbcType":4,"nativeType":null,"typeName":"INT","typeExpression":"INT","charsetName":null,"length":null,"scale":null,"position":2,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"defaultValueExpression":null,"enumValues":null}],"comment":null}}]}}

插入数据

-- Insert one row; the explicit column list keeps the values mapped correctly even if the column order changes.
insert into isxcode_db.users (username, age) values ('zhangsan', 12);
{
    "payload": {
        "after": {
            "age": 12,
            "username": "zhangsan"
        },
        "before": null,
        "op": "c",
        "source": {
            "connector": "mysql",
            "db": "isxcode_db",
            "file": "binlog.000002",
            "gtid": null,
            "name": "dbserver1",
            "pos": 2321,
            "query": null,
            "row": 0,
            "sequence": null,
            "server_id": 1,
            "snapshot": "false",
            "table": "users",
            "thread": 8,
            "ts_ms": 1723531548000,
            "ts_ns": 1723531548000000000,
            "ts_us": 1723531548000000,
            "version": "2.7.1.Final"
        },
        "transaction": null,
        "ts_ms": 1723531548228,
        "ts_ns": 1723531548228393700,
        "ts_us": 1723531548228393
    },
    "schema": {
        "fields": [
            {
                "field": "before",
                "fields": [
                    {
                        "field": "username",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "age",
                        "optional": true,
                        "type": "int32"
                    }
                ],
                "name": "dbserver1.isxcode_db.users.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "after",
                "fields": [
                    {
                        "field": "username",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "age",
                        "optional": true,
                        "type": "int32"
                    }
                ],
                "name": "dbserver1.isxcode_db.users.Value",
                "optional": true,
                "type": "struct"
            },
            {
                "field": "source",
                "fields": [
                    {
                        "field": "version",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "connector",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "name",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "ts_ms",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "default": "false",
                        "field": "snapshot",
                        "name": "io.debezium.data.Enum",
                        "optional": true,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "type": "string",
                        "version": 1
                    },
                    {
                        "field": "db",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "sequence",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "ts_us",
                        "optional": true,
                        "type": "int64"
                    },
                    {
                        "field": "ts_ns",
                        "optional": true,
                        "type": "int64"
                    },
                    {
                        "field": "table",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "server_id",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "gtid",
                        "optional": true,
                        "type": "string"
                    },
                    {
                        "field": "file",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "pos",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "row",
                        "optional": false,
                        "type": "int32"
                    },
                    {
                        "field": "thread",
                        "optional": true,
                        "type": "int64"
                    },
                    {
                        "field": "query",
                        "optional": true,
                        "type": "string"
                    }
                ],
                "name": "io.debezium.connector.mysql.Source",
                "optional": false,
                "type": "struct"
            },
            {
                "field": "transaction",
                "fields": [
                    {
                        "field": "id",
                        "optional": false,
                        "type": "string"
                    },
                    {
                        "field": "total_order",
                        "optional": false,
                        "type": "int64"
                    },
                    {
                        "field": "data_collection_order",
                        "optional": false,
                        "type": "int64"
                    }
                ],
                "name": "event.block",
                "optional": true,
                "type": "struct",
                "version": 1
            },
            {
                "field": "op",
                "optional": false,
                "type": "string"
            },
            {
                "field": "ts_ms",
                "optional": true,
                "type": "int64"
            },
            {
                "field": "ts_us",
                "optional": true,
                "type": "int64"
            },
            {
                "field": "ts_ns",
                "optional": true,
                "type": "int64"
            }
        ],
        "name": "dbserver1.isxcode_db.users.Envelope",
        "optional": false,
        "type": "struct",
        "version": 2
    }
}

删除数据

-- Delete the rows whose age is 13.
-- NOTE(review): the insert step writes age = 12, so this only matches if a row with age = 13
-- already exists; the captured delete event shows such a row being removed.
delete from isxcode_db.users
where age = 13;
{
    "payload": {
        "after": null,
        "before": {
            "age": 13,
            "username": "zhangsan"
        },
        "op": "d"
    }
}

更新数据

-- Change age from 13 to 14 for the matching rows.
update isxcode_db.users
set age = 14
where age = 13;
{
    "payload": {
        "after": {
            "age": 14,
            "username": "zhangsan"
        },
        "before": {
            "age": 13,
            "username": "zhangsan"
        },
        "op": "u"
    }
}

spark 集成debezium
https://ispong.isxcode.com/hadoop/spark/spark 集成debezium/
Author
ispong
Posted on
August 13, 2024
Licensed under