A First Taste of Hadoop: MapReduce
1. What Is MapReduce
MapReduce is a programming framework for distributed computation; it is the core framework for developing "Hadoop-based data analysis applications."
Its core function is to combine the business logic code you write with the framework's built-in default components into one complete distributed program that runs in parallel on a Hadoop cluster.
2. Pros and Cons of MapReduce
Advantages:
- Easy to program: you implement a few interfaces (Mapper, Reducer) and the framework takes care of distributing the work
- Good scalability: when you run out of compute capacity, you can simply add more machines
- High fault tolerance: if a node fails, its tasks are automatically rescheduled onto other nodes
- Well suited to offline batch processing of massive (PB-scale and above) data sets
Disadvantages:
- Not suited to real-time computation: it cannot return results within milliseconds or seconds the way MySQL can
- Not suited to stream processing, because MapReduce expects a static input data set
- Not suited to DAG (directed acyclic graph) computation: when jobs are chained, each MapReduce job writes its output to disk, causing heavy disk IO and very poor performance
3. Getting to Know MapReduce
3.1 Common Data Serialization Types
As usual, let's start by getting to know the data types used in MR, comparing them with their Java counterparts!
| Java type | Hadoop Writable type |
| --- | --- |
| Boolean | BooleanWritable |
| Byte | ByteWritable |
| Int | IntWritable |
| Float | FloatWritable |
| Long | LongWritable |
| Double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| Null | NullWritable |
Careful readers may have noticed that these types aren't hard to remember: apart from String (which maps to Text), you just take the Java type name and append Writable.
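To get a feel for these wrapper types, here is a minimal sketch (the class name WritableDemo is just for illustration): Writables are mutable containers that you fill with set() and read back with get() or toString().

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Writables are mutable wrappers: set() replaces the value in place,
        // which lets MapReduce reuse a single object instead of allocating
        // a new one for every record.
        Text word = new Text();
        word.set("hadoop");

        IntWritable count = new IntWritable();
        count.set(42);

        System.out.println(word + " -> " + count.get()); // prints: hadoop -> 42
    }
}
```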
3.2 Walking Through the Official WordCount Source
Before reading the source, try to keep a few questions in mind:
- What are the basic building blocks of an MR program?
- What is the program's execution flow?
The official source is as follows:
```java
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    // Mapper: for every input line, emit (word, 1) for each token in the line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

    // Reducer: sum up the 1s for each word and emit (word, total).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // The reducer doubles as a combiner: partial sums are computed on the
        // map side to cut down the amount of data shuffled to the reducers.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Every argument except the last is an input path; the last is the output path.
        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

This answers the questions above: a basic MR program consists of three parts, a Mapper, a Reducer, and a driver (the main method). The driver creates a Job, wires in the Mapper and Reducer classes, declares the output key/value types, sets the input and output paths, and submits the job; at runtime the framework runs the map phase, shuffles and sorts the map output by key, and then runs the reduce phase.
3.3 Hand-Writing the WordCount Example
3.3.1 Requirements
Count how many times each word appears in a given text file.
(1) Input data (hello.txt):
```text
atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop
sgg sgg sgg
nihao nihao
bigdata0111
laiba
```
(2) Expected output (note it includes every word in the input):
```text
atguigu 2
banzhang 1
bigdata0111 1
cls 2
hadoop 1
jiao 1
laiba 1
nihao 2
sgg 3
ss 2
xue 1
```
3.3.2 Create the Project in IDEA and Configure It
Create a Maven project. If you are not sure how to create a Maven project in IDEA, see my earlier post on installing Maven and using it in IDEA.
(1) Add the following to pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
</dependencies>
```
(2) In the project's src/main/resources directory, create a file named "log4j2.xml" and paste in the following.
You can think of this file simply as the logging configuration; skipping it will not block the steps that follow.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
    <Appenders>
        <Appender type="Console" name="STDOUT">
            <Layout type="PatternLayout"
                    pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
        </Appender>
    </Appenders>
    <Loggers>
        <Logger name="test" level="info" additivity="false">
            <AppenderRef ref="STDOUT" />
        </Logger>
        <Root level="info">
            <AppenderRef ref="STDOUT" />
        </Root>
    </Loggers>
</Configuration>
```
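If you want to confirm the logging setup works, a quick smoke test like the sketch below (the class name LogDemo is just for illustration) should print an INFO line to the console. This assumes slf4j-api is on the classpath, which hadoop-client pulls in transitively.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogDemo {
    // log4j-slf4j-impl bridges these slf4j calls to the log4j2 config above.
    private static final Logger log = LoggerFactory.getLogger(LogDemo.class);

    public static void main(String[] args) {
        log.info("log4j2 is configured and printing to the console");
    }
}
```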
3.3.3 Hands-On: Writing the MR Program
(1) Write the Mapper class:
```java
package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Generic parameters: input key = byte offset of the line (LongWritable),
// input value = the line itself (Text); output key = word (Text),
// output value = a count of 1 (IntWritable).
public class helloMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reuse one key object and one value object across calls
    // instead of allocating new ones for every record.
    Text outk = new Text();
    IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for each word.
        String line = value.toString();
        String[] splits = line.split(" ");
        for (String split : splits) {
            outk.set(split);
            context.write(outk, outv);
        }
    }
}
```
(2) Write the Reducer class:
```java
package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Input: (word, [1, 1, ...]) grouped by key after the shuffle;
// output: (word, total count).
public class helloReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s the mapper emitted for this word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outv.set(sum);
        context.write(key, outv);
    }
}
```
(3) Write the Driver class:
```java
package com.mr_test.wordcount_hello;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class helloDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job instance.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar, Mapper, and Reducer classes.
        job.setJarByClass(helloDriver.class);
        job.setMapperClass(helloMapper.class);
        job.setReducerClass(helloReducer.class);
        // 3. Declare the map output types and the final output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Set the input file and the output directory.
        //    Note: the output directory must not already exist, or the job will fail.
        FileInputFormat.setInputPaths(job, new Path("D:\\StudyFile\\BigDate\\02.大数据技术之Hadoop\\03.代码\\day04\\MapReduce\\src\\main\\java\\com\\atguigu\\mr\\wordcount\\com.bigData.mapreduce\\src\\main\\resources\\wcinput\\hello.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\StudyFile\\BigDate\\02.大数据技术之Hadoop\\03.代码\\day04\\MapReduce\\src\\main\\java\\com\\atguigu\\mr\\wordcount\\com.bigData.mapreduce\\src\\main\\java\\testData\\wcinput2"));
        // 5. Submit the job and wait for it to finish.
        job.waitForCompletion(true);
    }
}
```
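A note on running it: the paths above are my local Windows paths, so adjust them to your own machine (or read them from args the way the official example does). If you instead package the project into a jar and run it on a cluster, the usual invocation is along the lines of `hadoop jar wordcount.jar com.mr_test.wordcount_hello.helloDriver <input> <output>`, with both paths pointing at HDFS.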
(4) Run results (screenshot not reproduced here): the output file should match the expected results in 3.3.1.
I've been slacking off a bit lately, haha, so I haven't written much, but in the next couple of days I should be back with follow-up posts on MapReduce and on Yarn. Let's keep at it!