一、RDD对象的重用
如果对同一个数据源先进行相同的一系列转换算子操作,但是最后要分别执行两个不同的转换算子然后打印出两份不同的数据,那么前面的转换操作代码我们需要重复写两次吗?其实不用,相同的转换代码只需要写一次,然后拿得到的一个RDD对象再去执行不同的算子就可以了。
val list = List("Hello Scala", "Hello Spark")
val rdd = sc.makeRDD(list)
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(word=>{
println("@@@@@@@@@@@@")
(word,1)
})
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
reduceRDD.collect().foreach(println)
println("**************************************")
val groupRDD = mapRDD.groupByKey()
groupRDD.collect().foreach(println)
这样看我们好像重用了一份数据一样,为什么可以这样呢,不是说RDD本身是不存储数据的吗?是的,所以其实这里并没有重用数据,只是重用了一个RDD对象,当我们再拿这个RDD对象执行其他算子时,还是要根据其血缘关系从原始数据开始执行一遍前面的转换操作。
二、RDD缓存
那么我们可不可以真正实现数据的重用呢?可以的,只需要在要对数据进行下一步操作之前,将数据缓存在内存或者硬盘文件上,然后之后的算子操作都在存储中取数据就可以了。
RDD 通过 cache() 或者 persist() 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
wordToOneRdd.cache()
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
三、RDD CheckPoint 检查点
cache()和persist()只能临时存储,作业执行完毕后数据就丢失了。我们可以通过设置检查点的方式来实现持久化,所谓的检查点其实就是通过将 RDD 中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
1.设置文件保存路径
checkpoint 需要落盘,需要指定检查点保存路径。检查点路径保存的文件,当作业执行完毕后,不会被删除。一般保存路径都是在分布式存储系统:HDFS
sc.setCheckpointDir("cp")
2.设置检查点
mapRDD.checkpoint()
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
四、cache、persist和checkpoint的区别
cache : 1.将数据临时存储在内存中进行数据重用
persist : 1.将数据临时存储在磁盘文件中进行数据重用 2.涉及到磁盘IO,性能较低,但是数据安全 3.如果作业执行完毕,临时保存的数据文件就会丢失 4.cache和persist会在血缘关系中添加新的依赖。一旦出现问题,可以重头读取数据 checkpoint : 1.将数据长久地保存在磁盘文件中进行数据重用 2.涉及到磁盘IO,性能较低,但是数据安全 3.为了保证数据安全,所以一般情况下,会独立执行作业(就是会为了checkpoint重复执行一遍前面的作业) 4.为了能够提高效率,一般情况下,是需要和cache联合使用,先cache再checkpoint,这样checkpoint就直接将缓存中的数据持久化到硬盘,无需重复执行 5.执行过程中,会切断血缘关系。重新建立新的血缘关系, checkpoint等同于改变数据源
|