IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink 学习(八)flink滑动窗口 -> 正文阅读

[大数据]flink 学习(八)flink滑动窗口


前言

滑动窗口适用场景:比如行程卡上统计最近14天内途径的城市,每次统计数据中会有上一个窗口最后13天的行程数据和最新1天的数据。

1.Sliding-ProcessingTime-Window

基于数据处理时间的滑动窗口

(1)数据源

每秒生成一条数据

public class IntegerSource implements SourceFunction<Integer> {
    int i = 0;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (true) {
            ctx.collect(i++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

(2)示例

	@Test
    public void slidingProcessingTimeWindowsTest() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        DataStreamSource<Integer> source = env.addSource(new IntegerSource());
        //基于ProcessingTime的滑动窗口,窗口长度时3s,每次滑动2s,即统计最近3s内的数据
        source.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(3), Time.seconds(1)))
                .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                        Iterator<Integer> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()) {
                            Integer next = it.next();
                            sum += next;
                            System.out.println("元素: " + next + " ,处理时间:" + new Date());
                        }
                        out.collect(sum);
                    }
                })
                .print();
        env.execute("SlidingProcessingTimeWindows");
    }

结果:每个窗口的数据包含上一个窗口最后2s的数据和新的1s的数据

元素: 0 ,处理时间:Sat Apr 16 23:45:19 CST 2022
7> 0
元素: 0 ,处理时间:Sat Apr 16 23:45:20 CST 2022
元素: 1 ,处理时间:Sat Apr 16 23:45:20 CST 2022
8> 1
元素: 0 ,处理时间:Sat Apr 16 23:45:21 CST 2022
元素: 1 ,处理时间:Sat Apr 16 23:45:21 CST 2022
元素: 2 ,处理时间:Sat Apr 16 23:45:21 CST 2022
1> 3
元素: 1 ,处理时间:Sat Apr 16 23:45:22 CST 2022
元素: 2 ,处理时间:Sat Apr 16 23:45:22 CST 2022
元素: 3 ,处理时间:Sat Apr 16 23:45:22 CST 2022
2> 6
元素: 2 ,处理时间:Sat Apr 16 23:45:23 CST 2022
元素: 3 ,处理时间:Sat Apr 16 23:45:23 CST 2022
元素: 4 ,处理时间:Sat Apr 16 23:45:23 CST 2022
3> 9
元素: 3 ,处理时间:Sat Apr 16 23:45:24 CST 2022
元素: 4 ,处理时间:Sat Apr 16 23:45:24 CST 2022
元素: 5 ,处理时间:Sat Apr 16 23:45:24 CST 2022
4> 12
元素: 4 ,处理时间:Sat Apr 16 23:45:25 CST 2022
元素: 5 ,处理时间:Sat Apr 16 23:45:25 CST 2022
元素: 6 ,处理时间:Sat Apr 16 23:45:25 CST 2022
5> 15

2.Sliding-ProcessingTime-Window-Offset

带偏移量的基于基于ProcessingTime的滑动窗口

示例

@Test
    public void slidingProcessingTimeWindowsWithOffsetTest() throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<Integer> source = env.addSource(new IntegerSource());
        //基于ProcessingTime的滑动窗口,窗口长度时4s,每次滑动2s,偏移量是1s
        source.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2), Time.seconds(1)))
                .process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                        Iterator<Integer> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()) {
                            Integer next = it.next();
                            sum += next;
                            System.out.println("元素: " + next + " ,处理时间:" + format.format(new Date()));
                        }
                        out.collect(sum);
                    }
                })
                .print();
        env.execute("slidingProcessingTimeWindowsWithOffset");
    }

结果:每次统计上一个窗口最后2s的数据和最新2s的数据,并且整体时间向后偏移了1s

元素: 0 ,处理时间:2022-04-16 23:58:37
元素: 1 ,处理时间:2022-04-16 23:58:37
2> 1
元素: 0 ,处理时间:2022-04-16 23:58:39
元素: 1 ,处理时间:2022-04-16 23:58:39
元素: 2 ,处理时间:2022-04-16 23:58:39
元素: 3 ,处理时间:2022-04-16 23:58:39
3> 6
元素: 2 ,处理时间:2022-04-16 23:58:41
元素: 3 ,处理时间:2022-04-16 23:58:41
元素: 4 ,处理时间:2022-04-16 23:58:41
元素: 5 ,处理时间:2022-04-16 23:58:41
4> 14
元素: 4 ,处理时间:2022-04-16 23:58:43
元素: 5 ,处理时间:2022-04-16 23:58:43
元素: 6 ,处理时间:2022-04-16 23:58:43
元素: 7 ,处理时间:2022-04-16 23:58:43
5> 22

3.Sliding-Count-Window

根据数据的数量分隔窗口

示例

@Test
    public void slidingCountWindowsTest() throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStreamSource<Integer> source = env.addSource(new IntegerSource());
        //每个窗口5条数据,滑动2个数据
        source.countWindowAll(5, 2)
                .process(new ProcessAllWindowFunction<Integer, Integer, GlobalWindow>() {
                    @Override
                    public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                        Iterator<Integer> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()) {
                            Integer next = it.next();
                            sum += next;
                            System.out.println("元素: " + next + " ,处理时间:" + format.format(new Date()));
                        }
                        out.collect(sum);
                    }
                })
                .print();
        env.execute("slidingCountWindows");
    }

结果:

元素: 0 ,处理时间:2022-04-17 00:08:12
元素: 1 ,处理时间:2022-04-17 00:08:12
4> 1
元素: 0 ,处理时间:2022-04-17 00:08:14
元素: 1 ,处理时间:2022-04-17 00:08:14
元素: 2 ,处理时间:2022-04-17 00:08:14
元素: 3 ,处理时间:2022-04-17 00:08:14
5> 6
元素: 1 ,处理时间:2022-04-17 00:08:16
元素: 2 ,处理时间:2022-04-17 00:08:16
元素: 3 ,处理时间:2022-04-17 00:08:16
元素: 4 ,处理时间:2022-04-17 00:08:16
元素: 5 ,处理时间:2022-04-17 00:08:16
6> 15
元素: 3 ,处理时间:2022-04-17 00:08:18
元素: 4 ,处理时间:2022-04-17 00:08:18
元素: 5 ,处理时间:2022-04-17 00:08:18
元素: 6 ,处理时间:2022-04-17 00:08:18
元素: 7 ,处理时间:2022-04-17 00:08:18
7> 25
元素: 5 ,处理时间:2022-04-17 00:08:20
元素: 6 ,处理时间:2022-04-17 00:08:20
元素: 7 ,处理时间:2022-04-17 00:08:20
元素: 8 ,处理时间:2022-04-17 00:08:20
元素: 9 ,处理时间:2022-04-17 00:08:20
8> 35

4.Sliding-EventTime-Window

基于事件时间(EventTime)的滑动窗口

(1)数据源

public class EventElementSource implements SourceFunction<EventElement> {
    @Override
    public void run(SourceContext ctx) throws Exception {
        int id = 0;
        Random random = new Random();
        while (true) {
            long time = new Date().getTime() + random.nextInt(5000);
            Date date = new Date(time);
            EventElement eventElement = new EventElement(id++, time, date);
            System.out.println("生成时间:"+eventElement);
            ctx.collect(eventElement);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {

    }
}

(2)示例

    @Test
    public void slidingEventTimeWindowsTest() throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
                .setParallelism(1);

        //添加数据源
        env.addSource(new EventElementSource())
                //设置时间戳和水印,使用数据里的时间
                .assignTimestampsAndWatermarks(WatermarkStrategy.<EventElement>forMonotonousTimestamps().withTimestampAssigner((eventTElement, re) -> eventTElement.getTime()))
                //基于事件时间的滑动窗口
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(3),Time.seconds(2)))
                .process(new ProcessAllWindowFunction<EventElement, Integer, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<EventElement> elements, Collector<Integer> out) throws Exception {
                        Iterator<EventElement> it = elements.iterator();
                        int id = 0;
                        while (it.hasNext()) {
                            EventElement next = it.next();
                            id = next.getId();
                            System.out.println("元素: " + next + " ,处理时间:" + format.format(new Date()));
                        }
                        out.collect(id);
                    }
                })
                .print();
        env.execute("slidingEventTimeWindows");
    }

结果:

生成时间:EventElement(id=0, time=1650125744834, date=Sun Apr 17 00:15:44 CST 2022)
生成时间:EventElement(id=1, time=1650125742511, date=Sun Apr 17 00:15:42 CST 2022)
生成时间:EventElement(id=2, time=1650125745653, date=Sun Apr 17 00:15:45 CST 2022)
元素: EventElement(id=0, time=1650125744834, date=Sun Apr 17 00:15:44 CST 2022) ,处理时间:2022-04-17 00:15:42
元素: EventElement(id=1, time=1650125742511, date=Sun Apr 17 00:15:42 CST 2022) ,处理时间:2022-04-17 00:15:42
1
生成时间:EventElement(id=3, time=1650125744252, date=Sun Apr 17 00:15:44 CST 2022)
生成时间:EventElement(id=4, time=1650125748848, date=Sun Apr 17 00:15:48 CST 2022)
元素: EventElement(id=0, time=1650125744834, date=Sun Apr 17 00:15:44 CST 2022) ,处理时间:2022-04-17 00:15:44
元素: EventElement(id=2, time=1650125745653, date=Sun Apr 17 00:15:45 CST 2022) ,处理时间:2022-04-17 00:15:44
元素: EventElement(id=3, time=1650125744252, date=Sun Apr 17 00:15:44 CST 2022) ,处理时间:2022-04-17 00:15:44
3
生成时间:EventElement(id=5, time=1650125745767, date=Sun Apr 17 00:15:45 CST 2022)
生成时间:EventElement(id=6, time=1650125749420, date=Sun Apr 17 00:15:49 CST 2022)
元素: EventElement(id=4, time=1650125748848, date=Sun Apr 17 00:15:48 CST 2022) ,处理时间:2022-04-17 00:15:46
4
生成时间:EventElement(id=7, time=1650125751065, date=Sun Apr 17 00:15:51 CST 2022)
元素: EventElement(id=4, time=1650125748848, date=Sun Apr 17 00:15:48 CST 2022) ,处理时间:2022-04-17 00:15:47
元素: EventElement(id=6, time=1650125749420, date=Sun Apr 17 00:15:49 CST 2022) ,处理时间:2022-04-17 00:15:47
6
生成时间:EventElement(id=8, time=1650125753373, date=Sun Apr 17 00:15:53 CST 2022)
元素: EventElement(id=7, time=1650125751065, date=Sun Apr 17 00:15:51 CST 2022) ,处理时间:2022-04-17 00:15:48
7
生成时间:EventElement(id=9, time=1650125752809, date=Sun Apr 17 00:15:52 CST 2022)
生成时间:EventElement(id=10, time=1650125751337, date=Sun Apr 17 00:15:51 CST 2022)
生成时间:EventElement(id=11, time=1650125756070, date=Sun Apr 17 00:15:56 CST 2022)
元素: EventElement(id=8, time=1650125753373, date=Sun Apr 17 00:15:53 CST 2022) ,处理时间:2022-04-17 00:15:51
元素: EventElement(id=9, time=1650125752809, date=Sun Apr 17 00:15:52 CST 2022) ,处理时间:2022-04-17 00:15:51
9
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 17:49:30  更:2022-04-18 17:51:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:47:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码