IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 山东大学软件工程应用与实践: Spark(三) 代码分析 -> 正文阅读

[大数据]山东大学软件工程应用与实践: Spark(三) 代码分析

2021SC@SDUSC


目录

SparkEnv

1.安全管理器SecurityManager

2.基于Akka的分布式消息系统ActorSystem

3.map任务输出跟踪器mapOutputTracker


???????

SparkEnv

SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:

  1. 创建安全管理器SecurityManager;
  2. 创建基于Akka的分布式消息系统ActorSystem;
  3. 创建Map任务输出跟踪器mapOutputTracker;
  4. 实例化ShuffleManager;
  5. 创建ShuffleMemoryManager;
  6. 创建块传输服务BlockTransferService;
  7. 创建BlockManagerMaster;
  8. 创建块管理器BlockManager;
  9. 创建广播管理器BroadcastManager;
  10. 创建缓存管理器CacheManager;
  11. 创建HTTP文件服务器HttpFileServer;
  12. 创建测量系统MetricsSystem;
  13. 创建SparkEnv

1.安全管理器SecurityManager

SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现。代码如下

private val secretKey = generateSecretKey()

//使用HTTP连接设置口令认证 
if (authOn) { 
    Authenticator.setDefault(
            new Authenticator(){
                override def getPasswordAuthentication() : PasswordAuthentication = {
                    var passAuth: PasswordAuthentication - null
                val userInfo = getRequestingURL().getUserInfo()
                if (userInfo != null){
                    val parts = userInfo.split(":", 2)
                    passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
                    }
                    return passAuth
                }
            }
    )
}

2.基于Akka的分布式消息系统ActorSystem

ActorSystem是Spark中最基础的设施, Spark既使用它发送分布式消息, 又用它实现并发编程。 在Scala中只需要自定义类型继承Actor, 并且提供act方法, 就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法, 而是通过发送消息的方式(Scala发送消息是异步的)传递数据。 如:

Actor ! message

Akka是Actor编程模型的高级类库, 类似千JDK 1.5之后越来越丰富的并发工具包, 简化了程序员并发编程的难度。 ActorSystem便是Akka提供的川千创建分布式消息通信系统的基础类。?
?

SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。AkkaUtils.createActorSystem方法用于启动ActorSystem。AkkaUtils使用了Utils的静态方法startServiceOnPort,startServiceOnPort最终会回调方法startService:Int => (T, Int),此外的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的。Spark的Driver中Akka的默认访问地址是akka://sparkDriver,Spark的Executor中Akka的默认访问地址是akka://sparkExecutor。如果不指定ActorSystem的端口,那么所有节点的ActorSystem端口在每次启动时随机产生。

val (actorSystem, boundPort) = 
    Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
            val actorSystemName = if (isDriver) driverActorSystemName else
                executorActorSystemName
            AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf,             
                securityManager)
}
def createActorSystem(
        name: String,
        host: String,
        port: Int,
        conf: SparkConf
        securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
        doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceOnPort(port, startService, conf, name)
}

3.map任务输出跟踪器mapOutputTracker

mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapId和reduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到每个map任务的所有节点上拉取Block,这一过程叫shuffle。每批shuffle过程都有唯一的标识shuffleId。

首先是MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleId,Array存储各个map任务对应的状态信息MapStatus。由于MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses: TimeStampedHashMap[lnt,??Array[Byte]]维护序列化后的各个map任务的输出状态。 其中key对应shuffleld, Array存储各个序列化MapStatus生成的字节数组。

无论是Driver还是Executor, 最后都由mapOutputTracker的属性trackerActor有MapOutputTrackerMasterActor的引用,代码如下

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Pros(newActor), name = name)
    } else {
        AkkaUtils.makeDriverRef(name, conf, actorSystem)
    }
    }
    
    val mapOutputTracker = if (isDriver) {
        new MapOutputTrackerMaster(conf)
    } else {
         new MapOutputTrackerWorker(conf)
}

    mapOutputTracker.trackerActor = registerOrLookup(
        "MapOutputTracker",
    new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf(MapOutputTrackerMaster], conf))

registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的。也体会到了Akka的便捷。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-02 16:50:07  更:2021-12-02 16:52:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 14:07:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码