一、从Data Sources创建DataFrames
1、DataFrame的数据源
- DataFrames从数据源读取数据,并将数据写入数据源
- Spark SQL支持广泛的数据源类型和格式
- Text files
- Binary format files(二进制格式文件)
- Apache Parquet, Apache ORC, Apache Avro data format
- Tables
- Cloud
- Such as Amazon S3 and Microsoft ADLS
- 也可以使用自定义或第三方数据源类型
2、DataFrames and Apache Parquet Files
- Parquet是DataFrame数据的一种非常常见的文件格式
- Parquet的特点
- 优化的二进制存储的结构化数据
- 架构元数据被嵌入到文件中
- 高效的性能和大数据量
- 许多Hadoop生态系统工具支持
- Spark、Hadoop MapReduce、Hive等
- 使用parquet-tools查看Parquet文件模式和数据
(1)使用head显示前几条记录
$ parquet-tools head mydatafile.parquet
(2)使用schema查看模式
$ parquet-tools schema mydatafile.parquet
3、从数据源创建DataFrame
- spark.read返回一个DataFrameReader对象
- 使用DataFrameReader设置来指定如何从数据中加载数据源
- format:表示数据源类型,如csv、json、parquet等(默认为parquet)
- option:指定底层数据源的键/值设置
- schema:指定要使用的模式,而不是从数据源推断一个模式
- 基于数据源创建DataFrame
4、示例:从数据源创建DataFrame
python:
myDF = spark.read.format("csv").option("header","true").load("/loudacre/myFile.csv")
python:
myDF = spark.read.format("avro").load("/loudacre/myData.avro")
5、DataFrameReader便捷函数
- 可以为某些格式调用特定格式的加载函数
- 下面两个代码示例是相同的,常规写法与简洁写法
spark.read.format("csv").load("/loudacre/myFile.csv")
spark.read.csv("/loudacre/myFile.csv")
6、指定数据源文件位置
- 从文件数据源读取时,必须指定位置
- 位置可以是单个文件、文件列表、目录或通配符
- 例如
- spark.read.json(“myfile.json”)
- spark.read.json(“mydata/”)
- spark.read.json(“mydata/*.json”)
- spark.read.json(“myfile1.json”,“myfile2.json”)
- 文件和目录由绝对或相对URI引用
- 相对URI(使用默认文件系统)
- 绝对URI
- hdfs://nnhost/loudacre/myfile.json
- file:/home/training/myfile.json
7、从Hive Tables中创建DataFrames
- Apache Hive提供对HDFS中的数据类似数据库的访问
- 将模式(Schemas)应用于HDFS文件
- 元数据存储在Hive metastore中
- Spark可以对Hive tables进行读写操作
- 从Hive metastore推断DataFrame schema
- Spark Hive支持必须开启并配置Hive的位置
python:
usersDF = spark.read.table("users")
8、从内存中的数据创建DataFrame
val mydata = List(("Josiah","Bartlet"),("Harry","Potter"))
val myDF = spark.createDataFrame(mydata)
myDF.show
+------+-------+
| _1| _2|
+------+-------+
|Josiah|Bartlet|
| Harry| Potter|
+------+-------+
二、将DataFrames保存到Data Sources
1、关键的DataFrameWriter Functions
- DataFrame write function返回一个DataFrameWriter
- 将数据保存到一个数据源,如表或一组文件
- 工作原理类似于DataFrameReader
- DataFrameWriter方法
- format指定数据源类型
- mode决定目录或表是否已经存在
- error、overwrite、append或ignore(默认为error)
- partitionBy将数据以表单的形式存储在分区目录中
column=value(和Hive分区一样) - option指定目标数据源的属性
- save将数据以文件的形式保存在指定的目录下
- saveAsTable将数据保存到Hive metastore表中
- 基于Hive仓库默认的数据位置
- 设置path选项以覆盖位置
2、示例:将一个DataFrames保存到Data Sources
- 向一个名为my_table的Hive metastore table写入数据
myDF.write.mode("append").option("path","/loudacre/mydata").saveAsTable("my_table")
- 在mydata目录下以Parquet文件的形式写入数据
myDF.write.save("mydata")
3、保存数据到文件
- 从DataFrame中保存数据时,必须指定一个目录
- Spark将数据保存到该目录下的一个或多个部件文件中
devDF.write.csv("devices")
$ hdfs dfs -ls
devices
Found 4 items
-rw-r--r-- 3 training training 0 … devices/_SUCCESS
-rw-r--r-- 3 training training 2119 … devices/part-00000-e0fa6381-….csv
-rw-r--r-- 3 training training 2202 … devices/part-00001-e0fa6381-….csv
-rw-r--r-- 3 training training 2333 … devices/part-00002-e0fa6381-….csv
三、DataFrame Schemas
1、DataFrame Schemas
- 每个DataFrame都有一个相关的模式(schema)
- 模式(schema)用于定义列的名称和类型
- 模式(schema)是不可变的并在DataFrame创建时定义
myDF.printSchema()
root
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)
- 当从数据源创建一个新的DataFrame时,模式可以:
- 当DataFrame被转换(transformation)创建时,Spark会基于查询(query)计算新的模式(schema)。
2、Inferred Schemas(推断模式)
- Spark可以从结构化数据加载模式,例如:
- Parquet、ORC和Avro数据文件 ——schema被嵌入到文件中
- Hive table ——schema在Hive metastore中定义
- Parent DataFrames
- Spark还可以尝试从半结构化数据源推断模式
3、示例:推断CSV文件的模式(没有表头)
spark.read.option("inferSchema","true").csv("people.csv").printSchema()
root
|-- _c0: integer (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: integer (nullable = true)
4、示例:推断CSV文件的模式(有表头)
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()
root
|-- pcode: integer (nullable = true)
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)
5、Inferred Schemas(推断模式)与Manual Schemas(手动模式)
- 依赖于Spark的自动推断模式的缺点
- 推断时需要初始文件扫描,这可能需要很长时间
- 推断出来的模式对您的用例可能不正确
- 手动定义模式
- 模式Schemas是一个包含StructField列表的StructType对象
- 每个StructField表示模式中的一列,指定了:
- 列的名称
- 列的数据类型
- 数据是否可以为空(可选——默认为true)
6、示例:错误的模式推断
spark.read.option("inferSchema","true").option("header","true").csv("people.csv").printSchema()
root
|-- pcode: integer (nullable = true)
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)
7、示例:以编程方式手动定义模式(Python)
from pyspark.sql.types import *
columnsList = [
StructField("pcode", StringType()),
StructField("lastName", StringType()),
StructField("firstName", StringType()),
StructField("age", IntegerType())]
peopleSchema = StructType(columnsList)
8、示例:以编程方式手动定义模式(Scala)
import org.apache.spark.sql.types._
val columnsList = List(
StructField("pcode", StringType),
StructField("lastName", StringType),
StructField("firstName", StringType),
StructField("age", IntegerType))
val peopleSchema = StructType(columnsList)
9、示例:应用手动模式
spark.read.option("header","true").schema(peopleSchema).csv("people.csv").printSchema()
root
|-- pcode: string (nullable = true)
|-- lastName: string (nullable = true)
|-- firstName: string (nullable = true)
|-- age: integer (nullable = true)
四、Eager and Lazy Execution
1、急于执行和懒惰执行
- 一旦代码中出现语句,操作就会立即执行
- 当只在结果被引用时才执行操作时,操作是惰性的
- Spark查询执行起来既迟缓又急切
- DataFrame模式是急切确定的
- 数据转换被延迟执行
- 当在一系列转换中调用一个动作时,会触发惰性执行
2、Eager and Lazy Execution(1)
python:
> usersDF = spark.read.json("users.json")
3、Eager and Lazy Execution(2)
python:
> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")
4、Eager and Lazy Execution(3)
python:
> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")
> nameAgeDF.show()
5、Eager and Lazy Execution(4)
python:
> usersDF = spark.read.json("users.json")
> nameAgeDF = usersDF.select("name","age")
> nameAgeDF.show()
+-------+----+
| name| age|
+-------+----+
| Alice|null|
|Brayden| 30|
| Carla| 19|
…
五、基本要点
- DataFrames可以从如下几种不同类型的数据源加载和保存
- CSV和JSON等半结构化文本文件
- Parquet、Avro和ORC等结构化二进制格式
- Hive和JDBC表
- DataFrames可以从数据源中推断出一个模式,或者手动定义一个模式
- DataFrame schema是在创建时确定的,但是查询是延迟执行的(当一个动作被调用时)
六、实践练习:使用DataFrames和schema
1、基于Hive Table创建DataFrame
1、本练习使用基于devsh Hive数据库中的accounts表的DataFrame。您可以使用Beeline SQL命令行访问Hive来查看模式
$ beeline -u jdbc:hive2://localhost:10000 -e "DESCRIBE devsh.accounts"
2、如果它还没有运行,请启动Spark shell(您可以选择Scala或Python)。
spark-shell
3、使用Hive表devsh.accounts创建一个新的DataFrame。
pyspark> accountsDF = spark.read.table("devsh.accounts")
scala> val accountsDF = spark.read.table("devsh.accounts")
4、打印模式和DataFrame的前几行,注意模式与Hive表的模式是一致的。 5、使用邮政编码为94913的帐户数据行创建一个新的DataFrame,并将结果保存到HDFS目录/devsh_loudacre/ accounts_zip94913中的CSV文件中。您可以在单个命令中完成此操作,如下所示,也可以使用多个命令。
pyspark> accountsDF.where("zipcode = 94913").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")
scala> accountsDF.where("zipcode = '94913'").write.option("header","true").csv("/devsh_loudacre/accounts_zip94913")
6、使用hdfs在单独的终端窗口查看hdfs中的/devsh_loudacre/accounts_zip94913目录和其中一个保存文件中的数据。确认CSV文件包含标题行,并且只包含所选邮政编码的记录。 7、可选:尝试基于上面创建的CSV文件创建一个新的DataFrame。比较原始accountsDF和新的DataFrame的模式。有什么不同吗?再次尝试,这次将inferSchema选项设置为true并再次进行比较。
2、定义DataFrame的架构
8、如果您还没有这样做,请查看HDFS文件/devsh_loudacre/devices.json中的数据。 9、基于设备创建一个新的DataFrame。json文件。(该命令在推断模式时可能需要几秒钟时间。)
pyspark> devDF = spark.read.json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.json("/devsh_loudacre/devices.json")
10、查看devDF DataFrame的模式。注意Spark从JSON文件中推断出的列名和类型。特别要注意的是,release_dt列的类型是string,而列中的数据实际上代表一个时间戳。 11、定义正确指定此DataFrame列类型的模式。首先导入包含必要类和类型定义的包。 注:“_”代表全部,与JAVA的“*”一致意思
pyspark> from pyspark.sql.types import *
scala> import org.apache.spark.sql.types._
12、接下来,创建一个StructField对象集合,这些对象表示列定义。release_dt列应该是一个时间戳。
pyspark> devColumns = [StructField("devnum",LongType()),
StructField("make",StringType()),
StructField("model",StringType()),
StructField("release_dt",TimestampType()),
StructField("dev_type",StringType())]
scala> val devColumns = List(
StructField("devnum",LongType),
StructField("make",StringType),
StructField("model",StringType),
StructField("release_dt",TimestampType),
StructField("dev_type",StringType))
13、使用列定义列表创建模式(StructType对象)。
pyspark> devSchema = StructType(devColumns)
scala> val devSchema = StructType(devColumns)
14、重新创建devDF DataFrame,这次使用新的模式。
pyspark> devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")
scala> val devDF = spark.read.schema(devSchema).json("/devsh_loudacre/devices.json")
15、查看新DataFrame的模式和数据,并确认release_dt列类型现在是时间戳。 16、既然设备数据使用了正确的模式,那么就以Parquet格式写入数据,这会自动嵌入模式。将Parquet数据文件保存到HDFS的“/devsh_loudacre/devices_parquet”目录下。 17、可选:在单独的终端窗口中,使用parquet-tools查看已保存文件的模式。请先下载HDFS目录(或单个文件),再执行该命令。
$ hdfs dfs -get /devsh_loudacre/devices_parquet /tmp/
$ parquet-tools schema /tmp/devices_parquet/
注意,release_dt列的类型被标注为int96;这就是Spark在Parquet中表示时间戳类型的方式。有关拼花工具的更多信息,请运行拼花工具——help。 18、使用保存在devices_parquet中的Parquet文件创建一个新的DataFrame并查看它的模式。注意,Spark能够从Parquet的嵌入式模式中正确推断release_dt列的时间戳类型。
|