flink 至流云演示

Last updated on April 28, 2025 am

🧙 Questions

演示至流云快速使用

☄️ Ideas

1. 购买服务器

创建抢占式实例
过滤4核心8GB,然后任意选一个

选择Centos 7.9 64位的系统版本

固定带宽 1MB

注意安全组需要开放端口号,8080808180824040

初始化root密码,确认下单

获取抢占服务器的内网ip和外网ip

系统信息如下
  • 演示系统: Centos7.9
  • 资源:4核8GB
  • 外网ip:47.92.223.170
  • 内网ip:172.16.215.83
  • 账号:root
  • 密码:Zhiliuyun123..

2. 进入服务器

ssh root@47.92.223.170
BASH

3. 安装java环境

yum install java-1.8.0-openjdk-devel java-1.8.0-openjdk -y 
java -version
BASH

4. 下载至流云安装包

等待时间会比较久,大约1个GB大小安装包

cd /tmp
nohup wget https://isxcode.oss-cn-shanghai.aliyuncs.com/zhiliuyun/zhiliuyun.tar.gz >> download_zhiliuyun.log 2>&1 &
tail -f download_zhiliuyun.log
BASH

5. 解压安装包

cd /tmp
tar -vzxf zhiliuyun.tar.gz
BASH

6. 启动至流云

cd /tmp/zhiliuyun/bin
bash start.sh
BASH

7. 检测服务是否启动

8. 访问至轻云服务

1M的带宽,首次加载,大约40s

9. 创建用户租户

  • 创建用户zhiliuyun
  • 创建租户体验租户

10. 上传许可证

在官网的最下面,可以免费获取体验许可证

11. 安装集群

退出后台管理,使用zhiliuyun账号登录

StandAlone类型的集群,支持默认安装flink服务

推荐使用内网ip
host:172.16.215.83
用户名: root
密码: Zhiliuyun123..
默认安装Flink: 打开

12. 访问flink服务

13. 添加mysql数据源

如果用户没有可测试的mysql数据源,可以通过docker快速启动测试mysql

docker run \
  --name zhiliuyun-mysql \
  --privileged=true \
  --restart=always \
  -d \
  -p 30306:3306 \
  -e MYSQL_ROOT_PASSWORD=zhiliuyun123 \
  -e MYSQL_DATABASE=test_db \
  mysql:8.0
BASH
  • dbType: Mysql
  • jdbcUrl: jdbc:mysql://172.16.215.83:30306/test_db
  • username: root
  • password: zhiliuyun123

14. 新建作业流,执行默认flinksql

默认sql为查询当前时间

15. 查看结果

由于使用flink的print-connector,至流云无法获取结果,需要去flink页面查询

16. 新建jdbc执行sql

创建两个表,一张原始表一张结果表,并在原始表中插入一条数据

create table users(
    username varchar(100),
    sex int,
    birth datetime
);
create table users_result(
    username varchar(100),
    sex int,
    birth varchar(100)
);
insert into users values('zhangsan',13,now());
SQL

17. 新建jdbc查询sql

通过jdbc查询作业,查看原始表中的数据。

select * from users;
SQL

18. 自定义函数参考

19. 上传资源中心

自定义函数需要先上传编译的jar包

20. 新建自定义函数

  • 名称:to_chinese_date
  • 类名:com.isxcode.acorn.udf.Func
  • 备注:将时间格式转成中文

21. 使用自定义函数

新建flinksql作业,使用flinksql将原始表中的日期格式改成中文格式,并同步到结果表中。
注意:需要额外添加jdbc连接器的依赖

CREATE TABLE from_table(
    username STRING,
    sex INT,
    birth Timestamp
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://172.16.215.83:30306/test_db',
    'driver'='com.mysql.cj.jdbc.Driver',
    'table-name'='users',
    'username'='root',
    'password'='zhiliuyun123');

CREATE TABLE to_table(
    username STRING,
    sex INT,
    birth STRING
) WITH (
    'connector'='jdbc',
    'url'='jdbc:mysql://172.16.215.83:30306/test_db',
    'driver'='com.mysql.cj.jdbc.Driver',
    'table-name'='users_result',
    'username'='root',
    'password'='zhiliuyun123'); 

insert into to_table ( username,sex,birth ) select username,sex,to_chinese_date(birth) from from_table;
SQL

快捷测试

CREATE TABLE print_sink ( 
    print_date string 
) WITH ( 
    'connector' = 'print' 
);

INSERT INTO print_sink SELECT to_chinese_date(now());
SQL

使用jdbc查询作业,查看结果是否正确

select * from users_result;
SQL

22. 自定义作业使用

上传资源中心

新建自定义作业

  • 名称: flink-demo
  • mainClass: org.apache.flink.examples.java.wordcount.WordCount
  • 四个参数

    –input
    /tmp/in.txt
    –output
    /tmp/out.txt

服务器上,先创建需要统计的文件

vim /tmp/in.txt
BASH
zhangsan zhangsan zhangsan
lisi lisi lisi lisi
wangwu wangwu wangwu wangwu wangwu
TEXT

服务器上查看结果,结果符合预期

cat /tmp/out.txt
BASH

产品手册


flink 至流云演示
https://ispong.isxcode.com/hadoop/flink/flink 至流云演示/
Author
ispong
Posted on
October 25, 2024
Licensed under