1. spark为什么这么快,spark sql一定比hive快吗?
spark是基于内存计算的,速度比mapreduce要快。与mr相比spark使用DAG有向无环图进行计算,减少了数据的落地,而mr则是每次计算数据都会写入磁盘,再从磁盘读取出来计算。
spark比mr快主要两个原因:
①mr通常需要将计算结果写入磁盘,然后还要读取磁盘,从而导致频繁的磁盘IO。
②mr采用的多进程模型,而spark采用了多线程模型。也就是说mr的map task 和reduce task 是进程级别的,而spark task 则是基于线程模型,也就是说 map reduce task都是jvm进程,每次启动都需要重新申请资源,消耗大量时间;spark则是通过复用线程池中的线程减少启动,关闭task所需要的开销。
题外话:写mr任务时可以设置jvm重用次数来优化mr
总结:Spark快不是绝对的,但绝大多数情况下,spark都比mr要快,特别是在迭代计算中。者得益于spark对内存使用和jvm使用的优化。
2. 谈谈你对RDD理解
RDD是Spark提供最基本的也是最重要的数据抽象概念,它是一种有容错机制的特殊数据集合,可以分布式在集群的节点上,一函数式操作集合的方式进行各种并行操作。RDD核心特点包括:
① A list of partitions 一个分区列表
② a function for computing each split 计算每个分区的函数
③ a list of dependencies on other RDDs rdd 依赖列表
④ optionally, a partitioner for key-value rdds(e.g. to say that the RDD is hash-partitioned) 对于key-value类型rdd 分区函数对每个分区
⑤ optionally, a list of preferred locations to compute each split on (e.g. block locations for an hdfs file) 每个分区都有标记位置
RDD具有容错机制,是只读的,可以执行转换操作创建新的RDD。具体来讲,RDD具有以下几个属性:
① 只读的:只能通过转换操作来生成新的RDD
②分布式:可以分布在多台机器上进行并行处理
③弹性:计算过程中内存不够时它会和磁盘进行数据交换
④基于内存:可以全部或部分缓存在内存中,在多次计算间重用
3. 简述下spark中缓存机制与checkpoint机制,说明两者的区别和联系
cache 能够让重复数据在同一个 application 中的 jobs 间共享。RDD的cache()方法其实调用的就是persist方法,默认的缓存策略为MEMORY_ONLY。
cache与persist的目的是:将会重复多次使用的RDD进行持久化或者说缓存,避免重复计算,从而减少应用的执行时间。
checkpoint的目的是:容错。
即cache 和 checkpoint 的显著区别是:cache把 RDD 计算出来然后放在内存中, 但是RDD 的依赖链也不能丢掉, 当某个点某个 executor 宕了, 上面cache 的RDD就会丢掉, 需要通过依赖链重新计算出来;而 checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,即斩断了依赖。
这里值得注意的是:cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 checkpoint 的时候,应该先执行 cache 操作,这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。
cache:主要目的是RDD数据缓存,不会截断血缘关系,使用计算过程中的数据缓存。
checkpoint:主要目的是容错,会截断依赖,checkpoint 会额外提交一次任务。
4. spark核心组件及其功能
Master(Cluster Manager):集群中的管理节点,管理集群资源,通知Worker启动Executor或Driver。
Worker:集群中工作节点,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver或Executor
Driver:执行Spark应用中的main方法,负责实际代码的执行工作。其主要的任务:
- 负责向集群申请资源,向master注册信息
- Executor启动后向Driver反向注册
- 负责作业的解析、生成Stage并调度Task到Executor上
- 监控Task的执行情况,执行完成后释放资源
- 通知master注销应用程序
Executor:是一个 JVM 进程,负责执行具体的Task。Spark 应用启动时, Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃, 会将出错节点上的任务调度到其他 Executor 节点上继续运行。Executor 核心功能:
5. 简述Spark中共享变量(广播变量和累加器)的基本原理和用途
通常情况下,一个传递给 RDD 操作(如map、reduceByKey)的 func 是在远程节点上执行的。函数func 在多个节点执行过程中使用的变量,是Driver上同一个变量的多个副本。这些变量以副本的方式拷贝到每个task中,并且各task中变量的更新并不会返回 Driver 。
为了解决以上问题,Spark 提供了两种特定类型的共享变量 : 广播变量 和 累加器。广播变量主要用于高效分发较大的数据对象,累加器主要用于对信息进行聚合。
广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本。这样的话, 就可以让变量产生的副本大大减少。而且 Spark 使用高效广播算法(BT协议)分发广播变量以降低通信成本。
累加器是 Spark 中提供的一种分布式的变量机制,在Driver端进行初始化,task中对变量进行累加操作。
广播变量典型的使用案例是Map Side Join;累加器经典的应用场景是用来在 Spark 应用中记录某些事件/信息的数量。
6. Spark提交作业参数
缺省值:1 in YARN mode, all the available cores on the worker in standalone and Mesos coarse-grained modes
生产环境中不宜设置为1!否则 work 进程中线程数过少,一般 2~5 为宜;
缺省值:1g
该参数与executor分配的core有关,分配的core越多 executor_memory 值就应该越大;core与memory的比值一般在 1:2 到 1:4 之间,即每个core可分配2~4G内存。如 executor_cores为4,那么executor_memory 可以分配 8G ~ 16G;
单个Executor内存大小一般在 20G 左右(经验值),单个JVM内存太高易导致GC代价过高,或资源浪费;
- executor_cores * num_executors
表示的是能够并行执行Task的数目。不宜太小或太大!理想情况下,一般给每个core分配 2-3 个task,由此可反推 num_executors 的个数;
driver 不做任何计算和存储,只是下发任务与yarn资源管理器和task交互,一般设为 1-2G 即可;
增加每个executor的内存量,增加了内存量以后,对性能的提升,有三点:
-
如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO; -
对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存,会减少磁盘的写入操作,进而提升性能; -
task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升;
7. spark宽窄依赖,以及spark如何划分stage,如何确定每个stage中task个数
RDD之间的依赖关系分为窄依赖( narrow dependency )和宽依赖( Wide Depencency ,也称为Shuffle Depencency )。
窄依赖:指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
宽依赖:是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)
相比于宽依赖,窄依赖对优化很有利,主要基于以下几点:
① 宽依赖往往对应着Shuffle操作,需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输;而窄依赖的每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换
② 当RDD分区丢失时(某个节点故障),Spark会对数据进行重算;对于窄依赖,由于父RDD的一个分区只对应一个子RDD分区,这样只需要重算和子RDD分区对应的父RDD分区即可,所以这个重算对数据的利用率是100%的,但对于宽依赖,重算的父RDD分区对应多个子RDD分区的,这样实际上父RDD中只有一部分的数据是被用于恢复这个丢失的子RDD分区的,另一部分对应子RDD的其他未丢失分区,这就造成了多余的计算;更一般的,宽依赖中子RDD分区通常来自多个父RDD分区,所有的父RDD分区都要进行重新计算
Stage:根据RDD之间的依赖关系将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
8. 如何理解spark中lineage
首先要明确一下 Spark 中 RDD 的容错机制。每一个 RDD 都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系( lineage ),所以只要输入数据是可容错的,那么任意一个 RDD 的分区( Partition )出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。如果一个分区出错了:
对于窄依赖,则只要把丢失的父 RDD 分区重算即可,不依赖于其他分区
对于宽依赖,则父 RDD 的所有分区都需要重算,代价昂贵
所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。
9. spark streaming 读取kafka两种方式对比?
方式一:基于Receiver的方式
使用 Receiver 来获取数据。Receiver是使用 Kafka 高阶消费者 API 实现的。Receiver从Kafka中获取的数据存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming 定期启动 job 处理这些数据。
默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,保证数据的零丢失,就必须启用Spark Streaming的预写日志机制(WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统上的预写日志中。即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
方式二、基于Direct的方式
这种方式在Spark 1.3中引入,用来替代 Receiver 方式。这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就使用Kafka的低阶消费者 API 获取Kafka指定offset范围的数据。
优点如下:
简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,是一对一的映射关系;
高性能:如果要保证零数据丢失,在基于 Receiver 方式中,需要开启WAL机制。这种方式效率较低,因为数据实际上存在多份,Kafka 中的数据有多份,复制到HDFS上的数据有多份,Executor中的数据还有一份。而基于 Direct的方式,不依赖 Receiver,不需要开启WAL机制,只要复制 Kafka 中的数据进行处理;
支持用户管理offset,支持消息的EOS传递 (所谓EOS语义指 Exactly Once语义,表示数据有且仅被处理一次)。
两种方式的对比:
基于Receiver的方式,使用 Kafka 高阶API,在 ZooKeeper 中保存消费过的offset,这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失,但无法保证数据消费的精确一次。
基于Direct的方式,使用 Kafka 低阶API实现,Spark Streaming 自己负责追踪消费的offset,并保存在checkpoint中(或者用户自己管理),可以保证数据是消费一次且仅消费一次。
在实际生产环境中大都用Direct方式。
10. yarn-cluster模式作业提交流程
① Client先向RM提交请求,并上传jar到HDFS上
② RM在集群中选择一个NM,在其上启动AppMaster,在AppMaster中实例化SparkContext(Driver)
③ AppMaster向RM注册应用程序,注册的目的是申请资源。RM监控App的运行状态直到结束
④ AppMaster申请到资源后,与NM通信,在Container中启动Executor进程
⑤ Executor向Driver注册,申请任务
⑥ Driver对应用进行解析,最后将Task发送到Executor上
⑦ Executor中执行Task,并将执行结果或状态汇报给Driver
⑧ 应用执行完毕,AppMaster通知RM注销应用,回收资源
11 spark内存管理 ☆☆☆☆☆
Spark集群会启动Driver和Executor两种JVM进程,前者为主控进程,后者负责执行具体的计算任务。一个应用程序有一个Driver和多个Executor,缺省情况下二者都分配是1G内存。Driver的内存管理简单,这里主要介绍从内存管理指的是Executor的内存管理。
Spark可以使用JVM堆内存,但不能精准控制堆内存的申请和释放;spark 1.6 引入了堆外内存,直接在工作节点的系统内存中开辟空间。存储经过序列化的二进制数据。堆外内存空间的占用可以被精确计算,相比较堆内内存来说降低了管理的难度,减少了误差。
Executor内存运行的并发任务共享JVM堆内存,按照用途分为:
-
Storage(存储内存):缓存 RDD 数据和广播变量数据 -
Execution(执行内存):执行Shuffle时占用的内存 -
Other(剩余空间):存储 Spark 内部的对象实例,和用户定义的Spark应用程序中的对象实例
Spark 1.6之前的内存管理采用静态内存管理机制,启动前配置各个区域占比,运行过程中各内存区间的大小均是固定的。堆内内存默认 Storage:Execution:Other为6:2:2,堆外内存没有other区,默认Storage:Execution=1:1。由于 Spark 堆内内存大小的记录是不准确的,Storage 内存和 Execution 内存都有预留空间防止OOM。堆外内存不需要。
Spark 1.6之后引入统一内存管理,存储内存和执行内存共享同一存储空间,可以动态占用对方的空闲区域。运行前设定存储内存和执行内存的比例,运行时:双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
Execution执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间;存储内存的空间被Execution执行内存占用后,无法让对方"归还"
other还是固定的
12. spark数据倾斜怎么办?
数据倾斜指的是在并行处理海量数据过程中,某个或者某些分区的数据显著多于其它分区,从而使得该部分的处理速度成为整个数据集处理的瓶颈。数据倾斜可能发生在Map端,也可能发生在Reduce端。
Map端数据倾斜的主要原因是,输入数据的不均匀:
解决Map端数据倾斜主要是增加数据预处理
Reduce端的数据倾斜是最常见的场景,产生的原因主要是:
-
数据中有很多空值或缺省值 -
Shuffle + Key 分布不均(主要原因)对于空值或缺省值一般而言可以通过过滤解决,因为分析这些数据从业务上讲没有太大意义。“Shuffle + Key分布不均“ 是造成数据倾斜的主要原因,解决这种原因造成的数据倾斜也是解决数据倾斜问题的主要任务。解决此问题的思路有: -
消除Shuffle。没有了Shuffle,自然就没有了数据倾斜。这里典型的解决方案是map端的join -
改变Reduce并行度。这种方法理论上可行,最简单,但通常没有效 -
加盐。给 key 添加随机数,将原来不能打散的数据,强行打散,这是最通用的解决方案
强行打散key的方法主要有:
如果以上办法都不行,采用本方法。将一张表的数据加盐强行打散,对另一张表数据扩容
13. spark调优
第一部分 编码的优化
-
减少对数据源的扫描 -
RDD复用。避免创建重复的RDD。同一份数据,只应该创建一个RDD -
RDD缓存/持久化。选择合理的缓存级别对多次使用的RDD进行持久化;对经过复杂计算、计算链条过长的RDD做checkpoint -
巧用 filter。尽可能早的执行filter操作,过滤无用数据;过滤较多数据后,使用 coalesce 对数据进行重分区 -
使用高性能算子。避免使用groupByKey,根据场景选择使用高性能的聚合算子,如reduceByKey、aggregateByKey;重分区时选择没有shuffle的操作;合理的选择map、mapPartitions;使用foreachPartition 优化输出操作 -
设置合理的并行度,让并行度与资源相匹配 -
合理使用广播变量,减少网络数据的传输 -
Kryo序列化 -
多使用Spark SQL。Spark SQL 编码更容易,开发更简单;Spark的优化器对SQL语句做了大量的优化,一般情况下实现同样的功能,Spark SQL更容易也更高效 -
使用高性的集合框架
第二部分 参数优化
-
shuffle参数调优 -
内存参数调优 -
资源调优 -
启用动态资源分配 -
调节本地等待时长
|