IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark SQL 概述 -> 正文阅读

[大数据]Spark SQL 概述

一、SparkSQL的发展

1.1 概述

SparkSQL是Spark?态体系中的构建在SparkCore基础之上的?个基于SQL的计算模块。 SparkSQL的前身不叫SparkSQL,?叫Shark,最开始的时候底层代码优化,sql的解析、执?引擎等等完全基于 Hive,总是Shark的执?速度要?hive?出?个数量级,但是hive的发展制约了Shark,所以在15年中旬的时候, shark负责?,将shark项?结束掉,重新独?出来的?个项?,就是sparksql,不再依赖hive,做了独?的发展, 逐渐的形成两条互相独?的业务:SparkSQL和Hive-On-Spark。在SparkSQL发展过程中,同时也吸收了Shark的一些特点:基于内存的列存储,动态字节码优化技术。

1.2 特点

官网的原话:**Spark SQL is Apache Spark’s module for working with structured data. **
即Spark SQL是Apache Spark处理结构化数据的模块。

  • ?集成

无缝地将SQL查询与Spark程序混合。
Spark SQL允许您使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。适用于Java、Scala、Python和R语言。

  • 统一的数据访问

以相同的方式连接到任何数据源。
DataFrames和SQL提供了一种访问各种数据源的通用方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以通过这些源连接数据。

  • 蜂巢集成

在现有仓库上运行SQL或HiveQL查询。
Spark SQL支持HiveQL语法以及Hive SerDes和udf,允许您访问现有的Hive仓库。

  • 标准的连接

通过JDBC或ODBC连接。
服务器模式为业务智能工具提供了行业标准JDBC和ODBC连接。

1.3 总结

????????SparkSQL就是Spark生态体系中用于处理结构化数据的?个模块。结构化数据是什么?存储在关系型数据库中的数据,就是结构化数据;半结构化数据是什么?类似xml、json等的格式的数据被称之为半结构化数据;非结构化数据是什么?音频、视频、图片等为非结构化数据。 换句话说,SparkSQL处理的就是?维表数据。

二、SparkSQL的编程入口和模型

2.1 SparkSQL编程入口

????????SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,Spark2.0之后将这两个模型进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器 (Builder方式)模式创建SparkSession。

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

2.2?编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

  • SQL

SQL不用多说,就和Hive操作?样,但需要清楚一点,SQL操作的是表,所以如果用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表。

  • DataFrame和Dataset

DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说?了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东?,?如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫 SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 ?般的,将RDD称之为Spark体系中的第?代编程模型;DataFrame?RDD多了?个Schema元数据信息, 被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强?的函数式编程)和 DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新?代的编程模型。

2.3?RDD V.S. DataFrame V.S. Dataset

创建DataFrame

1.spark.read.format("xxx")

#?这种方式对应与文件/外部输入 例如csv对应了CSVFileFormat

2.spark.createDataFrame(RDD[A])

[A<:Product] : 元组和样例类都是Product子类型

3.import spark.implicits._? ? // 导入sparkSession中的隐式转换操作

? Seq(("a",1),("b",2),("c",3)).toDF("k","v").show()

Dataset的创建注意事项:

# 不建议使用普通类
class User(val id:Int,val name:String)

val list3 = List(new User(100,"xx"))

spark.createDataset(list3).show  // list3中的类型应该存在Encoder

# 对于普通类型,要么转成样例类,否则需要手动提供Encoder隐式值

implicit val e:Encoder[User] = Encoders.javaSerialization(classOf[User])

class User(val id:Int,val name:String) extends Serializable

2.4?RDD、DataFrame、Dataset之间的转换

  • RDD=>DataFrame? ? ? ? ? ? ?rdd.toDF
  • RDD=>Dataset? ? ? ? ? ? ? ? ? ?rdd.toDS
  • DataFrame=>RDD? ? ? ? ? ? ?df.rdd
  • DataFrame=>Dataset? ? ? ? df.as[User]
  • Dataset=>RDD? ? ? ? ? ? ? ? ? ds.rdd
  • Dataset=>DataFrame? ? ? ??ds.toDF

2.5 DataFrame & Dataset API

网址:http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

分类:

  • Action算子:collect show
  • Typed transformations 强类型转换 返回Dataset[U]
  • Untyped transformations ?返回DataFrame \Column
  • 其它

三、SparkSQL的数据加载和落地

3.1 数据的加载

SparkSQL中加载外部的数据,使用统一的API入口

spark.read.format(数据?件格式).load(path)

这个方式有更加清晰的简写方式,比如要加载json格式的文件

spark.read.json(path)

注:默认加载的文件格式为parquet

def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLLoadAndSave")
 .getOrCreate()

 //加载数据 默认的加载?件的格式为parquet
 var pdf = spark.read.format("json").load("file:///E:/data/spark/sql/people.json")
 //简写?式
 pdf = spark.read.json("file:///E:/data/spark/sql/people.json")

 //parquet
 pdf = spark.read.parquet("file:///E:/data/spark/sql/users.parquet")

 //text 加载普通的?本?件,只能解析成?列
 pdf = spark.read.text("file:///E:/data/spark/sql/dailykey.txt")

 //csv 普通的?本?件,列之间以","作为分隔符
 pdf = spark.read.csv("file:///E:/data/spark/sql/province.csv")
 .toDF("pid", "province", "code", "cid")// 根据需要重新命名列名 数据类型均为字符串

 //orc 是rc?件格式的升级版本
 pdf = spark.read.orc("file:///E:/data/spark/sql/student.orc")

 //jdbc
 val url = "jdbc:mysql://localhost:3306/test"
 val table = "wordcount"
 val properties = new Properties()
 properties.put("user", "bigdata")
 properties.put("password", "sorry")
 pdf = spark.read.jdbc(url, table, properties)
 pdf.printSchema()
 pdf.show()

spark.stop()
}

3.2 数据的落地

????????SparkSQL对数据的落地保存使用的api为:spark.write.save(),需要指定数据的落地格式。

和read的默认格式?样,save的默认格式也是parquet,需要在write和save之间指定具体的格式

format(format) ,同样也有简写方式:spark.write.json/parquet等

def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLLoadAndSave")
 .getOrCreate()
 val df = spark.read.orc("file:///E:/data/spark/sql/student.orc")
 /*
 数据的落地
 默认的存储格式为parquet,同时基于snappy压缩?式存储
 # 落地的保存?式SaveMode
 ErrorIfExists:?录存在报错,默认的格式
 Append:在原有的基础之上追加
 Ignore:忽略,如果?录存在则忽略,不存在则创建
 Overwrite:覆盖(删除并重建)
 */
 
df.write.format("json").mode(SaveMode.Overwrite).save("file:///E:/data/spark/sql/stu")
 val url = "jdbc:mysql://localhost:3306/test"
 val table = "student"
 val properties = new Properties()
 properties.put("user", "bigdata")
 properties.put("password", "sorry")
 df.write.mode(SaveMode.Append).jdbc(url, table, properties)
 spark.stop()
 }

四、SparkSQL与Hive的整合

SparkSQL和Hive的整合,是?种比较常见的关联处理方式,SparkSQL加载Hive中的数据进行业务处理,同时将计 算结果落地回Hive中。 ?先将服务器中Hadoop安装路径下的hdfs-site.xml、core-site.xml以及hive中hive-site.xml三个?件拿出来,放到?程中的resource?件夹下。

重点:注意修改每个文件中的路径,一定要改成虚拟机的服务ip地址或者映射名称

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* 使?SparkSQL来操作Hive
*/
object _04SparkSQLOnHiveOps {
 def main(args: Array[String]): Unit = {
 System.setProperty("hadoop.home.dir","E:\\hadoop-common-2.2.0-bin-master")
 val spark = SparkSession.builder()
 .appName("SparkSQLOnHive")
 .master("local")
 .enableHiveSupport() // 开启hive机制
 .getOrCreate()
 /**
     * 
     如果我们开起了Hive的机制后,并且导?了相关的xml?件,可能会有?个??的问题
     之前读取的本地磁盘?件,就突然找不到了,为什么?
     因为我们使?了hive,那么它默认会搜索Hdfs上的数据?件,所以本地磁盘会找不到?件,这个问题很 
     好解决,在路径前?加上file:///D:\BaiduNetdiskDownload\sql-data
     如果无效,那么请把连接Hive的配置?件删掉,因为现在不需要?它
     */

 val df = spark.read.text("E:\\test.txt")
 df.createTempView("stu")
 spark.sql(s"""
 |select
 | tmp.word,
 | count(tmp.word) as counts
 | from(
 | select
 | explode(split(value," ")) word
 | from stu
 | ) tmp
 | group by tmp.word
 | order by counts desc
 |""".stripMargin).show()

// 获取hive中的表或者数据
// spark.sql("select * from test.fact_access_log")
// val url = "jdbc:mysql://localhost:3306/spark"
// val tab = "stu"
// val prop = new Properties()
// prop.setProperty("user","root")
// prop.setProperty("password","123456")

// 读取MySQL数据
// val df: DataFrame = spark.read.jdbc(url, tab, prop)

// 接下来将数据存?hive
/**
* hive建表语句
* CREATE TABLE `stu` (
* `id` bigint,
* `name` string,
* `age` bigint
* )
*/
// df.write.insertInto("test.stu")
 }
}

注:执行如上代码时,首先要在Hive中先建表?

五、SparkSQL中的函数操作

5.1 函数的定义

SQL中函数,其实就是各大编程语言中的函数,或者方法,就是对某?特定功能的封装,通过它可以完成较为复杂的统计。这里函数的学习,就基于Hive中的函数来学习。

5.2 函数的分类

1)功能上划分

数值

  • round(x,[d]):对x保留d位小数,同时会对结果四舍五入
  • floor(x):获取不大于x的最大整数
  • ceil(x):获取不小于x的最小整数
  • rand():
  • 获取0到1之间的随机数? ?/? 获取表中随机的两条记录
hive> select * , rand() rand from teacher order by rand limit 2;
or
hive> select * from teacher order by rand() limit 2;

数学

  • abs(x):取绝对值

条件

  1. ?if(expr1, expr2, expr3):如果expr1为true,返回expr2,反之返回expr3
  2. case when 多条件表达式

日期

  1. current_date(),获取当前的日期,日期格式为标准格式:yyyy-MM-dd
  2. current_timestamp():获取当前日期的时间戳,格式:yyyy-MM-dd HH:mm:ss.SSS
  3. add_months(start_date, num_months):返回start_date之后num_months月的日期
  4. date_add(start_date, num_days):返回start_date之后num_days天的日期
  5. date_sub(start_date, num_days):返回start_date之前num_days天的日期
  6. next_day(start_date, day_of_week),返回start_date之后最接近的day_of_week对应的日期
  7. dayofmonth(date) 返回date对应月份中的第几天
  8. weekofyear(date) 返回date对应年份中的第几周
  9. minute hour day month year 获取日期中对应的年月日时分
  10. date_format(date,format),返回指定格式化时间
  11. datediff(date1, date2),返回date1和date2之间的差值(单位是天),换句话说就是date1-date2
  12. from_unixtime(unix_time, format)将unix_time转化为格式化时间
  13. to_date(datetime)返回datetime中的日期部分

字符串

注意:数据库中的字符串索引从1开始,而不是0

  1. length(str) 返回字符串str的长度
  2. instr(str, substr),作?等同于str.indexOf(substr)
  3. substr substring(str, pos[, len]):从str的pos位置开始,截取子字符串,截取len的长度,如果不传len,截取余下所有
  4. substring_index(str, delim, count):将字符串str使?delim进?分割,返回强count个使?delim拼接的子字符串
  5. concat(str1, str2)拼接字串
  6. concat_ws(separator, str1, str2):使?指定分隔符来拼接字符串

统计函数

  1. index(arr, n),就是arr(n)获取索弓|n对应的元素
  2. sum、count、max、avg、 min等

特殊

  • array:返回数组
  • collect_set:返回?个元素不重复的set集合
  • collect_list:返回?个元素可重复的list集合
  • split(str, regex):使?regex分隔符将str进?切割,返回?个字符串数组
  • explode(array):将?个数组,转化为多?
  • ?cast(type1 as type2):将数据类型type1的数据转化为数据类型type2

Demo:使用SQL方式统计WordCount

select
 tmp.word,
 count(1) counts
from (
 select
 explode(split(line, "\\s+")) word
 from test_wc
) tmp
group by tmp.word
order by counts desc, tmp.word;

2)实现方式上划分

  • UDF(User Defined function)用户自定义函数

一路输入,一路输出,比如year,date_add, instr

  • UDAF(User Defined aggregation function)?户?定义聚合函数

多路输入,?路输出,常见的聚合函数,count、sum、collect_list

  • UDTF(User Defined table function)?户?定义表函数

一路输入,多路输出,explode

  • 开窗函数

row_number() ——>分组topN的求解

select
 tmp.*
from (
 select
 name,
 age,
 married,
 height,
 row_number() over(partition by married order by height) rank
 from teacher
) tmp
where tmp.rank < 3;

?5.3 自定义函数

5.3.1 概述

????????当系统提供的这些函数,满足不了实际需要时,就需要进行自定义相关的函数,一般自定义的函数分为两种, UDF和UDAF。

5.3.2 UDF

? ? ? ?一路输?,一路输出,完成就是基于scala函数。

????????通过模拟获取字符串长度的udf来学习自定义udf操作。

object _01SparkSQLUDFOps {
 def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLUDF")
 .getOrCreate()

 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(List(
 "songhaining",
 "yukailu",
 "liuxiangyuan",
 "maningna"
 ))

//使?sparksession进?udf和udaf的注册
// spark.udf.register[Int, String]("myLen", (str:String) => myStrLength(str))
// spark.udf.register[Int, String]("myLen", str => myStrLength(str))
 spark.udf.register[Int, String]("myLen", myStrLength)
 
 /* private val myLen: UserDefinedFunction = udf((s: String) => {
 s.length
})
 df.select(myLen($"name")).show()s
 */
 val df = rdd.toDF("name")
 df.createOrReplaceTempView("test")

 //求取每个字符串的?度
 val sql =
 """
 |select
 | name,
 | length(name) nameLen,
 | myLen(name) myNameLen
 |from test
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }

 //?定义udf
 def myStrLength(str:String):Int = str.length
}

5.3.3 UDAF

????????多路输入,一路输出,类似combineByKey

????????通过模拟avg函数,来学习如何自定义UDAF操作。

object _02SparkSQLUDAFOps {
 def main(args: Array[String]): Unit = {
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLUDAF")
 .getOrCreate()

 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(List(
 Student(1, "宋海宁", 168.5, 105.5, 18),
 Student(2, "麻宁娜", 165.0, 101.0, 19),
 Student(3, "刘?媛", 170.5, 108.5, 17),
 Student(4, "蔚凯璐", 172.5, 115, 16)
 ))
 spark.udf.register("myAvg", new MyUDAFAVG)
 val df = rdd.toDS()
 df.createOrReplaceTempView("student")
val sql =
 """
 |select
 | round(avg(height), 1) avg_height,
 | avg(weight) avg_weight,
 | avg(age) avg_age,
 | myAvg(weight) myAvg_wight
 |from student
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }
}
case class Student(id:Int, name:String, height:Double, weight:Double, age:Int)

自定义UDAF

class MyUDAFAVG extends UserDefinedAggregateFunction {
 /*
 指定?户?定义udaf输?参数的元数据
 datediff(date1, date2)
 */
 override def inputSchema: StructType = {
 StructType(List(
 StructField("weight", DataTypes.DoubleType, false)
 ))
 }
 //udaf返回值的数据类型
 override def dataType: DataType = DataTypes.DoubleType
 //udaf?定义函数求解过程中的临时变量的数据类型
 override def bufferSchema: StructType = {
 StructType(List(
 StructField("sum", DataTypes.DoubleType, false),
 StructField("count", DataTypes.IntegerType, false)
 ))
 }
 //聚合函数是否幂等,相同输?是否总是得到相同输出
 override def deterministic: Boolean = true
 /*
 分区内的初始化操作
 其实就是给sum和count赋初始值
 */
override def initialize(buffer: MutableAggregationBuffer): Unit = {
 buffer.update(0, 0.0)
 buffer.update(1, 0)
 }
 /**
 * 分区内的更新操作
 * @param buffer 临时变量
 * @param input ?定义函数调?时传?的值
 */
 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
 buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
 buffer.update(1, buffer.getInt(1) + 1)
 }
 //分区间的合并操作
 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
 buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
 buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
 }
 //udaf聚合结果的返回值
 override def evaluate(buffer: Row): Double = {
 buffer.getDouble(0) / buffer.getInt(1)
 }
}

5.4 多维立方体分析函数

grouping sets 、rollup 、cube 是用来处理多维分析的函数:

  • grouping sets:对分组集中指定的组表达式的每个?集执?group by, group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是?个集合,比如group by A,B,C grouping sets((A,B),(A,C))。
  • rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup ?先会对(A、B、C)进行group by,然后对(A、B)进?group by,然后是(A)进?group by,最后对全表进行group by操作。
  • cube : 为指定表达式集的每个可能组合创建分组集。group by A,B,C with cube 首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),(C),最后对全表进行group by操作。

六、SparkSQL之SQL调优

6.1 缓存数据至内存

Spark SQL可以通过调用sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表??种柱状格式(an inmemory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

可通过两种方式配置缓存数据功能:

  • 使用SQLContext的setConf方法
  • 执行SQL命令SET key=value
Property NameDefaultMeaning
spark.sql.inMemoryColumnarStorage.compressedtrue如果假如设置为true,SparkSql会根据统计信息?动的为每个列 选择压缩?式进?压缩。
spark.sql.inMemoryColumnarStorage.compressed10000控制列缓存的批量大小。批次?有助于改善内存使?和压缩,但 是缓存数据会有OOM的?险。

6.2 参数调优

可以通过配置下表中的参数调节Spark SQL的性能。

Property NameDefaultMeaning
spark.sql.files.maxPartitionBytes134217728获取数据到分区中的最大字节数。
spark.sql.files.openCostlnBytes4194304 (4MB)该参数默认4M,表示小于4M的小文件会合并到一个分区中,
spark.sql.broadcastTimeout300广播等待超时时间,单位秒。
spark.sql.autoBroadcastJoinThreshold10485760(10 MB)最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。
spark.sql.shuffle.partitions200设置huffle分区数,默认200。

6.3 SQL数据倾斜优化

以group By出现数据倾斜为例进行解决。采用的案例就是wordcount。两阶段聚合进行解决:局部聚合+全局聚合。

object _03SparkSQLDataskewOps {
 def main(args: Array[String]): Unit = {
 Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
 Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
 Logger.getLogger("org.spark_project").setLevel(Level.WARN)
 val spark = SparkSession.builder()
 .master("local[*]")
 .appName("SparkSQLDataskew")
 .getOrCreate()
 val list = List(
 "zhang zhang wen wen wen wen yue yue",
 "gong yi can can can can can can can can can can",
 "chang bao peng can can can can can can"
 )
 import spark.implicits._
 val rdd = spark.sparkContext.parallelize(list)
 val df = rdd.toDF("line")
 df.createOrReplaceTempView("test")
 println("原始表中的数据---------------------")
 df.show()
 println("step 1-----------进?数据拆分-------------")
 var sql =
 """
 |select
 | split(line, '\\s+')
 |from test
 """.stripMargin
 spark.sql(sql).show()
 println("step 2-----------进?列转化为多?数据-------------")
 sql =
 """
 |select
 | explode(split(line, '\\s+')) word
 |from test
 """.stripMargin
 spark.sql(sql).show()
 println("step 3-----------进?添加前缀打散数据-------------")
 sql =
 """
 |select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word) prefix_word
 |from (
 | select
 | explode(split(line, '\\s+')) word
| from test
 |) t1
 """.stripMargin
 spark.sql(sql).show()
 println("step 4-----------进?有前缀的局部聚合-------------")
 sql =
 """
 |select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 |from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 |) t1
 |group by prefix_word
 """.stripMargin
 spark.sql(sql).show()
 println("step 5-----------进?去前缀操作-------------")
 sql =
 """
 |select
 | t2.prefix_word,
 | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,
 | t2.countz
 |from (
 | select
 | concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 | from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 | ) t1
 | group by prefix_word
 |) t2
 """.stripMargin
 spark.sql(sql).show()
 println("step 6-----------进?全局聚合-------------")
 sql =
 """
 |select
 | substr(t2.prefix_word, instr(t2.prefix_word, "_") + 1) up_word,
 | sum(t2.countz) counts
 |from (
 | select
| concat_ws("_", cast(floor(rand() * 2) as string), t1.word)
prefix_word,
 | count(1) countz
 | from (
 | select
 | explode(split(line, '\\s+')) word
 | from test
 | ) t1
 | group by prefix_word
 |) t2
 |group by up_word
 """.stripMargin
 spark.sql(sql).show()
 spark.stop()
 }
 //?定义添加前缀的函数
/* def addPrefix(str:String):String = {
 val random = new Random()
 random.nextInt(2) + "_" + str
 }*/
}

由于处理过程中,使用了两层group By,所以经常将使用sql的处理称之为双重group by。?

参考资料:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

七、SparkSQL运行架构及原理

Spark SQL对SQL语句的处理与关系型数据库类似,即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成?棵树,然后使用规则(Rule)对Tree进?绑定、优化等处理过程。Spark SQL由Core、Catalyst、 Hive、Hive-ThriftServer四部分构成:

  • Core: 负责处理数据的输?和输出,如获取数据,查询结果输出成DataFrame等
  • Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等
  • Hive: 负责对Hive数据进?处理
  • Hive-ThriftServer: 主要?于对hive的访问

7.1 SparkSQL运行架构

7.2 SparkSQL运行原理

1.使用SessionCatalog保存元数据

  • 在解析SQL语句之前,会创建SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封 装了SparkContext和SQLContext的创建?已。会把元数据保存在SessionCatalog中,涉及到表名,字段名称和字 段类型。创建临时表或者视图,其实就会往SessionCatalog注册

2.解析SQL,使用ANTLR生成未绑定的逻辑计划

  • 当调用SparkSession的sql或者SQLContext的sql方法,我们以2.0为准,就会使用SparkSqlParser进?解析SQL. 使 ?的ANTLR进?词法解析和语法解析。它分为2个步骤来生成Unresolved LogicalPlan:
  1. 词法分析:Lexical Analysis,负责将token分组成符号类
  2. 构建?个分析树或者语法树AST

3 使用分析器Analyzer绑定逻辑计划

  • 在该阶段,Analyzer会使用Analyzer Rules,并结合SessionCatalog,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划。

4.使?优化器Optimizer优化逻辑计划

  • 优化器也是会定义一套Rules,利用这些Rule对逻辑计划和Exepression进行迭代处理,从而使得树的节点进行和并和优化

5.使用SparkPlanner生成物理计划

  • SparkSpanner使用Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan.

6.使用QueryExecution执行物理计划

  • 此时调?SparkPlan的execute?法,底层其实已经再触发JOB了,然后返回RDD
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 15:27:43  更:2021-08-17 15:28:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:05:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码