flink kafka同步到hive
Last updated on January 18, 2025
🧙 Questions
kafka同步到hive
Note:
CSV 格式的字段分隔符默认是 `,`，要注意与表的字段分隔符保持一致。
☄️ Ideas
添加依赖
<properties>
<hive.version>3.1.2</hive.version>
</properties>
<dependencies>
<!-- flink-connector-hive_2.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
代码
package com.isxcode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Streams CSV records from the Kafka topic {@code ispong_kafka} into the existing Hive table
 * {@code ispong_table}, keeping only rows whose {@code age} is greater than 20.
 *
 * <p>The Hive table must already exist in database {@code cdh_dev} with columns compatible with
 * {@code (username STRING, age INT)}; this program does not create it.
 */
public class Demo {

    /** Name under which the Hive catalog is registered in the table environment. */
    private static final String HIVE_CATALOG_NAME = "to_hive";

    /** Default database of the Hive catalog; unqualified table names resolve against it. */
    private static final String HIVE_DEFAULT_DATABASE = "cdh_dev";

    /** Directory that contains {@code hive-site.xml}. */
    private static final String HIVE_CONF_DIR = "/opt/flink/conf";

    /** Checkpoint interval in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /**
     * Builds the Kafka source, registers the Hive catalog and submits the streaming INSERT job.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // The streaming Hive sink commits (makes visible) the files it writes when a checkpoint
        // completes, so without checkpointing no data would ever appear in the Hive table.
        bsEnv.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", "isxcode-pipeline");

        // --- kafka source (default catalog) ---
        // Every 'properties.*' key is forwarded to the Kafka consumer. The universal connector
        // does not talk to ZooKeeper, so only the bootstrap servers and a consumer group id
        // (documented as required for sources) are set here.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("CREATE TABLE from_kafka (\n"
                + "  username STRING,\n"
                + "  age INT\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'ispong_kafka',\n"
                + "  'properties.bootstrap.servers' = 'isxcode:30120',\n"
                + "  'properties.group.id' = 'isxcode-pipeline',\n"
                + "  'format' = 'csv'\n"
                + ")");

        // --- transform ---
        // Resolve the source while the default catalog is still current; the Hive catalog is
        // only switched to below.
        Table fromData = tEnv.from("from_kafka")
                .where($("age").isGreater(20))
                .select($("username").as("username"), $("age").as("age"));

        // --- hive sink ---
        HiveCatalog hive = new HiveCatalog(HIVE_CATALOG_NAME, HIVE_DEFAULT_DATABASE, HIVE_CONF_DIR);
        tEnv.registerCatalog(HIVE_CATALOG_NAME, hive);
        tEnv.useCatalog(HIVE_CATALOG_NAME);
        // Registered in the now-current Hive catalog/database. As a temporary object it lives
        // only in this session and is not persisted to the Hive metastore.
        tEnv.createTemporaryView("from_kafka_tmp", fromData);

        // The INSERT is plain Flink SQL, so it runs under the DEFAULT dialect.
        // executeSql submits the streaming job and returns without waiting for it to finish.
        tEnv.executeSql("INSERT INTO ispong_table SELECT username, age FROM from_kafka_tmp");
    }
}
下载依赖
sudo wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.12.3/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar -O /opt/flink/lib/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar
# sudo cp /home/ispong/.m2/repository/org/apache/flink/flink-connector-hive_2.11/1.12.3/flink-connector-hive_2.11-1.12.3.jar /opt/flink/lib/
配置hadoop jar包
# 对应版本生效
# sudo wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -O /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
sudo cp /opt/cloudera/parcels/CDH/jars/hadoop*.jar /opt/flink/lib/
sudo cp /opt/cloudera/parcels/CDH/jars/hive*.jar /opt/flink/lib/
额外配置hive配置文件
sudo vim /opt/flink/conf/hive-site.xml
# --- vim /opt/flink/conf/hive-site.xml ---
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files -->
<!-- that are implied by Hadoop setup variables. -->
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive -->
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
<!-- resource). -->
<!-- Hive Execution Parameters -->
<!-- 配置数据库的url -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://isxcode:30102/cdh_hive</value>
</property>
<!-- 配置数据源驱动 -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<!-- 配置数据源账号 (要较高权限)-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>ispong</value>
</property>
<!-- 配置数据源密码 -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>ispong2021</value>
</property>
<!-- hive.metastore.port -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://isxcode:30123</value>
</property>
</configuration>
# --- vim /opt/flink/conf/hive-site.xml ---
配置flink
sudo vim /opt/flink/conf/flink-conf.yaml
# --- sudo vim /opt/flink/conf/flink-conf.yaml ---
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
# --- sudo vim /opt/flink/conf/flink-conf.yaml ---
打包运行
mvn clean package
flink run flink-demo-0.1.jar
🔗 Links
flink kafka同步到hive
https://ispong.isxcode.com/hadoop/flink/flink kafka同步到hive/