flink chunjun实时同步

Last updated on November 20, 2024 am

🧙 Questions

☄️ Ideas

注意

存在 guava 版本冲突问题,建议统一使用 19.0 版本的 guava

binlog实时到mysql(以下示例的 writer 为 streamwriter,仅打印数据;写入 mysql 时需改用 mysqlwriter)

binlogreader 的 parameter 中一定要配置 host 和 port

创建canal用户

show global variables like '%binlog_format%';

CREATE USER IF NOT EXISTS 'canal'@'%' Identified BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

修改mysql.cnf(在 [mysqld] 段下添加以下配置;MySQL 5.7 开启 binlog 时还必须设置 server_id,修改后需重启 MySQL 才能生效)

log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
{ 
  "job": {
    "content": [
      {
        "reader": {
          "name": "binlogreader",
          "parameter": {
            "jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db",
            "table": ["ispong_db.users"],
            "password": "Define123...",
            "database": "ispong_db",
            "port": 30102,
            "cat": "insert,update,delete",
            "host": "172.23.39.227",
            "pavingData": true,
            "username": "root"
          }
        },
         "writer" : {
        "parameter" : {
          "print" : true
        },
        "name" : "streamwriter"
      }
      }
    ],
    "setting" : {
      "speed" : {
        "bytes" : 0,
        "channel" : 1
      }
    }
  }
}
mysql实时同步到hive

检查binlog是否开启
注意:同步到 hive 时不支持 update 和 delete 操作

show variables like 'log_%';
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "binlogreader",
          "parameter": {
            "jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db?useSSL=false",
            "table": ["ispong_db.users"],
            "password": "Define123...",
            "database": "ispong_db",
            "username":"root",
            "port": 30102,
            "cat": "insert,update,delete",
            "host": "isxcode",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "pavingData": true,
            "splitUpdate" : true
          }
        },
           "writer": {
            "name" : "hivewriter",
            "parameter" : {
            "print" : true,
            "jdbcUrl" : "jdbc:hive2://isxcode:30115/ispong_db",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users_sink\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:30116/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:30116/",
              "hadoop.user.name": "ispong"
            }
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": true,
        "isStream": true
      },
      "errorLimit": {},
      "speed": {
        "bytes": 0,
        "channel": 1
      }
    }
  }
}
kafka实时同步到mysql
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "topic": "ispong-topic",
            "mode": "earliest-offset",
            "timestamp": 1000,
            "offset": "EARLIEST",
            "groupId": "test-consumer-group",
            "codec": "json",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "consumerSettings": {
              "bootstrap.servers": "isxcode:30120",
              "auto.commit.enable": "false"
            }
          },
          "name": "kafkasource"
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "print": true,
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
                "table": ["users_sink"]
              }
            ],
            "username": "root",
            "password": "Define123...",
            "column": [
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "mode": "insert"
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": true,
        "isStream": true
      },
      "errorLimit": {},
      "speed": {
        "bytes": 0,
        "channel": 1
      }
    }
  }
}
kafka实时到hive(以下示例的 writer 为 streamwriter,仅打印数据;写入 hive 时需改用 hivewriter)
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "topic": "ispong-topic",
            "mode": "earliest-offset",
            "timestamp": 1000,
            "offset": "EARLIEST",
            "groupId": "test-consumer-group",
            "codec": "json",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "consumerSettings": {
              "bootstrap.servers": "isxcode:30120",
              "auto.commit.enable": "false"
            }
          },
          "name": "kafkasource"
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": true,
        "isStream": true
      },
      "speed": {
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}
提交作业
bash /opt/chunjun/bin/chunjun-yarn-perjob.sh -job /home/ispong/mysql_to_hive.json

flink chunjun实时同步
https://ispong.isxcode.com/hadoop/flink/flink%20chunjun实时同步/
Author
ispong
Posted on
October 10, 2022
Licensed under