1. YARN scheduling locality means placing a task on the node that holds its input data, which saves a large amount of network I/O. For MapReduce jobs, only map tasks have a locality requirement; reduce tasks and rescheduled failed map tasks do not.
2. YARN satisfies scheduling locality through delay scheduling. There are three locality levels: node-local, rack-local, and off-switch (any node). When a request's locality cannot be satisfied, the scheduler counts the scheduling opportunities the request has passed up and only relaxes the locality constraint to the next level once that count reaches a threshold.
3. The number of scheduling opportunities that may be skipped while waiting for a data-local node is controlled by the following parameters (the sketch after this list shows how they translate into a concrete number of missed opportunities):
FairScheduler
// Configured as a float; the number of opportunities that may be missed is this value multiplied by the total number of cluster nodes
yarn.scheduler.fair.locality.threshold.node (default: -1.0f)
yarn.scheduler.fair.locality.threshold.rack (default: -1.0f)
CapacityScheduler
// Configured as a positive integer, which is the number of missed opportunities directly
yarn.scheduler.capacity.node-locality-delay (default: 40)
yarn.scheduler.capacity.rack-locality-additional-delay (default: -1)
4. If YARN is deployed separately from the file system, this feature should be disabled because locality is meaningless there; setting the parameters above to -1 turns the locality delay off.
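For a concrete feel for how the FairScheduler float thresholds turn into a number of missed opportunities, here is a minimal, self-contained sketch (cluster size and threshold values are made up for illustration; the class is not part of YARN):
public class LocalityThresholdDemo {
  public static void main(String[] args) {
    // Illustrative values only.
    int numClusterNodes = 100;
    double nodeLocalityThreshold = 0.5;  // yarn.scheduler.fair.locality.threshold.node
    double rackLocalityThreshold = 0.8;  // yarn.scheduler.fair.locality.threshold.rack

    // Mirrors the thresholdNum = numNodes * threshold computation in
    // FSAppAttempt.getAllowedLocalityLevel() shown later in this walkthrough.
    double nodeMisses = nodeLocalityThreshold * numClusterNodes;
    double rackMisses = rackLocalityThreshold * numClusterNodes;

    System.out.println("Relax NODE_LOCAL -> RACK_LOCAL after more than "
        + nodeMisses + " missed scheduling opportunities");
    System.out.println("Relax RACK_LOCAL -> OFF_SWITCH after more than "
        + rackMisses + " missed scheduling opportunities");
    // A negative threshold (the -1.0f default) disables delay scheduling entirely.
  }
}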
Generation of the container request event
TaskAttemptImpl.java
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
    taskAttempt.attemptId, taskAttempt.resourceCapability,
    taskAttempt.dataLocalHosts.toArray(
        new String[taskAttempt.dataLocalHosts.size()]),
    taskAttempt.dataLocalRacks.toArray(
        new String[taskAttempt.dataLocalRacks.size()])));
dataLocalHosts and dataLocalRacks come from the split locations passed to the superclass constructor in MapTaskAttemptImpl.java:
public MapTaskAttemptImpl(TaskId taskId, int attempt,
    EventHandler eventHandler, Path jobFile,
    int partition, TaskSplitMetaInfo splitInfo, JobConf conf,
    TaskAttemptListener taskAttemptListener,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  super(taskId, attempt, eventHandler,
      taskAttemptListener, jobFile, partition, conf, splitInfo.getLocations(),
      jobToken, credentials, clock, appContext);
  this.splitInfo = splitInfo;
}
Next, RMContainerAllocator.java handles the container request event:
private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
  ...
  if (mapContainerRequestAccepted) {
    // Normalize the requested capability to the job-wide map resource size,
    // then hand the request to the scheduling bookkeeping.
    reqEvent.getCapability().setMemorySize(
        mapResourceRequest.getMemorySize());
    reqEvent.getCapability().setVirtualCores(
        mapResourceRequest.getVirtualCores());
    scheduledRequests.addMap(reqEvent);
  } else {
    ...
  }
}
The scheduledRequests.addMap method:
void addMap(ContainerRequestEvent event) {
  ContainerRequest request = null;
  ...
  } else {
    request =
        new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
    // Index the attempt by every candidate host and rack so that a container
    // later allocated on one of them can be matched back to this attempt.
    for (String host : event.getHosts()) {
      LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsHostMapping.put(host, list);
      }
      list.add(event.getAttemptID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Added attempt req to host " + host);
      }
    }
    for (String rack : event.getRacks()) {
      LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
      if (list == null) {
        list = new LinkedList<TaskAttemptId>();
        mapsRackMapping.put(rack, list);
      }
      list.add(event.getAttemptID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Added attempt req to rack " + rack);
      }
    }
    maps.put(event.getAttemptID(), request);
    addContainerReq(request);
  }
  ...
}
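For intuition about the reverse indexes addMap builds, here is a tiny self-contained illustration (host, rack, and attempt names are invented, and plain strings stand in for TaskAttemptId):
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Illustrative only: mimics mapsHostMapping / mapsRackMapping from
// RMContainerAllocator.ScheduledRequests with simplified types.
public class HostRackIndexDemo {
  public static void main(String[] args) {
    Map<String, LinkedList<String>> mapsHostMapping = new HashMap<>();
    Map<String, LinkedList<String>> mapsRackMapping = new HashMap<>();

    String attempt = "attempt_0001_m_000000_0";
    String[] hosts = {"host1", "host2"};
    String[] racks = {"/rack1"};

    for (String host : hosts) {
      mapsHostMapping.computeIfAbsent(host, h -> new LinkedList<>()).add(attempt);
    }
    for (String rack : racks) {
      mapsRackMapping.computeIfAbsent(rack, r -> new LinkedList<>()).add(attempt);
    }

    // When a container is later allocated on host1, the allocator can look up
    // which map attempts preferred that host and assign one of them to it.
    System.out.println(mapsHostMapping);
    System.out.println(mapsRackMapping);
  }
}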
The addContainerReq method in RMContainerRequestor.java:
protected void addContainerReq(ContainerRequest req) {
  // Ask at every locality level: each preferred host, each preferred rack,
  // and finally ANY ("*") so the request can fall back to off-switch.
  for (String host : req.hosts) {
    if (!isNodeBlacklisted(host)) {
      addResourceRequest(req.priority, host, req.capability,
          null);
    }
  }
  for (String rack : req.racks) {
    addResourceRequest(req.priority, rack, req.capability,
        null);
  }
  addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
      req.nodeLabelExpression);
}
Eventually addResourceRequestToAsk is called to add each resource request to ask:
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
  ...
  // Remove any stale copy of this request before adding the updated one.
  ask.remove(remoteRequest);
  ask.add(remoteRequest);
}
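To make the shape of the resulting ask concrete, here is a minimal self-contained sketch (SimpleRequest is an invented stand-in for ResourceRequest, and the priority value is a placeholder) of how one map request with two candidate hosts and one rack expands into host-level, rack-level, and ANY-level entries:
import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows the three locality levels produced by one
// addContainerReq() call, using simplified types.
public class AskExpansionDemo {
  record SimpleRequest(int priority, String resourceName, int numContainers) {}

  public static void main(String[] args) {
    int priority = 20; // placeholder value, not the real PRIORITY_MAP constant
    List<String> hosts = List.of("host1", "host2");
    List<String> racks = List.of("/rack1");

    List<SimpleRequest> ask = new ArrayList<>();
    for (String host : hosts) {
      ask.add(new SimpleRequest(priority, host, 1));   // node-level requests
    }
    for (String rack : racks) {
      ask.add(new SimpleRequest(priority, rack, 1));   // rack-level requests
    }
    ask.add(new SimpleRequest(priority, "*", 1));      // ResourceRequest.ANY fallback

    ask.forEach(System.out::println);
  }
}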
The resource request flow
The AM requests the resources it needs, and assigns what it receives, in RMContainerAllocator.heartbeat():
protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources();
  if (allocatedContainers != null && allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  ...
}
getResources() calls RMContainerRequestor.makeRemoteRequest(), which takes the resource requests previously stored in ask and sends them to the RM:
protected AllocateResponse makeRemoteRequest() throws YarnException,
    IOException {
  ...
  AllocateRequest allocateRequest =
      AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
  AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
  ...
  return allocateResponse;
}
The call chain from here is: scheduler.allocate() -> ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate().
DefaultAMSProcessor.allocate() dispatches to whichever scheduler is actually configured:
allocation = getScheduler().allocate(appAttemptId, ask,
    request.getSchedulingRequests(), release,
    blacklistAdditions, blacklistRemovals, containerUpdateRequests);
Resource allocation in FairScheduler
FairScheduler.allocate() is fairly long; the key logic is to update the FSAppAttempt with the ask and then pull any newly allocated containers from it:
public Allocation allocate(ApplicationAttemptId appAttemptId,
    List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
    List<ContainerId> release, List<String> blacklistAdditions,
    List<String> blacklistRemovals, ContainerUpdates updateRequests) {
  FSAppAttempt application = getSchedulerApp(appAttemptId);
  ...
  application.updateResourceRequests(ask);
  ...
  List<Container> newlyAllocatedContainers =
      application.pullNewlyAllocatedContainers();
  ...
  return new Allocation(newlyAllocatedContainers, headroom,
      preemptionContainerIds, null, null,
      updatedNMTokens, null, null,
      application.pullNewlyPromotedContainers(),
      application.pullNewlyDemotedContainers(),
      previousAttemptContainers, null);
}
FSAppAttempt.java contains the logic that runs when the FairScheduler receives a node heartbeat and finally assigns a container:
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
  ...
  Collection<SchedulerRequestKey> keysToTry = (reserved) ?
      Collections.singletonList(
          node.getReservedContainer().getReservedSchedulerKey()) :
      getSchedulerKeys();
  writeLock.lock();
  try {
    for (SchedulerRequestKey schedulerKey : keysToTry) {
      ...
      // Every heartbeat that offers this request a chance to run counts as
      // one scheduling opportunity.
      addSchedulingOpportunity(schedulerKey);
      PendingAsk rackLocalPendingAsk = getPendingAsk(schedulerKey,
          node.getRackName());
      PendingAsk nodeLocalPendingAsk = getPendingAsk(schedulerKey,
          node.getNodeName());
      ...
      NodeType allowedLocality;
      if (scheduler.isContinuousSchedulingEnabled()) {
        allowedLocality = getAllowedLocalityLevelByTime(schedulerKey,
            scheduler.getNodeLocalityDelayMs(),
            scheduler.getRackLocalityDelayMs(),
            scheduler.getClock().getTime());
      } else {
        allowedLocality = getAllowedLocalityLevel(schedulerKey,
            scheduler.getNumClusterNodes(),
            scheduler.getNodeLocalityThreshold(),
            scheduler.getRackLocalityThreshold());
      }
      // 1. A node-local assignment is always allowed if this node is wanted.
      if (rackLocalPendingAsk.getCount() > 0
          && nodeLocalPendingAsk.getCount() > 0) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Assign container on " + node.getNodeName()
              + " node, assignType: NODE_LOCAL" + ", allowedLocality: "
              + allowedLocality + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
            reserved, schedulerKey);
      }
      if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
        continue;
      }
      // 2. A rack-local assignment requires the allowed locality to have been
      //    relaxed to at least RACK_LOCAL.
      if (rackLocalPendingAsk.getCount() > 0
          && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
          .equals(NodeType.OFF_SWITCH))) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Assign container on " + node.getNodeName()
              + " node, assignType: RACK_LOCAL" + ", allowedLocality: "
              + allowedLocality + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
            reserved, schedulerKey);
      }
      PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
          ResourceRequest.ANY);
      if (!appSchedulingInfo.canDelayTo(schedulerKey, ResourceRequest.ANY)) {
        continue;
      }
      // 3. An off-switch assignment requires OFF_SWITCH locality, unless the
      //    request has no real locality preference anyway.
      if (offswitchAsk.getCount() > 0) {
        if (getAppPlacementAllocator(schedulerKey).getUniqueLocationAsks()
            <= 1 || allowedLocality.equals(NodeType.OFF_SWITCH)) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Assign container on " + node.getNodeName()
                + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
                + allowedLocality + ", priority: "
                + schedulerKey.getPriority()
                + ", app attempt id: " + this.attemptId);
          }
          return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
              reserved, schedulerKey);
        }
      }
      ...
    }
  } finally {
    writeLock.unlock();
  }
  return Resources.none();
}
The getAllowedLocalityLevel method:
NodeType getAllowedLocalityLevel(
    SchedulerRequestKey schedulerKey, int numNodes,
    double nodeLocalityThreshold, double rackLocalityThreshold) {
  if (nodeLocalityThreshold > 1.0) {
    nodeLocalityThreshold = 1.0;
  }
  if (rackLocalityThreshold > 1.0) {
    rackLocalityThreshold = 1.0;
  }
  // A negative threshold (the -1.0f default) disables delay scheduling.
  if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
    return NodeType.OFF_SWITCH;
  }
  writeLock.lock();
  try {
    // The first time a request key is seen, start at NODE_LOCAL.
    if (!allowedLocalityLevel.containsKey(schedulerKey)) {
      allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
      return NodeType.NODE_LOCAL;
    }
    NodeType allowed = allowedLocalityLevel.get(schedulerKey);
    if (allowed.equals(NodeType.OFF_SWITCH)) {
      return NodeType.OFF_SWITCH;
    }
    double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
        nodeLocalityThreshold :
        rackLocalityThreshold;
    // Relax the locality constraint one level once the missed-opportunity
    // count exceeds threshold * numNodes, and reset the counter.
    int schedulingOpportunities = getSchedulingOpportunities(schedulerKey);
    double thresholdNum = numNodes * threshold;
    if (schedulingOpportunities > thresholdNum) {
      if (allowed.equals(NodeType.NODE_LOCAL)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
              + ", nodeLocalityThreshold: " + thresholdNum
              + ", change allowedLocality from NODE_LOCAL to RACK_LOCAL"
              + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
        resetSchedulingOpportunities(schedulerKey);
      } else if (allowed.equals(NodeType.RACK_LOCAL)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("SchedulingOpportunities: " + schedulingOpportunities
              + ", rackLocalityThreshold: " + thresholdNum
              + ", change allowedLocality from RACK_LOCAL to OFF_SWITCH"
              + ", priority: " + schedulerKey.getPriority()
              + ", app attempt id: " + this.attemptId);
        }
        allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
        resetSchedulingOpportunities(schedulerKey);
      }
    }
    return allowedLocalityLevel.get(schedulerKey);
  } finally {
    writeLock.unlock();
  }
}
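Pulling the pieces together, the following self-contained sketch (a simplified model with invented names, not the actual FSAppAttempt code) simulates the per-request counter: every node heartbeat that could have served the request counts as a scheduling opportunity, and once the count exceeds threshold * numNodes the allowed locality is relaxed one level and the counter reset:
// Simplified model of FairScheduler delay scheduling for one request key.
// The relaxation order and the threshold * numNodes comparison follow the
// code above; class and method names are illustrative.
public class DelaySchedulingModel {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  private final int numNodes;
  private final double nodeThreshold;
  private final double rackThreshold;
  private NodeType allowed = NodeType.NODE_LOCAL;
  private int schedulingOpportunities = 0;

  DelaySchedulingModel(int numNodes, double nodeThreshold, double rackThreshold) {
    this.numNodes = numNodes;
    this.nodeThreshold = nodeThreshold;
    this.rackThreshold = rackThreshold;
  }

  // Called once per heartbeat in which the request could not be placed
  // node-locally; returns the locality level the scheduler may now use.
  NodeType missOpportunityAndGetAllowedLocality() {
    if (nodeThreshold < 0.0 || rackThreshold < 0.0) {
      return NodeType.OFF_SWITCH; // delay scheduling disabled (the defaults)
    }
    schedulingOpportunities++;
    if (allowed != NodeType.OFF_SWITCH) {
      double threshold =
          (allowed == NodeType.NODE_LOCAL) ? nodeThreshold : rackThreshold;
      if (schedulingOpportunities > numNodes * threshold) {
        allowed = (allowed == NodeType.NODE_LOCAL)
            ? NodeType.RACK_LOCAL : NodeType.OFF_SWITCH;
        schedulingOpportunities = 0; // mirrors resetSchedulingOpportunities()
      }
    }
    return allowed;
  }

  public static void main(String[] args) {
    // 4-node cluster: relax NODE_LOCAL after more than 2 missed opportunities
    // (0.5 * 4) and RACK_LOCAL after more than 3 (0.75 * 4).
    DelaySchedulingModel model = new DelaySchedulingModel(4, 0.5, 0.75);
    for (int heartbeat = 1; heartbeat <= 8; heartbeat++) {
      System.out.println("heartbeat " + heartbeat + " -> allowed locality: "
          + model.missOpportunityAndGetAllowedLocality());
    }
  }
}
Running this prints NODE_LOCAL for the first two heartbeats, RACK_LOCAL from the third, and OFF_SWITCH from the seventh, which is the same relaxation behaviour the real scheduler exhibits for an app whose requests keep missing their preferred nodes.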