IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark广播变量 -> 正文阅读

[大数据]Spark广播变量

Spark广播变量

1. 什么是广播变量?

广播变量(Boardcast)是Spark中应对shuffle造成的性能变慢的有效克制手段,它是一种分发机制,一次性封装目标数据结构,以Excutor为粒度做数据分发。数据分发数=Excutor数

1.1 如何理解广播变量

需求: WordCount升级版,统计所有文件里,指定单词的数量。

  • WordCount V1.0
val dict = List("spark","java","scala","python")
val words = spark.sparkContext.textFile("hdfs://word_count_files/*.txt")
val keyWords = words.flatMap(line=> line.split(" ")).filter(word=> dict.contains(word))
keyWords.map((_,1)).reduceByKey(_+_).collect

V1版本调度内幕:

? dict列表连带后面的三行代码会被打包到一个Task里边去,正常情况下代码“负重”较轻,传到每个Excutor端的数据可以近似理解成数据负重,在v1.0版本下,dict就是主要数据负重,如果系统并行度大,Task数量较多(比如有一千个并行度,那么这个dict就要重复传送一千次),这个dict的负重就已经很可观了,更别说很多时候这个dict的大小是几Mb数据了。

痛点:v1.0 的变量分发,是以Task为粒度的,实际生产中的Task数量远大于Excutor数量。

  • WordCount V2.0
val dict = List(“spark”, “tune”)val 
bc = spark.sparkContext.broadcast(dict)
val words = spark.sparkContext.textFile(~/words.csv”).flatmap(line=>line.split(" "))
val keywords = words.filter(word => bc.value.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

在广播变量的运行机制下,封装成广播变量的数据,由 Driver 端以 Executors 为粒度分发,每一个 Executors 接收到广播变量之后,将其交给 BlockManager 管理。由于广播变量携带的数据已经通过专门的途径存储到 BlockManager 中,因此分发到 Executors 的 Task 不需要再携带同样的数据。

2. 如何让Spark SQL使用广播变量

上面的demo样例是RDD形式的,那么如何让SparkSQL使用广播变量呢?

spark join两张表的时候如果有一张表小于10M,就会自动使用Boardcast Join,这个10M,就是默认的广播阈值。

需要注意两点:

1. 10M指的是加载到内存中小于10M,有些情况下压缩文件如orc小于10M但是并没有采取boardcast join,原因很可能是解压之后超过了10M。
2. join的两张表,只要有一张表小于10m即可。

2.1 通过配置项使用广播变量

spark.sql.autoBoardcastJoinThreshold指定更大的阈值,同样需要注意指定的是文件解压之后的大小也要小于该值。就算是没有压缩,本地文件加载到内存中也会出现数据大小膨胀,需要注意。

2.2 用Join Hints 强制广播

val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)
table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")
 
val query: String = “select /*+ broadcast(t2) */ * from t1 inner join t2 on t1.key = t2.key”
val queryResutls: DataFrame = spark.sql(query)

如果不希望频繁注册tempView 也可以用Api形式

table1.join(table2.hint("boardcast"),Seq("key"),"inner")

2.3 用boardcast函数强制广播

import org.apache.spark.sql.functions.broadcast
table1.join(broadcast(table2), Seq(“key”), “inner”)

3. 广播变量的限制

性能角度:

Driver在创建广播变量的时候,需要拉取所有数据分片合并整理再广播出去,所以Driver端需要耗费大量内存来做一个大广播变量。而且Spark硬性规定,广播变量大于8G直接抛异常中断任务。

功能角度:

并不是所有的join都可以转换为boardcast join

boardcast join不支持全连接(full outer join)。

左连接(left outer join)只能广播右表,右连接(right outer join)只支持广播左表。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:17:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 19:02:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码