doris kafka同步到doris

Last updated on November 22, 2024 pm

🧙 Questions

将kafka中的数据导入doris中

☄️ Ideas

进入doris
docker exec -it ispong-mysql bash
mysql -u root -h 192.168.66.66 -pispong2021 -P 30131
创建数据库
CREATE DATABASE ispong;
SHOW DATABASES;
DROP DATABASE ispong;
创建表
-- 聚合表
CREATE TABLE test.site_visit
(
siteid      INT,
username VARCHAR (32),
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- 插入数据
insert into ispong.site_visit (siteid,username,pv) values (1,'site1',2);
select * from ispong.site_visit;

-- 主键表
CREATE TABLE ispong.ispong_table
(
username    VARCHAR (32) ,
age BIGINT
)
UNIQUE KEY (username)
DISTRIBUTED BY HASH(username) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- 插入数据
insert into ispong.ispong_table (username,age) values ('ispong',3);
select * from ispong.sales_order;

-- 重复保存
CREATE TABLE ispong.session_data
(
visitorid SMALLINT,
sessionid   BIGINT,
city  VARCHAR (32),
province    VARCHAR (32)
)
DUPLICATE KEY (visitorid, sessionid)
DISTRIBUTED BY HASH(visitorid, sessionid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- 插入数据
insert into ispong.session_data (visitorid,sessionid,city,province) values (1,3,'jiangsu','suzhou');
select * from ispong.session_data;
添加kakfa load
{"username":"zhangsan","age":14}
USE ispong;

-- 增加  修改 删除
CREATE ROUTINE LOAD ispong_kafka_job on ispong_table
COLUMNS(username,age)
PROPERTIES
(
    "desired_concurrent_number"="1",
    "max_batch_interval" = "10",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "isxcode:30120",
    "kafka_topic" = "ispong_8",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);

-- 
use ispong;
CREATE ROUTINE LOAD ispong_kafka_job on ispong_table
PROPERTIES
(
    "desired_concurrent_number"="1",
    "max_batch_interval" = "10",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.23.39.206:30120",
    "kafka_topic" = "ispong_8",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);


-- 删除job

USE ispong;
-- 设置表可删除
alter table ispong_table enable feature "BATCH_DELETE";

CREATE ROUTINE LOAD ispong_kafka_delete_job ON ispong_table
WITH DELETE
COLUMNS(username,age)
PROPERTIES
(
    "desired_concurrent_number"="1",
    "max_batch_interval" = "10",
    "max_batch_rows" = "200000",
    "max_batch_size" = "104857600",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.66.66:30120",
    "kafka_topic" = "ispong_kafka_delete_job",
    "kafka_partitions" = "0",
    "kafka_offsets" = "0"
);

SHOW ROUTINE LOAD;
SHOW ROUTINE LOAD TASK WHERE JobName = "ispong_kafka_job";
SHOW ROUTINE LOAD FOR ispong_kafka_job;
PAUSE ROUTINE LOAD FOR ispong_kafka_job; 
-- 修复 routine
RESUME ROUTINE LOAD FOR ispong_kafka_job;
-- 删除 routine
STOP ROUTINE LOAD FOR ispong_kafka_job;

SET show_hidden_columns=true

doris kafka同步到doris
https://ispong.isxcode.com/db/doris/doris kafka同步到doris/
Author
ispong
Posted on
December 15, 2021
Licensed under