IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink(三) -> 正文阅读

[大数据]flink(三)

Watermark

Watermark是一种衡量Event Time进展的机制。

Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。

数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。

Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime–t,那么这个窗口被触发执行。

watermark的引入

最常见的引入方式

dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
  override def extractTimestamp(element: SensorReading): Long = {
    element.timestamp * 1000
  }
} )

AssignerWithPunctuatedWatermarks

?周期性的生成 watermark:系统会周期性的将 watermark 插入到流中

?默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法进行设置

?升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的。

AssignerWithPeriodicWatermarks

?没有时间周期规律,可打断的生成 watermark

代码

package com.atguigu.aqitest

import java.util

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.apache.lucene.spatial3d.geom.Bounded

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(111)//设置默认间隔时间
    //val inputStreamFromFile:DataStream[String]=env.readTextFile("D:\\code\\ideaWorkspace\\FLINKTutorial\\src\\main\\resources\\sensor.txt")
    val inputStream = env.socketTextStream("192.168.1.102", 7777)
    val dataStream: DataStream[SensorReading] = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
        }
      )
      //.assignAscendingTimestamps(_.timestamp*1000L)
      //      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(10)) {
      //        override def extractTimestamp(element: SensorReading): Long = {
      //          element.timestamp * 1000L
      //        }
      //      })
      .assignTimestampsAndWatermarks(new MyWMAssigner)


    //    val resultStream=dataStream
    //      .keyBy("id")
    //      //.window(EventTimeSessionWindows.withGap(Time.minutes(1)))//会话窗口
    //      //.window(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
    //      //.timeWindow(Time.seconds(15))//滚动窗口
    //      //.timeWindow(Time.seconds(15),Time.seconds(5))//滑动窗口
    //      //.countWindow(10,2)//滑动计数窗口,每十个数统计一次,隔两个数统计一次
    //      .countWindow(10)//滚动计数窗口,每十个数统计一次

    val resultStream = dataStream
      .keyBy("id")
      .timeWindow(Time.seconds(15), Time.seconds(5))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(new OutputTag[SensorReading]("late"))
      //.reduce(new MyReduce())
      .apply(new MyWindowFun)

    dataStream.print("data")
    resultStream.getSideOutput(new OutputTag[SensorReading]("late"))
    resultStream.print("result")
    env.execute("window test")


  }
}

//自定义一个全窗口函数
class MyWindowFun() extends WindowFunction[SensorReading, (Long, Int), Tuple, TimeWindow] {

  override def apply(key: Tuple, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
    out.collect((window.getStart, input.size))

  }
}

//定义一个周期性生成watermark的assigner
class MyWMAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  //需要两个关键参数,延迟时间,和当前所有数据中最大的时间戳

  val lateness: Long = 1000L
  var maxTs: Long = Long.MinValue + lateness

  override def getCurrentWatermark: Watermark = {

    new Watermark(maxTs - lateness)

  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {

    maxTs = maxTs.max(element.timestamp * 1000L)
    element.timestamp * 1000L

  }
}


//自定义一个断点式生成watermark的Assigner
class MyVmAssigner2 extends AssignerWithPunctuatedWatermarks[SensorReading] {
  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
    val lateness: Long = 1000L
    if (lastElement.id == "sensor_1")
      new Watermark(extractedTimestamp - lateness)
    else null
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    element.timestamp * 1000L

  }
}

总结

1、watermark就是事件时间,代表时间的进展

2、watermark主要用来处理乱序数据,一般就是直接定义一个延迟时间,延迟触发窗口操作
这里的延迟,指的是当前收到的数据内的时间戳

3、watermark延迟时间的设置,一般设置成最大乱序程度来定,通常设置成最大乱序程度
如果按照最大乱序程度定,那么就能保证所有的数据都是正确的
要权衡正确性和实时性的话,可以不按最大乱序程度,而是给一个相对较小的watermark延迟
watermark延迟时间,完全是程序自己定义的
最好的处理方式,是先了解数据的分布情况(抽样、或者根据经验、机器学习算法),可以指定一个合理的延迟,比较小,还能处理绝大多数乱序的情况

4、关窗操作,必须是时间进展到窗口关闭时间,事件时间语义下就是watermark达到窗口关闭时间
当前Ts最大时间戳-延迟时间=watermark,如果现在的watermark大于等于窗口结束时间,就关闭窗口

5、watermark代表的含义是,之后就不会再来时间戳比watermark里面的数值小的数据了
如果有不同分区的上游分区,当前任务会对它们创建各自的分区watermark,当前任务的事件时间就是最小的那个

6、处理乱序数据,Flink有三重保证
watermark可以设置延迟时间
window的allowedLateness方法,可以设置窗口允许处理迟到数据的时间
window的sideOutputLateData方法,可以将迟到的数据写入侧输出流

窗口有两个重要操作:触发计算,清空状态(关闭窗口)

ProcessFunction API

Flink提供了8个Process Function:

· ProcessFunction

· KeyedProcessFunction

· CoProcessFunction

· ProcessJoinFunction

· BroadcastProcessFunction

· KeyedBroadcastProcessFunction

· ProcessWindowFunction

· ProcessAllWindowFunction

KeyedProcessFunction

· processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(side outputs)。

· onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

TimerService 和 定时器(Timers)

Context和OnTimerContext所持有的TimerService对象拥有以下方法:

· currentProcessingTime(): Long 返回当前处理时间

· currentWatermark(): Long 返回当前watermark的时间戳

· registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。

· registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。

· deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。

· deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。

代码

import com.atguigu.aqitest.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.socketTextStream("localhost", 7777)
    val dataStream: DataStream[SensorReading] = inputStream
      .map( data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0), dataArray(1).toLong, dataArray(2).toDouble )
      } )

    // 检测每一个传感器温度是否连续上升,在10秒之内
    val warningStream: DataStream[String] = dataStream
      .keyBy("id")
      .process( new TempIncreWarning(10000L) )

    warningStream.print()
    env.execute("process function job")
  }
}

// 自定义 KeyedProcessFunction
class TempIncreWarning(interval: Long) extends KeyedProcessFunction[Tuple, SensorReading, String]{
  // 由于需要跟之前的温度值做对比,所以将上一个温度保存成状态
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  // 为了方便删除定时器,还需要保存定时器的时间戳
  lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("cur-timer-ts", classOf[Long]) )

  //每来一次数据调用processElement方法
  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // 首先取出状态
    val lastTemp = lastTempState.value()
    val curTimerTs = curTimerTsState.value()

    // 将上次温度值的状态更新为当前数据的温度值
    lastTempState.update(value.temperature)

    // 判断当前温度值,如果比之前温度高,并且没有定时器的话,注册10秒后的定时器
    if( value.temperature > lastTemp && curTimerTs == 0 ){
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      curTimerTsState.update(ts)
    }
    // 如果温度下降,删除定时器
    else if( value.temperature < lastTemp ){
      ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
      // 清空状态
      curTimerTsState.clear()
    }
  }

  // 定时器触发,说明10秒内没有来下降的温度值,报警
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect( "温度值连续" + interval/1000 + "秒上升" )
    curTimerTsState.clear()
  }
}

侧输出流(SideOutput

大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

代码

package com.atguigu.aqitest

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream=env.socketTextStream("localhost",7777)
    val dataStream:DataStream[SensorReading] =inputStream.map(
      data=>{
        val dataArray=data.split(",")
        SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
      })

    //用ProcessFunction的侧输出流实现分流操作
    val highTempStream: DataStream[SensorReading]= dataStream
      .process(new SplitTempProcessor(30))

    val lowTempStream: DataStream[(String, Double, Long)] =highTempStream.getSideOutput(new OutputTag[(String,Double,Long)]("low-temp"))

    //打印输出
    highTempStream.print("high")
    lowTempStream.print("low")
    env.execute("side output test")

  }
}

//自定义ProcessFunction,用于区分高低温的数据
class SplitTempProcessor(threshold:Int)extends ProcessFunction[SensorReading,SensorReading]{
  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    //判断当前数据的温度值,如果大于阈值,输出到主流;如果小于阈值,输出到输出流
    if(value.temperature>threshold){
      out.collect(value)
    }else{
      ctx.output(new OutputTag[(String,Long,Double)]("low-temp"),(value.id,value.temperature,value.timestamp))
    }
  }
}

状态编程

流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。有状态的计算则会基于多个事件输出结果。

算子状态operate state

Flink为算子状态提供三种基本数据结构:

实现接口ListCheckpointed

列表状态(List state)

将状态表示为一组数据的列表。

联合列表状态(Union list state)

也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state)

如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

键控状态keyed state

Flink的Keyed State支持以下数据类型:

**ValueState[T]**保存单个的值,值的类型为T。

? get操作: ValueState.value()

? set操作: ValueState.update(value: T)

**ListState[T]**保存一个列表,列表里的元素的数据类型为T。基本操作如下:

? ListState.add(value: T)

? ListState.addAll(values: java.util.List[T])

? ListState.get()返回Iterable[T]

? ListState.update(values: java.util.List[T])

**MapState[K, V]**保存Key-Value对。

? MapState.get(key: K)

? MapState.put(key: K, value: V)

? MapState.contains(key: K)

? MapState.remove(key: K)

ReducingState[T]

AggregatingState[I, O]

State.clear()是清空操作。

代码

package com.atguigu.aqitest
import java.util
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntermediateTableScan
import org.apache.flink.util.Collector

object StateTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //配置状态后端
    //    env.setStateBackend(new MemoryStateBackend())//内存级的状态后端
    //    env.setStateBackend(new FsStateBackend(""))//文件级状态后端
    //    env.setStateBackend(new RocksDBStateBackend("",true))

    //checkpoint相关配置
    //启用检查点,指定触发检查点的间隔时间(毫秒)
    env.enableCheckpointing(1000L)
    //其它配置
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(30000L)//超时时间
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)//同一时间进行的checkpoint
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)//两次checkpoint至少要间隔时长
    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)//true从默认的checkpoint恢复
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)

    //重启策略的配置
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000L))//尝试重启次数 隔多长时间重启一次
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)))
    env.setRestartStrategy(RestartStrategies.fallBackRestart())//不重启


    val inputStream = env.socketTextStream("localhost", 7777)
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
        })

    val resultStream = dataStream
      .keyBy("id")
//      .flatMapWithState[(String, Double, Double), Double]({
//        case (inputData: SensorReading, None) => (List.empty, Some(inputData.temperature))
//        case (inputData: SensorReading, lastTemp: Some[Double]) => {
//          val diff = (inputData.temperature - lastTemp.get).abs
//          if (diff > 10.0) {
//            (List((inputData.id, lastTemp, inputData.temperature)), Some(inputData.temperature))
//          } else {
//            (List.empty, Some(inputData.temperature))
//          }
//
//        }
//      })

    val warningStream: DataStream[(String, Double, Double)] = dataStream.keyBy("id").map(new TempChangeWarning(10.0))
    warningStream.print("warning")

    env.execute("state test job")

  }
}

//自定义RichMapFunction
class TempChangeWarning(threshold: Double) extends RichMapFunction[SensorReading, (String, Double, Double)] {

  //定义状态变量,上一次的温度值
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {

    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  }

  override def map(value: SensorReading): (String, Double, Double) = {

    //从状态中取出上次的温度值
    val lastTemp = lastTempState.value()

    //更新状态
    lastTempState.update(value.temperature)


    //跟当前温度值计算差值,然后跟阈值比较,如果大于就报警
    val diff = (value.temperature - lastTemp).abs

    if (diff > threshold) {
      (value.id, lastTemp, value.temperature)
    } else {
      null
    }
  }
}

//自定义RichFlatMapFunction
class TempChangeWarningWithFlatmap(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-temp", classOf[Double]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {

    val lastTemp: Double = lastTempState.value()

    lastTempState.update(value.temperature)

    val diff: Double = (value.temperature - lastTemp).abs

    if (diff > threshold) {
      out.collect((value.id, lastTemp, value.temperature))
    }

  }
}
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  //两种方式获取状态
  //lazy val myState:ValueState[Int]=getRuntimeContext.getState(new ValueStateDescriptor[Int]("my-state",classOf[Int]) )
  var myState: ValueState[Int] = _

  //更好的方式
  override def open(parameters: Configuration): Unit = {
    myState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("my-state", classOf[Int]))
  }

  //listState
  lazy val myListSate: ListState[String] = getIterationRuntimeContext.getListState(new ListStateDescriptor[String]("myList", classOf[String]))

  //mapState
  lazy val myMapState: MapState[String, Double] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Double]("my-mapList", classOf[String], classOf[Double]))

  //reducingState
  lazy val myReducingState: ReducingState[SensorReading] = getRuntimeContext
    .getReducingState(new ReducingStateDescriptor[SensorReading]("my-reducingState", new ReduceFunction[SensorReading] {
      override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
        SensorReading(value1.id, value1.timestamp.max(value2.timestamp), value1.temperature.min(value2.temperature))
      }
    }, classOf[SensorReading]))

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int]#Context, out: Collector[Int]): Unit = {

    myState.value()
    myState.update(1)

    myListSate.add("hello")
    myListSate.addAll(new util.ArrayList[String]())

    myMapState.put("sensor_1", 12)
    myMapState.get("sensor_1")

    myReducingState.add(value)
    myReducingState.clear()
  }

}

//operate state示例
class MyMapper() extends RichMapFunction[SensorReading, Long] with ListCheckpointed[Long] {
  //lazy val count:ValueState[Long]=getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
  var count: Long = 0L

  override def map(value: SensorReading): Long = {
    count += 1
    count
  }

  //快照
  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {

    val stateList = new util.ArrayList[Long]()
    stateList.add(count)
    stateList

  }

  //恢复
  override def restoreState(state: util.List[Long]): Unit = {

        val iter=state.iterator()
        while (iter.hasNext()){
          count+=iter.next()
        }
  }
  //往后面听,会越来越清楚的

}

状态后端

MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。

FsStateBackend

将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。

RocksDBStateBackend

将所有状态序列化后,存入本地的RocksDB中存储。

注意:RocksDB的支持并不直接包含在flink中,需要引入依赖:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
   <version>1.10.0</version>
 </dependency>

Table API和Flink SQL

介绍

在这里插入图片描述

TableExample

package Table

import com.atguigu.aqitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object TableExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //读取数据创建DataStream
    val inputStream: DataStream[String] = env.readTextFile("D:\\code\\ideaWorkspace\\FLINKTutorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val array: Array[String] = data.split(",")
      SensorReading(array(0), array(1).toLong, array(2).toDouble)
    })
    //创建表执行环境
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //基于数据流,转换成一张表,然后进行操作
    val dataTable: Table = tableEnv.fromDataStream(dataStream)


    //调用Table API 得到转换结果
    val resultTable: Table = dataTable
      .select("id,temperature")
      .filter("id=='sensor_1'")

    //或者直接写sql得到转换结果
    tableEnv.createTemporaryView("dataTable",dataTable)
    //tableEnv.registerTable("dataTable",dataTable)//与面等同
    val resultSqlTable:Table=tableEnv.sqlQuery("select id, temperature from dataTable where id = 'sensor_1'")


    //转换回数据流,打印输出
    val resultStream: DataStream[(String, Double)] = resultTable.toAppendStream[(String, Double)]
    resultStream.print("result")
    resultTable.printSchema()


    env.execute("table example job")

  }
}

环境的创建

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//val tableEnv: StreamTableEnvironment =StreamTableEnvironment.create(env)

//1.1创建老版本的流查询环境
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
  .useOldPlanner()
  .inStreamingMode()
  .build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

//1.2创建老版本的批式查询环境
val batchEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(batchEnv)

//1.3创建blink版本的流查询环境
val bsSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

//1.4创建blink版本的批式查询环境
val bbSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val bbTableEnv = TableEnvironment.create(bbSettings)

连接到文件系统

val filePath = "D:\\code\\ideaWorkspace\\FLINKTutorial\\src\\main\\resources\\sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
  .withFormat(new Csv()) //定义读取数据后的格式化方法
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  ) //定义结构
  .createTemporaryTable("inputTable") //注册成一张表

//2.2连接到kafka
tableEnv.connect(new Kafka()
  .version("0.11")
  .topic("sensor")
  .property("bootstrap.servers", "localhost:9092")
  .property("zookeeper.connect", "localhost:2181")
)
  .withFormat(new Csv)
  .withSchema(new Schema()
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("temperature", DataTypes.DOUBLE())
  )
  .createTemporaryTable("kafkaInputTable")

//3.表的查询
//3.1简单查询,过滤投影
val sensorTable: Table = tableEnv.from("inputTable")
val resultTable: Table = sensorTable
  .select('id, 'temperature)
  .filter('id === "sensor_1")
//3.2 SQL简单查询
val resultSqlTable: Table = tableEnv.sqlQuery(
  """
    |select id,temperature
    |from inputTable
    |where id ='sensor_1'
    |""".stripMargin
)

//3.3简单聚合
val aggResultTable:Table=sensorTable
  .groupBy('id)
  .select('id,'id.count as 'count)
//3.4 SQL实现简单聚合
val aggResultSqlTable:Table=tableEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id")

//转换成流打印输出
val sensorTable1: Table = tableEnv.from("inputTable")
sensorTable1.toAppendStream[(String, Long, Double)].print()
env.execute("table api test job")
aggResultSqlTable.toRetractStream[(String,Double)].print("agg")
resultTable.toAppendStream[(String,Long)].print("result")

输出到文件

package Table
import com.atguigu.aqitest.SensorReading
import com.carrotsearch.hppc.ShortCharMap
import com.sun.prism.PixelFormat.DataType
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object OutputTableTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.readTextFile("")
    val dataStream = inputStream.map(
      data => {
        val array = data.split(",")
        SensorReading(array(0), array(1).toLong, array(2).toDouble)
      }
    )

    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    //将DataStream装换成Table
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature as 'temp, 'timestamp as 'ts)

    //对table进行转换操作,得到结果表
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    //定义一个输出表,这就是要写入数据的TableSink
    tableEnv.connect(new FileSystem().path(""))
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temp", DataTypes.DOUBLE())
      ).createTemporaryTable("outputTable")

    //将结果表写入table sink
    resultTable.insertInto("outputTable")


    sensorTable.printSchema()
    sensorTable.toAppendStream[(String, Double, Long)].print()
    env.execute("output table test")

  }

}

写入kafka

package Table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}

object KafkaTableTest {
  def main(args: Array[String]): Unit = {
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val settings:EnvironmentSettings=EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val tableEnv:StreamTableEnvironment=StreamTableEnvironment.create(env,settings)

    //从kafka读取数据
    tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("bootstrap.servers", "localhost:9092")
      .property("zookeeper.connect", "localhost:2181")
    )
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")

    //做转换操作
    val sensorTable:Table=tableEnv.from("kafkaInputTable")
    val resultTable: Table = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    val aggResultTable:Table=resultTable
      .groupBy('id)
      .select('id,'id.count as 'count)

    //定义一个连接到kafka的输出表
    tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sinkTest")
      .property("bootstrap.servers", "localhost:9092")
      .property("zookeeper.connect", "localhost:2181")
    )
      .withFormat(new Csv)
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaOutputTable")

    resultTable.insertInto("kafkaOutputTable")
    env.execute("")
  }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:11:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/19 23:34:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码