IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> sparksql -> 正文阅读

[大数据]sparksql

sparksql:

Spark SQL是Spark处理数据的一个模块

专门用来处理结构化数据的模块,像json,parquet,avro,csv,普通表格数据等均可。

与基础RDD的API不同,Spark SQL中提供的接口将提供给更多关于结构化数据和计算的信息,并针对这些信息,进行额外的处理优化

操作方式说明:

? ? ?SparkSql shell:

类似于hive shell

? ? ?DataFrames API:

最早专为sql on spark设计的数据抽象,与RDD相似,增加了数据结构scheme描述信息部分。

写spark代码,面向DF(DataFrams缩写)编程,可以与其它Spark应用代码无缝集成。

比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。

? ? ? DataSets API:

集成了RDD强类型和DataFrames结构化的优点,官方正强力打造的新数据抽象类型。

写spark代码,面向DS编程,可以与其它Spark应用代码无缝集成。

比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。

? ? ? ? ?面向程序接口对接的操作:通过JDBC、ODBC等方式操作SparkSql

通过jdbc、odbc链接后,发送相关的sparksql请求,实现基于sparksql功能开发。

? ??

? ? ?SparkSQl特点:

??

可以利用SQL、DataFrams API、DataSets API或其它语言调用的基于sparksql模块计算,均是sparkcore执行引擎,其对计算的表达是独立的,即开发人员可以轻松在不同API之间切换实现相同的功能。

也可以通过命令行、JDBC、ODBC的方式来操作SparkSQL,方便其它数据平台、BI平台使用SparkSql模块。

在spark应用程序开发中,可以无缝使用SparkSql操作数据。

可以直接使用Hive表格数据。

与Hive的兼容性极好:它复用了Hive的前端(去掉驱动mapreduce执行任务的部分)和元数据,因此可以拿过来hivesql的东西在sparksql上运行即可。

并不是100%完全兼容,但绝大多数情况下,不需要改动,或只需要极小的改动!!!

比如个别版本不支持直接insert into table xxx values(xxx...)的插入数据的方式

SparkSql的应用中,sql是一个重要方面,但不局限制sql。

sparksql操作代码:

1.SparkSql Shell操作SparkSql:
//直接输入spark-sql+自己想要添加的参数即可,与spark-shell相似

spark-sql [options]

//如指定运行模式

spark-sql local[*]

//如指定运行spark webui服务的端口,解决多人共用一个入口机时候的进入时候报port bind exception的问题

spark-sql --conf spark.ui.port=4075

//也可以用于似于hive -e的方式,直接直接一段sparksql代码

spark-sql –e “sparksql code”

2.DataFrames API 操作 SparkSql
与之前的Sbt构建SparkWordCount步骤完全一样。
1.6版本

package?com.tl.job003.sql

import?org.apache.spark.SparkConf

import?org.apache.spark.SparkContext

import?org.apache.spark.sql.SQLContext

object?SparkSqlTest?{

??def?main(args: Array[String]) {

???val?conf?=?new?SparkConf()

???conf.setMaster("local");

???conf.setAppName("TestSparkSql");

???val?sc?=?new?SparkContext(conf)

???val?sqlContext?=?new?SQLContext(sc)

???// 添加将RDD转化为DataFrame的功能包引入

???import?sqlContext.implicits._

???val?df?=?sqlContext.read.json("file:\\E:\\test\\job003\\sparksql\\input_weibo.json")

???df.show();

???sc.stop();

? }

}

  • DataFrame常用操作
    • 常用命令
      • show
      • printSchema
      • select
      • filter
      • groupBy
      • count
      • orderBy
      • 其它操作类比于sql即可
    • 综合示例-1.6.x

package com.tl.job003.sql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

object SparkSqlTest {

? def main(args: Array[String]) {

?? val conf = new SparkConf()

?? conf.setMaster("local");

?? conf.setAppName("TestSparkSql");

?? val sc = new SparkContext(conf)

?? val sqlContext = new SQLContext(sc)

?? // 添加将RDD转化为DataFrame的功能包引入

?? import sqlContext.implicits._

?? val df = sqlContext.read.json("file:\\E:\\test\\job003\\sparksql\\input_weibo.json")

?? //默认显示内容的top20

?? df.show()

?? // 打印内容对应的表结构

?? df.printSchema()

?? // 选择内容当中的某一个列对应的内容

?? df.select("content").show()

?? // 选择任意列并进行自定义操作

?? df.select(df("content"), df("commentCount"), df("commentCount") + 10000).show()

?? // 选择评论数大于100的数据显示出来

?? df.filter(df("commentCount") > 100).show()

??

?? // 按userId进行分组计数统计

?? df.groupBy("userId").count().show()

??

?? //分组统计结果按默认升序排列

?? df.groupBy("userId").count().orderBy("count").show()

??

?? //分组统计结果按降序排列

?? import org.apache.spark.sql.functions._

?? df.groupBy("userId").count().orderBy(desc("count")).show()

?? sc.stop();

? }

}

  • 创建DataFrames-2.3.x

package com.tl.job011.sparksql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SparkSession

object TestSparkSqlFor2_3_x {

??def main(args: Array[String]): Unit = {

????//1、构建spark session

????val sparkSession = SparkSession

??????.builder()

??????.appName("SparkSql-2.3.2-TestCase")

??????.master("local[*]")

??????.getOrCreate()

????// For implicit conversions like converting RDDs to DataFrames

????import sparkSession.implicits._

????//2、构建data frames

????val df = sparkSession.read.json("F:\\test_sbt\\FirstSpark4Scala\\input_json.txt")

????//3、df算子操作

????df.show()

????val cc = df.count()

????println("数据共有多少行=" + cc)

????

????//4、停掉相关会话

????sparkSession.stop()

??}

}

  • 综合示例-2.3.x

package com.tl.job011.sparksql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SparkSession

object TestSparkSqlFor2_3_x {

??def main(args: Array[String]): Unit = {

????//1、构建spark session

????val sparkSession = SparkSession

??????.builder()

??????.appName("SparkSql-2.3.2-TestCase")

??????.master("local[*]")

??????.getOrCreate()

????// For implicit conversions like converting RDDs to DataFrames

????import sparkSession.implicits._

????//2、构建data frames

????val df = sparkSession.read.json("F:\\test_sbt\\FirstSpark4Scala\\input_json.txt")

????//3、df算子操作 ???

????// 打印内容对应的表结构

????df.printSchema()

????// 选择内容当中的某一个列对应的内容

????df.select("content").show()

????// 选择任意列并进行自定义操作

????df.select(df("content"), df("commentCount"), df("commentCount") + 10000).show()

????// 选择评论数大于100的数据显示出来

????df.filter(df("commentCount") > 100).show()

????// 按userId进行分组计数统计

????df.groupBy("userId").count().show()

????//分组统计结果按默认升序排列

????df.groupBy("userId").count().orderBy("count").show()

????//分组统计结果按降序排列

????import org.apache.spark.sql.functions._

????df.groupBy("userId").count().orderBy(desc("count")).show()

????//4、停掉相关会话

????sparkSession.stop()

??}

}

  • RDD与DataFrame互操作
    • 将RDD转化成DataFrame(将无结构化数据转化成有结构化数据)
      • 将一个RDD转化为带Scheme的DataFrame
      • 实现转化的方式有两种
        • 反射推断
        • 程序编码实现数据与结构的对应,达到转化目标。(重点)
      • 1.6.x的代码实现

package com.tl.job011.sparksql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.Row

object TestRddToDataFrameFor1_6_x {

??def main(args: Array[String]): Unit = {

????//1、构建spark context,构建sql context

????val conf = new SparkConf

????conf.setAppName("SparkSql-Test")

????conf.setMaster("local[*]")

????val sc = new SparkContext(conf)

????val sqlContext = new SQLContext(sc)

????// 解决隐式转换问题, 如RDDs to DataFrames

????import sqlContext.implicits._

????//2、构建scheme

????val schema =

??????StructType(

????????"stdNo name classNo className".split(" ").map(fieldName => StructField(fieldName, StringType, true)))

????//3、构建rdd,linesRDD

????val studentRDD = sc.textFile("F:\\test_sbt\\FirstSpark4Scala\\student_mysql.txt")

????//4、将linesRDD转换成Row rdd

????val rowRDD = studentRDD.map(_.split("\\t")).map(p => Row(p(0), p(1), p(2), p(3)))

????//5、创建df,由row rdd + scheme

????val studentDataFrame = sqlContext.createDataFrame(rowRDD, schema)

????//6、df算子操作

????studentDataFrame.printSchema()

????studentDataFrame.show()

????//7、停掉相关会话

????sc.stop()

??}

}

      • 2.3.x的代码实现

package com.tl.job011.sparksql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.Row

object TestRddToDataFrameFor2_3_x {

??def main(args: Array[String]): Unit = {

????//1、构建spark session

????val sparkSession = SparkSession

??????.builder()

??????.appName("SparkSql-2.3.2-TestCase")

??????.master("local[*]")

??????.getOrCreate()

????// For implicit conversions like converting RDDs to DataFrames

????import sparkSession.implicits._

????//2、构建scheme

????val schema =

??????StructType(

????????"stdNo name classNo className".split(" ").map(fieldName => StructField(fieldName, StringType, true)))

????//3、构建rdd,linesRDD

????val studentRDD = sparkSession.sparkContext.textFile("F:\\test_sbt\\FirstSpark4Scala\\student_mysql.txt")

????//4、将linesRDD转换成Row rdd

????val rowRDD = studentRDD.map(_.split("\\t")).map(p => Row(p(0), p(1), p(2), p(3)))

????//5、创建df,由row rdd + scheme

????val studentDataFrame = sparkSession.createDataFrame(rowRDD, schema)

????//6、df算子操作

????studentDataFrame.printSchema()

????studentDataFrame.show()

????//7、停掉相关会话

????sparkSession.stop()

??}

}

    • 将DataFrame转化成RDD

//提供直接转化成RDD的方式,即df.rdd即可

studentDataFrame.rdd

    • SparkSql临时表生成(内存中存放的表,应用结束即消失)
      • 直接通过df注册生成

//将df对象直接注册成一张临时表

df.registerTempTable("weibo_doc"); ???//1.6.x版本

df. createTempView ("weibo_doc"); ???//2.3.x版本

    • 将RDD转换成DF后,再如上例注册成临时表
    • 临时表与DataFrame互操作
      • 应用开发中,使用SqlContext调用sql语句生成DataFrame
      • 综合示例-融合1.6.x和2.3.x?

package com.tl.job003.sql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

object SparkSqlTest {

? def main(args: Array[String]) {

?? val conf = new SparkConf()

?? conf.setMaster("local");

?? conf.setAppName("TestSparkSql");

?? val sc = new SparkContext(conf)

?? val sqlContext = new SQLContext(sc)

?? // 添加将RDD转化为DataFrame的功能包引入

?? import sqlContext.implicits._

?? val df = sqlContext.read.json("file:\\E:\\test\\job003\\sparksql\\input_weibo.json")

?? //注册为名称为weibo_doc的临时表-1.6.x

?//? df.registerTempTable("weibo_doc");

???//注册为名称为weibo_doc的临时表-2.3.x

?? df. createTempView ("weibo_doc");

?

? //执行该sql,并做相应的dataframe操作即可

???sqlContext.sql("select userId,content from weibo_doc order by commentCount desc").show();

?

?? sc.stop();

? }

}

  • DataFrame数据持久化
    • parquet数据格式,默认的输入和输出均为该格式
    • spark自带:天然集成,强力推荐的数据格式
  • parquet产生背景
    • 面向分析型业务的列式存储格式
    • 由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目.
    • Twitter的日志结构是复杂的嵌套数据类型,需要设计一种列式存储格式,既能支持关系型数据(简单数据类型),又能支持复杂的嵌套类型的数据,同时能够适配多种数据处理框架。
  • parquet的优点
    • 压缩数据,内部自带gzip压缩
    • 不失真
    • 减少IO吞吐量
    • 高效的查询
    • 多数据处理平台,均支持parquet,包括hive等。
  • 综合示例实现-2.3.x(与1.6.x差距核心在sparkSession抽象)

package com.tl.job011.sparksql

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.types.StringType

import org.apache.spark.sql.Row

object TestSparksqlPersistFor2_3_x {

??def main(args: Array[String]): Unit = {

??????//1、构建spark session

????val sparksql = SparkSession

??????.builder()

??????.appName("SparkSql-2.3.2-TestCase")

??????.master("local[*]")

??????.getOrCreate()

????// For implicit conversions like converting RDDs to DataFrames

????import sparksql.implicits._

????//2、构建scheme

?????val df = sparksql.read.json("F:\\test_sbt\\FirstSpark4Scala\\input_json.txt")

????//3、注册成表

????df.createTempView("weibo")

????//4、df算子操作

????var resultDF = sparksql.sql("select * from weibo")

????resultDF.repartition(1).write.format("parquet").save("F:\\test_sbt\\FirstSpark4Scala\\save3")

????//5、停掉相关会话

????sparksql.stop()

??}

}

4、DataSets API操作SparkSql

  • 1开发环境搭建步骤
    • 与DataFrames完全相同
  • 2?DataSets相关操作
    • 操作说明
      • DataSet集成了RDD和DataFrame的优点,也称为强类型的DataFrame。
      • DataSets和DataFrames具有完全相同的成员函数。
      • 两者中,每个行的数据类型不同。DataFrame也可以叫Dataset[Row],即DataFrame是Dataset的一种特定形式。而DataSet的每一行是不固定的,需要模式匹配来确定。
    • 版本说明
      • 在1.6.2版本DataSet为alpha版测试功能,API方面均没有得到丰富和完善。
      • 在2.0.0开始DataSet得到正式推广使用,由于其API和DataFrame在成员函数中完全对等,在使用上差异极小,由于是强类型,故仅在数据集case class模式匹配时,有明显差别。
    • DataSets的Spark2.3.2版本上的应用

package?com.tl.job011.sparksql

import?org.apache.spark.sql.SparkSession

//样例类

case?class?Student(name: String, age: Long, address: String)

/**

?* 抽象数据类型DataSet测试类

?*/

object?TestSparkSqlDataSet?{

??def?main(args: Array[String]): Unit = {

????//1、构建spark session

????val?sparkSession?= SparkSession

??????.builder()

??????.appName("SparkSql-2.3.2-TestCase")

??????.master("local[*]")

??????.getOrCreate()

????//引入自动隐式类型转换

????import?sparkSession.implicits._

????// 从基础数据对象类型创建DataSet

????val?primitiveDS?= Seq(1, 2, 3).toDS()

????val?col?= primitiveDS.map(_ +?1).collect() // Returns: Array(2, 3, 4)

????col.foreach(println)

????println("-----------------")

????primitiveDS.show()

????// 已为样例类case class创建完成编码类Encoder

????val?caseClassDS?= Seq(Student("脱口秀大会", 3, "北京")).toDS()

????caseClassDS.show()

????

????// 指定相应的文件导入形成样例类对应的DataSet,通过json的key和样例类的字段名称对应即可

????val?path?= "F:\\test_sbt\\FirstSpark4Scala\\student_data.txt"

????val?peopleDS?= sparkSession.read.json(path).as[Student]

????peopleDS.select("name", "age", "address").show()

????//关停会话上下文

????sparkSession.stop()

??}

}

5、多数据集抽象类型对比分析

  • spark抽象数据集列表
    • RDD
    • DataFrame
    • DataSet
  • 相同点
    • 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
    • 三者都有惰性机制,在进行Transform操作时不会立即执行,在遇到Action操作时会正式提交作业执行。
    • 均采用spark的内存运算和优化策略,内存使用和执行效率上均可以得到保障。
    • 均有partition的概念,便于分布式并行计算处理,达到分而治之。
    • 均有许多共同的函数,如map、filter、sort等。
    • 在进行三者的相关操作时候,个别特殊操作时必须引入一个相同的包依赖。( 早期称为 import?sqlContext.implicits._,最新版本称为import spark.implicits._)
    • DF和DS均可以通过模式匹配获取内部的变量类型和值。
    • DF和DS产生于SparkSql,天然支持SparkSql。
  • 区别点
    • RDD
      • 不支持SparkSql操作,均需进行转成DF或是DS才行。
      • 类型是安全的,编译时候即可检查出类型错误。(强类型)
      • 机器间通信、IO操作均需要序列化、反序列化对象,性能开销大。
    • DataFrame
      • 有scheme的RDD:比RDD增加了数据的描述信息。
      • 比RDD的API更丰富,增加了针对结构化数据API。
      • 只有一个固定类型的DataSet,即为DataFrame=DataSet[Row]
      • 序列化和反序列化时做了结构化优化,减少了不必要的结构化信息的序列化,提高了执行效率。
    • DataSet
      • 强类型的DataFrame,与DF有完全相同的成员函数。
      • 每行的类型不固定,需要使用模式匹配case class后,获取实际的类信息、字段类型、字段值。
      • 访问对象数据时,比DF更加直接简单。
      • 在序列化和反序列化时,引入了Encoder机制,达到按需序列化和反序列化,不必像之前整个对象操作了,进一步提高了效率。
  • 应用场景
    • 使用RDD场景
      • 数据为非结构化,如流媒体等数据
      • 对数据集进行底层的转换、处理、控制
      • 不需要列式处理,而是通过常规的对象.属性来使用数据。
      • 对DF、DS带来的开发效率、执行效率提升不敏感时
    • 使用DF(必须)
      • R或是python语言开发者,使用DF
    • 使用DS(必须)
      • 在编译时就有高度的类型安全,想要有类型的JVM对象,用上Catalyst优化,并得益于Tungsten生成的高效代码
    • 使用DF、DS场景
      • 需要丰富的语义、高级抽象和特定领域专用的API时
      • 处理需要对半结构化数据进行高级处理,如filter、map、aggregation、average、sum、SQL查询、列式访问或使用lambda函数
      • 在不同的Spark库之间使用一致和简化的API
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-15 15:56:05  更:2021-11-15 15:57:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:39:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码