IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark - parquet 加载时间过长 && parquet 指定 schema 无数据 -> 正文阅读

[大数据]Spark - parquet 加载时间过长 && parquet 指定 schema 无数据

一.引言

Parquet 是一种列式存储格式,常用于表结构数据存储,也是 sparkSql 的默认存储格式。spark 读取 parquet 文件时,偶发读取时间过长,正常? parquet 时间在 1-5 s,异常期间最长可达 10 min +,于是开始踩坑之旅。下面是读取日志,正常情况只需 1s 以内,异常时却需要很久。

二.Parquet 读取问题定位与解决

1.代码变化 && 数据变化

1s 左右的加载时间和出现异常导致 10min + 的加载时间前后代码没有发生过变化,唯一的改变就是读取的 parquet 数量增加了一倍,总大小未改变。因此问题应该出在和 parquet 相关的代码上,这里执行的是官方的 api :

spark.read.parquet(path)

?所以解决问题就需要看下读取 parquet 的具体逻辑了。

2.Schema?Infer And Merge

spark 读取 read parquet 时有一个参数 .schema 用于指定 parquet 的列名与列属性,如果直接调用 .read.parquet() 时,sparkSession 需要自己 infer parquet 的 schema , schema 可以理解为 parquet 每列数据的属性,包含 name,type 等等:

override def inferSchema(
     sparkSession: SparkSession,
     parameters: Map[String, String],
     files: Seq[FileStatus]): Option[StructType] = {
  val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)

  // Should we merge schemas from all Parquet part-files?
  val shouldMergeSchemas = parquetOptions.mergeSchema

  ...

  ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}

这里截距了一部分代码,其中 parquetOptions.mergeSchema?参数代表是否合并 schema,默认为 true,在 spark 中配置该参数:

conf spark.sql.parquet.mergeSchema=true \

这段代码还有一段解释含义大致如下 :

如果用户指定 mergeSchema?= false 参数,程序会首先尝试摘要文件,如果没有摘要文件,则返回某个随机的部分文件,所以当该参数为 false 时,infer 会随机选取 parquet 文件进行推断,如果 所有 parquet 文件格式一致则不受影响,如果不一致则会 infer 出错。

如果用户指定 mergeSchema = true? 参数,?则程序认为所有的部分文件都是相同的,此时如果出现 parquet 不匹配,例如名称匹配,类型不匹配时 infer 会报错。但是当文件过多时,如果串行执行效率会很低,所以这里采取并行处理的方式进行 schema 推断,也就是上面 infer 函数的最后一行 mergeSchemaInParallel :

  def mergeSchemasInParallel(
      filesToTouch: Seq[FileStatus],
      sparkSession: SparkSession): Option[StructType] = {
    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    // Issues a Spark job to read Parquet schema in parallel.
    val partiallyMergedSchemas =
      sparkSession
        .sparkContext
        .parallelize(partialFileStatusInfo, numParallelism)
        .mapPartitions { iterator =>
          // Resembles fake `FileStatus`es with serialized path and length information.

          ...

        }.collect()

    if (partiallyMergedSchemas.isEmpty) {
      None
    } else {
      var finalSchema = partiallyMergedSchemas.head
      partiallyMergedSchemas.tail.foreach { schema =>
        try {
          finalSchema = finalSchema.merge(schema)
        } catch { case cause: SparkException =>
          throw new SparkException(
            s"Failed merging schema:\n${schema.treeString}", cause)
        }
      }
      Some(finalSchema)
    }
  }

这里给出部分代码,如果想要查看完整的 infer,mergeSchemaInParallel 代码可以查看 :

org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

上述代码通过?parallelize +?mapPartitions 实现 schema 的并行推断,这里涉及到 fileStatus?信息与 Executor 的信息交互,最终合并得到?partiallyMergedSchemas[iterator],所有 parquet 得到的 scheme 构成了一个迭代器,然后通过可变变量 var finalSchema 进行遍历与合并,结果返回 Some(finalSchema),代表推断与merge的结果不可控。

3.原因分析

上面只是简单看了源码的实现逻辑,结合前面的代码、数据变化,在 mergeShema 参数为 true 时 读取 parquet 中涉及和 parquet 数量有关的操作有下述:

(1) parallel + mapPartitions:在 numParallelism 数量不变时,parquet 数量增加,则 mapPartitions 处理的 partition 数量增加,耗时增加

(2) fileStatus 与 Executor 的通信:parquet 数量增多时,FileStatus 增多,executor 通信量增大,耗时增加,查看运行异常日志发现任务所在机器已出现 IO 负载较大的情况:

?(3) merge:处理数据量增多,iterator 数量增加,merge 操作增加,耗时增加

4.问题解决

基于上面耗时的分析,问题解决也很简单,减少 parquet 数量,避免 mergeSchema:

A.?spark.sql.parquet.mergeSchema = false

最偷懒的办法,如果可以保证每个 parquet 数据的结构一致性,则可以取消该参数,只选用少量 parquet 文件进行 infer 推断,这样处理文件的数量变小,推断 schema 的速度也会相应增加。

B.?spark.sql.parquet.mergeSchema = true & 不指定 schema

该参数默认为 true,在不手动指定的情况下,可以人为缩减 parquet 数量,这样并行执行的效率会得到提升

C.指定 schema

spark.read.parquet 方法支持直接传入 schema :

spark.read.schema(TABLE_SCHEME).parquet(path)

该方法会指定 Schema 从而避免后续复杂的 infer 阶段,从而跳过 parquet 阶段,直接进入下一个 action 算子。

  def schema(schema: StructType): DataFrameReader = {
    this.userSpecifiedSchema = Option(schema)
    this
  }

三.Read?parquet 指定 Schema

1.配置 schema

parquet 采用列式存储,相关的存储类型上一篇文章?Spark Sql:RDD - DF 互转?中已经提到过,

第一个参数代表列名,第二个参数代表列属性 :

  final val TABLE_SCHEME = StructType(Array(
    StructField("A", StringType),
    StructField("B", StringType),
    StructField("C", StringType),
    StructField("D", StringType),
    StructField("E", StringType),
    StructField("F", StringType),
    StructField("G", StringType),
    StructField("H", StringType)
  ))

    val conf = new SparkConf().setMaster("local[*]")

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
            
     spark.read.schema(TABLE_SCHEME).parquet(path).rdd.take(10).foreach(row => {
      println(row)
    })

2.读取数据为空

看似任务已经搞定了,但是还有坑,设置 schema 时读取数据为 null,不设置 schema 时读取正常

设置 schema:

不设置 schema :

于是查找了设定 schema 的规则,当我们指定了 schema 时,相当于指定了每个对应位置字段的名称和类型,读取内容时参照如下规则:

指定的 schema 中的字段在 parquet 中默认的 schema 中不存在则返回 null :

null,null,null,null,null,null,null,null,null,null,null,null

指定的 schema 中字段在 parquet 中默认的 schema 中存在但类型不匹配,返回 false :

null,null,null,null,null,null,null,null,null,null,null,false

3.原因分析

添加 schema 后显示为 null,根据上述规则可以判定是设定的 schema 的列名在原始 parquet 的 schema 中不存在,相当于 map.getOrElse("col", null) 触发了 else 逻辑,通过原始 df 的 schema 方法可以获得原始 parquet 的 schema :

    spark.read.parquet(path).schema.map(x => {
      println(x.name, x.dataType)
    })
(_c0,StringType)
(_c1,StringType)
(_c2,StringType)
(_c3,StringType)
(_c4,StringType)
(_c5,StringType)

...

(_c41,StringType)
(_c42,StringType)

可以看到原始 schema 为 _1,_2 形式,而设置的 schema 为 A,B... 与之不匹配,所以得到的数据为 null,为什么不匹配,这个涉及到 parquet 的生成逻辑:

(1) 读取原始 hive 表,字段为 A,B,C... 生成 parquet 过程中未设置 schema ,从而存储的 parquet 采用了默认 schema 形式,即 _index 的形式

(2) 而读取 parquet 时认为 parquet 的 schema 为原始 hive 结构,所以设置了错误的 schema,从而导致 schema 规则不匹配,从而得到全 null 的数据

4.问题解决

A.存储 parquet

存储 parquet 时将 hive 表对应列名与属性的 schema 存入 parquet,这样读取时指定相同 schema 即可

B.读取 schema

数据预处理时可以调用 .schema 方法打印数据的 name 和 dataType,指定相同的 name 和 dataType,例如本例中将 A,B,C ... 替换为 _c0,_c1,_c2... 即可正常显示数据

四.总结

spark 加载 parquet 文件时最好可以手动指定 schema ,这样可以避免前期不必要的 merge 操作,优化代码执行速度,非常的好用 👍

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-16 17:44:46  更:2021-12-16 17:46:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 6:05:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码