MapReduce on HBase
Configure the Hadoop runtime dependencies (put the HBase client jars on Hadoop's classpath)
vim /etc/profile
export HADOOP_CLASSPATH=/usr/local/soft/hbase-1.4.6/lib/*
source /etc/profile
echo $HADOOP_CLASSPATH
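With the classpath in place, Hadoop can load the HBase client classes at runtime. The job below scans the stu table, reads each row's info:clazz column in the mapper, and counts the number of students per class in the reducer; the totals are written back to HDFS as plain text.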
package com.liangzai.myhbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo05MRReadHBase {

    // Mapper that reads directly from HBase: the input key is the rowkey,
    // the input value is the full Result for that row
    public static class MRReadHBase extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // Rowkey of the current row (unused here, but often needed)
            String rowkey = Bytes.toString(key.get());
            // Read the info:clazz column and emit (clazz, 1)
            String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
            if (clazz == null) {
                // Skip rows that have no info:clazz cell, otherwise new Text(null) would throw
                return;
            }
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    // Reducer sums the counts per class and writes plain text to HDFS
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int cnt = 0;
            for (IntWritable value : values) {
                cnt += value.get();
            }
            context.write(key, new IntWritable(cnt));
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum the HBase client uses to locate the cluster
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        conf.set("fs.defaultFS", "hdfs://master:9000");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo05MRReadHBase");
        job.setJarByClass(Demo05MRReadHBase.class);

        // Wire the HBase table "stu" in as the job's input: this sets the
        // TableInputFormat, the mapper class, and the mapper output key/value types
        TableMapReduceUtil.initTableMapperJob(
                "stu",
                new Scan(),
                MRReadHBase.class,
                Text.class,
                IntWritable.class,
                job
        );

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Delete the output directory if it already exists, otherwise the job fails
        Path path = new Path("/MR/HBase/output/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
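By default the job above scans every column of every row, even though the mapper only uses info:clazz. A common optimization is to narrow and tune the Scan before passing it to initTableMapperJob. The sketch below is illustrative rather than part of the original job; the caching value is an assumption you should size for your rows:

// Hedged sketch: narrow the scan to the single column the mapper needs
Scan scan = new Scan();
scan.addColumn("info".getBytes(), "clazz".getBytes()); // only fetch info:clazz
scan.setCaching(500);        // rows per RPC; 500 is an illustrative value
scan.setCacheBlocks(false);  // avoid polluting the region server block cache
TableMapReduceUtil.initTableMapperJob(
        "stu",
        scan,                // replaces new Scan() in the job above
        MRReadHBase.class,
        Text.class,
        IntWritable.class,
        job
);

Fetching only the needed column reduces the data shipped from the region servers, and disabling block caching keeps a one-off batch scan from evicting hot data.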
Build the jar and upload it to a node in the cluster (hadoop jar reads the jar from the local filesystem, not HDFS)
Configure the Hadoop runtime dependencies (if not already done above)
export HADOOP_CLASSPATH="$HBASE_HOME/lib/*"
Submit the job
hadoop jar HBase-1.0.jar com.liangzai.myhbase.Demo05MRReadHBase
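If the job succeeds, the per-class counts land in the output directory on HDFS; by default the reducer writes files named part-r-00000 and so on, which you can inspect with:
hdfs dfs -cat /MR/HBase/output/part-r-*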