目录
一、什么是SparkSQL
二、SparkSQL特点
三、SparkSQL中两个编程抽象
四、IDEA开发SparkSQL操作Mysql、Hive
1. 添加依赖
2. 读取数据
五、附录
一、什么是SparkSQL
????????SparkSQL是Spark用于结构化数据(structured data)处理的Spark模块
二、SparkSQL特点
- 易整合:无缝的整合了SQL查询和Spark编程
- 统一的数据访问:使用相同的方式连接不同的数据源
- 兼容Hive:在已有的仓库上直接运行SQL或者HiveSQL
- 标准数据连接:通过JDBC或者ODBC来连接
三、SparkSQL中两个编程抽象
DataFrame
- 以RDD为基础的分布式数据集:类似于传统数据库中的二维表格
- 带有schema元信息:DataFrame所表示的二维表数据集的每一列都带有名称和类型
- 支持嵌套数据类型:与hive类似,支持struct、array和map
- 查询计划通过Spark catalyst optimisesr进行优化,性能要比RDD要高
DataSet
- 分布式数据集合:Spark1.6以后添加的一个新抽象,是DataFrame的一个扩展(具备DataFrame的所有特点)
- 用户有好的API风格,既具有类型安全检查,也具有查询优化特性
- 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中指端名称
- DataSet是强类型的:例如可以有DAtaSet[Person]
两者关系:DataFrame = DataSet[Row]
- 可以通过as方法将DataFrame转换为DataSet
- Row为一个类型,所有的表结构信息都用Row来表示,获取数据时需要指定顺序
四、IDEA开发SparkSQL操作Mysql、Hive
1. 添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
2. 读取数据
? ? ? ??
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object practice {
case class SaleRecord(shopId:String,date:String,volume:String)
def main(args: Array[String]): Unit = {
//导入sql函数包(如:聚合函数,窗口函数,window子句)
import org.apache.spark.sql.functions._
//创建SparkSession
val spark = SparkSession.builder().master("local[*]").appName("practice")
//.enableHiveSupport() //用于连接hive
.getOrCreate()
//导读sparkSession的隐式转换包
import spark.implicits._
//创建Properties配置信息
val pro = new Properties()
pro.setProperty("driver","com.mysql.jdbc.Driver")
pro.setProperty("user","root")
pro.setProperty("password","12345678")
val URL = "jdbc:mysql://192.168.131.200:3306/test?useSSL=false"
val TABLE = "user_info"
//SQL语句操作数据
/*spark.read.jdbc(URL,TABLE,pro)
.createTempView("test")
spark.sql(
"""
|select * from test
|""".stripMargin)
.show()*/
//算子操作数据
/*val frame: DataFrame = spark.read.jdbc(URL, TABLE, pro)
frame.select("*").where($"user_id".gt(15)).show()*/
//创建sparkContext
val sc = spark.sparkContext
//将文件读取到RDD中
val rdd: RDD[SaleRecord] = sc.textFile("file:///D:\\Study\\13_spark\\cha01\\file\\sales5.txt", 5)
.mapPartitions(_.map(line => {
val ps = line.split(",")
SaleRecord(ps(0), ps(1), ps(2))
}))
//创建DateFrame
val frame: DataFrame = spark.createDataFrame(rdd)
//输出表结构及数据类型
//frame.printSchema()
//每个店铺的月销售额及订单数量
frame.select($"shopId".cast("int"),
year($"date").as("year"),
month($"date").as("month"),
dayofmonth($"date").as("day"),
$"volume".cast("decimal(10,3)"))
//日聚合:目的是为了减少月聚合时的数据量
.groupBy($"shopId",$"year",$"month",$"day")
.agg(sum($"volume").as("dailyVolume"),count($"volume").as("dailyCnt"))
.groupBy($"shopId",$"year",$"month")
//越聚合
.agg(sum($"dailyVolume").as("monthlyVolume"),sum($"dailyCnt").as("monthlyCnt"))
//过滤得到月销售额大于等于100000并且月订单数大于1000的数据
.filter($"monthlyVolume".geq(100000) and $"monthlyCnt".gt(1000))
//按月销售额倒序,月订单数升序排序
.sort($"monthlyVolume".desc,$"monthlyCnt".asc)
.show()
//释放sparkContext资源
sc.stop()
//释放sparkSession资源
spark.close()
}
}
五、附录
spark相关依赖
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.12</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
|