?? spark 目前较为基础且常用的场景应该就是对 hive 表进行读写操作 ,尤其通过使用spark sql 实现数据分析、关联等操作 ??常用的方式是直接采用spark on hive的方式,在创建SparkSession时开启enableHiveSupport。连接上hive之后,可以利用Spark sql对hive表进行读、写、join等操作,官方也推荐Spark sql模式,因为其支持对dataframe(Dataset的特列,DataFrame=Dataset[Row] )进行操作,很多数据分析人员习惯使用python,而python没有dataset,而且sql方式对数据进行批处理方式更为直观。
1.环境准备
?? 这里就不进行赘述,简单说明下:
- 1)将hive-site.xml拷贝到项目代码的resources目录下
- 2)在hive库中新建测试表;提前准好测试数据,通过load方式将文本数据加载到hive表
加载服务器数据文件sql语句,具体看下面sql 【补充】文件路径有:服务器本地文件、hdfs文件;加载方式:overwrite into(覆盖写)、into(追加写)
load data local inpath '/home/test_a.txt' overwrite into table zero_test_a partition(partition_name='20220518');
load data inpath '/user/hdfs/test_b.txt' into table zero_test_a partition(partition_name='20220518');
2.实操代码
2.1 spark连接hive
连接hive时候注意开启 enableHiveSupport() 配置,另外本地测试需要加上 master(“local[*]”) 或者 master(“local[]”)
val spark = SparkSession
.builder()
.master("local[*]")
.appName("spark_opts_hive")
.enableHiveSupport()
.config("hive.metastore.uris","thrift://xxx.xxx.xxx.xxx:9083")
.config("hive.metastore.warehouse.dir","/user/hive/warehouse")
.config("username","zero")
.config("password","zero")
.getOrCreate()
2.2 操作hive表
完成连接之后,可以通过spark.sql()方式实现读取、关联等操作,具体请看代码示例。
package com.zero.scala.sparkSql
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ArrayBuffer
object SparkHiveOps {
var sparkSession:SparkSession = _
private def create_sparksession():SparkSession = {
val hiveMetaUris = "thrift://xxx.xxx.xxx.xxx:9083"
SparkSession.builder().master("local[*]")
.appName("createSparkSession")
.enableHiveSupport()
.config("hive.metastore.warehouse.dir","/user/hive/warehouse")
.config("hive.metastore.uris","hiveMetaUris")
.getOrCreate()
}
private def init() :Unit = {
sparkSession = create_sparksession()
sparkSession.sql("use zeroDb")
}
private def destroy_sparkSession(ss:SparkSession) : Unit ={
if(ss != null) ss.close()
}
private def joinOps():DataFrame = {
sparkSession.sql("select a.c1,a.c2,b.c2 from zero_test_a a left join zero_test_b b on a.c1=b.c1 where b.c2='xxx'")
}
def main(args: Array[String]): Unit = {
init()
joinOps().show(5)
destroy_sparkSession(sparkSession)
}
}
|