MapReduce 是 Google 公司开源的一项重要技术,它是一个编程模型,用以进行大数据量的计算。MapReduce 是一种简化的并行计算编程模型,它使那些没有多少并行计算经验的开发人员也可以开发并行应用程序。
- 模型非常方便使用,即使是对于完全没有分布式程序的程序员也是如此。它隐藏了并行计算的细节。
MapReduce 运行开发人员使用自己熟悉的语言进行开发。 - 通过
MapReduce ,应用程序可以在超过1000个节点的大型集群上运行,并且提供经过优化的错误容灾。
MapReduce 采用 “分而治之”思想,把对大规模数据集的操作,分发给一个主节点管理下的各个字节点共同完成,然后整合各个字节点的中间结果,得到最终的计算结果。简而言之,MapReduce 就是“分散任务,汇总结果 ”。
?MapReduce 编程模型
从MapReduce 自身的命名特点可以看出,MapReduce 至少由两部分组成:Map 和 Reduce 。Map理解为“分发”,Reduce理解为“聚合”。用户只需要编写 map() 和 reduce() 两个方法的逻辑,即可完成简单的分布式程序的设计。
?MapReduce 执行过程简要说明如下
- 读取
HDFS 文件内容,把内容中的每一行解析成一个个的<key, value> 键值对。key 是每行行首相对于文件起始位置的字节偏移量,value 就是具体的数据,一个文件切片对应一个 map task ,每读取一行就会调用一次 map 。 - 自定义
map 方法,编写自己的业务逻辑,对输入的<key, value> 处理,转换成新的<key,value> 输出作为中间结果。 - 为了让
reduce 可以并行处理 map 的结果,根据业务要求需要对 map 的输出进行一定的分区 对不同分区上的数据,按照 key 进行排序分组,相同 key 的value 放到一个集合中,把分组后的数据进行归约。每个 reduce 会接收各个map 中相同分区中的数据,对多个 map 任务的输出,按照不同的分区通过网络 copy 到不同 reduce 节点。这个过程称为 Shuffle洗牌 ,即Shuffle 就是把我们 map 中的数据分发到 reduce 中去的一个过程。 - 自定义
reduce 函数,编写自己的业务逻辑,对输入的<key,value> 键值对进行处理,转换成新的<key,value> 输出。 - 把
reduce 的输出保存到新的文件中。
搭建Windows Hadoop环境
- 下载 HadoopOnWindows 将 解压到一个没有中文没有空格的路径
D:/devtools - 在
window 上面配置配置 hadoop 的环境变量:HADOOP_HOME ,并将 %HADOOP_HOME%/bin 配置到 Path 中 - 把
hadoop 文件 bin 目录下的 hadoop.dll 文件放在系统盘 C:\Windows\System32 目录
Hadoop自定义数据类型
MapReduce 要求<key , value >的 key 和 value 都要实现 Writable 接口,从而支持Hadoop 的序列化 和反序列化 。
Java的类型 | Hadoop的内置类 | Java的类型 | Hadoop的内置类 | boolean | BooleanWritable | Float/float | FloatWritable | Integer/int | IntWritable | Double/double | DoubleWritable | Login/long | LoginWritable | String | Text | NullWritable 当 key 或 value 为空时使用 | ArrayWritable 存储属于Writable 类型的值数组 |
开发步骤
- 添加
hadoop 开发依赖 hadoop-client - 继承
Mapper 类实现自己的 Mapper 类,并重写 map() 方法 - 继承
Reduce 类实现自己的 Reduce 类,并重写 reduce() 方法 - 程序主入口类编写,创建
Job 和 任务入口 - 配置打包插件,执行
mvn clean package 对工程进行构建 - 上传
jar 到 Linudx 远程服务器任意目录,并执行程序输出到 output 目录下 - 查询执行后的结果
导入坐标依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
编写Mapper业务逻辑
// Mapper
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
// key --> 字符偏移量 value 文本读取的一行数据 hello world context 上下文
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// map逻辑
// 获取每一行数据,转为字符串类型,通过split方法按空格进行拆分获取一个数组
String[] words = value.toString().split(" ");
// 遍历数组,获取每一个单词,通过上下文(context) 以 (单词,1) 的格式写到reduce中
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
编写Reduce业务逻辑
// Reduce
public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// key hadoop value <1,1,1>
int count = 0;
// 遍历可迭代对象,获取数值进行统计
for (IntWritable num : values) {
count += num.get();
}
//次数计算完毕,通过上下文 context 以 (hadoop,4) 格式写到文件中
context.write(key,new IntWritable(count));
}
}
编写MapReduce程序主类
// 定义MR执行任务,关联Mapper 和 Reduce 以及 输出和输出文件地址
public static void main(String[] args) throws Exception{
//1. 实例化MR环境 --> Configuration
Configuration conf = new Configuration();
//2. 通过环境实例化一个任务 Job
Job job = Job.getInstance(conf,"词频统计");
//3. 指定执行任务的类是谁--> 入口方法所在的类
job.setJarByClass(WordCountJob.class);
// 4. 指定输入文件所在的位置
// D:\WorkSpace\MapReduce\input\words.txt
FileInputFormat.setInputPaths(job,new Path("D:\\WorkSpace\\MapReduce\\input\\words.txt"));
// 5. 指定Mapper阶段所对应的类
job.setMapperClass(WordCountMapper.class);
// Text, IntWritable
// 5.1 指定Mapper的输出key和value的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 6. 指定Reduce阶段所对应的类
job.setReducerClass(WordCountReduce.class);
// Text,IntWritable
// 6.1 指定Reduce的输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 7. 指定结果输出的地址 -->输出的地址一定不能存在
// 7.1 MR程序输出路径不能存在,通过HDFS API 进行判断删除
Path outputPath = new Path("D:\\WorkSpace\\MapReduce\\output");
// 获取HDFS 文件系统对象
FileSystem fs = FileSystem.get(conf);
// 判断指定的输出地址是否存在,如果存在,则删除
if(fs.exists(outputPath)){
fs.delete(outputPath,true);
}
// 指定处理后的数据输出地址
FileOutputFormat.setOutputPath(job,outputPath);
// 8. 执行任务,输出成功或失败
boolean flg = job.waitForCompletion(true);
System.out.println(flg?"执行成功":"执行失败");
// System.exit(flg?0:1);
}
MR程序打成JAR包,Hadoop平台运行
添加打JAR 包插件,并指定入口类
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<!--改成自己的MR程序main方法所在的类全路径-->
<mainClass>org.example.mapreduce.WordCountJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
修改MapReduce 程序,动态指定输入输出路径
public static void main(String[] args) throws Exception{
if(args.length<2){
System.err.println("Usage: yarn jar <jar_name> <in_path> <out_path>");
System.exit(2);
}
FileInputFormat.setInputPaths(job,args[0]);
FileOutputFormat.setOutputPath(job,args[1]);
}
通过Maven 插件进行打JAR 包
执行 mvn clean package 对工程进行构建
注意:词频统计的输入地址和输出地址都是HDFS文件系统地址。
[cdhong@centos8 hadoop]$ yarn jar mapreduce-demo-1.0-SNAPSHOT.jar /input/words.txt /output
使用IDEA直接交互Hadoop环境
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://node:9000"); // 执行操作的Hadoop环境,默认是本地
System.setProperty("HADOOP_USER_NAME","root"); // 指定操作的用户
数据去重
在MapReduce 流程中,Map 的输出<key, value> 经过 Shuffle 过程聚集成 <key, value-list> 后会交给Reduce 。当Reduce 接收到一个<key, value_list> 时就直接将key 复制到输出key 中,并将value 设置为空值。Reduce 中的key 表示要统计的数据,value 则没有太大意义。
片名:我不是药神,主演:徐峥,上映时间:2018-07-05,9.6
片名:千与千寻,主演:周冬雨, 上映时间:2019-06-21,评分:9.3
片名:阿甘正传,主演:汤姆·汉克斯, 上映时间:1994-07-06,评分:9.4
片名:阿甘正传,主演:汤姆·汉克斯, 上映时间:1994-07-06,评分:9.4
片名:触不可及,主演:弗朗索瓦·克鲁塞, 上映时间:2011-11-02,评分:9.1
片名:楚门的世界,主演:金·凯瑞, 上映时间:1998,评分:8.9
片名:寻梦环游记,主演:安东尼·冈萨雷斯,上映时间:2017-11-24,评分:9.6
片名:我不是药神,主演:徐峥,上映时间:2018-07-05,9.6
片名:楚门的世界,主演:金·凯瑞, 上映时间:1998,评分:8.9
public class RepeatHandler {
public static class RepeatHandlerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, NullWritable.get());
}
}
public static class RepeatHandlerReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(RepeatHandler.class);
// 设置Mapper
job.setMapperClass(RepeatHandlerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 设置Reduce
job.setReducerClass(RepeatHandlerReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置输入和输出目录
FileInputFormat.setInputPaths(job, new Path("E:\\WorkSpace\\mapreduce\\input\\*"));
Path path = new Path("E:\\WorkSpace\\mapreduce\\output");
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
// 任务执行
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
统计各部门员工薪水总和
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
public class SalaryTotalHandler {
public static class SalaryTotalHandlerMapper extends Mapper<LongWritable, Text, IntWritable, DoubleWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
int deptNo = Integer.parseInt(line[7]); // 部门编号
double salary = Double.parseDouble(line[5]); // 薪资
context.write(new IntWritable(deptNo),new DoubleWritable(salary));
}
}
public static class SalaryTotalHandlerReduce extends Reducer<IntWritable, DoubleWritable,IntWritable, DoubleWritable>{
@Override
protected void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
// 对总工资求和
double total = 0;
for (DoubleWritable value : values) {
total += value.get();
}
context.write(key,new DoubleWritable(total));
}
}
}
统计各部门员工总数,平均薪资,总薪资???
多表查询
采用 MapReduce 实现类似下面 SQL 语句的功能: select d.*,e.* from emp e join dept d on e.deptno=d.deptno;
Map 端读取所有的文件,并为输出的内容加上标识,代表文件数据来源于员工表还是部门表,获取连接字段作为key ,进行分组。- 在
Reduce 端,获取每个分组中带有标识的数据与无标识的数据进行拼接即可。
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
public class EqualJoinHandler {
public static class EqualJoinHandlerMapper extends Mapper<LongWritable, Text, Text, Text> {
// 接收所有文件,对两张表打标识,根据连接列分组
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(","); //获取两个文件中的每一行数据,通过逗号分割获取数组
String deptNo = line.length == 3 ? line[0] : line[7]; // 根据数组的长度判断分别获取对应的分组字段 deptNo
context.write(new Text(deptNo), value); // 根据deptNo字段进行分组传递给Reduce
}
}
public static class EqualJoinHandlerReduce extends Reducer<Text, Text, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 数据暂存,Iterable有指针,且不方便后续处理
ArrayList<String> list = new ArrayList<>();
values.forEach(item -> list.add(item.toString()));
// 查找部门数据,用于拼接在Emp表的后面
String deptInfo = list.stream().filter(item -> item.split(",").length == 3).findFirst().orElse("");
// 查找所有员工数据,把dept表的数据拼接到每个员工表数据后面
list.stream()
.filter(item -> item.split(",").length > 3) // 过滤部门表数据
.map(item -> item.concat(deptInfo.substring(3))) // 拼接部门表数据,并去除部门编号前缀
.forEach(item -> context.write(new Text(item), NullWritable.get())); // 循环写入文件中
}
}
}
JSON数据格式化
通过 JSON 工具解析JSON字符串数据,获取所有数据维度,并按相应格式保存为数据文件
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
JSON数据样例
{
"success": true,
"msg": null,
"code": 0,
"content": {
"showId": "43e327f364c144be893e5adc4625c364",
"hrInfoMap": {
"7134703": {
"userId": 5930479,
"portrait": null,
"realName": "陈小姐",
"positionName": "招聘主管"
},
"8042425": {
"userId": 10905492,
"portrait": "i/image/M00/45/DA/Ciqc1F9Dj52Afz0hAACeGEp-ay0996.png",
"realName": "林小姐",
"positionName": "人事专员"
}
},
"pageNo": 1,
"positionResult": {
"resultSize": 15,
"result": [{
"positionId": 8094442,
"companyFullName": "上海致宇信息技术有限公司",
"companyShortName": "致宇信息",
"companySize": "150-500人",
"industryField": "金融,软件开发",
"financeStage": "不需要融资",
"companyLabelList": ["股票期权", "绩效奖金", "专项奖金", "年底双薪"],
"firstType": "开发|测试|运维类",
"secondType": "数据开发",
"thirdType": "BI工程师",
"skillLables": ["数据仓库", "Hadoop", "Spark", "Hive"],
"positionLables": ["数据仓库", "Hadoop", "Spark", "Hive"],
"industryLables": [],
"createTime": "2020-12-25 14:16:31",
"formatCreateTime": "2天前发布",
"city": "厦门",
"district": "思明区",
"businessZones": null,
"salary": "18k-22k",
"salaryMonth": "13",
"workYear": "1-3年",
"jobNature": "全职",
"education": "本科",
"positionAdvantage": "五险一金 年终奖 两次调薪 晋升空间",
"imState": "threeDays",
"lastLogin": "2020-12-25 18:17:20",
"publisherId": 10876210
}, {
"positionId": 8136910,
"positionName": "(大数据专场)Java开发工程师/专家-【数据架构】",
"companyId": 1880,
"companyFullName": "北京达佳互联信息技术有限公司",
"companyShortName": "快手",
"companyLogo": "i/image/M00/49/E7/Ciqc1F9QZJSAC0VBAACwLdjC9yo459.png",
"companySize": "2000人以上",
"industryField": "文娱丨内容",
"financeStage": "D轮及以上",
"companyLabelList": ["股票期权", "弹性工作", "定期体检", "岗位晋升"],
"firstType": "开发|测试|运维类",
"secondType": "后端开发",
"thirdType": "Java",
"skillLables": [],
"positionLables": [],
"industryLables": [],
"createTime": "2020-12-25 19:06:35",
"formatCreateTime": "2天前发布",
"city": "北京",
"district": "海淀区",
"businessZones": ["上地"],
"salary": "20k-40k",
"salaryMonth": "0",
"workYear": "3-5年",
"jobNature": "全职",
"education": "本科",
"positionAdvantage": "福利多,成长快",
"imState": "threeDays",
"lastLogin": "2020-12-25 18:48:45",
"publisherId": 11043272
}],
"locationInfo": {
"city": null,
"district": null,
"businessZone": null,
"isAllhotBusinessZone": false,
"locationCode": null,
"queryByGisCode": false
}
},
"pageSize": 15
},
"resubmitToken": null,
"requestId": null
}
MapReduce程序代码
public class LaGouJob {
public static class LaGouMapper extends Mapper<LongWritable, Text,Text, NullWritable>{
@Override
protected void map(LongWritable key, Text line, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
// 通过阿里巴巴的 fastJSON进行JSON数据解析
JSONArray result = JSON.parseObject(line.toString())
.getJSONObject("content")
.getJSONObject("positionResult")
.getJSONArray("result");
for (Object o : result) {
JSONObject obj = (JSONObject)o; // 类型转换
// 获取所需维度
String city = obj.getString("city");
String salary = obj.getString("salary");
String workYear = obj.getString("workYear");
String education = obj.getString("education");
// 拼接数据
// 完整格式:重庆,11-22K,本科,1-3年,股票期权|绩效奖金|专项奖金|年底双薪,数据仓库|Hadoop|Spark|Hive
String info = city+","+salary+","+workYear+","+education;
// 写到文件中
context.write(new Text(info),NullWritable.get());
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LaGouJob.class);
job.setMapperClass(LaGouMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//文件地址
FileInputFormat.setInputPaths(job,new Path("D:\\WorkSpace\\MapReduce\\input\\lagou\\*"));
FileOutputFormat.setOutputPath(job, HDFSUtil.delPath(conf,"D:\\WorkSpace\\MapReduce\\output"));
System.exit(job.waitForCompletion(true)?0:1);
}
}
序列化求每个部门的平均工资和奖金和
序列化时一种将内存中的Java 对象转化为其他可存储文件或可跨计算机传输数据流的一种技术。
由于在运行程序的过程中,保存在内存中的Java 对象会因为断电而丢失,或在分布式系统中,Java 对象需要从一台计算机传递给其他计算机进行计算,所有Java 对象需要通过某种技术转为为文件或实际可传输的数据流。这就是Java 的序列化。
常见的 Java 序列化方式是实现 java.io.Serializable 接口。而Hadoop 的序列化则是实现 org.apache.hadoop.io.Writable 接口,该接口包含 readFields() 、write() 两个方法。注意:序列化和反序列化方法的字段顺序需要保持一致。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeWritable implements Writable {
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private double sal;
private double comm;
private int deptno;
@Override // 序列化
public void write(DataOutput out) throws IOException {
out.writeInt(this.empno);
out.writeUTF(this.ename);
out.writeUTF(this.job);
out.writeInt(this.mgr);
out.writeUTF(this.hiredate);
out.writeDouble(this.sal);
out.writeDouble(this.comm);
out.writeInt(this.deptno);
}
@Override
public void readFields(DataInput in) throws IOException {
this.empno = in.readInt();
this.ename = in.readUTF();
this.job = in.readUTF();
this.mgr = in.readInt();
this.hiredate = in.readUTF();
this.sal = in.readDouble();
this.comm = in.readDouble();
this.deptno = in.readInt();
}
}
public class SalaryAvgHandler {
public static class SalaryAvgHandlerMapper extends Mapper<LongWritable, Text, IntWritable, EmployeeWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
EmployeeWritable emp = EmployeeWritable.builder()
.deptno(Integer.parseInt(line[0]))
.ename(line[1]).job(line[2])
.mgr(Integer.parseInt(Objects.equals(line[3], "") ? "-1" : line[3]))
.hiredate(line[4]).sal(Double.parseDouble(line[5]))
.comm(Double.parseDouble(Objects.equals(line[6], "") ? "0" : line[6]))
.deptno(Integer.parseInt(line[7])).build();
int deptno = Integer.parseInt(line[7]);
context.write(new IntWritable(deptno), emp);
}
}
public static class SalaryAvgHandlerReduce extends Reducer<IntWritable, EmployeeWritable, IntWritable, Text> {
@Override
protected void reduce(IntWritable key, Iterable<EmployeeWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
double sumSal = 0;
double sumComm = 0;
for (EmployeeWritable emp : values) {
count++;
sumSal += emp.getSal();
sumComm += emp.getComm();
}
String info = sumSal / count + "\t" + sumComm;
context.write(key, new Text(info));
}
}
}
WordCount TopN
在Hadoop 中,排序是MapReduce 的灵魂,MapTask 和ReduceTask 均会对数据按Key 排序,这个操作是MR 框架的默认行为,不过有的时候我们需要自己定义排序规则,具体实现有如下两种方式。
- 借助
TreeMap 集合工具和 Reduce 的生命周期方法 cleanup 实现 - 使用
MapReduce 的高级 API 使用多个 MapReduce 任务来完成
public static class WordCountHandlerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(" ");
for (String word : line) {
context.write(new Text(word), new IntWritable(1));
}
}
}
public static class WordCountHandlerReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
TreeMap<Integer, String> words = new TreeMap<>((o1, o2) -> -o1.compareTo(o2));
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
words.put(sum,key.toString());
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
words.entrySet().stream()
.limit(3)
.forEach(item-> {
try {
context.write(new Text(item.getValue()),new IntWritable(item.getKey()));
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
|