1. Add the dependencies
<properties>
    <flink.version>1.13.6</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
    <fastjson.version>1.2.68</fastjson.version>
    <lombok.version>1.18.12</lombok.version>
    <hbase.version>2.1.0</hbase.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- the Scala suffix must match the scala.binary.version declared above -->
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. HBase connection utility class (used as a Flink source)
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.List;
public class HbaseUtils extends RichSourceFunction<String> {

    private Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build the HBase client configuration and connect as an explicit user
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "IP:port");
        hconf.set("zookeeper.znode.parent", "/hbase");
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("username");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
        table = connection.getTable(TableName.valueOf("namespace:tableName"));
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // Full-table scan; each row is emitted as its cell values joined with "-"
        // (see the note after this class for narrowing the scan)
        Scan scan = new Scan();
        scan.setCaching(100);
        rs = table.getScanner(scan);
        for (Result result : rs) {
            if (!isRunning) {
                break;
            }
            StringBuilder sb = new StringBuilder();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append("-");
            }
            if (sb.length() > 0) {
                sb.deleteCharAt(sb.length() - 1); // drop the trailing "-"
            }
            sourceContext.collect(sb.toString());
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // Release the scanner, table and connection in reverse order of creation
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}
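The run() method above performs a full-table scan. If only part of the table is needed, the Scan can be narrowed before calling getScanner. A minimal sketch, where the column family, qualifier, and row-key bounds are placeholder assumptions, not values from the original setup:
// Narrower scan: one column family plus a row-key range (all values are placeholders)
Scan scan = new Scan();
scan.setCaching(100);
scan.addFamily(Bytes.toBytes("cf"));                              // placeholder column family
// scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col"));     // or a single qualifier instead
scan.withStartRow(Bytes.toBytes("row-0001"));                     // inclusive start key (placeholder)
scan.withStopRow(Bytes.toBytes("row-9999"));                      // exclusive stop key (placeholder)
rs = table.getScanner(scan);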
3. Run the job that reads HBase data
import com.data.utils.HbaseUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkHbaseApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.addSource(new HbaseUtils());
        stream.print("==============");
        env.execute();
    }
}
4. Error logs and how to fix them
4.1 AccessDeniedException
Access denied.
By default, when the HBase Java API is used to access the HBase service, the username sent to HBase is the system user that started the Java program. In some cases we need to specify a user explicitly:
UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hbase");
connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));
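To confirm which user the client actually acts as, and therefore which user HBase checks permissions against, the effective user can be printed before the connection is created. A minimal sketch; the class name WhoAmICheck and the "hbase" user are placeholders, not part of the original setup:
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;

public class WhoAmICheck {
    public static void main(String[] args) throws Exception {
        // The user HBase would see if nothing is overridden: the OS user running the JVM
        UserGroupInformation defaultUser = UserGroupInformation.getCurrentUser();
        System.out.println("default user: " + defaultUser.getUserName());

        // The user HBase sees after the override shown above ("hbase" is just a placeholder)
        UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser("hbase");
        System.out.println("override user: " + User.create(remoteUser).getName());
    }
}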
4.2 NoSuchColumnFamilyException
- Error log
- Solution
The error message is already quite clear: the column family I was reading does not exist in the HBase table. After checking carefully, I found that I had simply written the column family name wrong; correcting it fixed the problem.
The two tables I was reading do not actually have an info column family. An earlier table did, and I carelessly reused the name here, so I am noting it down to avoid making the same mistake again. A quick way to verify which families a table really has is sketched below.
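A minimal sketch of that check, using the HBase 2.x Admin API to list a table's column families before scanning it. The class name ListColumnFamilies, the "IP:port" quorum, and the "namespace:tableName" table are placeholders carried over from the code above:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class ListColumnFamilies {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "IP:port");   // same placeholder as above
        try (Connection connection = ConnectionFactory.createConnection(hconf);
             Admin admin = connection.getAdmin()) {
            // getDescriptor / getColumnFamilyNames are the HBase 2.x table-inspection APIs
            for (byte[] family : admin.getDescriptor(TableName.valueOf("namespace:tableName"))
                                      .getColumnFamilyNames()) {
                System.out.println(Bytes.toString(family));
            }
        }
    }
}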