IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark2.4.8 共享变量之累加器 -> 正文阅读

[大数据]Spark2.4.8 共享变量之累加器

一、共享变量


通常,当传递给Spark操作(例如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,远程机器上变量的更新不会传播回驱动程序。支持跨任务的通用、读写共享变量将是低效的。但是,Spark为两种常见的使用模式提供了两种有限的共享变量类型:广播变量和累加器。

spark通过广播变量和累加器实现共享变量。

二、累加器


累加器是只能通过关联和交换操作添加的变量,因此可以有效地并行支持。它们可以用于实现计数器(如MapReduce)或求和。Spark本地支持数字类型的累加器,程序员可以添加对新类型的支持。作为用户,您可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(在这个实例计数器中)将显示在修改累加器的stage的web UI中。Spark在Tasks表中显示任务修改的每个累加器的值。
在这里插入图片描述

三、基础演示


  • 需求描述:利用累加器统计出worker上所有task运行次数
  • 演示环境:三台虚拟机组成的集群
  • 运行环境:spark-shell
  • 步骤:
    • 启动spark-shell:
      bin/spark-shell --master spark://niit01:7077
    • 编写如下代码:
      scala> val ac1 = sc.longAccumulator("ac1")
      ac1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(ac1), value: 0)
      scala> sc.parallelize(1 to 20).map(x => {ac1.add(1);x+1}).reduce(_+_) // 在map中统计出当前的map task 运行的次数
      res0: Int = 230  // reduce的结果
      
      scala> ac1.value // 获取累加器的值
      res1: Long = 20 // 累加器进行了20次
      

四、实验案例


  1. 实验要求:利用累加器完成对特定条件下的task任务运行次数统计,如统计出某数列中的奇数项被计算的次数
  2. 代码如下:
    	
    scala> val ac1 = sc.longAccumulator("ac1") // 创建累加器
    scala> ac1.value  // 获取累加器初始值
    res0: Long = 0
    
    scala> sc.parallelize(1 to 20).map(x => { // 统计奇数被执行的次数
         | if (x % 2 != 0)
         |   {ac1.add(1) }
         | x}).reduce(_+_)
         
    scala> ac1.value // 获取累加器的数值,只被累加了10次
    res2: Long = 10
    
  3. 在webui上查看,如下:
    在这里插入图片描述

五、简要分析


  • 上述的实验案例在webui上查看发现两台worker节点上每台节点上各自执行了5次。故,对于任意一个job而言,其所有的task的总数是集群中所有task之和。另外,对于累加器而言,每个task都可以共享累加器,底层利用分布式锁来确保每个task在使用累加器前其值都是最新的,每个task在原来的数值上进行累计。下面我们来查看下每个task运行时累加器的数值变化:
    把上面的代码改写成如下:
    scala> val ac2 = sc.longAccumulator("ac2") // 创建累加器
    scala> sc.parallelize(1 to 20).map(x => {
         | if( x % 2 != 0)
         | {
         |
         |
         |   var hostname = java.net.InetAddress.getLocalHost.getHostName;
         |   val ac2Val = ac2.value
         |   var str = hostname + " : " + Thread.currentThread().getName + " => ac2 before add: " + ac2Val;
         |   ac2.add(1)
         |   val ac2Val2 = ac2.value
         |   str = str + " => ac2 after add: " + ac2Val2 + "\r\n"
         |   val socket = new java.net.Socket(hostname,9999);
         |   val out = socket.getOutputStream;
         |   out.write(str.getBytes())
         |   out.flush()
         |   out.close()
         |   socket.close()
         | }
         | x }).reduce(_+_)
    
    注意:上述加入了Socket,故需要在worker节点上按照nc进行查看效果,关于nc的按照有两种,推荐使用在线安装,执行: yum install -y nc
  • 运行上述代码后查看其他两台节点控制台输出,查看前需要分别在两台从节点上启动nc,执行: nc -lk 9999 回车,效果如下:
    • niit02从节点
      在这里插入图片描述
    • niit03从节点
      在这里插入图片描述
      在web ui上查看,如下所示:
      在这里插入图片描述
      每个Task对累加器更新了5次。
      至此,对累加器的介绍就到这里,如对你有所帮助,你的赞将是对我最大的鼓励,谢谢~!!
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-09 16:21:04  更:2021-10-09 16:23:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:16:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码