IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 3.0 Dynamic Partition Pruning 动态裁剪分区 -> 正文阅读

[大数据]Spark 3.0 Dynamic Partition Pruning 动态裁剪分区

目录

1、Spark 3.0 简介

2、Adaptive Query Execution(AQE)简介

3、Dynamic Partition Pruning?动态裁剪分区

???????4、DPP相关参数

5、DPP代码测试???????


1、Spark 3.0 简介

Spark3.0解决了超过3400个JIRAs,历时一年半之久,是整个社区集体智慧的成果。Spark SQL和Spark Cores是其中的核心模块,其余模块如PySpark等模块均是建立在两者之上。

与性能相关的新功能主要有:

  • Adaptive Query Execution
  • Dynamic Partition Pruning
  • Query Complication Speedup
  • Join Hints?

2、Adaptive Query Execution(AQE)简介

Adaptive Query Execution(AQE)在之前的1.x版本里已经有所实现,但是之前的框架存在一些缺陷,导致使用不是很多,在Spark3.0中Databricks(Spark初创团队创建的大数据与AI智能公司)和Intel的工程师合作,解决了相关的问题。

在Spark1.0中所有的Catalyst Optimizer都是基于规则 (rule) 优化的。为了产生比较好的查询规则,优化器需要理解数据的特性,于是在Spark2.0中引入了基于代价的优化器 (cost-based optimizer),也就是所谓的CBO。然而,CBO也无法解决很多问题,比如:

  • 数据统计信息普遍缺失,统计信息的收集代价较高;
  • 储存计算分离的架构使得收集到的统计信息可能不再准确;
  • Spark部署在某一单一的硬件架构上,cost很难被估计;
  • Spark的UDF(User-defined Function)简单易用,种类繁多,但是对于CBO来说是个黑盒子,无法估计其cost。

总而言之,由于种种限制,Spark的优化器无法产生最好的Plan。也正是因为上诉原因,运行期的自适应调整就变得相当重要,对于Spark更是如此,于是有了AQE,其基本方法也非常简单易懂。

如下图所示,在执行完部分的查询规划后,Spark可以收集到结果的统计信息,然后利用这些信息再对查询规划重新进行优化。这个优化的过程不是一次性的,而是基于执行完Query stage (RDD宽依赖) 边界进行优化,也就是说随着查询规划的执行会不断的进行优化, 而且尽可能地复用了现有优化器的已有优化规则。让整个查询优化变得更加灵活和自适应。

3、Dynamic Partition Pruning?动态裁剪分区

也许你还会看到调优宝典告诉你调整shuffle产生的partitions的数量。而当前默认数量是200,但是这个200为什么就不得而知了。然而,这个值设置为多少都不是最优的。其实在不同shuffle,数据的输入大小和分布绝大多数都是不一样。那么简单地用一个配置,让所有的shuffle来遵循,显然是不好的。要设得太小,每个partition的大小就会太大,那么GC的压力就会很大,aggregation和sort会更有可能的去spill数据到磁盘。但是,要是设太大,partition的大小就会太小,partition的数量会大。这个会导致不必要的IO,也让task调度器的压力剧增。那么调度器会导致所有task都变慢。这一系列问题在query plan复杂的时候变得尤为突出,还可能会影响到其他性能,最后耗时耗力却调优失败。

问题:

  • 整个查询执行过程中统一使用分区数;
  • 查询执行不同的阶段,数据规模会发生变化;
  • 这样有个不好的地方资源浪费,得不到合理、充分的分配。

优化:

  • 设置较大的初始分区数来满足整个查询执行过程中最大分区数;
  • 每个 query stage 结束和按需自动合并分区。

Query stage:

AQE基于运行时收集Query stage,通过每个阶段调用clalesced进行合并动作。

分区合并实现,看一下普通shuffle没有合并的情况有2 map和5 reduce,reduce1、5的数据量明显高于其他3Reduce,个大家也知道stage间是有依赖、顺序的,那么出现以下情况我们的资源CUP会出现空转、等待情况最终产生长尾Task。

???????

我们希望每个task的数据量是基本相当的,那么AQE基于运行时收集Query stage信息然后通过clalesced算子进行merge操作合并了2、3、4分区。那么后面的任务基于这三个分区进行操作即可,从而达到想要的效果。

???????4、DPP相关参数

Property NameDefaultMeaningversion
spark.sql.adaptive.enabledtrueWhen true, enable adaptive query execution, which re-optimizes the query plan in the middle of query execution, based on accurate runtime statistics.1.6.0
spark.sql.adaptive.coalescePartitions.enabledtrueWhen true and 'spark.sql.adaptive.enabled' is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by 'spark.sql.adaptive.advisoryPartitionSizeInBytes'), to avoid too many small tasks.3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum(none)The initial number of shuffle partitions before coalescing. If not set, it equals to spark.sql.shuffle.partitions. This configuration only has an effect when 'spark.sql.adaptive.enabled' and 'spark.sql.adaptive.coalescePartitions.enabled' are both true.3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionNum1MBThe minimum size of shuffle partitions after coalescing. This is useful when the adaptively calculated target size is too small during partition coalescing.3.2.0
spark.sql.adaptive.advisoryPartitionSizeInBytesThe advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.3.0.0

5、DPP代码测试

package cn.lilili.spark.opt.aqe

import cn.lilili.spark.opt.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * @author duke
 * @version 0.0.1
 **/
object AqeDemo {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("AqeDemo").setMaster("local[*]")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "100")
      .set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "10")
      .set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "5mb")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    runHql(sparkSession)
  }

  def runHql(sparkSession: SparkSession): Unit = {
    val hqlStr: String =
      """
        |SELECT
        |   U.*,R.*
        |FROM
        |   USER_INFO AS U
        |LEFT JOIN REGION_INFO AS R
        |   ON U.CITY = R.CITY
        |""".stripMargin

    sparkSession.sql(hqlStr).show()

    while (true) {}

  }

}

?设置DPP与不设置区别,在分区数量与耗时都有很大的差异。

???????

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 19:06:49  更:2022-06-08 19:07:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 4:44:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码