sqoop2实战 spring集成sqoop2

Last updated on September 15, 2024 pm

🧙 Questions

☄️Ideas

导入依赖

  • maven
<!-- https://mvnrepository.com/artifact/org.apache.sqoop/sqoop-client -->
<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.7</version>
</dependency>

  • gradle

// https://mvnrepository.com/artifact/org.apache.sqoop/sqoop-client
implementation group: 'org.apache.sqoop', name: 'sqoop-client', version: '1.99.7'

sqoop client url 地址的结尾一定要有 /

// Sqoop client used by the link and job methods of this class.
private SqoopClient sqoopClient;

/**
 * Builds a Sqoop client for the hard-coded server address.
 *
 * @return a new {@code SqoopClient} bound to that URL
 */
public SqoopClient initSqoopClient() {
    // The URL has to end with a slash.
    return new SqoopClient("http://isxcode:30125/sqoop/");
}

/**
 * Sets the server URL on the existing client.
 */
public void updateSqoopClientUrl() {
    // The URL has to end with a slash.
    final String serverUrl = "http://isxcode:30125/sqoop/";
    sqoopClient.setServerUrl(serverUrl);
}
  • 创建mysql Link
/**
 * Creates and saves a Sqoop link for a MySQL datasource.
 *
 * @param linkName    name given to the new link
 * @param executor    user recorded as the creator of the link
 * @param connectInfo supplies the JDBC URL, username and password
 * @return the saved link, or {@code null} when the save status cannot proceed
 */
public MLink createMysqlLink(String linkName, String executor, ConnectInfo connectInfo) {

    // MySQL is reached through the generic JDBC connector.
    MLink mysqlLink = sqoopClient.createLink("generic-jdbc-connector");
    mysqlLink.setName(linkName);
    mysqlLink.setCreationUser(executor);

    // Connection settings; the identifier-enclose string is set to empty.
    MLinkConfig config = mysqlLink.getConnectorLinkConfig();
    config.getStringInput("linkConfig.connectionString").setValue(connectInfo.getJdbcUrl());
    config.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.cj.jdbc.Driver");
    config.getStringInput("linkConfig.username").setValue(connectInfo.getUsername());
    config.getStringInput("linkConfig.password").setValue(connectInfo.getPasswd());
    config.getStringInput("dialect.identifierEnclose").setValue("");

    // Save the link; a status that cannot proceed is logged and reported as null.
    Status saveStatus = sqoopClient.saveLink(mysqlLink);
    if (!saveStatus.canProceed()) {
        log.info("sqoop2 link 创建异常: link name:" + linkName);
        return null;
    }
    log.info("sqoop2 link 创建成功:" + mysqlLink.getName());
    return mysqlLink;
}
  • 创建hdfs link
/**
 * Creates and saves a Sqoop link for HDFS.
 *
 * @param linkName    name given to the new link
 * @param executor    user recorded as the creator of the link
 * @param connectInfo connection info used to look up the table's HDFS URL
 * @return the saved link, or {@code null} when the save status cannot proceed
 */
public MLink createHdfsLink(String linkName, String executor, ConnectInfo connectInfo) {

    // This link is backed by the HDFS connector.
    MLink hdfsLink = sqoopClient.createLink("hdfs-connector");
    hdfsLink.setName(linkName);
    hdfsLink.setCreationUser(executor);

    // The table lookup provides the HDFS URL used as the link URI.
    String hdfsUri = tableService.getHiveFormatTableInfo(connectInfo).getHiveHdfsUrl();
    hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(hdfsUri);

    // Save the link; a status that cannot proceed is logged and reported as null.
    Status saveStatus = sqoopClient.saveLink(hdfsLink);
    if (!saveStatus.canProceed()) {
        log.info("sqoop2 link 创建异常: link name:" + linkName);
        return null;
    }
    log.info("sqoop2 link 创建成功:" + hdfsLink.getName());
    return hdfsLink;
}

创建job

  • Mysql To Hdfs
/**
 * Creates and saves a Sqoop job that copies a MySQL table into the HDFS location of a Hive table.
 *
 * <p>The select list follows the column order of the Hive table; every Hive column is replaced by
 * the source column mapped to it in {@code jobSyncWorkConfig.getMappingConfig()}.
 *
 * @param jobName           name given to the new job
 * @param executor          user recorded as the creator of the job
 * @param fromLink          link of the MySQL source
 * @param toLink            link of the HDFS target
 * @param fromConnectInfo   source connection info; its table name is used in the query
 * @param toConnectionInfo  target connection info used to look up the Hive table
 * @param jobSyncWorkConfig sync configuration holding the column mapping and optional condition SQL
 * @return the name of the saved job, or {@code null} when the save status cannot proceed
 * @throws IllegalArgumentException if the column mapping is missing or empty
 */
public String createMysqlToHdfsJob(String jobName, String executor, MLink fromLink, MLink toLink, ConnectInfo fromConnectInfo, ConnectInfo toConnectionInfo, JobSyncWorkConfig jobSyncWorkConfig) {

    // Create the job between the two links.
    MJob job = sqoopClient.createJob(fromLink.getName(), toLink.getName());
    job.setName(jobName);
    job.setCreationUser(executor);

    // Build the lookup "Hive column -> source column" from the mapping config.
    TableInfo hiveTableInfo = tableService.getHiveTableInfo(toConnectionInfo);
    List<ColumnMappingDto> columnInfoDtos = JSON.parseArray(jobSyncWorkConfig.getMappingConfig(), ColumnMappingDto.class);
    if (columnInfoDtos == null || columnInfoDtos.isEmpty()) {
        // Without at least one mapping there is no partition column and no select list.
        throw new IllegalArgumentException("column mapping config is empty, job name: " + jobName);
    }
    Map<String, String> dataMapping = new HashMap<>();
    for (ColumnMappingDto mapping : columnInfoDtos) {
        dataMapping.put(mapping.getTo(), mapping.getFrom());
    }

    // One select item per Hive column, in Hive column order.
    // NOTE(review): a Hive column without a mapping adds a null entry; how Strings.join renders it
    // is not visible from here - confirm.
    List<String> columnAlias = new ArrayList<>();
    for (TableColumnInfo column : hiveTableInfo.getTableColumnInfos()) {
        columnAlias.add(dataMapping.get(column.getField()));
    }

    // Configure the "from" side.
    // NOTE(review): the partition column is the target-side name of the first mapping while the
    // select list uses source-side names - confirm both names are identical for that column.
    MFromConfig fromJobConfig = job.getFromJobConfig();
    fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue(columnInfoDtos.get(0).getTo());
    String fromTableSql = "select " + Strings.join(columnAlias, ",") + " from " + fromConnectInfo.getTableName() + " where ${CONDITIONS} ";
    String conditionSql = jobSyncWorkConfig.getConditionSql();
    if (conditionSql != null && !conditionSql.isEmpty()) {
        fromTableSql = fromTableSql + " and (" + conditionSql + ")";
    }
    fromJobConfig.getStringInput("fromJobConfig.sql").setValue(fromTableSql);

    // Configure the "to" side; the output directory is the Hive table location and is set once.
    MToConfig toJobConfig = job.getToJobConfig();
    TableInfo toTableInfo = tableService.getHiveFormatTableInfo(toConnectionInfo);
    toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue(toTableInfo.getHiveTableLocation());
    toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
    toJobConfig.getBooleanInput("toJobConfig.appendMode").setValue(true);
    toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(false);
    toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
    toJobConfig.getStringInput("toJobConfig.nullValue").setValue("N");

    // Configure the driver: a single extractor.
    MDriverConfig driverConfig = job.getDriverConfig();
    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

    // Save the job.
    Status status = sqoopClient.saveJob(job);
    if (status.canProceed()) {
        log.info("job创建成功:" + job.getName());
        return job.getName();
    } else {
        log.info("job创建失败:" + job.getName());
        return null;
    }
}
  • Hdfs to Mysql
/**
 * Creates and saves a Sqoop job that copies the files of a Hive table into a MySQL table.
 *
 * @param jobName           name given to the new job
 * @param executor          user recorded as the creator of the job
 * @param fromLink          link of the HDFS source
 * @param toLink            link of the MySQL target
 * @param fromConnectInfo   source connection info used to look up the Hive table location
 * @param toConnectionInfo  target connection info; its table name is the destination table
 * @param jobSyncWorkConfig sync configuration (not read by this method)
 * @return the name of the saved job, or {@code null} when the save status cannot proceed
 */
public String createHdfsToMysqlJob(String jobName, String executor, MLink fromLink, MLink toLink, ConnectInfo fromConnectInfo, ConnectInfo toConnectionInfo, JobSyncWorkConfig jobSyncWorkConfig) {

    // Create the job between the two links.
    MJob exportJob = sqoopClient.createJob(fromLink.getName(), toLink.getName());
    exportJob.setName(jobName);
    exportJob.setCreationUser(executor);

    // "from" side: read from the Hive table location.
    String inputDirectory = tableService.getHiveFormatTableInfo(fromConnectInfo).getHiveTableLocation();
    exportJob.getFromJobConfig().getStringInput("fromJobConfig.inputDirectory").setValue(inputDirectory);

    // "to" side: write into the target table.
    exportJob.getToJobConfig().getStringInput("toJobConfig.tableName").setValue(toConnectionInfo.getTableName());

    // Driver: a single extractor.
    exportJob.getDriverConfig().getIntegerInput("throttlingConfig.numExtractors").setValue(1);

    // Save the job; a status that cannot proceed is logged and reported as null.
    Status saveStatus = sqoopClient.saveJob(exportJob);
    if (!saveStatus.canProceed()) {
        log.info("job创建失败:" + exportJob.getName());
        return null;
    }
    log.info("job创建成功:" + exportJob.getName());
    return exportJob.getName();
}

执行job

/**
 * Starts the named job and logs the submission status, plus the progress when it is available.
 *
 * @param jobName name of the job to start
 */
public void startJob(String jobName) {

    MSubmission started = sqoopClient.startJob(jobName);
    log.info("Job Submission Status : " + started.getStatus());

    // Progress is only reported for a running submission.
    if (!started.getStatus().isRunning()) {
        return;
    }
    // -1 is treated as "no progress value".
    if (started.getProgress() == -1) {
        return;
    }
    String percent = String.format("%.2f %%", started.getProgress() * 100);
    log.info("Progress : " + percent);
}
/**
 * Demo endpoint: creates both links, creates the HDFS-to-MySQL job and starts it.
 *
 * NOTE(review): the service methods shown in this post take more arguments (names, executor,
 * connection info); the calls here look abbreviated - confirm before copying.
 *
 * @return a fixed greeting string
 */
@RequestMapping("hiveToMysql")
public String hiveToMysql() {

    // HDFS is the source side, MySQL the target side.
    MLink source = sqoopService.createHdfsLink();
    MLink target = sqoopService.createMysqlLink();

    sqoopService.startJob(sqoopService.createHdfsToMysqlJob(source, target));

    return "hello world";
}

/**
 * Demo endpoint: creates both links, creates the MySQL-to-HDFS job and starts it.
 *
 * NOTE(review): the service methods shown in this post take more arguments (names, executor,
 * connection info); the calls here look abbreviated - confirm before copying.
 *
 * @return a fixed greeting string
 */
@RequestMapping("mysqlToHdfs")
public String mysqlToHdfs() {

    // The HDFS link is created first, then the MySQL link; MySQL is the source side of the job.
    MLink target = sqoopService.createHdfsLink();
    MLink source = sqoopService.createMysqlLink();

    sqoopService.startJob(sqoopService.createMysqlToHdfsJob(source, target));

    return "hello world";
}
/**
     * Builds a Sqoop client for the hard-coded server address.
     *
     * @return a new {@code SqoopClient} bound to that URL
     */
    public SqoopClient initSqoopClient() {
        return new SqoopClient("http://isxcode:30125/sqoop/");
    }

    /**
     * Creates and saves a Sqoop link for a MySQL datasource.
     *
     * @param linkName    name given to the new link
     * @param executor    user recorded as the creator of the link
     * @param connectInfo supplies the JDBC URL, username and password
     * @return the saved link, or {@code null} when the save status cannot proceed
     */
    public MLink createMysqlLink(String linkName, String executor, ConnectInfo connectInfo) {

        // MySQL is reached through the generic JDBC connector.
        MLink mysqlLink = sqoopClient.createLink("generic-jdbc-connector");
        mysqlLink.setName(linkName);
        mysqlLink.setCreationUser(executor);

        // Connection settings; the identifier-enclose string is set to empty.
        MLinkConfig config = mysqlLink.getConnectorLinkConfig();
        config.getStringInput("linkConfig.connectionString").setValue(connectInfo.getJdbcUrl());
        config.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.cj.jdbc.Driver");
        config.getStringInput("linkConfig.username").setValue(connectInfo.getUsername());
        config.getStringInput("linkConfig.password").setValue(connectInfo.getPasswd());
        config.getStringInput("dialect.identifierEnclose").setValue("");

        // Save the link; a status that cannot proceed is logged and reported as null.
        Status saveStatus = sqoopClient.saveLink(mysqlLink);
        if (!saveStatus.canProceed()) {
            log.info("sqoop2 link 创建异常: link name:" + linkName);
            return null;
        }
        log.info("sqoop2 link 创建成功:" + mysqlLink.getName());
        return mysqlLink;
    }

    /**
     * Creates and saves a Sqoop link for HDFS.
     *
     * @param linkName    name given to the new link
     * @param executor    user recorded as the creator of the link
     * @param connectInfo connection info used to look up the table's HDFS URL
     * @return the saved link, or {@code null} when the save status cannot proceed
     */
    public MLink createHdfsLink(String linkName, String executor, ConnectInfo connectInfo) {

        // This link is backed by the HDFS connector.
        MLink hdfsLink = sqoopClient.createLink("hdfs-connector");
        hdfsLink.setName(linkName);
        hdfsLink.setCreationUser(executor);

        // The table lookup provides the HDFS URL used as the link URI.
        String hdfsUri = tableService.getDetailTableInfo(connectInfo).getHiveHdfsUrl();
        hdfsLink.getConnectorLinkConfig().getStringInput("linkConfig.uri").setValue(hdfsUri);

        // Save the link; a status that cannot proceed is logged and reported as null.
        Status saveStatus = sqoopClient.saveLink(hdfsLink);
        if (!saveStatus.canProceed()) {
            log.info("sqoop2 link 创建异常: link name:" + linkName);
            return null;
        }
        log.info("sqoop2 link 创建成功:" + hdfsLink.getName());
        return hdfsLink;
    }

    /**
     * Creates and saves a Sqoop job that copies a MySQL table into the HDFS location of a Hive table.
     *
     * <p>The select list follows the column order of the Hive table; each item has the form
     * {@code <source column> as <hive column>}, using the mapping in
     * {@code jobSyncWorkConfig.getMappingConfig()}.
     *
     * @param jobName           name given to the new job
     * @param executor          user recorded as the creator of the job
     * @param fromLink          link of the MySQL source
     * @param toLink            link of the HDFS target
     * @param fromConnectInfo   source connection info; its table name is used in the query
     * @param toConnectionInfo  target connection info used to look up the Hive table
     * @param jobSyncWorkConfig sync configuration holding the column mapping and optional condition SQL
     * @return the name of the saved job, or {@code null} when the save status cannot proceed
     * @throws XDapBizException if the column mapping is missing or empty
     */
    public String createMysqlToHdfsJob(String jobName, String executor, MLink fromLink, MLink toLink, ConnectInfo fromConnectInfo, ConnectInfo toConnectionInfo, JobSyncWorkConfig jobSyncWorkConfig) {

        // Create the job between the two links.
        MJob job = sqoopClient.createJob(fromLink.getName(), toLink.getName());
        job.setName(jobName);
        job.setCreationUser(executor);

        // Build the lookup "Hive column -> source column" from the mapping config.
        TableColumn hiveTableColumn = tableService.getTableColumns(toConnectionInfo);
        List<ColumnMappingDto> columnInfoDtos = JSON.parseArray(jobSyncWorkConfig.getMappingConfig(), ColumnMappingDto.class);
        if (columnInfoDtos == null || columnInfoDtos.isEmpty()) {
            throw new XDapBizException(JobsExceptionEnum.MOVE_WORK_TO_SELF);
        }
        Map<String, String> dataMapping = new HashMap<>();
        for (ColumnMappingDto mapping : columnInfoDtos) {
            dataMapping.put(mapping.getTo(), mapping.getFrom());
        }

        // One aliased select item per Hive column, in Hive column order.
        // A Hive column without a mapping is emitted as "null as <field>".
        List<String> columnAlias = new ArrayList<>();
        for (TableColumnInfo column : hiveTableColumn.getTableColumnInfos()) {
            columnAlias.add(dataMapping.get(column.getField()) + " as " + column.getField());
        }

        // Configure the "from" side.
        // NOTE(review): the partition column is the target-side name of the first mapping - confirm
        // it is also a valid column name inside the source query's where clause.
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue(columnInfoDtos.get(0).getTo());
        String fromTableSql = "select " + Strings.join(columnAlias, ',') + " from " + fromConnectInfo.getTableName() + " where ${CONDITIONS} ";
        String conditionSql = jobSyncWorkConfig.getConditionSql();
        if (conditionSql != null && !conditionSql.isEmpty()) {
            fromTableSql = fromTableSql + " and (" + conditionSql + ")";
        }
        fromJobConfig.getStringInput("fromJobConfig.sql").setValue(fromTableSql);

        // Configure the "to" side; the output directory is the Hive table location and is set once.
        MToConfig toJobConfig = job.getToJobConfig();
        TableInfo toTableInfo = tableService.getDetailTableInfo(toConnectionInfo);
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue(toTableInfo.getHiveTableLocation());
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getBooleanInput("toJobConfig.appendMode").setValue(true);
        toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(false);
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
        toJobConfig.getStringInput("toJobConfig.nullValue").setValue("N");

        // Configure the driver: a single extractor.
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        // Save the job.
        Status status = sqoopClient.saveJob(job);
        if (status.canProceed()) {
            log.info("job创建成功:" + job.getName());
            return job.getName();
        } else {
            log.info("job创建失败:" + job.getName());
            return null;
        }
    }

    /**
     * Creates and saves a Sqoop job that copies the files of a Hive table into a MySQL table.
     *
     * @param jobName           name given to the new job
     * @param executor          user recorded as the creator of the job
     * @param fromLink          link of the HDFS source
     * @param toLink            link of the MySQL target
     * @param fromConnectInfo   source connection info used to look up the Hive table location
     * @param toConnectionInfo  target connection info; its table name is the destination table
     * @param jobSyncWorkConfig sync configuration (not read by this method)
     * @return the name of the saved job, or {@code null} when the save status cannot proceed
     */
    public String createHdfsToMysqlJob(String jobName, String executor, MLink fromLink, MLink toLink, ConnectInfo fromConnectInfo, ConnectInfo toConnectionInfo, JobSyncWorkConfig jobSyncWorkConfig) {

        // Create the job between the two links.
        MJob exportJob = sqoopClient.createJob(fromLink.getName(), toLink.getName());
        exportJob.setName(jobName);
        exportJob.setCreationUser(executor);

        // "from" side: read from the Hive table location.
        String inputDirectory = tableService.getDetailTableInfo(fromConnectInfo).getHiveTableLocation();
        exportJob.getFromJobConfig().getStringInput("fromJobConfig.inputDirectory").setValue(inputDirectory);

        // "to" side: write into the target table.
        exportJob.getToJobConfig().getStringInput("toJobConfig.tableName").setValue(toConnectionInfo.getTableName());

        // Driver: a single extractor.
        exportJob.getDriverConfig().getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        // Save the job; a status that cannot proceed is logged and reported as null.
        Status saveStatus = sqoopClient.saveJob(exportJob);
        if (!saveStatus.canProceed()) {
            log.info("job创建失败:" + exportJob.getName());
            return null;
        }
        log.info("job创建成功:" + exportJob.getName());
        return exportJob.getName();
    }

    /**
     * Starts the named job and logs the submission status, plus the progress when it is available.
     *
     * @param jobName name of the job to start
     */
    public void startJob(String jobName) {

        MSubmission started = sqoopClient.startJob(jobName);
        log.info("Job Submission Status : " + started.getStatus());

        // Progress is only reported for a running submission.
        if (!started.getStatus().isRunning()) {
            return;
        }
        // -1 is treated as "no progress value".
        if (started.getProgress() == -1) {
            return;
        }
        String percent = String.format("%.2f %%", started.getProgress() * 100);
        log.info("Progress : " + percent);
    }

    
    /**
     * Runs one data-sync work item through Sqoop and records its execution log.
     *
     * <p>Steps: make sure a work-log row exists for {@code executeId}, load the sync configuration
     * of {@code workId}, create the source link, the target link and the job, start the job, write
     * the final log entry, and store the table lineage when both tables are found.
     *
     * <p>Only MySQL to Hive and Hive to MySQL are supported.
     *
     * @param workId    id of the sync work whose configuration is executed
     * @param executeId id of this execution; key of the work log
     * @throws XDapBizException      if a datasource type, or the source/target combination, is not supported
     * @throws IllegalStateException if a link or the job could not be created
     */
    public void executeSyncWork(String workId, String executeId) {

        long nowTime = System.currentTimeMillis();

        // Start the execution log; a log row is only inserted when none exists yet.
        StringBuilder logBuilder = new StringBuilder();
        logBuilder.append("正在提交...").append("\n");
        JobWorkLogs jobWorkLogs = logDao.getWorkLog(executeId);
        if (jobWorkLogs == null) {
            jobWorkLogs = new JobWorkLogs(snowflakeIdWorker.nextId(), executeId, logBuilder.toString(), "PENDING");
            logDao.addWorkLog(jobWorkLogs);
        }

        // Load the sync configuration of this work.
        JobSyncWorkConfig workSyncConfig = syncWorkDao.getWorkSyncConfig(workId);

        // Sqoop names are limited to 32 characters, so only the last 8 characters of each id are used.
        // Each suffix is cut relative to its OWN string length; ids shorter than 8 are used whole.
        String workIdLastStr = workId.substring(Math.max(0, workId.length() - 8));
        String snowId = snowflakeIdWorker.nextId();
        String snowIdLastStr = snowId.substring(Math.max(0, snowId.length() - 8));

        // Load the source datasource.
        String fromDbId = workSyncConfig.getFromDbId();
        DatasourceEntity fromDBInfo = datasourceDao.getDatasourceById(fromDbId);
        ConnectInfo fromDbConnectInfo = JSON.parseObject(fromDBInfo.getConnectInfo(), ConnectInfo.class);
        fromDbConnectInfo.setPasswd(decrypt(fromDbConnectInfo.getPasswd()));
        fromDbConnectInfo.setTableName(workSyncConfig.getFromDbTable());

        // Create the source link.
        MLink fromLink;
        String fromLinkName = "FROM_LINK_" + workIdLastStr + "_" + snowIdLastStr;
        if ("MySQL".equals(fromDBInfo.getType())) {
            fromLink = sqoopService.createMysqlLink(fromLinkName, MpaasSession.getCurrentUser(), fromDbConnectInfo);
        } else if ("Hive".equals(fromDBInfo.getType())) {
            fromLink = sqoopService.createHdfsLink(fromLinkName, MpaasSession.getCurrentUser(), fromDbConnectInfo);
        } else {
            throw new XDapBizException(JobsExceptionEnum.DB_IS_NOT_SUPPORT);
        }
        if (fromLink == null) {
            // The service reports a failed save as null; fail here with a clear message.
            throw new IllegalStateException("sqoop2 source link could not be created: " + fromLinkName);
        }

        // Load the target datasource.
        String toDbId = workSyncConfig.getToDbId();
        DatasourceEntity toDBInfo = datasourceDao.getDatasourceById(toDbId);
        ConnectInfo toDbConnectInfo = JSON.parseObject(toDBInfo.getConnectInfo(), ConnectInfo.class);
        toDbConnectInfo.setPasswd(decrypt(toDbConnectInfo.getPasswd()));
        toDbConnectInfo.setTableName(workSyncConfig.getToDbTable());

        // Create the target link.
        MLink toLink;
        String toLinkName = "TO_LINK_" + workIdLastStr + "_" + snowIdLastStr;
        if ("MySQL".equals(toDBInfo.getType())) {
            toLink = sqoopService.createMysqlLink(toLinkName, MpaasSession.getCurrentUser(), toDbConnectInfo);
        } else if ("Hive".equals(toDBInfo.getType())) {
            toLink = sqoopService.createHdfsLink(toLinkName, MpaasSession.getCurrentUser(), toDbConnectInfo);
        } else {
            throw new XDapBizException(JobsExceptionEnum.DB_IS_NOT_SUPPORT);
        }
        if (toLink == null) {
            throw new IllegalStateException("sqoop2 target link could not be created: " + toLinkName);
        }

        // Create the job for the supported direction.
        String jobName = "JOB_" + workIdLastStr + "_" + snowIdLastStr;
        if ("MySQL".equals(fromDBInfo.getType()) && "Hive".equals(toDBInfo.getType())) {
            jobName = sqoopService.createMysqlToHdfsJob(jobName, MpaasSession.getCurrentUser(), fromLink, toLink, fromDbConnectInfo, toDbConnectInfo, workSyncConfig);
        } else if ("Hive".equals(fromDBInfo.getType()) && "MySQL".equals(toDBInfo.getType())) {
            jobName = sqoopService.createHdfsToMysqlJob(jobName, MpaasSession.getCurrentUser(), fromLink, toLink, fromDbConnectInfo, toDbConnectInfo, workSyncConfig);
        } else {
            throw new XDapBizException(JobsExceptionEnum.DB_IS_NOT_SUPPORT);
        }
        if (jobName == null) {
            // The service reports a failed save as null; there is nothing to start.
            throw new IllegalStateException("sqoop2 job could not be created for work: " + workId);
        }

        // Start the job.
        try {
            logBuilder.append("提交任务成功").append("\n");
            logBuilder.append("正在等待计算资源").append("\n");
            logDao.updateWorkLog(executeId, logBuilder.toString());

            sqoopService.startJob(jobName);
        } catch (Exception e) {

            logBuilder.append(DateUtils.getNowDateStr()).append(" ERROR:").append(e.getMessage()).append("\n");
            logBuilder.append(DateUtils.getNowDateStr()).append(" ERROR Current task status: ERROR").append("\n");
            logDao.updateWorkLog(executeId, logBuilder.toString(), "OVER");

            // Stop here: a failed run must not be logged as SUCCESS nor recorded as lineage.
            return;
        }

        // The job was started without an exception: write the closing log entry.
        logBuilder.append(DateUtils.getNowDateStr()).append(" INFO Cost time is: +").append(DateUtils.executeSecondTime(nowTime)).append("+s").append("\n");
        logBuilder.append(DateUtils.getNowDateStr()).append(" INFO Current task status: SUCCESS").append("\n");
        logDao.updateWorkLog(executeId, logBuilder.toString(), "OVER");

        // Record the table lineage (source table -> target table) for this work.
        ResTableLineages resTableLineages = new ResTableLineages();
        resTableLineages.setId(snowflakeIdWorker.nextId());
        resTableLineages.setWorkId(workId);

        DatasourceEntity fromDbInfo = datasourceDao.getDatasourceById(workSyncConfig.getFromDbId());
        if (fromDbInfo != null) {
            ConnectInfo connectInfo = JSON.parseObject(fromDbInfo.getConnectInfo(), ConnectInfo.class);
            ResTableInfos tableInfo = tableAssetDao.getTableInfoAndSite(workSyncConfig.getFromDbTable(), connectInfo.getJdbcUrl(), "OUTER");
            if (tableInfo != null) {
                resTableLineages.setFromTableId(tableInfo.getId());
            }
        }

        DatasourceEntity toDbInfo = datasourceDao.getDatasourceById(workSyncConfig.getToDbId());
        if (toDbInfo != null) {
            ConnectInfo connectInfo = JSON.parseObject(toDbInfo.getConnectInfo(), ConnectInfo.class);
            ResTableInfos tableInfo = tableAssetDao.getTableInfoAndSite(workSyncConfig.getToDbTable(), connectInfo.getJdbcUrl(), "INNER");
            if (tableInfo != null) {
                resTableLineages.setToTableId(tableInfo.getId());
            }
        }

        // Only store the lineage when both table ids were found and no identical row exists yet.
        if (resTableLineages.getToTableId() != null && resTableLineages.getFromTableId() != null && workId != null) {
            ResTableLineages tableIineage = tableAssetDao.getTableIineage(resTableLineages.getFromTableId(), resTableLineages.getToTableId(), workId);
            if (tableIineage == null) {
                tableAssetDao.addTableLineage(resTableLineages);
            }
        }
    }

Note

配置参数
  • jdbc connector
参数 类型 描述
fromJobConfig.partitionColumn string 分区字段
fromJobConfig.columnList list 选择同步的字段
fromJobConfig.schemaName string 数据库 schema 的名字
fromJobConfig.allowNullValueInPartitionColumn boolean 分区字段是否可以为null
fromJobConfig.sql string 自定义sql(不可以和tableName等类型同时使用)
fromJobConfig.tableName string 同步表名
fromJobConfig.boundaryQuery string 自定义边界查询(用于获取分区字段的最小值和最大值)
incrementalRead.checkColumn string 增量读取时用于检查新数据的字段
incrementalRead.lastValue string 上一次读取到的值(下次从更大的值继续读取)
toJobConfig.tableName string 同步表名
toJobConfig.schemaName string 数据库 schema 的名字
toJobConfig.columnList string 需要同步的字段
toJobConfig.stageTableName string 使用临时表名
toJobConfig.shouldClearStageTable boolean 是否删除临时表
  • hdfs connector
参数 类型 描述
fromJobConfig.inputDirectory string 输入文件地址
fromJobConfig.overrideNullValue boolean 是否重写null值
fromJobConfig.nullValue string null的默认值
incremental.incrementalType string 增量导入的类型
incremental.lastImportedDate string 上一次已导入文件的时间戳
toJobConfig.outputDirectory string 输出文件地址
toJobConfig.outputFormat string 输出的文件格式
toJobConfig.compression string 压缩方式
toJobConfig.customCompression string 自定义压缩方式
toJobConfig.appendMode boolean 是否启用追加模式(向已有目录追加文件)
toJobConfig.overrideNullValue boolean 是否复写null值
toJobConfig.nullValue string null的默认值
Hive 建表语句需要特殊处理
-- Example Hive table stored as comma-delimited text.
-- NOTE(review): the ',' field delimiter presumably has to match the text files written by the
-- Sqoop job (output format TEXT_FILE) - confirm against the HDFS connector's output.
create table ispong_demo_1
(
    age      int,
    username varchar(100),
    id       int
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','

sqoop2实战 spring集成sqoop2
https://ispong.isxcode.com/hadoop/sqoop/sqoop2实战 spring集成sqoop2/
Author
ispong
Posted on
February 20, 2021
Licensed under