1、 小文件现象
文件大小只有几KB,数量有4800个。
2、小文件产生的原因
- 在使用spark sql处理数据的过程中,如果有shuffle产生,依赖于spark.sql.shuffle.partitions配置信息,默认为200,当处理的数据量比较大时,通常会把该值调大,以避免单个分区处理的数据太大出现异常或者拖慢整个任务的执行时间。
- 如果没有shuffle产生,文件的数量依赖于数据源的文件数量以及文件是否可切分等特性决定任务的并发度即task数量,如果在进行数据清洗转换或者的过程中通常不会涉及shuffle,此时会产生很多小文件,造成资源的浪费,给NameNode增加压力。
- spark sql 语句中使用union all
通过源码分析,RDD在调用union算子时,最终产生的RDD分区数分两种情况:
- union的RDD分区器已定义并且它们的分区器相同
多个父RDD具有相同的分区器,union后产生的RDD的分区器与父RDD相同切分区数也相同。比如:n个RDD的分区器相同都是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的。 - 不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和。
3、小文件的危害
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:
- 1.Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行
- 2.容易导致task数过多,如果超过参数 spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。
此外,Spark在处理任务时,一个分区分配一个task进行处理,多个分区并行处理,虽然并行处理能够提高处理效率,但不是意味着task数越多越好。如果数据量不大,过多的task运行反而会影响效率。
4、如何解决小文件问题
- 降低spark.sql.shuffle.partitions配置的值,但会影响处理的并发度
- 使用repartition和coalesce 根据经验调整分区数的大小,但是太不灵活,如果使用spark-sql cli方式,就很不方便
- 在数据入库的时候使用distribute by 字段或者rand(),但是此时对字段的选择就需要慎重
- spark sql adaptive 自适应框架
4.1 spark-sql adaptive框架解决小文件问题
spark.sql.adaptive.enabled true
spark.sql.adaptive.minNumPostShufflePartitions 10
spark.sql.adaptive.maxNumPostShufflePartitions 2000
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 134217728
spark.sql.adaptive.shuffle.targetPostShuffleRowCount 10000000
- 必须要触发shuffle,如果任务中只有map task,需要通过group by 或者distribute 触发shuffle的执行,只有触发shuffle,才能使用adaptive解决小文件问题
4.2 举例
spark - submit \
--conf spark.sql.adaptive.enabled = true \
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728 \
--conf spark.sql.auto.repartition = true \
标题一中的小文件,处理后数量如下: 配置文件
spark - submit \
--conf spark.sql.adaptive.enabled = true \
--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 134217728 \
--conf spark.sql.auto.repartition = true \
# 以下配置是针对join操作进行的性能优化
--conf spark.sql.adaptive.join.enabled = true \
--conf spark.sql.adaptive.skewJoin.enabled = true \
--conf spark.shuffle.consolidateFiles = true \
--conf spark.shuffle.service.enabled = true \
--conf spark.sql.adaptive.allowAdditionalShuffle = true
5、小文件过多带来的问题
WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: Too many open files
|