一、功能描述
本功能组件主要通过Java的API实现HBase的操作。利用log4j进行数据迁移过程的记录,采取批处理的方式实现数据迁移的过程。
技术实现
- 利用Java的API连接HBase数据库
- 利用log4j将执行信息进行输出,并捕获异常
二、依赖导入
首先,在Maven工程中的pom.xml的<dependencies></dependencies>中添加以下的依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
三、配置信息
3.1log4j的配置
log4j的配置文件主要实现程序运行的日志信息的输出,以及程序异常信息的输出,方便开发者以及运维人员进行程序状态的动态感知。
#指定log4j的输出信息
log4j.rootLogger=INFO, stdout, logfile
#指定log4j的标准输出
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#指定log4j的标准输出的样式
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#指定标准输出的转换的格式
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
#指定日志文件的输出
log4j.appender.logfile=org.apache.log4j.FileAppender
#指定log4j的输出路径文件名
log4j.appender.logfile.File=log/hd.log
#指定日志文件的输出样式
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
#指定日志文件的转换格式
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3.2连接配置
#java连接HBase的配置
#定义连接端口号
hbase.zookeeper.property.clientPort=2181
#定义Zookeeper集群(quorum)的主机地址列表,多个地址以逗号分隔
hbase.zookeeper.quorum=10.68.128.215,10.68.128.211,10.68.128.212
#定义HBase的主节点
hbase.master=hdfs://10.68.128.215:60000
#定义hdfs的路径
hbase.root.dir=hdfs://10.68.128.215:9000/hbase
#java连接单节点配置
#访问HBase需要通过Zookeeper进行访问
hbase.zk=single01:2181
四、Configuration
Configuration主要实现配置信息的读取,并将读取的方法进行相应的封装。
/**
 * Loads {@code datasource.properties} from the classpath once, when the class is
 * initialized, and exposes typed accessors for its entries to subclasses.
 *
 * <p>If the resource is missing or cannot be read, the error is logged and the JVM
 * is terminated with exit code {@code -1}.
 */
public class Configuration {
    private static final Logger logger = Logger.getLogger(Configuration.class);

    /** Name of the classpath resource holding the connection settings. */
    private static final String RESOURCE = "datasource.properties";

    /** Value returned by {@link #getString(String)} when a key is absent. */
    private static final String STR_EMPTY = "NULL";

    /** Textual default parsed by {@link #getInt(String)} when a key is absent. */
    private static final String INT_EMPTY = "0";

    private static final Properties config = new Properties();

    static {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Read through a stream rather than a file path: URL#getPath() is URL-encoded
        // and does not denote a real file when the resource is packaged inside a jar.
        try (java.io.InputStream in = loader.getResourceAsStream(RESOURCE)) {
            if (in == null) {
                throw new java.io.FileNotFoundException(RESOURCE + " not found on the classpath");
            }
            config.load(new java.io.InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
        } catch (IOException e) {
            logger.error("Unable to load " + RESOURCE, e);
            System.exit(-1);
        }
    }

    /**
     * Returns the value configured for {@code key}.
     *
     * @param key property name
     * @return the configured value, or the literal {@code "NULL"} when the key is absent
     */
    protected static String getString(String key) {
        return config.getProperty(key, STR_EMPTY);
    }

    /**
     * Returns the value configured for {@code key} parsed as an {@code int}.
     *
     * @param key property name
     * @return the parsed value, or {@code 0} when the key is absent
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    protected static int getInt(String key) {
        // Properties.load keeps trailing whitespace, which Integer.parseInt rejects.
        return Integer.parseInt(config.getProperty(key, INT_EMPTY).trim());
    }
}
五、Common
Common接口提供了两个默认方法:一个用于统一关闭实现了AutoCloseable的资源,另一个用于计算操作耗时(单位:秒)。
/**
 * Mixin of small utilities: closing resources and reporting elapsed time.
 */
public interface Common {
    /** Number of milliseconds in one second, used to convert elapsed time to seconds. */
    float MILLI_SEC = 1000.0f;

    /**
     * Closes each given resource in order, skipping {@code null} entries.
     * An exception thrown by one resource is printed to standard error and
     * does not prevent the remaining resources from being closed.
     *
     * @param closes resources to close; individual entries may be {@code null}
     */
    default void close(AutoCloseable... closes) {
        for (int i = 0; i < closes.length; i++) {
            AutoCloseable resource = closes[i];
            if (resource == null) {
                continue;
            }
            try {
                resource.close();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * Builds a message with the time elapsed since {@code begin}, in seconds.
     *
     * @param begin start time as returned by {@link System#currentTimeMillis()}
     * @return the text {@code "time consumption : "} followed by the elapsed seconds
     */
    default String timeCost(long begin) {
        long elapsedMillis = System.currentTimeMillis() - begin;
        float elapsedSeconds = elapsedMillis / MILLI_SEC;
        return "time consumption : " + elapsedSeconds;
    }
}
六、Java操作HBase集群
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class Query extends Configuration implements Common{
private static Logger logger = Logger.getLogger(Query.class);
public static Configuration configuration;
public static Connection connection;
public static Admin admin;
public static void init() {
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort",getString( "hbase.zookeeper.property.clientPort"));
configuration.set("hbase.zookeeper.quorum", getString( "hbase.zookeeper.quorum"));
configuration.set("hbase.master", getString("hbase.master"));
configuration.set("hbase.root.dir", getString("hbase.root.dir"));
try {
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void close() {
try {
if (null != admin) {
admin.close();
}
if (null != connection) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void createTable(String tableName, String[] cols) throws IOException {
init();
TableName tName = TableName.valueOf(tableName);
if (admin.tableExists(tName)) {
println(tableName + " exists.");
} else {
HTableDescriptor hTableDesc = new HTableDescriptor(tName);
for (String col : cols) {
HColumnDescriptor hColumnDesc = new HColumnDescriptor(col);
hTableDesc.addFamily(hColumnDesc);
}
admin.createTable(hTableDesc);
}
close(admin,connection);
}
public static void deleteTable(String tableName) throws IOException {
init();
TableName tName = TableName.valueOf(tableName);
if (admin.tableExists(tName)) {
admin.disableTable(tName);
admin.deleteTable(tName);
} else {
println(tableName + " not exists.");
}
close(admin,connection);
}
public static void listTables() {
init();
HTableDescriptor hTableDescriptors[] = null;
try {
hTableDescriptors = admin.listTables();
} catch (IOException e) {
e.printStackTrace();
}
for (HTableDescriptor hTableDescriptor : hTableDescriptors) {
println(hTableDescriptor.getNameAsString());
}
close(admin,connection);
}
public static void insert(String tableName, String rowKey, String colFamily, String col, String value) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(value));
table.put(put);
table.close();
close(admin,connection);
}
public static void delete(String tableName, String rowKey, String colFamily, String col) throws IOException {
init();
if (!admin.tableExists(TableName.valueOf(tableName))) {
println(tableName + " not exists.");
} else {
Table table = connection.getTable(TableName.valueOf(tableName));
Delete del = new Delete(Bytes.toBytes(rowKey));
if (colFamily != null) {
del.addFamily(Bytes.toBytes(colFamily));
}
if (colFamily != null && col != null) {
del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
table.delete(del);
table.close();
}
close(admin,connection);
}
public static void getData(String tableName, String rowKey, String colFamily, String col) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (colFamily != null) {
get.addFamily(Bytes.toBytes(colFamily));
}
if (colFamily != null && col != null) {
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
}
Result result = table.get(get);
showCell(result);
table.close();
close(admin,connection);
}
public static void getData(String tableName, String rowKey) throws IOException {
getData(tableName, rowKey, null, null);
}
public static void showCell(Result result) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
println("RowName: " + new String(CellUtil.cloneRow(cell)) + " ");
println("Timetamp: " + cell.getTimestamp() + " ");
println("column Family: " + new String(CellUtil.cloneFamily(cell)) + " ");
println("row Name: " + new String(CellUtil.cloneQualifier(cell)) + " ");
println("value: " + new String(CellUtil.cloneValue(cell)) + " ");
}
}
private static void println(Object obj) {
System.out.println(obj);
}
}
|