数据源API结构
Read API 结构
读取数据的核心结构如下:
DataFrameReader.format(...).option("key","value").schema(...).load()
使用此格式可以读取所有数据源,format 是可选的,默认情况下 Spark 将使用 Parquet 格式,option 能配置键值对(key-value) 来参数化读取数据的方法。最后,如果数据源包含某种 Schema 或者你想使用模式推理(Schema inference),则可以选择指定 schema 。
数据读取基础
Spark 数据读取使用DataFrameReader,通过 SparkSession 的 read 属性得到:spark.read 有了DataFrame reader 之后,还需要指定几个值:
- format
- schema
- read模式
- 一些列option选项
format,option 和Schema 都会返回一个DataFrameReader,它可以进一步的转换,并且都是可选的(那些仅有唯一可选项的就只能选择唯一的可选项)。每个数据源都有一组特定的选项,用于设置如何将数据读入Spark 。至少需要为DataFrameReader 提供一个读取路径 下面是一个整体结构的例子:
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.schema(someSchema)
.load(inputPath)
spark 的读取模式:
读取模式 | 说明 |
---|
permissive | 当遇到错误格式的记录是,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record 字符串列中 | dropMalformed | 删除包含错误格式记录的行 | failFast | 遇到错误格式的记录后立即返回失败 |
默认是permissive
Write API 结构
写数据的核心结构如下:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
使用此格式可以向所有数据源写入数据。format 是可选的,默认情况下是Spark 使用Parquet 格式,option 任用于配置写出数据的方法,partitionBy,bucketBy,sortBy 仅适用于基于文件的数据源,可以使用这些方法来控制写出目标文件的具体结构。
写数据基础
写数据与读取数据非常相似,不同的是,需要用到的是DataFrameWriter 而不是DataFrameReader ,因为总是需要将数据写入一些给定数据源中,所以通过每个DataFrame的writer 属性来获得DataFrameWriter,有了DataFrameWriter 之后。我们需要指定三个值:format、一系列option 选项和save 模式,并且必须至少提供一条写入路径(来制定目标地址) 下面是一个整体结构的例子:
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()
Spark 的保存模式
保存模式 | 说明 |
---|
append | 将输出文件追加到目标路径已存在的文件上或目录的文件列表 | overwrite | 将完全覆盖目标路径中已存在的任何数据 | errorIfExists | 如果目标路径已存在数据或文件,则抛出错误并返回写入操作失败 | ignore | 如果目标路径已存在数据或文件,则不执行任何操作 |
默认情况是 :errorIfExists
并行写数据(重点)
写数据涉及的文件数量取决于DataFrame 的分区数。默认情况是每个数据分片都会有一定的数据写入,这意味着虽然我们指定大的是一个“文件”,但实际上它是由一个文件夹的多个文件组成,每个文件对应着一个数据分片。 例子:**将DataFrame 中数据保存到单个文件中 ** 方法1:
csvFile.repartition(1).write.format("csv").save(outputPath)
方法2:
csvFile.coalesce(1).write.format("csv").save(outputPath)
|