1. Integrating Spark with Hive
1. Build the SparkSession object
2. Configure the Hive integration (a minimal sketch of both steps follows this list)
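Items 1 and 2 come down to two calls on the SparkSession builder: enableHiveSupport() plus telling Spark where the Hive metastore and warehouse live. A minimal sketch, reusing the metastore URI and warehouse path from the full example in section 3:

import org.apache.spark.sql.SparkSession

// Build a Hive-enabled SparkSession (local mode, for testing)
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("SparkHiveIntegration")
  // Resolve table metadata through Hive's metastore instead of Spark's built-in catalog
  .enableHiveSupport()
  // Address of the remote Hive metastore service
  .config("hive.metastore.uris", "thrift://192.168.1.140:9083")
  // Hive warehouse directory on HDFS
  .config("spark.sql.warehouse.dir", "hdfs://192.168.1.140:8020/user/hive/warehouse")
  .getOrCreate()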
2. Maven dependency configuration
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive-thriftserver_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
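These coordinates assume that ${spark.version} is declared in the pom's <properties> block, and the _2.11 suffix in the artifactIds must match the Scala version the project is compiled with. A minimal sketch, assuming Spark 2.4.x on Scala 2.11 (adjust to match your cluster):

<properties>
    <!-- Assumed versions for illustration only; match them to your cluster -->
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.0</spark.version>
</properties>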
3. Code integration
package cn.wudl.tags.models.rule

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object JobModel {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession instance
    val spark: SparkSession = {
      // a. Build the SparkConf
      val sparkConf = new SparkConf()
        .setMaster("local[4]")
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        // Number of shuffle partitions
        .set("spark.sql.shuffle.partitions", "4")
        // Use Kryo serialization and register the HBase classes
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result], classOf[Put]))
      // b. Build the SparkSession with the builder pattern
      val session = SparkSession.builder()
        .config(sparkConf)
        // Enable Hive integration
        .enableHiveSupport()
        // Hive metastore address
        .config("hive.metastore.uris", "thrift://192.168.1.140:9083")
        // Hive warehouse directory
        .config("spark.sql.warehouse.dir", "hdfs://192.168.1.140:8020/user/hive/warehouse")
        .getOrCreate()
      // c. Return the session object
      session
    }
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Subquery that selects the parent tag (id = 321) and its child tags from MySQL
    val tagTable: String =
      """
        |(
        |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE id = 321
        |UNION
        |SELECT id, name, rule, level FROM profile_tags.tbl_basic_tag WHERE pid = 321
        |) AS tag_table
        |""".stripMargin

    // Read the tag data through JDBC
    val baseTagDF = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://192.168.1.140:3306/?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC")
      .option("dbtable", tagTable)
      .option("user", "root")
      .option("password", "123456")
      .load()

    // Print the DataFrame schema and the data
    baseTagDF.printSchema()
    baseTagDF.show(1000, truncate = false)

    spark.stop()
  }
}
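Because enableHiveSupport() is enabled, the same session can also query Hive tables directly through spark.sql, not only JDBC sources. A short sketch that could be dropped into main above, right before spark.stop(); the table name some_db.some_hive_table is a placeholder, not a table from this post:

// List the databases registered in the Hive metastore
spark.sql("SHOW DATABASES").show()

// Query a Hive table through the metastore (placeholder table name)
val hiveDF = spark.sql("SELECT * FROM some_db.some_hive_table LIMIT 10")
hiveDF.printSchema()
hiveDF.show(truncate = false)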
4. Execution output
21/07/08 21:58:41 INFO SessionState: Created HDFS directory: /tmp/hive/Administrator/2dc03761-0ead-4df3-a025-7548bde33ca3/_tmp_space.db
21/07/08 21:58:41 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is hdfs://192.168.1.140:8020/user/hive/warehouse
21/07/08 21:58:41 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- rule: string (nullable = true)
|-- level: integer (nullable = true)
21/07/08 21:58:42 INFO CodeGenerator: Code generated in 129.7142 ms
21/07/08 21:58:42 INFO SparkContext: Starting job: show at JobModel.scala:54
21/07/08 21:58:42 INFO DAGScheduler: Got job 0 (show at JobModel.scala:54) with 1 output partitions
21/07/08 21:58:42 INFO DAGScheduler: Final stage: ResultStage 0 (show at JobModel.scala:54)
21/07/08 21:58:42 INFO DAGScheduler: Parents of final stage: List()
21/07/08 21:58:42 INFO DAGScheduler: Missing parents: List()
21/07/08 21:58:42 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at show at JobModel.scala:54), which has no missing parents
21/07/08 21:58:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/08 21:58:43 INFO JDBCRDD: closed connection
21/07/08 21:58:43 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1475 bytes result sent to driver
21/07/08 21:58:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 97 ms on localhost (executor driver) (1/1)
21/07/08 21:58:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/07/08 21:58:43 INFO DAGScheduler: ResultStage 0 (show at JobModel.scala:54) finished in 0.110 s
21/07/08 21:58:43 INFO DAGScheduler: Job 0 finished: show at JobModel.scala:54, took 0.380809 s
21/07/08 21:58:43 INFO CodeGenerator: Code generated in 11.4216 ms
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
|id |name|rule |level|
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
|321|职业 |inType=hbase
zkHosts=192.168.1.140
zkPort=2181
hbaseTable=tbl_tag_users
family=detail
selectFieldNames=id,job|4 |
|322|学生 |1 |5 |
|323|公务员 |2 |5 |
|324|军人 |3 |5 |
|325|警察 |4 |5 |
|326|教师 |5 |5 |
|327|白领 |6 |5 |
+---+----+-----------------------------------------------------------------------------------------------------------------------+-----+
21/07/08 21:58:43 INFO SparkUI: Stopped Spark web UI at http://192.168.1.1:4040
21/07/08 21:58:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/08 21:58:43 INFO MemoryStore: MemoryStore cleared
21/07/08 21:58:43 INFO BlockManager: BlockManager stopped
21/07/08 21:58:43 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/08 21:58:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/08 21:58:43 INFO SparkContext: Successfully stopped SparkContext
21/07/08 21:58:43 INFO ShutdownHookManager: Shutdown hook called
21/07/08 21:58:43 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-ecfaf63a-8cfb-4c16-a300-ae47c03ca5b7
Disconnected from the target VM, address: '127.0.0.1:50966', transport: 'socket'