cache()
底层调用的局势persist(),只能设置缓存级别为StorageLevel.MEMORY_ONLY即在内存中进行缓存
persist()
这个才是真的缓存方法
--无参重载 => 只能设置缓存级别为StorageLevel.MEMORY_ONLY即在内存中进行缓存
--有参重载 => 可以设置缓存等级
是否是?磁盘 是否使?内存 是否使?堆外内存 是否反序列化 副本的个数
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
//这个最常用,先存储到内存中内存存储满了在存到磁盘
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false) }
//缓存的使用
object Test01 {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local"))
val rdd1 = sc.textFile("file:///D:\\IdeaProgram\\Bigdata-0730\\file\\Spark\\wc\\input")
//在这里设置一个cache缓存,只缓存结果,建议缓存完之后执行action,再去执行转换算子+执行算子可以会报错
val rdd2 = rdd1.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _).cache()
//这里如果不设置缓存,每次执行完一个action以后再去执行下一个action的时候都要从头开始进行计算
//如果设置了缓存,每次执行完一个action以后再去执行下一个action的时候直接缓存的为止读取数据
rdd2.collect()
rdd2.foreach(println(_))
rdd2.take(2)
}
}
cachepoint检查点机制
检查点(本质就是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点问题而丢失分区,从做查点的RDD开始重做lineage,就会减少开销检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能.
检查点,类似于快照,chekpoint的作?就是将DAG中?较重要的数据做?个检查点,将结果存储到?个?可?的地?
//检查点的使用
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
val sc = new SparkContext(conf)
//设置检查点的路径
sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd = sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//检查点的触发?定要使?个action算?
rdd.checkpoint()
rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")
println(rdd.getCheckpointFile) //查看存储的位置
/**
查看是否可以设置检查点 rdd.isCheckpointed 这个?法在shell中可以使? 但是代码中不好?
*/
}
缓存和检查点的区别
缓存把RDD计算出来的然后放在内存,但是RDD的依赖链不能丢掉,当某个exexutor宕机时,上面cache的RDD就会丢掉,需要通过依赖链放入重新计算,不同的是,checkpoint是把RDD保存在HDFS上,是多副本可靠存储,所以依赖链可以丢掉,就是斩断了依赖链,是通过复制实现的高容错
cache和persist的比较
1.cache底层调用的是persist
2.cache默认持久化等级是内存且不能修改,persist可以修改持久化的等级
什么时候使用cache或checkpoint
1.某步骤计算特别耗时
2.计算链条特别长
3.发生shuffle之后
一般情况建议使用cache或是persist模式,因为不需要创建存储位置,默认存储到内存中计算速度快,而checkpoint需要手动创建存储位置和手动删除数据,若数据量非常庞大建议使用checkpoint
Task包括ResultTask(最后一个阶段的任务) + ShuffleMapTask(非最后一个阶段)
|