For a better reading experience: https://www.wolai.com/5pTShabtu4t3x3xe3vtQp1
Using the Java API to Connect to HBase in a Virtual Machine and Perform Database Operations
Environment
Code is written on the physical (host) machine
Hadoop and HBase are installed in a VMware virtual machine (Ubuntu)
Ubuntu version: 18.04
Ubuntu IP: 192.168.129.128
Development environment: IntelliJ IDEA
Configuring and Connecting to HBase
Edit the HBase configuration file hbase-site.xml. Binding hbase.master.ipc.address and hbase.regionserver.ipc.address to 0.0.0.0 makes the master and region server listen on all network interfaces, so that clients outside the VM can reach them:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://localhost:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.tmp.dir</name>
        <value>./tmp</value>
    </property>
    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>localhost</value>
    </property>
    <property>
        <name>hbase.master.ipc.address</name>
        <value>0.0.0.0</value>
    </property>
    <property>
        <name>hbase.regionserver.ipc.address</name>
        <value>0.0.0.0</value>
    </property>
</configuration>
Hadoop's configuration file core-site.xml (note that fs.defaultFS matches the HDFS address used in hbase.rootdir):
<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/root/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
Start Hadoop and HBase.
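Assuming Hadoop's sbin directory and HBase's bin directory are on the PATH (adjust the paths to your installation), starting both and checking the daemons looks roughly like this:
start-dfs.sh
start-hbase.sh
jps   # expect daemons such as NameNode, DataNode, SecondaryNameNode, HMaster, HRegionServer and HQuorumPeer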
Disable the virtual machine's firewall so that the physical machine can ping the VM, then open the HBase master web UI from the host and make sure it loads:
http://<vm-ip>:16010/master-status
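Ubuntu 18.04's default firewall front end is ufw. Turning the firewall off entirely is only reasonable for an isolated lab VM; opening just the default HBase ports is the gentler option. A minimal sketch, assuming the default port numbers:
sudo ufw disable
# or, instead, open only what is needed: ZooKeeper (2181), HBase master (16000),
# master web UI (16010) and region server (16020)
sudo ufw allow 2181 && sudo ufw allow 16000 && sudo ufw allow 16010 && sudo ufw allow 16020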
Add the dependencies in IDEA:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Hadoop</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>2.4.11</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.4.11</version>
        </dependency>
    </dependencies>
</project>
In the HBase shell on the virtual machine, create a simple table for testing:
hbase shell
create 'student', 'Sname', 'Sno'
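To double-check from the same shell, the standard list and describe commands show the new table and its column families:
list
describe 'student'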
Run the hostname command on the virtual machine to look up its host name.
On the physical machine, edit the hosts file located at:
C:\Windows\System32\drivers\etc
Open it with Notepad and add a line of the form:
ip hostname
where ip is your virtual machine's IP address and hostname is the virtual machine's host name.
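For the environment in this post (the ZooKeeper log shown later reveals the VM's host name to be ubuntu), the entry would be:
192.168.129.128 ubuntu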
Write code to test the connection:
package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class HadoopExample1 {
    public static void main(String[] args) {
        String ip = "192.168.129.128";
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("hbase.rootdir", "hdfs://" + ip + ":9000/hbase");
        conf.set("hbase.zookeeper.quorum", ip);
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // try-with-resources makes sure the connection and admin are closed
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                System.out.println(tableName);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The output after running:
......
22/05/17 17:29:47 INFO zookeeper.ClientCnxnSocket: jute.maxbuffer value is 4194304 Bytes
22/05/17 17:29:47 INFO zookeeper.ClientCnxn: zookeeper.request.timeout value is 0. feature enabled=
22/05/17 17:29:47 INFO zookeeper.ClientCnxn: Opening socket connection to server ubuntu/192.168.129.128:2181. Will not attempt to authenticate using SASL (unknown error)
22/05/17 17:29:47 INFO zookeeper.ClientCnxn: Socket connection established, initiating session, client: /192.168.129.1:53460, server: ubuntu/192.168.129.128:2181
22/05/17 17:29:47 INFO zookeeper.ClientCnxn: Session establishment complete on server ubuntu/192.168.129.128:2181, sessionid = 0x100000221b7000a, negotiated timeout = 90000
student
The student table listed here is the test table created on the virtual machine a moment ago, which confirms that the Java API connection to HBase works.
Example Program
Use the Java API to create an example table in HBase and perform insert, delete, update, and query operations on it.
Source code:
The main class TestExample1:
package hbase.main;

import hbase.example.Example1;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;

public class TestExample1 {
    public static void main(String[] args) {
        String ip = "192.168.129.128";
        Example1 example1 = new Example1(ip);
        try {
            String tableName = "studentInfo";
            String family = "score";
            example1.create(tableName, "name", "score");
            example1.put(tableName, "zhangsan", family, "English", "78");
            example1.put(tableName, "zhangsan", family, "Math", "55");
            example1.put(tableName, "zhangsan", family, "Computer", "87");
            example1.put(tableName, "lisi", family, "English", "23");
            example1.put(tableName, "lisi", family, "Math", "88");
            example1.put(tableName, "lisi", family, "Computer", "99");
            String[] qualifies = {"English", "Math", "Computer"};
            Result[] ZS_results = example1.gets(tableName, "zhangsan", family, qualifies);
            Result[] LS_results = example1.gets(tableName, "lisi", family, qualifies);
            System.out.println("---------------zhangsan---------------");
            for (int i = 0; i < ZS_results.length; i++) {
                String value = new String(ZS_results[i].getValue(family.getBytes(), qualifies[i].getBytes()));
                System.out.print(value + ",");
            }
            System.out.println();
            System.out.println("---------------lisi---------------");
            for (int i = 0; i < LS_results.length; i++) {
                String value = new String(LS_results[i].getValue(family.getBytes(), qualifies[i].getBytes()));
                System.out.print(value + ",");
            }
            System.out.println();
            System.out.println("--------------update-----------------");
            example1.put(tableName, "zhangsan", family, "English", "0");
            String value = new String(example1.get(tableName, "zhangsan", family, "English")
                    .getValue(family.getBytes(), "English".getBytes()));
            System.out.println("score:English after the update: " + value);
            System.out.println("------------------delete--------------");
            example1.delete(tableName, "zhangsan", family, "English");
            ResultScanner scan = example1.scan(tableName, family);
            Iterator<Result> iterator = scan.iterator();
            while (iterator.hasNext()) {
                Result next = iterator.next();
                NavigableMap<byte[], byte[]> familyMap = next.getFamilyMap(family.getBytes());
                for (Map.Entry<byte[], byte[]> m : familyMap.entrySet()) {
                    System.out.printf("%s, %s\n", new String(m.getKey()), new String(m.getValue()));
                }
                System.out.printf("\n----------------------------\n");
            }
            scan.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        example1.close();
    }
}
The utility class ConnectToHBase for connecting to HBase:
package hbase.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class ConnectToHBase {
    public static Connection getConnectionAsDefault(String ip) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("hbase.rootdir", "hdfs://" + ip + ":9000/hbase");
        conf.set("hbase.zookeeper.quorum", ip);
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return connection;
    }
}
The class Example1 that performs the CRUD operations:
package hbase.example;

import hbase.utils.ConnectToHBase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Example1 {
    private Connection connection = null;
    private Admin admin = null;

    public Example1(String ip) {
        connection = ConnectToHBase.getConnectionAsDefault(ip);
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // columns[0] is the table name, the remaining elements are column family names
    public void create(String... columns) throws IOException {
        TableName tableName = TableName.valueOf(columns[0]);
        if (admin.tableExists(tableName)) {
            dropTable(columns[0]);
        }
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
        for (int i = 1; i < columns.length; i++) {
            ColumnFamilyDescriptor columnFamily =
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columns[i])).build();
            tableDescriptorBuilder.setColumnFamily(columnFamily);
        }
        admin.createTable(tableDescriptorBuilder.build());
    }

    public void put(String tableName, String row, String family, String qualify, String value) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(row.getBytes());
        put.addColumn(family.getBytes(), qualify == null ? null : qualify.getBytes(), value.getBytes());
        table.put(put);
        table.close();
    }

    public Result get(String tableName, String row, String family, String qualify) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(row.getBytes());
        get.addColumn(family.getBytes(), qualify.getBytes());
        Result result = table.get(get);
        table.close();
        return result;
    }

    // batch variant: one Get per qualifier, executed as a single batch call
    public Result[] gets(String tableName, String row, String family, String[] qualifies) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Get> list = new ArrayList<>();
        for (String qualify : qualifies) {
            Get get = new Get(row.getBytes());
            get.addColumn(family.getBytes(), qualify.getBytes());
            list.add(get);
        }
        Result[] results = table.get(list);
        table.close();
        return results;
    }

    public void delete(String tableName, String row, String family, String qualify) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(row.getBytes());
        delete.addColumn(family.getBytes(), qualify.getBytes());
        table.delete(delete);
        table.close();
    }

    // the caller is responsible for closing the returned scanner
    public ResultScanner scan(String tableName, String family) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        return table.getScanner(family.getBytes());
    }

    public void dropTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        admin.disableTable(name);
        admin.deleteTable(name);
    }

    public void close() {
        try {
            if (admin != null) {
                admin.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Running the program prints both students' scores, the value after the update, and the scan results after the delete.
Note that deleting a cell after updating it does not remove the cell outright: the delete only covers the version with the newest timestamp, so reading the cell afterwards returns the value from before the update.
In the program above, zhangsan's score:English was set to 0 and the cell was then deleted. A subsequent read should, in theory, find nothing, yet it returns the value that was originally inserted.
Setting a breakpoint in the program and checking the table from the Ubuntu side at each step makes this visible. After the update, the zhangsan score:English cell carries the timestamp 05:59:14. After the cell is deleted, the following can be observed:
The cell's timestamp reverts to the timestamp of the original insert, and its value reverts to the originally inserted value. This shows that a delete does not physically remove data and an update does not replace it in place: each write adds a new version while old versions are retained, and an ordinary read simply returns the newest visible version.
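You can see the retained versions and the delete marker directly in the HBase shell: a raw scan returns tombstones and older cell versions instead of filtering them out. Standard shell syntax, using the table and column from the example above:
scan 'studentInfo', {COLUMNS => 'score:English', RAW => true, VERSIONS => 5}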
Lab Program
Requirements:
Implement the following functions in code:
createTable(String tableName, String[] fields)
Creates a table. tableName is the name of the table and the string array fields holds the names of the column families for the records. If a table named tableName already exists in HBase, delete it first and then create the new one.
addRecord(String tableName, String row, String[] fields, String[] values)
Adds the data in values to the cells of table tableName identified by row and the string array fields. If an element of fields has a column qualifier under its column family, write it as "columnFamily:column". For example, to add grades to the three columns "Math", "computer science", and "English" at once, fields would be {"score:Math", "score:computer science", "score:English"} and values would hold the three grades.
scanColumn(String tableName, String column)
Browses the data of one column of table tableName; if a row has no data in that column, return null for it. When column is a column family name with several column qualifiers under it, list the data of every qualified column; when column is a specific column name such as score:Math, list only that column's data.
modifyData(String tableName, String row, String column, String value)
Modifies the data of the cell identified by table tableName, row row, and column column.
deleteRow(String tableName, String row)
Deletes the row identified by row from table tableName.
Lab code:
The custom utility class ConnectToHBase for connecting to HBase (identical to the one above):
package hbase.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class ConnectToHBase {
    public static Connection getConnectionAsDefault(String ip) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("hbase.rootdir", "hdfs://" + ip + ":9000/hbase");
        conf.set("hbase.zookeeper.quorum", ip);
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return connection;
    }
}
The class Example2 implementing the required functions:
package hbase.example;

import hbase.utils.ConnectToHBase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Example2 {
    Connection connection = null;
    Admin admin = null;

    public Example2(String ip) throws IOException {
        connection = ConnectToHBase.getConnectionAsDefault(ip);
        admin = connection.getAdmin();
    }

    public void createTable(String tableName, String[] fields) throws IOException {
        TableName tableNameEntity = TableName.valueOf(tableName);
        if (admin.tableExists(tableNameEntity)) {
            dropTable(tableName);
        }
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableNameEntity);
        for (String field : fields) {
            ColumnFamilyDescriptor column = ColumnFamilyDescriptorBuilder.newBuilder(field.getBytes()).build();
            tableDescriptorBuilder.setColumnFamily(column);
        }
        admin.createTable(tableDescriptorBuilder.build());
    }

    public void dropTable(String tableName) throws IOException {
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    public void addRecord(String tableName, String row, String[] fields, String[] values) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            Put put = new Put(row.getBytes());
            // "family:qualifier" targets one column; a bare family name
            // writes to the family with an empty qualifier
            boolean hasQualify = fields[i].indexOf(":") > 0;
            String family = fields[i].split(":")[0];
            String quality = hasQualify ? fields[i].split(":")[1] : "";
            put.addColumn(family.getBytes(), quality.getBytes(), values[i].getBytes());
            puts.add(put);
        }
        table.put(puts);
        table.close();
    }

    public ResultScanner scanColumn(String tableName, String column) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        boolean hasQualify = column.indexOf(":") > 0;
        String family = column.split(":")[0];
        String quality = hasQualify ? column.split(":")[1] : "";
        if (hasQualify) {
            scan.addColumn(family.getBytes(), quality.getBytes());
        } else {
            scan.addFamily(family.getBytes());
        }
        // the caller is expected to consume and close the returned scanner
        return table.getScanner(scan);
    }

    public void printResultScanner(ResultScanner results) {
        for (Result next : results) {
            List<Cell> cells = next.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String qualify = Bytes.toString(CellUtil.cloneQualifier(cell));
                if (qualify.isEmpty()) {
                    qualify = "null";
                }
                System.out.printf("row:%s family:%s qualify:%s value:%s\n", row, family, qualify, value);
            }
        }
        results.close();
    }

    public void modifyData(String tableName, String row, String column, String value) throws IOException {
        // HBase has no in-place update: writing a new version is the update
        addRecord(tableName, row, new String[]{column}, new String[]{value});
    }

    public void deleteRow(String tableName, String row) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(row.getBytes());
        table.delete(delete);
        table.close();
    }

    public void close() {
        try {
            if (admin != null) {
                admin.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The test class TestExample2:
package hbase.main;

import hbase.example.Example2;
import org.apache.hadoop.hbase.client.ResultScanner;

import java.io.IOException;

public class TestExample2 {
    public static void main(String[] args) {
        String ip = "192.168.129.128";
        try {
            Example2 example2 = new Example2(ip);
            String[] fields = {"sNo", "student", "course"};
            example2.createTable("sc", fields);
            String[] fields2 = {"student:sName", "student:sSex", "student:sAge",
                    "course:Math", "course:computer science", "course:English"};
            String[] zhangsan = {"Zhangsan", "male", "23", "86", "", "69"};
            String[] mary = {"Mary", "female", "22", "", "77", "99"};
            String[] lisi = {"Lisi", "male", "24", "98", "95", ""};
            example2.addRecord("sc", "2015001", fields2, zhangsan);
            example2.addRecord("sc", "2015002", fields2, mary);
            example2.addRecord("sc", "2015003", fields2, lisi);
            example2.modifyData("sc", "2015001", "course:Math", "100");
            example2.deleteRow("sc", "2015002");
            ResultScanner results = example2.scanColumn("sc", "course");
            example2.printResultScanner(results);
            example2.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Running the test prints the course column family for the remaining rows (2015001 and 2015003).
The same result can be verified from the HBase shell on the Ubuntu side.
Complete project source files:
Hadoop.7z
You can also download it from GitHub:
https://github.com/waht-X/Big-Data-Lib.git
Or from Baidu Netdisk:
https://pan.baidu.com/s/1yDYi7EQ1HkWBB3LgZPsC0A?pwd=bNo1 (extraction code: bNo1)