1、SparkSQL介绍
Hive是shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是为了完全脱离Hive的限制。
2、SparkSQL 创建 DataFrame 的方式
在使用SparkSQL时Scala2.0+的版本创建的方式 val session: SparkSession = SparkSession .builder() .appName(“test”) .enableHiveSupport() .getOrCreate() session.sparkContext.setLogLevel(“Error”)
2.1、读取json格式文件创建DataFrame session.read.format(“json”).load("./data/json")
2.2、通过json格式的RDD创建DataFrame 在读取是要隐式转换, 在读取之前导入 import session.imlicits._ json数据要转换为DataSet格式数据
2.3、非json格式的RDD创建DataFrame 通过反射的方式将非json 格式的 RDD 转换成 DataFrame(不建议使用)
2.4、动态创建Schema将非json格式RDD转换成DataFrame
2.5、读取parquet文件创建DataFrame
读取parquet文件时,先要读取json文件,再以parquet的格式写入,再以session.read.format(“parquet”).load("./data/parquet")
读取json格式的数据和非json格式的数据,都要将数据转换为DataSet的格式,DataFrame是DataSet的Row版本
3、Spark on hive
3.1、配置 在你的spark的client端的spark的conf目录下,创建hive-site.xml文件写入以下配置,注意节点也是hive服务端 hive.metastore.uris thrift://mynode1:9083
4、SparkSQL UDF与UDAF的区别
UDF:用户自定义函数。 可以自定义类实现 UDFX 接口。 UDAF:用户自定义聚合函数。 实现 UDAF 函数如果要自定义类要继承 UserDefinedAggregateFunction 类
开窗函数 row_number over(partition x1 order by x2)对表中的数据按照X1分组,按照X2排序,对每个分组内的数据标号,标号在每个分组内从1开始,每个分组内非标号是连续的 rank()over(partition x1 order by x2)对表中的数据按照X1分组,按照X2排序,对每个分组内的数据标号,标号在每个分组内从1开始,每个分组内非标号是不连续的 dense_rank() over (partition x1 order by x2)对表中的数据按照X1分组,按照X2排序,对每个分组内的数据标号,标号在每个分组内从1开始,每个分组内的标号连续且相同的数据标号相同
|