1 问题发现
以下两段代码之间的区别是什么?
- Example1 用了 spark.close
- Example2 没用 spark.close
object Example1 {
def main(args: Array[String]): Unit = {
try {
val spark = SparkSession.builder.getOrCreate
// spark code here
} finally {
spark.close
}
}
}
object Example2 {
val spark = SparkSession.builder.getOrCreate
def main(args: Array[String]): Unit = {
// spark code here
}
}
2 参考答案
两段都一样。Spark会话的stop / close 最终称为 spark上下文的stop 。
def stop(): Unit = {
sparkContext.stop()
}
override def close(): Unit = stop()
Spark上下文已运行时间shutdown hook,以在退出JVM之前关闭spark上下文。
ShutdownHookManager.addShutdownHook(
_shutdownHookRef = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
因此,无论JVM如何退出,都将调用它。如果手动stop() ,此关闭挂钩将被取消以避免重复
def stop(): Unit = {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(
s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}
if (_shutdownHookRef != null) {
ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
}
更多原理の补充说明:
当完成SparkSession 的使用时,始终关闭SparkSession 会触发释放可能提供给其他应用程序的群集资源。SparkSession 是一个会话,因此维护一些消耗JVM内存的资源。
我们可以拥有任意数量的SparkSession,但如果不使用内存,就不希望它们使用了不应该使用的内存,所以我们就要close 一个我们不再需要的。
SparkSession 是Spark SQL围绕Spark Core SparkContext 的包装器,因此在任何Spark应用程序中,我们都会被分配到群集资源,即vcores和内存。也就说是,只要SparkContext 正在使用,集群资源就不会分配给其他任务(不一定是Spark,也适用于提交给集群的其他非Spark应用程序 )。
这些群集资源是你的,直到你说“我干完了”,这转化为 SparkSession 。
但是,如果在close 之后只需退出Spark应用程序,就不必考虑执行close ,因为无论如何都会自动关闭资源。驱动程序和执行程序的JVM终止,集群的(心跳)连接也终止,因此最终将资源返回给集群管理器,以便它可以提供给其他应用程序使用。
|