IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据入门--HBase(三)与MR交互集成 -> 正文阅读

[大数据]大数据入门--HBase(三)与MR交互集成

  1. 完成HBase与MR交互的配置
  2. HBase表作为MR的输入
  3. HBase表作为MR的输出

HBase与MR交互配置

官方文档敬上

  1. 查看 HBase 的 MapReduce 任务的执行
    [hadoop@hadoop101 hbase-1.3.1]$ bin/hbase mapredcp
    
  2. 设置环境变量
    vi /etc/profile 添加HBASE_HOME,HADOOP_HOME
    export HBASE_HOME=/opt/module/hbase-1.3.1
    export HADOOP_HOME=/opt/module/hadoop-3.1.3
    
    修改hadoop-env.sh,添加
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$HBASE_HOME/lib/*
    
  3. 测试程序运行
    yarn jar $HBASE_HOME/lib/hbase-server-1.3.1.jar rowcounter tab01
    
  4. 其他hbase机器重复上述配置
    sudo xsync /etc/profile
    xsync xsync $HADOOP_HOME/etc/hadoop/hadoop-env.sh
    

官方案例

将hdfs上面的/input_fruit/fruit.tsv文件内容导入到HBase的fruit表中
首先准备我们表中的内容:fruit.tsv(\t分割)

1001    Apple   Red
1002    Pear    Yellow
1003    Pineapple       Yellow

上传至hdfs

hadoop fs -put fruit.tsv /input_fruit

HBase中建立表:fruit

hbase shell
#....
Hbase(main):001:0> create 'fruit','info

执行MR程序

yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop101:8020/input_fruit

HDFS–>HBase的MR实现

还是上面的案例,这里我们用自己的代码实现

分析:
Map:读取HDFS中的文件
Reducer:写入到HBase

测试数据
其中999数据为,切分不符合条件的数据,用来测试计数器

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow
999 Pineapple	Yellow

Mapper.java

public class ReadHdfsFileMapper extends Mapper<LongWritable, Text,LongWritable,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //直接写出
        context.write(key,value);
    }
}

Reducer.java

public class WriteHBaseReducer extends TableReducer<LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //遍历Values
        for (Text value : values) {
            String text = value.toString();
            String[] split = text.split("\t");
            if(split.length!=3){
                context.getCounter("ImportTsv","Bad Lines").increment(1L);
                continue;
            }
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(split[1]));
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(split[2]));
            context.write(NullWritable.get(),put);
        }
    }
}

Driver.java

/**
 * 演示案例:HDFS文件转换为HBase表数据
 */
public class Hdfs2HBaseDriver implements Tool {

    private Configuration configuration;

    @Override
    public int run(String[] args) throws Exception {
        //创建job实例
        Job job = Job.getInstance(configuration);

        //设置驱动类所在ClassPath
        job.setJarByClass(Hdfs2HBaseDriver.class);

        //设置Mapper & 输出类型
        job.setMapperClass(ReadHdfsFileMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        //设置Reducr & 输出类型
        TableMapReduceUtil.initTableReducerJob("fruit01", WriteHBaseReducer.class, job);

        //设置FileInputFormat & FileOutputFormat
        FileInputFormat.setInputPaths(job, args[0]);

        //执行Job
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ToolRunner.run(conf, new Hdfs2HBaseDriver(),args);
    }
}

本地调试

默认我们的应用是需要发布到集群上运行的
本地调试的话,按照如下步骤进行配置

  1. Configuration通过HBaseConfiguration.create();进行创建
public static void main(String[] args) throws Exception {
	Configuration conf = HBaseConfiguration.create();
	ToolRunner.run(conf, new Dirver(), args);
}
  1. 在resources下放入hbase-site.xml,内容与集群配置相同
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop101:8020/HBase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- 0.98 后的新变动,之前版本没有.port,默认端口为 60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>60000</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
    </property>
</configuration>

HBase–>HBase的MR实现

上一个案例演示了,读取HDFS文件写入到HBase,接下来我们需要升级需求
将fruit01表的数据,写入到fruit02表,即演示从Hbase读取数据

直撸代码

public class Dirver implements Tool {

    private Configuration configuration;

    @Override
    public int run(String[] args) throws Exception {
        //创建job实例
        Job job = Job.getInstance(configuration);

        //设置驱动类所在ClassPath
        job.setJarByClass(Dirver.class);

        //设置Mapper & 输出类型
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("fruit01"), new Scan(), MyMapper.class, ImmutableBytesWritable.class, Put.class, job);

        //设置Reducr & 输出类型
        TableMapReduceUtil.initTableReducerJob("fruit02", MyReducer.class, job);

        //无需设置FileInputFormat & FileOutputFormat

        //执行Job
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ToolRunner.run(conf, new Dirver(), args);
    }

    /**
     * Mapper:负责读取fruit01数据,写出Key:rowKey,Val:put对象代表每一个列
     * ImmutableBytesWritable:代表rowKey
     */
    public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            Cell[] rawCells = value.rawCells();
            Put put = new Put(key.get());
            for (Cell cell : rawCells) {
                put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),cell.getTimestamp(), CellUtil.cloneValue(cell));
            }
            context.write(key, put);
        }
    }

    /**
     * Reducer:负责将put写入新表
     */
    public static class MyReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            Iterator<Put> iterator = values.iterator();
            while(iterator.hasNext()){
                Put put = iterator.next();
                context.write(NullWritable.get(),put);
            }
        }
    }
}

此处有坑:
也许你想Mapper端直接输出Put对象,然后Put对象就可以直接在Reducer阶段写出。也就是Mapper阶段可以是Key为NullWritable,那么这坑就来了!!!
这样你会发现写出的rowKey只有最后一个003,也就是原本希望3行数据,发现最终写入只有一行。
然后调试,你会感觉Mapper阶段是正常的,但是Reducer阶段确只执行一次。所以Mapper阶段Key一定要定义为ImmutableBytesWritable或者自己定义一个Text

SO:必须用rowKey作为key,且类型必须是ImmutableBytesWritable

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:11:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年3日历 -2024/3/29 4:05:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码