| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Spark - parquet 加载时间过长 && parquet 指定 schema 无数据 -> 正文阅读 |
|
[大数据]Spark - parquet 加载时间过长 && parquet 指定 schema 无数据 |
一.引言Parquet 是一种列式存储格式,常用于表结构数据存储,也是 sparkSql 的默认存储格式。spark 读取 parquet 文件时,偶发读取时间过长,正常? parquet 时间在 1-5 s,异常期间最长可达 10 min +,于是开始踩坑之旅。下面是读取日志,正常情况只需 1s 以内,异常时却需要很久。 二.Parquet 读取问题定位与解决1.代码变化 && 数据变化1s 左右的加载时间和出现异常导致 10min + 的加载时间前后代码没有发生过变化,唯一的改变就是读取的 parquet 数量增加了一倍,总大小未改变。因此问题应该出在和 parquet 相关的代码上,这里执行的是官方的 api :
?所以解决问题就需要看下读取 parquet 的具体逻辑了。 2.Schema?Infer And Mergespark 读取 read parquet 时有一个参数 .schema 用于指定 parquet 的列名与列属性,如果直接调用 .read.parquet() 时,sparkSession 需要自己 infer parquet 的 schema , schema 可以理解为 parquet 每列数据的属性,包含 name,type 等等:
这里截距了一部分代码,其中 parquetOptions.mergeSchema?参数代表是否合并 schema,默认为 true,在 spark 中配置该参数:
这段代码还有一段解释含义大致如下 : 如果用户指定 mergeSchema?= false 参数,程序会首先尝试摘要文件,如果没有摘要文件,则返回某个随机的部分文件,所以当该参数为 false 时,infer 会随机选取 parquet 文件进行推断,如果 所有 parquet 文件格式一致则不受影响,如果不一致则会 infer 出错。 如果用户指定 mergeSchema = true? 参数,?则程序认为所有的部分文件都是相同的,此时如果出现 parquet 不匹配,例如名称匹配,类型不匹配时 infer 会报错。但是当文件过多时,如果串行执行效率会很低,所以这里采取并行处理的方式进行 schema 推断,也就是上面 infer 函数的最后一行 mergeSchemaInParallel :
这里给出部分代码,如果想要查看完整的 infer,mergeSchemaInParallel 代码可以查看 :
上述代码通过?parallelize +?mapPartitions 实现 schema 的并行推断,这里涉及到 fileStatus?信息与 Executor 的信息交互,最终合并得到?partiallyMergedSchemas[iterator],所有 parquet 得到的 scheme 构成了一个迭代器,然后通过可变变量 var finalSchema 进行遍历与合并,结果返回 Some(finalSchema),代表推断与merge的结果不可控。 3.原因分析上面只是简单看了源码的实现逻辑,结合前面的代码、数据变化,在 mergeShema 参数为 true 时 读取 parquet 中涉及和 parquet 数量有关的操作有下述: (1) parallel + mapPartitions:在 numParallelism 数量不变时,parquet 数量增加,则 mapPartitions 处理的 partition 数量增加,耗时增加 (2) fileStatus 与 Executor 的通信:parquet 数量增多时,FileStatus 增多,executor 通信量增大,耗时增加,查看运行异常日志发现任务所在机器已出现 IO 负载较大的情况: ?(3) merge:处理数据量增多,iterator 数量增加,merge 操作增加,耗时增加 4.问题解决基于上面耗时的分析,问题解决也很简单,减少 parquet 数量,避免 mergeSchema: A.?spark.sql.parquet.mergeSchema = false 最偷懒的办法,如果可以保证每个 parquet 数据的结构一致性,则可以取消该参数,只选用少量 parquet 文件进行 infer 推断,这样处理文件的数量变小,推断 schema 的速度也会相应增加。 B.?spark.sql.parquet.mergeSchema = true & 不指定 schema 该参数默认为 true,在不手动指定的情况下,可以人为缩减 parquet 数量,这样并行执行的效率会得到提升 C.指定 schema spark.read.parquet 方法支持直接传入 schema :
该方法会指定 Schema 从而避免后续复杂的 infer 阶段,从而跳过 parquet 阶段,直接进入下一个 action 算子。
三.Read?parquet 指定 Schema1.配置 schemaparquet 采用列式存储,相关的存储类型上一篇文章?Spark Sql:RDD - DF 互转?中已经提到过, 第一个参数代表列名,第二个参数代表列属性 :
2.读取数据为空看似任务已经搞定了,但是还有坑,设置 schema 时读取数据为 null,不设置 schema 时读取正常 设置 schema: 不设置 schema : 于是查找了设定 schema 的规则,当我们指定了 schema 时,相当于指定了每个对应位置字段的名称和类型,读取内容时参照如下规则: 指定的 schema 中的字段在 parquet 中默认的 schema 中不存在则返回 null :
指定的 schema 中字段在 parquet 中默认的 schema 中存在但类型不匹配,返回 false :
3.原因分析添加 schema 后显示为 null,根据上述规则可以判定是设定的 schema 的列名在原始 parquet 的 schema 中不存在,相当于 map.getOrElse("col", null) 触发了 else 逻辑,通过原始 df 的 schema 方法可以获得原始 parquet 的 schema :
可以看到原始 schema 为 _1,_2 形式,而设置的 schema 为 A,B... 与之不匹配,所以得到的数据为 null,为什么不匹配,这个涉及到 parquet 的生成逻辑: (1) 读取原始 hive 表,字段为 A,B,C... 生成 parquet 过程中未设置 schema ,从而存储的 parquet 采用了默认 schema 形式,即 _index 的形式 (2) 而读取 parquet 时认为 parquet 的 schema 为原始 hive 结构,所以设置了错误的 schema,从而导致 schema 规则不匹配,从而得到全 null 的数据 4.问题解决A.存储 parquet 存储 parquet 时将 hive 表对应列名与属性的 schema 存入 parquet,这样读取时指定相同 schema 即可 B.读取 schema 数据预处理时可以调用 .schema 方法打印数据的 name 和 dataType,指定相同的 name 和 dataType,例如本例中将 A,B,C ... 替换为 _c0,_c1,_c2... 即可正常显示数据 四.总结spark 加载 parquet 文件时最好可以手动指定 schema ,这样可以避免前期不必要的 merge 操作,优化代码执行速度,非常的好用 👍 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/17 6:05:14- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |