MapReduce——wordcount
//先在开始前介绍环境:
//Hadoop2.7.5 zookeeper3.4.9 虚拟机JDK jdk1.8.0_141 本地JDK jdk-8u241-windows-x64
//环境配置详见
[https://blog.csdn.net/weixin_47878012/article/details/121579060]:
//IDEA必不可少
//IDEA必不可少
//IDEA必不可少
//在开始前将hdfs-site.xml当中的权限关闭
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
//项目git地址添加链接描述
1.配置windows的Hadoop运行环境
MapReduce本地运行时需要在windows系统需要配置hadoop运行环境,否则直接运行代码会出现以下问题:
缺少winutils.exe
Could not locate executable null \bin\winutils.exe in the hadoop binaries
缺少hadoop.dll
Unable to load native-hadoop library for your platform… using builtin-Java
classes where applicable
步骤:
Hadoop2.7.5配置好的文件已经上传到我的博客
[https://download.csdn.net/download/weixin_47878012/85075777?spm=1001.2014.3001.5503]:
第一步:将hadoop2.7.5文件夹拷贝到一个没有中文没有空格的路径下面
第二步:在windows上面配置hadoop的环境变量: HADOOP_HOME,并 将%HADOOP_HOME%\bin添加到path中
第三步:把hadoop2.7.5文件夹中bin目录下的hadoop.dll文件放到系统盘: C:\Windows\System32 目录 第四步:关闭windows重启
2.MapReduce 编程规范
Map 阶段 2 个步骤
- 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步
- 自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果
Shuffle阶段4个步骤
3.对输出的 Key-Value 对进行分区
4.对不同分区的数据按照相同的 Key 排序
5.(可选) 对分组过的数据初步规约, 降低数据的网络拷贝
6.对数据进行分组, 相同 Key 的 Value 放入一个集合中
Reduce 阶段 2 个步骤
7.对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出
8.设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据
3.WordCount
//需求: 在一堆给定的文本文件中统计输出每一个单词出现的总次数
Step 1. 数据格式准备
1.创建一个新的文件
cd /export/servers
vim wordcount.txt
2.向其中放入以下内容并保存
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop
3.上传到 HDFS
hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
4.创建Maven工程导入Maven坐标
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy--><!--XAUFE@yzy-->
<modelVersion>4.0.0</modelVersion>
<groupId>XAUFE.yzy</groupId>
<artifactId>WordCount</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!--XAUFE@yzy-->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
<!--XAUFE@yzy-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Step 2. Mapper
package cn.itcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*为了解决序列化问题Mapper自己定义了数据类型XAUFE@yzy
四个泛型解释:
KEYIN:K1的类型
VALUEIN:K2的类型
KEYOUT:K2的类型
VALUEOUT:V2的类型
* */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
//map方法就是将k1v1转为k2v2
/*参数
key k1 行偏移量
value v1 每一行的文本数据
context 表示上下文对象
如何将k1v1转化为k2v2:
XAUFE@yzy
k1 v1
0 hello world hadoop
15 hdfs hive hello
k2 v2
hello 1
world 1
hdfs 1
hello 1
* */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text text = new Text();
LongWritable longWritable = new LongWritable();
//1:将一行的文本数据进行拆分 alt
String[] split = value.toString().split(",");
//2:遍历数组,组装k2v2 iter
for (String word : split) {
//3:将k2v2写入上下文 靠context
text.set(word);
longWritable.set(1);
context.write(text,longWritable);
}
}
}
Step 3. Reducer
package cn.itcast.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
//reduc方法作用:将k2v2转化为k3v3,写入上下文方法中
/*
* 参数:
* key :新k2
* value : 集合新 v2
* context 表示上下文对象*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
//如何将k2v2转化为k3v3
/*、
* XAUFE@yzy
新 k2 v2
hello 《1,1,1》
world 《1,1》
hadoop 《1》
k3 v3
hello 3
world 2
hadoop 1*
*/
long count = 0;
//1遍历集合 将集合中数字相加 得到v3
for (LongWritable value : values) {
count += value.get();
}
//2将k3和v3写入上下文中
context.write(key,new LongWritable(count));
}
}
Step 4. 定义主类, 描述 Job 并提交 Job
package cn.itcast.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain extends Configured implements Tool {
//指定一个job任务
@Override
public int run(String[] strings) throws Exception {
//1创建一个job任务对象
Job job = Job.getInstance(super.getConf(), "wordcount");
/*
* 如果打包运行出错,需要加配置
* job.setJarByClass(JobMain.class);
* XAUFE@yzy
* */
//2配置job任务对象(八个步骤)
//第一步指定文件读取方式和路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
/**
*
* 本地运行时输入路径
*/
//TextInputFormat.addInputPath(job,new Path("file:///G:\\mapreduce\\wordcount_input"));
//目标文件夹不需要存在
job.setMapperClass(WordCountMapper.class);
//设置map阶段k2的类型
job.setMapOutputKeyClass(Text.class);
//设置map阶段v2的类型
job.setMapOutputValueClass(LongWritable.class);
//跳过Shuffle阶段
//第三四五六步用默认方式
//第七步指定reduce阶段处理方式和数据类型
job.setReducerClass(WordCountReducer.class);
//设置k3v3的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//第八步设置输出类型
job.setOutputFormatClass(TextOutputFormat.class);
//设置输出路径 不管本地运行还是集群运行,输出目录不能存在XAUFE@yzy
Path path = new Path("hdfs://node01:8020/wordcount_out");
TextOutputFormat.setOutputPath(job,path);
//TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));
/*
* 如果输出目录已经存在
* 获取FileSystem
* 判读目录是否存在
* XAUFE@yzy
* */
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
boolean bl2 = fileSystem.exists(path);
if (bl2){
//删除目标目录
fileSystem.delete(path,true);
}
/**
*
* 本地运行时输出路径
*/
//TextOutputFormat.setOutputPath(job,new Path("file:///G:\\mapreduce\\wordcount_output"));
//等待任务结束
boolean bl = job.waitForCompletion(true);
return bl ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//启动job任务
int run = ToolRunner.run(configuration,new JobMain(),args);
System.exit(run);
}
}
4. MapReduce 运行模式
本地运行模式
1.MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行
2.处理的数据及输出结果可以在本地文件系统, 也可以在hdfs上
3.提前在window系统中创建输入文件
4.更改输入输出路径
将
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));
更改为
TextInputFormat.addInputPath(job,new Path("file:///G:\\mapreduce\\wordcount_input"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));
5.执行结束就可以看到输出文件wordcount_out找到part-r-00000文件为统计结果
集群运行模式
1.将 MapReduce 程序提交给 Yarn 集群, 分发到很多的节点上并发执行
2.处理的数据和输出结果应该位于 HDFS 文件系统
3.提交集群的实现步骤:
1)将程序打成JAR包,在IDEA右侧Maven中点击package
2)将打好的JAR包传到Hadoop上
cd /export/serves
mkdir jar_test
cd jar_text
rz -E
3)用Hadoop命令运行JAR包: JAR包名后跟主函数的路径
hadoop jar original-WordCount-1.0-SNAPSHOT.jar XAUFE/yzy/mapreduce/JobMain
4.执行结束后找到wordcount_out中part-r-00000文件为统计结果
|