使用Java编写mapreduce程序,核心思想是 分治
简单来说,mapreduce编程需要经过以下8个步骤
map阶段
第一步:
读取文件,解析成key value 对 k1 v1
第二步:
指定map逻辑,接收 k1 v1 转换成新的 k2 v2
shuffle阶段
第三步:分区
相同key的value发送到同一个reduce当中去,key进行合并,value形成一个集合
第四步:排序
第五步:规约
第六步:分组
reduce阶段
第七步:
自定义reduce逻辑,接收k2 v2 输出k3 v3
第八步:
将k3 v3 写出去
主代码
mvn配置
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
主程序示例
package cn.laojiajiun.unpivot;
/*
* 需求:
* mapredce单词统计
*
*
* */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class UnpivotRun extends Configured implements Tool {
/*
* 程序入口类
* @param args
* */
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//这里执行完之后,得到一个int类型的返回值,表示我们程序的推出状态码
//如果退出状态码为0,表示程序执行成功
//通过这里这里设置configuration,就相当于我们把父类的configuration设置值了
int run = ToolRunner.run(configuration,new UnpivotRun(),args);
System.exit(run);
}
/*
* run方法组装成程序,组装八个类
* @param args
* */
@Override
public int run(String[] args) throws Exception {
//第一步,读取文件,解析成key,value对
//从父类里面获取configuration配置文件
//getInstance需要两个参数,第一个参数是我们的configuration配置文件,第二个参数是jobname,随便写
Job job = Job.getInstance(super.getConf(),"UnpivotRunDemoJob");
TextInputFormat.addInputPath(job,new Path("hdfs://192.168.88.4:8020/test_dir"));
job.setInputFormatClass(TextInputFormat.class);
//第二步,自定义map逻辑
job.setMapperClass(UnpivotMapper.class);
//设置我们key2, value2的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputKeyClass(IntWritable.class);
/**
* 第三到六步,全部省略
* 分区 相同key的value,发送到同一个reduce,key 合并,value 形成一个集合
* 排序
* 规约
* 分组
*/
//第七步:自定义reduce逻辑
job.setReducerClass(UnpivotReduce.class);
//设置我们key3 value3 的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//第八步:输出文件
//注意输出路径一定要不存在,存在就报错
TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.88.4:8020/src_output"));
job.setOutputFormatClass(TextOutputFormat.class);
//提交我们的任务到集群上面去
boolean b = job.waitForCompletion(true);
//确认我们程序退出的状态码
return b?0:1;
}
}
map自定义逻辑程序
package cn.laojiajiun.unpivot;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class UnpivotMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/***
* 重写map方法,实现我们自己的逻辑,接收我们key1,value1 转换成成新的 key2 value2 输出
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//第一步: 切开我们一行的数据
String line = value.toString();
String[] split = line.split(" ");
//往下发送新的key value
for (String word : split) {
Text k2 = new Text(word);
IntWritable v2 = new IntWritable(1);
//通过write方法,将我们的数据往下发送
context.write(k2,v2);
}
}
}
reduce自定义逻辑
package cn.laojiajiun.unpivot;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
public class UnpivotReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
/***
*
* @param key 注意这个key 是k2
* @param values 注意这个values 是一个集合,集合的类型是v2
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int j=0;
for(IntWritable value:values) {
int num = value.get();
j += num;
}
// 输出key3 value3 类型
context.write(key,new IntWritable(j));
}
}
|