IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark2.X之累加器 -> 正文阅读

[大数据]Spark2.X之累加器


前言

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时, 可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变 量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。


提示:以下是本篇文章正文内容,下面案例可供参考

一、累加器的作用

累加器支持在所有不同节点之间进行累加计算(比如计数、求和、集合累积)。

二、使用步骤

1.引入类

代码如下(示例):

import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.SparkConf
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

2.代码解析

代码如下(示例):

//scala集合的累加
var counter1: Int = 0
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
LOGGER.info("scala集合的累加")
println(counter1)//6
println("+++++++++++++++++++++++++")

//RDD的累加
var counter2: Int = 0
val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
dataRDD.foreach(x => counter2 += x)
LOGGER.info("RDD的累加")
println(counter2)//0
println("+++++++++++++++++++++++++")

注意:上面的 RDD 操作中counter2运行结果是 0,因为 foreach 中的函数是传递给 Worker 中的 Executor 执行,用到了 counter2 变量,而 counter2 变量在 Driver 端定义的,在传递给 Executor 的时候,各个 Executor 都有了一份 counter2,最后各个 Executor 将各自个 x 加到自己的 counter2 上面了,和 Driver 端的 counter2 没有关系,如果解决?—使用累加器。

代码如下(示例):

//累加器longAccumulator的累加
val counter3 = sc.longAccumulator("count")
dataRDD.foreach(x => counter3.add(x))
LOGGER.info("累加器longAccumulator的累加")
println(counter3.value)//6
println("+++++++++++++++++++++++++")

有些场景会用到集合累加。
代码如下(示例):

//累加器longAccumulator的累加
//累加器collectionAccumulator的累加
val aList = List[String]("1")
val counter4 = sc.collectionAccumulator[List[String]]("add_list")
dataRDD.foreach(x =>{
  counter4.add(aList)
})
val accumulatorList = counter4.value.asScala.toList.flatten
LOGGER.info("累加器longAccumulator的累加")
println("accumulatorList:   "+accumulatorList)//List(1, 1, 1)

完整代码如下(示例):

import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

object spark_accumulators_job {

  val LOGGER: Logger = LoggerFactory.getLogger(getClass)
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().config(initConf()).enableHiveSupport().getOrCreate()
    val sc: SparkContext = sparkSession.sparkContext

    不使用累加器
    //var counter = 0
    //val data = Seq(1, 2, 3)
    //data.foreach(x => counter += x)
    //println("Counter value: "+ counter)

    将data转换成RDD,再来重新计算
    //var counter = 0
    //val data = Seq(1, 2, 3)
    //var rdd = sc.parallelize(data)
    //rdd.foreach(x => counter += x)
    //println("Counter value: "+ counter)
    
    //scala集合的累加
    var counter1: Int = 0
    var data = Seq(1,2,3)
    data.foreach(x => counter1 += x )
    LOGGER.info("scala集合的累加")
    println(counter1)//6
    println("+++++++++++++++++++++++++")
    //RDD的累加
    var counter2: Int = 0
    val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
     dataRDD.foreach(x => counter2 += x)
    LOGGER.info("RDD的累加")
    println(counter2)//0
    println("+++++++++++++++++++++++++")
    //累加器longAccumulator的累加
    val counter3 = sc.longAccumulator("count")
    dataRDD.foreach(x => counter3.add(x))
    LOGGER.info("累加器longAccumulator的累加")
    println(counter3.value)//6
    println("+++++++++++++++++++++++++")

    //累加器collectionAccumulator的累加
    val aList = List[String]("1")
    val counter4 = sc.collectionAccumulator[List[String]]("add_list")
    dataRDD.foreach(x =>{
      counter4.add(aList)
    })
    val accumulatorList = counter4.value.asScala.toList.flatten
    LOGGER.info("累加器longAccumulator的累加")
    println("accumulatorList:   "+accumulatorList)//List(1, 1, 1)
  }

  def initConf(): SparkConf = {
    new SparkConf()
      .setAppName("Job")
      .registerKryoClasses(Array(
      classOf[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema],
      classOf[org.apache.spark.sql.types.StructType],
      classOf[org.apache.spark.sql.types.StructField],
      classOf[org.apache.spark.sql.types.DataType],
      classOf[Array[org.apache.spark.sql.Row]],
      classOf[Array[org.apache.spark.sql.types.StructField]]
    ))
  }

}

总结

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。累加器解决了需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点 (Driver Program)之间共享变量的问题。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:21:05  更:2022-03-03 16:25:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:58:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码