flink datasourcex

Last updated on January 18, 2025 am

🧙 Questions

基于datasourcex,实现数据源连接

☄️ Ideas

下载编译源码

建议切换到v4.3.2tag

git clone https://github.com/DTStack/DatasourceX.git
cd DatasourceX
git checkout v4.3.2
本地maven安装oracle依赖
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar
mvn install:install-file -DgroupId=com.github.noraui -DartifactId=ojdbc8 -Dversion=12.2.0.1 -Dpackaging=jar -Dfile=ojdbc8-12.2.0.1.jar

删除部分模块

部分数据库的驱动需要在官网上下载,maven中央仓库中没有,且目前并不需要,所以先注释掉

vim pom.xml
<!-- 以下模块全部注释掉 -->
<!-- 达梦数据库 -->
<module>dm</module>
<!-- 南大gbase -->
<module>gbase</module>
<!-- 华毅数据库 -->
<module>greenplum6</module>
<!-- 人大金仓 -->
<module>kingbase8</module>
<!-- 亚马逊 -->
<module>aws_s3</module>
<module>s3</module>
<!-- vertica  -->
<module>vertica</module>
<!-- 星环数据库 -->
<module>inceptor</module>
<!-- phoenix -->
<module>phoenix4_8</module>
<!-- hive3 -->
<module>hive3_cdp</module>
<!-- oceanbase -->
<module>oceanbase</module>
<module>test</module>

开始编译

mvn clean install -DskipTests

20220921114119

包路径: D:\definesys\DatasourceX\core\pluginLibs

20220921114221

安装datasourcex依赖

20220921114356

mvn install:install-file -DgroupId=com.dtstack.dtcenter -DartifactId=common.loader.core -Dversion=1.8.0-SNAPSHOT -Dpackaging=jar -Dfile=common.loader.core-1.8.0-SNAPSHOT.jar

spring项目使用

添加依赖
<dependency>
    <groupId>com.dtstack.dtcenter</groupId>
    <artifactId>common.loader.core</artifactId>
    <version>1.8.0-SNAPSHOT</version>
</dependency>

<!-- 自己打的依赖,可能缺少guava和apache collections -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

<dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.2</version>
</dependency>
代码使用

先配置datasourcex config文件

/**
 * Spring configuration that tells DatasourceX where its plugin jars live.
 *
 * <p>The initialization runs from the constructor, i.e. when Spring instantiates
 * this configuration class. It was previously declared as a {@code void} method
 * annotated with {@code @Bean}; a void method cannot produce a bean, and
 * Spring Framework 6 rejects such declarations as a misuse of the annotation.
 */
@Configuration
public class DatasourcexConfig {

    /** Directory containing the DatasourceX plugin libraries (the build's pluginLibs folder). */
    private static final String PLUGIN_LIBS_DIR = "D:\\definesys\\DatasourceX\\core\\pluginLibs";

    /**
     * Applies the plugin directory as soon as the configuration class is created.
     */
    public DatasourcexConfig() {
        applyPluginDir();
    }

    /**
     * Applies the plugin directory again on demand.
     *
     * <p>Kept so that existing callers of this method continue to compile; it is
     * no longer registered as a bean factory method.
     */
    public void initDatasourcex() {
        applyPluginDir();
    }

    /** Logs the start-up message and hands the plugin directory to {@code ClientCache}. */
    private static void applyPluginDir() {
        System.out.println("加载Datasourcex");
        ClientCache.setUserDir(PLUGIN_LIBS_DIR);
    }

}
/**
 * Demo REST controller that exercises the DatasourceX client API against a
 * MySQL data source and prints how long each call took.
 */
@RestController
@RequestMapping("/dbx")
public class TestMysqlController {

    /** Row limit set on the query object built by {@link #testMysql}. */
    private static final int QUERY_LIMIT = 3;

    /**
     * Calls a fixed sequence of client operations, timing each one, and prints
     * the timing summary to standard output.
     *
     * <p>The summary is printed even when a step throws, so the timings of the
     * steps that did complete are not lost; the exception still propagates.
     *
     * @param source   connection description passed to every client call
     * @param queryDTO query description (table name, SQL, limit) passed to the client
     * @param client   DatasourceX client to exercise
     */
    public void test(ISourceDTO source, SqlQueryDTO queryDTO, IClient client) {

        StopWatch stopWatch = new StopWatch("测试mysql数据库");

        try {
            timed(stopWatch, "获取字段名称列表", () -> client.getColumnClassInfo(source, queryDTO));
            timed(stopWatch, "获取字段详情列表", () -> client.getColumnMetaData(source, queryDTO));
            timed(stopWatch, "获取表名列表", () -> client.getTableList(source, queryDTO));
            timed(stopWatch, "获取数据列表", () -> client.getPreview(source, queryDTO));
            timed(stopWatch, "获取表详细信息", () -> client.getTable(source, queryDTO));
            timed(stopWatch, "获取建表语句", () -> client.getCreateTableSql(source, queryDTO));

            // Timed inline rather than through timed(...) because the result is
            // needed by the following step.
            stopWatch.start("获取当前数据源");
            final String currentDatabase;
            try {
                currentDatabase = client.getCurrentDatabase(source);
            } finally {
                stopWatch.stop();
            }

            timed(stopWatch, "判断表是否存在",
                () -> client.isTableExistsInDatabase(source, queryDTO.getTableName(), currentDatabase));
            timed(stopWatch, "执行sql", () -> client.executeSqlWithoutResultSet(source, queryDTO));
            timed(stopWatch, "测试链接", () -> client.testCon(source));
            timed(stopWatch, "获取表简单信息", () -> client.getTableInfo(source, queryDTO.getTableName()));
        } finally {
            System.out.println(stopWatch.prettyPrint());
        }
    }

    /**
     * Builds a MySQL source and query from the request body and runs {@link #test}.
     *
     * <p>NOTE(review): the URL, credentials and SQL all come straight from the
     * request and are forwarded to the client, which presumably executes the SQL
     * as-is. Treat this endpoint as a local demo only; confirm before exposing it.
     *
     * @param dbxReq request carrying the JDBC URL, credentials, table name and SQL
     */
    @PostMapping("/open/mysql")
    public void testMysql(@RequestBody DbxReq dbxReq) {

        IClient client = ClientCache.getClient(DataSourceType.MySQL.getVal());

        Mysql5SourceDTO source = Mysql5SourceDTO.builder()
            .url(dbxReq.getUrl())
            .username(dbxReq.getUsername())
            .password(dbxReq.getPassword())
            .poolConfig(PoolConfig.builder().build())
            .build();

        SqlQueryDTO queryDTO = SqlQueryDTO.builder()
            .tableName(dbxReq.getTableName())
            .sql(dbxReq.getSql())
            .limit(QUERY_LIMIT)
            .build();

        test(source, queryDTO, client);
    }

    /**
     * Runs one step as a named stop-watch task, stopping the task even if the step throws.
     *
     * @param stopWatch stop watch collecting the timings
     * @param taskName  label recorded for this step
     * @param step      the client call to time; any return value is discarded
     */
    private static void timed(StopWatch stopWatch, String taskName, Runnable step) {
        stopWatch.start(taskName);
        try {
            step.run();
        } finally {
            stopWatch.stop();
        }
    }
}

flink datasourcex
https://ispong.isxcode.com/hadoop/flink/flink datasourcex/
Author
ispong
Posted on
September 20, 2022
Licensed under