Spark: A Custom WordCount Accumulator
Spark Core provides three kinds of data types: RDD, broadcast variables, and accumulators.
An accumulator accumulates repeatedly across the jobs triggered by multiple action operators, and an action operator is required before any accumulator update happens at all.
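To see the repeat-accumulation pitfall concretely, here is a minimal sketch using the built-in longAccumulator (the local-mode setup and names are assumptions for a self-contained run): because add() sits inside a map transformation, every action that re-executes the lineage adds again.

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorPitfall {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("acc-pitfall"))
    val acc = sc.longAccumulator("elements")

    // add() inside a transformation re-runs in every job over this lineage
    val mapped = sc.makeRDD(List("a", "b", "c")).map { w => acc.add(1); w }

    mapped.count()     // job 1: acc.value is now 3
    mapped.collect()   // job 2: the map re-executes, acc.value is now 6
    println(acc.value) // 6, not 3; cache() the RDD to avoid the double count
    sc.stop()
  }
}

The demo below sidesteps this by updating the accumulator inside a single foreach action.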
package com.shufang.acc

import com.shufang.utils.ScUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable
object AccumulatorDemo001 {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = ScUtil.getSc

    // Create the custom accumulator and register it with the SparkContext
    val myAcc = new AccumulatorDemo001
    sc.register(myAcc, "udfAcc")

    val rdd: RDD[String] = sc.makeRDD(List("a", "a", "b", "a", "c"))

    // foreach is an action, so it triggers the add() calls on the executors
    rdd.foreach { word =>
      myAcc.add(word)
      println(word)
    }

    // Read the merged result on the driver: Map(a -> 3, b -> 1, c -> 1)
    println(myAcc.value)
    sc.stop()
  }
}
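The custom accumulator itself extends AccumulatorV2[IN, OUT], where IN = String is what add receives (one word) and OUT = mutable.Map[String, Int] is what value exposes (the running counts). Spark copies and resets the accumulator when shipping it to executor tasks, calls add inside each task, and merges the partial results back on the driver: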
class AccumulatorDemo001 extends AccumulatorV2[String, mutable.Map[String, Int]] {

  private var map: mutable.Map[String, Int] = mutable.Map[String, Int]()

  // The accumulator is at its zero value when no word has been counted yet
  override def isZero: Boolean = map.isEmpty

  // Per the AccumulatorV2 contract, copy() must duplicate the current state,
  // not just hand back a fresh empty instance
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val newAcc = new AccumulatorDemo001
    newAcc.map = this.map.clone()
    newAcc
  }

  override def reset(): Unit = map.clear()

  // Called on the executors: count one occurrence of the word
  override def add(word: String): Unit =
    map.update(word, map.getOrElse(word, 0) + 1)

  // Called on the driver: fold the other accumulator's counts into this one
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
    other.value.foreach { case (word, count) =>
      map.update(word, map.getOrElse(word, 0) + count)
    }

  override def value: mutable.Map[String, Int] = this.map
}
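ScUtil is a project-local helper whose source isn't shown above; a minimal sketch, assuming getSc just builds a local-mode SparkContext:

package com.shufang.utils

import org.apache.spark.{SparkConf, SparkContext}

object ScUtil {
  // Assumed implementation: a local SparkContext for running the demo
  def getSc: SparkContext =
    new SparkContext(new SparkConf().setMaster("local[*]").setAppName("AccumulatorDemo001"))
}

With that in place, running the demo prints the five words (in a nondeterministic task order) and then the merged result, Map(a -> 3, b -> 1, c -> 1).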