IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark核心编程RDD -> 正文阅读

[大数据]Spark核心编程RDD

一.RDD概述

1.1.什么是RDD

? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

? 弹性
	? 存储的弹性:内存与磁盘的自动切换;
	? 容错的弹性:数据丢失可以自动恢复;
	? 计算的弹性:计算出错重试机制;
	? 分片的弹性:可根据需要重新分片。
? 分布式:数据存储在大数据集群不同节点上
? 数据集:RDD 封装了计算逻辑,并不保存数据
? 数据抽象:RDD 是一个抽象类,需要子类具体实现
? 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,
在新的RDD 里面封装计算逻辑
? 可分区、并行计算

1.1.1.什么是分布式计算

模拟分布式计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dhwSUDS7-1648901089136)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401085316318.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2eL7ZNAt-1648901089138)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401085830738.png)]

1.1.2 RDD的实现原理

以java IO实现方式(装饰者模式)来理解rdd的实现原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nciPSky5-1648901089140)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090433484.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cUm1KN5j-1648901089141)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090321692.png)]

rdd数据处理方式类似于IO,使用了装饰者设计模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BgGLnB1r-1648901089142)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090920474.png)]

1.2.RDD的核心属性

2.1分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性

2.2分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

2.3RDD之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

2.4分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

2.5首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

1.3.执行原理

? 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

? Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的 计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

? RDD 是 Spark 框架中用于数据处理的核心模型,以yarn集群来解释rdd的工作原理

  1. 启动 Yarn 集群环境

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VhUzMIPh-1648901089144)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps1.jpg)]

  1. Spark 通过申请资源创建调度节点和计算节点

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ec1hFTaa-1648901089145)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps2.jpg)]

  1. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(p代表不同分区)

Driver中,多个rdd形成关联后(一般复杂计算逻辑都是由多个rdd关联组合成复杂逻辑),根据分区分解成一个一个的task,进入到任务迟中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fB2Sh7X9-1648901089147)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps3.jpg)]

  1. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Q6rwxrNJ-1648901089150)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps4.jpg)]

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算,接下来我们就一起看看 Spark 框架中RDD 是具体是如何进行数据处理的。

二.RDD的创建方式

2.1.从集合(内存)中创建RDD

从集合中创建RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = 
sparkContext.parallelize(List(1,
2,3,4)
)
val rdd2 = 
sparkContext.makeRDD(List(1,
2,3,4)
)
rdd1.collect().foreach(println)

从底层代码实现来讲,makeRDD 方法其实就是parallelize 方法

def makeRDD[T: 
ClassTag](seq:
Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope 
{parallelize(seq, numSlices)

2.2.从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD 包括:本地的文件系统,所有Hadoop 支持的数据集, 比如HDFS、HBase 等。

val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()

三.RDD主要算子介绍

RDD分为转换算子和行动算子

3.1.转换算子

3.1.1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

        val rdd = spark.makeRDD(List(1, 2, 3, 4))
        val mapRdd = rdd.map(num => {
            num * 2
        })
        mapRdd.collect().foreach(println)

3.1.2 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

        val rdd = spark.makeRDD(List(List(1, 2),List(3,4)))
        val flatMapRdd = rdd.flatMap(list => {
            list
        })
        flatMapRdd.collect().foreach(println)
1
2
3
4

3.1.3 groupBy

? 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。

? 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

        val rdd = spark.makeRDD(List(1,2,3,4),1)
        val groupByRdd = rdd.groupBy(num => {
            num % 2
        })
        groupByRdd.saveAsTextFile("output")

3.1.4 filter

? 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。

        val rdd = spark.makeRDD(List(1,2,3,4),2)
        val filterRdd = rdd.filter(num => {
            num % 2 == 0
        })
        filterRdd.saveAsTextFile("output")

3.1.5 sortBy

? 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程

        val rdd = spark.makeRDD(List(6,4,3,2,5,1),2)
        val sortByRdd = rdd.sortBy(num => num)
        sortByRdd.collect().foreach(println)
1
2
3
4
5
6

3.1.6 reduceByKey

可以将数据按照相同的Key 对 Value 进行聚合

reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。

        val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
        val reduceByKeyRdd = rdd.reduceByKey((x, y) => x + y)
        reduceByKeyRdd.collect().foreach(println)
(a,5)
(c,4)

3.1.7 groupByKey

将数据源的数据根据 key 对 value 进行分组

        val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
        val groupByKeyRdd = rdd.groupByKey()
        groupByKeyRdd.collect().foreach(println)
(a,CompactBuffer(2, 3))
(c,CompactBuffer(4))

3.2.行动算子

3.2.1 reduce

聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

        val rdd = spark.makeRDD(List(1,2,3,4))
        val result = rdd.reduce((num1, num2) => num1 + num2)
        println(result)
10

3.2.2 collect

在驱动程序中,以数组Array 的形式返回数据集的所有元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到 Driver
rdd.collect().foreach(println)

3.2.3 count

返回RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

3.2.4 take

返回一个由RDD 的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))

3.2.5 countByKey

统计每种 key 的个数

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
"b"), (3, "c"), (3, "c")))
// 统计每种 key 的个数
val result: collection.Map[Int, Long] = rdd.countByKey()

3.2.6 save相关算子

将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

四.RDD序列化和依赖关系

4.1 序列化和闭包检查

4.1.1.为什么要序列化

object SerializerDemo {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(List(1, 2, 3, 4))
        val user = new User()
        rdd.foreach(num => {
            println("age=" + user.age + num)
        })
        sc.stop()
    }

    class User {
        var age: Int = 30
    }

}
报错:Task not serializable

从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor端执行

报错原因:Spark 算子外部Driver端构建了User对象,算子内部执行时(在Executor执行)使用了User对象。
		算子内部在Excutor执行时需要Driver端将User对象传递过来,而User对象要在网络传递需要被序列化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OhPoTOab-1648901089152)(Spark运行原理.assets/image-20220402094901562.png)]

解决办法:User类extends Serializable即可;或者再User前加case

4.1.2.闭包检查

object SerializerDemo {

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(List())
        val user = new User()
        // list 为空foreach应该不会执行,但仍然报错
        rdd.foreach(num => {
            println("age=" + user.age + num)
        })
        sc.stop()
    }

    class User{
        var age: Int = 30
    }

}
报错:Task not serializable
报错位置:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
还未到任务执行阶段,在ClosureCleaner$.ensureSerializable处检测序列化时报错
报错原因:从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor
端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就
形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor
端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列
化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

4.2.RDD依赖关系

4.2.1 RDD的依赖关系及血缘关系

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-70TSvYfd-1648901089154)(Spark运行原理.assets/image-20220402104207888.png)]

4.2.2 RDD血缘关系的保存

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-O3iqU3NX-1648901089155)(Spark运行原理.assets/image-20220402104658771.png)]

rdd保存依赖关系的示意图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-53TTprp6-1648901089156)(Spark运行原理.assets/image-20220402105040292.png)]

4.2.3宽窄依赖

窄依赖(OneToOne依赖)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QCQMSXN7-1648901089158)(Spark运行原理.assets/image-20220402110202532.png)]

宽依赖(Shuffle依赖)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yyAVachj-1648901089159)(Spark运行原理.assets/image-20220402110412404.png)]

4.3 RDD阶段划分

窄依赖产生的任务个数(窄依赖不需要划分阶段)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fR8Vi6yC-1648901089160)(Spark运行原理.assets/image-20220402111415428.png)]

宽依赖产生的任务个数(宽依赖会划分阶段)

下一个阶段的执行需要等待上一个阶段的任务全部执行完成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NN3OhtKD-1648901089162)(Spark运行原理.assets/image-20220402111305958.png)]

rdd阶段的划分

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dlPz6Bs-1648901089163)(Spark运行原理.assets/image-20220402113128210.png)]

4.4 RDD任务的划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task
? Application:初始化一个 SparkContext 即生成一个Application;
? Job:一个Action 算子就会生成一个Job; 
? Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
? Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
一个应用程序对应多个Job(即一个应用程序中可能有多个行动算子),一个Job中可能会出现多个宽依赖(即一个Job中可能出现多个阶段),每个阶段最后一个RDD使用的分区数就是Task的当前阶段任务个数

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XCZvkytM-1648901089164)(Spark运行原理.assets/image-20220402114133981.png)]

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:18:09  更:2022-04-04 12:21:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 4:32:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码