1 相关概念
Yarn是Hadoop框架中用于集群资源管理调度和任务分配的平台。可以将其理解为分布式的操作系统,而MapReduce则是运行于系统上的应用程序。
1.1 组成结构
如图所示,从资源管理和分配角度来看,Yarn可以分为ResourceManager(RM)和NodeManager(NM)。其中RM主要在整体上负责处理来自客户端的请求并启动ApplicationMaster,并对NM进行调度管理;NodeManager分别对每个节点上的资源进行管理,并处理RM分配的任务。
从任务运行角度来看,当作业提交到Yarn集群后,会启动一个ApplicationMaster对任务进行监管,并调度资源运行任务;任务的具体执行在每个节点上的Container内进行,它是封装了内存、磁盘、cpu等资源的抽象容器。
1.2 工作流程
如下所示为Yarn处理MapReduce任务的流程
- 作业提交:MapReducce程序提交到客户端所在的节点后,YarnRunner向ResourceManager申请一个Application。
- RM将该应用程序资源的提交路径hdfs://…./.staging以及任务id返回给YarnRunner。
- 客户端根据返回的路径提交job运行所需的资源,如切片信息Job.split、配置文件job.xml,应用程序WordCount.jar。
- 程序资源提交完毕后,申请运行MRAppMaster。
- RM将MRAppMaster申请初始化成一个Task,并添加到调度队列
- 作业初始化:调度队列取出一个Task,并将其分配给一个空闲的NodeManager节点
- NodeManager在收到任务后会创建容器运行MRAppMaster
- Container下载任务所需的资源
- 任务分配:MrAppMaster向RM申请运行多个MapTask任务资源
- RM将MapTask任务分配给另外两个NodeManager,NodeManager领取任务并创建容器。
- 任务执行:MRAppmaster向两个收到任务的NodeManager发送程序脚本,两个NodeManager分别执行MapTask对数据分区排序。
- MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
- ReduceTask从MapTask获取相应分区的数据,执行reduce任务。
- 程序运行完毕后,MR会向RM申请注销自己。
YARN在任务运行中会将其进度和状态(包括counter)返回给应用管理器, 客户端通过应用管理器向用户展示实施进度。此外,客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
2 常用命令
Yarn主要用于对Hadoop集群进行管理,因此常通过yarn命令来对集群或任务运行情况进行查看。除了yarn之外,还可以通过Web界面对Hadoop信息进行查看(9870端口),或者通过配置好的历史服务器查看作业运行情况(19888/jobhistory)。
如下所示为常用的yarn命令行操作
yarn application -list
yarn application -list -appStates FINISHED
yarn applicationattempt -list application_1612577921195_0001
yarn application -kill application_1612577921195_0001
yarn logs -applicationId application_1612577921195_0001
yarn container -list appattempt_1612577921195_0001_000001
yarn container -status container_1612577921195_0001_01_000001
yarn logs -applicationId application_1612577921195_0001 -containerId container_1612577921195_0001_01_000001
yarn node -list -all
yarn queue -status default
yarn rmadmin -refreshQueues
3 配置参数
在使用Yarn时需要根据具体的生产环境中节点的CPU、内存等物理属性设置相关的配置参数,配置文件yarn-site.xml中常见的配置选项如下
ResourceManager相关
- yarn.resourcemanager.scheduler.class:配置调度器的默认容量
- yarn.resourcemanager.scheduler.client.thread-count:处理调度器请求的线程数量,默认50
NodeManager相关
- yarn.nodemanager.resource.detect-hardware-capabilities:让yarn自动根据硬件进行配置,默认false
- yarn.nodemanager.resource.cpu-vcores:NodeManager使用CPU核数,默认8个
- yarn.nodemanager.resource.count-logical-processors-as-cores:是否将虚拟核数当作CPU核数,默认false。如果某个节点上的CPU性能较强,可以开启虚拟把它当作多个核来用
- yarn.nodemanager.resource.pcores-vcores-multiplier:一个物理核当作几个虚拟核,默认1.0,例如4核8线程,该参数就应设为2
- yarn.nodemanager.resource.memory-mb:NodeManager使用内存,默认8G
- yarn.nodemanager.resource.system-reserved-memory-mb NodeManager为系统保留多少内存,以上二个参数配置一个即可
- yarn.nodemanager.pmem-check-enabled:是否开启物理内存检查来限制container,默认打开
- yarn.nodemanager.vmem-check-enabled:是否开启虚拟内存检查来限制container,默认打开
- yarn.nodemanager.vmem-pmem-ratio:虚拟内存物理内存比例,默认2.1
Container相关
- yarn.scheduler.minimum-allocation-vcores 容器最小CPU核数,默认1个
- yarn.scheduler.maximum-allocation-vcores 容器最大CPU核数,默认4个
- yarn.scheduler.minimum-allocation-mb 容器最最小内存,默认1G
- yarn.scheduler.maximum-allocation-mb 容器最最大内存,默认8G
在实际运行过程中,由于Linux和Java 8会占用大量的虚拟内存导致虚拟内存实际无法达到物理内存的2.1倍,因此一般将虚拟内存检查关闭。
4 调度器
Hadoop作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。在配置文件yarn-site.xml中可以对调度器类型进行设置,Apache Hadoop3.1.3默认的资源调度器是Capacity Scheduler。
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
FIFO调度器(First In First Out)只有一个作业队列,根据提交作业的先后顺序,先来先服务。当一个作业在执行时,其他任务只能等待,这样效率很低,很少使用。
使用多队列一方面可以防止一个任务阻塞任务队列导致其他任务无法执行,另一方面可以对队列进行排序,优先保证重要任务的执行。
4.1 容量调度器
容量调度器Capacity Scheduler是Yahoo开发的多用户调度器,他在FIFO的基础上支持多任务并行,提高了运行效率。如下所示,它有几个特点: - 多队列:每个队列可配置一定的资源量,每个队列采用FIFO调度策略。
- 容量保证:管理员可为每个队列设置资源最低保证和资源使用上限。例如分别规定队列A、B、C所能使用的最大资源分别为20%、50%、30%。
- 灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应用程序提交,则其他队列借调的资源会归还给该队列
- 多租户:支持多用户共享集群和多应用程序同时运行。但会对同一用户提交的作业所占资源量进行限定,防止其独占队列资源。例如分别设置用户Tory和Jimmy的资源使用上限为50%。
在进行资源分配时,从高到低按照队列-> 作业 -> 容器的顺序进行
- 首先在队列层面进行资源分配,优先选择资源占用率最低的队列分配资源
- 在作业资源分配时按照提交作业的优先级和提交时间顺序分配
- 在容器层面,首先考虑容器优先级,若相同则按照数据本地性原则,优先分配任务和数据在同一节点或同一机架的容器,从而保证任务尽快运行。
队列设置
在Hadoop目录下的etc/hadoop文件夹下有容量调度器单独的配置文件capacity-scheduler.xml,在其中可以对容量调度器队列等相关内容进行配置。容量调度器默认只有default一个队列,如下所示新增一个名为hive的队列,并对其进行设置
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,hive</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>40</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>60</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.capacity</name>
<value>60</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
<value>1</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
<value>80</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.state</name>
<value>RUNNING</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
<value>*</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
<value>-1</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
<value>-1</value>
</property>
重启Yarn或者执行yarn rmadmin -refreshQueues刷新队列,就可以看到两条队列
yarn默认将任务提交到default队列,可以在启动任务时通过-D 参数指定提交的队列名为hive
hadoop jar hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.queuename=hive /input /output
或者在驱动类中进行设置
Configuration conf = new Configuration();
conf.set("mapreduce.job.queuename","hive");
Job job = Job.getInstance(conf);
任务优先级
容量调度器支持任务优先级的配置,在资源紧张时优先级高的任务将优先获取资源。默认情况,Yarn将所有任务的优先级限制为0,若想使用任务的优先级功能,首先在yarn-site.xml文件中设置最大优先级。
<property>
<name>yarn.cluster.max-application-priority</name>
<value>5</value>
</property>
设置修改后重启yarn生效,之后提交任务时就可以指定任务的优先级,级别高的任务会优先调度执行。
hadoop jar hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.priority=5 /input /output
也可以根据应用id修改正在运行任务的优先级
yarn application -appID application_1611133087930_0009 -updatePriority 5
4.2 公平调度器
公平调度器Fair Schedulere是Facebook开发的多用户调度器,其核心理念是保证在时间上公平地分配资源。如果是需要大量高并发的场景可选用公平调度器。
公平调度器在容量调度器的基础上,更改了资源的分配方式。即它保留了多队列、容量保证、灵活性、多租户等特定,但是在分配资源时容量调度会优先选择资源利用率低的队列,而公平调度器会优先选择资源缺额比例大的队列以达到公平。
默认情况下,公平调度器采用最大最小公平算法分配资源。如果一个队列中有两个应用程序,则每个应用程序可得到1/2的资源;如果三个应用程序则每个可得到1/3资源。如果用户自定义了作业的权重,则需要考虑权重占比。
在具体资源分配时要考虑是否饥饿、资源分配比、资源使用权重比
- 首先判断任务是否饥饿isNeedy = 资源使用量 < 实际最小资源份额。实际最小资源份额minShare = Min(资源需求量,配置的最小资源)
- 若都饥饿则资源分配比小者优先。资源分配比:minShareRatio = 资源使用量 / Max(mindshare, 1)
- 若都不饥饿,资源使用权值比小者优先。资源使用权重比:useToWeightRatio = 资源使用量 / 权重
- 若都相同,则按照提交顺序。
在不加权的情况下关注的是资源个数占比,例如有一条队列总资源12个, 有4个job,对资源的需求分别是: job1->1, job2->2 , job3->6, job4->5。
- 按照公平原则12 / 4 = 3,每个作业分三个,与其需求相比job1多2个,job2多1个,job3缺3个,job4缺2个。
- 将job1和2多出的两个分给3和4,由于没有加权,平均分配3 / 2 = 1.5,则job3和4最后分得3 + 1.5 = 4.5。
- 由于没有多余的空闲资源,分配结束。
在考虑加权的情况下分配要考虑权重占比,例如有一条队列总资源16,有4个job 对资源的需求分别是: job1->4 job2->2 job3->10 job4->4 。每个job的权重为: job1->5 job2->8 job3->1 job4->2
- 按照权重分配资源,对于job1分配资源16 ? (5+8+1+2)? 5 =5,与其需求相比多1个;同理求出job2分配8个,多6个;job3分1个少9个;job4分2个少2个。
- 将job1和2多出的7个资源继续分配给job3和4。job3按权重分得7?(1+2)?1=2.33,还缺6.67;job4分得2+4.66,多2.66
- 将job4多出的资源继续分配给job3.最终job1分得4、job2为2、job3为1+2.33+2.66=6、job4为4
DRF策略
DRF(Dominant Resource Fairness),之前的资源分配都是只考虑内存的单一标准。但是节点的资源有很多种,例如内存,CPU,网络带宽等,通过DRF综合考虑分配的资源比例。
假设集群一共有100 CPU和10T 内存,而应用A需要(2 CPU, 300GB),应用B需要(6 CPU,100GB)。则两个应用分别需要A(2%CPU, 3%内存)和B(6%CPU, 1%内存)的资源,这就意味着A是内存主导的, B是CPU主导的,针对这种情况,我们可以选择DRF策略对不同应用进行不同资源(CPU和内存)的一个不同比例的限制。
队列设置
如下所示,使用公平调度器实现根据提交任务的用户分配队列。例如用户Tory提交的用户在root.tory队列运行,用户Bob提交的任务在root.bob运行
首先修改yarn-site.xml将调度器设置为公平调度器,并指定其配置文件的位置
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<description>配置使用公平调度器</description>
</property>
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/fair-scheduler.xml</value>
<description>指明公平调度器队列分配配置文件</description>
</property>
<property>
<name>yarn.scheduler.fair.preemption</name>
<value>false</value>
<description>禁止队列间资源抢占</description>
</property>
之后新建公平调度器的配置文件fair-scheduler.xml,apache官网对配置文件中的参数进行了介绍:https://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
<?xml version="1.0"?>
<allocations>
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<queueMaxResourcesDefault>4096mb,4vcores</queueMaxResourcesDefault>
<queue name="tory">
<minResources>2048mb,2vcores</minResources>
<maxResources>4096mb,4vcores</maxResources>
<maxRunningApps>4</maxRunningApps>
<maxAMShare>0.5</maxAMShare>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queue name="bob" type="parent">
<minResources>2048mb,2vcores</minResources>
<maxResources>4096mb,4vcores</maxResources>
<maxRunningApps>4</maxRunningApps>
<maxAMShare>0.5</maxAMShare>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queuePlacementPolicy>
<rule name="specified" create="false"/>
<rule name="nestedUserQueue" create="true">
<rule name="primaryGroup" create="false"/>
</rule>
<rule name="reject" />
</queuePlacementPolicy>
</allocations>
这样当用户如果提交队列时指定了具体队列,则按照第一个规则执行;否则匹配第二个规则,按照用户名进行匹配;若都没有匹配,按照最后一个规则进行拒绝或提交到默认队列。
5 Tool接口
Hadoop提供了Tool接口对MapReduce类进行封装,然后通过Driver类调用tool类执行任务。
例如在执行如下wc.jar时传入参数
hadoop jar wc.jar WordCountDriver -Dmapreduce.job.queuename=root.tory /input /output
由于原程序中读取第一个参数作为输入路径、第二个参数作为输出,但是这里的第一个参数是-D指定执行队列,这样程序在读取参数时就会报错。因此可以通过Tool类对原程序进行封装,在Driver类中可以对传入的参数进行判定和预处理再传给Tool执行。
如下所示WordCount类实现Tool 接口,在run() 方法中实现Mapper、Reducer、输入输出和参数等的设置;实现setConf()和getConf()方法用于系统调用进行参数的设置和获取。通过内部类WordCountMapper和WordCountReducer实现Map和Reduce任务。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
public class WordCount implements Tool {
private Configuration conf;
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public Configuration getConf() {
return conf;
}
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
...
}
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
...
}
}
之后通过Driver类对传入的参数先进行判断,再将后面几位正确的参数传入tool,调用其run()方法执行MapReduce
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
public class WordCountDriver {
private static Tool tool;
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
switch (args[0]){
case "wordcount":
tool = new WordCount();
break;
default:
throw new RuntimeException(" No such tool: "+ args[0] );
}
int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
System.exit(run);
}
}
|