因为RDD的实现原理和IO的实现原理差不多,我们先来说一下IO的实现原理: 其实真正进行读取数据的还是FileInputStream
IO实现原理图解:

RDD的工作流程:

- RDD不会存储数据;
- RDD也有装饰者模式;
- RDD只有调用collect方法,才会真正执行业务逻辑代码,封装操作都是对RDD的功能扩展
分区和并行度:
概念: 分区 & 并行的概念: 分区和并行度是可以不一样的, 当有2个分区和1个executor的时候,就还不是并行,只能并发执行
并行度执行解析: 对数据进行分区, 然后每个分区内必须一个一个执行,多个分区可以并行执行,做到执行区内有序,区外无序
例: 对数据 List(1,2,3,4), 两个分区 计算流程:
- 先进行分配,
0号分区 => 1 ,2 1号分区 => 3,4 - 如果再执行两次map的话, 就会先将每个分区的第一个数据的全部计算完成之后,才会进行执行第二个, 做到区内数据执行有序

RDD的特点:
介绍:RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。
? 弹性 ? 存储的弹性:内存与磁盘的自动切换(效率高); ? 容错的弹性:数据丢失可以自动恢复; ? 计算的弹性:计算出错重试机制; ? 分片的弹性:可根据需要重新分片(其实就是分区)。 ? 分布式:数据存储在大数据集群不同节点上 ? 数据集:RDD 封装了计算逻辑,并不保存数据(数据计算完成之后,就进行销毁了) ? 数据抽象:RDD 是一个抽象类,需要子类具体实现 ? 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑 ? 可分区、并行计算
创建RDD代码实现:
val conf = new SparkConf().setMaster("local").setAppName("RDD_Memory")
val sc = new SparkContext(conf)
val rdd:RDD[Int] = sc.wholeTextFiles("datas")
rdd.collect().foreach(println)
sc.stop()
RDD的分区源码解析:
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD_Partition").setMaster("local[*]")
conf.set("spark.default.parallelism","3")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1, 2, 3, 4,5))
rdd.saveAsTextFile("output")
sc.stop()
}
数据进行分区的分配规则(源码): 内存:
val array = seq.toArray
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
文件: 使用的还是hadoop的读取流程分配, 可以指定最小分区 分区计算公式: 数据量字节数量/最小分区 =数据量
- 如果这个数据量> 每个分区的大小的1.1倍的话,会重新再增加一个分区
- 如果<= 1.1倍的话,则会把剩下的数据量,和最后一个分区进行聚合
源码: 分区的数量: 
math.min(defaultParallelism, 2)
文件具体的数据的分配例子展示: 
|