flink 至流云演示
Last updated on January 17, 2025 am
🧙 Questions
演示至流云快速使用
☄️ Ideas
1. 购买服务器
创建抢占式实例
过滤4核心8GB,然后任意选一个
选择
Centos 7.9 64位
的系统版本
固定带宽 1MB
注意安全组需要开放端口号,
8080
、8081
、8082
、4040
初始化root密码,确认下单
获取抢占服务器的内网ip和外网ip
系统信息如下
- 演示系统: Centos7.9
- 资源:4核8GB
- 外网ip:47.92.223.170
- 内网ip:172.16.215.83
- 账号:root
- 密码:Zhiliuyun123..
2. 进入服务器
ssh root@47.92.223.170
3. 安装java环境
yum install java-1.8.0-openjdk-devel java-1.8.0-openjdk -y
java -version
4. 下载至流云安装包
等待时间会比较久,大约1个GB大小安装包
cd /tmp
nohup wget https://isxcode.oss-cn-shanghai.aliyuncs.com/zhiliuyun/zhiliuyun.tar.gz >> download_zhiliuyun.log 2>&1 &
tail -f download_zhiliuyun.log
5. 解压安装包
cd /tmp
tar -vzxf zhiliuyun.tar.gz
6. 启动至流云
cd /tmp/zhiliuyun/bin
bash start.sh
7. 检测服务是否启动
8. 访问至轻云服务
1M的带宽,首次加载,大约40s
- 访问接口:http://47.92.223.170:8080
- 管理员账号:
admin
- 管理员密码:
admin123
9. 创建用户租户
- 创建用户
zhiliuyun
- 创建租户
体验租户
10. 上传许可证
在官网的最下面,可以免费获取体验许可证
11. 安装集群
退出后台管理,使用
zhiliuyun
账号登录
StandAlone类型的集群,支持默认安装flink服务
推荐使用内网ip
host:172.16.215.83
用户名: root
密码: Zhiliuyun123..
默认安装Flink: 打开
12. 访问flink服务
- flink ui: http://47.92.223.170:8081
13. 添加mysql数据源
如果用户没有可测试的mysql数据源,可以通过docker快速启动测试mysql
docker run \
--name zhiliuyun-mysql \
--privileged=true \
--restart=always \
-d \
-p 30306:3306 \
-e MYSQL_ROOT_PASSWORD=zhiliuyun123 \
-e MYSQL_DATABASE=test_db \
mysql:8.0
- dbType:
Mysql
- jdbcUrl:
jdbc:mysql://172.16.215.83:30306/test_db
- username:
root
- password:
zhiliuyun123
14. 新建作业流,执行默认flinksql
默认sql为查询当前时间
15. 查看结果
由于使用flink的print-connector,至流云无法获取结果,需要去flink页面查询
16. 新建jdbc执行sql
创建两个表,一张原始表一张结果表,并在原始表中插入一条数据
create table users(
username varchar(100),
sex int,
birth datetime
);
create table users_result(
username varchar(100),
sex int,
birth varchar(100)
);
insert into users values('zhangsan',13,now());
17. 新建jdbc查询sql
通过jdbc查询作业,查看原始表中的数据。
select * from users;
18. 自定义函数参考
19. 上传资源中心
自定义函数需要先上传编译的jar包
- 演示函数下载:https://openfly.oss-cn-shanghai.aliyuncs.com/flink-custom-func.jar
- 演示依赖下载:https://repo1.maven.org/maven2/cn/hutool/hutool-all/5.8.27/hutool-all-5.8.27.jar
- 演示jdbc依赖下载:https://openfly.oss-cn-shanghai.aliyuncs.com/flink-connector-jdbc-3.1.2-1.18.jar
20. 新建自定义函数
- 名称:to_chinese_date
- 类名:com.isxcode.acorn.udf.Func
- 备注:将时间格式转成中文
21. 使用自定义函数
新建flinksql作业,使用flinksql将原始表中的日期格式改成中文格式,并同步到结果表中。
注意:需要额外添加jdbc连接器的依赖
CREATE TABLE from_table(
username STRING,
sex INT,
birth Timestamp
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.16.215.83:30306/test_db',
'driver'='com.mysql.cj.jdbc.Driver',
'table-name'='users',
'username'='root',
'password'='zhiliuyun123');
CREATE TABLE to_table(
username STRING,
sex INT,
birth STRING
) WITH (
'connector'='jdbc',
'url'='jdbc:mysql://172.16.215.83:30306/test_db',
'driver'='com.mysql.cj.jdbc.Driver',
'table-name'='users_result',
'username'='root',
'password'='zhiliuyun123');
insert into to_table ( username,sex,birth ) select username,sex,to_chinese_date(birth) from from_table;
快捷测试
CREATE TABLE print_sink (
print_date string
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink SELECT to_chinese_date(now());
使用jdbc查询作业,查看结果是否正确
select * from users_result;
22. 自定义作业使用
- 自定义作业模版地址:https://github.com/isxcode/flink-job-template
- 官网单词统计作业下载:https://openfly.oss-cn-shanghai.aliyuncs.com/WordCount.jar
上传资源中心
新建自定义作业
- 名称: flink-demo
- mainClass: org.apache.flink.examples.java.wordcount.WordCount
- 四个参数
–input
/tmp/in.txt
–output
/tmp/out.txt
服务器上,先创建需要统计的文件
vim /tmp/in.txt
zhangsan zhangsan zhangsan
lisi lisi lisi lisi
wangwu wangwu wangwu wangwu wangwu
服务器上查看结果,结果符合预期
cat /tmp/out.txt