一:与MR对接
1:官方案例
? ? ? ? 1.1:读取HBASE数据
自己在Hadoop102,103,104三台机器上输入
export HADOOP_CLASSPATH=$HADOOOP_CLASSPATH:/opt/module/hbase/lib/*
然后在hbase目录下使用如下命令,这个是Hadoop读取hbase中的表数据,表名为student
/opt/module/hadoop-3.1.3/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
????????1.2:案例二:使用 MapReduce将本地数据导入到 HBase
新建tsv格式的文件
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
创建hbase表
Hbase (main):001:0> create 'fruit','info'
创建input_fruit文件夹并上传fruit.tsv文件
$ opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
执行mapreduce到hbase的fruit表
$ opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv
-Dimporttsv.columns=HBASE _ROW_ info:name,info:color fruit hdfs://hadoop:102:9000 /input_fruit
?使用命令查看导入后的结果
Hbase (main):001:0> scan 'fruit'
2:自定义案例
? ? ? ? 2.1:目标:将hdfs里面的数据写入到hbase的表中。
mapper
package com.atguigu.mr1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FruitMapper extends Mapper<LongWritable, Text,LongWritable,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}
reducer
package com.atguigu.mr1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {
String cf1 = null;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
cf1 = configuration.get("cf1");
}
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//1.遍历values
for (Text value : values) {
//2.获取每一行数据
String[] fields = value.toString().split("\t");
//3.构建put对象
Put put = new Put(Bytes.toBytes(fields[0]));
//4.给put赋值
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1]));
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2]));
//5.写出
context.write(NullWritable.get(),put);
}
}
}
driver
package com.atguigu.mr1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FruitDriver implements Tool {
//定义一个Configuration
private Configuration configuration = null;
public int run(String[] args) throws Exception {
//1.获取job对象
Job job = Job.getInstance(configuration);
//2.设置驱动类路径
job.setJarByClass(FruitDriver.class);
//3.设置mapper输出的kv类型
job.setMapperClass(FruitMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
//4.设置reduce类
TableMapReduceUtil.initTableReducerJob(args[1],FruitReducer.class,job);
//5.设置输入参数
FileInputFormat.setInputPaths(job,new Path(args[0]));
//6.提交任务
boolean result = job.waitForCompletion(true);
return result ? 0:1;
}
public void setConf(Configuration conf) {
configuration = conf;
}
public Configuration getConf() {
return configuration;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
ToolRunner.run(configuration,new FruitDriver(),args);
}
}
执行运行jar包
yarn jar hbase-demo-1.0-SNAPSHOT.jar com.atguigu.mr1.FruitDriver /fruit.tsv fruit1
结果检查?
? ? ? ? 2.2:从hbase读数据然后写入到hbase里面
mapper
package com.atguigu.mr2;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
//构建put对象
Put put = new Put(key.get());
//1.获取数据
for (Cell cell : value.rawCells()) {
//2.判断当前的cell是否为"name"
if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//3.给put对象赋值
put.add(cell);
}
}
//4.写出
context.write(key,put);
}
}
reducer
package com.atguigu.mr2;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//遍历写出
for (Put value : values) {
context.write(NullWritable.get(),value);
}
}
}
driver
package com.atguigu.mr2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FruirDriver2 implements Tool {
//定义配置信息
private Configuration configuration = null;
public int run(String[] args) throws Exception {
//1.获取job对象
Job job = Job.getInstance(configuration);
//2.设置主类路径
job.setJarByClass(FruirDriver2.class);
//3.设置mapper输出kv类型
TableMapReduceUtil.initTableMapperJob(args[0],new Scan(),FruitMapper.class, ImmutableBytesWritable.class, Put.class,job);
//4.设置reducer输出kv的表
TableMapReduceUtil.initTableReducerJob(args[1],FruitReducer.class,job);
//5.提交任务
boolean result = job.waitForCompletion(true);
return result ? 0 :1;
}
public void setConf(Configuration conf) {
configuration = conf;
}
public Configuration getConf() {
return configuration;
}
public static void main(String[] args) throws Exception {
//Configuration configuration = new Configuration();
Configuration configuration = HBaseConfiguration.create();
ToolRunner.run(configuration,new FruirDriver2(),args);
}
}
二:与hive对接
1:Hive ? ? ? ? 1.1数据仓库 ????????Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用 HQL去管理查询。 ? ? ? ? 1.2用于数据分析、清洗 ????????Hive适用于离线的数据分析和清洗,延迟较高。 ? ? ? ? 1.3基于 HDFS、 MapReduce ????????Hive存储的数据依旧在 DataNode上,编写的 HQL语句终将是转换为 MapReduce代码执行。 2:HBase ? ? ? ? 2.1数据库 ????????是一种面向列族存储的非关系型数据库。 ? ? ? ? 2.2用于存储结构化和非结构化的数据 ????????适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。 ? ? ? ? 2.3基于 HDFS ????????数据持久化存储的体现形式是HFile,存放于DataNode中,被ResionServer以 region的形式进行管理。 ? ? ? ? 2.4延迟较低,接入在线业务使用 ????????面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。
3:建立hive表,关联hbase表,插入数据到hive表的同时能影响hbase表
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:co
mm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
4:在hive中创建临时中间表,用于load文件中的数据,不能将数据直接load进hive所关联的hbase那张表中。
CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by ' t';
5:向hive中间表中load数据
hive> load data local inpath '/home/admin/softwares/data emp.txt' into table emp;
6:通过insert命令将中间表中的数据导入到hive关联hbase的那张表中
hive> insert into table hive_ hbase _emp_table select * from emp;
7:查看hive以及关联的hbase表中是否已经成功的同步插入了数据
hive
hive> select from hive_ h base _emp_table;
hbase
Hbase > scan 'hbase _emp_table'
|