HBase简介
HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库
利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为其分布式协同服务,主要用来存储非结构化和半结构化的松散数据(列存 NoSQL 数据库)。
HBase特点
-
大:一个表可以有上亿行,上百万列。 -
面向列:面向列表(簇)的存储和权限控制,列(簇)独立检索。 -
稀疏:对于为空(NULL)的列,并不占用存储空间,因此,表可以设计的非常稀疏。 -
无模式:每一行都有一个可以排序的主键和任意多的列,列可以根据需要动态增 加,同一张表中不同的行可以有截然不同的列。 -
数据多版本:每个单元中的数据可以有多个版本,默认情况下,版本号自动分配, 版本号就是单元格插入时的时间戳。 -
数据类型单一:HBase中的数据都是字节数组,没有类型。
HBase架构
Master
- 为Region server分配region
- 负责Region server的负载均衡
- 发现失效的Region server并重新分配其上的region
- 管理用户对table的增删改操作
RegionServer
- Region server维护region,处理对这些region的IO请求
- Region server负责切分在运行过程中变得过大的region
Region
Memstore&Storefile
-
一个region由多个store组成,一个store对应一个CF(列族)store包括位于内存中的memstore和位于磁盘的storefile写操作先写入memstore,当memstore中的数据达到某个阈值,hregionserver会启动flashcache进程写入storefile,每次写入形成单独的一个storefile -
当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、major compaction),在合并过程中会进行版本合并和删除工作(majar),形成更大的storefile -
当一个region所有storefile的大小和数量超过一定阈值后,会把当前的region分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡 -
客户端检索数据,先在memstore找,找不到再找storefile
HLog
-
HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是”写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。 -
HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的KeyValue。
HBase数据模型
RowKey(行键)
ColumnFamily&Qualifier(列簇和列)
-
HBase表中的每个列都归属于某个列族,列族必须作为表模式(schema)定义的一部分预先给出。如 create ‘test’, ‘course’。 -
列名以列族作为前缀,每个“列族”都可以有多个列成员(column);如course:math, course:english, 新的列族成员(列)可以随后按需、动态加入。 -
权限控制、存储以及调优都是在列族层面进行的; -
HBase把同一列族里面的数据存储在同一目录下,由几个文件保存。
TimeStamp(时间戳)
-
在HBase每个cell存储单元对同一份数据有多个版本,根据唯一的时间戳来区分每个版本之间的差异,不同版本的数据按照时间倒序排序,最新的数据版本排在最前面。 -
时间戳的类型是 64位整型。 -
时间戳可以由HBase(在数据写入时自动)赋值,此时时间戳是精确到毫秒的当前系统时间 -
时间戳也可以由客户显式赋值,如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。
Cell(存储单元)
HBase读写流程
HBase Shell
HBaseAPI
常用java类
java类 | HBase数据模型 |
---|
Admin / HBaseAdmin/ HBaseConfiguration | 数据库 | HTable/HTableDescriptor | 表 | HColumnDescriptor | 列簇 | Put/Delete/Get/Scan/ResultScanner/ | 列 | CellUtil | 存储单元 |
示例代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class Demo2API {
Configuration conf = null;
Connection conn = null;
@Before
public void init() {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
@Test
public void create_table() throws IOException {
Admin admin = conn.getAdmin();
HTableDescriptor tableName = new HTableDescriptor(TableName.valueOf("tableName"));
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
cf1.setMaxVersions(5);
cf1.setTimeToLive(30);
tableName.addFamily(cf1);
admin.createTable(tableName);
}
@Test
public void drop_table() throws IOException {
Admin admin = conn.getAdmin();
String tableName = "tableName";
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
}
@Test
public void put() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Put put = new Put("00001".getBytes());
put.addColumn("cf1".getBytes(), "name".getBytes(), "zhangSan".getBytes());
testJavaAPI.put(put);
}
@Test
public void get() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("testJavaAPI"));
Get get = new Get("00001".getBytes());
Result rs = testJavaAPI.get(get);
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
}
@Test
public void scan() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
byte[] row = rs.getRow();
String rk = Bytes.toString(row);
System.out.println();
if ("001".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value));
} else if ("002".equals(rk)) {
byte[] value = rs.getValue("cf1".getBytes(), "name0".getBytes());
System.out.println(Bytes.toString(value));
byte[] value1 = rs.getValue("cf1".getBytes(), "name1".getBytes());
System.out.println(Bytes.toString(value1));
byte[] value2 = rs.getValue("cf1".getBytes(), "name100".getBytes());
System.out.println(Bytes.toString(value2));
byte[] value3 = rs.getValue("cf1".getBytes(), "name2".getBytes());
System.out.println(Bytes.toString(value3));
byte[] value4 = rs.getValue("cf1".getBytes(), "name3".getBytes());
System.out.println(Bytes.toString(value4));
byte[] value5 = rs.getValue("cf1".getBytes(), "name4".getBytes());
System.out.println(Bytes.toString(value5));
byte[] value6 = rs.getValue("cf1".getBytes(), "name5".getBytes());
System.out.println(Bytes.toString(value6));
} else if ("007".equals(rk)) {
byte[] value6 = rs.getValue("cf1".getBytes(), "name".getBytes());
System.out.println(Bytes.toString(value6));
byte[] value7 = rs.getValue("cf1".getBytes(), "age1".getBytes());
System.out.println(Bytes.toString(value7));
}
rs = scanner.next();
}
}
@Test
public void cellUtil() throws IOException {
Table testJavaAPI = conn.getTable(TableName.valueOf("test1"));
Scan scan = new Scan();
scan.withStartRow("001".getBytes());
scan.withStopRow("007".getBytes());
ResultScanner scanner = testJavaAPI.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
for (Cell cell : rs.listCells()) {
byte[] rk = CellUtil.cloneRow(cell);
byte[] cf = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
System.out.println("rowkey:" + Bytes.toString(rk) + ",columnsFamily:" + Bytes.toString(cf) + ",qualifier:" + Bytes.toString(qualifier) + ",value:" + Bytes.toString(value));
}
rs = scanner.next();
}
}
@Test
public void putAll() throws IOException {
Admin admin = conn.getAdmin();
if (!admin.tableExists(TableName.valueOf("students"))) {
HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
HColumnDescriptor info = new HColumnDescriptor("info");
students.addFamily(info);
admin.createTable(students);
}
Table students = conn.getTable(TableName.valueOf("students"));
BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
String line = br.readLine();
while (line != null) {
String[] splits = line.split(",");
String rk = splits[0];
String name = splits[1];
String age = splits[2];
String gender = splits[3];
String clazz = splits[4];
Put put = new Put(Bytes.toBytes(rk));
put.addColumn("info".getBytes(), "name".getBytes(), name.getBytes());
put.addColumn("info".getBytes(), "age".getBytes(), age.getBytes());
put.addColumn("info".getBytes(), "gender".getBytes(), gender.getBytes());
put.addColumn("info".getBytes(), "clazz".getBytes(), clazz.getBytes());
students.put(put);
line = br.readLine();
}
br.close();
}
@After
public void closeAll() throws IOException {
if (conn != null) {
conn.close();
}
}
}
HBase过滤器
作用
- 过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端
- 过滤器的类型很多,但是可以分为两大类:
- 比较过滤器:可应用于rowkey、列簇、列、列值过滤器
- 专用过滤器:只能适用于特定的过滤器
比较过滤器
比较运算符
-
LESS < -
LESS_OR_EQUAL <= -
EQUAL = -
NOT_EQUAL <> -
GREATER_OR_EQUAL >= -
GREATER > -
NO_OP 排除所有
常见的六大比较过滤器
BinaryComparator
- 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator
- 通BinaryComparator,只是比较左端前缀的数据是否相同
NullComparator
BitComparator
RegexStringComparator
- 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator
布隆过滤器
简介
- Bloom Filter(布隆过滤器)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
Bloom Filter 工作原理
- BloomFilter对于HBase的随机读性能至关重要,对于get操作以及部分scan操作可以剔除掉不会用到的HFile文件,减少实际IO次数,提高随机读性能。在此简单地介绍一下Bloom Filter的工作原理,Bloom Filter使用位数组来实现过滤,初始状态下位数组每一位都为0,如下图所示:
- 假如此时有一个集合S = {x1, x2, … xn},Bloom Filter使用k个独立的hash函数,分别将集合中的每一个元素映射到{1,…,m}的范围。对于任何一个元素,被映射到的数字作为对应的位数组的索引,该位会被置为1。比如元素x1被hash函数映射到数字8,那么位数组的第8位就会被置为1。下图中集合S只有两个元素x和y,分别被3个hash函数进行映射,映射到的位置分别为(0,3,6)和(4,7,10),对应的位会被置为1:
- 现在假如要判断另一个元素是否是在此集合中,只需要被这3个hash函数进行映射,查看对应的位置是否有0存在,如果有的话,表示此元素肯定不存在于这个集合,否则有可能存在。下图所示就表示z肯定不在集合{x,y}中:
HFile 中和 Bloom Filter 相关的Block
- Bloom Block:Bloom数据块,存储Bloom的位数组
- Bloom Index Block:Bloom数据块的索引
- BloomFilter Meta Block:从HFile角度看bloom数据块的一些元数据信息,大小个数等等
HBase中每个HFile都有对应的位数组,KeyValue在写入HFile时会先经过几个hash函数的映射,映射后将对应的数组位改为1,get请求进来之后再进行hash映射,如果在对应数组位上存在0,说明该get请求查询的数据不在该HFile中。
示例代码
rowKey过滤器:RowFilter
通过RowFilter与BinaryComparator过滤比rowKey 1500100010小的所有值出来
@Test
public void BinaryComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes(1500100010));
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator);
Scan scan = new Scan();
scan.setFilter(rowFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
列簇过滤器:FamilyFilter
通过FamilyFilter与SubstringComparator查询列簇名包含in的所有列簇下面的数据
@Test
public void SubstringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("in");
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
通过FamilyFilter与 BinaryPrefixComparator 过滤出列簇以info开头的列簇下的所有数据
@Test
public void BinaryPrefixComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("info".getBytes());
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
列过滤器:QualifierFilter
通过QualifierFilter与SubstringComparator查询列名包含in的列的值
public void printRS(ResultScanner scanner) throws IOException {
for (Result rs : scanner) {
String rowkey = Bytes.toString(rs.getRow());
System.out.println("当前行的rowkey为:" + rowkey);
for (Cell cell : rs.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
byte[] bytes = CellUtil.cloneValue(cell);
if ("age".equals(qualifier)) {
int value = Bytes.toInt(bytes);
System.out.println(family + ":" + qualifier + "的值为" + value);
} else {
String value = Bytes.toString(bytes);
System.out.println(family + ":" + qualifier + "的值为" + value);
}
}
}
}
@Test
public void SubstringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("in");
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.setFilter(familyFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
过滤出 列的名字 中 包含 “am” 所有的列 及列的值
@Test
public void SubstringComparatorQualifierFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SubstringComparator substringComparator = new SubstringComparator("am");
QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
scan.setFilter(qualifierFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
列值过滤器:ValueFilter
通过ValueFilter与BinaryPrefixComparator过滤出所有的cell中值以 “张” 开头的学生
@Test
public void BinaryPrefixComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator("张".getBytes());
ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan();
scan.setFilter(valueFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
过滤出文科的学生,只会返回clazz列,其他列的数据不符合条件,不会返回
@Test
public void RegexStringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
RegexStringComparator regexStringComparator = new RegexStringComparator("^文科.*");
ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, regexStringComparator);
Scan scan = new Scan();
scan.withStartRow("1500100001".getBytes());
scan.withStopRow("1500100011".getBytes());
scan.setFilter(valueFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
专用过滤器
单列值过滤器:SingleColumnValueFilter
- SingleColumnValueFilter会返回满足条件的cell所在行的所有cell的值(即会返回一行数据)
通过SingleColumnValueFilter与查询文科班所有学生信息
@Test
public void RegexStringComparatorFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"info".getBytes(),
"clazz".getBytes(),
CompareFilter.CompareOp.EQUAL,
new RegexStringComparator("^文科.*")
);
Scan scan = new Scan();
scan.setFilter(singleColumnValueFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
列值排除过滤器:SingleColumnValueExcludeFilter
- 与SingleColumnValueFilter相反,会排除掉指定的列,其他的列全部返回
通过SingleColumnValueExcludeFilter与BinaryComparator查询文科一班所有学生信息,最终不返回clazz列
@Test
public void RegexStringComparatorExcludeFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
"info".getBytes(),
"clazz".getBytes(),
CompareFilter.CompareOp.EQUAL,
new BinaryComparator("文科一班".getBytes())
);
Scan scan = new Scan();
scan.setFilter(singleColumnValueExcludeFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
rowkey前缀过滤器:PrefixFilter
通过PrefixFilter查询以150010008开头的所有前缀的rowkey
@Test
public void PrefixFilterFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
Scan scan = new Scan();
scan.setFilter(prefixFilter);
ResultScanner scanner = students.getScanner(scan);
Result rs = scanner.next();
while (rs != null) {
String id = Bytes.toString(rs.getRow());
String name = Bytes.toString(rs.getValue("info".getBytes(), "name".getBytes()));
int age = Bytes.toInt(rs.getValue("info".getBytes(), "age".getBytes()));
String gender = Bytes.toString(rs.getValue("info".getBytes(), "gender".getBytes()));
String clazz = Bytes.toString(rs.getValue("info".getBytes(), "clazz".getBytes()));
System.out.println(id + "\t" + name + "\t" + age + "\t" + gender + "\t" + clazz + "\t");
rs = scanner.next();
}
}
分页过滤器PageFilter
- 通过PageFilter查询第三页的数据,每页10条
- 使用PageFilter分页效率比较低,每次都需要扫描前面的数据,直到扫描到所需要查的数据
- 可设计一个合理的rowkey来实现分页需求
@Test
public void PageFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
int PageNum = 3;
int PageSize = 10;
Scan scan = new Scan();
if (PageNum == 1) {
scan.withStartRow("".getBytes());
PageFilter pageFilter = new PageFilter(PageSize);
scan.setFilter(pageFilter);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
} else {
String current_page_start_rows = "";
int scanDatas = (PageNum - 1) * PageSize + 1;
PageFilter pageFilter = new PageFilter(scanDatas);
scan.setFilter(pageFilter);
ResultScanner scanner = students.getScanner(scan);
for (Result rs : scanner) {
current_page_start_rows = Bytes.toString(rs.getRow());
}
scan.withStartRow(current_page_start_rows.getBytes());
PageFilter pageFilter1 = new PageFilter(PageSize);
scan.setFilter(pageFilter1);
ResultScanner scanner1 = students.getScanner(scan);
printRS(scanner1);
}
}
通过合理的设置rowkey来实现分页功能
@Test
public void PageFilterTest2() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
int PageSize = 10;
int PageNum = 3;
int baseId = 1500100000;
int start_row = baseId + (PageNum - 1) * PageSize + 1;
int end_row = start_row + PageSize;
Scan scan = new Scan();
scan.withStartRow(String.valueOf(start_row).getBytes());
scan.withStopRow(String.valueOf(end_row).getBytes());
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
多过滤器综合查询
查询文科班中的学生中学号以150010008开头并且年龄小于23的学生信息
@Test
public void FilterListFilter() throws IOException {
Table students = conn.getTable(TableName.valueOf("students"));
Scan scan = new Scan();
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"info".getBytes()
, "clazz".getBytes()
, CompareFilter.CompareOp.EQUAL
, new RegexStringComparator("^文科.*"));
PrefixFilter prefixFilter = new PrefixFilter("150010008".getBytes());
SingleColumnValueFilter singleColumnValueFilter1 = new SingleColumnValueFilter(
"info".getBytes()
, "age".getBytes()
, CompareFilter.CompareOp.LESS
, new BinaryComparator(Bytes.toBytes(23)));
FilterList filterList = new FilterList();
filterList.addFilter(singleColumnValueFilter);
filterList.addFilter(prefixFilter);
filterList.addFilter(singleColumnValueFilter1);
scan.setFilter(filterList);
ResultScanner scanner = students.getScanner(scan);
printRS(scanner);
}
Phoenix
Phoenix搭建
Phoenix 4.15 HBase 1.4.6 hadoop 2.7.6
关闭hbase集群,在master中执行
stop-hbase.sh
上传解压配置环境变量
解压
tar -xvf apache-phoenix-4.15.0-HBase-1.4-bin.tar.gz
改名
mv apache-phoenix-4.15.0-HBase-1.4-bin phoenix-4.15.0
将phoenix-4.15.0-HBase-1.4-server.jar复制到所有节点的hbase lib目录下
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar master:/usr/local/soft/hbase-1.4.6/lib/
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node1:/usr/local/soft/hbase-1.4.6/lib/
scp /usr/local/soft/phoenix-4.15.0/phoenix-4.15.0-HBase-1.4-server.jar node2:/usr/local/soft/hbase-1.4.6/lib/
启动hbase , 在master中执行
start-hbase.sh
配置环境变量
vim /etc/profile
增加:export PHOENIX_HOME=/usr/local/soft/phoenix-4.15.0
path:$PHOENIX_HOME/bin
source /etc/profile
Phoenix使用
连接sqlline
sqlline.py master,node1,node2
163/163 (100%) Done
Done
sqlline version 1.5.0
0: jdbc:phoenix:master,node1,node2>
常用命令
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
);
!table
upsert into STUDENT values('1500100004','葛德曜',24,'男','理科三班');
upsert into STUDENT values('1500100005','宣谷芹',24,'男','理科六班');
upsert into STUDENT values('1500100006','羿彦昌',24,'女','理科三班');
select * from STUDENT ;
select * from STUDENT where age=24;
select gender ,count(*) from STUDENT group by gender;
select * from student order by gender;
delete from STUDENT where id='1500100004';
drop table STUDENT;
!quit
更多语法参照官网
https:
phoenix映射
视图映射
Phoenix创建的视图是只读的,所以只能用来做查询,无法通过视图对源数据进行修改等操作
hbase shell
create 'test','name','company'
put 'test','001','name:firstname','zhangsan'
put 'test','001','name:lastname','zhangsan'
put 'test','001','company:name','数加'
put 'test','001','company:address','合肥'
create view "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname" varchar,
"company"."name" varchar,
"company"."address" varchar
);
CREATE view "students" (
id VARCHAR NOT NULL PRIMARY KEY,
"info"."name" VARCHAR,
"info"."age" VARCHAR,
"info"."gender" VARCHAR ,
"info"."clazz" VARCHAR
) column_encoded_bytes=0;
select * from "test";
drop view "test";
表映射
使用Apache Phoenix创建对HBase的表映射,有两类:
1) 当HBase中已经存在表时,可以以类似创建视图的方式创建关联表,只需要将create view改为create table即可。
2)当HBase中不存在表时,可以直接使用create table指令创建需要的表,并且在创建指令中可以根据需要对HBase表结构进行显示的说明。
第1)种情况下,如在之前的基础上已经存在了test表,则表映射的语句如下:
create table "test" (
empid varchar primary key,
"name"."firstname" varchar,
"name"."lastname"varchar,
"company"."name" varchar,
"company"."address" varchar
)column_encoded_bytes=0;
upsert into "test" values('1','2','3','4','5');
CREATE table "students" (
id VARCHAR NOT NULL PRIMARY KEY,
"info"."name" VARCHAR,
"info"."age" VARCHAR,
"info"."gender" VARCHAR ,
"info"."clazz" VARCHAR
) column_encoded_bytes=0;
upsert into "students" values('1500110004','葛德曜','24','男','理科三班');
使用create table创建的关联表,如果对表进行了修改,源数据也会改变,同时如果关联表被删除,源表也会被删除。但是视图就不会,如果删除视图,源数据不会发生改变。
Phoenix二级索引
开启索引支持
# 关闭hbase集群
stop-hbase.sh
# 在/usr/local/soft/hbase-1.4.6/conf/hbase-site.xml中增加如下配置
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
# 同步到所有节点
scp hbase-site.xml node1:`pwd`
scp hbase-site.xml node2:`pwd`
# 修改phoenix目录下的bin目录中的hbase-site.xml
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
# 启动hbase
start-hbase.sh
# 重新进入phoenix客户端
sqlline.sql master,node1,node2
创建索引
全局索引
全局索引适合读多写少的场景。如果使用全局索引,读数据基本不损耗性能,所有的性能损耗都来源于写数据。数据表的添加、删除和修改都会更新相关的索引表(数据删除了,索引表中的数据也会删除;数据增加了,索引表的数据也会增加)
注意: 对于全局索引在默认情况下,在查询语句中检索的列如果不在索引表中,Phoenix不会使用索引表将,除非使用hint。
CREATE TABLE IF NOT EXISTS DIANXIN (
mdn VARCHAR ,
start_date VARCHAR ,
end_date VARCHAR ,
county VARCHAR,
x DOUBLE ,
y DOUBLE,
bsid VARCHAR,
grid_id VARCHAR,
biz_type VARCHAR,
event_type VARCHAR ,
data_source VARCHAR ,
CONSTRAINT PK PRIMARY KEY (mdn,start_date)
) column_encoded_bytes=0;
psql.py master,node1,node2 DIANXIN.sql DIANXIN.csv
CREATE INDEX DIANXIN_INDEX ON DIANXIN ( end_date );
select * from DIANXIN where end_date = '20180503154014';
select * from DIANXIN where end_date = '20180503154014';
select * from DIANXIN where end_date = '20180503154014' and start_date = '20180503154614';
select end_date from DIANXIN where end_date = '20180503154014';
CREATE INDEX DIANXIN_INDEX1 ON DIANXIN ( end_date,COUNTY );
select end_date,MDN,COUNTY from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
select * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
select * from DIANXIN where end_date = '20180503154014' and COUNTY = '8340104';
select end_date from DIANXIN where COUNTY = '8340103';
本地索引
本地索引适合写多读少的场景,或者存储空间有限的场景。和全局索引一样,Phoenix也会在查询的时候自动选择是否使用本地索引。本地索引因为索引数据和原数据存储在同一台机器上,避免网络数据传输的开销,所以更适合写多的场景。由于无法提前确定数据在哪个Region上,所以在读数据的时候,需要检查每个Region上的数据从而带来一些性能损耗。
注意:对于本地索引,查询中无论是否指定hint或者是查询的列是否都在索引表中,都会使用索引表。
@ 创建本地索引
CREATE LOCAL INDEX DIANXIN_LOCAL_IDEX ON DIANXIN(grid_id);
select grid_id from dianxin where grid_id='117285031820040';
select * from dianxin where grid_id='117285031820040';
全局索引与本地索引区别
覆盖索引
覆盖索引是把原数据存储在索引数据表中,这样在查询时不需要再去HBase的原表获取数据就,直接返回查询结果。
注意:查询是 select 的列和 where 的列都需要在索引中出现。
CREATE INDEX DIANXIN_INDEX_COVER ON DIANXIN ( x,y ) INCLUDE ( county );
select * from dianxin where x=117.288 and y =31.822;
select * from dianxin where x=117.288 and y =31.822;
select mdn,x,y,county from dianxin where x=117.288 and y =31.822;
查询条件必须放在索引中 select 中的列可以放在INCLUDE (将数据保存在索引中)
select x,y,count(*) from dianxin group by x,y;
PhoenixJDBC
# 导入依赖
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.15.0-HBase-1.4</version>
</dependency>
Connection conn = DriverManager.getConnection("jdbc:phoenix:master,node1,node2:2181");
Statement stat = conn.createStatement();
ResultSet rs = stat.executeQuery("select * from dianxin limit 10");
while(rs.next()){
String mdn = rs.getString("mdn");
System.out.println(mdn);
}
stat.close();
conn.close();
HBase SQL与Hive SQL执行流程
HBase的MapReduce过程
#### 代码示例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
public class Demo7MapReduceReadAndWriteHBase {
public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String rowkey = Bytes.toString(key.get());
String clazz = Bytes.toString(value.getValue("info".getBytes(), "clazz".getBytes()));
context.write(new Text(clazz), new IntWritable(1));
}
}
public static class WriteHBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
Put put = new Put(key.toString().getBytes());
put.addColumn("cf1".getBytes(), "num".getBytes(), Bytes.toBytes(sum));
context.write(NullWritable.get(), put);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");
Job job = Job.getInstance(conf);
job.setJobName(Demo7MapReduceReadAndWriteHBase.class.getName());
job.setJarByClass(Demo7MapReduceReadAndWriteHBase.class);
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students")
, new Scan()
, ReadHBaseMapper.class
, Text.class
, IntWritable.class
, job
);
TableMapReduceUtil.initTableReducerJob("clazz_num", WriteHBaseReducer.class, job);
job.waitForCompletion(true);
}
}
Hive关联Hbase表
create external table students_hbase
(
id string,
name string,
age string,
gender string,
clazz string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = "
:key,
info:name,
info:age,
info:gender,
info:clazz
")
tblproperties("hbase.table.name" = "default:students");
HBaseHA
//在node1,node2中启用备用HMaster
hbase-daemon.sh start master
HBase调优
Pre-Creating Regions(预分区)
- 默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候, 所有的HBase客户端都向这一个region写数据,直到这个region足够大了才进行切分。 一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入 HBase时,会按照region分区情况,在集群内做数据的负载均衡。
- 如果知道hbase数据表的key的分布情况,就可以在建表的时候对hbase进行region的预分区。这样做的好处是防止大数据量插入的热点问题,提高数据插入的效率。
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
)split on('15001006|','15001007|','15001008|') ;
//在hbase中
create 'split_table_test', 'cf', SPLITS_FILE => 'region_split_info.txt'
create 'split_table_test', 'cf', SPLITS => ['a','e','r']
Rowkey设计
原则:
- 唯一原则
- 长度原则(10-100bytes 定长)
- 散列原则
常用方式
反转
202133 -> 331202 202134 -> 431202 202135 -> 531202 202136 -> 631202 202137 -> 731202 202138 -> 831202 202139 -> 931202 202140 -> 041202
时间戳反转
hash
md5 sha1
202133 -> 41DDBBCED55669818B2A40F4FED46F56 202134 -> 19D329403F02E2DA265CFC05D41FD253 202135 -> F6D06AEC4FB72A04F9CD4020BEF5E10F 202136 -> 0B512404B0411E623F64EC8981F8AE21
加上随机前缀
随机散列
第一次:202133 -> 41DDBBCED55669818B2A40F4FED46F56 第二次:202133 -> D55669818B2A40F4 第三次:202133 -> 02E2DA265CFC05D4
需求:查看某个时刻的数据
需求:将最新的数据放到最前面
- 大数减小数
- 通常数据里有时间戳
- 时间戳实际上是跟Long类型非常相似 一个很大的数
- Long.MAX_VALUE - 值
大数:300000
202137 -> 300000 - 202137 = 97863 202138 -> 300000 - 202138 = 97862 202139 -> 300000 - 202139 = 97861 202140 -> 300000 - 202140 = 97860
加盐
-
会在rowkey前面加上一个随机的前缀, -
优点:不需要知道rowkey的分步情况 -
缺点:不能再hbase中对数据进行查询和修改
CREATE TABLE IF NOT EXISTS STUDENT (
id VARCHAR NOT NULL PRIMARY KEY,
name VARCHAR,
age BIGINT,
gender VARCHAR ,
clazz VARCHAR
)salt_buckets=6;
inmemory
创建表的时候,可以通过HColumnDescriptor.setInMemory(true)将表放到 RegionServer的缓存中,保证在读取的时候被cache命中。
maxversion
创建表的时候,可以通过HColumnDescriptor.setMaxVersions(int maxVersions)设置 表中数据的最大版本,如果只需要保存最新版本的数据,那么可以设置 setMaxVersions(1)。
建立索引超时,查询超时
修改配置文件,hbase-site.xml
两个位置
/usr/local/soft/phoenix-4.15.0/bin
/usr/local/soft/hbase-1.4.6/conf/ 所有节点
增加配置
<property>
<name>hbase.rpc.timeout</name>
<value>60000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>60000000</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000000</value>
</property>
需要重启hbase
Compact & Split
Minor Compaction:
- 指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次 Minor Compaction 的结果是更少并且更大的StoreFile。
Major Compaction:
- 指将所有的StoreFile合并成一个StoreFile,这个过程会清理三类没有意义的数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。另外,一般情况下,major compaction时间会持续比较长,整个过程会消耗大量系统资源,对上层业务有比较大的影响。因此线上业务都会将关闭自动触发major compaction功能,改为手动在业务低峰期触发。
RegionSplit
- region中存储的是一张表的数据,当region中的数据条数过多的时候,会直接影响查询效率。当region过大的时候,region会被 拆分为两个region,HMaster会将分裂的region分配到不同的regionserver上,这样可以让请求分散到不同的RegionServer 上,已达到负载均衡 , 这也是Hbase的一个优点 。
ConstantSizeRegionSplitPolicy
-
0.94版本前,HBase region的默认切分策略 -
当region中最大的store大小超过某个阈值(hbase.hregion.max.filesize=10G)之后就会触发切分,一个region等分为2个region。但是在生产线上这种切分策略却有相当大的弊端(切分策略对于大表和小表没有明显的区分): -
阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,形成热点,这对业务来说并不是什么好事。 -
如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。
IncreasingToUpperBoundRegionSplitPolicy
region split阈值的计算公式是:
-
设regioncount:是region所属表在当前regionserver上的region的个数 -
阈值 = regioncount^3 * 128M * 2,当然阈值并不会无限增长,最大不超过MaxRegionFileSize(10G),当region中最大的store的大小达到该阈值的时候进行region split
例如:
- 第一次split阈值 = 1^3 * 256 = 256MB
- 第二次split阈值 = 2^3 * 256 = 2048MB
- 第三次split阈值 = 3^3 * 256 = 6912MB
- 第四次split阈值 = 4^3 * 256 = 16384MB > 10GB,因此取较小的值10GB
- 后面每次split的size都是10GB了
特点
- 相比ConstantSizeRegionSplitPolicy,可以自适应大表、小表;
- 在集群规模比较大的情况下,对大表的表现比较优秀
- 对小表不友好,小表可能产生大量的小region,分散在各regionserver上
- 小表达不到多次切分条件,导致每个split都很小,所以分散在各个regionServer上
SteppingSplitPolicy
-
2.0版本默认切分策略 -
? 相比 IncreasingToUpperBoundRegionSplitPolicy 简单了一些 ? region切分的阈值依然和待分裂region所属表在当前regionserver上的region个数有关系 -
如果region个数等于1,切分阈值为flush size 128M * 2 -
否则为MaxRegionFileSize。 -
这种切分策略对于大集群中的大表、小表会比 IncreasingToUpperBoundRegionSplitPolicy 更加友好,小表不会再产生大量的小region,而是适可而止。
KeyPrefixRegionSplitPolicy
根据rowKey的前缀对数据进行分区,这里是指定rowKey的前多少位作为前缀,比如rowKey都是16位的,指定前5位是前缀,那么前5位相同的rowKey在相同的region中。
DelimitedKeyPrefixRegionSplitPolicy
- 保证相同前缀的数据在同一个region中,例如rowKey的格式为:userid_eventtype_eventid,指定的delimiter为 _ ,则split的的时候会确保userid相同的数据在同一个region中。
- 按照分隔符进行切分,而KeyPrefixRegionSplitPolicy是按照指定位数切分。
BusyRegionSplitPolicy
DisabledRegionSplitPolicy
不启用自动拆分, 需要指定手动拆分
热点问题
原因
当大量的client访问hbase集群的一个或少数几个节点,造成少数region server的读/写请求过多、负载过大,而其他region server负载却很小,就造成了“热点”现象。
危害
大量访问会使热点region所在的单个主机负载过大,引起性能下降甚至region不可用。
原理
有大量连续编号的row key ==> 大量row key相近的记录集中在个别region ==> client检索记录时,对个别region访问过多 ==> 此region所在的主机过载 ==> 热点
解决办法
HBase BulkLoading
优点:
-
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。 -
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
限制:
-
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。 -
HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群
代码
- 生成HFile部分
package com.shujia;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Demo10BulkLoading {
public static class BulkLoadingMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(",");
String mdn = splits[0];
String start_time = splits[1];
String longitude = splits[4];
String latitude = splits[5];
String rowkey = mdn + "_" + start_time;
KeyValue lg = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes());
KeyValue lt = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lt".getBytes(), latitude.getBytes());
context.write(new ImmutableBytesWritable(rowkey.getBytes()), lg);
context.write(new ImmutableBytesWritable(rowkey.getBytes()), lt);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
Job job = Job.getInstance(conf);
job.setJarByClass(Demo10BulkLoading.class);
job.setJobName("Demo10BulkLoading");
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
job.setNumReduceTasks(4);
job.setMapperClass(BulkLoadingMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
FileInputFormat.addInputPath(job, new Path("/data/DIANXIN/"));
FileOutputFormat.setOutputPath(job, new Path("/data/hfile"));
Connection conn = ConnectionFactory.createConnection(conf);
Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);
job.waitForCompletion(true);
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.doBulkLoad(new Path("/data/hfile"), conn.getAdmin(), dianxin_bulk, regionLocator);
}
}
说明
-
最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。 -
最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。 -
MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。 -
MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新HBase的元数据。 -
HFile入库到HBase通过HBase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库
思维导图
|