1. 环境准备
新建项目后在 pom.xml 中添加依赖: 注意:会报错 javax.el 包不存在,是一个测试用的依赖,不影响使用
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.4.11</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
<version>3.0.1-b06</version>
</dependency>
</dependencies>
2. 创建链接
根据官方 API 介绍,HBase 的客户端连接由 ConnectionFactory 类来创建(工厂模式),用户使用完成之后需要手动关闭连接。同时连接是一个重量级的,推荐使用一个进程使用一个连接,对 HBase 的命令通过连接中的两个属性 Admin 和 Table 来实现。
1. 单线程创建连接
package com.fickler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnection {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
Connection connection = ConnectionFactory.createConnection(configuration);
CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(configuration);
System.out.println(connection);
connection.close();
}
}
HBase 创建连接是一个重量级的连接,连接的时候时间花费较多,大约需要等待1min左右,出现下面的示图就代表连接成功了 
2. 多线程创建连接
使用类单例模式,确保使用一个连接,可以同时用于多个线程。
package com.fickler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class HBaseConnection {
public static Connection connection = null;
static {
try {
connection = ConnectionFactory.createConnection();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void closeConnection() throws IOException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws IOException {
System.out.println(HBaseConnection.connection);
connection.close();
}
}
在 resources 文件夹中创建配置文件 hbase-site.xml,添加如下内容
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
</property>
</configuration>
创建成功 
3. DDL
创建 HBaseDDL 类,添加静态方法即可作为工具类
public class HBaseDDL {
public static Connection connection = HBaseConnection.connection;
}
1. 创建命名空间
public static void createNamespace(String namespace) throws IOException {
Admin admin = connection.getAdmin();
NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
builder.addConfiguration("user", "fickler");
try {
admin.createNamespace(builder.build());
} catch (IOException e) {
System.out.println("命名空间已经存在");
throw new RuntimeException(e);
}
admin.close();
}
 可以看到代码执行前和执行后,命名空间中多了一个 “fickler”  查看 “fickler” 命名空间的详细描述,可以看到我们设置的 kv 键值对 
2. 判断表格是否存在
public static boolean isTableExists(String namespace, String tableName) throws IOException {
Admin admin = connection.getAdmin();
boolean b = false;
try {
b = admin.tableExists(TableName.valueOf(namespace, tableName));
} catch (IOException e) {
throw new RuntimeException(e);
}
admin.close();
return b;
}

3. 创建表
public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {
if (columnFamilies.length == 0) {
System.out.println("创建表格至少有一个列族");
return;
}
if (isTableExists(namespace, tableName)) {
System.out.println("表格已经存在");
return;
}
Admin admin = connection.getAdmin();
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));
for (String columnFamily : columnFamilies) {
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
columnFamilyDescriptorBuilder.setMaxVersions(5);
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
}
try {
admin.createTable(tableDescriptorBuilder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
admin.close();
}
 查看表格的详细描述 
4. 修改表
public static void modifyTable(String namespace, String tableName, String columnFamily, int version) throws IOException {
if (!isTableExists(namespace, tableName)) {
System.out.println("表格不存在,无法修改");
return;
}
Admin admin = connection.getAdmin();
try {
TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);
columnFamilyDescriptorBuilder.setMaxVersions(version);
tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorBuilder.build());
admin.modifyTable(tableDescriptorBuilder.build());
} catch (IOException e) {
throw new RuntimeException(e);
}
admin.close();
}

5. 删除表
public static boolean deleteTable(String namespace, String tableName) throws IOException {
if (!isTableExists(namespace, tableName)) {
System.out.println("表格不存在,无法删除");
return false;
}
Admin admin = connection.getAdmin();
try {
TableName tableName1 = TableName.valueOf(namespace, tableName);
admin.disableTable(tableName1);
admin.deleteTable(tableName1);
} catch (IOException e) {
throw new RuntimeException(e);
}
admin.close();
return true;
}
这里的表格后续还会用到,这里就先不删除了…
4. DML
创建类 HBaseDML
public class HBaseDML {
public static Connection connection = HBaseConnection.connection;
}
1. 插入数据
public static void putCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
try {
table.put(put);
} catch (IOException e) {
throw new RuntimeException(e);
}
table.close();
}
对比代码执行前面表格中的内容可以看出,数据已经插入成功了 
2. 读取数据
public static void getCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
get.readAllVersions();
try {
Result result = table.get(get);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String value = new String(CellUtil.cloneValue(cell));
System.out.println(value);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
table.close();
}
可以看到控制台已经输出了结果,说明读取数据成功 
3. 扫描数据
public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(stopRow));
try {
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");
}
System.out.println();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
table.close();
}

4. 带过滤扫描
public static void filterScan(String namespace, String tableName, String startRow, String stopRow, String columnFamily, String columnName, String value) throws IOException {
Table table = connection.getTable(TableName.valueOf(namespace, tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(startRow));
scan.withStopRow(Bytes.toBytes(stopRow));
FilterList filterList = new FilterList();
ColumnValueFilter columnValueFilter = new ColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), CompareOperator.EQUAL, Bytes.toBytes(value));
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), CompareOperator.EQUAL, Bytes.toBytes(value));
filterList.addFilter(singleColumnValueFilter);
scan.setFilter(filterList);
try {
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
System.out.println(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");
}
System.out.println();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
table.close();
}
5. 删除数据
public static void deleteColumn(String nameSpace, String tableName, String rowKey, String family, String column) throws IOException {
Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
delete.addFamily(Bytes.toBytes(family));
table.delete(delete);
table.close();
}
|