hive UDF
Last updated on January 18, 2025
🧙 Questions
UDF(User-Defined Function),即用户自定义函数。
案例:
使用UDF编写一个函数lookup('F', 'SEX_CODE'),在函数中从Redis读取对应的值,用来替换原始值。
重点: 注意配置连接池(new JedisPool(new JedisPoolConfig(), host, port)),避免每处理一行数据就新建一次Redis连接。
☄️Ideas
初始化项目
# Generate a Maven quickstart project skeleton: groupId com.isxcode.demo, artifactId udf-demo, version 0.1, package com.isxcode.demo.hive.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.maven.archetypes \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeVersion=1.1 \
-DgroupId=com.isxcode.demo \
-DartifactId=udf-demo \
-Dversion=0.1 \
-DinteractiveMode=false \
-Dpackage=com.isxcode.demo.hive
导入依赖
注意: Maven编译使用的Java版本要与运行Hive的Java版本保持一致;hive-exec依赖的版本要与服务器上安装的Hive版本保持一致。
<project>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Compile target must match the Java version that runs Hive. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Must match the Hive version installed on the server. -->
    <hive.version>2.1.1</hive.version>
  </properties>
  <dependencies>
    <!-- Hive itself supplies hive-exec at runtime; "provided" keeps it out of any packaged artifact
         and avoids clashing with the server's own copy. -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- No shade/assembly plugin is configured here, so jedis is not packaged into the UDF jar;
         the jedis jar must be placed on Hive's classpath separately. -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>4.3.1</version>
    </dependency>
  </dependencies>
</project>
编写代码
创建UDF类,继承org.apache.hadoop.hive.ql.exec.UDF类(若需要处理复杂类型参数,可改为继承GenericUDF)
com.isxcode.demo.hive.App
package com.isxcode.demo.hive;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Hive UDF that reads one field of a Redis hash.
 *
 * <p>{@code evaluate(key, type)} issues {@code HGET key type} and returns the stored value,
 * or {@code null} when either argument is SQL NULL or Redis has no such field.
 *
 * <p>Connections come from a single {@link JedisPool} shared by all instances of this class,
 * instead of opening a new TCP connection for every row.
 */
@Description(name = "redis_lookup", value = "Lookup a value in Redis", extended = "SELECT redis_lookup('key', 'field')")
public class App extends UDF {

    // NOTE(review): demo values; move host/port to configuration for real deployments.
    private static final String REDIS_HOST = "172.23.39.227";
    private static final int REDIS_PORT = 30119;

    // Creating the pool does not open connections; they are created on first getResource().
    private static final JedisPool POOL = new JedisPool(new JedisPoolConfig(), REDIS_HOST, REDIS_PORT);

    /**
     * @param key  Redis hash key
     * @param type field within the hash
     * @return the field value, or null if an argument is null or the field is absent
     */
    public String evaluate(String key, String type) {
        // Jedis rejects null arguments; Hive passes null for SQL NULL, so map NULL in -> NULL out.
        if (key == null || type == null) {
            return null;
        }
        // try-with-resources returns the connection to the pool even when hget throws.
        try (Jedis jedis = POOL.getResource()) {
            return jedis.hget(key, type);
        }
    }
}
打包上传jar
# Build the jar without running tests; Maven writes it to target/udf-demo-0.1.jar.
mvn clean package -Dmaven.test.skip
# -p: succeed even if the directory already exists (safe to re-run).
hadoop fs -mkdir -p /ispong-test
# Upload from target/ (where Maven put the jar); -f overwrites an earlier upload.
hadoop fs -put -f target/udf-demo-0.1.jar /ispong-test/
Hive添加Redis客户端依赖(jedis)
注意: jedis运行时依赖commons-pool2(JedisPool基于它实现),若Hive的lib目录中没有该jar,需要一并下载;添加jar后需要重启Hive服务。
cd "$HIVE_HOME/lib"
wget https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar
# jedis needs Apache commons-pool2 at runtime; skip this if the directory already contains a commons-pool2 jar.
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
如果是CDH版本,则放到Hive的auxlib目录:
cd /opt/cloudera/parcels/CDH/lib/hive/auxlib
wget https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar
# jedis needs Apache commons-pool2 at runtime; skip this if Hive's classpath already provides it.
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
导入函数
注意: 函数名称需全部小写;HDFS路径需要写成ip:port的完整形式,且最好使用内网IP,不要使用localhost。
-- Run inside a Hive session.
CREATE FUNCTION demo_func
    AS 'com.isxcode.demo.hive.App'
    USING JAR 'hdfs://172.23.39.227:30116/ispong-test/udf-demo-0.1.jar';
udf函数常用sql
- 查找函数
SHOW FUNCTIONS LIKE 'demo*';
- 删除函数
DROP FUNCTION demo_func;
- 测试函数
SELECT demo_func('vlookup', 'F');  -- Hive has no built-in DUAL table; SELECT without FROM is supported since Hive 0.13
解析Mongo Document字符串为JSON
package com.definesys.hive.function;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.parquet.Strings;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Hive UDF that converts the {@code toString()} form of MongoDB documents into JSON text.
 *
 * <p>Every {@code Document{{k1=v1, k2=v2, ...}}} occurrence in the input becomes a JSON object
 * whose keys and values are all JSON strings (for example {@code null} becomes {@code "null"}).
 * Two or more documents yield a JSON array; exactly one yields a bare object; none yields an
 * empty string. SQL NULL input yields NULL.
 *
 * <p>Fields are separated on commas. A fragment without '=' is treated as the continuation of
 * the previous value (so {@code a=1,2} gives {@code "a":"1,2"}). Nested documents are not parsed.
 *
 * Installation guide.
 *
 * Build
 * mvn clean package -Dmaven.test.skip
 *
 * Upload the jar
 * scp mongo-to-json-function.jar ispong@127.0.0.1:~/
 *
 * Upload to HDFS
 * hadoop fs -mkdir /udf-repository
 * hadoop fs -put mongo-to-json-function.jar /udf-repository
 *
 * Open the Hive CLI
 * hive
 *
 * Create the function
 * create function mongo_to_json as 'com.definesys.hive.function.MongoTran' using jar 'hdfs://172.23.39.227:30116/udf-repository/mongo-to-json-function.jar';
 *
 * Insert sample data
 * insert into users_to values ('[Document{{cou_value=null, flag=null, avg_value=6.8416, mstatus=0, pollutant_code=001, min_value=6.4542, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=6.5-9.5, max_value=7.4811, sstatus=0}}, Document{{cou_value=0.7354, flag=null, avg_value=6.1869, mstatus=0, pollutant_code=011, min_value=4.3890, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=65, max_value=8.1850, sstatus=0}}, Document{{cou_value=0.0558, flag=null, avg_value=0.4315, mstatus=0, pollutant_code=060, min_value=0.1020, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=25, max_value=0.6950, sstatus=0}}, Document{{cou_value=121.4165, flag=null, avg_value=1.4053, mstatus=0, pollutant_code=B01, min_value=0.0000, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=null, max_value=9.3500, sstatus=0}}]', 30);
 *
 * Use the function
 * select mongo_to_json(username) from users_to where age = 30
 */
@Description(name = "mongo_to_json", value = "translate mongo to json", extended = "SELECT mongo_to_json(key)")
public class MongoTran extends UDF {

    // Compiled once: Pattern is immutable and safe to share; the original recompiled it per row.
    private static final Pattern DOCUMENT_PATTERN = Pattern.compile("Document\\{\\{(.+?)\\}\\}");

    /**
     * @param key text containing one or more Document{{...}} renderings; may be null
     * @return JSON array/object text, "" if no document is found, or null for null input
     */
    public String evaluate(String key) {
        if (key == null) {
            return null;
        }
        Matcher matcher = DOCUMENT_PATTERN.matcher(key);
        List<String> objList = new ArrayList<>();
        while (matcher.find()) {
            objList.add(toJsonObject(matcher.group(1)));
        }
        if (objList.size() > 1) {
            return "[" + String.join(",", objList) + "]";
        } else if (objList.size() == 1) {
            return objList.get(0);
        } else {
            return "";
        }
    }

    /** Converts one document body ("k1=v1, k2=v2") into a JSON object with string values. */
    private static String toJsonObject(String body) {
        List<String> keys = new ArrayList<>();
        List<StringBuilder> values = new ArrayList<>();
        for (String col : body.split(",")) {
            int eq = col.indexOf('=');
            if (eq >= 0) {
                // Key is trimmed (drops the space after ", "); value is kept verbatim after the first '='.
                keys.add(col.substring(0, eq).trim());
                values.add(new StringBuilder(col.substring(eq + 1)));
            } else if (!values.isEmpty()) {
                // No '=': this comma was part of the previous value, so re-attach it.
                values.get(values.size() - 1).append(',').append(col);
            }
            // A leading fragment without '=' has no key to attach to and is dropped.
        }
        List<String> fields = new ArrayList<>(keys.size());
        for (int i = 0; i < keys.size(); i++) {
            fields.add(jsonString(keys.get(i)) + ":" + jsonString(values.get(i).toString()));
        }
        return "{" + String.join(",", fields) + "}";
    }

    /** Quotes s as a JSON string literal, escaping quotes, backslashes and control characters. */
    private static String jsonString(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
🔗 Links
hive UDF
https://ispong.isxcode.com/hadoop/hive/hive UDF/