一.RDD概述
1.1.什么是RDD
? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
? 弹性
? 存储的弹性:内存与磁盘的自动切换;
? 容错的弹性:数据丢失可以自动恢复;
? 计算的弹性:计算出错重试机制;
? 分片的弹性:可根据需要重新分片。
? 分布式:数据存储在大数据集群不同节点上
? 数据集:RDD 封装了计算逻辑,并不保存数据
? 数据抽象:RDD 是一个抽象类,需要子类具体实现
? 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,
在新的RDD 里面封装计算逻辑
? 可分区、并行计算
1.1.1.什么是分布式计算
模拟分布式计算
1.1.2 RDD的实现原理
以java IO实现方式(装饰者模式)来理解rdd的实现原理
rdd数据处理方式类似于IO,使用了装饰者设计模式
1.2.RDD的核心属性
2.1分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
2.2分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算
2.3RDD之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
2.4分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
2.5首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
1.3.执行原理
? 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。
? Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的 计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。
? RDD 是 Spark 框架中用于数据处理的核心模型,以yarn集群来解释rdd的工作原理
- 启动 Yarn 集群环境
- Spark 通过申请资源创建调度节点和计算节点
- Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(p代表不同分区)
Driver中,多个rdd形成关联后(一般复杂计算逻辑都是由多个rdd关联组合成复杂逻辑),根据分区分解成一个一个的task,进入到任务迟中
- 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算,接下来我们就一起看看 Spark 框架中RDD 是具体是如何进行数据处理的。
二.RDD的创建方式
2.1.从集合(内存)中创建RDD
从集合中创建RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 =
sparkContext.parallelize(List(1,
2,3,4)
)
val rdd2 =
sparkContext.makeRDD(List(1,
2,3,4)
)
rdd1.collect().foreach(println)
从底层代码实现来讲,makeRDD 方法其实就是parallelize 方法
def makeRDD[T:
ClassTag](seq:
Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope
{parallelize(seq, numSlices)
2.2.从外部存储(文件)创建RDD
由外部存储系统的数据集创建RDD 包括:本地的文件系统,所有Hadoop 支持的数据集, 比如HDFS、HBase 等。
val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
三.RDD主要算子介绍
RDD分为转换算子和行动算子
3.1.转换算子
3.1.1 map
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val rdd = spark.makeRDD(List(1, 2, 3, 4))
val mapRdd = rdd.map(num => {
num * 2
})
mapRdd.collect().foreach(println)
3.1.2 flatMap
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
val rdd = spark.makeRDD(List(List(1, 2),List(3,4)))
val flatMapRdd = rdd.flatMap(list => {
list
})
flatMapRdd.collect().foreach(println)
1
2
3
4
3.1.3 groupBy
? 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。
? 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val rdd = spark.makeRDD(List(1,2,3,4),1)
val groupByRdd = rdd.groupBy(num => {
num % 2
})
groupByRdd.saveAsTextFile("output")
3.1.4 filter
? 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。
val rdd = spark.makeRDD(List(1,2,3,4),2)
val filterRdd = rdd.filter(num => {
num % 2 == 0
})
filterRdd.saveAsTextFile("output")
3.1.5 sortBy
? 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程
val rdd = spark.makeRDD(List(6,4,3,2,5,1),2)
val sortByRdd = rdd.sortBy(num => num)
sortByRdd.collect().foreach(println)
1
2
3
4
5
6
3.1.6 reduceByKey
可以将数据按照相同的Key 对 Value 进行聚合
reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y 的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。
val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
val reduceByKeyRdd = rdd.reduceByKey((x, y) => x + y)
reduceByKeyRdd.collect().foreach(println)
(a,5)
(c,4)
3.1.7 groupByKey
将数据源的数据根据 key 对 value 进行分组
val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
val groupByKeyRdd = rdd.groupByKey()
groupByKeyRdd.collect().foreach(println)
(a,CompactBuffer(2, 3))
(c,CompactBuffer(4))
3.2.行动算子
3.2.1 reduce
聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
val rdd = spark.makeRDD(List(1,2,3,4))
val result = rdd.reduce((num1, num2) => num1 + num2)
println(result)
10
3.2.2 collect
在驱动程序中,以数组Array 的形式返回数据集的所有元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
rdd.collect().foreach(println)
3.2.3 count
返回RDD 中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val countResult: Long = rdd.count()
3.2.4 take
返回一个由RDD 的前 n 个元素组成的数组
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))
3.2.5 countByKey
统计每种 key 的个数
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
"b"), (3, "c"), (3, "c")))
val result: collection.Map[Int, Long] = rdd.countByKey()
3.2.6 save相关算子
将数据保存到不同格式的文件中
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.map((_,1)).saveAsSequenceFile("output2")
四.RDD序列化和依赖关系
4.1 序列化和闭包检查
4.1.1.为什么要序列化
object SerializerDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {
println("age=" + user.age + num)
})
sc.stop()
}
class User {
var age: Int = 30
}
}
报错:Task not serializable
从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor端执行
报错原因:Spark 算子外部Driver端构建了User对象,算子内部执行时(在Executor执行)使用了User对象。
算子内部在Excutor执行时需要Driver端将User对象传递过来,而User对象要在网络传递需要被序列化
解决办法:User类extends Serializable即可;或者再User前加case
4.1.2.闭包检查
object SerializerDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List())
val user = new User()
rdd.foreach(num => {
println("age=" + user.age + num)
})
sc.stop()
}
class User{
var age: Int = 30
}
}
报错:Task not serializable
报错位置:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
还未到任务执行阶段,在ClosureCleaner$.ensureSerializable处检测序列化时报错
报错原因:从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor
端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就
形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor
端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列
化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变
4.2.RDD依赖关系
4.2.1 RDD的依赖关系及血缘关系
4.2.2 RDD血缘关系的保存
rdd保存依赖关系的示意图
4.2.3宽窄依赖
窄依赖(OneToOne依赖)
宽依赖(Shuffle依赖)
4.3 RDD阶段划分
窄依赖产生的任务个数(窄依赖不需要划分阶段)
宽依赖产生的任务个数(宽依赖会划分阶段)
下一个阶段的执行需要等待上一个阶段的任务全部执行完成
rdd阶段的划分
4.4 RDD任务的划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
? Application:初始化一个 SparkContext 即生成一个Application;
? Job:一个Action 算子就会生成一个Job;
? Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
? Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
一个应用程序对应多个Job(即一个应用程序中可能有多个行动算子),一个Job中可能会出现多个宽依赖(即一个Job中可能出现多个阶段),每个阶段最后一个RDD使用的分区数就是Task的当前阶段任务个数
|