10.Spark调优策略

一:资源设置
core memory executor-num
executor driver

1)--executor-memory MEM 1G 每个executor的内存大小
Cache
shuffle
task
2)--executor-cores NUM 1 每个executor的cpu core数量
4exe * 2core = 8个
4exe * 4core = 16个
3)--num-executors 2 executor的数量
4exe * 2core = 8个
8exe * 2core = 16个
100task ? * 2 ?
4)--queue root.用户名 运行的队列
${SPARK_HOME}/bin/spark-submit --class xxxx \
master yarn \
--deploy-mode cluster \
--executor-cores ? \
--num-executors ? \
--executor-memory ? \
application.jar xxx yyy zzz
二:广播变量在Spark中的使用
50exe 1000task
val params = … // 10M val rdd = … rdd.foreach(x=>{…params…})
1000task * 10M = 10G 50exe * 10M = 500M
作业题: ETL处理中日志 join ip这个功能结合我们讲解的广播变量的原理及实现 重构我们的ETL实现逻辑
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object JoinApp02 {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1")
val spark = SparkSession.builder().master("local[*]").appName("JoinApp02").getOrCreate()
val peopleInfo: collection.Map[String, String] = spark.sparkContext
.parallelize(Array(("100", "pk"), ("101", "jepson"))).collectAsMap()
val peopleBroadcast: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(peopleInfo)
val peopleDetail: RDD[(String, (String, String, String))] = spark.sparkContext
.parallelize(Array(("100", "ustc", "beijing"), ("103", "xxx", "shanghai")))
.map(x => (x._1, x))
peopleDetail.mapPartitions(x => {
val broadcastPeople: collection.Map[String, String] = peopleBroadcast.value
for((key,value) <- x if broadcastPeople.contains(key))
yield (key, broadcastPeople.get(key).getOrElse(""), value._2)
}).foreach(println)
Thread.sleep(2000000)
spark.stop()
}
}
三:Shuffle调优
map端缓冲区大小 : spark.shuffle.file.buffer 默认32k
如果过小 ==> 数据频繁写入磁盘文件
reduce端拉取数据缓冲区大小: spark.reducer.maxSizeInFlight
reduce端拉取数据重试次数:spark.shuffle.io.maxRetries 默认3次
reduce端拉取数据等待间隔:spark.shuffle.io.retryWait 默认5s
实际生产中这些参数调成可以以乘以2的叠加调整
四:JVM相关
对象==>eden 和其中一个survivor0区 此时另外一个survivor1是空
eden + survivor0 满 ==> minor gc
==> survivor1
8:1:1
old enough or Survivor2 is full ==> old ==> full gc
stop the world
个人建议在生产上优先使用CMS G1 JDK8
young gc/full gc 导致 stop the time 可能会导致出现 file not found/ file lost / timeout 等异常错误
调节连接等待时长 避免gc 导致的超时 spark.core.connection.ack.wait.timeout
调节executor堆外内存 spark.yarn.executor.memoryOverhead 1~2G之间 executor lost、oom、shuffle output file cannot find 300M
spark-submit时通过–conf 把我们讲解的这些参数一一设置进去即可
内存 统一内存管理方式(可以借的) 静态管理方式(固定)
其他调优
官方提供的调优文档: http://spark.apache.org/docs/latest/tuning.html
打开页面直接翻译看 
预估对象占用内存


12.云平台建设的思考
大数据项目和平台的差异性对比
Hadoop、Hive、Spark、Flink、Storm、Scala 其实都是大数据开发必备的技术/框架
项目(项目):以功能为主 中小公司/大公司
平台:大公司 提供给用户通用/定制化的解决方案
数据采集
离线计算
实时计算
机器学习
图计算
......
认知云平台能为我们提供的能力
为什么要构建大数据云平台 1)统一管控 数据分散、异构 ==> 信息孤岛 数据的存储、资源 ==> 统一的资源管控 造成资源浪费 2)能力 运维、支撑程度 性能、规模瓶颈
==>
数据湖:数据和资源的共享
云能力:集群、扩容、快速开发
大数据云平台功能架构
大数据云平台所涉及到的常见的功能 
数据湖架构

数据存储和计算角度剖析
存储和计算:大数据的思想 ==> Hadoop 公有云 多租户 多用户 ==> 数据安全 ==> 不同的租户或者用户登录到系统之后,能访问的数据的权限是不一样 ==> Table: 行 列 ==> 数据权限控制到行和列级别 ==> 使用的资源也是不一样的 queue 私有云
有多个集群:热 冷 备份
数据迁移 自动、压缩
计算能力能否无缝对接 √
隔离:队列隔离、资源隔离
资源角度剖析
资源分配如何最优、资源使用率最大化 Spark/Flink spark-submit 多少executor、每个executor多少memory、每个executor多少core?
调度框架:AZ、Oozie、crontab
一个作业涉及多个Job:workflow
多个job之间如何做资源隔离
Spark/Flink on YARN:申请资源是需要一定的时间的,调优使得申请资源的时间减少!
10分钟:2分钟+8分钟 30秒+9分30秒
兼容性角度剖析
兼容性角度 Apache Hadoop CDH Cloudera Manager HDP MapR 华为:Huawei Manager FusionInsight 阿里 腾讯 百度 …
假设数据平台是基于CDH/HDP来构建的
但是:用户现在已有的数据是基于阿里、华为云来开发和运维的
务必要提前做一件事情:熟悉已有的数据平台的一些功能 vs 我们数据平台的功能
数据存储:CarbonData
调度:oozie ==> PK调度
执行引擎和运行方式适配角度剖析
引擎的适配 离线:Spark Flink 做一个功能:按照我们的产品设计文档,写一份代码,可以同时运行在不同的引擎上 √Spark √Flink Apache Beam 支持写一份代码,可以同时运行在不同的引擎上像spark、flink
运行方式的适配 √YARN √K8S(Docker)
Spark和Flink的选择
Spark和Flink的选择性问题 Spark到现在社区是比较成熟、提供的功能也是比较完善的 Flink 实时部分
Spark 离线的性能优于Flink Fllink的实时性优于SparkStreaming
|