hive UDF

Last updated on September 15, 2024

🧙 Questions

UDF: user-defined function.
Example:
Write a UDF lookup('F', 'SEX_CODE') that reads the mapped value from Redis inside the function and substitutes it for the input code.
Key point: configure a connection pool, e.g. new JedisPool(new JedisPoolConfig(), host, port); see the sketch below.
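
A minimal pool-setup sketch (host, port, pool limits, and the SEX_CODE hash are placeholders, not values from a real deployment):

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class PoolDemo {

    public static void main(String[] args) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(8); // cap concurrent connections per JVM
        config.setMaxIdle(4);
        // borrow a connection from the shared pool and return it automatically
        try (JedisPool pool = new JedisPool(config, "127.0.0.1", 6379);
             Jedis jedis = pool.getResource()) {
            System.out.println(jedis.hget("SEX_CODE", "F"));
        }
    }
}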

☄️ Ideas

Initialize the project

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DarchetypeVersion=1.1 \
  -DgroupId=com.isxcode.demo \
  -DartifactId=udf-demo \
  -Dversion=0.1 \
  -DinteractiveMode=false \
  -Dpackage=com.isxcode.demo.hive

Add dependencies

Note: the Java version Maven compiles against must match the Java version Hive runs on, and the hive-exec version in the pom must match the Hive version installed on the server.
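
Two quick checks on the server before pinning the versions in the pom:

java -version    # should match maven.compiler.source/target
hive --version   # should match the hive.version property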

<project>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hive.version>2.1.1</hive.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>4.3.1</version>
        </dependency>

    </dependencies>
</project>
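
An optional refinement, not part of the original steps: the cluster already supplies hive-exec at runtime, so it can be scoped provided to keep it out of the packaged jar:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <!-- supplied by the Hive runtime on the cluster -->
    <scope>provided</scope>
</dependency>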

Write the code

Create the UDF class com.isxcode.demo.hive.App, extending the simple UDF base class (a GenericUDF variant is sketched after the example).

package com.isxcode.demo.hive;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Description(name = "redis_lookup", value = "Lookup a value in Redis", extended = "SELECT redis_lookup('key', 'type')")
public class App extends UDF {

    // one shared pool per JVM instead of a new connection per row
    private static final JedisPool JEDIS_POOL =
            new JedisPool(new JedisPoolConfig(), "172.23.39.227", 30119);

    public String evaluate(String key, String type) {

        // borrow a connection, read the hash field, and return the connection automatically
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.hget(key, type);
        }
    }
}
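
For reference, the same lookup written against the newer GenericUDF API, as a minimal sketch (the class name RedisLookupUdf is a placeholder; argument checking is kept to a bare length check):

package com.isxcode.demo.hive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisLookupUdf extends GenericUDF {

    private static final JedisPool JEDIS_POOL =
            new JedisPool(new JedisPoolConfig(), "172.23.39.227", 30119);

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 2) {
            throw new UDFArgumentException("redis_lookup expects two arguments: key, field");
        }
        // the function always returns a string
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object key = arguments[0].get();
        Object field = arguments[1].get();
        if (key == null || field == null) {
            return null;
        }
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.hget(key.toString(), field.toString());
        }
    }

    @Override
    public String getDisplayString(String[] children) {
        return "redis_lookup(" + String.join(", ", children) + ")";
    }
}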

Package and upload the jar

mvn clean package -Dmaven.test.skip
hadoop fs -mkdir /ispong-test
hadoop fs -put udf-demo-0.1.jar /ispong-test/
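
To confirm the jar landed where the create function statement below expects it:

hadoop fs -ls /ispong-test/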

Add the Redis driver to Hive

Note: the Hive service must be restarted after adding the jar.

cd $HIVE_HOME/lib
wget https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar

For a CDH installation:

cd /opt/cloudera/parcels/CDH/lib/hive/auxlib
wget https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar

Register the function

Note: the function name must be all lowercase, and the HDFS path must include ip:port (preferably an internal IP; do not use localhost).

-- inside a hive session
create function demo_func as 'com.isxcode.demo.hive.App' using jar 'hdfs://172.23.39.227:30116/ispong-test/udf-demo-0.1.jar';
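
For quick iteration while developing, a session-scoped variant avoids a permanent entry in the metastore (same jar path as above; the name demo_func_tmp is arbitrary):

-- temporary: disappears when the hive session ends
add jar hdfs://172.23.39.227:30116/ispong-test/udf-demo-0.1.jar;
create temporary function demo_func_tmp as 'com.isxcode.demo.hive.App';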

Common SQL for UDF functions

  • List functions
show functions like 'demo*';
  • Drop the function
drop function demo_func;
  • Test the function (Hive allows SELECT without a FROM clause)
select demo_func('vlookup', 'F');
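
The test returns NULL until the hash exists in Redis; it can be seeded first (the hash name vlookup, field F, and value Female are illustrative):

redis-cli -h 172.23.39.227 -p 30119 hset vlookup F "Female"

After that, select demo_func('vlookup', 'F'); should return Female.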

Parse Mongo documents

package com.definesys.hive.function;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Installation notes.
 *
 * Package:
 * mvn clean package -Dmaven.test.skip
 *
 * Upload the jar:
 * scp mongo-to-json-function.jar ispong@127.0.0.1:~/
 *
 * Upload to HDFS:
 * hadoop fs -mkdir /udf-repository
 * hadoop fs -put mongo-to-json-function.jar /udf-repository
 *
 * Enter the hive terminal:
 * hive
 *
 * Create the function:
 * create function mongo_to_json as 'com.definesys.hive.function.MongoTran' using jar 'hdfs://172.23.39.227:30116/udf-repository/mongo-to-json-function.jar';
 *
 * Insert data:
 * insert into users_to values ('[Document{{cou_value=null, flag=null, avg_value=6.8416, mstatus=0, pollutant_code=001, min_value=6.4542, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=6.5-9.5, max_value=7.4811, sstatus=0}}, Document{{cou_value=0.7354, flag=null, avg_value=6.1869, mstatus=0, pollutant_code=011, min_value=4.3890, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=65, max_value=8.1850, sstatus=0}}, Document{{cou_value=0.0558, flag=null, avg_value=0.4315, mstatus=0, pollutant_code=060, min_value=0.1020, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=25, max_value=0.6950, sstatus=0}}, Document{{cou_value=121.4165, flag=null, avg_value=1.4053, mstatus=0, pollutant_code=B01, min_value=0.0000, estatus=0, avg_revised=null, astatus=0, cou_revised=null, outlet_standard=null, max_value=9.3500, sstatus=0}}]', 30);
 *
 * Use the function:
 * select mongo_to_json(username) from users_to where age = 30
 */
@Description(name = "mongo_to_json", value = "translate mongo to json", extended = "SELECT mongo_to_json(key)")
public class MongoTran extends UDF {

    public String evaluate(String key) {

        // match every Document{{...}} block in the mongo toString() output
        Pattern pattern = Pattern.compile("Document\\{\\{(.+?)\\}\\}");
        Matcher matcher = pattern.matcher(key);
        List<String> objList = new ArrayList<>();
        while (matcher.find()) {
            String objectStr = matcher.group(1);
            // split into key=value pairs (assumes values contain no commas)
            String[] cols = objectStr.split(",");
            List<String> colList = new ArrayList<>();
            for (String col : cols) {
                int length = col.split("=")[0].length();
                colList.add("\"" + col.substring(0, length).trim() + "\":\"" + col.substring(length + 1) + "\"");
            }
            objList.add("{" + String.join(",", colList) + "}");
        }
        // wrap multiple documents in a json array, return a single one bare
        if (objList.size() > 1) {
            return "[" + String.join(",", objList) + "]";
        } else if (objList.size() == 1) {
            return objList.get(0);
        } else {
            return "";
        }
    }
}
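
A quick standalone check of the parsing logic (the sample input is abbreviated from the insert statement in the javadoc; assumes MongoTran is on the classpath):

package com.definesys.hive.function;

public class MongoTranTest {

    public static void main(String[] args) {
        String sample = "[Document{{pollutant_code=001, avg_value=6.8416}}, "
                + "Document{{pollutant_code=011, avg_value=6.1869}}]";
        // prints: [{"pollutant_code":"001","avg_value":"6.8416"},{"pollutant_code":"011","avg_value":"6.1869"}]
        System.out.println(new MongoTran().evaluate(sample));
    }
}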
