IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink-SQL管理工具flink-streaming-platform-web部署 -> 正文阅读

[大数据]Flink-SQL管理工具flink-streaming-platform-web部署

1.简介

flink-streaming-platform-web系统是基于Apache Flink 封装的一个可视化的、轻量级的flink web客户端系统,用户只需在web 界面进行sql配置就能完成流计算任务。

主要功能:包含任务配置、启/停任务、告警、日志等功能,支持sql语法提示,格式化、sql语句校验。

目的:减少开发、降低成本 完全实现sql化 流计算任务。 😂

1、主要功能

[1] 任务支持单流 、双流、 单流与维表等。
[2] 支持本地模式、yarn-per模式、STANDALONE模式。
[3] 支持catalog、hive。
[4] 支持自定义udf、连接器等,完全兼容官方连接器。
[5] 支持sql的在线开发,语法提示,格式化。
[6] 支持钉钉告警、自定义回调告警、自动拉起任务。
[7] 支持自定义Jar提交任务。
[8] 支持多版本flink版本(需要用户编译对应flink版本)。
[9] 支持自动、手动savepoint备份,并且从savepoint恢复任务。
[10] 支持批任务如:hive。
目前flink版本已经升级到1.12

2、流程说明

这里部署flink-streaming-platform-web,并跑通kafka->flink sql -> mysql的一个样例,实时从kafka中读取json字符串,经过ETL操作,将数据汇总后写入mysql sink

– ETL

INSERT INTO flink_web_sample.sync_test_2
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_3
GROUP BY day_time;

2. 软件版本

2.1 软件版本列表

  • flink-streaming-platform-web.tar.gz

flink-streaming-platform-web(20210202).tar.gz (下载地址)

  • flink

flink-1.12.0-bin-scala_2.11.tgz (下载地址)

  • kafka

这里使用Kafka Connect Datagen来生成模拟数据,具体操作 请参考这里
json配置文件内容如下

//json文件名为 flink_web.source.datagen.json,注意这里的文件名和配置内容中的name参数的值,是保持一致的
{
  "name": "flink_web.source.datagen",
  "config": {
    "connector.class": "com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector",
    "tasks.max": "1",
    "topic.name": "flink2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "poll.size": 10,
    "poll.interval.ms": 5000,
    "message.template": "{\"day_time\":\"2013\",\"id\":\"1\",\"amnount\":\"10\"}",
    "random.fields": "day_time:2013|2014|2015, id:1|2|3|4|5|6|7|8|9|10, amnount:10|90|100",
    "event.timestamp.field": "event_ts"
  }
}
  • 依赖的jar包

操作mysql需要的jar包(flink jdbc connector下载地址
flink-connector-jdbc_2.11-1.12.0.jar

mysql-connector-java-8.0.19.jar(根据使用的mysql版本来选择,各版本下载地址

操作kafka需要的jar包(flink kafka connector下载地址

这里只需要下载这一个jar包即可:flink-sql-connector-kafka_2.11-1.12.0.jar

将上述3个jar包,拷贝到flink的lib目录下,如下图所示:
在这里插入图片描述

2.2 添加第3放依赖jar包的方法

第一种方法:下载连接器Jar包后后直接放到 flink/lib/目录下就可以使用了,其缺点是:

1、该方案存在jar冲突可能,特别是连接器多了以后
2、在非yarn模式下每次新增jar需要重启flink集群服务器

第二种方法:配置每一个flink任务时,放到http的服务下填写到三方地址,例如设置内部的http repo下载源(公司内部建议放到内网的某个http服务)

http://ccblog.cn/jars/flink-connector-jdbc_2.11-1.12.0.jar
http://ccblog.cn/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
http://ccblog.cn/jars/flink-streaming-udf.jar
http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar

如下图所示:
在这里插入图片描述

3. 启动

3.1 启动flink

 cd /work/flink/flink-1.12.0/bin
./start-cluster.sh 

3.2 启动flink-streaming-platform-web

cd /work/flink/flink-streaming-platform-web/bin
sh deploy.sh start
//停止命令
sh deploy.sh stop

4. 配置flink-streaming-platform-web

4.1 登录页面

启动后,访问web页面

打开页面查看 http://127.0.0.1:9084/admin/index?message=nologin

登录号:admin 密码 123456。

4.2 系统配置

如下图,在下拉框中选择参数,在变量值文本框中填入对应的键值,点击保存即可。配置好的参数会出现在下面的列表中
在这里插入图片描述

4.3 任务配置

  • 新增配置
    在配置管理下,新增一个配置,即新建一个flink sql任务,并提交保存。
    在这里插入图片描述
  • 建表语句
    需要先在mysql中创建flink_web.sync_test_2表,否则直接提交flink sql任务不会自动创建目标表。这里表sync_test_2 固定创建在flink_web库下。建表语句和flink sql代码如下
-- mysql中创建目标表
use flink_web;
CREATE TABLE `sync_test_2` (
  `day_time` varchar(64) NOT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
-- =================================================
 
-- 配置flink sql任务时的sql逻辑
 
--flink sql 
-- sink mysql table
 CREATE TABLE sync_test_2 (
     day_time string,
     total_gmv bigint,
     PRIMARY KEY (day_time) NOT ENFORCED
 ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/flink_web_sample',
  'table-name' = 'sync_test_2',
  'username' = 'root',
  'password' = 'adf_#Hasdfas380',
  'scan.auto-commit' = 'false',
  'sink.buffer-flush.max-rows' = '1'
 );
 
 -- kafka source 
 create table flink_test_3 ( 
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
  'connector'='kafka',
  'topic'='flink2',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
 );
 
 -- ETL 
INSERT INTO flink_web_sample.sync_test_2
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_3
GROUP BY day_time;
  • 依次点击 开启配置 -> 提交任务
    在这里插入图片描述

5. 观察运行结果

5.1 观察任务提交结果

点解日志详情,观察已提交任务的运行状况,如下图则表示任务提交成功,并且任务正在成功运行。
在这里插入图片描述
如果提交失败,请排查flink的log/下日志文件,和flink-streaming-platform-web的日志文件
在这里插入图片描述

5.2 观察数据变化

  • 自动生成kafka实时数据
    往kafka中不断写入实时数据,观察mysql中目标表中的结果。写入数据可以使用kafka-connect-datagen自动写入,json文件flink_web.source.datagen.json内容模板是:
{
  "name": "flink_web.source.datagen",
  "config": {
    "connector.class": "com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector",
    "tasks.max": "1",
    "topic.name": "flink2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "poll.size": 10,
    "poll.interval.ms": 5000,
    "message.template": "{\"day_time\":\"2013\",\"id\":\"1\",\"amnount\":\"10\"}",
    "random.fields": "day_time:2013|2014|2015, id:1|2|3|4|5|6|7|8|9|10, amnount:10|90|100",
    "event.timestamp.field": "event_ts"
  }
}
  • 执行数据生成命令:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @flink_web.source.datagen.json
  • 观察mysql结果:
    在这里插入图片描述
  • 手动生成kafka实时数据
    上述生成kafka数据时自动生成的,为了便于观察结果表中的汇总结果,我们可以手动往kafka中写入一条数据 {“day_time”: “2013”,“id”: 1,“amnount”:10}
    观察写入之前的结果表:
    在这里插入图片描述
    向kafka写入一条数据,首先进入到kafka的容器中,如下命令所示:
docker exec -it 27d6de189715 bash
cd /usr/bin
./kafka-console-producer --broker-list localhost:9092 --topic flink2

进入到生产消息命令行之后,输入一条记录,如下图所示:
在这里插入图片描述
再次观察mysql中的汇总结果
在这里插入图片描述

5.3 在flink的ui上观察任务

访问flink的UI地址 http://vm01:8081/#/overview
观察正在运行的任务
在这里插入图片描述
查看任务详情
在这里插入图片描述
码字不易,请为我点赞,谢谢!!
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:50:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 6:25:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码