一、共享变量
通常,当传递给Spark操作(例如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,远程机器上变量的更新不会传播回驱动程序。支持跨任务的通用、读写共享变量将是低效的。但是,Spark为两种常见的使用模式提供了两种有限的共享变量类型:广播变量和累加器。
spark通过广播变量和累加器实现共享变量。
二、累加器
累加器是只能通过关联和交换操作添加的变量,因此可以有效地并行支持。它们可以用于实现计数器(如MapReduce)或求和。Spark本地支持数字类型的累加器,程序员可以添加对新类型的支持。作为用户,您可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(在这个实例计数器中)将显示在修改累加器的stage的web UI中。Spark在Tasks表中显示任务修改的每个累加器的值。
三、基础演示
- 需求描述:利用累加器统计出worker上所有task运行次数
- 演示环境:
三台虚拟机组成的集群 - 运行环境:
spark-shell - 步骤:
- 启动spark-shell:
bin/spark-shell --master spark://niit01:7077 - 编写如下代码:
scala> val ac1 = sc.longAccumulator("ac1")
ac1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(ac1), value: 0)
scala> sc.parallelize(1 to 20).map(x => {ac1.add(1);x+1}).reduce(_+_)
res0: Int = 230
scala> ac1.value
res1: Long = 20
四、实验案例
- 实验要求:利用累加器完成对特定条件下的task任务运行次数统计,如统计出某数列中的奇数项被计算的次数
- 代码如下:
scala> val ac1 = sc.longAccumulator("ac1")
scala> ac1.value
res0: Long = 0
scala> sc.parallelize(1 to 20).map(x => {
| if (x % 2 != 0)
| {ac1.add(1) }
| x}).reduce(_+_)
scala> ac1.value
res2: Long = 10
- 在webui上查看,如下:
五、简要分析
- 上述的实验案例在webui上查看发现两台worker节点上每台节点上各自执行了5次。故,对于任意一个job而言,其所有的task的总数是集群中所有task之和。另外,对于累加器而言,每个task都可以共享累加器,底层利用分布式锁来确保每个task在使用累加器前其值都是最新的,每个task在原来的数值上进行累计。下面我们来查看下每个task运行时累加器的数值变化:
把上面的代码改写成如下:scala> val ac2 = sc.longAccumulator("ac2")
scala> sc.parallelize(1 to 20).map(x => {
| if( x % 2 != 0)
| {
|
|
| var hostname = java.net.InetAddress.getLocalHost.getHostName;
| val ac2Val = ac2.value
| var str = hostname + " : " + Thread.currentThread().getName + " => ac2 before add: " + ac2Val;
| ac2.add(1)
| val ac2Val2 = ac2.value
| str = str + " => ac2 after add: " + ac2Val2 + "\r\n"
| val socket = new java.net.Socket(hostname,9999);
| val out = socket.getOutputStream;
| out.write(str.getBytes())
| out.flush()
| out.close()
| socket.close()
| }
| x }).reduce(_+_)
注意:上述加入了Socket,故需要在worker节点上按照nc进行查看效果,关于nc的按照有两种,推荐使用在线安装,执行: yum install -y nc - 运行上述代码后查看其他两台节点控制台输出,查看前需要分别在两台从节点上启动nc,执行:
nc -lk 9999 回车,效果如下:
- niit02从节点
- niit03从节点
在web ui上查看,如下所示: 每个Task对累加器更新了5次。 至此,对累加器的介绍就到这里,如对你有所帮助,你的赞将是对我最大的鼓励,谢谢~!!
|