spark UDF

Last updated on January 17, 2025 am

🧙 Questions

构建spark自定义函数
场景:
在sparksql中使用自定义函数,在函数中,与redis数据库关联,置换lookup中的数据
重点: 注意配置连接池,new JedisPool(new JedisPoolConfig(), host, port);

☄️ Ideas

定义

  • UDF: User Defined Functions: 用户自定义函数
  • UDAF: User Defined Aggregate Functions:用户自定义聚合函数
  • UDTFs: User Defined Table-Generating Functions:用户自定义对表处理的函数

UDF

添加jedis依赖
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>4.3.1</version>
</dependency>
创建自定义函数
package com.isxcode.star.server.function;

import org.apache.spark.sql.api.java.UDF2;
import redis.clients.jedis.Jedis;


public class LookupFunc implements UDF2<String, String, String> {

    @Override
    public String call(String key, String type) throws Exception {

        Jedis jedis = new Jedis("127.0.0.1", 30119);
        String value = jedis.hget(key, type);
        jedis.close();
        return value;
    }

}
在session中注册和使用自定义函数

sparksession需要自己初始化

public void executeSessionSql() {

      // 注册自定义函数
      sparkSession.udf().register("demo_func", new LookupFunc(), DataTypes.StringType);

      // 连接目标表
      Dataset<Row> jdbcDF = sparkSession.read()
          .format("jdbc")
          .option(" driver", " com.mysql.cj.jdbc.Driver")
          .option("url", "jdbc:mysql://ispong-mac.local:3306")
          .option("dbtable", "ispong_db.users")
          .option("user", "root")
          .option("password", "ispong123")
          .load();

      jdbcDF.createOrReplaceTempView("users");

      // 使用自定义函数
      sparkSession.sql("select demo_func('vlookup','F') from users").show();
}

spark UDF
https://ispong.isxcode.com/hadoop/spark/spark UDF/
Author
ispong
Posted on
February 21, 2023
Licensed under