IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark 3.0 中的屏障执行模式_Spark的MPI时代来了 -> 正文阅读

[大数据]Spark 3.0 中的屏障执行模式_Spark的MPI时代来了

RDD屏障概念引入

Spark 3.0 引入了一种名为RDDBarrier[T]的新型 RDD ,它表示 RDD 需要使用屏障执行模式来处理。此 RDD 公开了普通 RDD 中不可用的新功能。
RDDBarrier的源码定义如下:

/**
 * :: Experimental ::
 * Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together
@Experimental
@Since("2.4.0")
class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
...
}

注释中便是第一手的定义,首先RDDBarrier是实验性的,把一个Rdd封装在了一个屏障阶段,这样回强制Spark在运行task的时候在这个阶段一起执行。

关于Barrier概念其实和并发环境中的屏障是一样的,直白点说就是多个线程会在同一个时间点开始执行。
Barrier

实操

来点例子,找找感觉。我们首先还是创建RDDBarrier Rdd

    val df = sparkSession.range(0,100).repartition(4)
    val barrierRdd = df.rdd.barrier()
    val count = barrierRdd.mapPartitions(v => v).count()
    println("count is " + count)

需要看出效果,我们把线程数量调整成1

val sparkSession = SparkSession.builder.
          master("local[1]")
          .appName("barrierRdd")
          .getOrCreate()

我们运行,看效果,我找出关键的日志:

21/12/15 22:42:20 INFO SharedState: Warehouse path is 'file:/Users/zhuxuemin/IdeaProjects/spark-3.0-examples/spark-warehouse'.
21/12/15 22:42:22 INFO CodeGenerator: Code generated in 265.914341 ms
21/12/15 22:42:22 INFO SparkContext: Starting job: count at BarrierRddExample.scala:15
21/12/15 22:42:22 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 40 more times
21/12/15 22:42:37 WARN DAGScheduler: Barrier stage in job 0 requires 4 slots, but only 1 are available. Will retry up to 39 more times

观察最后的提示,以上就是Barrier stage需要4个槽位,但是其实只有一个,因为我们repartition(4)是开了4个并发,线程数量不够4个,就会等着不执行了。
我们调整到线程数量为4,再看看效果:

21/12/15 22:48:36 INFO DAGScheduler: ResultStage 1 (count at BarrierRddExample.scala:15) finished in 0.325 s
21/12/15 22:48:36 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/12/15 22:48:36 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
21/12/15 22:48:36 INFO DAGScheduler: Job 0 finished: count at BarrierRddExample.scala:15, took 1.269630 s
count is 100

结果就是执行完成了。barrierRdd会等待资源足够的时候同时执行。

引入背景

可能最大的疑惑就是不知道有啥子用,这项特性是jira中提到的,原始链接:https://issues.apache.org/jira/browse/SPARK-24374,标题叫做 SPIP: Support Barrier Execution Mode in Apache Spark,页面里面是同时上传了pdf文档的

我简单翻译一下,前面是介绍背景吧啦吧啦,略去,捡中间部分
…考虑一个简单的场景:作为一名数据科学家,我想构建一个管道,从生产数据仓库中获取训练事件并拟合具有数据并行性的 DL 模型。这里,“生产数据仓库”因公司而异,例如 Apache Hive,来自 AWS 的 Redshift、来自 Azure 的 Data Lake、来自 Databricks 的 Delta 等。经过社区多年的发展,Spark 是从这些生产数据仓库读取数据的事实上的选择,而 TensorFlow 和 Horovod 是分布式 DL 训练的流行选择。作为末端用户,如何进行衔接(数仓->模型训练)?我们可以简化吗?这里的建议是在 Apache Spark 中添加一个新的调度模型,以便用户可以将分布式 DL 训练正确嵌入作为 Spark 阶段,以简化分布式训练工作流程。例如,Horovod 使用 MPI 实现 all-reduce 以加速分布式 TensorFlowtraining。计算模型与 Spark 使用的 MapReduce 不同。在 Spark 中,阶段中的任务不依赖于同一阶段中的任何其他任务,因此可以独立调度。在 MPI 中,所有Worker同时启动并传递消息。为了在 Spark 中嵌入这个工作负载,我们需要引入一个新的调度模型,暂命名为“barrierscheduling”,它在启动任务的同时,为用户提供足够的信息和工具来嵌入分布式 DL 训练。 Spark 还可以提供额外的容错层,以防某些任务在中间失败,Spark 会中止所有任务并重新启动阶段。

为什么需要新的执行模式

传统的Spark执行模式我们仍旧算作为Map/Reduce模式,那么这种执行模式多年来一直很好地适用于不同的工作负载。为什么我们现在需要不同的执行模式?

原因之一是在spark上支持深度学习框架。深度学习框架不适合 Map/Reduce 模型。它们与称为 MPI(消息传递接口)的其他类型的执行模型配合得很好。例如,Uber 的大规模深度学习开源框架 Horovod 使用 MPI 为各种 DL 框架实现分布式深度学习。

为了原生支持深度学习,spark 需要支持不同于 Map/Reduce 的执行模型。新的执行模型以 MPI 为模型。

屏障执行模式

  1. 工作是阶段的集合。在这些阶段之间通常会进行洗牌。这与 Map/Reduce 模型相同
  2. 每个阶段都是任务的集合。这些任务都是一起开始的,而且是相互依赖的。与 Map/Reduce 相比,这是主要的区别之一。在 MPI 模型中,任务可以相互通信并相互依赖。所以他们需要一起开始。
  3. 由于任务相互依赖,当其中一个任务失败时,所有任务都会重试。同样,这与 Map/Reduce 模型不同。
  4. 任务数量始终由开发人员决定。这是因为即使数据可能很小,计算也可能复杂得多,并且可能需要比典型处理更多的资源。还应该有足够的资源来一起运行所有任务。

后记

我们其实可以说,Spark的MPI时代来了~

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-16 17:44:46  更:2021-12-16 17:45:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:57:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码