IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink常用转换算子和自定义函数 -> 正文阅读

[大数据]Flink常用转换算子和自定义函数

??本篇文章主要介绍Flink中的常见转换算子和自定义函数

一、转换算子

1.单流输入

val env = StreamExecutionEnvironment.getExecutionEnvironment

1.1.map、filter:对流中的每个元素进行操作

env.fromElements(1,2,3,4,5,6)
      .map(_*10)
      .print()
      
env.fromElements(1,2,3,4,5,6)
        .filter(_>5)
        .print()

1.2、flatMap:将多个集合里的元素取出来,组成一个新的集合

env.fromCollection(List("AA BB CC","aa bb cc"))
        .flatMap(_.split(" "))
        .print()

1.3、keyBy:
??输入必须是 Tuple 类型
??将一个流拆分成不相交的分区
??默认根据元素指定的key的hashcode值进行分区
??数据没有分流,还是在一个数据流中

env.fromCollection(List("aa bb cc","aa bb nn"))
      .flatMap(_.split(" "))
        .map((_,1))
        .keyBy(_._1)
      .sum(1)
        .print()*

1.4、reduce:将上一次计算完以后的结果跟新的数据再进行聚合运算

env.fromCollection(List("aa bb cc","aa bb nn"))
      .flatMap(_.split(" "))
      .map((_,1))
      .keyBy(_._1)
        .reduce(
          (t1,t2)=>(t1._1,t1._2+t2._2)
        )
        .print()

1.5、aggregations:提供聚合算子,如求max、min、sum、minBy、maxBy
2.多流输入
2.1、union
??对两个或者两个以上的 DataStream 进行合并操作
??需要保证两个数据集的格式是一致的

	val input1 = env.fromElements(1, 2, 3, 4, 5, 6)
    val input2 = env.fromElements(10,10,20,30)
    val unionDataStream = input1.union(input2)

2.2、connect
??两份数据流被 Connect 之后,只是被放在了同一个流中
??内部依然保持各自的数据和形式不发生变化,两份数据相互独立

	val input1 = env.fromElements(1, 2, 3, 4, 5, 6)
	val input3 = env.fromElements("aa", "bb", "cc")
    val connStream: ConnectedStreams[Int, String] = input1.connect(input3)
    //使用CoMapFunction对connect后的流进行操作
    connStream.map(new CoMapFunction[Int,String,String] {
      override def map1(value: Int): String = {
        "tom0" + value
      }

      override def map2(value: String): String = {
        value.toUpperCase()
      }
    }).print()

2.3、CoGroup:两个数据流/集合按照key进行group

val input4=env.socketTextStream("192.168.217.110",9002)
                .map(new MapFunction<String, Tuple2<String,String>>() {

                    @Override
                    public Tuple2<String,String> map(String s) throws Exception {

                        return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
                    }
                });

val input5=env.socketTextStream("192.168.217.110",9001)
                .map(new MapFunction<String, Tuple2<String,String>>() {

                    @Override
                    public Tuple2<String,String> map(String s) throws Exception {

                        return Tuple2.of(s.split(" ")[0],s.split(" ")[1]);
                    }
                });
input1.coGroup(input2)
                .where(new KeySelector<Tuple2<String,String>, Object>() {

                    @Override
                    public Object getKey(Tuple2<String, String> value) throws Exception {
                        return value.f0;
                    }
                }).equalTo(new KeySelector<Tuple2<String,String>, Object>() {

            @Override
            public Object getKey(Tuple2<String, String> value) throws Exception {
                return value.f0;
            }
        }).window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<Tuple2<String,String>, Tuple2<String,String>, Object>() {

                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<Object> collector) throws Exception {
                        StringBuffer buffer=new StringBuffer();
                        buffer.append("DataStream frist:\n");
                        for(Tuple2<String,String> value:iterable){
                            buffer.append(value.f0+"=>"+value.f1+"\n");
                        }
                        buffer.append("DataStream second:\n");
                        for(Tuple2<String,String> value:iterable1){
                            buffer.append(value.f0+"=>"+value.f1+"\n");
                        }
                        collector.collect(buffer.toString());
                    }
                }).print();

2.3、coMap、coFlatMap:对ConnectedStreams进行map和flatmap
3.分区算子
3.1、dataStream.shuffle:随机分区
??dataStream.rebalance:循环分区
??dataStream.rescale :调节分区
??dataStream.global:数据发往同一个分区
??dataStream.broadcast:
3.2、自定义分区

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment  env.readTextFile("filePath")
        .flatMap(_.split(" "))
      .map((_,1))
        .partitionCustom(new MyPartitioner,_._1)
        .print()
    env.execute()
  }
  
class MyPartitioner extends Partitioner[String]{
  override def partition(k: String, i: Int): Int = {
    if(k.equals("dd")){
      0
    }else{
      1
    }
  }
}

4.侧输出流

val oddTag = new OutputTag[Int]("奇数")
val evenTag = new OutputTag[Int]("偶数")

val result = env.fromElements(1, 2, 3, 4, 5, 6)
      .process(new ProcessFunction[Int, Int] {
        override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
          if (value % 2 == 0) {
            //输出到主流
            //out.collect(value)
            ctx.output(evenOutPutTag,value)
          } else {
            //输出到侧输出流
            ctx.output(oddOutPutTag, value)
          }
        }
      })

二、自定义函数

5.1、函数类
Flink 包含了各类算子实现UDF函数的抽象类或者接口
例如 MapFunction, FilterFunction, ProcessFunction 等等

class MyFilter() extends FilterFunction[String]{
    override def filter(t: String): Boolean = t.equals("flink")
}
// 调用实现类
dataStream.filter(new MyFilter())

5.2、RichFunction
可以获取运行环境的上下文
??getRuntimeContext()获取运行时上下文,例如函数执行的并行度,任务的名字,以及 state 状态等
??setRuntimeContext()设置运行时上下文,拥有生命周期方法
??open()初始化方法,当一个算子被调用之前 open()会被调用
??close()生命周期中最后调用的方法,做一些清理工作
??如:RichMapFunction、RichFlatMapFunction

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-27 11:55:47  更:2021-08-27 11:56:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:01:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码