一:依赖关系
1:依赖和血缘关系介绍
????????rdd.todebugstring:打印血缘关系
? ? ? ? rdd.dependencies:打印依赖关系
2:保存血缘关系
?3:OneToOne依赖---窄依赖
?4:shuffle依赖--宽依赖
? ? ? ? 新的RDD的一个分区的数据依赖于旧的RDD多个分区的数据,这个依赖称之为shuffle依赖。
5:窄依赖的任务
?6:宽依赖的任务
?7:任务分类
1: 一个main方法里面可能有多个行动算子,比如collect,所以会有多个job
2:一个job可能会有多个阶段,比如上图宽依赖
3:一个阶段可能会有多个task,比如上图一个阶段中的多个分区
?二:持久化
1:RDD自身并不会保存数据,重复读取对象
?2:引入持久化进行优化(文件、内存均可)
3:持久化操作必须在行动算子执行时完成的。不然没有数据,没办法进行持久化。?
4:RDD对象的持久化操作并不一定是为了重用,在数据执行较长,或数据比较重要的场合也可以采用持久化操作。
5:CheckPoint检查点
所谓的检查点,就是通过将RDD中间结果写入磁盘。
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后出现问题,可以从检查点开始重做血缘,减少了开销。
对RDD进行checkpoint操作并不会马上被执行,必须执行action操作才能触发。
6: 缓存和检查点的区别
1:cache缓存只是将数据保存起来,不切断血缘依赖。checkpoint检查点切断血缘依赖。
2:cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。checkpoint的数据通常存储在hdfs等容错、高可用的文件系统,可靠性高。
3:建议对checkpoin的rdd使用cache缓存,这样checkpoint的job只需从cache缓存中读取数据即可,否则需要再从头计算一次rdd
cache:将数据临时存储在内存中进行数据重用
? ? ? ? ? ? ? ? 会在血缘关系中添加新的依赖。一旦出现问题,可以重头读取数据。
persist:将数据临时存储在磁盘文件中进行数据重用
? ? ? ? ? ? ? ? 涉及到磁盘IO,性能较低,但是数据安全
? ? ? ? ? ? ? ? 如果作业执行完毕,临时保存的数据文件就会丢失
checkpoint:将数据长久的保存在磁盘文件中进行数据重用
? ? ? ? ? ? ? ? 涉及到磁盘IO,性能较低,但是数据安全
? ? ? ? ? ? ? ? 为了保证数据安全,所以一般情况下,会独立执行作业
? ? ? ? ? ? ? ? 为了能够提高效率,一般情况下,是需要和cache联合使用
? ? ? ? ? ? ? ? 执行过程中,会切断血缘关系,重新建立新的血缘关系。因为保存的数据比较安全,所以就是数据源的保存地址发生了改变。导致血缘关系发生改变。
三:分区器
1:自定义分区器:根据设置的规则,将同一规则的数据放在同一分区内
package com.atguigu.bigdata.spark.rdd.part
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Spark01_RDD_Part {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("nba","************"),
("cba","************"),
("wnba","************"),
("nba","************")
),3)
val value = rdd.partitionBy(new MyPartitioner)
value.saveAsTextFile("output")
sc.stop()
}
class MyPartitioner extends Partitioner{
//分区数量
override def numPartitions: Int = 3
//根据数据的key值,返回数据的分区索引(从0开始)
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 2
}
}
}
}
四:文件的读取与保存
1:保存
package com.atguigu.bigdata.spark.rdd.IO
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_IO_Save {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
("a",1),
("b",2),
("c",3)
)
)
rdd.saveAsTextFile("output1")
rdd.saveAsObjectFile("output2")
rdd.saveAsSequenceFile("output3")
sc.stop()
}
}
2:读取
package com.atguigu.bigdata.spark.rdd.IO
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_IO_Load {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("output1")
println(rdd.collect().mkString(","))
val rdd1 = sc.objectFile[(String,Int)]("output2")
println(rdd1.collect().mkString(","))
val rdd2 = sc.sequenceFile[String,Int]("output3")
println(rdd2.collect().mkString(","))
sc.stop()
}
}
五:数据结构--累加器(分布式的共享只写变量)
1:概念
累加器用来将executor端变量信息聚合到driver端。在driver程序中定义的变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge
package com.atguigu.bigdata.spark.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//获取系统累加器,spark默认提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
rdd.foreach(
num => {
sumAcc.add(num)
}
)
println(sumAcc.value)
sc.stop()
}
}
?2:累加器的少加和多加
package com.atguigu.bigdata.spark.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//获取系统累加器,spark默认提供了简单数据聚合的累加器
val sumAcc = sc.longAccumulator("sum")
val mapRDD = rdd.map(
num => {
sumAcc.add(num)
num
}
)
//少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
mapRDD.collect()
mapRDD.collect()
//多加:多次执行
println(sumAcc.value)
sc.stop()
}
}
3:自定义累加器
package com.atguigu.bigdata.spark.acc
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("hello","spark","hello"))
//累加器:word count
//创建累加器对象
val wcAcc = new MyAccumulator()
//向spark进行注册
sc.register(wcAcc,"wordCountAcc")
rdd.foreach(
word => {
//数据的累加(使用累加器)
wcAcc.add(word)
}
)
println(wcAcc.value)
sc.stop()
}
/*
自定义累加器
1.继承:AccumulatorV2 定义泛型
IN:累加器输入的数据类型
OUT:累加器返回的数据类型
2.重写方法
*/
class MyAccumulator extends AccumulatorV2[String,Map[String,Long]] {
private var wcMap = Map[String,Long]()
//判断是否初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, Map[String, Long]] = {
new MyAccumulator()
}
override def reset(): Unit = {
wcMap.clear()
}
//获取累加器需要计算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word,0L) + 1
wcMap.updated(word,newCnt)
}
//driver合并多个累加器
override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach{
case (word,count) => {
val newCount = map1.getOrElse(word,0L) + count
map1.updated(word,newCount)
}
}
}
//累加器结果
override def value: Map[String, Long] = {
wcMap
}
}
}
六:广播变量
? ? ? ? Task的量,是由driver的分区数决定的,和executor的个数无关
? ? ? ? 转换为
? ? ? ?只能访问不能修改
? ? ? ? spark中的广播变量就可以将闭包的数据保存到executor的内存中,不能进行更改。
package com.atguigu.bigdata.spark.acc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object Spark04_Bc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(
("a",1),("b",2),("c",3)
))
/*val rdd2 = sc.makeRDD(List(
("a",4),("b",5),("c",6)
))
//join会导致数据量几何增长,并且会影响shuffle大的性能,不推荐使用
val value:RDD[(String,(Int,Int))] = rdd1.join(rdd2)
value.collect().foreach(println)*/
/*val map = mutable.Map(("a",4),("b",5),("c",6))
rdd1.map{
case (w,c) => {
val l:Int = map.getOrElse(w,0)
(w,(c,l))
}
}.collect().foreach(println)*/
val map = mutable.Map(("a",4),("b",5),("c",6))
//封装广播变量
val bc:Broadcast[mutable.Map[String,Int]] = sc.broadcast(map)
rdd1.map{
case (w,c) => {
//访问广播变量
val l:Int = bc.value.getOrElse(w,0)
(w,(c,l))
}
}.collect().foreach(println)
sc.stop()
}
}
?
? ? ? ??
|