Spark SQL与Hive on Spark
Spark SQL
在Hadoop发展过程中,为了给熟悉SQL,但又不理解MapReduce的技术人员提供快速上手的工具,Hive诞生,是运行在Hadoop上的SQL-on-Hadoop工具。基于Hive后续又有一款Shark诞生,运行在Spark引擎上,但是Shark受限于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等),制约了Spark的既定方针和与Spark其他组件的相互集成,所以停止对Shark的开发。才有了SparkSQL,不再受限于Hive,只是兼容Hive。
Hive on Spark
Hive默认使用MapReduce作为执行引擎,即Hive on MR。 实际上,Hive还可以使用Tez和Spark作为其执行引擎,分别为Hive on Tez和Hive on Spark。 Hive on Spark是由Cloudera发起的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。Hive On Spark比SparkSQL稍晚。
SparkContext ==> SparkSession
在RDD编程中:获取一个SparkConf,再包装到SparkContext,以SparkContext就能创建和操作RDD。 这也是Spark早期版本时以各类Context做Spark主要切入点开启使用。对于每个其他的API,我们只需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于SQL,使用SQLContext;对于Hive,使用HiveContext。 但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession。SparkSession封装了SparkConf、SparkContext和SQLContext。为了兼容,HiveContext也被保存下来。
Spark context Web UI available at http://namenode:4040
Spark context available as 'sc' .
Spark session available as 'spark'.
创建SparkSession
// 类似于创建一个了 SparkContext,master设置为local,然后创建了一个SQLContext封装它。
val spark = SparkSession.builder.master("local").appName("spark session example").getOrCreate()
// 而想创建HiveContext的话,只需要enableHiveSupport(),以使得支持Hive
val spark = SparkSession.builder.master("local").appName("spark session example").enableHiveSupport().getOrCreate()
SparkSession如何使用
使用SparkSession 能创建出RDD
val rdd = spark.sparkContext.textFile("xxx")
rdd.toDF() // rdd又能转成DF [需要导入隐式转换的包:import spark.implicits._]
使用SparkSession创建不带schema的DS (还需要自己转DF)
val ds = spark.read.textFile("")
外部数据源 (不管数据源是啥,读进来直接就是DF,带schema)
val df = spark.read.format("json").load("data/data.json").show()
Spark SQL 对接hive
配置
参考:Hive平滑过渡到Spark SQL
Hive 开启元数据
nohup hive --service metastore -p 9083 &
测试发现不需要开启metastore,毕竟spark都去Mysql获取元数据了。。
hive-site.xml
将hive/conf目录下的hive-site.xml文件拷贝到spark/conf目录下。 且添加参数“hive.metastore.schema.verification”的值为“true”,设置为true可以在进入spark客户端时不报版本不匹配错误;不添加也可以正常运行的。
mysql 驱动包
由于需要从mysql中访问hive的元数据信息,所以启动时需要指定mysql的连接jar包。
启动
spark-shell
- spark-shell 指定驱动包jar包启动。
spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.49.jar
- 把jar包都放到 spark/jars 中,就可以不用在命令中指定jar启动。
启动后就能访问Hive中的表。
scala> spark.sql("show tables").show()
只不过现在进入的是scala>终端,使用sql每次都需要调用一次spark.sql。
spark-sql
spark-sql --jars ~/lib/mysql-connector-java-5.1.49.jar
// 然后报错java.sql.SQLException: No suitable driver 找不到合适的驱动。
// 最后还是把驱动包放到了spark/jars 目录下,重新启动
spark-shell --master local[2]
Spark master: local[2], Application Id: local-1650643815947
spark-sql> show tables;
// 关于 spark-sql --jars 找不到合适驱动
// spark-sql --help
--jars JARS Comma-separated list of jars to include on the driver and executor classpaths.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the classpath.
// 然后实际上并没有放到 classpath,需要再增加使用--driver-class-path指定jar
// spark-sql --jars ~/lib/mysql-connector-java-5.1.49.jar --driver-class-path ~/lib/mysql-connector-java-5.1.49.jar
thirftserver
spark-shell、spark-sql启动都是一个spark application(一个4040),只能在终端操作且不能共享数据。 使用Spark的thirftserver来访问hive中的数据,Thirftserver作为服务端,beeline作为客户端来访问服务端。可以让多个客户端连接到同一个Thirftserver,Thirftserver跑的是同一个application。支持多个客户端同时访问,有助于多个客户端之间数据的共享。
[liqiang@Gargantua sbin]$ pwd
/home/liqiang/app/spark/sbin
[liqiang@Gargantua sbin]$ ./start-thriftserver.sh --master local[2]
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /home/liqiang/app/spark/logs/spark-liqiang-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-Gargantua.out
[liqiang@Gargantua sbin]$
注意,与HS2一样都是默认启动在10000端口。如果HS2也在启动中,会在上述日志里报错端口占用。Thirftserver内部其实就是一个HiveServer2。 处理完端口占用重新启动,可以通过4040端口确认启动成功。
在spark/bin 目录下使用beeline做客户端连接。连接后可直接运行SQL。当然也同Hive中笔记一样,还可以使用jdbc连接。
./beeline -u jdbc:hive2://gargantua:10000 -n hadoop
Beeline version 2.3.9 by Apache Hive
0: jdbc:hive2://localhost:10000>
在sql里的cache操作不再是lazy,会立刻执行,这是与RDD的很大区别。uncache.
RDD 与 DF/DS 交互使用
在Core中的编程模型是RDD 在SQL中编程模型是 DS/DF
RDD转DF。第一种方式:rdd.toDF()
底层是通过case class反射做了自动类型推导出DF的schema。
import spark.implicits._
val spark = SparkSession.builder()
.appName("Spark SQL basic example")
.master("local[2]")
.getOrCreate()
val rdd = spark.sparkContext.textFile("") // 通过sparkSession 也能拿到RDD
val rddInfo = rdd.map(_.split(","))
.map(x = > Info(x(0).trim.toInt,x(1).trim,x(2).trim.toInt))
val df = rddInfo.toDF() // 需要导入隐式转换的包:import spark.implicits._
df 转rdd 【很少这么使用,df/ds 是更高级api,rdd 更底层】
val rdd = df.rdd
RDD转DF。第二种方式:通过自己构建schema,然后作用到rdd上
- 把RDD ==> RRD[ROW]
- StructType 、StructField
- schema 作用到rdd上
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD.map(_.split(","))
.map(attributes => Row(attributes(0).trim.toInt, attributes(1)))
// Generate the schema
val schema = StructType(
Array(
StructField("id",InteagerType,true),
StructField("name",StringType,true)
)
)
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
SparkSQL
不只是SQL,可以把SQL嵌到代码(SQL,DF and DS) 处理结构化数据: 可用一种“数据格式/表结构”来约束的的数据[schema]
DF能知道结构化数据的schema信息,比RDD能感知到更多。
工作多用:Spark SQL + 少部分RDD 面试多问:Spark Core
DS 与 DF
都是分布式数据集(Rdd : 弹性分布式数据集),Spark 1.6 以后新特性 。
DF:DataFrame DS:DataSets
Core : RDD SQL : DF/DS
DataFrame 是带列名的 DataSet ,类似Python里的DataFrame,结构类似关系型数据库的表 源码:type DataFrame = DataSet[Row]
Spark的DataFrame就是借鉴了其他语言DataFrame设计而来。 Python/R 里的DataFrame 是单机的,但是 Spark 里的 DataFrame 是分布式的,不过无感知。
与RDD区别
rdd 读进来就是个字符串,并不知道数据结构化schema信息。
从存储的角度来说,是行式存储,数据按一行在一块儿。在对数据压缩时,由于一行中每列数据类型可能不一致,压缩计算会综合考虑每一列,导致压缩比不高。
DF 列式存储,每一列数据放一块,数据类型式明确的,压缩比更高。
RDD: 相同功能使用不同语言开发的代码,执行效率不一样。Python 反而低。
DF:不同语言开发,执行效率一致。将不同语言统一转为逻辑执行计划,再运行。
快速入门
读json 数据
val spark = SparkSession.builder().master("local[2]").appName("").getOrCreate()
// 读json 数据
val df = spark.read.format("json").load("data/people.json")
val df = spark.read.json("data/people.json")
// 获取结构
df.printSchema()
// 数据输出到终端
df.show
API方式操作DF
// 因为数据是带schema的,就能支持我们对任一信息直接操作(读/写)
// 获取数据的某行 传字符串
df.select("name").show
// 传列名 (多用列名,更强大)
df.select(df("name")).show
df.select("$name").show
df.select('name).show // 只有头上只有一个单引号
// show 的底层是show(20,true)。展示出内容是被截取的,行数也是限制20条。
df.show(30,false)
df.head
df.take(1)
df.sort('name.asc)
df.orderBy('name.asc)
...
SQL方式操作DF
// 注册成临时表、临时视图
df.createOrReplaceTempView("people")
// 全局视图,注意使用时需要加前缀 [e.g. SELECT * FROM global_temp.people]
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// 接着可以写SQL来操作
spark.sql("SELECT * FROM global_temp.people").show()
SaprkSQL 读文件
读取为DS
// 读进来的数据全部都在一列,列名叫value,类型就是 DataSet[String]
val ds:DataSet[String] = spark.read.textFile("")
ds.map(x => {
val splits = x.split("\t")
})
拆分后,DS也可以转DF
ds.toDF("","","","") // 每个列名指定
拿到DF以后,就可以很方便操作。
用SQL操作DF:
// 创建成视图
df.createOrReplaceTempView("people")
// 接着可以用sql来查询求和、分组、排序
spark.sql("select * from people").show(false)
用代码AP操作DF:
import org.apache.spark.sql.functions._
// 分组、聚合、排序
df.groupBy("xx").agg(sum("xxx")).sort('age.desc)
// 如果用到函数(sum),需要导包:import org.apache.spark.sql.functions._
基础平台的开发,更多是使用API开发而不用SQL。API开发可以更方便提供在页面上按需选择决定是否分组?是否排序?..
快速创建DF
val df = spark.createDataFrame(List(("pk",20),("ruoze",30)))
// 可以对列名重命名等...
操作Hive表元数据
Spark2.0后提供的catalog。 在此前,需要通过jdbc查MySQL中的hive元数据。这些元数据在mysql又是以各个表组织起来,需要开发人员熟悉这些表间关系,对表做关联查询。
catalog 方便访问元数据信息,获取DBname、Table、Function、colunm
val catalog = spark.catalog
开启Hive支持
pom.xml引入依赖;把hive-site.xml拷到项目
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.tools.version}</artifactId>
</dependency>
启用 enableHiveSupport
val spark = SparkSession.builder()
.appName("Spark SQL basic example")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()
获取元数据
val catalog = spark.catalog
catalog.listDatabases().show()
catalog.listTables("pk_hive").show()
catalog.listFunctions().show()
catalog.listColums().show()
外部数据源 External DataSource
以前传统对离线/批处理,需要考虑各种兼容。
input: text? json? parquet..
transform
output: text? json? parquet? jdbc hbase...
如果对json处理,涉及Gson、fastjson,怎么选?兼容? 对文件格式,有几个字段?是否为空?、 数据类型转换,失败、保留?丢弃?都是很麻烦的事情。
但是spark sql 通过外部数据源 External DataSource API(ExtDS) 可以不用关心数据来源,格式,何处…都能处理好,成为DF/DS,再统一的写出去。
json格式为例
(注释掉 hive-site.xml 接下来测试不用连接hive)
读取为DF
可以理解为ExtDS 在input阶段指定不同的文件格式就组织好了schema。直接得到DF,接下来就要么SQL,要么API处理
import spark.implicits._
val df = spark.read.json("data/data.json") // 典型的外部数据源 简写
val df = spark.read.format("json").load("data/data.json")// 标准写法
val df = spark.read.format("json").opton("path","data/data.json").load()
结果来的数据会自动推测类型,自动组织结构 。
将DF写出去
df.select('name,'age).where("user = 'zhangsan' ").write.format("json").mode(SaveMode.Overwrite).save
注意细节事项: 如果文件存在会报错,覆盖(/追加、…):.mode(SaveMode.Overwrite) where用列名做条件需要三个等号 .where('user === “zhangsan”)
其他格式数据源 (参考官网:Data Sources)
jdbc
从jdbc读取
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://Gargantua:9965")
.option("dbtable", "jepsondb.emp")
.option("user", "armsdata")
.option("password", "123456")
.load()
写回jdbc
df.write
.format("jdbc")
.option("url", "jdbc:mysql://Gargantua:9965")
.option("dbtable", "jepsondb.emp")
.option("user", "armsdata")
.option("password", "123456")
.save()
对于url/driver这些应该写在配置文件,再从文件获取。 引入依赖
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.4.0</version>
</dependency>
新建一个文件(文件名就叫这个):application.conf
db.driver="com.mysql.jdbc.Driver"
db.url = "jdbc:mysql://Gargantua:9965"
db.dbname="jepsondb"
db.user = "armsdata"
db.password = "123456"
db.source.table="emp"
db.target.tabe="dept"
如何读取到代码
val conf = ConfigFactory.load()
val driver = conf.get("db.driver")
val url = conf.get("db.url")
...
文本
读
val df = spark.read.format("text").load(path)
或简写
val df = spark.read.text(path)
// 注意简写时不要和 DS 的textFile() 混淆
val ds = spark.read.textFile(path)
写 (写到文本格式只支持一个列,且只支持字符串类型。所有内容要拼字符串)
使用concat_ws("\t",'name,'age)拼接
df.select(concat_ws("\t",'name,'age))
如果需要压缩后写到文本
df.option("compression","gzip") // spark内置的压缩格式只有:bzip2 deflate lz4 gzip snappy none uncompression. 使用lzo需要自己依赖
parquet (树状的)
val df = spark.read.format("parquet").load(path)
// 读数据F时,不指定format(),默认是以parquet格式。读parquet时format也可以简写掉
val df = spark.read.load(path)
读进DF的数据,可以转其他格式再写。在format()中指定就行
CSV
默认分隔符逗号,可用Excel解析。
val df = spark.read.csv(path) // 简写
val df = spark.read.format("csv")
.option("sep",";") // 指定分隔符
.option("header","false") // 是否带列头
.option("inferSchema", "true") // 数据类型自动推导
// 或自己指定类型
.schema(structType) // structType需要自己创建
.load(path)
// Generate the structType
val structType = StructType(
Array(
StructField("id",InteagerType,true),
StructField("name",StringType,true)
)
)
所有关于CSV会用到的这些option,都可看源码 CSVOptions.scala,同理查看json、parquet。
https://spark-packages.org/ 第三方数据源开源库 protof、Es(Spark已经支持)、…
ExtDS 使用SQL
在上述所有外部数据源 ExtDS操作时,都是用scala完成。Spark SQL 当然可以用SQL实现。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS ( // 所有option的内容都写到 OPTIONS
url "jdbc:mysql://Gargantua:9965",
dbtable "jepsondb",
user 'username',
password 'password'
)
SELECT * FROM resultTable
|