2021SC@SDUSC
目录
SparkEnv
1.安全管理器SecurityManager
2.基于Akka的分布式消息系统ActorSystem
3.map任务输出跟踪器mapOutputTracker
???????
SparkEnv
SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:
- 创建安全管理器SecurityManager;
- 创建基于Akka的分布式消息系统ActorSystem;
- 创建Map任务输出跟踪器mapOutputTracker;
- 实例化ShuffleManager;
- 创建ShuffleMemoryManager;
- 创建块传输服务BlockTransferService;
- 创建BlockManagerMaster;
- 创建块管理器BlockManager;
- 创建广播管理器BroadcastManager;
- 创建缓存管理器CacheManager;
- 创建HTTP文件服务器HttpFileServer;
- 创建测量系统MetricsSystem;
- 创建SparkEnv
1.安全管理器SecurityManager
SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现。代码如下
private val secretKey = generateSecretKey()
//使用HTTP连接设置口令认证
if (authOn) {
Authenticator.setDefault(
new Authenticator(){
override def getPasswordAuthentication() : PasswordAuthentication = {
var passAuth: PasswordAuthentication - null
val userInfo = getRequestingURL().getUserInfo()
if (userInfo != null){
val parts = userInfo.split(":", 2)
passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
}
return passAuth
}
}
)
}
2.基于Akka的分布式消息系统ActorSystem
ActorSystem是Spark中最基础的设施, Spark既使用它发送分布式消息, 又用它实现并发编程。 在Scala中只需要自定义类型继承Actor, 并且提供act方法, 就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法, 而是通过发送消息的方式(Scala发送消息是异步的)传递数据。 如:
Actor ! message
Akka是Actor编程模型的高级类库, 类似千JDK 1.5之后越来越丰富的并发工具包, 简化了程序员并发编程的难度。 ActorSystem便是Akka提供的川千创建分布式消息通信系统的基础类。? ?
SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。AkkaUtils.createActorSystem方法用于启动ActorSystem。AkkaUtils使用了Utils的静态方法startServiceOnPort,startServiceOnPort最终会回调方法startService:Int => (T, Int),此外的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的。Spark的Driver中Akka的默认访问地址是akka://sparkDriver,Spark的Executor中Akka的默认访问地址是akka://sparkExecutor。如果不指定ActorSystem的端口,那么所有节点的ActorSystem端口在每次启动时随机产生。
val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
case Some(as) => (as, port)
case None =>
val actorSystemName = if (isDriver) driverActorSystemName else
executorActorSystemName
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf,
securityManager)
}
def createActorSystem(
name: String,
host: String,
port: Int,
conf: SparkConf
securityManager: SecurityManager): (ActorSystem, Int) = {
val startService: Int => (ActorSystem, Int) = { actualPort =>
doCreateActorSystem(name, host, actualPort, conf, securityManager)
}
Utils.startServiceOnPort(port, startService, conf, name)
}
3.map任务输出跟踪器mapOutputTracker
mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapId和reduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到每个map任务的所有节点上拉取Block,这一过程叫shuffle。每批shuffle过程都有唯一的标识shuffleId。
首先是MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleId,Array存储各个map任务对应的状态信息MapStatus。由于MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses: TimeStampedHashMap[lnt,??Array[Byte]]维护序列化后的各个map任务的输出状态。 其中key对应shuffleld, Array存储各个序列化MapStatus生成的字节数组。
无论是Driver还是Executor, 最后都由mapOutputTracker的属性trackerActor有MapOutputTrackerMasterActor的引用,代码如下
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
actorSystem.actorOf(Pros(newActor), name = name)
} else {
AkkaUtils.makeDriverRef(name, conf, actorSystem)
}
}
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf(MapOutputTrackerMaster], conf))
registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的。也体会到了Akka的便捷。
|