1 Creating a DataFrame from a data source
Raw data (a.json, one self-contained JSON object per line, as spark.read.json expects):
{"name":"Tom","age":18}
{"name":"Alice","age":17}
Steps:
scala> val df = spark.read.json("file:///opt/module/spark/mycode/a.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select * from people").show
+---+-----+
|age| name|
+---+-----+
| 18| Tom|
| 17|Alice|
+---+-----+
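For comparison, the same result can be obtained with the DataFrame API instead of registering a temporary view and writing SQL. A minimal sketch, building on the df defined above (the $"..." column syntax assumes import spark.implicits._ is in scope, which spark-shell provides):

// DataFrame API equivalent of "select * from people", plus a simple filter
df.select($"name", $"age").show()
df.filter($"age" > 17).show()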
2 Creating a DataFrame via reflection
Raw data (b.txt):
Tom,18
Alice,17
Steps:
scala> import spark.implicits._
scala> case class People(name: String, age: Int)
scala> val rdd = sc.textFile("file:///opt/module/spark/mycode/b.txt")
rdd: org.apache.spark.rdd.RDD[String] = file:/
scala> val df = rdd.map(_.split(",")).map(data=>People(data(0),data(1).toInt)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> df.show
+-----+---+
| name|age|
+-----+---+
| Tom| 18|
|Alice| 17|
+-----+---+
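Because People is a case class, the same RDD can also be turned into a strongly typed Dataset instead of a DataFrame. A minimal sketch reusing the rdd and People definitions above (toDS, like toDF, comes from import spark.implicits._):

// Typed variant: Dataset[People] instead of DataFrame
val ds = rdd.map(_.split(","))
            .map(data => People(data(0), data(1).trim.toInt))
            .toDS()
ds.filter(_.age > 17).show()   // typed lambda instead of a SQL expression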
Note that the `spark` in `import spark.implicits._` refers to a SparkSession object (the Scala interactive shell provides a SparkSession named `spark` by default; the spark-shell startup messages report that a Spark session is available as 'spark').
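Outside the shell (for example, in a standalone application) no SparkSession exists by default; it has to be built explicitly before the implicits can be imported. A minimal sketch, assuming the program is packaged and submitted with spark-submit:

import org.apache.spark.sql.SparkSession

object DFExample {
  def main(args: Array[String]): Unit = {
    // In a standalone program the SparkSession must be created explicitly;
    // spark-shell does this automatically and names the instance "spark".
    val spark = SparkSession.builder()
      .appName("DFExample")
      .getOrCreate()
    import spark.implicits._   // the implicits belong to this particular instance

    val df = spark.read.json("file:///opt/module/spark/mycode/a.json")
    df.show()

    spark.stop()
  }
}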
3 Creating a DataFrame by programmatically specifying the schema
When a case class cannot be defined in advance, the RDD schema has to be defined programmatically: build a StructType schema, convert the RDD to an RDD[Row], and then call spark.createDataFrame.
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val schemaString = "name age"
schemaString: String = name age
scala> val fields = schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,nullable=true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true))
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
scala> val rowRDD = rdd.map(_.split(",")).map(data=>Row(data(0),data(1)))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at map at <console>:29
scala> val df = spark.createDataFrame(rowRDD, schema)
df: org.apache.spark.sql.DataFrame = [name: string, age: string]
scala> df.show
+-----+---+
| name|age|
+-----+---+
| Tom| 18|
|Alice| 17|
+-----+---+
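Note that with this approach both columns come out as strings, because the schema declared StringType for every field. If numeric types are needed, the field types can be declared individually and the values converted when building the Rows. A minimal sketch reusing the rdd from above:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Declare a per-field type instead of StringType for everything
val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

// The values placed in each Row must match the declared field types
val typedRowRDD = rdd.map(_.split(","))
                     .map(data => Row(data(0), data(1).trim.toInt))

val typedDF = spark.createDataFrame(typedRowRDD, typedSchema)
typedDF.printSchema()   // age is now an integer column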