原文链接:https://blog.csdn.net/wangpei1949/article/details/90734304
同背压机制一样,Spark Streaming动态资源分配(即DRA,Dynamic Resource Allocation)也可以用来应对流处理中批次流量过载的场景。
Spark Streaming动态资源分配,允许为应用动态分配资源。当任务积压时,申请更多资源;当任务空闲时,使用最少资源。
在生产中,可将动态资源分配和背压机制一起使用,通过背压机制来细粒度确保系统稳定;通过动态资源分配机制来粗粒度根据应用负载,动态增减Executors。共同保证Spark Streaming流处理应用的稳定高效。
SparkStreaming的背压机制
原文链接:https://www.codenong.com/cs110871449/
主要是调整生产者和消费者的速率匹配情况
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。(注:当处理能力小于接收速率时,容易造成内存溢出,但是当大于限制的接收速率时,又会浪费性能优势)
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure):根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用背压机制,默认值false,即不启用。
注: 背压机制:动态调整采集器采集消息的速度与执行器处理消息的速度 采集器对kafka中的消息进行采集,然后将消息分配给执行器处理 1) 当采集器接收消息过快,Executor来不及处理,会导致内存溢出,所以采集器要降低采集速度 2) 当采集器的速度降低到一定程度,Executor处理消息的能力大于采集速度,会有浪费性能 3) 背压机制就是采集器根据Executor处理消息的能力动态调整采集速度,以便达到最佳的处理效果。 4)当数据量大,且不可控时,需要开启背压机制,动态调整接收与处理的速率。反之当数据量稳定时需要关闭背压机制,自己手动设置接收器速率,因为背压机制会一直占用计算资源进行动态调整的计算。
Spark 动态资源分配(Dynamic Resource Allocation) 解析
原文链接:https://cloud.tencent.com/developer/article/1195244
我们先看看,动态资源调整需要解决哪几个问题:
-
Cache问题。如果需要移除的Executor含有RDD cache该如何办? Cache去掉了重算即可。为了防止数据抖动,默认包含有Cache的Executor是不会被删除的,因为默认的Idle时间设置的非常大 -
Shuffle问题。 如果需要移除的Executor包含了Shuffle Write先关数据该怎么办? 而对于Shuffle,则需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具体配置方式,大家可以Google。这样Spark Executor就不用保存Shuffle状态了。 -
添加和删除之后都需要告知DAGSchedule进行相关信息更新。 -
添加Worker的触发条件是:有Stage正在运行,并且预估需要的Executors > 现有的 -
删除Woker的触发条件是:一定时间内(默认60s)没有task运行的Executor 我们看到触发条件还是比较简单的。这种简单就意味着用户需要根据实际场景,调整各个时间参数,比如到底多久没有运行task的Executor才需要删除。
如果启动了动态资源分配,则在Yarn上可以看到,随着Spark Streaming任务队列中Queued的Batch越来越多,Executors数量在逐渐增加。
|