IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark读取zip文件原理与详解 -> 正文阅读

[大数据]Spark读取zip文件原理与详解

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream

import org.apache.spark.input.PortableDataStream

val dataAndPortableRDD = sc.binaryFiles("zipData path")

val dataRDD = dataAndPortableRDD.flatMap { case (name: String, content: PortableDataStream) =>
  val zis = new ZipInputStream(content.open)
  Stream.continually(zis.getNextEntry)
    .takeWhile(_ != null)
    .flatMap { _ =>
      //注意,这里非并行
      val br = new BufferedReader(new InputStreamReader(zis))
      Stream.continually(br.readLine()).takeWhile(_ != null)
     //实际上,还要关br和new InputStreamReader(zis)
    }#::: { zis.close; Stream.empty[String] }//不加#::: { zis.close; Stream.empty[String] }会不关闭连接
}

dataRDD.take(10).foreach(println)


//另一种
   sc.binaryFiles(path).repartition(2000)
      .flatMap{
        case (zipFilePath: String, context:PortableDataStream) => {
            val zis = new ZipInputStream(context.open())
            Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap(zipEn => {
                if(!zipEn.isDirectory) {
                  scala.io.Source.fromInputStream(zis,"UTF-8").getLines()
                }else {
                  None
                }
              })#::: { zis.close; Stream.empty[String]}
        }
      }.repartition(2000)

问题:

当文件比较大时,效率特别慢,比如:

13G的文件读取,50核300G内存情况下,5h还不能处理完(实际上,是因为没有关闭连接而卡住导致的)

spark2.3的官方文档中也写到:

Partitioning is determined by data locality. This may result in too few partitions by default.

  /** Returns the longest prefix of this `Stream` whose elements satisfy the
   * predicate `p`.
   *
   * @param p the test predicate.
   * @return A new `Stream` representing the values that satisfy the predicate
   * `p`.
   *
   * @example {{{
   + naturalsFrom(0) takeWhile { _ < 5 } mkString ", "
   * produces: "0, 1, 2, 3, 4"
   * }}}
   */
  override def takeWhile(p: A => Boolean): Stream[A] =
    if (!isEmpty && p(head)) cons(head, tail takeWhile p)
    else Stream.Empty

binaryFile:https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.SparkContext

zipInputStream:

https://docs.oracle.com/javase/7/docs/api/java/util/zip/ZipInputStream.html\

PortableDataStream:

https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.input.PortableDataStream

Stream:

https://zhuanlan.zhihu.com/p/130958554

分区问题:

https://www.coder.work/article/6527071

不关闭会出问题,表现为大文件时处理完后卡住不动

https://stackoverflow.com/questions/32080475/how-to-read-a-zip-containing-multiple-files-in-apache-spark

最佳答案

Spark 2.4+?,问题应该得到解决,请参阅此答案下方的@Rahul 评论。

Spark 2.1-2.3?minPartitions?的?binaryFiles()?参数被忽略。请参阅?Spark-16575?和?commit changes to function setMinPartitions()?。请注意提交更改如何在函数中不再使用?minPartitions!

如果您使用?binaryFiles()?读取多个二进制文件,则输入文件将根据以下内容合并到分区中:

  • spark.files.maxPartitionBytes?,默认 128 MB
  • spark.files.openCostInBytes?,默认 4 MB
  • spark.default.parallelism
  • 输入的总大小
  • here?描述了前三个配置项。查看上面的提交更改以查看实际计算。

    我有一个场景,我希望每个输入分区最多 40 MB,因此每个任务 40 MB……以在解析时增加并行度。 (Spark 将 128 MB 放入每个分区,降低了我的应用程序的速度。)我在调用?spark.files.maxPartitionBytes?之前将?binaryFiles()?设置为 40 M:
    spark = SparkSession \
       .builder \
       .config("spark.files.maxPartitionBytes", 40*1024*1024)
    

    对于只有一个输入文件,@user9864979 的答案是正确的:不能仅使用?binaryFiles()?将单个文件拆分为多个分区。

    使用?Spark 1.6?读取多个文件时,minPartitions?参数确实有效,您必须使用它。如果不这样做,您将遇到?Spark-16575?问题:您的所有输入文件将仅被读入两个分区!

    您会发现 Spark 通常会为您提供比您要求的更少的输入分区。我有一个场景,我希望每两个输入二进制文件有一个输入分区。我发现将?minPartitions?设置为“输入文件数 * 7/10”大致可以满足我的需求。我有另一种情况,我希望每个输入文件有一个输入分区。我发现将?minPartitions?设置为“输入文件数 * 2”给了我想要的。

    Spark 1.5?binaryFiles()?的行为:每个输入文件都有一个分区。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:27:16 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:51:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码