flink chunjun实时同步
Last updated on November 20, 2024 am
🧙 Questions
- binlog实时到mysql
- binlog实时到hive
- binlog实时到kafka
- binlog实时到doris
- kafka实时到mysql
- kafka实时到hive
- kafka实时到kafka
- kafka实时到doris
☄️ Ideas
注意
有个guava版本问题,建议使用19.0版本的guava
binlog实时到mysql(以下示例使用streamwriter打印数据用于验证,实际写入mysql时将writer替换为mysqlwriter)
binlogreader 必须显式配置 host 和 port
创建canal用户
show global variables like '%binlog_format%';
CREATE USER IF NOT EXISTS 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
修改mysql.cnf(在[mysqld]段下添加以下配置,修改后需重启MySQL才能生效;MySQL 5.7开启binlog时还必须同时配置server_id)
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW
{
"job": {
"content": [
{
"reader": {
"name": "binlogreader",
"parameter": {
"jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db",
"table": ["ispong_db.users"],
"password": "Define123...",
"database": "ispong_db",
"port": 30102,
"cat": "insert,update,delete",
"host": "172.23.39.227",
"pavingData": true,
"username": "root"
}
},
"writer" : {
"parameter" : {
"print" : true
},
"name" : "streamwriter"
}
}
],
"setting" : {
"speed" : {
"bytes" : 0,
"channel" : 1
}
}
}
}
mysql实时同步到hive
检查binlog是否开启
注意:写入hive时不支持update和delete
show variables like 'log_%';
{
"job": {
"content": [
{
"reader": {
"name": "binlogreader",
"parameter": {
"jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db?useSSL=false",
"table": ["ispong_db.users"],
"password": "Define123...",
"database": "ispong_db",
"username":"root",
"port": 30102,
"cat": "insert,update,delete",
"host": "isxcode",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"pavingData": true,
"splitUpdate" : true
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"print" : true,
"jdbcUrl" : "jdbc:hive2://isxcode:30115/ispong_db",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users_sink\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:30116/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:30116/",
"hadoop.user.name": "ispong"
}
}
}
}
],
"setting": {
"restore": {
"isRestore": true,
"isStream": true
},
"errorLimit": {},
"speed": {
"bytes": 0,
"channel": 1
}
}
}
}
kafka实时同步到mysql
{
"job": {
"content": [
{
"reader": {
"parameter": {
"topic": "ispong-topic",
"mode": "earliest-offset",
"timestamp": 1000,
"offset": "EARLIEST",
"groupId": "test-consumer-group",
"codec": "json",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"consumerSettings": {
"bootstrap.servers": "isxcode:30120",
"auto.commit.enable": "false"
}
},
"name": "kafkasource"
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"print": true,
"connection": [
{
"jdbcUrl": "jdbc:mysql://isxcode:30102/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
"table": ["users_sink"]
}
],
"username": "root",
"password": "Define123...",
"column": [
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"mode": "insert"
}
}
}
],
"setting": {
"restore": {
"isRestore": true,
"isStream": true
},
"errorLimit": {},
"speed": {
"bytes": 0,
"channel": 1
}
}
}
}
kafka实时到hive(以下示例使用streamwriter打印数据用于验证,实际写入hive时将writer替换为hivewriter)
{
"job": {
"content": [
{
"reader": {
"parameter": {
"topic": "ispong-topic",
"mode": "earliest-offset",
"timestamp": 1000,
"offset": "EARLIEST",
"groupId": "test-consumer-group",
"codec": "json",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"consumerSettings": {
"bootstrap.servers": "isxcode:30120",
"auto.commit.enable": "false"
}
},
"name": "kafkasource"
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
],
"setting": {
"restore": {
"isRestore": true,
"isStream": true
},
"speed": {
"readerChannel": 1,
"writerChannel": 1
}
}
}
}
提交作业
bash /opt/chunjun/bin/chunjun-yarn-perjob.sh -job /home/ispong/mysql_to_hive.json
🔗 Links
flink chunjun实时同步
https://ispong.isxcode.com/hadoop/flink/flink%20chunjun实时同步/