IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink的算子之transform算子 -> 正文阅读

[大数据]flink的算子之transform算子

一.flink算子的执行过程

env -> source -> transform -> sink

3.transform算子

转换算子,把当前的DataStream转化为另一个DataStream

1)简单转换算子(DataStream)

不涉及到分组,不涉及拆分与合并,可以以one-to-one传递数据的流

前提:在虚拟机开启8888端口,在端口发送数据,在控制台进行接收原数据

nc -lk 8888

?注意:转换算子需要用到隐式转换

import org.apache.flink.streaming.api.scala._

/**
 * 创建一个温度传感器的类,把三个字段以参数的形式进行传递
 * @param id id
 * @param timestamp 时间戳
 * @param temperature 温度值
 */
case class SensorReading(id:String,timestamp:Long,temperature:Double)

/**
   * 简单转换流
   * @param env 运行环境
   * @param dataStreamOriginal 原始数据流
   */
  def transform_simple(env:StreamExecutionEnvironment,dataStreamOriginal:DataStream[String]) = {
    //转换算子需要用到隐式转换
    //import org.apache.flink.streaming.api.scala._
   //把得到的数据进行切割,以类的参数的形式传出数据
    val dataStream_sensorReading:DataStream[SensorReading] = dataStreamOriginal.map(line=>{
      val fields = line.split(",")
      SensorReading(fields(0),fields(1).toLong,fields(2).toDouble)
    })
    //把得到的数据源输出
    dataStream_sensorReading.print()
    //把结果进行返回,传给其他参数
    dataStream_sensorReading
  }

  def main(args: Array[String]): Unit = {
    //创建实时流处理的环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置全局并行度为1
    env.setParallelism(1)
    //创建原始数据流
    val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)

    transform_simple(env,dataStream_Original)
    //实时数据流必须设置开启启动执行的语句
    env.execute("transform")
  }

b.键控算子(keyedStream)

按照key相同的进行分组?

滚动聚合算子,一定放在键控算子之后使用

聚合算子的特性

min:同一个key下,取当前字段的最小值,但是其他字段取同组内的第一条数据的值

minBy:聚合算子,minBy特点:同一个key下,取当前字段的最小值,其他字段取最小值的那条数据的内容

reduce:自定义取值方式

? ? ? ? ? ? ? ?两种形式:以匿名函数的形式和以ReduceFunction类形式

前提:在虚拟机开启8888端口,在端口发送数据,在控制台进行接收原数据

nc -lk 8888

/**
 * 以继承类的形式
 * 自定义reduce的类
 */
class MyReduce extends RichReduceFunction[SensorReading]{
  override def reduce(t: SensorReading, t1: SensorReading): SensorReading = {
    SensorReading(t.id,t1.timestamp,t.temperature.min(t1.temperature))
  }
}
/**
   * 键控转换算子
   * @param env 运行环境
   * @param dataStreamOriginal 原数据
   */
  def transform_keyBy(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]): Unit ={
    //把字符串类型的数据转成SensorReading类型的数据
    //简单转换流已经进行转化,直接获取即可
    val dataStream_sensorReading:DataStream[SensorReading] = transform_simple(env,dataStreamOriginal)
    //把数据按照id为key进行分组  可以直接写字段名,也可以写索引值
    val dataStream_keyById:KeyedStream[SensorReading,Tuple] = dataStream_sensorReading.keyBy(0)

    //聚合算子,min特点:同一个key下,取当前字段的最小值,但是其他字段取同组内的第一条数据的值
    val dataStream_min:DataStream[SensorReading] = dataStream_keyById.min("temperature")
    //
    val dataStream_minBy:DataStream[SensorReading] = dataStream_keyById.minBy("temperature")
    //输出获取得到的数据
    dataStream_min.print("min")
    dataStream_minBy.print("minBy")

    //通过reduce方法自定义取值方式,取最新时间的最小温度值
    // 方法一:以匿名函数的形式写的
    val dataStream_reduce:DataStream[SensorReading] = dataStream_keyById.reduce((currentData,newData)=>{
      //id:按照分组的原Id   timestamp:以最新的时间戳为主  temperature:原来的温度与最新的维度取最小值
      SensorReading(currentData.id,newData.timestamp,currentData.temperature.min(newData.temperature))
    })
    //通过reduce方法自定义取值方式,取最新时间的最小温度值,
    // 方法二:以ReduceFunction类形式写的
    val dataStream_reduce1:DataStream[SensorReading] = dataStream_keyById.reduce(new MyReduce)
    dataStream_reduce.print("reduce")
    dataStream_reduce1.print("reduce1")
  }

  def main(args: Array[String]): Unit = {
    //创建实时流处理的环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置全局并行度为1
    env.setParallelism(1)
    //创建原始数据流
    val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)

    transform_keyBy(env,dataStream_Original)

    //实时数据流必须设置开启启动执行的语句
    env.execute("transform")
  }

输出数据如下?

SensorReading(sensor_1,1547718200,25.0)
reduce> SensorReading(sensor_1,1547718200,25.0)
min> SensorReading(sensor_1,1547718200,25.0)
minBy> SensorReading(sensor_1,1547718200,25.0)
SensorReading(sensor_1,1547718201,30.0)
reduce> SensorReading(sensor_1,1547718201,25.0)
minBy> SensorReading(sensor_1,1547718200,25.0)
min> SensorReading(sensor_1,1547718200,25.0)
SensorReading(sensor_1,1547718202,10.0)
reduce> SensorReading(sensor_1,1547718202,10.0)
min> SensorReading(sensor_1,1547718200,10.0)
minBy> SensorReading(sensor_1,1547718202,10.0)

?c.多流转换算子

? ?(1)分流(SplitStream)

1.split():是一个过期的方法,但是还可以用,传入参数 -> 返回集合类型

2.分流之后接收的数据是SplitStream,是DataStream的子类

3.分流之后直接输出,与没有分流没有什么区别,需要用dataStream_HC.select获取拆分后标识符,在对获取标识符后的数据进行拆分

/**
   * 分流算子
   * 划分条件:数据>=30 高温流 ; 数据<30 低温流
   * @param env 运行环境
   * @param dataStreamOriginal 原数据
   * @return
   */
  def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
    //把字符串的类型转换为sensorReading,方便
    val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
    //拆分流: dataStream调用的split
    //type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
    //传入参数 -> 返回集合类型  hot or cool
    //拆分后没有对数据进行改变,所以数据类型不变,例:分减
    //把数据按照划分条件划分成高温流和低温流
    val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
      val temp = data.temperature
      //用标识符获取对应数据
      //温度>=30
      /*if(temp>=30){
        //高温流的一个标识
        Seq("hot")
      }else{
        //温度<30
        Seq("cool")
      }*/
      if(temp>=30){
        //高温流的一个标识
        Seq("hot")
      }else if(temp>=20 && temp<30){
        //20<=温度<30
        Seq("comfortable")
      }else{
        Seq("cool")
      }
    })
    //dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
    val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
    //获取不同的数据可以进行不同的操作
    //获取拆分后标识符为高温流的数据
    val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
    //获取拆分后标识符为低温流的数据
    val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
    val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")

    //输出对应数据并且给予标识符
    dataStream_hot.print("hot")
    dataStream_cool.print("cool")
    dataStream_comfortable.print("comfortable")
    dataStream_HC.print("hc")

    //直接输出,看不出变化,一起输出三个变量
    dataStream_HC.print("hcc")
  }
def main(args: Array[String]): Unit = {
    //创建实时流处理的环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置全局并行度为1
    env.setParallelism(1)
    //创建原始数据流
    val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)

    transform_splitStream(env,dataStream_Original)

    //实时数据流必须设置开启启动执行的语句
    env.execute("transform")
  }

(2)合流

1. 相同类型流:union

a.重点:类型相同 -> 同一个流进行操作

b.可以同时合并多个流,firstStream.union()的参数是不定长参数,可以合并多个流

 /**
   * 分流算子
   * 划分条件:数据>=30 高温流 ; 数据<30 低温流
   * @param env 运行环境
   * @param dataStreamOriginal 原数据
   * @return
   */
  def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
    //把字符串的类型转换为sensorReading,方便
    val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
    //拆分流: dataStream调用的split
    //type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
    //传入参数 -> 返回集合类型  hot or cool
    //拆分后没有对数据进行改变,所以数据类型不变,例:分减
    //把数据按照划分条件划分成高温流和低温流
    val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
      val temp = data.temperature
      //用标识符获取对应数据
      //温度>=30
      if(temp>=30){
        //高温流的一个标识
        Seq("hot")
      }else if(temp>=20 && temp<30){
        //20<=温度<30
        Seq("comfortable")
      }else{
        Seq("cool")
      }
    })
    //dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
    val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
    //获取不同的数据可以进行不同的操作
    //获取拆分后标识符为高温流的数据
    val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
    //获取拆分后标识符为低温流的数据
    val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
    val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")

    //输出对应数据并且给予标识符
    dataStream_hot.print("hot")
    dataStream_cool.print("cool")
    dataStream_comfortable.print("comfortable")
    dataStream_HC.print("hc")

    //直接输出,看不出变化,一起输出三个变量
    dataStream_HC.print("hcc")
  }

  /**
   * 把两个相同类型的流合并为一个流
   * 重点:类型相同 -> 同一个进行操作
   * @param firstStream 第一个流
   * @param otherStream 第二个流
   */
    //otherStream:DataStream[SensorReading]* : 不定长参数变成集合形式,不能直接接收
  def transform_UnionStream(firstStream:DataStream[SensorReading],otherStream:DataStream[SensorReading]): Unit ={
    //firstStream.union()的参数是不定长参数,可以合并多个流
      //把两个SensorReading的流合并成一个流
      //因为类型相同,所以可以统一输出
    val dataStream_union:DataStream[SensorReading] = firstStream.union(otherStream)
      dataStream_union.print("union")
  }
def main(args: Array[String]): Unit = {
    //创建实时流处理的环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置全局并行度为1
    env.setParallelism(1)
    //创建原始数据流
    val dataStream_Original:DataStream[String]= env.socketTextStream("node160",8888)

    //transform_simple(env,dataStream_Original)
    //transform_keyBy(env,dataStream_Original)
    transform_splitStream(env,dataStream_Original)

    //实时数据流必须设置开启启动执行的语句
    env.execute("transform")
  }

?2.?不同类型流:connect

a.首先采用简单转换算子,把相同类型的数据流转换为其他类型数据流

b.不同类型的流进行合并,由于流的类型不同,所有合并后的流的泛型无法统一,需要按照流的顺序进行写类型,注意顺序一致,和写入流对应的类型保持一致

c.不是DataStream的类型基本不能输出? dataStream_Connect.print()

d.把合并后的类型转换为DataStream类型,有两种方式

a1.对合并后的流进行处理,以匿名函数的形式进行装换,装换后的类型可以相同也可以不同

 /**
   * 合并不同类型的流
   * @param firstStream 第一个流
   * @param otherStream 第二个流
   */
  def transform_ConnectStream(firstStream:DataStream[SensorReading],otherStream:DataStream[SensorReading]) = {
   //采用简单转换算子
    //为了实现不同类型的流合并,对其中一个流进行改造,把SensorReading转成了(String,Double)
    val dataStream_Tuple:DataStream[(String,Double)] = firstStream.map(data=>{
      (data.id,data.temperature)
    })
    //注意顺序一致,和写入流对应的类型保持一致
    //不同类型的流进行合并,由于流的类型不同,所有合并后的流的泛型无法统一
    val dataStream_Connect:ConnectedStreams[(String,Double),SensorReading] = dataStream_Tuple.connect(otherStream)

    //不是DataStream的类型基本不能输出
    //dataStream_Connect.print()

    //参数:函数和类实现接口的写法
    //对合并后的流进行处理,两个匿名函数类型的参数分别转换两个轮
    val dataStream_ConnectResult:DataStream[String] = dataStream_Connect.map(
      //第一个函数:元组
      firstData =>{
        firstData._1+"==="+firstData._2
      },
      //第二个函数:SensorReading
      //想怎么写,就怎么写
      //自定义写法:和第一个函数保持一致
      secondData =>{
        secondData.id+"==="+secondData.timestamp
      }
    )

    /**
     * 对两个参数进行操作可以转换为不同类行进行输出:Any
     * 对不同流分别处理,但是以同一条流输出
     */
    val dataStream_ConnectAny:DataStream[Any] = dataStream_Connect.map(
      //第一个函数:元组
      firstData =>{
        firstData._1+"==="+firstData._2
      },
      //第二个函数:SensorReading
      //想怎么写,就怎么写
      //自定义写法:和第一个函数保持一致
      secondData =>{
        SensorReading
      }
    )

    //对不同类型的流合并后进行输出
    dataStream_ConnectResult.print("connectString")
    dataStream_ConnectAny.print("connectAny")
    dataStream_ConnectClass.print("connectClass")
  }
 /**
   * 分流算子
   * 划分条件:数据>=30 高温流 ; 数据<30 低温流
   * @param env 运行环境
   * @param dataStreamOriginal 原数据
   * @return
   */
  def transform_splitStream(env:StreamExecutionEnvironment,dataStreamOriginal: DataStream[String]) = {
    //把字符串的类型转换为sensorReading,方便
    val dataStream_sensorReading:DataStream[SensorReading]= transform_simple(env,dataStreamOriginal)
    //拆分流: dataStream调用的split
    //type TraversableOnce[+A] = scala.collection.TraversableOnce[A]:集合的父类
    //传入参数 -> 返回集合类型  hot or cool
    //拆分后没有对数据进行改变,所以数据类型不变,例:分减
    //把数据按照划分条件划分成高温流和低温流
    val dataStream_HCC:SplitStream[SensorReading] = dataStream_sensorReading.split(data =>{
      val temp = data.temperature
      //用标识符获取对应数据
      //温度>=30
      /*if(temp>=30){
        //高温流的一个标识
        Seq("hot")
      }else{
        //温度<30
        Seq("cool")
      }*/
      if(temp>=30){
        //高温流的一个标识
        Seq("hot")
      }else if(temp>=20 && temp<30){
        //20<=温度<30
        Seq("comfortable")
      }else{
        Seq("cool")
      }
    })
    //dataStream_HCC.select的参数为不定长参数,可以同时查看多个参数,同时包含多个参数
    val dataStream_HC:DataStream[SensorReading] = dataStream_HCC.select("hot","cool")
    //获取不同的数据可以进行不同的操作
    //获取拆分后标识符为高温流的数据
    val dataStream_hot:DataStream[SensorReading] = dataStream_HCC.select("hot")
    //获取拆分后标识符为低温流的数据
    val dataStream_cool:DataStream[SensorReading] = dataStream_HCC.select("cool")
    val dataStream_comfortable:DataStream[SensorReading] = dataStream_HCC.select("comfortable")

    //输出对应数据并且给予标识符
    dataStream_hot.print("hot")
    dataStream_cool.print("cool")
    dataStream_comfortable.print("comfortable")
    dataStream_HC.print("hc")

    //直接输出,看不出变化,一起输出三个变量
    dataStream_HC.print("hcc")

    //把刚才拆分出来的高温流和低温流进行不同类型的合并,成一个流进行处理
    transform_ConnectStream(dataStream_hot,dataStream_cool)
  }

?b1.实例化类进行转换类型

//继承CoMapFunction或者RichCoMapFunction
//RichCoMapFunction<IN1, IN2, OUT>三个泛型
//自定义类
class MyCoMap extends RichCoMapFunction[(String,Double),SensorReading,Any]{
  override def map1(value: (String, Double)): Any = {
    value._1+"==="+value._2
  }

  override def map2(value: SensorReading): Any = {
    SensorReading
  }
}
//方法二:实例化类进行转换类型
    val dataStream_ConnectClass:DataStream[Any]= dataStream_Connect.map(new MyCoMap)

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 19:06:49  更:2022-06-08 19:09:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 4:59:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码