1. Reading data from HBase with SparkSession and converting it to a DataFrame
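The read method builds an HBase Scan limited to one column family and a list of columns, serializes the Scan into the job configuration, loads the table through TableInputFormat as an RDD of (rowkey, Result) pairs via newAPIHadoopRDD, and finally maps each Result to a Row of strings so the data can be turned into a DataFrame with an all-string schema.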
package com.yyds.tags.tools

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object HbaseTools {

  /**
   * Read the given columns of one column family from an HBase table and
   * return them as a DataFrame of nullable string columns.
   */
  def read(spark: SparkSession, zkHosts: String, zkPort: String,
           table: String, family: String, fields: Seq[String]): DataFrame = {
    val sc: SparkContext = spark.sparkContext

    // HBase client configuration: ZooKeeper quorum, port and the table to scan
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set(TableInputFormat.INPUT_TABLE, table)

    // Build a Scan restricted to the requested column family and columns
    val scan: Scan = new Scan()
    scan.addFamily(Bytes.toBytes(family))
    fields.foreach { field =>
      scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(field))
    }
    // TableInputFormat only accepts the Scan as a string property, so
    // serialize it with protobuf and Base64-encode it into the configuration
    conf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // Load the table as an RDD of (rowkey, Result) pairs
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    // Extract the requested columns from each Result into a Row of strings
    val rowsRDD: RDD[Row] = hbaseRDD.map { case (_, result) =>
      val values: Seq[String] = fields.map { field =>
        val value: Array[Byte] = result.getValue(Bytes.toBytes(family), Bytes.toBytes(field))
        Bytes.toString(value)
      }
      Row.fromSeq(values)
    }

    // All columns are read as nullable strings
    val schema: StructType = StructType(
      fields.map(field => StructField(field, StringType, nullable = true))
    )
    spark.createDataFrame(rowsRDD, schema)
  }
}
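If only part of the table is needed, the Scan can be narrowed before it is serialized into the configuration. A minimal sketch, assuming the same HBase 1.x Scan API used above; startRow and stopRow are hypothetical extra parameters added only for illustration:

// Hypothetical narrowing of the Scan to a row-key range (HBase 1.x API);
// startRow / stopRow are illustrative parameters, not part of the original tool.
val scan: Scan = new Scan()
scan.addFamily(Bytes.toBytes(family))
scan.setStartRow(Bytes.toBytes(startRow)) // start row key, inclusive
scan.setStopRow(Bytes.toBytes(stopRow))   // stop row key, exclusive
conf.set(TableInputFormat.SCAN,
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))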
2. Writing a DataFrame to HBase with SparkSession
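The write method is the counterpart in the same HbaseTools object (the imports are repeated here for completeness): it converts every DataFrame row into a Put keyed by the value of the rowKeyColumn column, stores all columns as strings in one column family, and saves the resulting RDD through TableOutputFormat.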
package com.yyds.tags.tools

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object HbaseTools {

  /**
   * Write a DataFrame into an HBase table: every column is stored as a string
   * in the given column family, and rowKeyColumn supplies the row key.
   */
  def write(dataFrame: DataFrame, zkHosts: String, zkPort: String,
            table: String, family: String, rowKeyColumn: String): Unit = {
    // Use HBaseConfiguration so hbase-default.xml / hbase-site.xml are loaded
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkHosts)
    conf.set("hbase.zookeeper.property.clientPort", zkPort)
    conf.set(TableOutputFormat.OUTPUT_TABLE, table)

    val columns: Array[String] = dataFrame.columns
    // Convert each Row into a Put keyed by the value of rowKeyColumn
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = dataFrame.rdd.map { row =>
      val rowKey: String = row.getAs[String](rowKeyColumn)
      val put = new Put(Bytes.toBytes(rowKey))
      val familyBytes = Bytes.toBytes(family)
      columns.foreach { column =>
        val value = row.getAs[String](column)
        // Skip null cells: Bytes.toBytes(null) would throw a NullPointerException
        if (value != null) {
          put.addColumn(familyBytes, Bytes.toBytes(column), Bytes.toBytes(value))
        }
      }
      (new ImmutableBytesWritable(put.getRow), put)
    }

    // TableOutputFormat writes directly to HBase; the output path is only nominal
    putsRDD.saveAsNewAPIHadoopFile(
      s"/apps/hbase/$table-" + System.currentTimeMillis(),
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
  }
}
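Note that TableOutputFormat does not create the target table: the table (with the expected column family) must already exist in HBase before write is called, otherwise the job fails because the table cannot be found.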
3. Testing
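A small local test that reads two columns of the detail family from tbl_users and writes the resulting DataFrame back into tbl_users_test, keyed by id.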
package com.yyds.tags.hbase.tools

import com.yyds.tags.tools.HbaseTools
import org.apache.spark.sql.{DataFrame, SparkSession}

object HBaseToolsTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Read the id and gender columns of the detail family from tbl_users
    val df: DataFrame = HbaseTools.read(
      spark, "192.168.42.7", "2181",
      "tbl_users", "detail", Seq("id", "gender")
    )
    println(s"count = ${df.count()}")
    df.printSchema()
    df.show(100, truncate = false)

    // Write the same DataFrame back into tbl_users_test, keyed by id
    HbaseTools.write(
      df, "192.168.42.7", "2181",
      "tbl_users_test", "info", "id"
    )

    spark.stop()
  }
}
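The Base64 and ProtobufUtil helpers used above come from the unshaded HBase 1.x API, so the project is assumed to build against Spark 2.x and HBase 1.x. A minimal build.sbt sketch under that assumption (version numbers are illustrative only):

// Illustrative dependencies only; adjust versions to match the actual cluster.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"    % "2.4.5", // SparkSession / DataFrame
  "org.apache.hbase" %  "hbase-client" % "1.2.6", // Put, Scan, Result
  "org.apache.hbase" %  "hbase-server" % "1.2.6"  // TableInputFormat / TableOutputFormat
)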