flink UDF
Last updated on January 17, 2025 am
🧙 Questions
flink自定义函数,使用udf,连接redis,做数据字典转换。
☄️ Ideas
添加redis依赖
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
创建自定义函数
package com.isxcode.acorn.demo;
import org.apache.flink.table.functions.ScalarFunction;
import redis.clients.jedis.Jedis;
public class LookupFunc extends ScalarFunction {
public String eval(String key, String type) {
Jedis jedis = new Jedis("127.0.0.1", 30119);
String value = jedis.hget(key, type);
jedis.close();
return value;
}
}
注册并使用自定义函数
注意:
demo_func(username,’F’) 必须要有一个字段,例如username,如果两个参数都是静态只,则会报语法错误
package com.isxcode.acorn.demo;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
public class SqlJob {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig().getConfiguration().setString("pipeline.name", "ispong-pipeline");
tEnv.createFunction("demo_func", LookupFunc.class);
tEnv.executeSql("CREATE TABLE from_table(\n" +
" username STRING,\n" +
" age INT,\n" +
" sex STRING\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://ispong-mac.local:3306/ispong_db',\n" +
" 'table-name'='users',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'username'='root',\n" +
" 'password'='ispong123')");
tEnv.executeSql(" select demo_func(username,'F') from from_table").print();
}
}
🔗 Links
flink UDF
https://ispong.isxcode.com/hadoop/flink/flink UDF/