flink sql
Last updated on January 17, 2025 am
🧙 Questions
☄️ Ideas
String sql = "CREATE TABLE from_table(\n" +
" d_date STRING,\n" +
" d_datekey INT\n" +
") WITH (\n" +
" 'connector'='mysql-cdc',\n" +
" 'hostname'='localhost',\n" +
" 'port'='30306',\n" +
" 'database-name'='demo',\n" +
" 'table-name'='dates',\n" +
" 'username'='root',\n" +
" 'scan.incremental.snapshot.enabled'='false',\n" +
" 'password'='ispong123');" +
"CREATE TABLE to_table(\n" +
" d_date STRING,\n" +
" primary key (d_date) not enforced,\n" +
" d_datekey INT\n" +
") WITH (\n" +
" 'connector'='jdbc',\n" +
" 'url'='jdbc:mysql://localhost:30306/demo',\n" +
" 'table-name'='dates2',\n" +
" 'driver'='com.mysql.cj.jdbc.Driver',\n" +
" 'username'='root',\n" +
" 'password'='ispong123');" +
"insert into to_table (d_date,d_datekey ) select d_date, d_datekey from from_table";
特殊逻辑
创建a表
create table table_a
(
id varchar(200) null,
name varchar(200) null
);
创建b表
create table table_b
(
id varchar(200) null,
name varchar(200) null
);
创建c表
create table table_c
(
id varchar(200) null,
name_a varchar(200) null,
name_b varchar(200) null
);
将a和b join一下写到c
CREATE TABLE t_a(
id string,
name string
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.115.104',
'port' = '30102',
'username' = 'root',
'password' = 'Define123..',
'database-name' = 'ispong_db',
'table-name' = 'table_a',
'scan.incremental.snapshot.enabled'='false'
);
create table t_b(
id string,
name string
) with (
'connector' = 'mysql-cdc',
'hostname' = '192.168.115.104',
'port' = '30102',
'username' = 'root',
'password' = 'Define123..',
'database-name' = 'ispong_db',
'table-name' = 'table_b',
'scan.incremental.snapshot.enabled'='false'
);
CREATE TABLE t_c (
id string,
name_a string,
name_b string,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url'='jdbc:mysql://192.168.115.104:30102/ispong_db',
'table-name' = 'table_c',
'driver'='com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = 'Define123..'
);
insert into t_c
select
a.id,
a.name AS name_a,
b.name AS name_b
from t_a a left join t_b b
on a.id = b.id;
模版
public static void main(String[] args) {
String flinkSql = "CREATE TABLE from_table(\n" + " username STRING,\n" + " age INT\n" + ") WITH (\n"
+ " 'connector'='jdbc',\n" + " 'url'='jdbc:mysql://localhost:30306/ispong_db',\n"
+ " 'table-name'='users',\n" + " 'driver'='com.mysql.cj.jdbc.Driver',\n"
+ " 'username'='root',\n" + " 'password'='ispong123');" + "CREATE TABLE to_table(\n"
+ " username STRING,\n" + " age INT\n" + ") WITH (\n" + " 'connector'='jdbc',\n"
+ " 'url'='jdbc:mysql://localhost:30306/ispong_db',\n" + " 'table-name'='users2',\n"
+ " 'driver'='com.mysql.cj.jdbc.Driver',\n" + " 'username'='root',\n"
+ " 'password'='ispong123');" + "insert into to_table select * from from_table";
System.out.println(Base64.getEncoder().encodeToString(flinkSql.getBytes()));
}
CREATE TABLE from_table(
username STRING,
age INT
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:30306/ispong_db',
'table-name'='users',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='root',
'password'='ispong123');
CREATE TABLE to_table(
username STRING,
age INT
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://localhost:30306/ispong_db',
'table-name'='users2',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='root',
'password'='ispong123');"
insert into to_table select * from from_table;
初始化sql
CREATE TABLE MyTable (
id INT,
name STRING
) WITH (
'connector' = 'datagen', -- 指定连接器为 datagen
'rows-per-second' = '10', -- 每秒生成的行数
'fields.id.kind' = 'sequence', -- id 字段的生成方式为递增
'fields.id.start' = '1', -- id 的起始值
'fields.id.end' = '100', -- id 的最大值
'fields.name.kind' = 'random', -- name 字段生成随机字符串
'fields.name.length' = '10' -- name 字符串的长度
);
select count(1) from MyTable
🔗 Links
flink sql
https://ispong.isxcode.com/hadoop/flink/flink sql/