flink chunjun离线同步

Last updated on September 15, 2024 pm

🧙 Questions

☄️ Ideas

前提

初始化表

  • mysql
create table users
(
    username varchar(100),
    age      int
)
  • hive
create table users
(
    username string,
    age      int
)
PARTITIONED BY ( pt STRING ) 
ROW FORMAT 
DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
  • postgre
CREATE TABLE IF NOT EXISTS users
(
    username varchar,
    age      int   
);

通用模板

{
  "job": {
    "content": [
      {
        "reader": { // reader插件详细配置
          "name" : "xxwriter", // reader 插件名称,具体名称参考各数据源配置文档
          "parameter" : { // 数据源配置参数,具体配置参考各数据源配置文档

          }
        }, 
        "writer": {} // writer插件详细配置
      }
    ],
    "setting": {
      "speed": { // 速率限制
        "channel": 1, // 任务并发数 默认为1 
        "readerChannel": -1, // source 并行度,默认-1代表采用全局并行度
        "writerChannel": -1, // sink 并行度,默认-1代表采用全局并行度
        "bytes": 0, // bytes >0 则表示开启任务限速
        "rebalance" : false // 是否强制进行 rebalance,开启会消耗性能,默认false
      }, 
      "errorLimit": { // 出错控制
        "record": 0, // 错误阈值,当错误记录数超过此阈值时任务失败,默认值为0
        "percentage": 0.0 // 错误比例阈值,当错误记录比例超过此阈值时任务失败,默认值0.0
      }, 
      "metricPluginConf": {}, // 指标插件配置,可以接入普罗米修斯,进行实时监控
      "restore": { // 任务类型及断点续传配置
        "isStream" : false, // 是否为实时采集任务,默认false
        "isRestore" : false, // 是否开启断点续传,默认false
        "restoreColumnName" : "", // 断点续传字段名称,isRestore为true则必填
        "restoreColumnIndex" : 0 // 断点续传字段索引 ID,isRestore为true则必填
      }, 
      "log": { // 日志记录配置
        "isLogger": false, // 是否保存日志记录,默认值false
        "level" : "info", // 日志级别,默认info
        "path" : "/tmp/dtstack/chunjun/", // 服务器上日志保存路径,默认值/tmp/dtstack/chunjun/
        "pattern":" log4j:%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n
                    logback : %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" // 日志输出格式
      }, // 日志记录配置
      "dirty": {} // 脏数据保存
    }
  }
}

mysql离线同步到hive

reader的name和writer的name,必须和chunjun中插件的名称保持一致。
hivewriter中操作的hive表,必须为分区表,默认是pt,默认分区类型为DAY。
自定义sql,需要指定清楚字段的类型,不能使用普通逗号隔开。
mysqlreader的jdbcUrl,必须写在connection里面,否则语法报错。

{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "mysqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
              }
            ],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users1\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:9000/",
              "hadoop.user.name": "ispong"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到mysql

hive没有source,只能用hdfs作为数据来源
hdfswriter的fileType,只支持text、orc、parquet

{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "hdfs://isxcode:9000/user/hive/warehouse/users1/pt=20221009",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ]
          }
        },
        "writer": {
          "name" : "mysqlwriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
              "table": ["users"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column" : [
              {
                "name":"username",
                "type":"varchar"
              },
              {
                "name":"age",
                "type":"int"
              }
            ],
            "mode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
mysql离线同步到mysql
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "mysqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
              }
            ],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "mysqlwriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
              "table": ["users_sink"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column" : [
              {
                "name":"username",
                "type":"varchar"
              },
              {
                "name":"age",
                "type":"int"
              }
            ],
            "mode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到hive

hivewriter是走内网或者域名的方式将数据写到自己的hdfs中,所以需要在hdfsreader服务器中配置好域名映射
提前将writer中hive表权限,临时改为reader中执行用户
hadoop fs -chown -R ispong:hive /user/hive/warehouse/ispong_db.db/users

{
  "job": {
    "content": [
      {
       "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/ispong_db.db/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
postgre离线同步到mysql
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "postgresqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema"]
              }
            ],
            "username" : "postgres",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "mysqlwriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true",
              "table": ["users"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column" : [
              {
                "name":"username",
                "type":"varchar"
              },
              {
                "name":"age",
                "type":"int"
              }
            ],
            "mode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
postgre离线同步到hive
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "postgresqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema"]
              }
            ],
            "username" : "postgres",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:9000/",
              "hadoop.user.name": "ispong",
              "dfs.client.use.datanode.hostname":"true"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到postgre
{
  "job": {
    "content": [
      {
         "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name" : "postgresqlwriter",
          "parameter" : {
             "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://isxcode:54302/ispong_db?currentSchema=ispong_schema",
                "schema":"ispong_schema",
                "table":["users"]
              }
            ],
            "username" : "postgres",
            "password" : "ispong123",
             "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "writeMode" : "insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hbase离线同步到hive
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hbasereader",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.property.clientPort": "2181",
              "hbase.rootdir": "hdfs://isxcode:9000/hbase",
              "hbase.cluster.distributed": "true",
              "hbase.zookeeper.quorum": "isxcode",
              "zookeeper.znode.parent": "/hbase"
            },
            "table" : "ispong_namespace:users",
            "column": [
              {
                "name": "cf1:username",
                "type": "string"
              },
              {
                "name": "cf1:age",
                "type": "int"
              }
            ],
            "startRowkey": "",
            "endRowkey": "",
            "isBinaryRowkey": true,
            "encoding": "utf-8"
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:9000/",
              "hadoop.user.name": "ispong",
              "dfs.client.use.datanode.hostname":"true"
            }
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到hbase

hbasewriter中int类型写入时会乱码

{
  "job": {
    "content": [
      {
         "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name" : "hbasewriter",
          "parameter" : {
            "hbaseConfig": {
              "hbase.zookeeper.property.clientPort": "2181",
              "hbase.rootdir": "hdfs://isxcode:9000/hbase",
              "hbase.cluster.distributed": "true",
              "hbase.zookeeper.quorum": "isxcode",
              "zookeeper.znode.parent": "/hbase"
            },
            "table" : "ispong_namespace:users",
            "rowkeyExpress" : "$(cf1:username)_$(cf1:age)",
            "column": [
              {
                "name": "cf1:username",
                "type": "string"
              },
              {
                "name": "cf1:age",
                "type": "string"
              }
            ],
            "encoding":"utf-8"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
mysql离线同步到sqlserver
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "mysqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
              }
            ],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "sqlserverwriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db",
              "schema":"ispong_schema",
              "table": ["users"]
            }],
            "username" : "sa",
            "password" : "Ispong123",
            "column" : [
              {
                "name":"username",
                "type":"varchar"
              },
              {
                "name":"age",
                "type":"int"
              }
            ],
            "writeMode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到sqlserver
{
  "job": {
    "content": [
      {
         "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name" : "sqlserverwriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db",
              "schema":"ispong_schema",
              "table": ["users"]
            }],
            "username" : "sa",
            "password" : "Ispong123",
            "column" : [
              {
                "name":"username",
                "type":"varchar"
              },
              {
                "name":"age",
                "type":"int"
              }
            ],
            "writeMode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
sqlserver离线同步到hive

schema jdbcUrl中指定不生效

{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "sqlserverreader", 
          "parameter" : {
            "connection":[{
              "jdbcUrl":["jdbc:sqlserver://isxcode:14303;DatabaseName=ispong_db"]
            }],
            "username" : "sa",
            "password" : "Ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from ispong_schema.users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:9000/",
              "hadoop.user.name": "ispong",
              "dfs.client.use.datanode.hostname":"true"
            }
          }
        } 
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
mysql离线同步到oracle

Oracle区分大小写
表名要大写,schema要大写,字段要大写

{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "mysqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
              }
            ],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "oraclewriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:oracle:thin:@isxcode:15201:helowin",
              "table": ["USERS"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column" : [
              {
                "name":"USERNAME",
                "type":"varchar"
              },
              {
                "name":"AGE",
                "type":"int"
              }
            ],
            "writeMode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到oracle
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name" : "oraclewriter",
          "parameter" : {
            "connection":[{
              "jdbcUrl":"jdbc:oracle:thin:@isxcode:15201:helowin",
              "table": ["USERS"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column" : [
              {
                "name":"USERNAME",
                "type":"varchar"
              },
              {
                "name":"AGE",
                "type":"int"
              }
            ],
            "writeMode":"insert"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
oracle离线同步到hive
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "oraclereader", 
          "parameter" : {
            "connection":[{
              "jdbcUrl":["jdbc:oracle:thin:@isxcode:15201:helowin"]
            }],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "USERNAME",
                "type": "varchar"
              },
              {
                "name": "AGE",
                "type": "int"
              }
            ],
            "customSql":"select USERNAME , AGE from USERS where username = 'zhangsan'"
          }
        },
        "writer": {
          "name" : "hivewriter",
          "parameter" : {
            "jdbcUrl" : "jdbc:hive2://isxcode:10000/default",
            "username" : "ispong",
            "password" : "",
            "fileType" : "text",
            "fieldDelimiter" : ",",
            "writeMode" : "overwrite",
            "partition": "pt",
            "partitionType": "DAY",
            "tablesColumn" : "{\"users\":[{\"key\":\"username\",\"type\":\"string\"},{\"key\":\"age\",\"type\":\"int\"}]}",
            "defaultFS" : "hdfs://isxcode:9000/",
            "hadoopConfig" : {
              "fs.defaultFS": "hdfs://isxcode:9000/",
              "hadoop.user.name": "ispong",
              "dfs.client.use.datanode.hostname":"true"
            }
          }
        } 
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
mysql离线同步到doris
-- 查找doris的HttpPort
SHOW PROC '/frontends';
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "mysqlreader", 
          "parameter" : {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://isxcode:30306/ispong_db?useSSL=false&allowPublicKeyRetrieval=true"]
              }
            ],
            "username" : "root",
            "password" : "ispong123",
            "column":[
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "customSql":"select username,age from users where username = 'zhangsan'"
          }
        },
        "writer": {
          "name": "dorisbatchwriter",
          "parameter": {
            "feNodes": ["isxcode:18030"],
            "username": "root",
            "password": "",
            "database": "ispong_db",
            "table": "users",
            "column": [
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "batchSize": 1024,
            "maxRetries": 3
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
hive离线同步到doris
{
  "job": {
    "content": [
      {
        "reader": {
          "name" : "hdfsreader", 
          "parameter" : {
            "path" : "/user/hive/warehouse/users/",
            "defaultFS" : "hdfs://isxcode:9000",
            "fileType" : "text",
            "fieldDelimiter":",",
            "encoding":"UTF-8",
            "column": [
              {
                "name": "username",
                "type": "string"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "hadoopConfig":{
              "dfs.client.use.datanode.hostname":"true",
              "dfs.replication":"1",
              "fs.defaultFS": "hdfs://isxcode:9000",
              "hadoop.user.name": "ispong"
            }
          }
        },
        "writer": {
          "name": "dorisbatchwriter",
          "parameter": {
            "feNodes": ["isxcode:18030"],
            "username": "root",
            "password": "",
            "database": "ispong_db",
            "table": "users",
            "column": [
              {
                "name": "username",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "int"
              }
            ],
            "batchSize": 1024,
            "maxRetries": 3
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
提交作业
cd /opt/chunjun/bin
./chunjun-yarn-session.sh -job /home/ispong/test.json -confProp {\"yarn.application.id\":\"application_1665455628674_0001\"}

flink chunjun离线同步
https://ispong.isxcode.com/hadoop/flink/flink chunjun离线同步/
Author
ispong
Posted on
October 9, 2022
Licensed under