?
?
/**
* !!!!!
* rdd不保存数据,保存计算逻辑,类似水管子
* 持久化是截留水池的关系
* 避免从血缘最开始的rdd开始计算
*/
object Test{
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setCheckpointDir("cp")
val rdd = sc.makeRDD(List("hello scala","hive sql","hi","scala"),2)//不指定分区,就默认使用最大资源了
val flatRDD = rdd.flatMap(_.split(" "))
val mapRDD = flatRDD.map(
word =>{
println("@@@")
(word,1)
}
)
//cache: 将数据保存到内存中进行数据重用,不安全
// 会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据
//persist:数据临时存储在磁盘文件,涉及到磁盘io,性能低但是数据安全,作业执行完毕临时保存的数据文件就会丢失
//checkpoint:将数据长久地保存在磁盘文件中进行数据重用,涉及到磁盘io,性能低但是数据安全
// 为了保证数据安全,所以一般情况下,会独立执行作业
// 为了提高效率,一般情况下,和cache联合使用
// 执行过程中,会切断血缘关系,重新建立新的血缘关系(等同于改变了数据源)
//持久化操作是在action算子执行时完成的
//rdd的持久化操作,不一定是为了重用,在数据执行较长或比较重要的场合都可以使用
//mapRDD.cache()
//mapRDD.persist()
mapRDD.checkpoint()
println(mapRDD.toDebugString)//查看血缘关系,checkpoint 还没有执行
//(2) MapPartitionsRDD[2] at map at Test.scala:35 []
// | MapPartitionsRDD[1] at flatMap at Test.scala:34 []
// | ParallelCollectionRDD[0] at makeRDD at Test.scala:33 []
//没有加cache的话,“@@@”会在***的上下分别出现,因为rdd从头执行计算了;
//加上cache,“@@@”只会出现在***上面
val resultRdd = mapRDD.reduceByKey(_+_)
resultRdd.collect().foreach(println)
println("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&")
println(mapRDD.toDebugString)//查看血缘关系,checkpoint执行后
//(2) MapPartitionsRDD[2] at map at Test.scala:35 []
// | ReliableCheckpointRDD[4] at collect at Test.scala:59 []
println("********************************")
val resultRDD2 = mapRDD.groupByKey()
resultRDD2.collect().foreach(println)
}
}
?
|