flink sql

Last updated on January 17, 2025 am

🧙 Questions

☄️ Ideas

String sql = "CREATE TABLE from_table(\n" +
   "    d_date STRING,\n" +
   "    d_datekey INT\n" +
   ") WITH (\n" +
   "    'connector'='mysql-cdc',\n" +
   "    'hostname'='localhost',\n" +
   "    'port'='30306',\n" +
   "    'database-name'='demo',\n" +
   "    'table-name'='dates',\n" +
   "    'username'='root',\n" +
   "    'scan.incremental.snapshot.enabled'='false',\n" +
   "    'password'='ispong123');" +
   "CREATE TABLE to_table(\n" +
   "    d_date STRING,\n" +
   "    primary key (d_date) not enforced,\n" +
   "    d_datekey INT\n" +
   ") WITH (\n" +
   "    'connector'='jdbc',\n" +
   "    'url'='jdbc:mysql://localhost:30306/demo',\n" +
   "    'table-name'='dates2',\n" +
   "    'driver'='com.mysql.cj.jdbc.Driver',\n" +
   "    'username'='root',\n" +
   "    'password'='ispong123');" +
   "insert into to_table (d_date,d_datekey )  select d_date, d_datekey from from_table";
特殊逻辑

创建a表

create table table_a
(
    id   varchar(200) null,
    name varchar(200) null
);

创建b表

create table table_b
(
    id   varchar(200) null,
    name varchar(200) null
);

创建c表

create table table_c
(
    id   varchar(200) null,
    name_a varchar(200) null,
    name_b varchar(200) null
);

将a和b join一下写到c

CREATE TABLE t_a(
    id string,
    name string
) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.115.104',
     'port' = '30102',
     'username' = 'root',
     'password' = 'Define123..',
     'database-name' = 'ispong_db',
     'table-name' = 'table_a',
     'scan.incremental.snapshot.enabled'='false'
);

create table t_b(
    id string,
    name string
) with (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.115.104',
     'port' = '30102',
     'username' = 'root',
     'password' = 'Define123..',
     'database-name' = 'ispong_db',
     'table-name' = 'table_b',
     'scan.incremental.snapshot.enabled'='false'
);

CREATE TABLE t_c (
    id string,
    name_a string,
    name_b string,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url'='jdbc:mysql://192.168.115.104:30102/ispong_db',
    'table-name' = 'table_c',
    'driver'='com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'Define123..'
);

insert into t_c
select
    a.id, 
    a.name AS name_a,
    b.name AS name_b
from t_a a left join t_b b
on a.id = b.id;
模版
public static void main(String[] args) {
        String flinkSql = "CREATE TABLE from_table(\n" + "    username STRING,\n" + "    age INT\n" + ") WITH (\n"
                + "    'connector'='jdbc',\n" + "    'url'='jdbc:mysql://localhost:30306/ispong_db',\n"
                + "    'table-name'='users',\n" + "    'driver'='com.mysql.cj.jdbc.Driver',\n"
                + "    'username'='root',\n" + "    'password'='ispong123');" + "CREATE TABLE to_table(\n"
                + "    username STRING,\n" + "    age INT\n" + ") WITH (\n" + "    'connector'='jdbc',\n"
                + "    'url'='jdbc:mysql://localhost:30306/ispong_db',\n" + "    'table-name'='users2',\n"
                + "    'driver'='com.mysql.cj.jdbc.Driver',\n" + "    'username'='root',\n"
                + "    'password'='ispong123');" + "insert into to_table select * from from_table";
        System.out.println(Base64.getEncoder().encodeToString(flinkSql.getBytes()));
}
CREATE TABLE from_table(
  username STRING,
  age INT
) WITH (
  'connector'='jdbc',
  'url'='jdbc:mysql://localhost:30306/ispong_db',
  'table-name'='users',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='ispong123');

CREATE TABLE to_table(
  username  STRING,
  age INT
) WITH  (
  'connector'='jdbc',
  'url'='jdbc:mysql://localhost:30306/ispong_db',
  'table-name'='users2',
  'driver'='com.mysql.cj.jdbc.Driver',
  'username'='root',
  'password'='ispong123');"

insert  into to_table select  * from from_table;

初始化sql

CREATE TABLE MyTable (
    id INT,
    name STRING
) WITH (
    'connector' = 'datagen',  -- 指定连接器为 datagen
    'rows-per-second' = '10',  -- 每秒生成的行数
    'fields.id.kind' = 'sequence',  -- id 字段的生成方式为递增
    'fields.id.start' = '1',  -- id 的起始值
    'fields.id.end' = '100',  -- id 的最大值
    'fields.name.kind' = 'random',  -- name 字段生成随机字符串
    'fields.name.length' = '10'  -- name 字符串的长度
);

select count(1) from MyTable

flink sql
https://ispong.isxcode.com/hadoop/flink/flink sql/
Author
ispong
Posted on
November 12, 2023
Licensed under