Spark SQL requires the data it operates on to be structured, so two distributed collection types were introduced: DataFrame and DataSet. DataFrame arrived in Spark 1.3 and DataSet in Spark 1.6. Since Spark 2.0 the two APIs have been unified: a DataFrame is simply a Dataset of Row objects, i.e. DataFrame is a special case of DataSet, and DataSet is the more general, extended abstraction.
(type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
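As a quick illustration of that alias (a minimal, self-contained sketch; the app name and the "key"/"value" columns are made up here), a value declared as DataFrame can be assigned to a Dataset[Row] without any conversion:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

val spark = SparkSession.builder().appName("AliasDemo").master("local[*]").getOrCreate()
import spark.implicits._

// toDF() returns a DataFrame, which is just the alias for Dataset[Row]
val df: DataFrame = Seq(("a", 1), ("b", 2)).toDF("key", "value")
val sameThing: Dataset[Row] = df // compiles because the two types are identical
spark.stop()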
DataFrame
1. Creating a DataFrame
Method 1: define a case class, then call RDD.toDF to obtain the DataFrame
val subject_formula_rdd: RDD[String] = sc.textFile("D:\\TextFile\\subject_formula.txt")
// split each line and wrap the fields in the Subject case class
val Subject_rdd: RDD[Subject] = subject_formula_rdd.map(x => {
  val strings = x.split(",")
  Subject(strings(0), strings(1), strings(2))
})
// toDF is provided by the implicit conversions imported via session.implicits._
val dataFrame: DataFrame = Subject_rdd.toDF()
Method 2: define a schema by hand, then call sqlContext.createDataFrame to build the DataFrame
val subject_row: RDD[Row] = subject_formula_rdd.map(x => {
  val strings = x.split(",")
  // Row is a generic, untyped record type
  Row(strings(0), strings(1), strings(2))
})
// define the schema by hand
val schema = StructType(
  List(
    StructField("SubjectCode", StringType),
    StructField("SubjectName", StringType),
    StructField("subjectFormula", StringType)
  )
)
// combine the RDD[Row] with the hand-written schema
val dataFrame1 = sqlContext.createDataFrame(subject_row, schema)
val result = dataFrame1.select("SubjectCode", "SubjectName", "subjectFormula")
result.show()
Method 3: read a structured file (e.g. JSON) directly via SQLContext / SparkSession
val frame: DataFrame = sqlContext.read.json("D:\\TextFile\\subject_formula.json")
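For context on what read.json expects (the file contents below are an assumption, not taken from the original text; only the three Subject field names are): by default Spark reads JSON as one object per line and infers the schema from the keys, so a quick printSchema/show is a useful sanity check:
// assumed layout of subject_formula.json, one JSON object per line, e.g.
// {"SubjectCode":"01","SubjectName":"math","subjectFormula":"a*a+b*b=c*c"}
val jsonFrame: DataFrame = sqlContext.read.json("D:\\TextFile\\subject_formula.json")
jsonFrame.printSchema() // schema inferred from the JSON keys
jsonFrame.show()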
2. Using a DataFrame
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}

object SqlDemo {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("DataframeDemo").master("local[*]").getOrCreate()
    // import the implicit conversions (they provide toDF and the case-class encoders)
    import session.implicits._
    // both the SparkContext and the SQLContext can be obtained from the session
    val sc: SparkContext = session.sparkContext
    val sqlContext: SQLContext = session.sqlContext
    // create the DataFrame
    val subject_formula_rdd: RDD[String] = sc.textFile("D:\\TextFile\\subject_formula.txt")
    val Subject_rdd: RDD[Subject] = subject_formula_rdd.map(x => {
      val strings = x.split(",")
      Subject(strings(0), strings(1), strings(2))
    })
    val dataFrame: DataFrame = Subject_rdd.toDF()
    // two query styles are available
    // SQL style
    // first register the DataFrame as a temporary table
    // (registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the current equivalent)
    dataFrame.registerTempTable("emp")
    // then query it with SQL
    val result1 = sqlContext.sql("select SubjectCode from emp order by SubjectCode")
    // show() prints 20 rows by default; here we allow up to 100
    result1.show(100)
    // DSL style: call methods on the DataFrame directly
    // orderBy sorts ascending by default
    dataFrame.orderBy("SubjectCode").select("SubjectCode").show()
    sc.stop()
  }
}
case class Subject(SubjectCode: String,SubjectName:String,subjectFormula: String)
Dataset
How it differs from DataFrame: a Dataset carries its schema information by default, derived from its element type.
For a Dataset[String], the default schema has a single column named value of type String. For a Dataset[(String, Int)], the default column names are _1 and _2 (the tuple accessors), and the column types are taken from the tuple's element types.
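A small sketch of those defaults (the literal values are illustrative only; session and the implicits import are assumed to be in scope as in the examples below):
// Dataset[String]: a single column named "value"
val ds1: Dataset[String] = session.createDataset(Seq("a", "b"))
ds1.printSchema() // root |-- value: string (nullable = true)
// Dataset[(String, Int)]: columns named _1 and _2, typed after the tuple elements
val ds2: Dataset[(String, Int)] = session.createDataset(Seq(("a", 1), ("b", 2)))
ds2.printSchema() // root |-- _1: string |-- _2: integer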
1. Creating a Dataset
// Method 1: read the file with session.read.textFile (already a Dataset[String]) and map it to the case class
val subject_Dataset: Dataset[Subject] = session.read.textFile("D:\\TextFile\\subject_formula.txt").map(x => {
  val strings = x.split(",")
  Subject(strings(0), strings(1), strings(2))
})
// Method 2: build a Dataset from an RDD with session.createDataset()
val subject_formula_rdd: RDD[String] = sc.textFile("D:\\TextFile\\subject_formula.txt")
val Subject_rdd: RDD[Subject] = subject_formula_rdd.map(x => {
  val strings = x.split(",")
  Subject(strings(0), strings(1), strings(2))
})
val subject_Dataset2: Dataset[Subject] = session.createDataset[Subject](Subject_rdd)
2. Using a Dataset
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}

object SqlDemo2 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("DataframeDemo").master("local[*]").getOrCreate()
    // import the implicit conversions (they provide the encoders for case classes)
    import session.implicits._
    // both the SparkContext and the SQLContext can be obtained from the session
    val sc: SparkContext = session.sparkContext
    val sqlContext: SQLContext = session.sqlContext
    val subject_Dataset: Dataset[Subject] = session.read.textFile("D:\\TextFile\\subject_formula.txt").map(x => {
      val strings = x.split(",")
      Subject(strings(0), strings(1), strings(2))
    })
    // the schema is derived from the Subject case class:
    // StructType(StructField(SubjectCode,StringType,true), StructField(SubjectName,StringType,true), StructField(subjectFormula,StringType,true))
    println(subject_Dataset.schema)
    subject_Dataset.createTempView("v_subject")
    val result = session.sql("select * from v_subject where length(subjectFormula)>20")
    result.show()
    // release the resources
    session.stop()
  }
}
case class Subject(SubjectCode: String,SubjectName:String,subjectFormula: String)
Converting between DataFrame, DataSet and RDD
val session: SparkSession = SparkSession.builder().appName("DataframeDemo").master("local[*]").getOrCreate()
// import the implicit conversions (needed for toDF / toDS / as[T])
import session.implicits._
// both the SparkContext and the SQLContext can be obtained from the session
val sc: SparkContext = session.sparkContext
val sqlContext: SQLContext = session.sqlContext
val subject_rdd: RDD[Subject] = sc.textFile("D:\\TextFile\\subject_formula.txt").map(x => {
  val strings = x.split(",")
  Subject(strings(0), strings(1), strings(2))
})
// RDD -> DataFrame
val dataFrame: DataFrame = subject_rdd.toDF()
// RDD -> Dataset
val dataSet1: Dataset[Subject] = subject_rdd.toDS()
// DataFrame -> Dataset
val dataSet2: Dataset[Subject] = dataFrame.as[Subject]
// Dataset -> DataFrame
val dataFrame2: DataFrame = dataSet1.toDF()
// DataFrame -> RDD
val rdd1: RDD[Row] = dataFrame2.rdd
// Dataset -> RDD
val rdd2: RDD[Subject] = dataSet2.rdd
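One direction the list above leaves out: an RDD[Row] cannot simply be turned back into a DataFrame with toDF, because Row carries no compile-time type information and has no implicit encoder; the schema has to be supplied again. A minimal sketch, assuming a StructType like the schema defined in Method 2 above:
// RDD[Row] -> DataFrame: the schema must be provided explicitly
val dataFrameBack: DataFrame = session.createDataFrame(rdd1, schema)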
Note:
Compared with DataFrame, DataSet is parameterized by an element type, which makes it more convenient to work with. However, Python does not support generics, so DataFrame is kept for that reason: Python can only use DataFrame, while Scala and Java code is encouraged to use DataSet.
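To make the difference concrete, here is a small sketch reusing dataSet1 and dataFrame from the conversion example above: with a Dataset the element type is known at compile time, so fields are accessed as ordinary case-class members, while a DataFrame works with untyped columns that are only resolved at runtime.
import org.apache.spark.sql.functions.{col, length}

// typed API: the compiler knows every element is a Subject
val longFormulas: Dataset[Subject] = dataSet1.filter(s => s.subjectFormula.length > 20)
// untyped API: the column is referenced by name and checked only at runtime
val longFormulasDf: DataFrame = dataFrame.filter(length(col("subjectFormula")) > 20)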