IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark优化,多线程提交任务,提升效率 -> 正文阅读

[大数据]Spark优化,多线程提交任务,提升效率

优化背景:

for循环提交4次任务,会触发4个Job,由于Driver的单线程运行及Spark的任务调度决定了4个Job是串行执行,但这个4个任务是无关的,可以并行执行。

优化思路

通过线程池并行提交Job,Driver端不卡顿。

具体实现

val listBuffer = new ListBuffer[Future[String]]
    val service: ExecutorService = Executors.newFixedThreadPool(4)
    for (i <- 0 to 3) {
      val task: Future[String] = service.submit(new Callable[String] {
        override def call(): String = {
          println(s"第${i}个任务。。。。。。。。。。。。。。。。")
          val k = i
          reRunDF
            .withColumn(fieldStockAttributeId, lit(k))
            .createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}_$k")

          val resFrame = spark.sql(OverseasDetailQuantityReport.sqlMain(k))
          resFrame.show()

          writeStarRocks(resFrame, OverseasDetailQuantityReport.tblDetailQuantity, dbInfo)
          writeToKafka(resFrame, OverseasDetailQuantityReport.tblDetailQuantity)
          println(s"第${i}个任务。。。。。。。。。。。。。。。。结束")
          "success"
        }
      })

      listBuffer.append(task)
    }

    //遍历获取结果
    listBuffer.foreach(result=>{
      println(result.get())
    })

    service.shutdown()

效果

优化前 : 5分钟
在这里插入图片描述

优化后:44秒
在这里插入图片描述

关键点

1,要用callable,不能用runnable,runnable没有返回值,无法阻塞driver,不阻塞driver导致driver线程马上结束,应用终止。callable有返回值,可以通过获取返回值阻塞Driver,应用能正常运行。阻塞代码如下:

//遍历获取结果
    listBuffer.foreach(result=>{
      println(result.get())
    })

2,使用了for循环,createOrReplaceTempView时临时表名必须是动态的,否则循环注册的临时表名相同,导致后续计算从同一张表中获取。

.createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}")

需改为动态临时表名:

.createOrReplaceTempView(s"${OverseasDetailQuantityReport.tblWwarehouseStorageRecord}_$k")

3,集群必须要有足够的资源,且提交任务时要申请足够的资源,否则调度系统仍然会让Job排队执行

/usr/local/service/spark/bin/spark-submit --master yarn --jars ./jars/guava-29.0-jre.jar --conf "spark.executor.extraClassPath=guava-29.0-jre.jar"   --num-executors 6 --executor-cores 2 --executor-memory 4g --class com.quantity.OverseasDentityReportApp /home/hadoop/cter/finbatch-1.0.jar daily
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:08:39  更:2022-04-01 00:12:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:03:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码