IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 使用MapReduce程序实现从hbase读写数据输出到hdfs分布式文件系统中 -> 正文阅读

[大数据]使用MapReduce程序实现从hbase读写数据输出到hdfs分布式文件系统中

将hbase中的数据迁移到hdfs分布式文件系统中

package com.briup.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;


/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/8/11 9:45
 */
public class HbaseToHdfs extends Configured implements Tool {

    public static class HbaseToHdfsMapper extends TableMapper<Text, NullWritable> {
        /**
         * @param key     hbase中的行键 rowkey
         * @param value   hbase中的整行数据
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

            StringBuilder sb = new StringBuilder();

            //行键
            String rk = new String(key.get());

            sb.append("rk:" + rk).append(",");

            //拿到hbase中的整行数据
            //map NavigableMap<列族,NavigableMap<表名,NavigableMap<时间版本,值>
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = value.getMap();

            for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f : map.entrySet()) {
                //列族
                sb.append("cf:" + Bytes.toString(f.getKey())).append(",");

                //表名
                for (Map.Entry<byte[], NavigableMap<Long, byte[]>> q : f.getValue().entrySet()) {
                    sb.append("qv:" + Bytes.toString(q.getKey())).append(",");

                    //版本
                    for (Map.Entry<Long, byte[]> val : q.getValue().entrySet()) {
                        sb.append("VERSION:" + val.getKey()).append(",").append("value:" + Bytes.toString(val.getValue()));
                    }
                }
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        String output = conf.get("output");
        conf.set("hbase.zookeeper.quorum", "192.168.10.129:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("hbase2hdfs");

        //使用TableMapReduceUtil#initTableMapperJob方法设置map和reducer类,指定表,Scan扫描器,Mapper类,Mapper输出的键和值类型
        TableMapReduceUtil.initTableMapperJob("bd2101:emp", new Scan(), HbaseToHdfsMapper.class, Text.class, NullWritable.class, job);

        job.setNumReduceTasks(0);

        TextOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(new ToolRunner().run(new HbaseToHdfs(), args));
    }
}

将程序打成jar包发送发送到集群上运行
yarn jar BD2101-1.0-SNAPSHOT-jar-with-dependencies.jar com.briup.hbase.HbaseToHdfs -D output=/user/zhudz

将hdfs中文件的数据迁移到hbase表中存储

准备一个person.txt文件,并将其上传至hdfs文件系统中/user/zhudz下

vi persion.txt

2000,tom,male
3000,jake,female
4000,briup,male

package com.briup.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/8/11 10:23
 */
public class HdfsToHbase extends Configured implements Tool {


    public static class HdfsToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /**
         * @param key     map输入键的为行偏移量
         * @param value   map输入值为每一行的内容
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static class HdfsToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {

        /**
         * @param key     map输出的键为每一行的内容
         * @param values  map输出的值为NullWritable
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split(",");

            //指定行键
            Put put = new Put(Bytes.toBytes(line[0]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(line[1]));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("gender"), Bytes.toBytes(line[2]));

            context.write(NullWritable.get(), put);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "106.14.59.12:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("hdfs2hbase");

        job.setMapperClass(HdfsToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        TableMapReduceUtil.initTableReducerJob("briup:emp", HdfsToHbaseReducer.class, job);

        TextInputFormat.addInputPath(job, new Path("/user/zhudz/person.txt"));

        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(new ToolRunner().run(new HdfsToHbase(), args));
    }
}

将程序打成jar包发送发送到集群上运行
yarn jar BD2101-1.0-SNAPSHOT-jar-with-dependencies.jar com.briup.hbase.HdfsToHbase

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-13 12:07:06  更:2021-08-13 12:07:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:52:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码