IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark广播变量和累加器(附案例) -> 正文阅读

[大数据]Spark广播变量和累加器(附案例)

广播变量:

  • 如果使用了广播变量技术,则 Driver 端将共享数据只会发送到每个 Executor 一份。Executor 中的所有 Task 都复用这个对象。
  • 如果不用广播变量技术,则 Driver 端默认会将共享数据分发到每个 Task 中,造成网络分发压力大。甚至导致你在进行RDD持久化到内存时,因内存不足而被迫存到磁盘,增加了磁盘IO,严重降低性能。


广播变量使用方法(Python实现):

  • 要保证该共享对象是可序列化的。因为跨节点传输的数据都要是可序列化的。

  • 在Driver端将共享对象广播到每个Executor:

    #2-定义一个列表,装特殊字符
    list_v=[",", ".", "!", "#", "$", "%"]
    #3-将列表从Driver端广播到各个Executor中
    bc=sc.broadcast(list_v)
  • 在Executor中获取:
    list2=bc.value

累加器:

????????Spark提供的 Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator 只提供了累加的功能,即确提供了多个 task 对一个变量并行操作的功能。但是 task 只能对 Accumulator 进行累加操作,不能读取 Accumulator 的值,只有 Driver 程序可以读取 Accumulator 的值。创建的 Accumulator 变量的值能够在 Spark Web UI 上看到,所以在创建时应尽量为其命名。

????????Spark内置了三种类型的Accumulator,分别是?LongAccumulator 累加整数型,DoubleAccumulator 累加浮点型,CollectionAccumulator 累加集合元素

整数累加器使用方法(Python实现)?:

  • 在Driver端定义整数累加器,赋初始值。
    acc=sc.accumulator(0)
  • 在Task中每次累加1
    acc.add(1)
    # 或者 
    acc+=1

PySpark累加器和广播变量的综合案例:

  • 案例:过滤非单词字符,并对非单词字符进行统计
  • ?Python代码实现:
    from pyspark import SparkConf, SparkContext, StorageLevel
    import os,jieba,time,re
    
    #指定环境变量
    os.environ['SPARK_HOME'] = '/export/server/spark'
    os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python'
    
    if __name__ == '__main__':
        conf = SparkConf().setAppName('text1').setMaster('local[*]')
        sc = SparkContext(conf=conf)
        # 2-定义一个列表,装特殊字符
        list_v = [",", ".", "!", "#", "$", "%"]
        # 3-将列表从Driver端广播到各个Executor中
        bc = sc.broadcast(list_v)
        # 4-定义累加器,后续在分布式task中,每遇到特殊字符,就累加1
        acc = sc.accumulator(0)
        # 5-加载文本内容,形成RDD
        input_rdd=sc.textFile('file:///export/pyworkspace/sz27_spark/pyspark_core/data/accumulator_broadcast_data.txt')
        # 6-过滤空行
        filtered_rdd = input_rdd.filter(lambda line:len(line.strip())>0)
        # 7-将每行长句子,按空白字符拆分成短字符串
        str_rdd = filtered_rdd.flatMap(lambda line:re.split('\\s+',line))
        # 8-对上面的RDD,过滤,每遇到特殊字符,累加器acc就累加1,并且剔除掉特殊字符,形成的RDD只包含单词
        def filter_str(str):
            global acc
            #获取特殊字符列表
            list2 = bc.value
            if str in list2:
                acc.add(1)
                return False
            else:
                return True
        word_rdd = str_rdd.filter(filter_str)
        # 9-对单词进行计数,
        wordcount_rdd = word_rdd.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y)
        # 10-打印单词计数结果
        print('单词记数结果:',wordcount_rdd.collect())
        # 11-打印累加器的值,即特殊字符的个数
        print('特殊字符累加器结果:',acc.value)
        sc.stop()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-22 20:40:12  更:2022-02-22 20:40:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:18:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码