IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink1.15 SQL实现翻滚窗口实时计算 -> 正文阅读

[大数据]Flink1.15 SQL实现翻滚窗口实时计算

摘要:

  1. 随着Flink1.5的发布,FlinkSQL 流批一体更加成熟与完善。
  2. Flink SQL可以替代Flink DataStream实现窗口计算。

1 Flink?翻滚窗口适用场景

1.1?定义

将数据依据固定的窗口度对无界数据流进行切片。

1.2?特点

时间对、窗口长度固定、event无重叠。

1.3?适用场景

BI统计(计算各个时间段的指标)

2 Flink SQL窗口编程模型

Table table = input

?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w

?.groupBy($("w")) //?按照窗口w对table进行分组

?.select($("b").sum()); // select子句指定返回的列和聚合运算(非键控(key)的window)

Table table = input

?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w

?.groupBy($("w"), $("a")) //?按照属性a和窗口w对table进行分组(键控的window)

?.select($("a"), $("b").sum()); // select子句指定返回的列和聚合运算

在select子句中,我们还可以返回Window的属性:start,end,rowtime

注意:基于时间的窗口是左闭右开的,例如从9点开始创建一个1个时的窗口,则start为09:00:00.000,end为10:00:00.000,rowtime为09:59:59.999。时间戳正好等于end的event是不会被分组到这个窗口的。

Table table = input

?.window([GroupWindow w].as("w")) //?定义一个group window并指定别名w

?.groupBy($("w"), $("a")) //?按照属性a和窗口w对table进行分组(键控的window)

?.select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count());

?//select?句返回字段a、窗口的开始时间戳、窗口的结束时间戳、窗口的时间戳,b字段的count

注意:我们到底取哪个时间戳是由业务决定的,一般是start。

3 Flink SQL滚动窗口实现

3.1?滚动窗口参数

滚动窗口通过Tumble类来定义,三个方法:

3.2?基于EventTime的滚动窗口实现

package com.bigdata.chap05;

import com.bigdata.entity.TempSensorData;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.Tumble;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import java.time.ZoneId;

import static org.apache.flink.table.api.Expressions.*;

/**

?*?基于事件时间的滚动窗口

?*/

public class FlinkTableTumbleWinBaseEventTime {

????public static void main(String[] args) throws Exception {

????????//1、获取Stream执行环境

????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

????????//2、创建表执行环境

????????StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

????????System.out.println(tEnv.getConfig().getLocalTimeZone());

????????env.setParallelism(1);

????????//3、读取数据并提取时间戳指定水印生成策略

????????WatermarkStrategy<TempSensorData> watermarkStrategy = WatermarkStrategy

????????????????.<TempSensorData>forBoundedOutOfOrderness(Duration.ofSeconds(2))

????????????????.withTimestampAssigner(new SerializableTimestampAssigner<TempSensorData>() {

????????????????????@Override

????????????????????public long extractTimestamp(TempSensorData element, long recordTimestamp) {

????????????????????????return element.getTp()*1000;

????????????????????}

????????????????});

????????DataStream<TempSensorData> tempSensorData = env.socketTextStream("hadoop1", 8888)

????????????????.map(event -> {

????????????????????String[] arr = event.split(",");

????????????????????return TempSensorData

????????????????????????????.builder()

????????????????????????????.sensorID(arr[0])

????????????????????????????.tp(Long.parseLong(arr[1]))

????????????????????????????.temp(Integer.parseInt(arr[2]))

????????????????????????????.build();

????????????????}).assignTimestampsAndWatermarks(watermarkStrategy);

????????tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

????????//4、流转换为动态表

????????Table table = tEnv.fromDataStream(tempSensorData,

????????????????$("sensorID"),

????????????????$("tp"),

????????????????$("temp"),

????????????????$("evTime").rowtime(),//新增evTime字段为rowtime

????????????????$("pt").proctime()

????????);

????????//table.execute().print();

????????//5、自定义窗口并计算

????????Table result = table.window(Tumble

//窗口大小为2s

????????????????.over(lit(2).second())

//按照eventTime排序

????????????????.on($("evTime"))

????????????????.as("w"))

//按照sensorID和窗口分组

????????????????.groupBy($("sensorID"), $("w"))

//统计每个窗口的平均气温

????????????????.select($("sensorID"), $("temp").avg().as("avgTemp"));

????????//6、打印

????????result.execute().print();

????}

}

3.3?测试数据集

在hadoop1节点上面打开nc服务:nc -lk 8888 ?,输入以下数据集测试运行

s-5,1645085900,14

s-5,1645085901,17

s-5,1645085902,22

s-5,1645085903,7

s-5,1645085904,21

s-5,1645085905,23

s-5,1645085906,8

s-5,1645085907,32

s-5,1645085908,15

s-5,1645085909,9

如果能基于EventTime按照时间窗口统计出每个传感器的平均气温,则说明Flink SQL翻滚窗口实现成功。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-09 12:46:25  更:2022-05-09 12:50:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:39:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码