1、介绍
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。使用Spark SQL有两种方式,包括SQL语句以及Dataset API。Spark SQL的一个主要的功能就是执行SQL查询语句。Spark SQL也可以用来从Hive中查询数据。当我们使用某种编程语言开发的Spark作业来执行SQL时,返回的结果是Dataframe/Dataset类型的。当然,我们也可以通过Spark SQL的shell命令行工具,或者是JDBC/ODBC接口来访问。 Hive是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!
1.1 Hive和Shark
Spark SQL的前身是Shark,给熟悉RDBMS但又不理解mapreduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在hadoop上的SQL on hadoop工具。但是mapreduce计算过程中大量的中间磁盘落地过程消耗了大量的网络IO,降低了运行效率,为了提高SQL on hadoop的效率,大量的SQL on hadoop 工具开始产生,其中表现较为突出的是:mapR的Drill Cloudera的impala , shark。其中shark是伯克利实验室spark生态环境的组件之一,它修改了下图所示的右下角的内存管理,物理计划,执行三个模块,并使之能运行在spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。
1.2 Shark和Spark SQL
随着Spark的发展,对于野心勃勃的Spark团队来说,shark对于hive的太多依赖(如采用hive的语法解析器,查询优化器等等),制约了spark的one stack rule them All的方针,制约了spark各个组件的相互集成,所以提出了Spark SQL项目。 Spark SQL抛弃原有shark的代码,汲取了shark的一些优点,如内存列存储,hive兼容性等,重新开发了Spark SQL代码;由于摆脱了对hive的依赖性,Spark SQL无论在数据兼容,性能优化,组建扩展方面都得到了极大的方便,真可谓“退一步,海阔天空”;
- 数据兼容方面:不但兼容hive,还可以从RDD,parquet文件,JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及cassandra等NOSQL数据;
- 性能优化方面:除了采取in memory columnar storage , byte code generation(内存列存储,生成字节码)等优化技术外,将会引进cost model 对查询进行动态评估,获取最佳物理计划等等;
- 组件扩展方面:无论是SQL的语法解析器,分析器,还是优化器,都可以重新定义,进行扩展。
2014年6月1日shark项目和Spark SQL项目的主持人reynold xin宣布:停止对shark的开发,团队将所有资源放到Spark SQL项目上,至此,shark的发展画上了句号,但也因此发展出两个直线:Spark SQL 和 hive on spark; 其中Spark SQL作为spark生态的一员继续发展,而不再受限于hive,只是兼容hive;而hive on spark是一个hive的发展计划,该计划将spark作为hive的底层引擎之一,也就是说,hive将不再受限于一个引擎,可以采用mapreduce,tez,spark等引擎;
1.3 Spark SQL on Hive:
Spark 使用 sql,操作hive中的数据,hive起到存储的作用; Hive操作,在Spark 2.0中,是支持读写hive中存储的数据的。但是,因为hive有较多的依赖,所以默认情况下,这些依赖没有包含在spark的发布包中。如果hive依赖可以在classpath路径中,那么spark会自动加载这些依赖。这些hive依赖必须在所有的worker node上都放一份,因为worker node上运行的作业都需要使用hive依赖的序列化与反序列化包来访问hive中的数据。 只要将hive-site.xml、hdfs-site.xml和core-site.xml都放入spark/conf目录下即可。 如果要操作Hive,那么构建SparkSession的时候,就必须启用Hive支持,包括连接到hive的元数据库,支持使用hive序列化与反序列化包,以及支持hive udf函数。 如果我们没有安装hive,也是可以启用hive支持的。如果我们没有放置hive-site.xml到spark/conf目录下,SparkSession就会自动在当前目录创建元数据库,同时创建一个spark.sql.warehouse.dir参数设置的目录,该参数的值默认是当前目录下的spark-warehouse目录。 在spark 2.0中,hive.metastore.warehouse.dir属性已经过时了,现在使用 spark.sql.warehouse.dir属性来指定hive元数据库的位置。
1.4 Hive on Spark:
Hive 起到存储和一部分计算的作用;(类似shark)
1.5 Spark SQL性能提升:
主要Spark SQL在几方面做了优化:
- 内存列存储:
Spark SQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用了内存列存储,该存储方式无论在空间占用量,还是读取吞吐量,都占有很大优势; - 字节码生成技术(CG):
CG优化的实现主要依靠scala2.X的运行时反射机制; - Scala代码优化:
Spark SQL在使用Scala编写代码时,尽量避免低效的,容易GC的代码;尽管增加了编写代码的难度,但对于用户来说,还是使用统一的接口,并没有受到影响; - Predicate Pushdown (预言下推):
Spark SQL对sql语句的优化; 举例:select table1.name,table2.score from table1 join table2 on (table1.id = table2.id) where table1.age > 50 and table2.score > 90; 说明:按照sql的执行流程,先聚合,后过滤;按照预言下推优化,先过滤,再聚合;
2、特点
2.1 易整合
无缝地混合SQL查询与Spark程序。Spark SQL允许您使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。可用于Java、Scala、Python和R。
2.2 统一的数据访问方式
以相同的方式连接到任何数据源。DataFrames和SQL提供了访问各种数据源的通用方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以通过这些源连接数据;
2.3 兼容Hive
在现有仓库上运行SQL或HiveQL查询。Spark SQL支持HiveQL语法以及Hive SerDes和udf,允许您访问现有的Hive仓库。
2.4 标准的数据兼容
通过JDBC或ODBC连接。服务器模式为业务智能工具提供行业标准的JDBC和ODBC连接。
3、DataFrame、Dataset、RDD之间的关系
在Spark SQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。从版本的产生上来看: RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6) RDD、DataFrame、Dataset全都是Spark平台下的分布式弹性数据集,如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。
3.1 DataFrame
Dataframe就是按列组织的Dataset。在逻辑概念上,可以大概认为Dataframe等同于关系型数据库中的表,或者是Python/R语言中的data frame,但是在底层做了大量的优化。Dataframe可以通过很多方式来构造:比如结构化的数据文件,Hive表,数据库,已有的RDD。Scala,Java,Python,R等语言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的类型别名。在Java中,需要使用Dataset来代表一个Dataframe。 与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。 DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。 DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待。DataFrame也是懒执行的。 性能上比RDD要高,主要有两方面原因:
- 定制化内存管理
数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。 - 优化的执行计划
查询计划通过Spark catalyst optimiser进行优化。
Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错. Dataframe:untyped(无类型的)操作,有了SparkSession之后,就可以通过已有的RDD,Hive表,或者其他数据源来创建Dataframe,比如说通过json文件来创建。Dataframe提供了一种domain-specific language来进行结构化数据的操作,这种操作也被称之为untyped操作,与之相反的是基于强类型的typed操作。
3.2 Dataset
- 是DataFrame API的一个扩展,是Spark最新的数据抽象
- 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性。
- Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
- 样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到Dataset中的字段名称。
- DataFrame是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将DataFrame转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
- Dataset是强类型的。比如可以有Dataset[Car],Dataset[Person]。
DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而Dataset不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。 Dataset是一个分布式的数据集。Dataset是Spark 1.6开始新引入的一个接口,它结合了RDD API的很多优点(包括强类型,支持Lambda表达式等),以及Spark SQL的优点(优化后的执行引擎)。Dataset可以通过JVM对象来构造,然后通过transformation类算子(map,flatMap,filter等)来进行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不过因为Python语言本身的天然动态特性,Dataset API的不少feature本身就已经具备了(比如可以通过row.columnName来直接获取某一行的某个字段)。R语言的情况跟Python也很类似。 Dataset:typed(强类型)操作,Dataset与RDD比较类似,但是非常重要的一点不同是,RDD的序列化机制是基于Java序列化机制或者是Kryo的,而Dataset的序列化机制基于一种特殊的Encoder,来将对象进行高效序列化,以进行高性能处理或者是通过网络进行传输。Dataset除了Encoder,也同时支持Java序列化机制,但是Encoder的特点在于动态的代码生成,同时提供一种特殊的数据格式,来让Spark不将对象进行反序列化,即可直接基于二进制数据执行一些常见的操作,比如filter、sort、hash等。
3.3 三者的共性
- RDD、DataFrame、Dataset全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
- 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过;
- 三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出;
- 三者都有partition的概念;
- 三者有许多共同的函数,如filter,排序等;
- 在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持;
- DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型。
3.4 三者的区别:
RDD:
- RDD一般和spark mlib同时使用
- RDD不支持Spark SQL操作
DataFrame:
- 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值。
- DataFrame与Dataset一般不与Spark mlib同时使用
- DataFrame与Dataset均支持Spark SQL的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作
- DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。
Dataset: Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段 而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。 DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。这样用户只需要指定自己的操作逻辑,DataFrame的优化器会帮助用户选择一条效率最优的执行路径。同时Tungsten优化使得DataFrame的存储和计算效率比RDD高很多。Spark的机器学习项目MLlib的ML pipeline就是完全基于DataFrame的,而且未来Streaming也会以DataFrame为核心。 DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。 DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为Spark SQL类型,然而RDD依赖于运行时反射机制。 Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。这个强类型的值是以编码的二进制形式被存储的,这种存储格式可以不用反序列化就直接可以被上面的算子(例如sort,Shuffle等)操作。所以在创建Dataset的时候需要指定用于这个编码工作的Encoder。因此具有如下三个特点:
- DataSet可以在编译时检查类型并且是面向对象的编程接口
- DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口.
- DataFrame和DataSet可以相互转化, df.as[ElementType] 这样可以把DataFrame转化为DataSet, ds.toDF() 这样可以把DataSet转化为DataFrame。
从RDD发展到DataFrame、 Dataset的背后深层次原因: 因为spark的瓶颈在于内存和CPU,DataFrame的出现优化了算子的查询计划,同时依靠Tungsten计划逐渐摆脱对于JVM的依赖。
4、Spark SQL程序执行的入口
SQLContext: 要使用Spark SQL,首先就得创建一个SQLContext对象,或者是它的子类的对象,比如HiveContext的对象。
Java版本:
JavaSparkContext sc = ...;
SQLContext sqlContext = new SQLContext(sc);
Scala版本:
val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
SqlContext与HiveContext的区别: SqlContext现在只支持SQL语法解析器(SQL-92标准); HiveContext现在既支持SQL语法解析器也支持HiveSQL语法解析器,默认是HiveSQL语法解析器,用户可以通过配置切换SQL语法解析器,来运行HiveContext不支持的语法。
Spark SQL程序执行入口分类: 1、SQLContext:只支持解析SQL语法; 2、HiveContext:是SQLContext的一个子类,既支持SQL语法,也支持HQL语法,默认是HiveSQL语法解析器,用户可以通过配置切换SQL语法解析器,来运行HiveContext不支持的语法; 3、SparkSession:是Spark2.0之后提供的全新的Spark SQL入口,相当于是SQLContext和HiveContext结合体,而且在SparkSession中也封装了一个SparkContext。
新的Spark SQL入口SparkSession SparkSession:新的入口。从Spark 2.0开始,一个最大的改变就是,Spark SQL的统一入口就是SparkSession,SQLContext和HiveContext未来会被淘汰。可以通过SparkSession.builder()来创建一个SparkSession,如下代码所示。SparkSession内置就支持Hive,包括使用HiveSQL语句查询Hive中的数据,使用Hive的UDF函数,以及从Hive表中读取数据等。 在老的版本中,Spark SQL提供两种SQL查询起始点,一个叫SQLContext,用于Spark自己提供的SQL查询,一个叫HiveContext,用于连接Hive的查询。 SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
SparkSession.builder:用于创建一个SparkSession。 import spark.implicits._ : 的引入是用于将DataFrames隐式转换成RDD,使df能够使用RDD中的方法。
如果需要Hive支持,则需要以下创建语句:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
5、Spark SQL支持的数据源
- 内存中集合或者是RDD
- 外部存储结构化文件(csv,txt,json,parquet、…)(可以存储在本地,也可以存储在HDFS)
- 关系型数据库(MySQL,…)
- 支持Hive连接
6、RDD、DataFrame、Dataset三者之间的转换
RDD 转为DataFrame toDF RDD 转为Dataset toDS DataFrame转为RDD dataFrame.rdd Dataset转为RDD dataset.rdd DataFrame转为Dataset dataFrame.as[数据类型] Dataset转为DataFrame dataset.toDF
|