Flink ChunJun custom plugin

Last updated on January 18, 2025 am

🧙 Questions

Modify the ChunJun source code to develop plugins that support API synchronization, namely apireader and apiwriter,
and debug them locally.

☄️ Ideas

If no Flink environment is installed on Windows, you can use JetBrains Gateway to work on a remote machine instead.

Local debugging (using stream as an example)

The stream plugin is used because it does not need any external data source.
Uncomment the chunjun-local-test module.


Set a breakpoint in the stream plugin:
com.dtstack.chunjun.connector.stream.source.StreamSourceFactory


Change the test JSON file used in:
/home/ispong/chunjun/chunjun-local-test/src/main/java/com/dtstack/chunjun/local/test/LocalTest.java


The breakpoint is hit.


Writing the custom API plugin

Add the API plugin module

Copy the chunjun-connector-mysql module as chunjun-connector-api, and add the new module to pom.xml.


Define the directory structure


  • conf/ApiConf.java

Holds the request parameters configured in the JSON

// Required parameters
public class ApiConf extends ChunJunCommonConf {

    // API request method
    private String method;

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }
}
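
To illustrate how the `method` value could eventually drive a real call, here is a minimal standalone sketch. It is not part of the plugin above: it assumes Java 11+ for `java.net.http`, and the `url` and `body` parameters as well as the class name `ApiRequestSketch` are hypothetical, since ApiConf only defines `method`. The request is built but never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// Hypothetical helper, not part of ChunJun: turns a configured method into an HTTP request.
class ApiRequestSketch {

    // Builds (but does not send) a request for the configured method.
    // url and body are assumptions; ApiConf above only carries `method`.
    static HttpRequest build(String method, String url, String body) {
        // The test JSON uses lower case ("get"), so normalize to the HTTP verb.
        String verb = method.toUpperCase(Locale.ENGLISH);
        HttpRequest.BodyPublisher publisher =
                (body == null || body.isEmpty())
                        ? HttpRequest.BodyPublishers.noBody()
                        : HttpRequest.BodyPublishers.ofString(body);
        return HttpRequest.newBuilder()
                .uri(URI.create(url))
                .method(verb, publisher)
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("get", "http://localhost:8080/users", null);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

In a real reader, a request like this would probably be created in `openInternal` and its response parsed into rows.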
  • converter/ApiColumnConverter.java

Row conversion: converts each incoming row value into a ColumnRowData that Flink can recognize and pushes it into the Flink pipeline.

  • converter/ApiRawTypeConverter.java

Converts field types

// Field type conversion
public class ApiRawTypeConverter {

    public static DataType apply(String type) {
        switch (type.toUpperCase(Locale.ENGLISH)) {
            case "INT":
                return DataTypes.INT();
            case "STRING":
                return DataTypes.STRING();
            default:
                throw new UnsupportedTypeException(type);
        }
    }
}
  • options/ApiOptions.java

Optional parameters for the JSON config

// Optional parameters
public class ApiOptions {

    // Maximum number of rows to sync
    public static final ConfigOption<Integer> MAX_ROWS =
            key("max-rows").intType().defaultValue(10).withDescription("Maximum number of rows to sync.");
}
  • sink/ApiOutputFormat.java
// Data write logic
public class ApiOutputFormat extends BaseRichOutputFormat {

    // Write a single record
    @Override
    protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {}

    // Write a batch of records
    @Override
    protected void writeMultipleRecordsInternal() throws Exception {}

    // Open resources
    @Override
    protected void openInternal(int taskNumber, int numTasks) throws IOException {}

    // Close resources
    @Override
    protected void closeInternal() throws IOException {}
}
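
The four methods above are left empty in this post. As a rough illustration of how a single-record path and a batch path can fit together, here is a standalone sketch in plain Java with no Flink or ChunJun types. The class `BatchWriterSketch` and the `batchSize` switch are assumptions for illustration; how ChunJun's base class actually chooses between the two methods is not shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for an output format; all names here are hypothetical.
class BatchWriterSketch {

    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Each inner list stands for one call to the (hypothetical) target API.
    final List<List<String>> requests = new ArrayList<>();

    BatchWriterSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per incoming row.
    void write(String row) {
        if (batchSize <= 1) {
            writeSingle(row);
            return;
        }
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            writeMultiple();
        }
    }

    // Counterpart of writeSingleRecordInternal: one row, one request.
    private void writeSingle(String row) {
        requests.add(Collections.singletonList(row));
    }

    // Counterpart of writeMultipleRecordsInternal: flush the buffer as one request.
    private void writeMultiple() {
        if (buffer.isEmpty()) {
            return;
        }
        requests.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Counterpart of closeInternal: flush any rows still buffered.
    void close() {
        writeMultiple();
    }

    // Writes five rows with the given batch size and returns the resulting requests.
    static List<List<String>> demo(int batchSize) {
        BatchWriterSketch writer = new BatchWriterSketch(batchSize);
        for (String row : new String[] {"a", "b", "c", "d", "e"}) {
            writer.write(row);
        }
        writer.close();
        return writer.requests;
    }

    public static void main(String[] args) {
        System.out.println(demo(2));
    }
}
```

Flushing in `close()` matters: without it, the last partial batch would be lost when the job ends.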
  • sink/ApiSinkFactory.java
public class ApiSinkFactory extends BaseRichOutputFormatBuilder {

    public ApiSinkFactory(BaseRichOutputFormat format) {
        super(format);
    }

    @Override
    protected void checkFormat() {
        // Placeholder: no validation is implemented for the writer yet
        System.out.println(1);
    }
}
  • source/ApiInputFormat.java
// Data read logic
public class ApiInputFormat extends BaseRichInputFormat {

    private ApiConf apiConf;

    protected ChunJunCommonConf commonConf;

    private int dataNums;

    protected transient List<String> datas;

    // Create the input splits
    @Override
    protected InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception {

        InputSplit[] inputSplits = new InputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            inputSplits[i] = new GenericInputSplit(i, minNumSplits);
        }

        return inputSplits;
    }

    // Open the data connection
    @Override
    protected void openInternal(InputSplit inputSplit) throws IOException {

        datas = new ArrayList<>();
        datas.add("zhangsan");
        datas.add("lisi");
        datas.add("wangwu");
        datas.add("zhaoliu");
        dataNums = datas.size();
    }

    // Read one record
    @Override
    protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {

        // Convert the string to RowData
        try {
            dataNums--;
            return rowConverter.toInternal(datas.get(dataNums));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Close the data connection
    @Override
    protected void closeInternal() throws IOException {

        dataNums = 0;
    }

    // Check whether all data has been read
    @Override
    public boolean reachedEnd() throws IOException {

        // dataNums is decremented before each read, so 0 means nothing is left
        return dataNums <= 0;
    }

    public ApiConf getApiConf() {
        return apiConf;
    }

    public void setApiConf(ApiConf apiConf) {
        this.apiConf = apiConf;
    }
}
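
The read contract can be checked without Flink. The following standalone sketch keeps only the counter logic of ApiInputFormat; the class `ReadLoopSketch` and its polling loop are simplified stand-ins, assuming the runtime calls `reachedEnd()` before every read. It uses `dataNums <= 0` as the end condition, because a `< 0` check would allow one extra call that reads index -1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for ApiInputFormat: no Flink or ChunJun types, only the read contract.
class ReadLoopSketch {

    private List<String> datas;
    private int dataNums;

    // Mirrors openInternal: load the rows and remember how many are left.
    void open() {
        datas = new ArrayList<>(Arrays.asList("zhangsan", "lisi", "wangwu", "zhaoliu"));
        dataNums = datas.size();
    }

    // Mirrors reachedEnd: true once no unread rows remain.
    boolean reachedEnd() {
        return dataNums <= 0;
    }

    // Mirrors nextRecordInternal: rows are handed out from the end of the list.
    String nextRecord() {
        dataNums--;
        return datas.get(dataNums);
    }

    // Polls the format until it reports the end, collecting every row.
    static List<String> readAll() {
        ReadLoopSketch format = new ReadLoopSketch();
        format.open();
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            out.add(format.nextRecord());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll());
    }
}
```

Because the counter walks backwards, the rows come out in reverse insertion order, starting with "zhaoliu".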
  • source/ApiInputFormatBuilder.java
// Builds and validates the ApiInputFormat
public class ApiInputFormatBuilder extends BaseRichInputFormatBuilder<ApiInputFormat> {

    public ApiInputFormatBuilder() {
        super(new ApiInputFormat());
    }

    public void setApiConf(ApiConf apiConf) {
        super.setConfig(apiConf);
        format.setApiConf(apiConf);
    }

    @Override
    protected void checkFormat() {
        if (format.getApiConf().getMethod() == null || format.getApiConf().getMethod().isEmpty()) {
            throw new IllegalArgumentException("method can not be empty");
        }
    }
}
  • source/ApiSourceFactory.java
// Entry point of the data source
public class ApiSourceFactory extends SourceFactory {

    private final ApiConf apiConf;

    public ApiSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) {
        super(syncConf, env);
        apiConf =
                GsonUtil.GSON.fromJson(
                        GsonUtil.GSON.toJson(syncConf.getReader().getParameter()), ApiConf.class);
        super.initCommonConf(apiConf);
    }

    // Return the raw type converter
    @Override
    public RawTypeConverter getRawTypeConverter() {
        return ApiRawTypeConverter::apply;
    }

    // Build the reader and return a Flink DataStream
    @Override
    public DataStream<RowData> createSource() {

        ApiInputFormatBuilder builder = new ApiInputFormatBuilder();
        builder.setApiConf(apiConf);
        HttpRestConfig httpRestConfig = new HttpRestConfig();
        httpRestConfig.setFields("username");
        AbstractRowConverter rowConverter = new HttpColumnConverter(httpRestConfig);
        builder.setRowConverter(rowConverter);
        return createInput(builder.finish());
    }
}
  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>chunjun-connectors</artifactId>
        <groupId>com.dtstack.chunjun</groupId>
        <version>1.12-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>chunjun-connector-api</artifactId>
    <name>ChunJun : Connectors : API</name>

    <dependencies>
  
        <dependency>
            <groupId>com.dtstack.chunjun</groupId>
            <artifactId>chunjun-connector-http</artifactId>
            <version>1.12-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j:slf4j-api</exclude>
                                    <exclude>log4j:log4j</exclude>
                                    <exclude>ch.qos.logback:*</exclude>
                                </excludes>
                            </artifactSet>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shade.core.com.google.common</shadedPattern>
                                </relocation>
                            </relocations>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../${dist.dir}/connector/api/"
                                      file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
                                <!--suppress UnresolvedMavenProperty -->
                                <move file="${basedir}/../../${dist.dir}/connector/api/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../${dist.dir}/connector/api/${project.artifactId}.jar"/>
                                <delete>
                                    <!--suppress UnresolvedMavenProperty -->
                                    <fileset dir="${basedir}/../../${dist.dir}/connector/api/"
                                             includes="${project.artifactId}-*.jar"
                                             excludes="${project.artifactId}.jar"/>
                                </delete>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
Test JSON
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "apireader",
          "parameter": {
            "method": "get",
            "column": [
              {
                "name": "username",
                "type": "string"
              }
            ],
            "sliceRecordCount": [
              "30"
            ],
            "permitsPerSecond": 1
          }
        },
        "writer": {
          "parameter": {
            "column": [
              {
                "name": "username",
                "type": "string"
              }
            ],
            "print": true
          },
          "name": "streamwriter"
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 100
      },
      "speed": {
        "bytes": 0,
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}
Local test

The new plugin must be added as a dependency of chunjun-local-test.
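
A minimal sketch of that dependency entry, assuming it goes into the `<dependencies>` section of chunjun-local-test/pom.xml. The coordinates are taken from the connector's own pom.xml shown earlier; using `${project.version}` instead of a literal version is an assumption.

```xml
<dependency>
    <groupId>com.dtstack.chunjun</groupId>
    <artifactId>chunjun-connector-api</artifactId>
    <version>${project.version}</version>
</dependency>
```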


The data is printed to the console.



https://ispong.isxcode.com/hadoop/flink/flink chunjun自定义插件/
Author
ispong
Posted on
October 12, 2022