IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink流处理引擎系统学习(十二) -> 正文阅读

[大数据]Flink流处理引擎系统学习(十二)

前言

这期分享windos的理解,只有这个理解清楚了,才能更好的根据场景选择合适的开窗处理。


一、window的基本概念

1.window是什么

示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。

2.window的分类

在这里插入图片描述
PS:
按key分组了用window构建多个window,未分组用windowAll(API后缀都带All)
区别示例:
在这里插入图片描述

3.window的生命周期

在这里插入图片描述

4.Window Assinger

在这里插入图片描述

5.Window Assinger分类(window小分类)

在这里插入图片描述
翻滚窗口
在这里插入图片描述
翻滚窗口的使用
在这里插入图片描述
滑动窗口
在这里插入图片描述
滑动窗口的使用
在这里插入图片描述
session窗口
在这里插入图片描述
session窗口的使用
在这里插入图片描述
PS:
sessionWindow只能基于时间
global窗口
在这里插入图片描述

6.window盘点

在这里插入图片描述
示例比较,注意体会各个window的区别
在这里插入图片描述

7.预定义的keyed window

在这里插入图片描述
预定义的这些window可以替换.window()

8.预定义的Non-Keyed window

在这里插入图片描述

二、窗口函数

在这里插入图片描述
PS
windowFunction/AllWindowFunction是早期版本一致遗留下来的,现在被ProcessWindowFunction/ProcessAllWindowFunction替换

1.ReduceFunction

在这里插入图片描述

2.AggregateFunction

在这里插入图片描述

3.FoldFunction

在这里插入图片描述
PS
已过时,官方已经不推荐用了

3.WindowFunction/AllWindowFunction

在这里插入图片描述
PS
我目前用的1.14.4已经标记过时了,官方也已经不推荐

4.ProcessWindowFunction/ProcessAllWindowFunction

在这里插入图片描述
这是新一代的窗口函数

4.新一代窗口函数混搭

在这里插入图片描述


三、触发器与驱逐器

1.什么是触发器

在这里插入图片描述

2.触发和清除

在这里插入图片描述

3.默认触发器

在这里插入图片描述

4.内置和自定义触发器

在这里插入图片描述

5.驱逐器的作用

在这里插入图片描述

6.内置驱逐器

在这里插入图片描述


四、延迟处理及窗口计算结果的使用

1.如何允许延迟

在这里插入图片描述

2.延迟数据的获取

在这里插入图片描述

3.晚点元素注意

在这里插入图片描述

4.window result的使用

在这里插入图片描述

5.水位线与窗口的交互

在这里插入图片描述

6.窗口估算注意事项

在这里插入图片描述


五、窗口函数示例

1.AggregateFunction

package spendreport.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhengwen
 **/
public class TestAggFunctionOnWindow {

  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100D),
      Tuple3.of("class1", "李四", 78D),
      Tuple3.of("class1", "王五", 99D),
      Tuple3.of("class2", "赵六", 81D),
      Tuple3.of("class2", "钱七", 59D),
      Tuple3.of("class2", "马二", 97D),
  };

  public static void main(String[] args) throws Exception {
    //获取运行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    //求各班级英语平均分
    DataStream<Double> avgScore = input.keyBy(
        new KeySelector<Tuple3<String, String, Double>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Double> value)
              throws Exception {
            return value.f0;
          }

        }).countWindow(2).aggregate(new AverageAggregate());

    //打印统计结果
    avgScore.print();

    //执行
    env.execute();

  }


  /**
   * 平均值 sum/count
   */
  private static class AverageAggregate implements
      AggregateFunction<Tuple3<String, String, Double>, Tuple2<Double, Long>, Double> {

    /**
     * 创建累加器来保存中间状态 sum和count
     *
     * @return
     */
    @Override
    public Tuple2<Double, Long> createAccumulator() {
      return new Tuple2<>(0D, 0L);
    }

    /**
     * 将元素添加到累加器并返回新的累加器
     *
     * @param value
     * @param accumulator
     * @return
     */
    @Override
    public Tuple2<Double, Long> add(Tuple3<String, String, Double> value,
        Tuple2<Double, Long> accumulator) {
      //来一个计算一下sum和count保存中间结果到累加器
      return new Tuple2<>(accumulator.f0 + value.f2, accumulator.f1 + 1L);
    }

    /**
     * 从累加器提取结果
     *
     * @param accumulator
     * @return
     */
    @Override
    public Double getResult(Tuple2<Double, Long> accumulator) {
      return accumulator.f0 / accumulator.f1;
    }

    /**
     * 累加器合并
     *
     * @param acc1
     * @param acc2
     * @return
     */
    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> acc1,
        Tuple2<Double, Long> acc2) {
      return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
    }
  }
}

2.ReduceFunction

package spendreport.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhengwen
 **/
public class TestReduceFunctionOnWindow {

  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100),
      Tuple3.of("class1", "李四", 78),
      Tuple3.of("class1", "王五", 99),
      Tuple3.of("class2", "赵六", 81),
      Tuple3.of("class2", "钱七", 59),
      Tuple3.of("class2", "马二", 97),
  };

  public static void main(String[] args) throws Exception {
    //获取运行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    //求各班级英语总分
    //countWindow(2)满2个才计算
    DataStream<Tuple3<String, String, Integer>> totalPoints = input.keyBy(
        new KeySelector<Tuple3<String, String, Integer>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Integer> value)
              throws Exception {
            return value.f0;
          }

        }).countWindow(2).reduce(
        new ReduceFunction<Tuple3<String, String, Integer>>() {
          @Override
          public Tuple3<String, String, Integer> reduce(
              Tuple3<String, String, Integer> v1,
              Tuple3<String, String, Integer> v2) throws Exception {
            return new Tuple3<>(v1.f0, v1.f1, v1.f2 + v2.f2);
          }
        });
    //打印统计结果
    totalPoints.print();

    //执行
    env.execute();

  }

}

3.ProcessWindowFunction

package spendreport.window;


import java.util.Iterator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * @author zhengwen
 **/
public class TestProcessWinFunctionOnWindow {


  private static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[]{
      Tuple3.of("class1", "张三", 100D),
      Tuple3.of("class1", "李四", 78D),
      Tuple3.of("class1", "王五", 99D),
      Tuple3.of("class2", "赵六", 81D),
      Tuple3.of("class2", "钱七", 59D),
      Tuple3.of("class2", "马二", 97D),
  };

  public static void main(String[] args) throws Exception {
    //获取运行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple3<String, String, Double>> input = env.fromElements(ENGLISH_TRANSCRIPT);

    //求各班级英语平均分
    DataStream<Double> avgScore = input.keyBy(
        new KeySelector<Tuple3<String, String, Double>, String>() {
          @Override
          public String getKey(Tuple3<String, String, Double> value)
              throws Exception {
            return value.f0;
          }
        }).countWindow(2).process(new MyProcessWindowFunction());

    //打印统计结果
    avgScore.print();

    //执行
    env.execute();

  }

  public static class MyProcessWindowFunction extends
      ProcessWindowFunction<Tuple3<String, String, Double>, Double, String, GlobalWindow> {

    @Override
    public void process(String tuple,
        Context context,
        Iterable<Tuple3<String, String, Double>> iterable, Collector<Double> collector)
        throws Exception {
      //拿到所有数据,最后才计算
      Double sum = 0D;
      Long count = 0L;
      Iterator<Tuple3<String, String, Double>> it = iterable.iterator();
      while (it.hasNext()) {
        Tuple3<String, String, Double> tp = it.next();
        sum += tp.f2;
        count++;
      }
      Double outScore = sum / count;
      collector.collect(outScore);
    }


  }
}

PS
这里例子MyProcessWindowFunction 的第3个参数KEY要与keyBy()的KeySelector返回return的一致。

总结


窗口函数多种写法,特别灵活,在业务场景中使用,先别慌写,理清楚。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-12 16:31:26  更:2022-05-12 16:32:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:59:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码