前言
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时, 可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变 量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。
提示:以下是本篇文章正文内容,下面案例可供参考
一、累加器的作用
累加器支持在所有不同节点之间进行累加计算(比如计数、求和、集合累积)。
二、使用步骤
1.引入类
代码如下(示例):
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
2.代码解析
代码如下(示例):
var counter1: Int = 0
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
LOGGER.info("scala集合的累加")
println(counter1)
println("+++++++++++++++++++++++++")
var counter2: Int = 0
val dataRDD: RDD[Int] = sc.parallelize(data)
dataRDD.foreach(x => counter2 += x)
LOGGER.info("RDD的累加")
println(counter2)
println("+++++++++++++++++++++++++")
注意:上面的 RDD 操作中counter2运行结果是 0,因为 foreach 中的函数是传递给 Worker 中的 Executor 执行,用到了 counter2 变量,而 counter2 变量在 Driver 端定义的,在传递给 Executor 的时候,各个 Executor 都有了一份 counter2,最后各个 Executor 将各自个 x 加到自己的 counter2 上面了,和 Driver 端的 counter2 没有关系,如果解决?—使用累加器。
代码如下(示例):
val counter3 = sc.longAccumulator("count")
dataRDD.foreach(x => counter3.add(x))
LOGGER.info("累加器longAccumulator的累加")
println(counter3.value)
println("+++++++++++++++++++++++++")
有些场景会用到集合累加。 代码如下(示例):
val aList = List[String]("1")
val counter4 = sc.collectionAccumulator[List[String]]("add_list")
dataRDD.foreach(x =>{
counter4.add(aList)
})
val accumulatorList = counter4.value.asScala.toList.flatten
LOGGER.info("累加器longAccumulator的累加")
println("accumulatorList: "+accumulatorList)
完整代码如下(示例):
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._
object spark_accumulators_job {
val LOGGER: Logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder().config(initConf()).enableHiveSupport().getOrCreate()
val sc: SparkContext = sparkSession.sparkContext
var counter1: Int = 0
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
LOGGER.info("scala集合的累加")
println(counter1)
println("+++++++++++++++++++++++++")
var counter2: Int = 0
val dataRDD: RDD[Int] = sc.parallelize(data)
dataRDD.foreach(x => counter2 += x)
LOGGER.info("RDD的累加")
println(counter2)
println("+++++++++++++++++++++++++")
val counter3 = sc.longAccumulator("count")
dataRDD.foreach(x => counter3.add(x))
LOGGER.info("累加器longAccumulator的累加")
println(counter3.value)
println("+++++++++++++++++++++++++")
val aList = List[String]("1")
val counter4 = sc.collectionAccumulator[List[String]]("add_list")
dataRDD.foreach(x =>{
counter4.add(aList)
})
val accumulatorList = counter4.value.asScala.toList.flatten
LOGGER.info("累加器longAccumulator的累加")
println("accumulatorList: "+accumulatorList)
}
def initConf(): SparkConf = {
new SparkConf()
.setAppName("Job")
.registerKryoClasses(Array(
classOf[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema],
classOf[org.apache.spark.sql.types.StructType],
classOf[org.apache.spark.sql.types.StructField],
classOf[org.apache.spark.sql.types.DataType],
classOf[Array[org.apache.spark.sql.Row]],
classOf[Array[org.apache.spark.sql.types.StructField]]
))
}
}
总结
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。累加器解决了需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点 (Driver Program)之间共享变量的问题。
|