IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 分流之 Filter/Split/SideOutPut 比较 -> 正文阅读

[大数据]Flink 分流之 Filter/Split/SideOutPut 比较

应用场景:

我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何操作呢?

通常来说针对不同的场景,有以下三种办法进行流的拆分

  1. Filter 分流
  2. Split 分流
  3. SideOutPut 分流

1. Filter 分流

我们可以通过做多次 filter 算子,把需要的不同数据生成不同的流

代码示例:

object filterStreamExample {
	  def main(args: Array[String]): Unit = {
	    val env = StreamExecutionEnvironment.getExecutionEnvironment
	    env.setParallelism(1)
	
	    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
	    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
	    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
	
	    val littleStream = inputStream.filter(_.split(",")(0).toInt < 500)
	    val bigStream = inputStream.filter(_.split(",")(0).toInt >= 500)
	
	    //打印结果
	    littleStream.print("little------")
	    bigStream.print("big------")
	    env.execute()
	  }
}

结果:

little------:13> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:11> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
big------:8> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
big------:11> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:2> 279,2011-10-06,4,0,10,0,4,1,1,0.494167,0.480425,0.620833,0.134954,639,4126,4765
big------:11> 689,2012-11-19,4,1,11,0,1,1,2,0.380833,0.375621,0.623333,0.235067,449,5050,5499
...

总结:

Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。


2. Split 分流

Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。

代码示例:

object splitStreamExample {
	  def main(args: Array[String]): Unit = {
		    val env = StreamExecutionEnvironment.getExecutionEnvironment
		//    env.setParallelism(1)
		
		    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
		    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
		    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
		
		    val splitStream: SplitStream[String] = inputStream.split(new OutputSelector[String] {
		      override def select(out: String): lang.Iterable[String] = {
		        val tags = new util.ArrayList[String]()
		        if (out.split(",")(0).toInt < 500) {
		          tags.add("littleStream")
		        } else if (out.split(",")(0).toInt >= 500) {
		          tags.add("bigStream")
		        }
		        return tags
		      }
		    })
		    
			//打印结果
		    splitStream.select("littleStream").print("little------")
		    splitStream.select("bigStream").print("big------")
		
		    env.execute()
	  }
}

结果:

little------:14> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:12> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
little------:5> 369,2012-01-04,1,1,1,0,3,1,2,0.1075,0.119337,0.414583,0.1847,95,2273,2368
big------:12> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:14> 50,2011-02-19,1,0,2,0,6,0,1,0.399167,0.391404,0.187917,0.507463,532,1103,1635
...

总结:

需要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 littleStream 和 bigStream 流再次调用 split 切分,控制台会抛出以下异常

Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

是因为该方式已经 废弃 并且建议使用最新的 SideOutPut 进行分流操作。

3. SideOutPut 分流

SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行
? 定义 OutputTag
? 调用特定函数进行数据拆分

  1. ProcessFunction (本次使用该函数)
  2. KeyedProcessFunction
  3. CoProcessFunction
  4. KeyedCoProcessFunction
  5. ProcessWindowFunction
  6. ProcessAllWindowFunction

代码示例:

object sideOutStreamExample {
		  def main(args: Array[String]): Unit = {
			    val env = StreamExecutionEnvironment.getExecutionEnvironment
			//    env.setParallelism(1)
			
			    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
			    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
			    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
			
			    //定义两个OutTag
			    val littleOutTag = new OutputTag[String]("littleStream")
			    val bigOutTag = new OutputTag[String]("bigStream")
			
			    val processStream = inputStream.process(new ProcessFunction[String, String] {
			      override def processElement(i: String,
			                                  context: ProcessFunction[String, String]#Context,
			                                  out: Collector[String]): Unit = {
			        if (i.split(",")(0).toInt < 500) {
			          context.output(littleOutTag, i)
			        } else if (i.split(",")(0).toInt >= 500) {
			          context.output(bigOutTag, i)
			        }
			      }
			    })
			
			    val littleStream = processStream.getSideOutput(littleOutTag)
			    val bigStream = processStream.getSideOutput(bigOutTag)
			
			    littleStream.print("little------")
			    bigStream.print("big------")
			    env.execute()
		  }
}

结果:

little------:1> 95,2011-04-05,2,0,4,0,2,1,2,0.414167,0.39835,0.642083,0.388067,167,1628,1795
little------:6> 323,2011-11-19,4,0,11,0,6,0,1,0.329167,0.324483,0.502083,0.224496,943,2720,3663
big------:11> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
little------:6> 324,2011-11-20,4,0,11,0,0,0,2,0.463333,0.457058,0.684583,0.18595,787,2733,3520
big------:11> 553,2012-07-06,3,1,7,0,5,1,1,0.828333,0.752533,0.450833,0.146142,1366,4841,6207
little------:6> 325,2011-11-21,4,0,11,0,1,1,3,0.4475,0.445062,0.91,0.138054,220,2545,2765
...

总结:

Flink 最新提供的 SideOutPut 方式拆分流是可以多次进行拆分的,无需担心会报出异常。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-13 11:48:47  更:2022-05-13 11:51:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:31:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码