IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop Yarn作业调度本地性源码分析 -> 正文阅读

[大数据]Hadoop Yarn作业调度本地性源码分析

1、Yarn的调度本地性是指将作业分配到数据所在节点,可以减少很多网络IO,对MR作业来说,只有map task有本地性需求,reduce task和failed map task都没有本地性需求

2、Yarn的调度本地性是通过延迟调度来满足的,本地性有3个级别:节点本地、机架本地和随意调度,当调度不能满足本地性时,调度器会计算错过的调度机会数量,并等待该计数达到阈值,然后将本地性约束放宽到下一个级别

3、为了调度到满足数据本地性的节点,可以错过一定数量的调度机会,这个错过机会数量的阈值由以下参数控制:
FairScheduler

// 配置为浮点数,最终错过的节点数为配置 * 集群总节点数
yarn.scheduler.fair.locality.threshold.node 默认为-1.0f
yarn.scheduler.fair.locality.threshold.rack 默认为-1.0f

CapacityScheduler

// 配置为正整数,即最终错过的节点数
yarn.scheduler.capacity.node-locality-delay 默认为40
yarn.scheduler.capacity.rack-locality-additional-delay 默认为-1

4、如果 YARN 与文件系统分开部署,则应禁用此功能,因为本地性没有意义,将以上参数设置为-1即可禁用本地调度功能

container请求事件的产生

TaskAttemptImpl.java

taskAttempt.eventHandler.handle(new ContainerRequestEvent(
            taskAttempt.attemptId, taskAttempt.resourceCapability,
            taskAttempt.dataLocalHosts.toArray(
                new String[taskAttempt.dataLocalHosts.size()]),
            taskAttempt.dataLocalRacks.toArray(
                new String[taskAttempt.dataLocalRacks.size()])));

dataLocalHosts和dataLocalRacks来源于MapTaskAttemptImpl.java

public MapTaskAttemptImpl(TaskId taskId, int attempt, 
      EventHandler eventHandler, Path jobFile, 
      int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
      TaskAttemptListener taskAttemptListener, 
      Token<JobTokenIdentifier> jobToken,
      Credentials credentials, Clock clock,
      AppContext appContext) {
    super(taskId, attempt, eventHandler, 
        // splitInfo.getLocations()就是输入数据split所在节点
        taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
        jobToken, credentials, clock, appContext);
    this.splitInfo = splitInfo;
  }

接下来是RMContainerAllocator.java,处理container请求事件

private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
    ...

    if(mapContainerRequestAccepted) {
      // set the resources
      reqEvent.getCapability().setMemorySize(
          mapResourceRequest.getMemorySize());
      reqEvent.getCapability().setVirtualCores(
          mapResourceRequest.getVirtualCores());
      // 添加map
      scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
    } else {
      ...
    }
  }

scheduledRequests.addMap方法

void addMap(ContainerRequestEvent event) {
      ContainerRequest request = null;
      
      ...
        } else {
          // 创建container请求
          request =
              new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
          // 将数据所在节点与task attempt相对应,是为了之后的container分配节点本地性
          for (String host : event.getHosts()) {
            LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
            if (list == null) {
              list = new LinkedList<TaskAttemptId>();
              mapsHostMapping.put(host, list);
            }
            list.add(event.getAttemptID());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Added attempt req to host " + host);
            }
          }
          // 将机架与task attempt相对应,是为了之后的container分配机架本地性
          for (String rack : event.getRacks()) {
            LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
            if (list == null) {
              list = new LinkedList<TaskAttemptId>();
              mapsRackMapping.put(rack, list);
            }
            list.add(event.getAttemptID());
            if (LOG.isDebugEnabled()) {
              LOG.debug("Added attempt req to rack " + rack);
            }
          }
          maps.put(event.getAttemptID(), request);
          // 调用addContainerReq方法
          addContainerReq(request);
        }
        ...
      }
    }

RMContainerRequestor.java
addContainerReq方法

protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
    for (String host : req.hosts) {
      // Data-local
      if (!isNodeBlacklisted(host)) {
        // 添加本地节点资源请求
        addResourceRequest(req.priority, host, req.capability,
            null);
      }
    }

    // Nothing Rack-local for now
    for (String rack : req.racks) {
      // 添加本地机架资源请求
      addResourceRequest(req.priority, rack, req.capability,
          null);
    }

    // 添加任意位置资源请求
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
        req.nodeLabelExpression);
  }

最终会调用addResourceRequestToAsk方法将资源请求添加进ask

private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
    ...
    ask.remove(remoteRequest);
    ask.add(remoteRequest);    
  }

资源申请过程

AM通过RMContainerAllocator.heartbeat()来对所需资源进行申请,并分配资源:

protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    // 申请资源
    List<Container> allocatedContainers = getResources();
    if (allocatedContainers != null && allocatedContainers.size() > 0) {
      // 分配资源
      scheduledRequests.assign(allocatedContainers);
    }
    ...
  }

getResources()会调用RMContainerRequestor的makeRemoteRequest()方法,这里会把之前存入ask中的资源请求取出并发送

protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    ...
    // 从ask中取出资源请求
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    // 发送资源请求
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    ...
    return allocateResponse;
  }

接下来的链路如下:
scheduler.allocate -> ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate()

DefaultAMSProcessor.allocate()
这里会区分具体使用的调度器

allocation = getScheduler().allocate(appAttemptId, ask,
            request.getSchedulingRequests(), release,
            blacklistAdditions, blacklistRemovals, containerUpdateRequests);

FairScheduler的资源分配过程

FairScheduler.allocate()
这个方法比较长,关键的逻辑是把资源请求ask更新到FSAppAttempt,并从FSAppAttempt里获取新的container

public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
      List<ContainerId> release, List<String> blacklistAdditions,
      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
    // Make sure this application exists
    FSAppAttempt application = getSchedulerApp(appAttemptId);
    ...
	// Update application requests
    application.updateResourceRequests(ask);
    ...
    List<Container> newlyAllocatedContainers =
        application.pullNewlyAllocatedContainers();
    ...
	return new Allocation(newlyAllocatedContainers, headroom,
        preemptionContainerIds, null, null,
        updatedNMTokens, null, null,
        application.pullNewlyPromotedContainers(),
        application.pullNewlyDemotedContainers(),
        previousAttemptContainers, null);
}

FSAppAttempt.java
接下来是FairScheduler接收节点心跳最终分配container的逻辑

private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    ...

    Collection<SchedulerRequestKey> keysToTry = (reserved) ?
        Collections.singletonList(
            node.getReservedContainer().getReservedSchedulerKey()) :
        getSchedulerKeys();

    writeLock.lock();
    try {
      // 这里schedulerKey是请求类型的封装(eg : map、reduce、failed map)
      for (SchedulerRequestKey schedulerKey : keysToTry) {
        ...
        // 调度机会+1,用来计算为实现本地性而放弃的调度次数
        addSchedulingOpportunity(schedulerKey);
        // 获取当前节点所在机架上等待的资源请求
        PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getRackName());
        // 获取当前节点上等待的资源请求
        PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
            node.getNodeName());

        ...
        // 允许的本地化级别
        NodeType allowedLocality;
        // isContinuousSchedulingEnabled由参数yarn.scheduler.fair.continuous-scheduling-enabled决定,默认为false
        if (scheduler.isContinuousSchedulingEnabled()) {
          allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
              scheduler.getNodeLocalityDelayMs(),
              scheduler.getRackLocalityDelayMs(),
              scheduler.getClock().getTime());
        } else {
          // 调用getAllowedLocalityLevel方法
          allowedLocality = getAllowedLocalityLevel(schedulerKey,
              scheduler.getNumClusterNodes(),
              // yarn.scheduler.fair.locality.threshold.node 默认为-1.0f
              scheduler.getNodeLocalityThreshold(),
              // yarn.scheduler.fair.locality.threshold.rack 默认为-1.0f
              scheduler.getRackLocalityThreshold());
        }
        
        // 如果当前节点存在资源请求,则直接分配container,实现了节点本地性调度
        if (rackLocalPendingAsk.getCount() > 0
            && nodeLocalPendingAsk.getCount() > 0) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
              reserved, schedulerKey);
        }

        if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
          continue;
        }
		
		// 如果当前节点所在机架存在资源请求,且allowedLocality为RACK_LOCAL或OFF_SWITCH,则分配container,实现了机架本地性调度
        if (rackLocalPendingAsk.getCount() > 0
            && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
            .equals(NodeType.OFF_SWITCH))) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: RACK_LOCAL" + ", allowedLocality: "
                + allowedLocality + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
              reserved, schedulerKey);
        }
        // 获取所有等待的资源请求
        PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
            ResourceRequest.ANY);
        if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
          continue;
        }
      
        if (offswitchAsk.getCount() > 0) {
          // 如果allowedLocality为OFF_SWITCH,则分配container,此时没有实现本地调度,为随机调度
          if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
              <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assign container on " + node.getNodeName()
                  + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
                  + allowedLocality + ", priority: "
                  + schedulerKey.getPriority()
                  + ", app attempt id: " + this.attemptId);
            }
            return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
                reserved, schedulerKey);
          }
        }

        ...
      }
    } finally {
      writeLock.unlock();
    }

    return Resources.none();
  }

getAllowedLocalityLevel方法

NodeType getAllowedLocalityLevel(
      SchedulerRequestKey schedulerKey, int numNodes,
      double nodeLocalityThreshold, double rackLocalityThreshold) {
    // nodeLocalityThreshold和rackLocalityThreshold上限为1
    if (nodeLocalityThreshold > 1.0) {
      nodeLocalityThreshold = 1.0;
    }
    if (rackLocalityThreshold > 1.0) {
      rackLocalityThreshold = 1.0;
    }

    // 如果nodeLocalityThreshold或rackLocalityThreshold小于0,不启用本地调度,可以调度到任意节点
    // If delay scheduling is not being used, can schedule anywhere
    if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
      return NodeType.OFF_SWITCH;
    }

    writeLock.lock();
    try {

      // 默认级别为节点本地 NODE_LOCAL
      if (!allowedLocalityLevel.containsKey(schedulerKey)) {
        allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
        return NodeType.NODE_LOCAL;
      }

      NodeType allowed = allowedLocalityLevel.get(schedulerKey);

      // 如果已经是随机调度OFF_SWITCH,直接返回
      if (allowed.equals(NodeType.OFF_SWITCH)) {
        return NodeType.OFF_SWITCH;
      }

      double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
          nodeLocalityThreshold :
          rackLocalityThreshold;

      // 当放弃的调度机会超出阈值时,降低本地性级别
      int schedulingOpportunities = getSchedulingOpportunities(schedulerKey);
      double thresholdNum = numNodes * threshold;
      if (schedulingOpportunities > thresholdNum) {
        // 节点本地降级到机架本地
        if (allowed.equals(NodeType.NODE_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
                + ", nodeLocalityThreshold: " + thresholdNum
                + ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
          resetSchedulingOpportunities(schedulerKey);
        // 机架本地降级到随意调度
        } else if (allowed.equals(NodeType.RACK_LOCAL)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
                + ", rackLocalityThreshold: " + thresholdNum
                + ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
                + ", priority: " + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
          resetSchedulingOpportunities(schedulerKey);
        }
      }
      return allowedLocalityLevel.get(schedulerKey);
    } finally {
      writeLock.unlock();
    }
  }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-13 11:30:47  更:2021-10-13 11:30:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:21:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码