1.构建ReadFruitMapper类,用于读取HDFS上的fruit数据文件(fruit.tsv)
package test;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// Identity (pass-through) mapper: forwards each input record — the byte
// offset of a line and the line's text — unchanged to the reduce phase.
// Parsing of the tab-separated fields is left entirely to the reducer.
// NOTE(review): despite earlier notes, this reads a plain HDFS text file
// (see FileInputFormat in the driver), not an HBase table.
public class ReadFruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the record exactly as received.
        context.write(key, value);
    }
}
2.构建WriteFruitMRReducer类,用于将读取到的fruit数据写入到 fruit1 表中。
package test;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// Reducer that writes the mapped records into an HBase table.
// Each value is one tab-separated line of the form: rowkey \t name \t color.
// The target table is the one passed to TableMapReduceUtil.initTableReducerJob
// by the driver.
public class WriteFruitMRReducer extends TableReducer<LongWritable, Text, NullWritable> {

    // Column family and qualifiers never change; convert them to bytes once
    // instead of on every row.
    private static final byte[] CF_INFO = Bytes.toBytes("info");
    private static final byte[] COL_NAME = Bytes.toBytes("name");
    private static final byte[] COL_COLOR = Bytes.toBytes("color");

    /**
     * Converts each tab-separated line into a {@link Put} and emits it.
     *
     * @param key     byte offset of the line (unused for the row key)
     * @param values  lines sharing that offset (normally exactly one)
     * @param context sink that routes each Put to the configured HBase table
     */
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values,
                          Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Expected layout: rowkey \t name \t color
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                // Skip malformed lines rather than failing the whole job
                // with an ArrayIndexOutOfBoundsException.
                continue;
            }
            Put put = new Put(Bytes.toBytes(fields[0]));
            put.addColumn(CF_INFO, COL_NAME, Bytes.toBytes(fields[1]));
            put.addColumn(CF_INFO, COL_COLOR, Bytes.toBytes(fields[2]));
            context.write(NullWritable.get(), put);
        }
    }
}
3.构建FruitMRDriver类,用于提交运行任务
package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Job driver: reads the fruit tsv file from HDFS ({@code args[0]}) and loads
 * it into the HBase table named by {@code args[1]}.
 *
 * Usage: {@code yarn jar <jar> test.FruitMRDriver <input path> <output table>}
 */
public class FruitMRDriver implements Tool {

    // Injected by ToolRunner via setConf() before run() is called.
    private Configuration configuration = null;

    @Override
    public int run(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when arguments are missing.
        if (args.length < 2) {
            System.err.println("Usage: FruitMRDriver <input path> <output table>");
            return 1;
        }
        // 1. Create the Job from the injected configuration.
        Job job = Job.getInstance(configuration);
        // 2. Set the jar by the driver class so YARN can locate it.
        job.setJarByClass(FruitMRDriver.class);
        // 3. Mapper and its output KV types.
        job.setMapperClass(ReadFruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        // 4. Reducer writing into the HBase table; this helper also sets the
        //    output format and the final output KV types for the job.
        TableMapReduceUtil.initTableReducerJob(args[1], WriteFruitMRReducer.class, job);
        // 5. HDFS input path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 6. Submit and block until completion.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.configuration = conf;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            int exitCode = ToolRunner.run(new Configuration(), new FruitMRDriver(), args);
            System.exit(exitCode);
        } catch (Exception e) {
            e.printStackTrace();
            // Previously the JVM exited 0 here, hiding the failure from the
            // calling shell; report a nonzero status instead.
            System.exit(1);
        }
    }
}
4.将FruitMRDriver类打成jar包,上传至Hadoop集群。
5.新建一个表(fruit1)
6.利用jar包运行任务
yarn jar jar/hbase01-1.0-SNAPSHOT.jar test.FruitMRDriver /hbase/hbasetest/bigdatafile/input/fruit.tsv fruit1
7.运行成功后,查看fruit1表中的数据是否传入成功!!!