IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spark - Executor 初始化 && 报警都进行1次 -> 正文阅读

[Java知识库]Spark - Executor 初始化 && 报警都进行1次

一.引言

程序启动 M?个 Executor,每个 Executor 共 N core,即每个 Executor 上的 task = N,现在有一个 object 需要在每一个 task 上初始化公用变量,有两个问题需要解决:

A.该公用变量在 executor 上共用,所以 N 个 task 只需要有一个构造,其他 N-1 个 task 共用即可

B.初始化阶段,如果报错则至少会收到 M 个相同的报警日志,如何压缩到只收一次

二.一个 Task 初始化

Executor 上 N 个 task 都需要 InitValue 这个变量,所以 SelfObject 初始化一次 N 个 task 就都可以使用,所以这里要采用类似 singleton 单例类的写法,保证 N 个 task 只有1个 task 初始化,其他 N-1 个 Task 等这一个 Task 初始化即可。基于上述描述,通过加锁的方式可以实现需求:

1.主函数侧修改

通过 synchornized 关键字,保证只有一个 task 上执行 init 初始化逻辑,这样即保证了 N-1 个 task 等待 1个 task 初始化,随后公用的原则

2.初始化函数侧修改

为了保证 Singleton,SelfObject 内执行 init 方法时需要判断 initValue 是否为 null,从而保证对应变量只会被初始化一次,这里偷了个懒没有使用双重检测,有需要的同学也可以自己修改为双重检测的 Singleton。

通过主函数端 Synchornized + 初始化函数端?Singleton 的配合,我们达成了上面要求的第一个目标,Executor 上只有一个 Task 初始化对应变量,避免 N 个 task 一起初始化可能造成的并发问题,且节省了初始化的开销,非常的奈斯 👍。

三.一次报警

上面我们可以看到 init 函数内包含了 try-catch 逻辑,这是因为我们 initValue 初始化时有可能发生异常,为了提高响应速度,我们在 case 中加入报警策略,这样任务初始化变量异常时,程序员即可第一时间获得报警从而及时修复任务:

这样修改后新的问题又来了,虽然上面一个 task 初始化保证了 N 个 task 只有一个执行 init 逻辑,最多报警一次,但是由于有 M 个 Executor,所以如果出错时将会收到 M 次报警,如果 M 非常大,显然这不是一个好的方法,所以我们还需要解决第二个问题,M 个 Executor 只有一个 Executor 发送报警即可。

1.复杂版

A.如何保证一次

遇到这个问题,第一反应是通过 Executor 和 Driver 通信,Executor 端初始化出错后,自己不报错,而是将信息同步至 Driver,由于 Dirver 是单线程的,所以让 Driver 进行单独报错,这样可以保证只发一次报警。

B.Executor-Driver 通信

上面的方法可行,接下来就需要寻找 Executor-Driver 通信的手段了,这里由于平常经常计数,所以第一时间想到了?sc.longAccumulator,如果 Executor 上初始化异常,则对?sc.longAccumulator 进行 add 操作,在 Driver 上启动一个常驻线程定时监测?sc.longAccumulator 的值,如果该值超过 0 则代表 Executor 上 init 出错,这时候报警随后将该线程退出即可。

C.实现

这里将监控逻辑的线程添加至?sparkContext 与 RDD 逻辑之间,常驻线程池内定义了 state 变量,保证报警一次后后续不再报警,epoch + sleep 则控制检测的次数和频率,例如初始化任务执行10min,那么可以 10x1min 的频率检测,也可以 20x30second 的频率检测,取决于自己的节奏。同时需要将 monitorNum 传入 init 函数,如果初始化异常则执行??monitorNum.add 的操作,这样 driver 端的常驻线程即可感知并完成报警一次的操作。

Tips:

为什么常驻线程选择 newCachedThreadPool,因为 newCachedThreadPool 线程在空闲 60s 后可以完全退出,即避免 Dirver 端检测线程一直运行对 driver 端造成影响,更详细的?newCachedThreadPool 使用可以参考:Executor - 一文搞懂 ThreadPoolExecutor 与 BlockingQueue。最后贴下完整报警代码:

    // 添加监控
    val monitorNum = sc.longAccumulator("Model Monitor")
    val cachedPool: ExecutorService = Executors.newCachedThreadPool()
    cachedPool.execute(new Runnable {
      override def run(): Unit = {
        // 报警1次或9次没报警退出,60s后线程池退出
        var state = true
        var epoch = 0

        while (state && epoch < 9) {
          if (monitorNum.value > 0) {
            SendMailUtil.send("Object 初始化 initValue", "初始化异常!")
            state = false
          } else {
            Thread.sleep(60000)
            epoch += 1
          }
        }
      }
    })

2.简单好用版?👍

上面的方法可以解决只发一次报警的问题,但是相对比较复杂,且无法传递 Executor 具体报错信息,只能通过 add 的数字变化感知到报错但无法报错具体初始化错误。有一种更好的办法:指定一个 Executor 发报警,剩下的 Executor 忽略。

A.指定 Executor 报警 (推荐👍)

Spark 可以通过 SparkEnv 上下文变量获取对应 ExecutorId,只需指定一个 Executor 发报警即可

    import org.apache.spark.SparkEnv
    
    val executorId = SparkEnv.get.executorId

只需要 if 即可,除此之外,还可以根据 e.getMessage 获取报错相关信息,通过报警函数发送,相比与之前单独起线程的方法,省事了很多。

Tips:

针对单轮次的任务,这样指定问题不大,但是如果是循环执行,由于 Executor 挂掉后会起新的 Executor,且 ExecutorId 是累加的 (例如申请10个 Executor,1号 Executor 挂掉了,这时候会新起一个 11号 Executor,而不是重新启动 1号 Executor),这时候再用 Executor == "1"?判断就有风险了,所以如果任务 Executor 是轮循的且 Executor 有挂掉的风险,建议使用 PartitionId 或者上面复杂的版本,但整体而言这个方法最简单且大部分时候有效。

B.指定 TaskId 报警

上面通过?SparkEnv.get 获取 ExecutorId 进行 if 判断并报警,有些同学一定有疑问,可不可以使用 TaskContext 获取 taskId,指定某个 task 发送报警:

    import org.apache.spark.TaskContext

    val taskId = TaskContext.getPartitionId()

其实也没问题,但是上面的场景不支持,因为使用了 synchronized 关键字,所以初始化时调用的 task 不固定,所以如果 if 指定 taskA 报警,而实际执行 init 的是 taskB,则 taskA 无法感知报错从而发送报警,如果在所有 task 上初始化则没有问题,当然也可以指定一个 PartionId 范围,这样会避免同步锁导致的单一 TaskId 不命中的问题。

C.在页面查看 ExecutorId

指定 ExecutorId 报警后,如果想到对应 Executor 查看 e.printStackTrace 的异常栈怎么找:

可以打开 spark 日志界面,前面第一列即为 ExecutorId,如果指定 if (executorId?== "1") 且 init 阶段报错,接到告警信息后即可第一时间查看 executorId = 1 的 stderr 日志定位 init 初始化阶段的问题,非常的便捷。

四.总结

如何在 Executor 端进行单独初始化 M -> 1,以及如何将 Task 端的错误 MxN -> 1 发送单次报警大致就这么多内容,这里还有一个问题需要解释下,为什么 SelfObject 不在 Dirver 端直接 init 并广播,这样不就直接保证了初始化1次和异常报警1次的需求:这里因为一些 initValue 是官方定义的 class 且不支持序列化,例如 Tensorflow 的 SavedModelBundle?,针对这样的变量如果在 Driver 端初始化并广播则会报 class 无法序列化的问题,所以才需要将 initValue 从 Dirver 初始化挪到 Executor 初始化。同时也有一些反思,面对上述问题,第一时间想到的是 dirver 与 executor 通信并起线程监控的方法,把简单问题复杂化的同时也引入了不必要的资源和时间浪费,还是要透过现象看本质,针对主要矛盾下手,这样可以快速找到最简单可靠的方法。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-21 21:21:07  更:2022-06-21 21:23:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:18:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码