Flink UDF

Last updated on November 22, 2024 pm

🧙 Questions

How to write a custom Flink function (UDF) that connects to Redis and translates values through a data dictionary.

☄️ Ideas

Add the Redis dependency
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>4.3.1</version>
</dependency>
Create the custom function
package com.isxcode.acorn.demo;

import org.apache.flink.table.functions.ScalarFunction;
import redis.clients.jedis.Jedis;

public class LookupFunc extends ScalarFunction {

    // Returns the value of field `type` in the Redis hash `key`, or null if it is absent.
    public String eval(String key, String type) {

        // try-with-resources closes the connection even if hget throws.
        try (Jedis jedis = new Jedis("127.0.0.1", 30119)) {
            return jedis.hget(key, type);
        }
    }
}
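The function above opens a new Redis connection for every row it processes. One possible way to reduce that cost is to keep a small LRU cache in front of the lookup. The sketch below uses only the Java standard library. The class name `CachedDictLookup`, the `loader` parameter (a stand-in for a call such as `jedis.hget(key, type)`), and the in-memory map that plays the role of Redis are all hypothetical names chosen for illustration, not part of the original job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical helper: an LRU cache in front of a dictionary lookup.
// The loader is an assumption that stands in for jedis.hget(key, field).
class CachedDictLookup {

    private final BiFunction<String, String, String> loader;
    private final Map<String, String> cache;
    private int loaderCalls = 0;

    CachedDictLookup(BiFunction<String, String, String> loader, int maxEntries) {
        this.loader = loader;
        // An access-ordered LinkedHashMap drops the least recently used entry
        // once the map grows past maxEntries.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    String lookup(String key, String field) {
        // Join key and field with a separator that is unlikely to appear in either.
        String cacheKey = key + '\u0000' + field;
        if (cache.containsKey(cacheKey)) {
            return cache.get(cacheKey);
        }
        loaderCalls++;
        String value = loader.apply(key, field);
        // Misses (null values) are cached too, so a later write to the
        // backing store is not seen until the entry is evicted.
        cache.put(cacheKey, value);
        return value;
    }

    // Number of times the backing store was actually queried.
    int loaderCalls() {
        return loaderCalls;
    }

    public static void main(String[] args) {
        // In-memory stand-in for Redis hashes: hash key -> (field -> value).
        Map<String, Map<String, String>> fakeRedis =
            Map.of("zhangsan", Map.of("F", "female"));
        CachedDictLookup dict = new CachedDictLookup(
            (key, field) -> fakeRedis.getOrDefault(key, Map.of()).get(field), 1000);
        System.out.println(dict.lookup("zhangsan", "F"));
        System.out.println(dict.lookup("zhangsan", "F"));
        System.out.println(dict.loaderCalls());
    }
}
```

Inside a Flink ScalarFunction, a helper like this would probably be created in the `open(FunctionContext)` lifecycle method rather than in `eval`, so that it is built once per function instance. The trade-off is staleness: cached entries do not reflect later changes in Redis, so this only suits dictionaries that change rarely.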
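Register and use the custom function

Note:
The call demo_func(username, 'F') must include at least one column argument, such as username. If both arguments are static values (literals), a syntax error is reported.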

package com.isxcode.acorn.demo;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlJob {

    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig().getConfiguration().setString("pipeline.name", "ispong-pipeline");

        // Register the UDF under the name used in the SQL below.
        tEnv.createFunction("demo_func", LookupFunc.class);

        tEnv.executeSql("CREATE TABLE from_table(\n" +
            "    username STRING,\n" +
            "    age INT,\n" +
            "    sex STRING\n" +
            ") WITH (\n" +
            "    'connector'='jdbc',\n" +
            "    'url'='jdbc:mysql://ispong-mac.local:3306/ispong_db',\n" +
            "    'table-name'='users',\n" +
            "    'driver'='com.mysql.cj.jdbc.Driver',\n" +
            "    'username'='root',\n" +
            "    'password'='ispong123')");

        // Call the UDF once per row; username is the Redis hash key, 'F' the field.
        tEnv.executeSql("select demo_func(username,'F') from from_table").print();
    }
}

https://ispong.isxcode.com/hadoop/flink/flink UDF/
Author: ispong
Posted on: February 28, 2023