(一)实现词频统计的基本的MapReduce 编程。
①在/user/hadoop/input 文件夹(该文件夹为空),创建文件wordfile1.txt 和wordfile2.txt 上传到HDFS 中的input 文件夹下。 文件wordfile1.txt 的内容如下: I love Spark I love Hadoop 文件wordfile2.txt 的内容如下: Hadoop is good Spark is fast ②启动Eclipse ,启动以后会弹出如下图所示界面,提示设置工作空间(workspace )。可以直接采用默认的设置“/home/hadoop/workspace ”,点击“OK ”按钮。可以看出,由于当前是采用hadoop 用户登录了Linux 系统,因此,默认的工作空间目录位于hadoop 用户目录“/home/hadoop ”下。 ③Eclipse 启动以后,选择“File–>New–>Java Project ”菜单,开始创建一个Java 工程。 ④在“Project name ”后面输入工程名称“WordCount ”,选中“Use default location ”,让这个Java 工程的所有文件都保存到“/home/hadoop/workspace/WordCount ”目录下。在“JRE ”这个选项卡中,可以选择当前的Linux 系统中已经安装好的JDK ,比如jdk1.8.0_162 。然后,点击界面底部的“Next> ”按钮,进入下一步的设置。 ⑤进入下一步的设置以后,需要在该界面中加载该Java 工程所需要用到的JAR 包,这些JAR 包中包含了与Hadoop 相关的Java API 。这些JAR 包都位于Linux 系统的Hadoop 安装目录下,对于本教程而言,就是“/usr/local/hadoop/share/hadoop ”目录下。点击界面中的“Libraries ”选项卡,然后,点击界面右侧的“Add External JARs… ”按钮,弹出如下图所示界面。 ⑥在该界面中,上面有一排目录按钮(即“usr ”、“local ”、“hadoop ”、“share ”、“hadoop ”、“mapreduce ”和“lib ”),当点击某个目录按钮时,就会在下面列出该目录的内容。 为了编写一个MapReduce 程序,一般需要向Java 工程中添加以下JAR 包: a.“/usr/local/hadoop/share/hadoop/common ”目录下的hadoop-common-3.1.3.jar 和haoop-nfs-3.1.3.jar ; b.“/usr/local/hadoop/share/hadoop/common/lib ”目录下的所有JAR 包; c.“/usr/local/hadoop/share/hadoop/mapreduce ”目录下的所有JAR 包,但是,不包括jdiff 、lib 、lib-examples 和sources 目录。 ⑦编写一个Java 应用程序,即WordCount.java 。在Eclipse 工作界面左侧的“Package Explorer ”面板中(如下图所示),找到刚才创建好的工程名称“WordCount ”,然后在该工程名称上点击鼠标右键,在弹出的菜单中选择“New–>Class ”菜单。 ⑧选择“New–>Class ”菜单以后会出现如下图所示界面,在该界面中只需要在“Name ”后面输入新建的Java 类文件的名称,这里采用名称“WordCount ”,其他都可以采用默认设置,然后,点击界面右下角“Finish ”按钮。 ⑨可以看出Eclipse 自动创建了一个名为“WordCount.java ”的源代码文件,并且包含了代码“public class WordCount{} ”,清空该文件里面的代码,然后在该文件中输入完整的词频统计程序代码。
(二)配置eclipse 环境,跑词频统计的程序。
(1)编译打包程序 ①编译上面编写的代码,直接点击Eclipse 工作界面上部的运行程序的快捷按钮,当把鼠标移动到该按钮上时,在弹出的菜单中选择“Run as ”,继续在弹出来的菜单中选择“Java Application ”,如下图所示。
②然后,会弹出如下图所示界面,点击界面右下角的“OK ”按钮,开始运行程序。 ③程序运行结束后,会在底部的“Console ”面板中显示运行结果信息(如下图所示)。 ④下面就可以把Java 应用程序打包生成JAR 包,部署到Hadoop 平台上运行。现在可以把词频统计程序放在“/usr/local/hadoop/myapp ”目录下。如果该目录不存在,可以使用如下命令创建。 cd /usr/local/hadoop mkdir myapp ⑤在Eclipse 工作界面左侧的“Package Explorer ”面板中,在工程名称“WordCount ”上点击鼠标右键,在弹出的菜单中选择“Export ”,如下图所示。 ⑥然后会弹出如下图所示界面,在该界面中选择“Runnable JAR file ”。 ⑦然后,点击“Next> ”按钮,弹出如下图所示界面。在该界面中,“Launch configuration ”用于设置生成的JAR 包被部署启动时运行的主类,需要在下拉列表中选择刚才配置的类“WordCount-WordCount ”。在“Export destination ”中需要设置JAR 包要输出保存到哪个目录,比如这里设置为“/usr/local/hadoop/myapp/WordCount.jar ”。在“Library handling ”下面选择“Extract required libraries into generated JAR ”。 ⑧然后点击“Finish ”按钮,会出现如下图所示界面。 ⑨可以忽略该界面的信息,直接点击界面右下角的“OK ”按钮,启动打包过程。打包过程结束后,会出现一个警告信息界面,如下图所示。 ⑩可以忽略该界面的信息,直接点击界面右下角的“OK ”按钮。至此,已经顺利把WordCount 工程打包生成了WordCount.jar 。可以到Linux 系统中查看一下生成的WordCount.jar 文件,可以在Linux 的终端中执行如下命令,可以看到,“/usr/local/hadoop/myapp ”目录下已经存在一个WordCount.jar 文件。 (2)运行程序 ①在运行程序之前,需要启动Hadoop 。 ②在启动Hadoop 之后,需要首先删除HDFS 中与当前Linux 用户hadoop 对应的input 和output 目录(即HDFS 中的“/user/hadoop/input ”和“/user/hadoop/output ”目录),这样确保后面程序运行不会出现问题。 ③然后,再在HDFS 中新建与当前Linux 用户hadoop 对应的input 目录,即“/user/hadoop/input ”目录。 ④然后把之前在Linux 本地文件系统中新建的两个文件wordfile1.txt 和wordfile2.txt (两个文件位于“/usr/local/hadoop ”目录下,并且里面包含了一些英文语句),上传到HDFS 中的“/user/hadoop/input ”目录下。 ⑤如果HDFS 中已经存在目录“/user/hadoop/output ”,则使用如下命令删除该目录。 ⑥现在就可以在Linux 系统中使用hadoop jar 命令运行程序。命令执行以后,当运行顺利结束时,屏幕上会显示类似如下的信息。 ⑦此时词频统计结果已经被写入了HDFS 的“/user/hadoop/output ”目录中,执行如下命令会在屏幕上显示如下词频统计结果。 至此,词频统计程序顺利运行结束。需要注意的是,如果要再次运行WordCount.jar ,需要首先删除HDFS 中的output 目录,否则会报错。
(三)编写MapReduce程序,实现计算平均成绩的程序。
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Score {
public static class Map extends
Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
while (tokenizerArticle.hasMoreElements()) {
StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
String strName = tokenizerLine.nextToken();
String strScore = tokenizerLine.nextToken();
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
context.write(name, new IntWritable(scoreInt));
}
}
}
public static class Reduce extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();
count++;
}
int average = (int) sum / count;
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9000");
String[] ioArgs = new String[] { "input/score", "output" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Score Average <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Score Average");
job.setJarByClass(Score.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
(四)MapReduce 的工作原理是什么?
通过Client 、JobTracker 和TaskTracker 的角度来分析MapReduce 的工作原理。
??首先是客户端client要编写好mapreduce 程序,配置好mapreduce 的作业也就是job ,接下来就是启动job 了,启动job 是告知JobTracker 上要运行作业,这个时候JobTracker 就会返回给客户端一个新的job 任务的ID 值,接下来它会做检查操作,这个检查就是确定输出目录是否存在,如果存在那么job 就不能正常运行下去,JobTracker 会抛出错误给客户端,接下来还要检查输入目录是否存在,如果不存在同样抛出错误,如果存在JobTracker 会根据输入计算输入分片(Input Split ),如果分片计算不出来也会抛出错误,这些都做好了JobTracker 就会配置Job 需要的资源了。拿到jobID 后,将运行作业所需要的资源文件复制到HDFS 上,包括MapReduce 程序打包的JAR 文件、配置文件和计算所得的输入分片信息。这些文件都存放在jobTracker 专门为该作业创建的文件夹中,文件夹名为该作业的Job ID 。JAR 文件默认会有10个副本(mapred.submit.replication 属性控制);输入分片信息告诉 JobTracker 应该为这个作业启动多少个map 任务等信息。当资源文件夹创建完毕后,客户端会提交job 告知jobTracker 我已将所需资源写入hdfs 上,接下来请你帮我真正去执行job 。 ??分配好资源后,JobTracker 接收提交job 请求后就会初始化作业,初始化主要做的是将Job 放入一个内部的队列,等待作业调度器对其进行调度。当作业调度器根据自己的调度算法调度到该作业时,作业调度器会创建一个正在运行的job 对象(封装任务和记录信息),以便JobTracker 跟踪job 的状态和进程。创建job 对象时作业调度器会获取hdfs 文件夹中的输入分片信息,根据分片信息为每个input split 创建一个map 任务,并将map任务分配给tasktracker 执行。对于map 和reduce 任务,tasktracker 根据主机核的数量和内存的大小有固定数量的map 槽和reduce 槽。这里需要强调的是:map 任务不是随随便便地分配给某个tasktracker 的,这里涉及到后面要讲的数据本地化。 ??接下来就是任务分配了,这个时候tasktracker 会运行一个简单的循环机制定期发送心跳给jobtracker ,心跳间隔是5秒,程序员可以配置这个时间,心跳就是jobtracker 和tasktracker 沟通的桥梁,通过心跳,jobtracker 可以监控tasktracker 是否存活,也可以获取tasktracker 处理的状态和问题,同时tasktracker 也可以通过心跳里的返回值获取jobtracker 给它的操作指令。tasktracker 会获取运行job 所需的资源,比如代码等,为真正执行做准备。任务分配好后就是执行任务了。在任务执行时候jobtracker 可以通过心跳机制监控tasktracker 的状态和进度,同时也能计算出整个job 的状态和进度,而tasktracker 也可以本地监控自己的状态和进度。TaskTracker 每隔一段时间会给JobTracker 发送一个心跳,告诉JobTracker 它依然在运行,同时心跳中还携带者很多的信息,比如当前map 任务完成的进度等信息。当jobtracker 获得了最后一个完成指定任务的tasktracker 操作成功的通知时候,jobtracker 会把整个job 状态置为成功,然后当客户端查询job 运行状态时候(注意:这个是异步操作),客户端会查到job 完成的通知的。如果job 中途失败,mapreduce 也会有相应机制处理,一般而言如果不是程序员程序本身有bug ,mapreduce 错误处理机制都能保证提交的job 能正常完成。
(五)Hadoop 是如何运行MapReduce 程序的?
①将编译软件与hadoop 相连(如Eclipse 去链接hadoop ),直接运行程序。 ②将mapreduce 程序打包成jar 文件。
|