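Optimization background:
A for loop submits four tasks, which triggers four Jobs. Each Spark action blocks the calling thread until its Job completes, and the loop runs on the Driver's single main thread, so the four Jobs execute one after another. The four tasks are independent of each other, however, and can run in parallel.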
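Optimization approach
Submit the Jobs in parallel from a thread pool, so the Driver no longer waits for one Job to finish before submitting the next.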
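The idea can be tried without a cluster. The following is a minimal sketch, not the production code: `fakeJob` is a hypothetical stand-in that sleeps instead of running a Spark action, and the object and method names are made up for illustration. It only shows that blocking calls submitted through a fixed thread pool overlap, while the same calls in a plain loop do not.

```scala
import java.util.concurrent.{Callable, Executors, Future}

object ParallelSubmitSketch {
  // Hypothetical stand-in for one blocking Spark action; it sleeps instead of running a Job.
  def fakeJob(id: Int): String = {
    Thread.sleep(200)
    s"job-$id done"
  }

  // Runs the jobs one after another on the calling thread.
  def runSerial(n: Int): Seq[String] = (0 until n).map(fakeJob)

  // Submits every job to a fixed pool first, then blocks on each Future in order.
  def runParallel(n: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(n)
    try {
      val futures: Seq[Future[String]] = (0 until n).map { id =>
        pool.submit(new Callable[String] {
          override def call(): String = fakeJob(id)
        })
      }
      futures.map(_.get())
    } finally pool.shutdown()
  }

  // Wall-clock milliseconds taken by a block of code.
  def timeMs[A](body: => A): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    println(s"serial:   ${timeMs(runSerial(4))} ms")
    println(s"parallel: ${timeMs(runParallel(4))} ms")
  }
}
```

With four simulated jobs, the serial run should take roughly four sleep intervals and the parallel run roughly one. Real Spark Jobs only overlap this well when the cluster has spare capacity.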
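Implementation
import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.functions.lit

// One Future per submitted task; used later to block the Driver's main thread.
val listBuffer = new ListBuffer[Future[String]]
// Four threads, one per independent Job.
val service: ExecutorService = Executors.newFixedThreadPool(4)
for (i <- 0 to 3) {
  val task: Future[String] = service.submit(new Callable[String] {
    override def call(): String = {
      println(s"Task $i started")
      val k = i
      // Register a per-task temp view so concurrent tasks do not overwrite each other.
      reRunDF
        .withColumn(fieldStockAttributeId, lit(k))
        .createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}_$k")
      val resFrame = spark.sql(OverseasDetailQuantityReport.sqlMain(k))
      resFrame.show()
      writeStarRocks(resFrame, OverseasDetailQuantityReport.tblDetailQuantity, dbInfo)
      writeToKafka(resFrame, OverseasDetailQuantityReport.tblDetailQuantity)
      println(s"Task $i finished")
      "success"
    }
  })
  listBuffer.append(task)
}
// Future.get() blocks until the task completes, which keeps the Driver alive.
listBuffer.foreach(result => {
  println(result.get())
})
service.shutdown()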
Results
Before optimization: 5 minutes
After optimization: 44 seconds
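Key points
1. Use Callable and block on its result. submit() returns a Future for each Callable, and calling get() on that Future blocks the Driver's main thread until the task finishes. Without this wait, the main thread reaches its end immediately and the application terminates before the Jobs complete. A Runnable has no return value: started with execute(), it leaves nothing to wait on, and although submit(Runnable) does return a Future, that Future carries no result. With Callable, the returned value (here "success") also confirms that the task ran to the end. The blocking code is: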
listBuffer.foreach(result=>{
println(result.get())
})
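The blocking behaviour of `get()` has a second consequence worth knowing: an exception thrown inside `call()` does not surface on the worker thread but is rethrown by `get()` wrapped in an `ExecutionException`. The sketch below is an illustration with made-up task bodies (the "write failed" message and the `awaitAll` helper are hypothetical), showing one way to wait for every task and still see which ones failed.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, Future}

object BlockingSketch {
  // Waits for every Future and records each outcome instead of stopping at the first failure.
  def awaitAll(futures: Seq[Future[String]]): Seq[Either[String, String]] =
    futures.map { f =>
      val outcome: Either[String, String] =
        try Right(f.get()) // blocks the calling thread until this task ends
        catch {
          // An exception thrown inside call() arrives wrapped in ExecutionException.
          case e: ExecutionException => Left(e.getCause.getMessage)
        }
      outcome
    }

  def run(): Seq[Either[String, String]] = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val ok = pool.submit(new Callable[String] {
        override def call(): String = "success"
      })
      val failing = pool.submit(new Callable[String] {
        // Hypothetical failure, standing in for a failed write.
        override def call(): String = throw new IllegalStateException("write failed")
      })
      awaitAll(Seq(ok, failing))
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

In the loop from the article, a failure in any task would make `result.get()` throw at that element, so wrapping the call as above is one option if the remaining results should still be collected.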
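2. The tasks are created in a for loop and share one SparkSession, so the temp view name passed to createOrReplaceTempView must be dynamic. Otherwise every iteration registers the same view name, each registration replaces the previous one, and the subsequent queries all read from the same view. For example, this fixed name: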
.createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}")
It must be changed to a dynamic temp view name:
.createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}_$k")
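The effect of the name collision can be seen in a toy model. The sketch below does not use Spark: it assumes the session's temp view catalog behaves like a map from view name to definition in which the last registration wins, and `storage_record` is a hypothetical view name. It registers the views for all tasks first and then lets each task read its view, which mimics the interleaving where every thread registers before any thread queries.

```scala
import scala.collection.mutable

object ViewNameSketch {
  // Toy stand-in for the temp view catalog: view name -> the value of k baked into that view.
  def valuesSeen(nameFor: Int => String, tasks: Int): Seq[Int] = {
    val catalog = mutable.Map.empty[String, Int]
    // createOrReplace semantics: registering an existing name overwrites it.
    (0 until tasks).foreach(k => catalog.put(nameFor(k), k))
    // Each task then queries the view it believes is its own.
    (0 until tasks).map(k => catalog(nameFor(k)))
  }

  def main(args: Array[String]): Unit = {
    // Fixed name: every task ends up reading the last registered view.
    println(valuesSeen(_ => "storage_record", 4).mkString(","))     // prints 3,3,3,3
    // Per-task suffix: every task reads its own view.
    println(valuesSeen(k => s"storage_record_$k", 4).mkString(",")) // prints 0,1,2,3
  }
}
```

Under real concurrency the interleaving varies from run to run, so a fixed name may produce wrong results only some of the time, which makes the bug harder to notice.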
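3. The cluster must have enough free resources, and the submission must request enough of them; otherwise the scheduler still makes the Jobs queue up: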
/usr/local/service/spark/bin/spark-submit \
  --master yarn \
  --jars ./jars/guava-29.0-jre.jar \
  --conf "spark.executor.extraClassPath=guava-29.0-jre.jar" \
  --num-executors 6 \
  --executor-cores 2 \
  --executor-memory 4g \
  --class com.quantity.OverseasDentityReportApp \
  /home/hadoop/cter/finbatch-1.0.jar daily
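As a rough check on whether the requested resources are "enough", the number of tasks that can run at the same time can be estimated from the submit flags. This is a back-of-the-envelope sketch that assumes static allocation and the default `spark.task.cpus` of 1; the helper name `taskSlots` is made up for illustration.

```scala
object SlotSketch {
  // Concurrent task slots = executors * (cores per executor / cores per task).
  def taskSlots(numExecutors: Int, executorCores: Int, taskCpus: Int = 1): Int =
    numExecutors * (executorCores / taskCpus)

  def main(args: Array[String]): Unit = {
    // Values taken from the spark-submit command above.
    val slots = taskSlots(numExecutors = 6, executorCores = 2)
    println(s"concurrent task slots: $slots") // prints "concurrent task slots: 12"
  }
}
```

The four Jobs share these 12 slots. If the stages of a single Job already have more tasks than that, the other Jobs will mostly wait even though they were submitted in parallel.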