67_尚硅谷_Hadoop_MapReduce_课程介绍
68_尚硅谷_Hadoop_MapReduce_概述&优点缺点
69_尚硅谷_Hadoop_MapReduce_核心思想
If your servers have enough CPU cores to process all of those MapTasks at the same time, that is parallelism; otherwise it is concurrency (and concurrency includes parallelism). MapReduce is not good at chained, multi-stage computation because it is inefficient there: every intermediate result has to be persisted to disk, and disk reads and writes are slow. Spark, by contrast, can keep intermediate results in memory, which makes it much more efficient.
70_尚硅谷_Hadoop_MapReduce_官方WC源码&序列化类型
71_尚硅谷_Hadoop_MapReduce_编程规范
The Mapper's output types must match the Reducer's input types.
72_尚硅谷_Hadoop_MapReduce_WordCount案例需求分析
Classify and aggregate: once the data is sent to the Reducer, it processes all values that share the same key; whether you add, subtract, multiply, or divide them depends on your business logic. For word count we simply add up the values of the same key. The data types flowing from Map to Reduce (upstream to downstream) must, of course, be consistent. One caveat about the Job's output path: it must not already exist; if it does, the job fails with a FileAlreadyExistsException.
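For repeated local test runs it can help to delete the output directory in the Driver before the job is submitted. This is only a minimal sketch (not part of the course code), placed after Job.getInstance(conf) in the Driver and using the same hypothetical local path as the Driver below:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Delete the output directory if it already exists, so a re-run
// does not fail with FileAlreadyExistsException.
Path output = new Path("E:\\Hadoop-Input\\output1");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output)) {
    fs.delete(output, true);   // true = delete recursively
}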
73_尚硅谷_Hadoop_MapReduce_WordCount案例环境准备
(1) Add the following dependencies to pom.xml:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
(2) In the project's src/main/resources directory, create a new file named "log4j.properties" and put the following into it:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
74_尚硅谷_Hadoop_MapReduce_WordCount案例Mapper
When writing the code, always keep in mind what the input and output look like. map() is called once per input line.
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reuse the output key/value objects instead of creating new ones per record
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Called once per line: key is the byte offset, value is the line content
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);  // emit (word, 1)
        }
    }
}
75_尚硅谷_Hadoop_MapReduce_WordCount案例Reducer
The Mapper's output types must match the Reducer's input types.
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Called once per key: sum up all the 1s emitted for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
76_尚硅谷_Hadoop_MapReduce_WordCount案例Driver
package com.atguigu.mapreduce.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar, Mapper and Reducer classes
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 3. Set the Mapper output and final output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 4. Set the input and output paths (the output path must not exist yet)
        FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop-Input\\inputword"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\Hadoop-Input\\output1"));
        // 5. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
77_尚硅谷_Hadoop_MapReduce_WordCount案例Debug调试(要会Debug)
78_尚硅谷_Hadoop_MapReduce_WordCount案例集群运行
(1) To build the jar with Maven, add the following packaging plugins:
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Only after making this change and repackaging do we get the jar we want. When running the jar, be sure to specify the full class name of WordCountDriver. In enterprise development the usual workflow is to set up a Hadoop environment on Windows, write and package the code there, upload the jar to the cluster, and then run it with a command. (In practice, most companies use Hive nowadays; few people hand-write MapReduce.)
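For reference, submitting on the cluster looks roughly like the command below. The jar name and the HDFS input/output paths are placeholders, and the Driver must take its paths from args (new Path(args[0]), new Path(args[1])) instead of the hard-coded Windows paths for this to work:

hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/input /user/atguigu/output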
79_尚硅谷_Hadoop_MapReduce_序列化概述
80_尚硅谷_Hadoop_MapReduce_自定义序列化步骤
Why use Hadoop's own serialization (Writable) instead of Java serialization? Because Hadoop serialization is more lightweight (compact) and more efficient to use.
81_尚硅谷_Hadoop_MapReduce_序列化案例需求分析
82_尚硅谷_Hadoop_MapReduce_序列化案例FlowBean
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// Custom serializable bean: 1) implement Writable, 2) keep a no-arg constructor,
// 3) write() and readFields() must handle the fields in the same order.
public class FlowBean implements Writable {
    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic
    private long sumFlow;   // total traffic
    // No-arg constructor, required for reflection during deserialization
    public FlowBean() {
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Must read in exactly the same order as write()
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
83_尚硅谷_Hadoop_MapReduce_序列化案例FlowMapper
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on tabs; field 1 is the phone number,
        // the 3rd- and 2nd-from-last fields are the up/down flow
        String line = value.toString();
        String[] split = line.split("\t");
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();
        context.write(outK, outV);
    }
}
84_尚硅谷_Hadoop_MapReduce_序列化案例FlowReducer
In the Reducer, records with the same (duplicate) phone number are aggregated again and their traffic is summed.
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Sum the up/down flow of all records for the same phone number
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        context.write(key, outV);
    }
}
85_尚硅谷_Hadoop_MapReduce_序列化案例FlowDriver
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path("E:\\Hadoop-Input\\inputflow"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\Hadoop-output\\output2"));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
86_尚硅谷_Hadoop_MapReduce_序列化案例debug调试
Omitted.
87_尚硅谷_Hadoop_MapReduce_切片机制与MapTask并行度决定机制
88_尚硅谷_Hadoop_MapReduce_Job提交流程
89_尚硅谷_Hadoop_MapReduce_切片源码
Omitted.
90_尚硅谷_Hadoop_MapReduce_切片源码总结
Locally (LocalJobRunner) the block size is 32 MB; on a cluster it is 128 MB. When splitting, if the remaining part is no more than 1.1 times the split size, it is not split again; if it is more than 1.1 times, it keeps being cut at the normal split size. Each file is split independently.
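A simplified sketch of that 1.1x rule, following the loop in FileInputFormat.getSplits(); the class and method names here are illustrative, not the actual Hadoop source:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors how FileInputFormat cuts one file into splits
public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;  // 10% slack

    // Returns the length of each split for a file of the given length
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long bytesRemaining = fileLength;
        // Keep cutting full splits while the remainder is > 1.1 x splitSize
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        // The remainder (<= 1.1 x splitSize) becomes one last split
        if (bytesRemaining != 0) {
            lengths.add(bytesRemaining);
        }
        return lengths;
    }

    public static void main(String[] args) {
        // 129 MB file, 128 MB split size -> one split of 129 MB (not cut again)
        System.out.println(splitLengths(129L << 20, 128L << 20));
        // 150 MB file -> two splits: 128 MB and 22 MB
        System.out.println(splitLengths(150L << 20, 128L << 20));
    }
}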
91_尚硅谷_Hadoop_MapReduce_FileInputFormat切片机制
92_尚硅谷_Hadoop_MapReduce_TextInputFormat
93_尚硅谷_Hadoop_MapReduce_CombineTextInputFormat
94_尚硅谷_Hadoop_MapReduce_MapReduce工作流程
(Continued from the diagram above, which is not reproduced in these notes.)
95_尚硅谷_Hadoop_MapReduce_Shuffle机制
96_尚硅谷_Hadoop_MapReduce_默认HashPartitioner分区
97_尚硅谷_Hadoop_MapReduce_自定义分区案例
98_尚硅谷_Hadoop_MapReduce_分区数与Reduce个数的总结
99_尚硅谷_Hadoop_MapReduce_排序概述
During the MapTask phase there are two sorts: before data is spilled from the circular (ring) buffer, a quick sort is performed (sorting the key index, with keys in dictionary order); then the spill files are merged with a merge sort. In the ReduceTask phase there is another merge sort.
Why must the keys in MapReduce be sorted? To improve efficiency. Data entering the circular buffer is not sorted record by record as it arrives; only when the buffer reaches a threshold (80%) is it sorted once, in memory, right before being spilled to disk.
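For the full-sort case that follows, the usual approach is to make the bean the map output key and have it implement WritableComparable so the framework sorts by it. A minimal sketch, assuming we want descending order by total flow (an assumption, since the note does not spell out the order); the getters/setters are the same as in FlowBean above:

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch: FlowBean used as the map output *key*, so the shuffle sorts it
public class FlowBeanSortable implements WritableComparable<FlowBeanSortable> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    @Override
    public int compareTo(FlowBeanSortable o) {
        // Sort by total flow, descending (assumed requirement)
        return Long.compare(o.sumFlow, this.sumFlow);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
    // Getters/setters as in FlowBean above.
}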
100_尚硅谷_Hadoop_MapReduce_全排序案例
101_尚硅谷_Hadoop_MapReduce_二次排序案例
102_尚硅谷_Hadoop_MapReduce_区内排序案例
Omitted.
103_尚硅谷_Hadoop_MapReduce_Combiner概述
For operations like computing an average, a Combiner would change the final result, so it must not be used there.
104_尚硅谷_Hadoop_MapReduce_Combiner案例
The Combiner only pre-aggregates within its own MapTask; the Reducer still has to do the overall aggregation.
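For word count, the Reducer's summing logic is exactly what the Combiner needs, so it can be reused directly. A one-line sketch to add to the WordCountDriver above:

// Pre-aggregate (word, 1) pairs inside each MapTask before the shuffle
job.setCombinerClass(WordCountReducer.class);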
105_尚硅谷_Hadoop_MapReduce_outputformat概述
106_尚硅谷_Hadoop_MapReduce_自定义outputformat案例需求分析
107_尚硅谷_Hadoop_MapReduce_自定义outputformat案例mapper&reducer
108_尚硅谷_Hadoop_MapReduce_自定义outputformat案例执行
Omitted.
109_尚硅谷_Hadoop_MapReduce_MapTask工作机制
In the Read phase, the key produced by the RecordReader is the byte offset of the line, and the value is the content of that line.
110_尚硅谷_Hadoop_MapReduce_ReduceTask工作机制&并行度
The first thing a ReduceTask does is pull the data of its assigned partition from the MapTasks; this is the Copy phase. All values with the same key then go into one reduce() call; this is the Reduce phase. Finally the OutputFormat writes the results out.
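Unlike MapTask parallelism (which follows the number of splits), ReduceTask parallelism is set by hand in the Driver; a one-line sketch, where 2 is just an example value:

// Number of ReduceTasks = number of partitions / output files
job.setNumReduceTasks(2);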
111_尚硅谷_Hadoop_MapReduce_MapTask源码
Omitted.
112_尚硅谷_Hadoop_MapReduce_ReduceTask源码
Omitted.
113_尚硅谷_Hadoop_MapReduce_ReduceJoin案例需求分析
114_尚硅谷_Hadoop_MapReduce_ReduceJoin案例TableBean
115_尚硅谷_Hadoop_MapReduce_ReduceJoin案例Mapper
116_尚硅谷_Hadoop_MapReduce_ReduceJoin案例完成
Omitted.
117_尚硅谷_Hadoop_MapReduce_ReduceJoin案例debug
Omitted.
118_尚硅谷_Hadoop_MapReduce_MapJoin案例需求分析
Load the other (smaller) table into memory; then, using the pid, look up the remaining fields and join the two tables on the map side.
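A sketch of the map-side join pattern, assuming the small table is distributed via job.addCacheFile(...) in the Driver with lines like "pid\tpname", and the big-table lines look like "id\tpid\tamount" (both layouts are assumptions for illustration, not the course's exact files). With no Reducer needed, the Driver would also set job.setNumReduceTasks(0):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String> pdMap = new HashMap<>();  // pid -> pname
    private final Text outK = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // Read the small table from the distributed cache into memory once per MapTask
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream in = fs.open(new Path(cacheFiles[0]));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split("\t");   // assumed "pid\tpname"
            pdMap.put(fields[0], fields[1]);
        }
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Big-table line assumed to be "id\tpid\tamount"; replace pid with pname
        String[] fields = value.toString().split("\t");
        String pname = pdMap.get(fields[1]);
        outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(outK, NullWritable.get());
    }
}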
119_尚硅谷_Hadoop_MapReduce_MapJoin案例完成
Omitted.
120_尚硅谷_Hadoop_MapReduce_MapJoin案例debug
121_尚硅谷_Hadoop_MapReduce_ETL数据清洗案例
Omitted.
122_尚硅谷_Hadoop_MapReduce_MapReduce开发总结
123_尚硅谷_Hadoop_MapReduce_压缩概述
124_尚硅谷_Hadoop_MapReduce_压缩案例实操