IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark-之自定义wordCount累加器 -> 正文阅读

[大数据]Spark-之自定义wordCount累加器

Spark-之自定义wordCount累加器

SparkCore中的3种数据类型:

  • 累加器(只写)
  • RDD
  • 广播变量(只读)

累加器在多个action算子触发的job中重复累加,且需要action算子才能触发累加器操作。

package com.shufang.acc

import com.shufang.utils.ScUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

import scala.collection.mutable

object AccumulatorDemo001 {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = ScUtil.getSc

    // 通常累加器需要通过action算子触发,多个action算子可能会造成重复累加
    /*val longAcc: LongAccumulator = sc.longAccumulator("longAcc")

    sc.makeRDD(1 to 5)
      .foreach{
        longAcc.add(1L)
        println(_)
      }


    println(longAcc.value)*/
    val myAcc = new AccumulatorDemo001
    sc.register(myAcc,"udfAcc");

    val rdd: RDD[String] = sc.makeRDD(List("a", "a", "b", "a", "c"))

    rdd.foreach {
      case word =>{
        myAcc.add(word)
        println(word)

      }
    }


    println(myAcc.value)
    sc.stop()
  }
}


/**
 * 自定义累加器,实现wordCount
 */
class AccumulatorDemo001 extends AccumulatorV2[String,mutable.Map[String,Int]]{

  private var map: mutable.Map[String, Int] = mutable.Map[String, Int]()
  //判断是否为初始化的累加器,如果map为空,表示为初始化
  override def isZero: Boolean = {
    map.isEmpty
  }

  //拷贝
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    new AccumulatorDemo001
  }

  //重置
  override def reset(): Unit = {
    map.clear()
  }

  //将每个元素添加到累加器
  override def add(k: String): Unit = {
    val newVal: Int = map.getOrElse(k, 0) + 1
    map.update(k,newVal)
  }

  //在Driver端对从多个Executor收集过来的变量进行merge
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    val map1 = this.map
    val map2 = other.value

    // 将map2中的元素合并到map1中
    map2.foreach {
      case(word,count) => {
        val newVal: Int = map1.getOrElse(word,0) + count
        map1.update(word,newVal)
      }
    }
  }

  // 获取累加器的值
  override def value: mutable.Map[String, Int] = this.map
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:16:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 19:05:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码