IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark中广播变量和累加器 -> 正文阅读

[大数据]Spark中广播变量和累加器

1、广播变量

广播变量的定义:

广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用,在机器学习中非常有用。广播变量是类型为spark.broadcast.Broadcast[T]的一个对象,其中存放着类型为T的值。它由运行SparkContext的驱动程序创建后发送给会参与计算的节点,非驱动程序所在节点(即工作节点)访问改变量的方法是调用该变量的value方法,这个值只会被发送到各节点一次,作为只读值处理。

广播变量的使用场景:

如果我们要在分布式计算里面分发大对象,例如字典、集合、黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么这时每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

广播变量图解:

不使用广播变量,每一个Task发送一个

使用广播变量的情况,每一个Executor发送一个

?广播变量的使用:

    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val pairRdd = sc.parallelize(List((1,1), (5,10), (5,9), (2,4), (3,5), (3,6),(4,7), (4,8),(2,3), (1,2)),4)
    //将字段收集到Driver端,并广播发送到Executor
    val allIpAndProvince: Array[(Int, Int)] = pairRdd.reduceByKey(_+_).collect()
    //广播发送  返回引用
    val broadCast: Broadcast[Array[(Int, Int)]] = sc.broadcast(allIpAndProvince)
    val value24: RDD[(Int, Int)] = pairRdd.map(x => {
      //使用广播变量 进行优化
      val value: Array[(Int, Int)] = broadCast.value
      val map: Map[Int, Int] = value.toMap
      (x._1, x._2 + map.get(x._1).get)
    })

注意:

1、不能将RDD作为广播变量广播出去,RDD是不存储数据的。可以将RDD的结果广播出去。

2、广播变量只能在Driver端定义,不能在Executor端定义。

2、累加器

累加器的定义:

spark累加器就是定义在驱动程序的一个变量,但是在集群中的每一个任务都会有一份新的副本。在各个任务中更新副本的值都不会对驱动程序中的值产生影响,只有到最后所有的任务都计算完成后才会合并每一个任务的副本到驱动程序。

累加器的使用场景:

异常监控,调试,记录符合某特性的数据的数目等。如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

累加器图解:

在这里插入图片描述

累加器的使用:?

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
    // 声明累加器
    val sumAcc: LongAccumulator = sc.longAccumulator("sumAcc")
    rdd.foreach {
      case (word, count) => {
        // 使用累加器
        sumAcc.add(count)
      }
    }
    // 累加器的toString方法
    //println(sumAcc)
    //取出累加器中的值
    println(sumAcc.value)

自定义累加器:

object WordCount {
  def main(args: Array[String]): Unit = {
    
    val sc = SparkSession.builder().appName("WordCount").master("local").getOrCreate().sparkContext
   
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Spark", "Spark","Hive","Hive"), 2)

    //创建累加器
    val acc: myaccumulator = new myaccumulator()
    //注册累加器
    sc.register(acc,"wordcount")
    //使用累加器
    rdd.foreach{
      word => {
        acc.add(word)
      }
    }
    //获取累加器的累加结果
    println(acc.value)
    sc.stop()
  }
}

class myaccumulator extends AccumulatorV2[String,mutable.Map[String,Long]] {
  // 定义输出数据集合,一个可变的Map
  var map: mutable.Map[String, Long] = mutable.Map[String, Long]()

  // 是否为初始化状态,如果集合数据为空,即为初始化状态
  override def isZero: Boolean = map.isEmpty

  // 复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new myaccumulator()

  // 重置累加器
  override def reset(): Unit = map.clear()

  // 增加数据
  override def add(v: String): Unit = {
    if (v.startsWith("H")) {
      map(v) = map.getOrElse(v, 0L) + 1
    }
  }

  // 合并累加器
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    val map2: mutable.Map[String, Long] = other.value
    map2.foreach {
      case (word, count) => {
        map(word) = map.getOrElse(word, 0L) + count
      }
    }

  }

  //累加器的值
  override def value: mutable.Map[String, Long] = map
}

输出:Map(Hello -> 4, Hive -> 2)

注意:

1、累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

2、累加器不是一个调优的操作,因为如果不这样做,结果是错的。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:25:14  更:2022-05-05 11:27:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:38:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码