flink chunjun自定义插件
Last updated on January 18, 2025 am
🧙 Questions
修改chunjun源码,开发支持api同步的插件,即apireader和apiwriter
并实现本地调试
☄️ Ideas
如果Windows上未安装Flink环境,可以使用 JetBrains Gateway 进行远程开发
本地调试(以stream为例子)
因为stream不需要任何数据源支持
放开 chunjun-local-test 模块注释
给stream插件打上断点
com.dtstack.chunjun.connector.stream.source.StreamSourceFactory
修改测试的json文件
/home/ispong/chunjun/chunjun-local-test/src/main/java/com/dtstack/chunjun/local/test/LocalTest.java
成功定位
自定义api插件编写
添加api插件模块
将chunjun-connector-mysql模块复制一份,命名为chunjun-connector-api,并将新模块添加到pom.xml
定义目录结构
- conf/ApiConf.java
配置json请求参数
/**
 * Required reader parameters of the API plugin.
 *
 * <p>In {@code ApiSourceFactory} an instance is populated by Gson from the reader's
 * {@code parameter} section of the job JSON.
 */
public class ApiConf extends ChunJunCommonConf {

    /** Request method of the API call (the sample job uses {@code "get"}). */
    private String method;

    /**
     * Stores the API request method.
     *
     * @param method request method to use
     */
    public void setMethod(String method) {
        this.method = method;
    }

    /**
     * Returns the configured API request method.
     *
     * @return the request method, or {@code null} if none was set
     */
    public String getMethod() {
        return this.method;
    }
}
- converter/ApiColumnConverter.java
行转换,将传入的row-value转换成flink可以识别的columnRowData,并推到flink管道中
- converter/ApiRawTypeConverter.java
转换字段类型
/** Maps the column type names used in the job JSON to Flink {@code DataType}s. */
public class ApiRawTypeConverter {

    /**
     * Converts a raw column type name to the matching Flink data type.
     *
     * <p>The lookup is case-insensitive (the name is upper-cased with {@link Locale#ENGLISH}).
     * Only {@code INT} and {@code STRING} are recognised.
     *
     * @param type raw type name from the job configuration
     * @return the corresponding Flink data type
     * @throws UnsupportedTypeException if the type name is not recognised
     */
    public static DataType apply(String type) {
        final String normalized = type.toUpperCase(Locale.ENGLISH);
        if ("INT".equals(normalized)) {
            return DataTypes.INT();
        }
        if ("STRING".equals(normalized)) {
            return DataTypes.STRING();
        }
        throw new UnsupportedTypeException(type);
    }
}
- options/ApiOptions.java
json配置可选参数
/** Optional parameters of the API plugin. */
public class ApiOptions {

    /** Maximum number of rows to synchronise; defaults to 10. */
    public static final ConfigOption<Integer> MAX_ROWS =
            key("max-rows")
                    .intType()
                    .defaultValue(10)
                    .withDescription("can sync max rows.");
}
- sink/ApiOutputFormat.java
/**
 * Write-side skeleton of the API plugin.
 *
 * <p>Every hook is currently an empty placeholder; no data is written anywhere.
 */
public class ApiOutputFormat extends BaseRichOutputFormat {

    /** Opens resources needed for writing. Currently a no-op. */
    @Override
    protected void openInternal(int taskNumber, int numTasks) throws IOException {
        // nothing to open yet
    }

    /** Writes a single record. Currently a no-op. */
    @Override
    protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {
        // nothing to write yet
    }

    /** Writes a batch of records. Currently a no-op. */
    @Override
    protected void writeMultipleRecordsInternal() throws Exception {
        // nothing to write yet
    }

    /** Releases resources. Currently a no-op. */
    @Override
    protected void closeInternal() throws IOException {
        // nothing to close yet
    }
}
- sink/ApiSinkFactory.java
/**
 * Builder wrapper around a {@code BaseRichOutputFormat} for the API sink.
 *
 * <p>NOTE(review): despite the "Factory" name this class extends
 * {@code BaseRichOutputFormatBuilder}; confirm the intended role.
 */
public class ApiSinkFactory extends BaseRichOutputFormatBuilder {

    /**
     * Creates a builder for the given output format.
     *
     * @param format output format handed to the parent builder
     */
    public ApiSinkFactory(BaseRichOutputFormat format) {
        super(format);
    }

    /**
     * Validation hook. No real validation is performed; it only prints {@code 1} to stdout,
     * which looks like leftover debug output.
     */
    @Override
    protected void checkFormat() {
        System.out.println(1);
    }
}
- source/ApiInputFormat.java
/**
 * Read-side logic of the API plugin.
 *
 * <p>This demo implementation does not call any API: {@link #openInternal} fills an in-memory
 * list with four fixed names and {@link #nextRecordInternal} emits them from the last element
 * to the first.
 */
public class ApiInputFormat extends BaseRichInputFormat {

    private ApiConf apiConf;

    protected ChunJunCommonConf commonConf;

    /** Number of records not yet emitted; after decrementing, the index of the record to emit. */
    private int dataNums;

    /** Records available to the current split; populated in {@link #openInternal}. */
    protected transient List<String> datas;

    /**
     * Creates the input splits.
     *
     * @param minNumSplits number of splits to create
     * @return {@code minNumSplits} generic splits numbered {@code 0..minNumSplits-1}
     */
    @Override
    protected InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception {
        InputSplit[] inputSplits = new InputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            inputSplits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return inputSplits;
    }

    /**
     * Opens the data source for a split by loading the fixed demo records.
     *
     * @param inputSplit split being opened (unused by the demo data)
     */
    @Override
    protected void openInternal(InputSplit inputSplit) throws IOException {
        datas = new ArrayList<>();
        datas.add("zhangsan");
        datas.add("lisi");
        datas.add("wangwu");
        datas.add("zhaoliu");
        dataNums = datas.size();
    }

    /**
     * Reads one record and converts the string into a {@code RowData}.
     *
     * @param rowData reuse object supplied by the caller (unused)
     * @return the converted record
     * @throws RuntimeException wrapping any failure raised while fetching or converting
     */
    @Override
    protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
        try {
            // Decrement first: dataNums then points at the element to emit.
            dataNums--;
            return rowConverter.toInternal(datas.get(dataNums));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the data source; resets the remaining-record counter. */
    @Override
    protected void closeInternal() throws IOException {
        dataNums = 0;
    }

    /**
     * Tells whether all records have been read.
     *
     * @return {@code true} once no records remain
     */
    @Override
    public boolean reachedEnd() throws IOException {
        // Must be "<= 0": with "< 0" a further read is attempted when dataNums is 0,
        // which decrements to -1 and fails on datas.get(-1).
        return dataNums <= 0;
    }

    public ApiConf getApiConf() {
        return apiConf;
    }

    public void setApiConf(ApiConf apiConf) {
        this.apiConf = apiConf;
    }
}
- source/ApiInputFormatBuilder.java
/** Builder that creates and configures an {@link ApiInputFormat}. */
public class ApiInputFormatBuilder extends BaseRichInputFormatBuilder<ApiInputFormat> {

    /** Creates a builder wrapping a fresh {@link ApiInputFormat}. */
    public ApiInputFormatBuilder() {
        super(new ApiInputFormat());
    }

    /**
     * Passes the configuration to both the parent builder and the wrapped format.
     *
     * @param apiConf reader configuration
     */
    public void setApiConf(ApiConf apiConf) {
        super.setConfig(apiConf);
        format.setApiConf(apiConf);
    }

    /**
     * Validates the format before it is built.
     *
     * @throws IllegalArgumentException if the request method is {@code null} or empty
     */
    @Override
    protected void checkFormat() {
        final String method = format.getApiConf().getMethod();
        final boolean methodPresent = method != null && !method.isEmpty();
        if (!methodPresent) {
            throw new IllegalArgumentException("method can not be empty");
        }
    }
}
- source/ApiSourceFactory.java
/** Entry point of the API source: parses the reader config and builds the Flink stream. */
public class ApiSourceFactory extends SourceFactory {

    private final ApiConf apiConf;

    /**
     * Parses the reader {@code parameter} section into an {@link ApiConf} via a JSON round trip
     * and initialises the common configuration from it.
     *
     * @param syncConf job configuration
     * @param env Flink stream execution environment
     */
    public ApiSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) {
        super(syncConf, env);
        String readerParameterJson = GsonUtil.GSON.toJson(syncConf.getReader().getParameter());
        this.apiConf = GsonUtil.GSON.fromJson(readerParameterJson, ApiConf.class);
        super.initCommonConf(this.apiConf);
    }

    /** Returns the converter that maps raw type names to Flink data types. */
    @Override
    public RawTypeConverter getRawTypeConverter() {
        return type -> ApiRawTypeConverter.apply(type);
    }

    /**
     * Builds the input format and returns it as a Flink {@code DataStream}.
     *
     * <p>NOTE(review): the row converter is set up with the hard-coded field
     * {@code "username"} rather than the columns from the job configuration.
     */
    @Override
    public DataStream<RowData> createSource() {
        HttpRestConfig restConfig = new HttpRestConfig();
        restConfig.setFields("username");
        AbstractRowConverter converter = new HttpColumnConverter(restConfig);

        ApiInputFormatBuilder formatBuilder = new ApiInputFormatBuilder();
        formatBuilder.setApiConf(apiConf);
        formatBuilder.setRowConverter(converter);
        return createInput(formatBuilder.finish());
    }
}
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven module for the custom API connector; a child of chunjun-connectors. -->
<parent>
<artifactId>chunjun-connectors</artifactId>
<groupId>com.dtstack.chunjun</groupId>
<version>1.12-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>chunjun-connector-api</artifactId>
<name>ChunJun : Connectors : API</name>
<dependencies>
<!-- HTTP connector dependency; presumably the origin of HttpRestConfig and
     HttpColumnConverter used by ApiSourceFactory (confirm against that module). -->
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-http</artifactId>
<version>1.12-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Runs the shade goal in the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<!-- Logging artifacts are excluded from the shaded jar. -->
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>log4j:log4j</exclude>
<exclude>ch.qos.logback:*</exclude>
</excludes>
</artifactSet>
<!-- Relocates com.google.common into the shade.core namespace. -->
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>shade.core.com.google.common</shadedPattern>
</relocation>
</relocations>
<!-- Excludes signature files (*.SF, *.DSA, *.RSA) of all artifacts from the shaded jar. -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<!-- Ant tasks run in the package phase to publish the built jar into the dist directory. -->
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<!-- here the phase you need -->
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<!-- Step 1: copy the built jar into ${dist.dir}/connector/api/. -->
<copy todir="${basedir}/../../${dist.dir}/connector/api/"
file="${basedir}/target/${project.artifactId}-${project.version}.jar"/>
<!-- Step 2: rename the copied jar so the file name no longer contains the version. -->
<!--suppress UnresolvedMavenProperty -->
<move file="${basedir}/../../${dist.dir}/connector/api/${project.artifactId}-${project.version}.jar"
tofile="${basedir}/../../${dist.dir}/connector/api/${project.artifactId}.jar"/>
<!-- Step 3: delete any remaining versioned jars of this artifact in that directory. -->
<delete>
<!--suppress UnresolvedMavenProperty -->
<fileset dir="${basedir}/../../${dist.dir}/connector/api/"
includes="${project.artifactId}-*.jar"
excludes="${project.artifactId}.jar"/>
</delete>
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
测试json
{
"job": {
"content": [
{
"reader": {
"name": "apireader",
"parameter": {
"method": "get",
"column": [
{
"name": "username",
"type": "string"
}
],
"sliceRecordCount": [
"30"
],
"permitsPerSecond": 1
}
},
"writer": {
"parameter": {
"column": [
{
"name": "username",
"type": "string"
}
],
"print": true
},
"name": "streamwriter"
}
}
],
"setting": {
"errorLimit": {
"record": 100
},
"speed": {
"bytes": 0,
"channel": 1,
"readerChannel": 1,
"writerChannel": 1
}
}
}
}
本地测试
需要在chunjun-local-test中添加新插件依赖
控制台打印数据
🔗 Links
flink chunjun自定义插件
https://ispong.isxcode.com/hadoop/flink/flink chunjun自定义插件/