Introduction to SparkConf
SparkConf manages all of Spark's configuration options. Whether we access Spark through the older SparkContext entry point or through the newer unified SparkSession entry point, we use a SparkConf to set the various parameters and then pass it to the SparkContext or SparkSession.
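Before walking through the class itself, here is a minimal sketch of that usage; the master URL, application name, and memory value are only illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object SparkConfUsage {
  def main(args: Array[String]): Unit = {
    // Build a SparkConf and hand it to the classic SparkContext entry point.
    val conf = new SparkConf()
      .setMaster("local[*]")                 // illustrative: run locally with all cores
      .setAppName("conf-demo")
      .set("spark.executor.memory", "1g")

    val sc = new SparkContext(conf)
    println(sc.getConf.get("spark.app.name"))
    sc.stop()

    // The same SparkConf can also seed the unified SparkSession entry point.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    println(spark.conf.get("spark.executor.memory"))
    spark.stop()
  }
}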
The definition of the SparkConf class
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  def this() = this(true)

  private val settings = new ConcurrentHashMap[String, String]()

  @transient private lazy val reader: ConfigReader = {
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

  def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }

  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    if (key == null) {
      throw new NullPointerException("null key")
    }
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    if (!silent) {
      logDeprecationWarning(key)
    }
    settings.put(key, value)
    this
  }

  private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.stringConverter(value))
    this
  }

  private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.rawStringConverter(value))
    this
  }

  ...
}
(1) The loadDefaults parameter determines whether properties from the Java system properties are loaded.
def this() = this(true)
If loadDefaults is true, the constructor calls loadFromSystemProperties(…) to load the corresponding properties.
if (loadDefaults) {
  loadFromSystemProperties(false)
}

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}
(2) SparkConf's companion object also defines the deprecated configuration options, as well as options whose names have changed across versions:
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
  val configs = Seq(
    DeprecatedConfig("spark.cache.class", "0.8",
      "The spark.cache.class property is no longer being used! Specify storage levels using " +
      "the RDD.persist() method instead."),
    ...
  )
  Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
}

private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
  "spark.executor.userClassPathFirst" -> Seq(
    AlternateConfig("spark.files.userClassPathFirst", "1.3")),
  ...

private val allAlternatives: Map[String, (String, AlternateConfig)] = {
  configsWithAlternatives.keys.flatMap { key =>
    configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
  }.toMap
}
The three string arguments of the DeprecatedConfig used by deprecatedConfigs are the name of the deprecated option, the version in which it was deprecated, and the deprecation message. configsWithAlternatives records renamed options as key-value pairs: the key is a string holding the option's new name, and the value is a sequence of AlternateConfig objects, where each AlternateConfig's two string arguments are the option's old name and the version since which that old name has been superseded.
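From the caller's point of view, this mapping means a value written under an old name can still be read under the new one, because getOption (shown later) falls back to the registered alternate keys. A small sketch using the option names from the excerpt above:

import org.apache.spark.SparkConf

object AlternateConfigDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)

    // Setting the pre-1.3 name triggers a deprecation warning in the logs ...
    conf.set("spark.files.userClassPathFirst", "true")

    // ... but reading through the new name still works.
    println(conf.get("spark.executor.userClassPathFirst"))  // true
  }
}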
Setting SparkConf configuration options
SparkConf options can be set in the following three ways:
- from the Java system properties
- through SparkConf's setter API
- by cloning another SparkConf
(1) From the system properties
That is, the subset of the properties returned by System.getProperties whose keys start with the "spark." prefix.
(2) Through SparkConf's setter API
SparkConf provides a number of methods beginning with "set" that make it convenient to set options, as shown below:
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}

def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}
...
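Since each setter returns the SparkConf itself, these calls can be chained; a brief sketch (the values are illustrative):

import org.apache.spark.SparkConf

object ChainedSetters {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")          // equivalent to set("spark.master", "local[2]")
      .setAppName("chained-setters")  // equivalent to set("spark.app.name", "chained-setters")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    println(conf.toDebugString)       // prints each key=value pair on its own line
  }
}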
(3) By cloning another SparkConf
SparkConf mixes in the Cloneable interface, and the operation is implemented by its clone() method:
override def clone: SparkConf = {
  val cloned = new SparkConf(false)
  settings.entrySet().asScala.foreach { e =>
    cloned.set(e.getKey(), e.getValue(), true)
  }
  cloned
}
Reading SparkConf configuration options
SparkConf exposes methods beginning with "get" to read option values of different types. They rely on getOption(…), which first looks up the given key and, if nothing is found, falls back to the corresponding deprecated (alternate) keys:
def get(key: String): String = {
  getOption(key).getOrElse(throw new NoSuchElementException(key))
}

...

def getTimeAsSeconds(key: String): Long = catchIllegalValue(key) {
  Utils.timeStringAsSeconds(get(key))
}

...
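A short sketch of a few of these getters in use (the keys and values are illustrative):

import org.apache.spark.SparkConf

object GettersDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.network.timeout", "120s")
      .set("spark.executor.instances", "4")

    println(conf.get("spark.executor.instances"))           // "4" as a raw string
    println(conf.getInt("spark.executor.instances", 2))     // 4, with 2 as the fallback
    println(conf.getTimeAsSeconds("spark.network.timeout")) // 120
    println(conf.getOption("spark.does.not.exist"))         // None, instead of an exception
  }
}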
Validating SparkConf configuration options
SparkConf provides the validateSettings() method to check options for validity, logging warnings or throwing exceptions for illegal combinations. Part of its source is shown below:
private[spark] def validateSettings() {
  if (contains("spark.local.dir")) {
    val msg = "Note that spark.local.dir will be overridden by the value set by " +
      "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +
      " in YARN)."
    logWarning(msg)
  }
  ...
  getOption(executorOptsKey).foreach { javaOpts =>
    if (javaOpts.contains("-Dspark")) {
      val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
        "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
      throw new Exception(msg)
    }
    if (javaOpts.contains("-Xmx")) {
      val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
        s"(was '$javaOpts'). Use spark.executor.memory instead."
      throw new Exception(msg)
    }
  }
  val deprecatedMemoryKeys = Seq(
    "spark.storage.memoryFraction",
    "spark.shuffle.memoryFraction",
    "spark.shuffle.safetyFraction",
    "spark.storage.unrollFraction",
    "spark.storage.safetyFraction")
  ...
  if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
    val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
      "instead use \"yarn\" with specified deploy mode."
  ...
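validateSettings() is private[spark] and is called on our behalf when a SparkContext is constructed. The following is a hedged sketch of a configuration it rejects, assuming executorOptsKey refers to spark.executor.extraJavaOptions and using a local master so the example can be run directly:

import org.apache.spark.{SparkConf, SparkContext}

object ValidateDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("validate-demo")
      // Sizing the executor heap through extra Java options is not allowed ...
      .set("spark.executor.extraJavaOptions", "-Xmx2g")

    // ... so constructing the SparkContext fails with the message shown above,
    // which points us at spark.executor.memory instead.
    val sc = new SparkContext(conf)
    sc.stop()
  }
}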