https://sparkbyexamples.com/spark/spark-read-write-using-hbase-spark-connector
The example comes from the link above; getting straight to the problems:
// Reading from HBase to DataFrame
val hbaseDF = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
First problem: the format here is wrong. Even the code behind the GitHub link in that article cannot be run as-is; "org.apache.spark.sql.execution.datasources.hbase" is not the data source name of the Apache hbase-spark connector that is actually on the classpath here (as the stack trace below shows).
Second problem: no HBase configuration is ever loaded, so Spark has no way of knowing which HBase cluster to connect to. Running the code produces:
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:139)
at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:79)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at com.sdmctech.offline.application.simple.TransformAssetId2ID$$anonfun$main$1.apply$mcV$sp(TransformAssetId2ID.scala:47)
at com.sdmctech.offline.common.TApplication$class.start(TApplication.scala:26)
at com.sdmctech.offline.application.simple.TransformAssetId2ID$.start(TransformAssetId2ID.scala:16)
at com.sdmctech.offline.application.simple.TransformAssetId2ID$.main(TransformAssetId2ID.scala:19)
at com.sdmctech.offline.application.simple.TransformAssetId2ID.main(TransformAssetId2ID.scala)
Process finished with exit code 1
The error is shown above. The NullPointerException is thrown in HBaseRelation's constructor (DefaultSource.scala:139): the hbase-spark connector looks up an HBaseContext that it expects to have been created beforehand, and none exists because the code never constructed one.
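Both the snippet above and the fix below pass a catalog variable to HBaseTableCatalog.tableCatalog. The source article defines it as a JSON mapping between DataFrame columns and the HBase table layout; a minimal sketch, with placeholder table, column-family, and column names:

// Hypothetical catalog: the table name "item_feature", the column family "cf",
// and the column names are placeholders; replace them with your own schema.
val catalog =
  s"""{
     |  "table":  {"namespace": "default", "name": "item_feature"},
     |  "rowkey": "key",
     |  "columns": {
     |    "id":      {"cf": "rowkey", "col": "key",     "type": "string"},
     |    "feature": {"cf": "cf",     "col": "feature", "type": "string"}
     |  }
     |}""".stripMargin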
Putting both fixes together, a working example looks like this:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

// Load hbase-site.xml from the classpath; alternatively, set the quorum
// explicitly with hbaseConf.set("hbase.zookeeper.quorum", ...).
val hbaseConf = HBaseConfiguration.create()
// Constructing an HBaseContext registers it where HBaseRelation will later
// look it up; this is what prevents the NullPointerException above.
new HBaseContext(spark.sparkContext, hbaseConf)

allItemFeature.write
  .options(Map(
    HBaseTableCatalog.tableCatalog -> catalog,
    HBaseTableCatalog.newTable -> "3", // number of regions if the table has to be created
    "zkUrl" -> CommonName.zkUrl))
  .format("org.apache.hadoop.hbase.spark") // the correct data source name for this connector
  .save()
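For completeness, reading back into a DataFrame works the same way once the HBaseContext has been registered; a minimal sketch, assuming the same catalog and a SparkSession named spark:

// Read from HBase through the same data source; the HBaseContext created
// above supplies the connection configuration.
val hbaseDF = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .load()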