IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 基于事件时间的窗口计算,和迟到数据的处理 -> 正文阅读

[大数据]Flink 基于事件时间的窗口计算,和迟到数据的处理

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestEventtime {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置使用事件时间,及处理数据(划分窗口计算)的 时候使用的时间是数据在产生的时候自带的时间信
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //设置每隔多长时间进行一次水位线计算。水位线代表的就是一个时间点,这个时间点只会随着进来的数据进行增大,不会降低
    //当水位线一确定,就表示水位线之前的数据都以为他们到齐了,一旦达到窗口计算的规则就进行窗口计算,就算有的数据因为各种原因没有到也不会等他了
    //水位线的计算方式是进来事件数据的最大时间减去自定义可允许减去的时间(毫秒)
    //这里设置的这个就是说每经过多久进行一次水位线的计算更新,这里设置 每经过一秒进行更新一次水位线
    environment.getConfig.setAutoWatermarkInterval(1000)
    //设置并行度为1,可以更直观的看到水位线的更新效果。因为如果设置大于一的话会根据并行度中最低的水位线进行窗口的计算,设置为1避免了这些干扰,效果直观
    environment.setParallelism(1)

      //在设置完了事件时间后就进行数据流的读入和计算等操作
    //从数据源读入数据
    val dataStream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9999)
    //传进来的数据必须是:数据  时间戳 这个格式的,把时间戳进行转换
    val dataStream2: DataStream[(String, Long)] = dataStream.map(_.split(" ")).map(v => (v(1), v(2).toLong))

    //在读入数据源之后对数据流进行自定义水位线的提取方式设置、设置窗口的划分、窗口的计算
    val resault: DataStream[String] = dataStream2
      .assignTimestampsAndWatermarks(new MyWWx)//自定义类完成水位线的提取方式设置
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))//设置窗口划分为5秒一个窗口
      .process(new Myprocess)//自定义类设置计算方式

    //对计算完成的结果进行输出
    resault.print()
    //执行
    environment.execute("eventttime")
  }
}
//写一个类继承AssignerWithPeriodicWatermarks类,用来实现自定义的水位线划分方式
//水位线的提取,采用固定频次的方式:每间隔固定的时间提取一次水位线
//参数为输入数据类型
class MyWWx extends AssignerWithPeriodicWatermarks[(String, Long)]{
  //1.由于水位线是根据进来的数据的最大时间时间-允许延迟的最大时间来计算的,所以要先定义出这两个变量
  //设置最大事件时间
  var maxEventTime:Long = _
  //设置允许的最大延迟时间,单位是毫秒
  var maxAllowLateness:Long=2000

  //getCurrentWatermark这个方法是计算水位线的,每间隔固定的时间,执行一次这个方法比如说:程序中设置1秒钟提取一次水位线。这个方法就会每间隔1秒执行一次
  override def getCurrentWatermark: Watermark = new Watermark(maxEventTime-maxAllowLateness)
  //extractTimestamp 每接收一个数据,都会执行一次这个方法去提取每一个事件的时间
  override def extractTimestamp(t: (String, Long), l: Long): Long = {
    //根据进来的数据进行最大事件时间的更新,拿着之前的最大时间和现在进来的这条数据的事件时间比较一下,更新最大时间为他们中的最大值
    maxEventTime=Math.max(maxEventTime,t._2)
    println(s"线程编号是${Thread.currentThread().getId}的线程===>到目前为止,计算出来的水位线是:${maxEventTime-maxAllowLateness}")
    t._2
  }
}
//写一个类继承ProcessAllWindowFunction类,用来实现自定义窗口计算的方式
//三个参数分别为输入数据类型、输出数据类型、窗口类型
class Myprocess extends ProcessAllWindowFunction [(String, Long),String,TimeWindow]{
  override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
    //1.根据上下文获取到窗口对象
    val window: TimeWindow = context.window
    //2.根据窗口对象获取到窗口的信息
    //获取到窗口开始时间
    val start: Long = window.getStart
    //获取到窗口结束时间
    val end: Long = window.getEnd
    //3.由于是滚动窗口下一个窗口的哦开始时上一个窗口的结束,所以是前毖后开
    println(s"窗口的开始以及结束时间是:[${start},${end})")
    //4.将窗口中积累的数据拼接成字符串输出。(由于process是每进来一条数据都会先存起来等够一个窗口里在进行计算)
    out.collect(elements.map(_._1).mkString(","))

  }
}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * 水位线设置以后,窗口就会进行计算,那么那些迟到的数据,在窗口计算完之后又进来了怎么办
 * Flink中对于这些迟到的数据会有以下三种下场
 * 1.直接扔了,也是Flink中默认的规则
 * 2.设置一个允许迟到的时间,就是说你在我允许的迟到时间内又来了,那我还可以重新把你拿到窗口中再计算一次
 * 3.对于那些连允许的迟到时间内都没有赶到的数据,就不会再那么好心打开窗口再计算一次了,你要是非想看看那些迟到了可以做一个标记,输出一下,看看谁迟到了
 */
object LateEvent {
  def main(args: Array[String]): Unit = {
    //1.设置环境
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    environment.getConfig.setAutoWatermarkInterval(1000)
    environment.setParallelism(1)

    //设置数据源拿到数据
    val dataSream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9999)
    val dataStream2: DataStream[(String, Long)] = dataSream.map(_.split(" ")).map(v => (v(0), v(1).toLong))

    //创建一个输出标记
    val output = new OutputTag[(String, Long)]("迟到数据")

    val resault: DataStream[String] = dataStream2
      .assignTimestampsAndWatermarks(new MyWWx)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(2)) //设置最大允许迟到2秒
      .sideOutputLateData(output) //设置如果超过了允许迟到了最大时间2秒之后在到的数据就是迟到数据放到上面创建的输出标记里面等会输出
      .process(new Myprocess)

    resault.print()
     //从结果中拿到那些太晚过来的数据,然后进行输出
    val latedata: DataStream[(String, Long)] = resault.getSideOutput(output)
    latedata.print("太晚的数据:")
    environment.execute("laterdata")
  }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-20 15:11:21  更:2021-08-20 15:11:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:09:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码