package com.newegg.modesty.service;
import com.newegg.modesty.config.AutoPartsConfig;
import com.newegg.modesty.hbase.HbaseOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Spring service that launches the HBase table scan using the settings held
 * by {@link AutoPartsConfig}.
 */
@Service
public class AuroPartsService {

    /** Source of the HBase host list and table name; injected by Spring. */
    @Autowired
    private AutoPartsConfig config;

    /**
     * Builds a fresh {@link HbaseOperation} from the configured host and table,
     * opens its connection and runs a table scan. The scan result is not used
     * by this method.
     */
    public void startJob() {
        final String host = config.getHbaseHost();
        final String table = config.getHbaseTable();

        final HbaseOperation operation = new HbaseOperation(host, table);
        operation.initConnect();
        operation.scanTable();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this module. -->
<groupId>org.example</groupId>
<artifactId>HbaseDemoWork</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Sources are UTF-8 and compiled for Java 8. -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<!-- Inherits plugin and dependency management from the Spring Boot parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
</parent>
<dependencies>
<!-- Spring Boot web starter; its version comes from the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<!-- NOTE(review): logback is excluded, presumably to avoid clashing with the
     SLF4J binding brought in through hbase-client; confirm. -->
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- HBase client (CDH 5.9.3 build of HBase 1.2.0). -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.9.3</version>
<exclusions>
<!-- NOTE(review): jdk.tools is presumably excluded because it is a system-scoped
     artifact that does not resolve on every JDK; confirm. -->
<exclusion>
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<!-- NOTE(review): presumably excluded so the legacy servlet-api jar does not
     conflict with the servlet API shipped by the web starter; confirm. -->
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- NOTE(review): fastjson releases before 1.2.83 are reported as affected by
     CVE-2022-25845; verify and consider upgrading. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- JUnit 4, test scope only; the explicit version overrides the one managed by the parent POM. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; version is managed by the parent POM. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.newegg.modesty.constant;
/**
 * String constants shared by the HBase scan code: result-map keys for the
 * per-column counters, column family names and column qualifiers.
 */
public class Constant {

    // ---- column families ----

    /** Column family holding the {@code name} and {@code age} columns. */
    public static final String FAMILY_INFO = "information";
    /** Column family holding the {@code phone} and {@code email} columns. */
    public static final String FAMILY_CONTACT = "contact";

    // ---- column qualifiers ----

    public static final String COLUMN_NAME = "name";
    public static final String COLUMN_AGE = "age";
    public static final String COLUMN_PHONE = "phone";
    public static final String COLUMN_EMAIL = "email";

    // ---- keys of the counter map produced by a scan ----

    public static final String NAME_COUNT = "NameCount";
    public static final String AGE_COUNT = "AgeCount";
    public static final String PHONE_COUNT = "PhoneCount";
    public static final String EMAIL_COUNT = "EmailCount";

    /** Not instantiable: this class only carries constants. */
    private Constant() {
    }
}
package com.newegg.modesty.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
 * Holds the HBase settings read from the Spring environment:
 * {@code hbase.hosts} and {@code hbase.table}.
 */
@Configuration
public class AutoPartsConfig {

    /** Value of the {@code hbase.hosts} property. */
    @Value("${hbase.hosts}")
    private String hbaseHost;

    /** Value of the {@code hbase.table} property. */
    @Value("${hbase.table}")
    private String hbaseTable;

    // ---- hbaseHost ----

    /** @return the configured HBase host value */
    public String getHbaseHost() {
        return this.hbaseHost;
    }

    /** @param hbaseHost replacement for the injected HBase host value */
    public void setHbaseHost(String hbaseHost) {
        this.hbaseHost = hbaseHost;
    }

    // ---- hbaseTable ----

    /** @return the configured HBase table name */
    public String getHbaseTable() {
        return this.hbaseTable;
    }

    /** @param hbaseTable replacement for the injected HBase table name */
    public void setHbaseTable(String hbaseTable) {
        this.hbaseTable = hbaseTable;
    }
}
package com.newegg.modesty.hbase;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.newegg.modesty.constant.Constant;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Scans one HBase table region by region and counts how many rows carry a
 * non-empty value in each of the name, age, phone and e-mail columns.
 *
 * <p>Typical use: construct, call {@link #initConnect()}, call
 * {@link #scanTable()} any number of times, then {@link #close()}.
 */
public class HbaseOperation implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseOperation.class);

    /** Upper bound on the number of regions scanned at the same time. */
    private static final int MAX_SCAN_THREADS = 10;
    /** Scanner caching (rows fetched per RPC) used for every region scan. */
    private static final int SCAN_CACHING = 5000;
    /** A progress line is logged, and the scan pauses, every this many rows. */
    private static final long PROGRESS_INTERVAL_ROWS = 5000L;
    /** Length of the pause taken at each progress interval. */
    private static final long PROGRESS_PAUSE_MILLIS = 1000L;

    // Family / qualifier names are encoded once instead of once per row.
    private static final byte[] INFO_FAMILY = Constant.FAMILY_INFO.getBytes(StandardCharsets.UTF_8);
    private static final byte[] CONTACT_FAMILY = Constant.FAMILY_CONTACT.getBytes(StandardCharsets.UTF_8);
    private static final byte[] NAME_QUALIFIER = Constant.COLUMN_NAME.getBytes(StandardCharsets.UTF_8);
    private static final byte[] AGE_QUALIFIER = Constant.COLUMN_AGE.getBytes(StandardCharsets.UTF_8);
    private static final byte[] PHONE_QUALIFIER = Constant.COLUMN_PHONE.getBytes(StandardCharsets.UTF_8);
    private static final byte[] EMAIL_QUALIFIER = Constant.COLUMN_EMAIL.getBytes(StandardCharsets.UTF_8);

    private final String hbaseHost;
    private final String hbaseTable;
    private Connection connection;

    /**
     * @param hbaseHost  value written to {@code hbase.zookeeper.quorum}
     * @param hbaseTable name of the table to scan
     */
    public HbaseOperation(String hbaseHost, String hbaseTable) {
        this.hbaseHost = hbaseHost;
        this.hbaseTable = hbaseTable;
    }

    /**
     * Opens the HBase connection, closing a previously opened one first.
     * A failure is logged and not rethrown; {@link #scanTable()} then returns
     * an empty map.
     */
    public void initConnect() {
        Configuration baseConf = HBaseConfiguration.create();
        baseConf.set("hbase.zookeeper.quorum", hbaseHost);
        try {
            connection = initConnect(connection, baseConf);
        } catch (IOException e) {
            LOGGER.error("connect hbase has error", e);
        }
    }

    /** Closes {@code connection} when present and returns a new one built from {@code configuration}. */
    private Connection initConnect(Connection connection, Configuration configuration) throws IOException {
        if (Objects.nonNull(configuration) && Objects.nonNull(connection)) {
            connection.close();
        }
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Counts non-empty column values over the whole table.
     *
     * @return a map keyed by {@link Constant#NAME_COUNT}, {@link Constant#PHONE_COUNT},
     *         {@link Constant#AGE_COUNT} and {@link Constant#EMAIL_COUNT}; an empty map
     *         when there is no usable connection, the region list cannot be fetched,
     *         or the table has no regions
     */
    public ConcurrentHashMap<String, AtomicInteger> scanTable() {
        LOGGER.info("begin to run hbase data");
        if (connection == null || connection.isClosed()) {
            // Previously this surfaced as a NullPointerException after a failed initConnect().
            LOGGER.error("scan table skipped, hbase connection is not open");
            return new ConcurrentHashMap<>();
        }
        TableName tableName = TableName.valueOf(this.hbaseTable);
        List<HRegionInfo> regionInfoList = null;
        // Admin is Closeable; release it as soon as the region list is known.
        try (Admin admin = connection.getAdmin()) {
            regionInfoList = admin.getTableRegions(tableName);
            LOGGER.info("fetch table regionInfoList:{}", regionInfoList);
        } catch (IOException e) {
            LOGGER.error("scan table has error,", e);
        }
        if (CollectionUtils.isEmpty(regionInfoList)) {
            return new ConcurrentHashMap<>();
        }
        return scanMultipleRegions(tableName, regionInfoList);
    }

    /**
     * Closes the HBase connection if one is open. Close failures are logged.
     */
    @Override
    public void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            LOGGER.error("close hbase connection has error", e);
        } finally {
            connection = null;
        }
    }

    /**
     * Scans every region on a worker pool and waits for all of them.
     * The pool lives only for the duration of this call so no threads outlive the scan.
     */
    private ConcurrentHashMap<String, AtomicInteger> scanMultipleRegions(TableName tableName, List<HRegionInfo> regionInfoList) {
        LOGGER.info("start run multiple region function.");
        ConcurrentHashMap<String, AtomicInteger> result = new ConcurrentHashMap<>();
        result.put(Constant.NAME_COUNT, new AtomicInteger(0));
        result.put(Constant.PHONE_COUNT, new AtomicInteger(0));
        result.put(Constant.AGE_COUNT, new AtomicInteger(0));
        result.put(Constant.EMAIL_COUNT, new AtomicInteger(0));

        int threads = Math.max(1, Math.min(MAX_SCAN_THREADS, regionInfoList.size()));
        // Unbounded work queue: a table with more regions than a bounded queue holds must not be rejected.
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            Map<String, Future<?>> futureMap = new LinkedHashMap<>();
            for (HRegionInfo hRegionInfo : regionInfoList) {
                Future<?> f = executorService.submit(() -> scanTableByRegion(hRegionInfo, result, tableName));
                futureMap.put(hRegionInfo.getRegionNameAsString(), f);
            }
            for (Map.Entry<String, Future<?>> entry : futureMap.entrySet()) {
                try {
                    LOGGER.info("start to wait region result:[{}]", entry.getKey());
                    entry.getValue().get();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop waiting;
                    // shutdownNow() below cancels the remaining region scans.
                    LOGGER.warn("interrupted while waiting region result:[{}]", entry.getKey());
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException e) {
                    // One failed region must not mark this thread as interrupted.
                    LOGGER.error("wait thread pool has error.", e);
                }
            }
            LOGGER.info("end to scan table");
        } finally {
            executorService.shutdownNow();
        }
        print("End result", result);
        return result;
    }

    /** Logs the four counters prefixed by {@code message}. */
    private void print(String message, ConcurrentHashMap<String, AtomicInteger> resultMap) {
        // message is passed as an argument: it can be a row key and may itself contain "{}".
        LOGGER.info("{}, nameCount:{}, phoneCount:{}, ageCount:{}, emailCount:{}", message,
                resultMap.get(Constant.NAME_COUNT), resultMap.get(Constant.PHONE_COUNT),
                resultMap.get(Constant.AGE_COUNT), resultMap.get(Constant.EMAIL_COUNT));
    }

    /**
     * Scans the key range of one region (or the whole table when {@code hRegionInfo}
     * is null) and feeds every row into {@code resultMap}.
     * Runs on a worker thread, so it opens its own Table instead of sharing one.
     */
    private void scanTableByRegion(HRegionInfo hRegionInfo, ConcurrentHashMap<String, AtomicInteger> resultMap, TableName tableName) {
        String regionName = Objects.nonNull(hRegionInfo) ? hRegionInfo.getRegionNameAsString() : "all";
        LOGGER.info("start to scan region:[{}]", regionName);
        long start = System.currentTimeMillis();

        Scan scan = new Scan();
        if (Objects.nonNull(hRegionInfo)) {
            LOGGER.info("region [{}] start:{}, end:{}", regionName,
                    Bytes.toString(hRegionInfo.getStartKey()), Bytes.toString(hRegionInfo.getEndKey()));
            scan.setStartRow(hRegionInfo.getStartKey());
            scan.setStopRow(hRegionInfo.getEndKey());
        }
        scan.setCaching(SCAN_CACHING);

        long rowCount = 0;
        long itemCount = 0;
        try (Table table = connection.getTable(tableName);
             ResultScanner results = table.getScanner(scan)) {
            for (Result result : results) {
                transFormResult(result, resultMap);
                rowCount++;
                itemCount += result.size();
                if (rowCount % PROGRESS_INTERVAL_ROWS == 0) {
                    // NOTE(review): the pause is kept from the original code, presumably
                    // to throttle load on the cluster; confirm it is still wanted.
                    try {
                        Thread.sleep(PROGRESS_PAUSE_MILLIS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOGGER.warn("scan of region:[{}] interrupted after {} rows", regionName, rowCount);
                        break;
                    }
                    print(Bytes.toString(result.getRow()), resultMap);
                }
            }
            LOGGER.info("end to scan region:[{}], row sum is {},hbase item sum is {}, used time {} s.",
                    regionName, rowCount, itemCount,
                    (System.currentTimeMillis() - start) / 1000);
        } catch (IOException e) {
            LOGGER.error("search table has error,", e);
        }
    }

    /**
     * Bumps the counter of every tracked column that has a non-empty value in {@code result}.
     * Failures are logged with the row key and do not stop the scan.
     */
    private void transFormResult(Result result, ConcurrentHashMap<String, AtomicInteger> resultMap) {
        try {
            countIfPresent(result, INFO_FAMILY, NAME_QUALIFIER, resultMap.get(Constant.NAME_COUNT));
            countIfPresent(result, INFO_FAMILY, AGE_QUALIFIER, resultMap.get(Constant.AGE_COUNT));
            countIfPresent(result, CONTACT_FAMILY, PHONE_QUALIFIER, resultMap.get(Constant.PHONE_COUNT));
            // The e-mail value itself is no longer printed to stdout.
            countIfPresent(result, CONTACT_FAMILY, EMAIL_QUALIFIER, resultMap.get(Constant.EMAIL_COUNT));
        } catch (Exception e) {
            LOGGER.error("Analysis row[{}] error.", Bytes.toString(result.getRow()), e);
        }
    }

    /** Increments {@code counter} when the given cell exists and holds at least one byte. */
    private static void countIfPresent(Result result, byte[] family, byte[] qualifier, AtomicInteger counter) {
        byte[] value = result.getValue(family, qualifier);
        if (value != null && value.length > 0) {
            counter.incrementAndGet();
        }
    }
}
注释
package com.newegg.modesty.hbase;
import com.newegg.modesty.Config.Config;
import com.newegg.modesty.property.Prop;
import org.apache.commons.configuration.ConfigurationFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Scans one HBase table region by region and counts how many rows carry a
 * non-empty value in each of the name, age, phone and e-mail columns.
 *
 * <p>Typical use: construct, call {@link #initConnect()}, call
 * {@link #scanTable()} any number of times, then {@link #close()}.
 */
public class HbaseJob implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseJob.class);

    /** Upper bound on the number of regions scanned at the same time. */
    private static final int MAX_SCAN_THREADS = 10;
    /** Scanner caching (rows fetched per RPC) used for every region scan. */
    private static final int SCAN_CACHING = 100;

    private final String hbaseHost;
    private final String hbaseTable;
    private Connection connection;

    /**
     * @param hbaseHost  value written to {@code hbase.zookeeper.quorum}
     * @param hbaseTable name of the table to scan
     */
    public HbaseJob(String hbaseHost, String hbaseTable) {
        this.hbaseHost = hbaseHost;
        this.hbaseTable = hbaseTable;
    }

    /**
     * Opens the HBase connection, closing a previously opened one first.
     * A failure is logged and not rethrown; {@link #scanTable()} then returns
     * an empty map.
     */
    public void initConnect() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", hbaseHost);
        try {
            connection = initConnect(connection, configuration);
        } catch (IOException e) {
            LOGGER.error("connect has a error", e);
        }
    }

    /** Closes {@code connection} when present and returns a new one built from {@code configuration}. */
    private Connection initConnect(Connection connection, Configuration configuration) throws IOException {
        if (Objects.nonNull(configuration) && Objects.nonNull(connection)) {
            connection.close();
        }
        // The configuration must be passed on, otherwise the quorum set by the caller is ignored.
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Counts non-empty column values over the whole table.
     *
     * @return a map keyed by {@code Prop.COUNT_NAME}, {@code Prop.COUNT_AGE},
     *         {@code Prop.COUNT_PHONE} and {@code Prop.COUNT_EMAIL}; an empty map when
     *         there is no usable connection, the region list cannot be fetched, or the
     *         table has no regions
     */
    public HashMap<String, AtomicInteger> scanTable() {
        LOGGER.info("start rub hbase data");
        if (connection == null || connection.isClosed()) {
            // Previously this surfaced as a NullPointerException after a failed initConnect().
            LOGGER.error("scan table skipped, hbase connection is not open");
            return new HashMap<>();
        }
        TableName tableName = TableName.valueOf(hbaseTable);
        List<HRegionInfo> regionsInfoList = null;
        // Admin is Closeable; release it as soon as the region list is known.
        try (Admin admin = connection.getAdmin()) {
            regionsInfoList = admin.getTableRegions(tableName);
            LOGGER.info("regionsInfoList:{}", regionsInfoList);
        } catch (IOException e) {
            LOGGER.error("scan table has error", e);
        }
        if (CollectionUtils.isEmpty(regionsInfoList)) {
            return new HashMap<>();
        }
        return scanMultipleRegions(tableName, regionsInfoList);
    }

    /**
     * Closes the HBase connection if one is open. Close failures are logged.
     */
    @Override
    public void close() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException e) {
            LOGGER.error("close hbase connection has error", e);
        } finally {
            connection = null;
        }
    }

    /**
     * Scans every region on a worker pool and waits for all of them.
     * The pool lives only for the duration of this call so no threads outlive the scan.
     */
    private HashMap<String, AtomicInteger> scanMultipleRegions(TableName tableName, List<HRegionInfo> regionsInfoList) {
        LOGGER.info("start scan multiple table");
        // Every key read later (transFormResult, print) is registered here, before any
        // worker starts; afterwards the map is only read, never structurally modified.
        HashMap<String, AtomicInteger> result = new HashMap<>();
        result.put(Prop.COUNT_NAME, new AtomicInteger(0));
        result.put(Prop.COUNT_AGE, new AtomicInteger(0));
        result.put(Prop.COUNT_PHONE, new AtomicInteger(0));
        result.put(Prop.COUNT_EMAIL, new AtomicInteger(0));

        int threads = Math.max(1, Math.min(MAX_SCAN_THREADS, regionsInfoList.size()));
        // Unbounded work queue: a table with more regions than a bounded queue holds must not be rejected.
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        try {
            Map<String, Future<?>> futureMap = new LinkedHashMap<>();
            for (HRegionInfo hRegionInfo : regionsInfoList) {
                Future<?> f = executorService.submit(() -> scanTableByRegion(hRegionInfo, result, tableName));
                futureMap.put(hRegionInfo.getRegionNameAsString(), f);
            }
            for (Map.Entry<String, Future<?>> entry : futureMap.entrySet()) {
                LOGGER.info("start to region result:[{}]", entry.getKey());
                try {
                    entry.getValue().get();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and stop waiting;
                    // shutdownNow() below cancels the remaining region scans.
                    LOGGER.error("region result has error", e);
                    Thread.currentThread().interrupt();
                    break;
                } catch (ExecutionException e) {
                    // One failed region must not mark this thread as interrupted.
                    LOGGER.error("region result has error", e);
                }
            }
            LOGGER.info("end to scan table");
        } finally {
            executorService.shutdownNow();
        }
        print("End result", result);
        return result;
    }

    /** Logs the four counters prefixed by {@code message}. */
    private void print(String message, HashMap<String, AtomicInteger> resultMap) {
        LOGGER.info("{}, nameCount:{}, phoneCount:{}, emailCount:{},ageCount:{}", message,
                resultMap.get(Prop.COUNT_NAME), resultMap.get(Prop.COUNT_PHONE),
                resultMap.get(Prop.COUNT_EMAIL), resultMap.get(Prop.COUNT_AGE));
    }

    /**
     * Scans the key range of one region (or the whole table when {@code hRegionInfo}
     * is null) and feeds every row into {@code resultMap}.
     * Runs on a worker thread, so it opens its own Table instead of sharing one.
     */
    private void scanTableByRegion(HRegionInfo hRegionInfo, HashMap<String, AtomicInteger> resultMap, TableName tableName) {
        String regionName = Objects.nonNull(hRegionInfo) ? hRegionInfo.getRegionNameAsString() : "all";
        LOGGER.info("start to scan region:[{}]", regionName);

        Scan scan = new Scan();
        if (Objects.nonNull(hRegionInfo)) {
            LOGGER.info("region[{}] start{},end{}", regionName,
                    Bytes.toString(hRegionInfo.getStartKey()), Bytes.toString(hRegionInfo.getEndKey()));
            scan.setStartRow(hRegionInfo.getStartKey());
            scan.setStopRow(hRegionInfo.getEndKey());
        }
        scan.setCaching(SCAN_CACHING);

        try (Table table = connection.getTable(tableName);
             ResultScanner results = table.getScanner(scan)) {
            for (Result result : results) {
                transFormResult(result, resultMap);
            }
        } catch (IOException e) {
            LOGGER.error("table scan has error", e);
        }
    }

    /**
     * Bumps the counter of every tracked column that has a non-empty value in {@code result}.
     * Failures are logged and do not stop the scan.
     */
    private void transFormResult(Result result, HashMap<String, AtomicInteger> resultMap) {
        try {
            byte[] infoFamily = Prop.FAMILY_INFO.getBytes(StandardCharsets.UTF_8);
            byte[] contactFamily = Prop.FAMILY_CONST.getBytes(StandardCharsets.UTF_8);
            countIfPresent(result, infoFamily, Prop.COLUMN_NAME, resultMap.get(Prop.COUNT_NAME));
            countIfPresent(result, infoFamily, Prop.COLUMN_AGE, resultMap.get(Prop.COUNT_AGE));
            countIfPresent(result, contactFamily, Prop.COLUMN_PHONE, resultMap.get(Prop.COUNT_PHONE));
            countIfPresent(result, contactFamily, Prop.COLUMN_EMAIL, resultMap.get(Prop.COUNT_EMAIL));
        } catch (Exception e) {
            LOGGER.error("add data has error", e);
        }
    }

    /** Increments {@code counter} when the given cell exists and holds at least one byte. */
    private static void countIfPresent(Result result, byte[] family, String qualifier, AtomicInteger counter) {
        byte[] value = result.getValue(family, qualifier.getBytes(StandardCharsets.UTF_8));
        if (value != null && value.length > 0) {
            counter.incrementAndGet();
        }
    }
}
|