- Complete the configuration for HBase-MapReduce interaction
- Use an HBase table as MapReduce input
- Use an HBase table as MapReduce output
HBase and MapReduce Interaction Configuration
The steps below follow the official documentation.
- Check the JAR dependencies that HBase MapReduce jobs need on the classpath:

```shell
[hadoop@hadoop101 hbase-1.3.1]$ bin/hbase mapredcp
```
- 设置环境变量
vi /etc/profile 添加HBASE_HOME ,HADOOP_HOME export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-3.1.3
修改hadoop-env.sh ,添加export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/*
- Test that it runs:

```shell
yarn jar $HBASE_HOME/lib/hbase-server-1.3.1.jar rowcounter tab01
```
- Repeat the configuration above on the other HBase nodes:

```shell
sudo xsync /etc/profile
xsync $HADOOP_HOME/etc/hadoop/hadoop-env.sh
```
Official Example
Import the /input_fruit/fruit.tsv file on HDFS into the HBase table fruit. First prepare the file contents: fruit.tsv (tab-separated)

```
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
```
Upload it to HDFS:

```shell
hadoop fs -put fruit.tsv /input_fruit
```
Create the fruit table in HBase:

```shell
hbase shell
hbase(main):001:0> create 'fruit','info'
```
Run the MR job:

```shell
yarn jar $HBASE_HOME/lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop101:8020/input_fruit
```
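To spot-check the import afterwards, a quick client-side scan works. A minimal sketch (the class name ScanFruit is ours, and it assumes hbase-site.xml is on the client classpath):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanFruit {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("fruit"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                // Print the row key plus the two imported columns.
                System.out.println(Bytes.toString(r.getRow()) + "\t"
                        + Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))) + "\t"
                        + Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("color"))));
            }
        }
    }
}
```

Running `scan 'fruit'` in the hbase shell gives the same check interactively.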
HDFS–>HBase的MR实现
还是上面的案例,这里我们用自己的代码实现
分析: Map:读取HDFS中的文件 Reducer:写入到HBase
测试数据 其中999数据为,切分不符合条件的数据,用来测试计数器
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
999 Pineapple Yellow
Mapper.java
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Passes each line of the HDFS file through unchanged; parsing is done in the reducer.
public class ReadHdfsFileMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
```
Reducer.java
```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

// Parses each tab-separated line and writes it into HBase as a Put.
public class WriteHBaseReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            String[] split = value.toString().split("\t");
            // Malformed lines (like the 999 record) are counted and skipped.
            if (split.length != 3) {
                context.getCounter("ImportTsv", "Bad Lines").increment(1L);
                continue;
            }
            // Column 0 is the row key; name and color go into the info family.
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
            context.write(NullWritable.get(), put);
        }
    }
}
```
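Once the job completes, the same counter can be read back programmatically. A minimal sketch using the standard Counters API, placed after job.waitForCompletion(true) in the driver shown next:

```java
// Sketch: read the custom bad-line counter back after the job completes.
long badLines = job.getCounters()
        .findCounter("ImportTsv", "Bad Lines")
        .getValue();
System.out.println("Bad Lines: " + badLines);
```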
Driver.java
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Hdfs2HBaseDriver implements Tool {

    private Configuration configuration;

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Hdfs2HBaseDriver.class);

        // Map side: plain HDFS text input, passed through unchanged.
        job.setMapperClass(ReadHdfsFileMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, args[0]);

        // Reduce side: TableMapReduceUtil wires up TableOutputFormat for fruit01.
        TableMapReduceUtil.initTableReducerJob("fruit01", WriteHBaseReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        System.exit(ToolRunner.run(conf, new Hdfs2HBaseDriver(), args));
    }
}
```
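One caveat: the target table fruit01 must already exist; TableOutputFormat writes into it but does not create it. A minimal sketch using the HBase 1.x Admin API (the class name CreateFruit01 is ours):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateFruit01 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("fruit01");
            if (!admin.tableExists(name)) {
                // Same schema as the data: a single 'info' column family.
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}
```

Running `create 'fruit01','info'` in the hbase shell achieves the same thing.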
Local Debugging
By default the application is packaged and submitted to the cluster. To debug locally instead, configure the following:
- Create the Configuration via HBaseConfiguration.create():

```java
public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ToolRunner.run(conf, new Driver(), args);
}
```
- Put hbase-site.xml under resources, with the same contents as the cluster configuration:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop101:8020/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.master.port</name>
<value>60000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
</configuration>
HBase–>HBase的MR实现
上一个案例演示了,读取HDFS文件写入到HBase,接下来我们需要升级需求 将fruit01表的数据,写入到fruit02表,即演示从Hbase读取数据
直撸代码
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver implements Tool {

    private Configuration configuration;

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Driver.class);

        // Input: full scan of fruit01; output: Puts into fruit02.
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("fruit01"), new Scan(),
                MyMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("fruit02", MyReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        System.exit(ToolRunner.run(conf, new Driver(), args));
    }

    // Rebuilds one Put per row, copying every cell (family, qualifier, timestamp, value).
    public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(key.get());
            for (Cell cell : value.rawCells()) {
                put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
                        cell.getTimestamp(), CellUtil.cloneValue(cell));
            }
            context.write(key, put);
        }
    }

    // Writes each Put out unchanged; the row key carries through as the reduce key.
    public static class MyReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
                throws IOException, InterruptedException {
            for (Put put : values) {
                context.write(NullWritable.get(), put);
            }
        }
    }
}
```
A pitfall here: you might think the Mapper could emit the Put objects directly, with NullWritable as the map output key, so the Reducer just writes them out as-is. Try it and you will find only a single row in the target table, carrying the last row key (1003), instead of the expected three. Debugging shows the Mapper behaving normally while the Reducer executes only once; that part is expected, because with NullWritable every record falls into one reduce group, so reduce() is invoked exactly once, and in that single call the writes collapse down to the final record. So the Mapper output key must be the row key, typed as ImmutableBytesWritable (or a Text type you define yourself).
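For reference, the anti-pattern looks roughly like this (a hypothetical BadMapper, shown only to illustrate the trap):

```java
// Hypothetical anti-pattern, for illustration only -- do not use.
// Every record shares the single NullWritable key, so the shuffle collapses
// all rows into one reduce group; in the test above only the last row (1003)
// actually reached the target table.
public static class BadMapper extends TableMapper<NullWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            put.add(cell); // HBase 1.x Put.add(Cell) copies each cell over
        }
        context.write(NullWritable.get(), put); // one key for every row: the trap
    }
}
```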