HBase 的 Java API 操作
<repositories>
    <!-- Aliyun public Maven mirror.
         The URL must be HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories by
         default (maven-default-http-blocker), so an http:// URL fails to resolve. -->
    <repository>
        <id>aliyun</id>
        <url>https://maven.aliyun.com/repository/public</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <!-- Snapshots are never fetched from this mirror. -->
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <!-- HBase client API (ConnectionFactory, Admin, Table, Scan, filters, ...). -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- Apache Commons IO utilities. -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>

    <!-- JUnit 4: provides the @Test / @Before / @After annotations used by the tests. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <!-- NOTE(review): the test code shown relies on JUnit 4 lifecycle annotations;
         TestNG may be unused here - confirm before removing. -->
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compile sources for, and emit bytecode targeting, Java 8. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
- 创建包结构 com.itheima.hbase, 并在该包下编写代码
6.1 创建表
需求: 
- 要求在 HBase 中建表, 将抄表数据存储在 HBase 的表中
实现步骤
1) 通过hbase的连接工厂创建hbase的连接对象
2) 根据连接获取相关的管理对象: Admin(执行对表操作) 和 Table(执行对表数据操作)
3) 执行相关的操作:
4) 处理结果集(只有查询存在结果集):
5) 释放资源
代码实现
/**
 * Creates the WATER_BILL table with a single column family C1.
 *
 * <p>If the table already exists, creation is skipped, so the test can be re-run.
 * The connection and admin are released via try-with-resources even when
 * table creation fails.
 */
@Test
public void test01() throws Exception {
    // HBaseConfiguration.create() also loads hbase-default.xml / hbase-site.xml,
    // unlike a bare Hadoop Configuration.
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
    TableName waterBill = TableName.valueOf("WATER_BILL");

    try (Connection hConn = ConnectionFactory.createConnection(conf);
         Admin admin = hConn.getAdmin()) {
        if (admin.tableExists(waterBill)) {
            System.out.println("Table " + waterBill + " already exists, skipping creation");
            return;
        }
        ColumnFamilyDescriptor familyDesc =
                ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1")).build();
        TableDescriptor desc = TableDescriptorBuilder.newBuilder(waterBill)
                .setColumnFamily(familyDesc)
                .build();
        admin.createTable(desc);
    }
}
6.2 添加数据
/**
 * Inserts one sample water-bill row (row key 4944191) into column family C1
 * of WATER_BILL.
 *
 * <p>Row key, qualifiers and values are encoded with Bytes.toBytes (UTF-8)
 * rather than String.getBytes() (platform default charset). This lets the
 * Chinese values round-trip correctly when they are read back with
 * Bytes.toString, which decodes as UTF-8.
 */
@Test
public void test02() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
    byte[] family = Bytes.toBytes("C1");

    // try-with-resources releases the table and connection even if put() fails.
    try (Connection hConn = ConnectionFactory.createConnection(conf);
         Table table = hConn.getTable(TableName.valueOf("WATER_BILL"))) {
        Put put = new Put(Bytes.toBytes("4944191"));
        put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes("登卫红"));
        put.addColumn(family, Bytes.toBytes("address"), Bytes.toBytes("贵州省铜仁市德江县7单元267室"));
        put.addColumn(family, Bytes.toBytes("sex"), Bytes.toBytes("男"));
        put.addColumn(family, Bytes.toBytes("pay_time"), Bytes.toBytes("2020-05-10"));
        put.addColumn(family, Bytes.toBytes("bill_record"), Bytes.toBytes("308.1"));
        table.put(put);
    }
}
6.3 抽取一些公共的方法
// Shared HBase connection, opened in before() and closed in after() around every test.
private Connection hConn;
// Handle for row-level data operations (get/delete/scan) on tableName.
private Table table;
// Handle for table-level operations (e.g. disable/delete table).
private Admin admin;
// Name of the HBase table the shared handles operate on.
private String tableName="WATER_BILL";
/**
 * Runs before each test: opens the shared HBase connection and derives from it
 * a Table handle for tableName and an Admin handle.
 */
@Before
public void before() throws Exception {
    Configuration hbaseConf = HBaseConfiguration.create();
    String quorum = "node1:2181,node2:2181,node3:2181";
    hbaseConf.set("hbase.zookeeper.quorum", quorum);

    this.hConn = ConnectionFactory.createConnection(hbaseConf);
    TableName target = TableName.valueOf(this.tableName);
    this.table = this.hConn.getTable(target);
    this.admin = this.hConn.getAdmin();
}
/**
 * Runs after each test: releases the table, admin and connection opened in before().
 *
 * <p>Each handle is null-checked, because before() may have failed part-way.
 * The nested finally blocks ensure the connection is still closed if closing
 * the table or admin throws.
 */
@After
public void after() throws Exception {
    try {
        if (table != null) {
            table.close();
        }
    } finally {
        try {
            if (admin != null) {
                admin.close();
            }
        } finally {
            if (hConn != null) {
                hConn.close();
            }
        }
    }
}
6.4 查询某一条数据
/**
 * Fetches row 4944191 from the shared table and prints each cell's row key,
 * column family, qualifier and value.
 *
 * <p>Result.listCells() returns null when the row does not exist (for example
 * after test04 has deleted it). An empty result is therefore reported
 * explicitly instead of causing a NullPointerException.
 */
@Test
public void test03() throws Exception {
    Get get = new Get(Bytes.toBytes("4944191"));
    Result result = table.get(get);
    if (result.isEmpty()) {
        System.out.println("rowkey 4944191 not found");
        return;
    }
    for (Cell cell : result.listCells()) {
        String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
        String family = Bytes.toString(CellUtil.cloneFamily(cell));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        System.out.println("rowkey为:"+rowkey+"; 列族为:"+family+";列名为:"+qualifier+";列值为:"+value);
    }
}
6.5 删除数据
/**
 * Deletes the C1 column family of row 4944191 from the shared table.
 */
@Test
public void test04() throws Exception {
    byte[] rowKey = "4944191".getBytes();
    byte[] family = "C1".getBytes();
    Delete familyDelete = new Delete(rowKey).addFamily(family);
    table.delete(familyDelete);
}
6.6 删除表
/**
 * Drops tableName. A still-enabled table is disabled first, because HBase
 * only deletes disabled tables.
 *
 * <p>When the table does not exist, a message is printed instead of failing
 * with TableNotFoundException from isTableEnabled.
 */
@Test
public void test05() throws Exception {
    TableName name = TableName.valueOf(tableName);
    if (!admin.tableExists(name)) {
        System.out.println("Table " + name + " does not exist, nothing to delete");
        return;
    }
    if (admin.isTableEnabled(name)) {
        admin.disableTable(name);
    }
    admin.deleteTable(name);
}
6.7 导入数据的操作
- 导入数据的命令格式: hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
- 导入案例需求: 将资料中提供的10万条水表数据导入到 HBase 的 WATER_BILL 表中 (HBase 表名区分大小写; 导入前需先按 6.1 创建该表, 列族为 C1)
第一步: 将10w的抄表数据上传到HDFS中 /hbase/WATER_BILL/import
rz 上传到 linux
然后执行:
hdfs dfs -mkdir -p /hbase/WATER_BILL/import
hdfs dfs -put part-m-00000_10w /hbase/WATER_BILL/import
第二步: 执行导入到 WATER_BILL (列族为C1)
hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL hdfs://node1:8020/hbase/WATER_BILL/import/part-m-00000_10w
- 导出数据的命令格式: hbase org.apache.hadoop.hbase.mapreduce.Export 表名 输出HDFS数据路径
6.8 基于scan的扫描查询
![image-20210709183357658](https://img-blog.csdnimg.cn/20210709194304295.png)
需求: 查询2020年6月份所有用户的用水量 (按列 C1:RECORD_DATE 进行过滤)
SQL:
select RECORD_DATE,NAME,NUM_USAGE from WATER_BILL where RECORD_DATE >= '2020-06-01 00:00:00' and RECORD_DATE < '2020-07-01 00:00:00'
代码实现
/**
 * Scans the shared table for rows whose C1:RECORD_DATE lies in June 2020,
 * i.e. in [2020-06-01 00:00:00, 2020-07-01 00:00:00). For at most 10 rows,
 * prints RECORD_DATE, NAME and NUM_USAGE.
 *
 * <p>The date bounds are compared as raw bytes (BinaryComparator). This is
 * correct only if RECORD_DATE is stored as a fixed-width "yyyy-MM-dd HH:mm:ss"
 * string - TODO confirm against the imported data. NUM_USAGE is decoded with
 * Bytes.toDouble, so it is assumed to be stored as an 8-byte double.
 */
@Test
public void test06() throws Exception {
    byte[] family = Bytes.toBytes("C1");
    byte[] recordDate = Bytes.toBytes("RECORD_DATE");

    Scan scan = new Scan();
    scan.addColumn(family, recordDate);
    scan.addColumn(family, Bytes.toBytes("NAME"));
    scan.addColumn(family, Bytes.toBytes("NUM_USAGE"));

    SingleColumnValueFilter lowerBound = new SingleColumnValueFilter(
            family,
            recordDate,
            CompareOperator.GREATER_OR_EQUAL,
            new BinaryComparator(Bytes.toBytes("2020-06-01 00:00:00")));
    SingleColumnValueFilter upperBound = new SingleColumnValueFilter(
            family,
            recordDate,
            CompareOperator.LESS,
            new BinaryComparator(Bytes.toBytes("2020-07-01 00:00:00")));
    // By default SingleColumnValueFilter lets through rows that lack the tested
    // column. Exclude them so the scan matches the SQL semantics, where a
    // missing RECORD_DATE never satisfies the range condition.
    lowerBound.setFilterIfMissing(true);
    upperBound.setFilterIfMissing(true);

    scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, lowerBound, upperBound));
    scan.setLimit(10);

    // The scanner holds server-side resources; try-with-resources closes it.
    try (ResultScanner results = table.getScanner(scan)) {
        for (Result result : results) {
            for (Cell cell : result.listCells()) {
                String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                byte[] valueBytes = CellUtil.cloneValue(cell);
                if ("NUM_USAGE".equals(qualifier)) {
                    double value = Bytes.toDouble(valueBytes);
                    System.out.println("rowkey为:"+rowkey+"; 列族为:"+familyName+";列名为:"+qualifier+";列值为:"+value);
                } else {
                    String value = Bytes.toString(valueBytes);
                    System.out.println("rowkey为:"+rowkey+"; 列族为:"+familyName+";列名为:"+qualifier+";列值为:"+value);
                }
            }
            System.out.println("-------------------------------------------------");
        }
    }
}
|