IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark RDD转换算子-Value类型总结(2) -> 正文阅读

[大数据]Spark RDD转换算子-Value类型总结(2)

一、groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中,一个组的数据在一个分区中,但是并不是说一个分区中只有一个组。

例子:将奇数偶数分为两组

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)

    groupRDD.collect().foreach(println)

    sc.stop()
  }

二、filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

例子:过滤出奇数

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
    val filterRDD: RDD[Int] = rdd.filter(_ % 2 == 1)

    filterRDD.collect().foreach(println)

    sc.stop()
  }

三、sample

根据指定的规则从数据集中抽取数据
sample算子需要传入三个参数
第一个参数表示,抽取数据后是否将数据放回 ture(放回),false(丢弃)
第二个参数表示:
1.如果抽取不放回:每条数据被抽取的概率,基准值
2.如果抽取放回:表示每条数据可能被抽取的次数
第三个参数表示,抽取数据时随机算法的种子,不传递的话,那么使用当前系统时间

例子:

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

    println(rdd.sample(false, 0.4,1).collect().mkString(","))
    println(rdd.sample(false, 0.4).collect().mkString(","))
    println(rdd.sample(true, 2).collect().mkString(","))

    sc.stop()
  }

四、distinct

用于将数据集中重复的数据去重

例子:去除重复数据

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    
    val rdd=sc.makeRDD(List(1,2,3,4,1,2,3,4))

    val rdd1: RDD[Int] = rdd.distinct()

    rdd1.collect().foreach(println)

    sc.stop()
  }

五、coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

第一个参数是预期分区数,第二是参数是是否开启shuffle
第二个参数是是否开启shuffle
coalesce算子默认情况下不会将分区的数据打乱重新组合
这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
如果想让数据均衡,可以进行shuffle处理

coalesce算子也可以扩大分区,但如果不进行shuffle操作,是没有意义的,不起作用

例子:将三个分区缩减为两个分区

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4,5,6),3)
    
    //val newRDD: RDD[Int] = rdd.coalesce(2)
    val newRDD: RDD[Int] = rdd.coalesce(2,true)

    newRDD.saveAsTextFile("output")

    sc.stop()
  }

六、repartition

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true,也就是默认开启shuffle无论是将分区数多的,RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

例子:从两个分区扩大为三个分区

 def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(1,2,3,4,5,6),2)

    val newRDD: RDD[Int] = rdd.repartition(3)

    newRDD.saveAsTextFile("output")

    sc.stop()
  }

七、sortBy

该操作用于排序数据。在排序之前,可以将数据通过传入的函数表达式进行处理,之后按照传入的函数表达式处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

第一个参数:依据指定规则来排序
第二个参数:默认为升序(true),降序(false)

sortBy默认情况下不改变分区,但是中间存在shuffle操作

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd=sc.makeRDD(List(7,4,3,5,6,1),2)

    val newRDD: RDD[Int] = rdd.sortBy(num => num)

    newRDD.saveAsTextFile("output")

    sc.stop()
  }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-27 11:55:47  更:2021-08-27 11:56:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:07:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码