一.概述
1 spark历史
前身: shark (即Hive on Spark)
hive 进程维护 , shark 线程维护
新入口:SparkSession
RDD----->DataFrame------->Dataset
基本数据类型:Row,schema,StructType,StructField
支持: scala,Java python,R
shark:
执行计划优化完全依赖于Hive,不方便添加新的优化策略;
Spark是线程级并行,而MapReduce是进程级并行。
Spark在兼容Hive的实现上存在线程安全问题,导致Shark
不得不使用另外一套独立维护的打了补丁的Hive源码分支;
Spark SQL:
作为Spark生态的一员继续发展,而不再受限于Hive,
只是兼容Hive;Hive on Spark作为Hive的底层引擎之一
Hive可以采用Map-Reduce、Tez、Spark等引擎
2 Spark-SQL 概述 2.1 特点 ? ? ? 1.? 数据兼容:不仅兼容Hive,还可以从RDD、parquet文件、Json文件获取数据、支持从RDBMS获取数据 ? ? ? ? 2.性能优化:采用内存列式存储、自定义序列化器等方式提升性能; ? ? ? ?3. 组件扩展:SQL的语法解析器、分析器、优化器都可以重新定义和扩展 ? ? ? ?4. 兼容: Hive兼容层面仅依赖HiveQL解析、Hive元数据。 从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了,Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责 ? ? ? ?5. 支持: 数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据; Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范;
2.2 作用
- Spark 中用于处理结构化数据的模块;
- 相对于RDD的API来说,提供更多结构化数据信息和计算方法
- 可以通过SQL或DataSet API方式同Spark SQL进行交互,
2.3 Spark SQL架构图
?3 Dataset演进历史
RDD------------------->DataFrame-------------------->Dataset
0.0? ? ? ? ? ? ? ? ? ? ? ? ? ? ?1.3? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 1.6
?
3.1 RDD
3.1.1 优点 编译时类型安全,编译时就能检查出类型错误; 面向对象的编程风格,直接通过class.name的方式来操作数据; idAge.filter(.age > “”) // 编译时报错, int不能跟与String比较 idAgeRDDPerson.filter(.age > 25) // 直接操作一个个的person对象
3.1.2 缺点
序列化和反序列化的性能开销,无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化反序列化; GC的性能开销,频繁的创建和销毁对象, 势必会增加GC
3.2 DataFrame
3.2.1 优点
off-heap类似于地盘, schema类似于地图, 有自己地盘了, 不再受JVM的限制, 也就不再受GC的困扰了 通过schema和off-heap, DataFrame克服了RDD的缺点。对比RDD提升计算效率、减少数据读取、底层计算优化;
3.2.2 缺点
DataFrame克服了RDD的缺点,但是却丢了RDD的优点。 DataFrame不是类型安全的,API也不是面向对象风格的。 // API不是面向对象的 idAgeDF.filter(idAgeDF.col(“age”) > 25) // 不会报错, DataFrame不是编译时类型安全的 idAgeDF.filter(idAgeDF.col(“age”) > “”)
3.2.3 核心特征
????????1.DataFrame的前身是SchemaRDD,不继承RDD,自己实现了RDD的大部分功能,在? ? ? ? ? ?????????DataFrame上调用RDD的方法转化成另外一个RDD ????????2.DataFrame可以看做分布式Row对象的集合,DataFrame 不仅有比RDD更多的算子,还可以? ? ? ? ?进行执行计划的优化; ????????3.DataFrame表示为DataSet[Row],即DataSet的子集 ????????4.Row :被 DataFrame 自动实现,一行就是一个Row对象 ????????5.Schema :包含了以ROW为单位的每行数据的列的信息; Spark通过Schema就能够读懂数? ? ? ? ? ?据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了 ????????6.off-heap : Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中, 当要操作数据? ? ? ? ? ?时, 就直接操作off-heap内存 ????????7.Tungsten:新的执行引擎 ????????8.Catalyst:新的语法解析框架
3.3 Dataset Spark第三代API:Dataset;Dataset的核心:Encoder
3.3.1 区别 DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。
3.3.2 特点 编译时的类型安全检查,性能极大的提升,内存使用极大降低、减少GC、极大的减少网络数据的传输,极大的减少scala和java之间代码的差异性。 DataFrame每一个行对应了一个Row。而Dataset的定义更加宽松,每一个record对应了一个任意的类型。DataFrame只是Dataset的一种特例。 不同于Row是一个泛化的无类型JVM object, Dataset是由一系列的强类型JVM object组成的,Scala的case class或者Java class定义。因此Dataset可以在编译时进行类型检查 Dataset以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。 Dataset创立需要一个显式的Encoder,把对象序列化为二进制。 4 SparkSQL API
?
SparkSession:Spark的一个全新的切入点,统一Spark入口;
4.1创建SparkSession val spark = SparkSession.builder .appName() .enableHiveSupport() .getOrCreate() .master() spark.conf.set(“spark.sql.shuffle.partitions”,6) spark.conf.set(“spark.executor.memory”, “2g”)
4.2 核心API sparkSession: spark入口 统一封装SparkConf,SparkContext,SQLContext, 配置运行参数,读取文件,创建数据,使用SQL Dataset: 统一Dataset接口,其中DataFrame==Dataset[Row] 基本实现了类似RDD的所有算子 column: Dataset的列对象 包括对列操作的基本函数 ROW : DataFrame的行对象 包括对行操作的基本函数 Encoder : 序列化 支持常用的数据类型,可以直接序列化,也支持case class自定义数据对象进行序列化 functions: Dataset的内置函数 支持丰富的操作函数(聚合,collection… …) SQlImplict: 隐式转换 其中scala对象RDD转换成DF/DS ,DF/DS使用Map/FlatMap方法等; 要采用的隐式转换格式的 val spark= SparkSession.() import spark.implicts._ 注意 : Dataset是一个类(RDD是一个抽象类,而Dataset不是抽象类),其中有三个参数: SparkSession(包含环境信息) QueryExecution(包含数据和执行逻辑) Encoder[T]:数据结构编码信息(包含序列化、schema、数据类型)
5 基本操作
5.1 Row import org.apache.spark.sql.Row //创建行对象 val row1=Row(1,”ss”,12,2.2) //访问 row1(0) row1.getInt(0)//要与数据的类型对应 row1.getAsInt
5.2 Schema DataFrame(即带有Schema信息的RDD)Spark通过Schema就能够读懂数据 DataFrame中提供了详细的数据结构信息,从而使得SparkSQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么,DataFrame中的数据结构信息,即为schema。
5.3 Schema & StructType & StructField import org.apache.spark.sql.types._
val s=(new StructType) .add(“列名”,”类型”,可是否为空,”备注”) .add(“列名”,”类型”,true/false,”备注”) val s1=(new StructType) .add(“列名”,IntType,可是否为空,”备注”) .add(“列名”,StringType,可是否为空,”备注”) val s2=(new StructType) .add(StructField(“列名”,IntType,false)) .add(StructField(“列名”,StringType,true)) val s3=StructType(StructField(“列名”,StringType,true):: StructField(“列名”,IntType,true)::Nil ) val s4=StructType( (List/Sep) (StructField(“列名”,StringType,true):: StructField(“列名”,IntType,true)::Nil )) 6 Dataset & DataFrame Spark提供了一整套用于操纵数据的DSL (DSL :Domain Specified Language,领域专用语言) DSL在语义上与SQL关系查询非常相近
6.1 Dataset& DataFrame 的创建 ?
由range生成Dataset val numDS = spark.range(5, 100, 5) //降序显示前五个 numDS.orderBy(desc(“id”)).show(5) //显示总数,平均数,偏差,最大值,最小值 numDS.describe().show 多列由集合生成Dataset case class Person(name:String, age:Int, height:Int) val seq1 = Seq(Person(“Jack”, 28, 184), Person(“Tom”, 10, 144)) val ds1 = spark.createDataset(seq1) ds1.show val seq2 = Seq((“Jack”, 28, 184), (“Tom”, 10, 144)) val ds2 = spark.createDataset(seq2) ds2.show 集合转成DataFrame,并修改列名必须有类型 val seq1 =[Sep/List] ((“Jack”, 28, 184), (“Tom”, 10, 144), (“Andy”, 16, 165)) val df1 = spark.createDataFrame(seq1) .withColumnRenamed("_1", “name1”) .withColumnRenamed("_2",“age1”) .withColumnRenamed("_3", “height1”) df1.orderBy(desc(“age1”)).show(10) val df2 = spark.createDataFrame(seq1).toDF(“name”, “age”, “height”).show // 简单!2.0.0的新方法 createDataset 无法运用toDS 修改列名
?
|