IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink转换算子Transform -> 正文阅读

[大数据]Flink转换算子Transform

①map、flatform
map:输入一个元素,输出一个元素
flatMap:打平操作,将输入的元素压平,从而对输出结果的数量不做要求,可以为0、1或者多个
②keyBy
在逻辑上将Stream根据key的Hash值进行分区,DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区
③滚动聚合算子(Rolling Aggregation)
这些算子可以针对 KeyedStream 的每一个支流做聚合。
sum()
min()
max()
minBy()
maxBy()
min与minBy的区别
假设数据如下:
sensor_1,1547718199,35.8
sensor_1,1547718201,15.4
sensor_1,1547718302,6.7
sensor_1,1547718405,38.1
sensor_1,1547718500,3
sensor_1,1547718600,12.5
sensor_1,1547718700,13
min程序:

val resultStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .keyBy("id")
      .min("temperature")

结果:
SensorReading(sensor_1,1547718199,35.8)
SensorReading(sensor_1,1547718199,15.4)
SensorReading(sensor_1,1547718199,6.7)
SensorReading(sensor_1,1547718199,6.7)
SensorReading(sensor_1,1547718199,3.0)
SensorReading(sensor_1,1547718199,3.0)
SensorReading(sensor_1,1547718199,3.0)
min只找到对应字段的最小值,其他字段为首次出现值

minBy程序:

val resultStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
  })
  .keyBy("id")
  .minBy("temperature")

结果:
SensorReading(sensor_1,1547718199,35.8)
SensorReading(sensor_1,1547718201,15.4)
SensorReading(sensor_1,1547718302,6.7)
SensorReading(sensor_1,1547718302,6.7)
SensorReading(sensor_1,1547718500,3.0)
SensorReading(sensor_1,1547718500,3.0)
SensorReading(sensor_1,1547718500,3.0)
minBy以数入字段为基础找到其最小值及其这个最小值对应的其他字段
④reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
ⅰ.lambda表达式:

  def reduce(fun: (T, T) => T): DataStream[T] = {
  }

其中第一个T为先前聚合结果,后一个T为新数据
程序:

    val resultStream = inputStream
      .map(data =>{
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .keyBy("id")
      .reduce((curSensor,newSensor) =>{
SensorReading(curSensor.id,newSensor.timestamp,
curSensor.temperature.min(newSensor.temperature))
      })

ⅱ.ReduceFunction

def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
  }
程序:
val resultStream = inputStream
      .map(data =>{
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })
      .keyBy("id")
      .reduce(new MyReduceFunction)

class MyReduceFunction extends ReduceFunction[SensorReading]{
  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
    SensorReading(t.id,t1.timestamp,t.temperature.min(t1.temperature))
  }
}

⑤split将一个流分成多个流DataStream → SplitStream
select将分开的流选取出来SplitStream→DataStream
程序:

    val splitStream = dataStream
      .split(data =>{
        if(data.temperature > 20) List("high") else List("low")
      })

    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")
    val allStream = splitStream.select("high","low")

⑥connect、comap、union
Connect
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
程序:

val splitStream = dataStream
      .split(data => {
        if(data.temperature > 10) List("high") else List("low")
      })

    val highStream = splitStream.select("high")
    val lowStream = splitStream.select("low")

    val allStream = highStream.connect(lowStream.map(data => {
      (data.id,data.temperature)
    }))

comap
connectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。

def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): 
  }

其中fun1与fun2输出可不同

val resultStream = allStream
      .map(
        data => (data.id,data.temperature,"high")
        ,
        data => data
      )

union
DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操 作,产生一个包含所有 DataStream 元素的新 DataStream

val allStream = highStream.union(lowStream)
val resultStream = allStream.map(data =>{
      (data.id,data.temperature)
})

connect 与 union 区别:
①union流的类型必须是一样,union 可以操作多个流DataStream → DataStream。
②connect流的类型可以不一样,在之后的 coMap中再去调整成为connectedStreams → DataStream,connect 只能操作两个流。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-05 17:25:10  更:2021-08-05 17:27:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 4:25:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码