IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink:窗口 -> 正文阅读

[大数据]Flink:窗口

窗口计算

概述

窗口计算是流计算的核心。

窗口计算就是把无界数据流切分为有限大小的“bucket”—>窗口(bucket/window/panel),在窗口上应用计算换上完成计算处理

在这里插入图片描述

整体的程序结构

在这里插入图片描述

窗口的切分

专业的说法:window assigner–>窗口分配器:把无界数据流怎么做窗口的划分

Tumbling Windows:滚动窗口

窗口大小是固定的、上一个窗口的结束是下一个窗口的开始(窗口不会重叠

在这里插入图片描述

Sliding Windows:滑动窗口

窗口大小是固定的,窗口有可能有重叠。窗口会有一个滑动步长(上一个窗口开始,往后滑动一定的时间下一个窗口开始)

在这里插入图片描述

Session Windows:会话窗口

窗口大小不固定,窗口之间会有一个间隙(gap)

在这里插入图片描述

Global Window:全局窗口

整个数据流是一个窗口,因为数据流是无界的,所以全局窗口默认情况下,永远不会触发计算数据
在这里插入图片描述

窗口代码-窗口分配器

窗口切分—windowAssigner的使用

TumblingWindowsAssigner

package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * 通过滚动窗口对无界数据流进行窗口的切分
 * 以word count为需求实现
 */
object TumblingWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    //1.通过滚动窗口对无界数据流进行切分
    //2.滚动窗口的大小是5秒钟
    //3.ProcessingTime==》处理时间,以系统时钟作为时间基准
    
    //通过window算子对无界数据流进行窗口的切分:窗口分配器WindowAssigner
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}

SlidingWindowsAssigner

package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SlidingWindowsAssignerJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)


    //1.window算子实现窗口的划分
    //2SlidingProcessingTimeWindows实现滑动窗口
    //3.滑动窗口有固定大小:10秒钟;窗口的滑动步长5秒钟
    //滑动窗口有可能会重叠
    val windowedStream: WindowedStream[(String, Int), String, TimeWindow] = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}

SessionWindowsAssigner

package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object SessionWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    //window算子实现窗口的划分
    //2.withGap方法里面的时间表示的是间隙。设置了间隙是5秒钟
    var windowedStream = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => (elem._1, sum._2 + elem._2))

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}

GlobalWindowAssigner

package com.baizhi.flink.assigner

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow}

/**
 * 通过滚动窗口对无界数据流进行窗口的切分
 * 以word count为需求实现
 */
object GlobalWindowsAssignerJob {

  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)


    //window算子进行窗口的划分
    //globalWindows就是把整个无界数据流划分为一个窗口
    //因为无界数据流没有结束,全局窗口也没有结束,所以全局窗口默认的是永远不会触发计算函数
    val windowedStream: WindowedStream[(String, Int), String, GlobalWindow] = keyedStream
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of[GlobalWindow](3))//这是一个数量触发器;窗口中的元素个数到达了规定的值3,就会触发执行

    val result: DataStream[(String, Int)] = windowedStream.reduce((sum, elem) => {
      println("***********")
      (elem._1, sum._2 + elem._2)
    })

    result.print()

    environment.execute("tumblingWindowsAssignerJob")
  }

}

窗口代码-窗口计算函数

reduceFunction

reduceFunction 做的是增量计算—》每过来一个数据,就会执行一次计算。到窗口结束,触发计算的时候,就把计算的结果发送出去

package com.baizhi.flink.function

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object ReduceFunctionJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction)

    result.print()

    environment.execute("reduceFunctionJob")
  }

}

class  MyReduceFunction extends ReduceFunction[(String, Int)]{
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {

//    println(value1+"***"+value2)

    (value2._1,value2._2+value1._2)
  }
}

processWindowFunction

processWindowFunction做的是批量计算—》每过来一个数据,就会把数据存储起来。到窗口结束,触发计算的时候,把所有的数据获取到进行一次性计算

可以获取到窗口的元数据信息—》窗口的开始时间,结束时间…

package com.baizhi.flink.function

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ProcessWindowFunctionJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)


    val result: DataStream[(String, Int)] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction)

    result.print()


    environment.execute("processWindowFunctionJob")
  }

}


//四个类型参数
//In:输入数据的类型
//Out:输出数据的类型
//Key: Key的类型,keyBy所对应的类型。keyedStream[类型1,类型2]-->类型2就是key的类型
//Window:窗口的类型,分成了两种,一种是时间窗口,一种是全局窗口
//class MyProcessWindowFunction extends ProcessWindowFunction[In,Out,Key,Window]
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int),(String, Int),String,TimeWindow]{
  //处理数据

  /**
   *
   * @param key
   * @param context 上下文
   * @param elements 这个参数是一个集合,就是把所有落入到窗口的数据先暂存起来;当窗口触发计算函数的时候,再从这个集合中读取所有的数据进行计算处理
   * @param out
   */
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {

    //1.word count功能的实现
    val count: Int = elements.map(_._2).sum


    //2.获取元数据信息
    val timeWindow: TimeWindow = context.window

    //窗口的开始时间
    val start: Long = timeWindow.getStart

    //窗口的结束时间
    val end: Long = timeWindow.getEnd
    println(s"窗口时间:[${start}-${end}),窗口中的数据:${elements.mkString(",")}")



    //构建返回值
    out.collect((key,count))


  }
}

既要高效计算又要元数据信息

  • 高效计算就是指增量计算,是由ReduceFunction、aggregateFunction完成的
  • 元数据信息,是由ProcessWindowFunction完成的

就应该把这两个放在一起使用

package com.baizhi.flink.function

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ReduceAndProcessWindowFunctionJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce(new MyReduceFunction2, new MyProcessWindowFunction2)

    result.print()


    environment.execute("processWindowFunctionJob")
  }

}
class MyReduceFunction2 extends ReduceFunction[(String,Int)]{
  override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {

    println(value1+"***"+value2)

    (value2._1,value1._2+value2._2)
  }
}

class MyProcessWindowFunction2 extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {

    //elements这里面存储的不是一个一个的数据,而是reduceFunction计算完成的结果
    println(elements.mkString(","))//

    val timeWindow: TimeWindow = context.window

    val start: Long = timeWindow.getStart

    val end: Long = timeWindow.getEnd
    println(s"窗口时间:[${start}-${end})")


    val list: List[(String, Int)] = elements.toList
    out.collect(list(0)._1+"的个数是:"+list(0)._2)

  }
}

在窗口计算中使用状态

  • 在窗口中使用状态:一个窗口对应一个状态,做数据的存储
  • 在窗口之间使用状态:把所有窗口的数据汇总起来
package com.baizhi.flink.function

import org.apache.flink.api.common.state.{KeyedStateStore, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object StateJob {

  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val dataStream: DataStream[String] = environment.socketTextStream("hadoop10", 9999)

    val keyedStream: KeyedStream[(String, Int), String] = dataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    val result: DataStream[String] = keyedStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new MyProcessWindowFunction3)

    result.print()



    environment.execute("stateJob")
  }

}
class MyProcessWindowFunction3 extends ProcessWindowFunction[(String,Int),String,String,TimeWindow]{

  var vsdForEachWindow:ValueStateDescriptor[Int]=_
  var vsdForAllWindows:ValueStateDescriptor[Int]=_

  override def open(parameters: Configuration): Unit = {

    vsdForEachWindow=new ValueStateDescriptor[Int]("vsdfew",createTypeInformation[Int])
    vsdForAllWindows=new ValueStateDescriptor[Int]("vsdfaw",createTypeInformation[Int])

  }

  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {

    val count: Int = elements.map(_._2).sum


    //状态的使用

    //没有太大的意义,因为每个窗口的数据都进行了计算
    //针对每一个窗口
    val windowRuntimeContext: KeyedStateStore = context.windowState
    val valueStateForEachWindow: ValueState[Int] = windowRuntimeContext.getState(vsdForEachWindow)

    //全局的
    val globalRuntimeContext: KeyedStateStore = context.globalState
    //把所有窗口的数据都累计起来
    val valueStateForAllWindows: ValueState[Int] = globalRuntimeContext.getState(vsdForAllWindows)

    //从状态中读取之前所有窗口计算的单词的个数
    val oldCount: Int = valueStateForAllWindows.value()

    valueStateForAllWindows.update(oldCount+count)

    println(s"到目前位置,${key}的个数是:"+valueStateForAllWindows.value())




    out.collect(key+"个数是:"+count)

  }
}

Trigger

Trigger:触发器

由触发器决定了窗口的计算函数执行时间

  • 时间窗口-在默认情况下,时间窗口是在窗口结束时触发计算函数

  • 全局窗口-在默认情况下,永不触发

如果默认的触发器不能满足业务需要,就可以通过代码完成触发器的使用

keyedStream.window.trigger(trigger)

Flink提供了Trigger。flink提供了很多Trigger的具体类型可以直接使用

  • ConutTrigger:数量触发器,当窗口中的数据达到规定的数量时就触发计算
  • DeltaTrigger:计算两个数的差值,符合了规定的标准,就触发计算

如果Flink提供的Trigger具体类型不能满足业务需要,就自定义Trigger

  • 写一个类,继承Trigger
  • 重写里面的方法

Evictor

evict:剔除器,在计算函数触发计算之前/之后,可以把窗口中的数据剔除掉

keyedStream.window.evictor(evictor)

Flink提供了Evictor.Flink提供了很多具体的Evictor实现类

  • CountEvictor:数量剔除器,当窗口中数据的数量达到规定的值,进行数据的剔除

如果Flink提供的Evictor不能满足业务需要,可以自定义Evictor

  • 写一个类,实现Evictor接口
    定了窗口的计算函数执行时间

  • 时间窗口-在默认情况下,时间窗口是在窗口结束时触发计算函数

  • 全局窗口-在默认情况下,永不触发

如果默认的触发器不能满足业务需要,就可以通过代码完成触发器的使用

keyedStream.window.trigger(trigger)

Flink提供了Trigger。flink提供了很多Trigger的具体类型可以直接使用

  • ConutTrigger:数量触发器,当窗口中的数据达到规定的数量时就触发计算
  • DeltaTrigger:计算两个数的差值,符合了规定的标准,就触发计算

如果Flink提供的Trigger具体类型不能满足业务需要,就自定义Trigger

  • 写一个类,继承Trigger
  • 重写里面的方法

Evictor

evict:剔除器,在计算函数触发计算之前/之后,可以把窗口中的数据剔除掉

keyedStream.window.evictor(evictor)

Flink提供了Evictor.Flink提供了很多具体的Evictor实现类

  • CountEvictor:数量剔除器,当窗口中数据的数量达到规定的值,进行数据的剔除

如果Flink提供的Evictor不能满足业务需要,可以自定义Evictor

  • 写一个类,实现Evictor接口
  • 重写里面的方法
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-19 12:07:10  更:2021-08-19 12:07:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:22:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码