flink chunjun离线同步
Last updated on January 17, 2025 am
🧙 Questions
- 通用模板
- mysql离线同步到hive
- hive离线同步到mysql
- mysql离线同步到mysql
- hive离线同步到hive
- postgre离线同步到mysql
- postgre离线同步到hive
- hive离线同步到postgre
- hbase离线同步到hive
- hive离线同步到hbase
- mysql离线同步到sqlserver
- hive离线同步到sqlserver
- sqlserver离线同步到hive
- mysql离线同步到oracle
- hive离线同步到oracle
- oracle离线同步到hive
- mysql离线同步到doris
- hive离线同步到doris
☄️ Ideas
前提
初始化表
- mysql
-- Demo table for the MySQL examples in this article (columns: username, age).
-- IF NOT EXISTS makes the statement safe to re-run; the terminating semicolon
-- lets it be pasted together with other statements.
CREATE TABLE IF NOT EXISTS users
(
    username VARCHAR(100),
    age      INT
);
- hive
-- Hive demo table: two data columns plus a string partition column pt,
-- stored as text files with ',' between fields and '\n' between rows.
CREATE TABLE users (
    username STRING,
    age      INT
)
PARTITIONED BY (pt STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
- postgre
-- PostgreSQL demo table; creation is skipped when the table already exists.
CREATE TABLE IF NOT EXISTS users (
    username VARCHAR,
    age      INT
);
通用模板
{
"job": {
"content": [
{
"reader": { // reader插件详细配置
"name" : "xxreader", // reader 插件名称,具体名称参考各数据源配置文档
"parameter" : { // 数据源配置参数,具体配置参考各数据源配置文档
}
},
"writer": {} // writer插件详细配置
}
],
"setting": {
"speed": { // 速率限制
"channel": 1, // 任务并发数 默认为1
"readerChannel": -1, // source 并行度,默认-1代表采用全局并行度
"writerChannel": -1, // sink 并行度,默认-1代表采用全局并行度
"bytes": 0, // bytes >0 则表示开启任务限速
"rebalance" : false // 是否强制进行 rebalance,开启会消耗性能,默认false
},
"errorLimit": { // 出错控制
"record": 0, // 错误阈值,当错误记录数超过此阈值时任务失败,默认值为0
"percentage": 0.0 // 错误比例阈值,当错误记录比例超过此阈值时任务失败,默认值0.0
},
"metricPluginConf": {}, // 指标插件配置,可以接入普罗米修斯,进行实时监控
"restore": { // 任务类型及断点续传配置
"isStream" : false, // 是否为实时采集任务,默认false
"isRestore" : false, // 是否开启断点续传,默认false
"restoreColumnName" : "", // 断点续传字段名称,isRestore为true则必填
"restoreColumnIndex" : 0 // 断点续传字段索引 ID,isRestore为true则必填
},
"log": { // 日志记录配置
"isLogger": false, // 是否保存日志记录,默认值false
"level" : "info", // 日志级别,默认info
"path" : "/tmp/dtstack/chunjun/", // 服务器上日志保存路径,默认值/tmp/dtstack/chunjun/
"pattern":" log4j:%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
logback : %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" // 日志输出格式
}, // 日志记录配置
"dirty": {} // 脏数据保存
}
}
}
mysql离线同步到hive
reader的name和writer的name,必须和chunjun中插件的名称保持一致。
hivewriter中操作的hive表,必须为分区表,分区字段默认是pt,默认分区类型为DAY。
使用自定义sql(customSql)时,需要在column中写清楚每个字段的名称和类型,不能只写成用逗号隔开的字段名。
mysqlreader的jdbcUrl,必须写在connection里面,否则语法报错。
{
"job": {
"content": [
{
"reader": {
"name" : "mysqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
}
],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users1\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:9000/",
"hadoop.user.name": "ispong"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到mysql
chunjun没有hive的source(reader)插件,只能用hdfsreader读取hive表对应的hdfs文件作为数据来源
hdfsreader的fileType,只支持text、orc、parquet
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "hdfs://isxcode:9000/user/hive/warehouse/users1/pt=20221009",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
}
},
"writer": {
"name" : "mysqlwriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
"table": ["users"]
}],
"username" : "root",
"password" : "ispong123",
"column" : [
{
"name":"username",
"type":"varchar"
},
{
"name":"age",
"type":"int"
}
],
"mode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
mysql离线同步到mysql
{
"job": {
"content": [
{
"reader": {
"name" : "mysqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
}
],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "mysqlwriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
"table": ["users_sink"]
}],
"username" : "root",
"password" : "ispong123",
"column" : [
{
"name":"username",
"type":"varchar"
},
{
"name":"age",
"type":"int"
}
],
"mode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到hive
hivewriter是走内网或者域名的方式将数据写到自己的hdfs中,所以需要在hdfsreader服务器中配置好域名映射
提前将writer中hive表所在hdfs目录的属主,临时改为reader中配置的执行用户
hadoop fs -chown -R ispong:hive /user/hive/warehouse/ispong_db.db/users
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/ispong_db.db/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
postgre离线同步到mysql
{
"job": {
"content": [
{
"reader": {
"name" : "postgresqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema"]
}
],
"username" : "postgres",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "mysqlwriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
"table": ["users"]
}],
"username" : "root",
"password" : "ispong123",
"column" : [
{
"name":"username",
"type":"varchar"
},
{
"name":"age",
"type":"int"
}
],
"mode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
postgre离线同步到hive
{
"job": {
"content": [
{
"reader": {
"name" : "postgresqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema"]
}
],
"username" : "postgres",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:9000/",
"hadoop.user.name": "ispong",
"dfs.client.use.datanode.hostname":"true"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到postgre
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name" : "postgresqlwriter",
"parameter" : {
"connection": [
{
"jdbcUrl": "jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema",
"schema":"ispong_schema",
"table":["users"]
}
],
"username" : "postgres",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"writeMode" : "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hbase离线同步到hive
{
"job": {
"content": [
{
"reader": {
"name": "hbasereader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://isxcode:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "isxcode",
"zookeeper.znode.parent": "/hbase"
},
"table" : "ispong_namespace:users",
"column": [
{
"name": "cf1:username",
"type": "string"
},
{
"name": "cf1:age",
"type": "int"
}
],
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true,
"encoding": "utf-8"
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:9000/",
"hadoop.user.name": "ispong",
"dfs.client.use.datanode.hostname":"true"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到hbase
hbasewriter中int类型写入时会乱码,所以下面writer的column中把age的类型配置为string
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name" : "hbasewriter",
"parameter" : {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://isxcode:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "isxcode",
"zookeeper.znode.parent": "/hbase"
},
"table" : "ispong_namespace:users",
"rowkeyExpress" : "$(cf1:username)_$(cf1:age)",
"column": [
{
"name": "cf1:username",
"type": "string"
},
{
"name": "cf1:age",
"type": "string"
}
],
"encoding":"utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
mysql离线同步到sqlserver
{
"job": {
"content": [
{
"reader": {
"name" : "mysqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
}
],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "sqlserverwriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db",
"schema":"ispong_schema",
"table": ["users"]
}],
"username" : "sa",
"password" : "Ispong123",
"column" : [
{
"name":"username",
"type":"varchar"
},
{
"name":"age",
"type":"int"
}
],
"writeMode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到sqlserver
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name" : "sqlserverwriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db",
"schema":"ispong_schema",
"table": ["users"]
}],
"username" : "sa",
"password" : "Ispong123",
"column" : [
{
"name":"username",
"type":"varchar"
},
{
"name":"age",
"type":"int"
}
],
"writeMode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
sqlserver离线同步到hive
在jdbcUrl中指定schema不生效,需要在customSql中用"schema.表名"的方式指定
{
"job": {
"content": [
{
"reader": {
"name" : "sqlserverreader",
"parameter" : {
"connection":[{
"jdbcUrl":["jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db"]
}],
"username" : "sa",
"password" : "Ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from ispong_schema.users where username = 'zhangsan'"
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:9000/",
"hadoop.user.name": "ispong",
"dfs.client.use.datanode.hostname":"true"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
mysql离线同步到oracle
Oracle区分大小写
表名要大写,schema要大写,字段要大写
{
"job": {
"content": [
{
"reader": {
"name" : "mysqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
}
],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name" : "oraclewriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:oracle:thin:@isxcode:15201:helowin",
"table": ["USERS"]
}],
"username" : "root",
"password" : "ispong123",
"column" : [
{
"name":"USERNAME",
"type":"varchar"
},
{
"name":"AGE",
"type":"int"
}
],
"writeMode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到oracle
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name" : "oraclewriter",
"parameter" : {
"connection":[{
"jdbcUrl":"jdbc:oracle:thin:@isxcode:15201:helowin",
"table": ["USERS"]
}],
"username" : "root",
"password" : "ispong123",
"column" : [
{
"name":"USERNAME",
"type":"varchar"
},
{
"name":"AGE",
"type":"int"
}
],
"writeMode":"insert"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
oracle离线同步到hive
{
"job": {
"content": [
{
"reader": {
"name" : "oraclereader",
"parameter" : {
"connection":[{
"jdbcUrl":["jdbc:oracle:thin:@isxcode:15201:helowin"]
}],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "USERNAME",
"type": "varchar"
},
{
"name": "AGE",
"type": "int"
}
],
"customSql":"select USERNAME , AGE from USERS where username = 'zhangsan'"
}
},
"writer": {
"name" : "hivewriter",
"parameter" : {
"jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
"username" : "ispong",
"password" : "",
"fileType" : "text",
"fieldDelimiter" : ",",
"writeMode" : "overwrite",
"partition": "pt",
"partitionType": "DAY",
"tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
"defaultFS" : "hdfs://isxcode:9000/",
"hadoopConfig" : {
"fs.defaultFS": "hdfs://isxcode:9000/",
"hadoop.user.name": "ispong",
"dfs.client.use.datanode.hostname":"true"
}
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
mysql离线同步到doris
-- Look up the Doris HttpPort.
-- NOTE(review): presumably this is the port to use in the writer's feNodes — confirm.
SHOW PROC '/frontends';
{
"job": {
"content": [
{
"reader": {
"name" : "mysqlreader",
"parameter" : {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
}
],
"username" : "root",
"password" : "ispong123",
"column":[
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"customSql":"select username,age from users where username = 'zhangsan'"
}
},
"writer": {
"name": "dorisbatchwriter",
"parameter": {
"feNodes": ["isxcode:18030"],
"username": "root",
"password": "",
"database": "ispong_db",
"table": "users",
"column": [
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"batchSize": 1024,
"maxRetries": 3
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
hive离线同步到doris
{
"job": {
"content": [
{
"reader": {
"name" : "hdfsreader",
"parameter" : {
"path" : "/user/hive/warehouse/users/",
"defaultFS" : "hdfs://isxcode:9000",
"fileType" : "text",
"fieldDelimiter":",",
"encoding":"UTF-8",
"column": [
{
"name": "username",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"hadoopConfig":{
"dfs.client.use.datanode.hostname":"true",
"dfs.replication":"1",
"fs.defaultFS": "hdfs://isxcode:9000",
"hadoop.user.name": "ispong"
}
}
},
"writer": {
"name": "dorisbatchwriter",
"parameter": {
"feNodes": ["isxcode:18030"],
"username": "root",
"password": "",
"database": "ispong_db",
"table": "users",
"column": [
{
"name": "username",
"type": "varchar"
},
{
"name": "age",
"type": "int"
}
],
"batchSize": 1024,
"maxRetries": 3
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
提交作业
# Switch to the ChunJun bin directory, where the submit script lives.
cd /opt/chunjun/bin
# Submit the job file with the yarn-session script; -confProp carries a JSON
# object (quotes escaped for the shell) that sets yarn.application.id.
./chunjun-yarn-session.sh -job /home/ispong/test.json -confProp {\"yarn.application.id\":\"application_1665455628674_0001\"}
🔗 Links
flink chunjun离线同步
https://ispong.isxcode.com/hadoop/flink/flink chunjun离线同步/