Migrating data from an HBase table into the HDFS distributed file system
package com.briup.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
public class HbaseToHdfs extends Configured implements Tool {
    public static class HbaseToHdfsMapper extends TableMapper<Text, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            // Convert the row key; use offset/length so only the row key bytes are read
            String rk = Bytes.toString(key.get(), key.getOffset(), key.getLength());
            sb.append("rk:").append(rk).append(",");
            // Walk the nested map: column family -> qualifier -> version -> cell value
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = value.getMap();
            for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f : map.entrySet()) {
                sb.append("cf:").append(Bytes.toString(f.getKey())).append(",");
                for (Map.Entry<byte[], NavigableMap<Long, byte[]>> q : f.getValue().entrySet()) {
                    sb.append("qv:").append(Bytes.toString(q.getKey())).append(",");
                    for (Map.Entry<Long, byte[]> val : q.getValue().entrySet()) {
                        sb.append("VERSION:").append(val.getKey()).append(",").append("value:").append(Bytes.toString(val.getValue()));
                    }
                }
            }
            // Emit one text line per HBase row; the value is unused
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Output directory is passed on the command line via -D output=<path>
        String output = conf.get("output");
        // ZooKeeper quorum of the HBase cluster; must be set before the Job is created
        conf.set("hbase.zookeeper.quorum", "192.168.10.129:2181");
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("hbase2hdfs");
        // Read the HBase table bd2101:emp with a full-table Scan and feed it to the mapper
        TableMapReduceUtil.initTableMapperJob("bd2101:emp", new Scan(), HbaseToHdfsMapper.class, Text.class, NullWritable.class, job);
        // Map-only job: mapper output is written straight to HDFS as text
        job.setNumReduceTasks(0);
        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        // ToolRunner.run is static; it parses -D options into the Configuration before calling run()
        System.exit(ToolRunner.run(new HbaseToHdfs(), args));
    }
}
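The job assumes the source table bd2101:emp already exists and holds some rows. A minimal HBase shell sketch for creating and seeding such a table (the namespace, the column family f1, and the sample values are assumptions; adjust them to your cluster):
create_namespace 'bd2101'
create 'bd2101:emp', 'f1'
put 'bd2101:emp', '1001', 'f1:name', 'tom'
put 'bd2101:emp', '1001', 'f1:gender', 'male'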
Package the program as a jar, submit it to the cluster, and run:
yarn jar BD2101-1.0-SNAPSHOT-jar-with-dependencies.jar com.briup.hbase.HbaseToHdfs -D output=/user/zhudz
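After the job finishes, the exported text can be inspected directly in HDFS. A quick check (the part-m-00000 file name assumes the default output naming of a map-only job):
hdfs dfs -ls /user/zhudz
hdfs dfs -cat /user/zhudz/part-m-00000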
Migrating data from a file in HDFS into an HBase table
Prepare a person.txt file and upload it to /user/zhudz in the HDFS file system.
vi person.txt
2000,tom,male
3000,jake,female
4000,briup,male
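Before running the job, the file must be in HDFS and the target table briup:emp must exist with the column family f1 used by the reducer below. A minimal sketch of the preparation steps (the briup namespace is an assumption):
hdfs dfs -put person.txt /user/zhudz/
# in the HBase shell
create_namespace 'briup'
create 'briup:emp', 'f1'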
package com.briup.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class HdfsToHbase extends Configured implements Tool {
    public static class HdfsToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Pass each input line through unchanged; parsing happens in the reducer
            context.write(value, NullWritable.get());
        }
    }
    public static class HdfsToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Each line has the form id,name,gender; the id becomes the row key
            String[] line = key.toString().split(",");
            Put put = new Put(Bytes.toBytes(line[0]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(line[1]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("gender"), Bytes.toBytes(line[2]));
            // TableReducer writes the Put to the table configured in initTableReducerJob
            context.write(NullWritable.get(), put);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // ZooKeeper quorum of the HBase cluster; must be set before the Job is created
        conf.set("hbase.zookeeper.quorum", "106.14.59.12:2181");
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("hdfs2hbase");
        job.setMapperClass(HdfsToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Configure the reducer to write its Puts into the HBase table briup:emp
        TableMapReduceUtil.initTableReducerJob("briup:emp", HdfsToHbaseReducer.class, job);
        TextInputFormat.addInputPath(job, new Path("/user/zhudz/person.txt"));
        return job.waitForCompletion(true) ? 0 : -1;
    }
    public static void main(String[] args) throws Exception {
        // ToolRunner.run is static; it parses generic options before calling run()
        System.exit(ToolRunner.run(new HdfsToHbase(), args));
    }
}
Package the program as a jar, submit it to the cluster, and run:
yarn jar BD2101-1.0-SNAPSHOT-jar-with-dependencies.jar com.briup.hbase.HdfsToHbase
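After the job completes, the imported rows can be verified in the HBase shell (table name taken from the code above):
scan 'briup:emp'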