目录
@Author Jeffrey.miao 转载请标明出处:https://blog.csdn.net/The_Inertia/article/details/119839807
1.plugin介绍
2.plugin初始化
2.1 PluginContainer
2.2 rpc通信
2.3?SparkPlugin初始化
3.实战
1.plugin介绍
通过在submit作业时指定 --conf="spark.plugins=org.apache.spark.internal.plugin.TestSparkPlugin",加载自定义Plugin。
2.plugin初始化
2.1 PluginContainer
PluginContainer会在driver和executor中初始化:
driver中:
// org.apache.spark.SparkContext#_plugins
_plugins = PluginContainer(this, _resources.asJava)
sparkcontext初始化时,会初始化一个PluginContainer(DriverPluginContainer),并传入sparkcontext的引用。
executor中:
// org.apache.spark.executor.Executor#plugins
private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
PluginContainer(env, resources.asJava)
}
Executor初始化时,会初始化一个PluginContainer(ExecutorPluginContainer),并传入sparkenv的引用。
2.2 rpc通信
PluginContainer的实现比较简单,就是一些插件和环境的初始化,值得关注就是其中的rpc环境的初始化。driver端,会将所有的plugin注册成一个endpoint,注册到dispatcher中。该endpoint的name是PluginEndpoint类的全限定名。后续executor中的plugin会通过该endpointName与driver端的plugin通信。(目前只开放了executor-plugin到driver-plugin的通信)
// org.apache.spark.internal.plugin.DriverPluginContainer
if (driverPlugins.nonEmpty) {
val pluginsByName = driverPlugins.map { case (name, plugin, _) => (name, plugin) }.toMap
sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(),
new PluginEndpoint(pluginsByName, sc.env.rpcEnv))
}
可惜的是,在ExecutorPluginContainer中并没有注册endpoint,而且rpcEnv也没有直接开放给用户,因此如果不通过反射,就难以实现driver-plugin到executor-plugin的通信。
2.3?SparkPlugin初始化
// org.apache.spark.internal.plugin.PluginContainer#apply
private def apply(
ctx: Either[SparkContext, SparkEnv],
resources: java.util.Map[String, ResourceInformation]): Option[PluginContainer] = {
val conf = ctx.fold(_.conf, _.conf)
val plugins = Utils.loadExtensions(classOf[SparkPlugin], conf.get(PLUGINS).distinct, conf)
if (plugins.nonEmpty) {
ctx match {
case Left(sc) => Some(new DriverPluginContainer(sc, resources, plugins))
case Right(env) => Some(new ExecutorPluginContainer(env, resources, plugins))
}
} else {
None
}
}
无论是driver还是executor端,PluginContainer初始化时,都会加载spark.plugins中配置的plugin类。通过org.apache.spark.util.Utils#loadExtensions方法加载plugin类,如果sparkplugin的构造器中的param是sparkconf,则会优先加载该构造器,否则加载空构造。注意,如果需要在plugin中获取sparkconf中的配置信息,需要声明相应的构造器!
3.实战
实现一个sparkplugin较为简单,只需要分别实现SparkPlugin接口,DriverPlugin接口,ExecutorPlugin接口。具体参考spark的测试类即可。
|