spark UDF
Last updated on January 17, 2025 am
🧙 Questions
构建spark自定义函数
场景:
在sparksql中使用自定义函数,在函数中,与redis数据库关联,置换lookup中的数据
重点: 注意配置连接池,new JedisPool(new JedisPoolConfig(), host, port);
☄️ Ideas
定义
- UDF: User Defined Functions: 用户自定义函数
- UDAF: User Defined Aggregate Functions:用户自定义聚合函数
- UDTFs: User Defined Table-Generating Functions:用户自定义对表处理的函数
UDF
添加jedis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
创建自定义函数
package com.isxcode.star.server.function;
import org.apache.spark.sql.api.java.UDF2;
import redis.clients.jedis.Jedis;
public class LookupFunc implements UDF2<String, String, String> {
@Override
public String call(String key, String type) throws Exception {
Jedis jedis = new Jedis("127.0.0.1", 30119);
String value = jedis.hget(key, type);
jedis.close();
return value;
}
}
在session中注册和使用自定义函数
sparksession需要自己初始化
public void executeSessionSql() {
// 注册自定义函数
sparkSession.udf().register("demo_func", new LookupFunc(), DataTypes.StringType);
// 连接目标表
Dataset<Row> jdbcDF = sparkSession.read()
.format("jdbc")
.option(" driver", " com.mysql.cj.jdbc.Driver")
.option("url", "jdbc:mysql://ispong-mac.local:3306")
.option("dbtable", "ispong_db.users")
.option("user", "root")
.option("password", "ispong123")
.load();
jdbcDF.createOrReplaceTempView("users");
// 使用自定义函数
sparkSession.sql("select demo_func('vlookup','F') from users").show();
}
🔗 Links
spark UDF
https://ispong.isxcode.com/hadoop/spark/spark UDF/