flink kafka同步到hive

Last updated on September 15, 2024 pm

🧙 Questions

kafka同步到hive
Note:
CSV 格式的字段分隔符默认是 `,`，要注意与表的分隔符保持一致

☄️ Ideas

添加依赖
<properties>
  <hive.version>3.1.2</hive.version>
</properties>

<dependencies>
  
  <!-- flink-connector-hive_2.11 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.11</artifactId>
    <version>1.12.3</version>
  </dependency>

  <!-- Hive Dependency -->
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
  </dependency>

</dependencies>
代码
package com.isxcode;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Streams CSV records from a Kafka topic into an existing Hive table.
 *
 * <p>Pipeline: Kafka topic {@code ispong_kafka} (CSV, columns {@code username STRING, age INT})
 * -> keep rows with {@code age > 20} -> {@code INSERT INTO ispong_table} in Hive database
 * {@code cdh_dev}. The Hive metastore connection is read from the {@code hive-site.xml} found in
 * {@link #HIVE_CONF_DIR}. The target table {@code ispong_table} is assumed to already exist with
 * compatible columns (username, age) — it is not created here.
 */
public class Demo {

    /** Name under which the Hive catalog is registered in the table environment. */
    private static final String HIVE_CATALOG = "to_hive";

    /** Hive database that unqualified table names resolve against after {@code useCatalog}. */
    private static final String HIVE_DATABASE = "cdh_dev";

    /** Directory containing {@code hive-site.xml}. */
    private static final String HIVE_CONF_DIR = "/opt/flink/conf";

    public static void main(String[] args) {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 s. Flink's streaming Hive/filesystem sink finalizes the files it
        // writes on checkpoints, so without checkpointing no data becomes visible in Hive.
        bsEnv.enableCheckpointing(5000);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", "isxcode-pipeline");

        // --- kafka source ---
        // Registered in the built-in default catalog, before the Hive catalog is selected.
        // 'properties.group.id' is required for the Kafka source (default startup mode is
        // 'group-offsets', and offsets are committed back to this group on checkpoints).
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("CREATE TABLE from_kafka (\n"
                + "   username STRING,\n"
                + "   age INT\n"
                + ") WITH (\n"
                + "   'connector'='kafka',\n"
                + "   'topic'='ispong_kafka',\n"
                + "   'properties.bootstrap.servers'='isxcode:30120',\n"
                + "   'properties.group.id'='isxcode-pipeline',\n"
                // CSV field delimiter defaults to ','; messages in the topic must match it.
                + "   'format'='csv'\n"
                + ")");

        // --- transform ---
        // Resolved now (against the default catalog), so switching catalogs below is safe.
        Table filtered = tEnv.from("from_kafka")
                .where($("age").isGreater(20))
                .select($("username"), $("age"));

        // --- hive sink ---
        HiveCatalog hive = new HiveCatalog(HIVE_CATALOG, HIVE_DATABASE, HIVE_CONF_DIR);
        tEnv.registerCatalog(HIVE_CATALOG, hive);
        tEnv.useCatalog(HIVE_CATALOG);

        // Expose the filtered table to SQL; 'ispong_table' then resolves to to_hive.cdh_dev.
        tEnv.createTemporaryView("from_kafka_tmp", filtered);
        // The INSERT uses Flink's default SQL dialect (set above); executeSql submits the job.
        tEnv.executeSql("INSERT INTO ispong_table SELECT username,age FROM from_kafka_tmp");
    }

}
下载依赖
sudo wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.12.3/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar -O /opt/flink/lib/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar

# sudo cp /home/ispong/.m2/repository/org/apache/flink/flink-connector-hive_2.11/1.12.3/flink-connector-hive_2.11-1.12.3.jar /opt/flink/lib/
配置hadoop jar包
# 仅当 jar 版本与集群 Hadoop 版本对应时才生效
# sudo wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -O /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

sudo cp /opt/cloudera/parcels/CDH/jars/hadoop*.jar /opt/flink/lib/
sudo cp /opt/cloudera/parcels/CDH/jars/hive*.jar /opt/flink/lib/
额外配置hive配置文件
sudo vim /opt/flink/conf/hive-site.xml

# --- vim /opt/flink/conf/hive-site.xml ---
<?xml version="1.0"?>
<!--
       Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
<!-- that are implied by Hadoop setup variables.                                                -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource).                                                                                 -->

<!-- Hive Execution Parameters -->


  <!-- 配置数据库的url -->
  <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://isxcode:30102/cdh_hive</value>
  </property>
      
  <!-- 配置数据源驱动 -->
  <property>
      <name>javax.jdo.option.ConnectionDriverName</name>
      <value>com.mysql.cj.jdbc.Driver</value>
  </property>
  
  <!-- 配置数据源账号 (要较高权限)-->
  <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>ispong</value>
  </property>
  
  <!-- 配置数据源密码 -->
  <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>ispong2021</value>
  </property>

  <!-- hive.metastore.port -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://isxcode:30123</value>
  </property>

</configuration>
# --- vim /opt/flink/conf/hive-site.xml ---
sudo vim /opt/flink/conf/flink-conf.yaml

# --- sudo vim /opt/flink/conf/flink-conf.yaml ---
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
# --- sudo vim /opt/flink/conf/flink-conf.yaml ---
打包运行
mvn clean package
flink run flink-demo-0.1.jar

flink kafka同步到hive
https://ispong.isxcode.com/hadoop/flink/flink%20kafka同步到hive/
Author
ispong
Posted on
August 9, 2021
Licensed under