说明
hbase?数据说明
member表

result?表,用于存储 mapreduce?结果
?
mapreduce?程序功能
统计 member?表中 address.city?的值出现的次数
比如?上面截图中的数据,
beijing?出现了2次
ningde?出现了2次
Mapper
package com.test;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class AMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
byte[] family = Bytes.toBytes("address");
byte[] column = Bytes.toBytes("city");
String data = Bytes.toString(value.getValue(family, column));
context.write(new Text(data), new IntWritable(1));
}
}
和之前从文件中逐行输入字符串不同,这里的Mapper?类型是 TableMapper<Text, IntWritable>
Text , IntWritable?分别是 Mapper?的输出的?键和值的类型。
Mapper?的输入直接体现在 map()函数的参数上
map(ImmutableBytesWritable key, Result value, Context context)
hbase的每一行数据,执行一次map函数,第一个参数是hbase数据的行键,第二个参数是整行的数据。
Reducer
package com.test;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text
import java.io.IOException;
public class AReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//super.reduce(key, values, context);
int total = 0;
for (IntWritable v : values) {
total += v.get();
}
byte[] rowKey = key.getBytes();
byte[] family = Bytes.toBytes("content");
byte[] column = Bytes.toBytes("count");
Put put = new Put(rowKey);
put.addColumn(family, column, Bytes.toBytes(total + ""));
context.write(new ImmutableBytesWritable(rowKey), put);
}
}
TableReducer<Text, IntWritable, ImmutableBytesWritable>
三个类,前两个是Reducer的输入
context.write?的第一个参数是?ImmutableBytesWritable(rowKey)?和第二个参数 put?的 rowkey?保持一致
(TODO:?这里我不太理解的是为何 context.write?不是直接?传入 put?就可以了,这里 put?里有了 hbase?写入行数据所需要的信息里,??ImmutableBytesWritable(rowKey)? 的作用是什么??
Main
package com.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class Main {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", "127.0.0.1");
Job job = Job.getInstance(conf);
job.setJarByClass(Main.class);
byte[] family = Bytes.toBytes("address");
byte[] column = Bytes.toBytes("city");
Scan scan = new Scan();
scan.addColumn(family, column);
TableMapReduceUtil.initTableMapperJob("member", scan, AMapper.class, Text.class, IntWritable.class, job);
TableMapReduceUtil.initTableReducerJob("result", AReducer.class, job);
job.waitForCompletion(true);
}
}
? ? ? ? TableMapReduceUtil.initTableMapperJob(
"member", //?数据源对应的表
scan, //?查询
AMapper.class, // MAPPER对应的类
Text.class,? // MAPPER的输出键的类型
IntWritable.class,? // MAPPER的输出值的类型
job);
? ? ? ? TableMapReduceUtil.initTableReducerJob(
"result",//?存储结果数据的表
AReducer.class,? // REDUCER对应的
job);
运行结果


参考资料:
http://people.apache.org/~jdcryans/hbase-0.20.5-candidate-3/hbase-0.20.5/docs/api/org/apache/hadoop/hbase/mapreduce/TableReducer.html
org.apache.hadoop.hbase.mapreduce Class TableReducer<KEYIN,VALUEIN,KEYOUT>
java.lang.Object
org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,org.apache.hadoop.io.Writable>
org.apache.hadoop.hbase.mapreduce.TableReducer<KEYIN,VALUEIN,KEYOUT>
Type Parameters:
KEYIN ?- The type of the input key.
VALUEIN ?- The type of the input value.
KEYOUT ?- The type of the output ke
|