Installing ChunJun on Flink

Last updated on January 18, 2025 am

🧙 Questions

Build and install ChunJun, then use it to sync data from MySQL to MySQL.

☄️ Ideas

Prerequisites

Build from source

Build on Linux if possible; building on Windows is not recommended.
Gitee mirror: https://gitee.com/dtstack_dev_0/chunjun.git
Checking out v1.12.6 is recommended.

git clone https://github.com/DTStack/chunjun.git
cd chunjun
git checkout v1.12.6

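You can build with the bundled script (strip Windows carriage returns from it first):
sed -i 's/\r//' build/build.sh
sh build/build.sh

Or run Maven directly:
mvn clean package -DskipTests -U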
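
Before starting the build, it can help to confirm that the required tools are on the PATH. The following is a minimal sketch: `missing_tools` is a hypothetical helper, not part of ChunJun, and the tool list (`git`, `mvn`, `java`) is an assumption based on the commands used above.

```shell
# Hypothetical helper, not part of ChunJun: print each named command
# that is not available on the PATH, one per line.
missing_tools() {
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "$tool"
    fi
  done
}

# Assumed tool list for this build; adjust it to your environment.
missing=$(missing_tools git mvn java)
if [ -n "$missing" ]; then
  # $missing is left unquoted on purpose so the names print on one line.
  echo "Missing build tools:" $missing
fi
```

If nothing is printed, all three commands were found.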

Package output path: /home/ispong/chunjun/chunjun-assembly/target

Extract and install ChunJun

cp /home/ispong/chunjun/chunjun-assembly/target/chunjun-dist-1.12-SNAPSHOT.tar.gz /home/ispong/chunjun-dist.tar.gz
sudo mkdir -p /data/chunjun/chunjun-1.12
sudo chown -R ispong:ispong /data/chunjun/chunjun-1.12
cd /home/ispong
tar -vzxf chunjun-dist.tar.gz -C /data/chunjun/chunjun-1.12/
sudo ln -s /data/chunjun/chunjun-1.12 /opt/chunjun
chmod -R a+x /opt/chunjun/bin
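
After extracting, a quick check of the directory layout can catch a wrong tarball or target path early. This is a sketch under an assumption: `check_chunjun_layout` is a hypothetical helper, and the three directories it looks for (`bin`, `lib`, `chunjun-dist`) are simply the ones the rest of this guide refers to under /opt/chunjun.

```shell
# Hypothetical helper: verify that an install root contains the
# directories this guide refers to later (bin, lib, chunjun-dist).
# Prints each missing directory and returns non-zero if any is absent.
check_chunjun_layout() {
  root=$1
  status=0
  for dir in bin lib chunjun-dist; do
    if [ ! -d "$root/$dir" ]; then
      echo "missing: $root/$dir"
      status=1
    fi
  done
  return "$status"
}
```

For the paths used here, the call would be: check_chunjun_layout /opt/chunjun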

Copy chunjun-dist

Note: be sure to delete the JARs under /opt/chunjun/lib/, then copy in the JAR from chunjun-clients.

# cp -r /home/ispong/chunjun/chunjun-dist /opt/chunjun/ 
rm -rf /opt/chunjun/lib/*
cp /home/ispong/chunjun/chunjun-clients/target/chunjun-clients.jar /opt/chunjun/lib/  


# Other packages
rm /opt/chunjun/chunjun-dist/connector/hive/chunjun-connector-hive.jar
rm /opt/chunjun/chunjun-dist/connector/hive3/chunjun-connector-hive3.jar
cp /opt/chunjun/chunjun-dist/connector/mysqlcdc/flink-connector-mysql-cdc.jar /opt/flink/lib
cp /opt/chunjun/chunjun-dist/connector/postgrecdc/flink-connector-postgre-cdc.jar /opt/flink/lib

Use Flink 1.12.7, the version officially recommended by ChunJun.

nohup wget https://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz >> download_flink.log 2>&1 &
tail -f download_flink.log
sudo mkdir -p /data/flink
sudo chown -R ispong:ispong /data/flink/
tar -vzxf flink-1.12.7-bin-scala_2.12.tgz -C /data/flink/
sudo ln -s /data/flink/flink-1.12.7 /opt/flink
sudo vim /opt/flink/conf/flink-conf.yaml

# The port setting must be commented out!
# rest.port: 8081
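
The edit to flink-conf.yaml can also be scripted instead of done in vim. This is a minimal sketch: `comment_out_rest_port` is a hypothetical helper, and it assumes the setting appears as a line starting with `rest.port:` (optionally indented).

```shell
# Hypothetical helper: comment out an active rest.port line in a Flink
# config file. Lines that are already commented out are left unchanged.
comment_out_rest_port() {
  conf=$1
  tmp=$(mktemp)
  # Write the edited copy to a temp file, then copy the content back so
  # the original file keeps its owner and permissions.
  sed 's/^\([[:space:]]*\)rest\.port:/\1# rest.port:/' "$conf" > "$tmp" &&
    cat "$tmp" > "$conf"
  rm -f "$tmp"
}
```

For the paths used here, the call would be: comment_out_rest_port /opt/flink/conf/flink-conf.yaml
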

Add the dependencies specified by ChunJun
cd /opt/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2/2.7.5-10.0/flink-shaded-hadoop-2-2.7.5-10.0.jar
# The dependencies below are optional; add them only if errors report them as missing
wget https://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar
wget https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar
wget https://repo1.maven.org/maven2/org/apache/htrace/htrace-core/3.1.0-incubating/htrace-core-3.1.0-incubating.jar
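
To confirm the downloads landed in the Flink lib directory, a small check like the following may help. `missing_jars` is a hypothetical helper; the JAR names passed to it are the ones downloaded above.

```shell
# Hypothetical helper: print each named JAR that is absent from a directory.
# Usage: missing_jars <lib_dir> <jar_name>...
missing_jars() {
  lib_dir=$1
  shift
  for jar in "$@"; do
    [ -f "$lib_dir/$jar" ] || echo "$jar"
  done
}
```

For example: missing_jars /opt/flink/lib flink-shaded-hadoop-2-2.7.5-10.0.jar commons-lang-2.6.jar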

Configure environment variables

sudo vim /etc/profile

Be sure to set the HADOOP_HOME and FLINK_HOME environment variables.
The HADOOP_CLASSPATH value must end with *.

export HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/* 
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export CHUNJUN_HOME=/opt/chunjun/chunjun-dist
export FLINK_HOME=/opt/flink
source /etc/profile
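
After sourcing the profile, the two rules above (required variables set, HADOOP_CLASSPATH ending with *) can be checked in the current shell. This is a sketch: `check_chunjun_env` is a hypothetical helper, and the variable list is taken from the exports above.

```shell
# Hypothetical helper: report problems with the variables exported above.
# Returns non-zero if any variable is unset or empty, or if
# HADOOP_CLASSPATH does not end with a literal *.
check_chunjun_env() {
  status=0
  for name in HADOOP_HOME HADOOP_CONF_DIR CHUNJUN_HOME FLINK_HOME HADOOP_CLASSPATH; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "unset: $name"
      status=1
    fi
  done
  case "${HADOOP_CLASSPATH:-}" in
    *'*') ;;
    *) echo "HADOOP_CLASSPATH does not end with *"; status=1 ;;
  esac
  return "$status"
}
```

Run check_chunjun_env with no arguments; no output means the checks passed.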

Run a job

vim mysql_to_mysql.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "username",
                "type": "string"
              }
            ],
            "customSql": "",
            "where": "id < 1000",
            "splitPk": "id",
            "increColumn": "id",
            "startLocation": "2",
            "polling": true,
            "pollingInterval": 3000,
            "queryTimeOut": 1000,
            "username": "test",
            "password": "test123",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://isxcode:3306/test"
                ],
                "table": [
                  "user1"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "test",
            "password": "test123",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://isxcode:3306/test",
                "table": [
                  "user4"
                ]
              }
            ],
            "writeMode": "insert",
            "flushIntervalMills":"3000",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "username",
                "type": "string"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id"
      },
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
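
Before submitting, a quick look at which reader and writer plugins the job file names can catch a copy-paste mistake. This is a rough sketch: `job_plugin_names` is a hypothetical helper that does a plain text match rather than parsing the JSON, and it assumes plugin names end in `reader` or `writer` as they do in the job above.

```shell
# Hypothetical helper: list the reader/writer plugin names found in a
# job file. This is a text match on "name" entries, not a JSON parser.
job_plugin_names() {
  grep -o -E '"name"[[:space:]]*:[[:space:]]*"[A-Za-z0-9_]+(reader|writer)"' "$1" |
    sed -E 's/.*"([A-Za-z0-9_]+)"$/\1/'
}
```

For example: job_plugin_names mysql_to_mysql.json
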
Yarn Session mode

To stop a Flink Yarn session: yarn application -kill application_1663749280461_0055

cd /opt/flink/bin
./yarn-session.sh -t /opt/chunjun/chunjun-dist -d

Get the Yarn application ID, for example application_1663820824332_0005, and pass your own ID to -confProp when submitting the job.

cd /opt/chunjun/bin
./chunjun-yarn-session.sh -job /home/ispong/mysql_to_mysql.json -confProp {\"yarn.application.id\":\"application_1663899167778_0001\"}
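
The -confProp value can be built from the application ID rather than typed with escaped quotes. This is a minimal sketch: both functions are hypothetical helpers, not part of ChunJun or Flink, and the commented usage assumes the first ID printed by `yarn application -list` is your Flink session, which may not hold on a shared cluster.

```shell
# Hypothetical helpers, not part of ChunJun or Flink.

# Read text on stdin and print every Yarn application ID found in it.
extract_app_ids() {
  grep -o -E 'application_[0-9]+_[0-9]+'
}

# Print the JSON value passed to -confProp for a given application ID.
conf_prop_for_app() {
  printf '{"yarn.application.id":"%s"}\n' "$1"
}

# Possible usage (assumes the first listed application is the session):
#   app_id=$(yarn application -list 2>/dev/null | extract_app_ids | head -n 1)
#   ./chunjun-yarn-session.sh -job /home/ispong/mysql_to_mysql.json \
#     -confProp "$(conf_prop_for_app "$app_id")"
```

Quoting the command substitution keeps the JSON as a single argument.
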

Yarn Per-Job mode
cd /opt/chunjun/chunjun-dist
bash ../bin/chunjun-yarn-perjob.sh -job /home/ispong/ispong_chunjun.json


Installing ChunJun on Flink
https://ispong.isxcode.com/hadoop/flink/flink chunjun安装/
Author
ispong
Posted on
September 20, 2022
Licensed under