IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkSession和Hbase交互 -> 正文阅读

[大数据]SparkSession和Hbase交互

1、利用SparkSession从Hbase中读取数据,转换为DataFrame

package com.yyds.tags.tools


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * 从Hbase读取数据,向Hbase写入数据
 */
object HbaseTools {

  /**
   * 利用SparkSession从Hbase中读取数据,转换为DataFrame
   * @return
   */
  def read(spark:SparkSession , zkHosts:String, zkPort:String,table:String,family:String,fields:Seq[String]): DataFrame = {

    val sc: SparkContext = spark.sparkContext

    // 读取数据
    /*def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
               conf: Configuration = hadoopConfiguration,
              fClass: Class[F], kClass: Class[K], vClass: Class[V]
        ): RDD[(K, V)]
   */
    // 1. 读取配置信息,加载HBaseClient配置(主要ZK地址和端口号)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    conf.set("zookeeper.znode.parent", "/hbase")

    // 2. 设置表的名称
    conf.set(TableInputFormat.INPUT_TABLE, table)

    //设置读取列簇和列名称
    val scan: Scan = new Scan()
    scan.addFamily(Bytes.toBytes(family))
    fields.foreach{
      field => {
        scan.addColumn(Bytes.toBytes(family),Bytes.toBytes(field))
      }
    }

    conf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray) )


    // 3. 从HBase表加载数据
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

    // 将RDD转换为Schema
    // DataFrame = RDD[ROW] + Schema
    val rowsRDD: RDD[Row] = hbaseRDD.map{
      case (_,result) =>
         // 基于列名称获取对应的值
        val values: Seq[String] = fields.map{
          field =>
            val value: Array[Byte] = result.getValue(Bytes.toBytes(family),Bytes.toBytes(field))
            // 转换为字符串
            Bytes.toString(value)
        }
        // 将序列转换为Row对象
        Row.fromSeq(values)
    }
    // 自定义schema
    val schema: StructType = StructType(
      fields.map{
        field =>
          StructField(field,StringType,nullable = true)
      }
    )

    spark.createDataFrame(rowsRDD,schema)
  }
}

2、利用SparkSession把DataFrame存入到Hbase中

package com.yyds.tags.tools


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * 从Hbase读取数据,向Hbase写入数据
 */
object HbaseTools {
  /**
   * 向Hbase中写入数据
   * @return
   */
  def write(dataFrame: DataFrame, zkHosts:String, zkPort:String,table:String,family:String,rowKeyColumn:String): Unit = {

   // 1. 设置HBase中Zookeeper集群信息
   val conf: Configuration = new Configuration()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    // 2. 设置读HBase表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, table)
    // 3. 数据转换
    val columns: Array[String] = dataFrame.columns
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] =
      dataFrame.rdd.map { row =>
        // 获取RowKey
        val rowKey: String = row.getAs[String](rowKeyColumn)
        // 构建Put对象
        val put = new Put(Bytes.toBytes(rowKey))
        // 将每列数据加入Put对象中
        val familyBytes = Bytes.toBytes(family)
        columns.foreach { column =>
          put.addColumn(
            familyBytes, //
            Bytes.toBytes(column), //
            Bytes.toBytes(row.getAs[String](column)) //
          )
        }
        // 返回二元组
        (new ImmutableBytesWritable(put.getRow), put)
      }
    // 4. 保存数据到表
    putsRDD.saveAsNewAPIHadoopFile(
      s"/apps/hbase/$table-" + System.currentTimeMillis(),
      classOf[ImmutableBytesWritable], //
      classOf[Put], //
      classOf[TableOutputFormat[ImmutableBytesWritable]], //
      conf //
    )
  }

}

3、测试

package com.yyds.tags.hbase.tools

import com.yyds.tags.tools.HbaseTools
import org.apache.spark.sql.{DataFrame, SparkSession}

object HBaseToolsTest {

  def main(args: Array[String]): Unit = {


    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    val df: DataFrame = HbaseTools.read(
      spark, "192.168.42.7", "2181",
      "tbl_users", "detail", Seq("id", "gender")
    )
    println(s"count = ${df.count()}")
    df.printSchema()
    df.show(100, truncate = false)


    HbaseTools.write(
      df, "192.168.42.7", "2181",
      "tbl_users_test", "info", "id"
    )

    spark.stop()

  }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-12 17:36:19  更:2022-03-12 17:40:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:45:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码