| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink1.15 SQL实现翻滚窗口实时计算 -> 正文阅读 |
|
[大数据]Flink1.15 SQL实现翻滚窗口实时计算 |
1 Flink?翻滚窗口适用场景1.1?定义将数据依据固定的窗口度对无界数据流进行切片。 1.2?特点时间对、窗口长度固定、event无重叠。 1.3?适用场景BI统计(计算各个时间段的指标) 2 Flink SQL窗口编程模型Table table = input ?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w ?.groupBy($("w")) //?按照窗口w对table进行分组 ?.select($("b").sum()); // select子句指定返回的列和聚合运算(非键控(key)的window) Table table = input ?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w ?.groupBy($("w"), $("a")) //?按照属性a和窗口w对table进行分组(键控的window) ?.select($("a"), $("b").sum()); // select子句指定返回的列和聚合运算 在select子句中,我们还可以返回Window的属性:start,end,rowtime
Table table = input ?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w ?.groupBy($("w"), $("a")) //?按照属性a和窗口w对table进行分组(键控的window) ?.select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); ?//select?句返回字段a、窗口的开始时间戳、窗口的结束时间戳、窗口的时间戳,b字段的count
3 Flink SQL滚动窗口实现3.1?滚动窗口参数滚动窗口通过Tumble类来定义,三个方法: 3.2?基于EventTime的滚动窗口实现package com.bigdata.chap05; import com.bigdata.entity.TempSensorData; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import java.time.ZoneId; import static org.apache.flink.table.api.Expressions.*; /** ?*?基于事件时间的滚动窗口 ?*/ public class FlinkTableTumbleWinBaseEventTime { ????public static void main(String[] args) throws Exception { ????????//1、获取Stream执行环境 ????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ????????//2、创建表执行环境 ????????StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); ????????System.out.println(tEnv.getConfig().getLocalTimeZone()); ????????env.setParallelism(1); ????????//3、读取数据并提取时间戳指定水印生成策略 ????????WatermarkStrategy<TempSensorData> watermarkStrategy = WatermarkStrategy ????????????????.<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2)) ????????????????.withTimestampAssigner(new SerializableTimestampAssigner<TempSensorData>() { ????????????????????@Override ????????????????????public long extractTimestamp(TempSensorData element, long recordTimestamp) { ????????????????????????return element.getTp()*1000; ????????????????????} ????????????????}); ????????DataStream<TempSensorData> tempSensorData = env.socketTextStream("hadoop1", 8888) ????????????????.map(event -> { ????????????????????String[] arr = event.split(","); ????????????????????return TempSensorData ????????????????????????????.builder() ????????????????????????????.sensorID(arr[0]) ????????????????????????????.tp(Long.parseLong(arr[1])) ????????????????????????????.temp(Integer.parseInt(arr[2])) ????????????????????????????.build(); ????????????????}).assignTimestampsAndWatermarks(watermarkStrategy); ????????tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); ????????//4、流转换为动态表 ????????Table table = tEnv.fromDataStream(tempSensorData, ????????????????$("sensorID"), ????????????????$("tp"), ????????????????$("temp"), ????????????????$("evTime").rowtime(),//新增evTime字段为rowtime ????????????????$("pt").proctime() ????????); ????????//table.execute().print(); ????????//5、自定义窗口并计算 ????????Table result = table.window(Tumble //窗口大小为2s ????????????????.over(lit(2).second()) //按照eventTime排序 ????????????????.on($("evTime")) ????????????????.as("w")) //按照sensorID和窗口分组 ????????????????.groupBy($("sensorID"), $("w")) //统计每个窗口的平均气温 ????????????????.select($("sensorID"), $("temp").avg().as("avgTemp")); ????????//6、打印 ????????result.execute().print(); ????} } 3.3?测试数据集在hadoop1节点上面打开nc服务:nc -lk 8888 ?,输入以下数据集测试运行 s-5,1645085900,14 s-5,1645085901,17 s-5,1645085902,22 s-5,1645085903,7 s-5,1645085904,21 s-5,1645085905,23 s-5,1645085906,8 s-5,1645085907,32 s-5,1645085908,15 s-5,1645085909,9 如果能基于EventTime按照时间窗口统计出每个传感器的平均气温,则说明Flink SQL翻滚窗口实现成功。 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 6:39:06- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |