IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink连接Hbase -> 正文阅读

[大数据]Flink连接Hbase

1.添加依赖

<properties>
        <flink.version>1.13.6</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
        <fastjson.version>1.2.68</fastjson.version>
        <lombok.version>1.18.12</lombok.version>
        <hbase.version>2.1.0</hbase.version>
    </properties>
    <dependencies>
        <!--hbase客户端-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!--flink流环境-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2.Hbase连接工具类

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;

import java.util.List;

public class HbaseUtils extends RichSourceFunction<String> {

    private Connection connection = null;
    private ResultScanner rs = null;
    private Table table = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.quorum", "IP:port");//192.168.23.39:2181
        hconf.set("zookeeper.znode.parent", "/hbase");

        //指定用户名为hbase的用户去访问hbase服务
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("username");
        connection = ConnectionFactory.createConnection(hconf, User.create(userGroupInformation));

        table = connection.getTable(TableName.valueOf("权限:表名"));//hbase:user
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        Scan scan = new Scan();
//        scan.addFamily(Bytes.toBytes("user_info"));
//        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_name"));

        // Set the cache size.
        scan.setCaching(100);
        rs = table.getScanner(scan);
        
        for (Result result : rs) {
            StringBuilder sb = new StringBuilder();
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                sb.append(value).append("-");
            }
            String value = sb.replace(sb.length() - 1, sb.length(), "").toString();
            sourceContext.collect(value);
        }
    }

    @Override
    public void cancel() {

    }

    @Override
    public void close() throws Exception {
        if (rs != null) rs.close();
        if (table != null) table.close();
        if (connection != null) connection.close();
    }
}

3.执行读取Hbase数据

import com.data.utils.HbaseUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkHbaseApp {
    public static void main(String[] args) throws Exception {
    	//1.获取流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//2.从hbase读取数据
        DataStream<String> stream = env.addSource(new HbaseUtils());
        stream.print("==============");
        //3.提交任务,开始执行
        env.execute();
    }
}

4.错误日志,及解决方式

4.1AccessDeniedException

  • 错误日志

禁止访问

在这里插入图片描述

  • 解决方案

在默认情况情况,我们使用hbase的java api去访问hbase的服务时,使用的hbase的服务的用户名为启动java程序的系统用户名。
在有些情况下,我们要指定用户。


//指定用户名为hbase的用户去访问hbase服务
UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hbase");
connection = ConnectionFactory.createConnection(configuration, User.create(userGroupInformation));

4.2NoSuchColumnFamilyException

  • 错误日志
    在这里插入图片描述
  • 解决方案

其实报错信息已经很明显了 ,就是HBase表中不存在我读取的那个列族, 然后仔细检查了下, 发现是由于自己写列族名写错了 , 然后更改过来即可

在这里插入图片描述

我读取的那两个表其实是没有info这个列族的 之前的表用到了,然后马虎,这里记下来 避免再次发生错误.

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-14 10:00:40  更:2022-05-14 10:01:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:54:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码