一、累加器
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。
以下仅使用系统累加器,若要自定义可继承AccumulatorV2,并设定泛型 ,重写累加器的抽象方法
val rdd = sc.makeRDD(List(1,2,3,4))
// 获取系统累加器
// Spark默认就提供了简单数据聚合的累加器。
//例如sc.doubleAccumulator;sc.collectionAccumulator
val sumAcc = sc.longAccumulator("sum")
/*
rdd.foreach(
num => {
// 使用累加器
sumAcc.add(num)
}
)
*/
//以上输出可直接得到10
val mapRDD = rdd.map(
num => {
// 使用累加器
sumAcc.add(num)
num
}
)
// 注意容易少加或多加
// 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
// 一般情况下,累加器会放置在行动算子进行操作
mapRDD.collect()
mapRDD.collect()
println(sumAcc.value)
//20,注意只有行动算子才会使用累加器,转换算子不执行。
二、广播变量
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值(无法更改),以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。(类似全局变量)
闭包数据,都是以Task为单位发送的,每个任务中包含闭包数据,这样可能会导致一个Executor中含有大量重复的数据,并且占用大量内存,Executor其实就一个JVM,所以在启动中,会自动分配内存,完全可以将任务中的闭包数据放置在Executor的内存中,达到共享的目的。
val rdd1 = sc.makeRDD(List(
("a", 1),("b", 2),("c", 3)
))
// val rdd2 = sc.makeRDD(List(
// ("a", 4),("b", 5),("c", 6)
// ))
val map = mutable.Map(("a", 4),("b", 5),("c", 6))
// join会导致数据量几何增长,并且会影响shuffle的性能,不推荐使用
//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
//joinRDD.collect().foreach(println)
// (a, 1), (b, 2), (c, 3)
// (a, (1,4)),(b, (2,5)),(c, (3,6))
rdd1.map {
case (w, c) => {
val l: Int = map.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
使用广播变量:
val rdd1 = sc.makeRDD(List(
("a", 1),("b", 2),("c", 3)
))
val map = mutable.Map(("a", 4),("b", 5),("c", 6))
// 封装广播变量
val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)
rdd1.map {
case (w, c) => {
// 方法广播变量
val l: Int = bc.value.getOrElse(w, 0)
(w, (c, l))
}
}.collect().foreach(println)
|