IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark将数据加载到hbase--bulkload方式 -> 正文阅读

[大数据]spark将数据加载到hbase--bulkload方式

通过bulkload方式加载数据优点:与put方式相比
1.导入过程不占用Region资源
2.能快速导入海量的数据
3.节省内存
应该是业界将数据载入hbase常用方式之一,因此有必要学习掌握

实现步骤

步骤一 读取数据生成rdd

读入数据是面向行的表,一行有多个字段,需要转换成面向列的数据,构造keyValue对象,一定要注意key们要排序,比如user:age列要在user:gender列之前
需要设计行键保证行键唯一和避免数据都涌入一个region,如我的是按时间设计的,好几个月的数据,因此将数据按月预分区。

    val rdd = sc.textFile("file:///"+filePath)
      .flatMap(x=>getLineData(x,rowKeyBase,HBaseUtils.LOG_FIELD_NAMES))
      .sortByKey()
  //处理每一条记录生成keyvalue对象
  def getLineData(line:String,rowkey:String,fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] ={
    val length = fieldNames.size
    val values:Array[String] = line.split("\\\t")
    if (null == values || values.length!=length) return Nil
    //println(rowkey+values(1)+Random.nextInt(100000).toString)
    val rowKey = Bytes.toBytes(rowkey+values(1)+Random.nextInt(1000).toString)
    val writable = new ImmutableBytesWritable(rowKey)
    val columnFamily = Bytes.toBytes("detail")
    fieldNames.toList.map{
      case (fieldName, fieldIndex) =>
        // KeyValue实例对象
        val keyValue = new KeyValue(
          rowKey, //
          columnFamily, //
          Bytes.toBytes(fieldName), //
          Bytes.toBytes(values(fieldIndex)) //
        )
        // 返回
        (writable, keyValue)
    }
  }

步骤二 配置输出HFile文件

输出前检查

检查HFile输出目录是否存在

    // TODO:构建Job,设置相关配置信息,主要为输出格式
    // a. 读取配置信息
    val hbaseConfig: Configuration = HBaseUtils.getHBaseConfiguration("hbase","2181")
    //  Configuration parameter hbase.mapreduce.hfileoutputformat.table.name cannot be empty
    hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", "log")
    // b. 如果输出目录存在,删除
    val dfs = FileSystem.get(hbaseConfig)
    val outputPath: Path = new Path("hdfs://hbase:9000/hbase/log/"+rowKeyBase)
    if (dfs.exists(outputPath)) {
      dfs.delete(outputPath, true)
    }
    dfs.close()

配置HFileOutputFormat2

    // TODO: 配置HFileOutputFormat2输出
    val conn = ConnectionFactory.createConnection(hbaseConfig)
    val htableName = TableName.valueOf("log")
    val table: Table = conn.getTable(htableName)
    HFileOutputFormat2.configureIncrementalLoad(
      Job.getInstance(hbaseConfig), //
      table, //
      conn.getRegionLocator(htableName) //
    )

输出HFile文件

    // TODO: 3. 保存数据为HFile文件//先排序
    rdd.sortBy(x=>(x._1, x._2.getKeyString), ascending = true)
      .saveAsNewAPIHadoopFile(
        "hdfs://hbase:9000/hbase/log/"+rowKeyBase,
        classOf[ImmutableBytesWritable], //
        classOf[KeyValue], //
        classOf[HFileOutputFormat2], //
        hbaseConfig)

将HFile文件bulkload到hbase表分区当中

    // TODO:4. 将输出HFile加载到HBase表中
    val load = new LoadIncrementalHFiles(hbaseConfig)
    load.doBulkLoad(outputPath, conn.getAdmin, table,
      conn.getRegionLocator(htableName))

出现的问题

写入权限
可以将HFile要输出的文件位置chmod 777 /outputDir

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-23 12:26:02  更:2021-11-23 12:27:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 15:41:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码