flink datasourcex
Last updated on January 18, 2025 am
🧙 Questions
基于datasourcex,实现数据源连接
☄️ Ideas
下载编译源码
建议切换到
v4.3.2
tag
git clone https://github.com/DTStack/DatasourceX.git
git checkout v4.3.2
本地maven安装oracle依赖
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar
mvn install:install-file -DgroupId=com.github.noraui -DartifactId=ojdbc8 -Dversion=12.2.0.1 -Dpackaging=jar -Dfile=ojdbc8-12.2.0.1.jar
删除部分模块
部分数据库的驱动需要在官网上下载,maven中央仓库中没有,且目前并不需要,所以先注释掉
vim pom.xml
<!-- 以下模块全部注释掉 -->
<!-- 达梦数据库 -->
<module>dm</module>
<!-- 南大gbase -->
<module>gbase</module>
<!-- 华毅数据库 -->
<module>greenplum6</module>
<!-- 人大金仓 -->
<module>kingbase8</module>
<!-- 亚马逊 -->
<module>aws_s3</module>
<module>s3</module>
<!-- vertica -->
<module>vertica</module>
<!-- 星环数据库 -->
<module>inceptor</module>
<!-- phoenix -->
<module>phoenix4_8</module>
<!-- hive3 -->
<module>hive3_cdp</module>
<!-- oceanbase -->
<module>oceanbase</module>
<module>test</module>
开始编译
mvn clean install -DskipTests
包路径: D:\definesys\DatasourceX\core\pluginLibs
安装datasourcex依赖
mvn install:install-file -DgroupId=com.dtstack.dtcenter -DartifactId=common.loader.core -Dversion=1.8.0-SNAPSHOT -Dpackaging=jar -Dfile=common.loader.core-1.8.0-SNAPSHOT.jar
spring项目使用
添加依赖
<dependency>
<groupId>com.dtstack.dtcenter</groupId>
<artifactId>common.loader.core</artifactId>
<version>1.8.0</version>
</dependency>
<!-- 自己打的依赖,可能缺少guava和apache collections -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
代码使用
先配置datasourcex config文件
@Configuration
public class DatasourcexConfig {
@Bean
public void initDatasourcex() {
System.out.println("加载Datasourcex");
ClientCache.setUserDir("D:\\definesys\\DatasourceX\\core\\pluginLibs");
}
}
@RestController
@RequestMapping("/dbx")
public class TestMysqlController {
public void test(ISourceDTO source, SqlQueryDTO queryDTO, IClient client) {
StopWatch stopWatch = new StopWatch("测试mysql数据库");
stopWatch.start("获取字段名称列表");
List columnClassInfo = client.getColumnClassInfo(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取字段详情列表");
List columnMetaData = client.getColumnMetaData(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取表名列表");
List tableList = client.getTableList(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取数据列表");
List list = client.getPreview(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取表详细信息");
Table table = client.getTable(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取建表语句");
String createTableSql = client.getCreateTableSql(source, queryDTO);
stopWatch.stop();
stopWatch.start("获取当前数据源");
String currentDatabase = client.getCurrentDatabase(source);
stopWatch.stop();
stopWatch.start("判断表是否存在");
Boolean tableExistsInDatabase = client.isTableExistsInDatabase(source, queryDTO.getTableName(), currentDatabase);
stopWatch.stop();
stopWatch.start("执行sql");
client.executeSqlWithoutResultSet(source, queryDTO);
stopWatch.stop();
stopWatch.start("测试链接");
Boolean aBoolean = client.testCon(source);
stopWatch.stop();
stopWatch.start("获取表简单信息");
TableInfo tableInfo = client.getTableInfo(source, queryDTO.getTableName());
stopWatch.stop();
System.out.println(stopWatch.prettyPrint());
}
@PostMapping("/open/mysql")
public void testMysql(@RequestBody DbxReq dbxReq) {
IClient client = ClientCache.getClient(DataSourceType.MySQL.getVal());
Mysql5SourceDTO source = Mysql5SourceDTO.builder()
.url(dbxReq.getUrl())
.username(dbxReq.getUsername())
.password(dbxReq.getPassword())
.poolConfig(PoolConfig.builder().build())
.build();
SqlQueryDTO queryDTO = SqlQueryDTO.builder()
.tableName(dbxReq.getTableName())
.sql(dbxReq.getSql())
.limit(3)
.build();
test(source, queryDTO, client);
}
}
🔗 Links
flink datasourcex
https://ispong.isxcode.com/hadoop/flink/flink datasourcex/