IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink中窗口函数(二)函数合用 -> 正文阅读

[大数据]flink中窗口函数(二)函数合用

(4) ProcessWindowFunction with Incremental Aggregation(与增量聚合结合)

  1. 可将ProcessWindowFunction与增量聚合函数ReduceFunctionAggregateFunction结合。

  2. 元素到达窗口时增量聚合,当窗口关闭时对增量聚合的结果用ProcessWindowFunction再进行全量聚合。

  3. 既能够增量聚合,也能够访问窗口的元数据信息(如开始结束时间、状态等)。

    DataStream<SensorReading> input = ...;
    
    input
      .keyBy(<key selector>)
      .window(<window assigner>)
      .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
    
    // Function definitions
    
    private static class MyReduceFunction implements ReduceFunction<SensorReading> {
    
      public SensorReading reduce(SensorReading r1, SensorReading r2) {
          return r1.value() > r2.value() ? r2 : r1;
      }
    }
    
    private static class MyProcessWindowFunction
        extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
    
      public void process(String key,
                        Context context,
                        Iterable<SensorReading> minReadings,
                        Collector<Tuple2<Long, SensorReading>> out) {
          SensorReading min = minReadings.iterator().next();
          out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
      }
    }
    
    // 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
    // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
    
    // API: 如上ReduceFunction与ProcessWindowFunction
    
    // 示例: 获取一段时间内(Window Size)每一个用户(KeyBy)浏览的商品的最大价值的那条记录(ReduceFunction),并得到Key和Window信息。
    kafkaStream
        // 将从Kafka获取的JSON数据解析成Java Bean
        .process(new KafkaProcessFunction())
        // 提取时间戳生成水印
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // 按用户分组
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // 构造TimeWindow
        .timeWindow(Time.seconds(windowLengthSeconds))
        // 窗口函数: 获取这段窗口时间内每一个用户浏览的商品的最大价值对应的那条记录
        .reduce(
                new ReduceFunction<UserActionLog>() {
                    @Override
                    public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                        return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
                    }
                },
                new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                        
                        UserActionLog max = elements.iterator().next();
        
                        String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                        String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
        
                        String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的最大价值对应的那条记录: "+max;
                        out.collect(record);
        
                    }
                }
        )
        .print();
        
    // 结果
    Key: user_2 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30}
    Key: user_4 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30}
    Key: user_3 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20}
    Key: user_5 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}
    

    (5)ProcessWindowFunction与AggregateFunction结合

    DataStream<Tuple2<String, Long>> input = ...;
    
    input
      .keyBy(<key selector>)
      .window(<window assigner>)
      .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
    
    // Function definitions
    
    /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
      }
    
      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }
    
      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
      }
    
      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
    }
    
    private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
    
      public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
          Double average = averages.iterator().next();
          out.collect(new Tuple2<>(key, average));
      }
    }
    
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}

// API: 如上AggregateFunction与ProcessWindowFunction

// 示例: 获取一段时间内(Window Size)每一个用户(KeyBy)浏览的平均价值(AggregateFunction),并得到Key和Window信息。
kafkaStream
    // 将从Kafka获取的JSON数据解析成Java Bean
    .process(new KafkaProcessFunction())
    // 提取时间戳生成水印
    .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
    // 按用户分组
    .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
    // 构造TimeWindow
    .timeWindow(Time.seconds(windowLengthSeconds))
    // 窗口函数: 获取这段窗口时间内,每一个用户浏览的商品的平均价值,并发出Key和Window信息
    .aggregate(
         new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {

             // 一、初始值
             // 定义累加器初始值
             @Override
             public Tuple2<Long, Long> createAccumulator() {
                 return new Tuple2<>(0L, 0L);
             }

             // 二、累加
             // 定义累加器如何基于输入数据进行累加
             @Override
             public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                 accumulator.f0 += 1;
                 accumulator.f1 += value.getProductPrice();
                 return accumulator;
             }

             // 三、合并
             // 定义累加器如何和State中的累加器进行合并
             @Override
             public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                 acc1.f0 += acc2.f0;
                 acc1.f1 += acc2.f1;
                 return acc1;
             }

             // 四、输出
             // 定义如何输出数据
             @Override
             public Double getResult(Tuple2<Long, Long> accumulator) {
                 return accumulator.f1 / (accumulator.f0 * 1.0);
             }
         },
         new ProcessWindowFunction<Double, String, String, TimeWindow>() {
             @Override
             public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {

                 Double avg = elements.iterator().next();

                 String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                 String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

                 String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的平均价值: "+String.format("%.2f",avg);
                 out.collect(record);

             }
         }

    )
    .print();
    
//结果
Key: user_2 窗口开始时间: 2019-11-09 14:05:40 窗口结束时间: 2019-11-09 14:05:50 浏览的商品的平均价值: 13.33
Key: user_3 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 25.00
Key: user_4 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00
Key: user_2 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 30.00
Key: user_5 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00
Key: user_1 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 23.33

(6)WindowFunction (Legacy)

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advances features, such as per-window keyed state. This interface will be deprecated at some point.

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-30 18:32:03  更:2022-03-30 18:32:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:42:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码