IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark学习笔记(5)——RDD的创建 -> 正文阅读

[大数据]Spark学习笔记(5)——RDD的创建

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。
但是切分任务的数量不一定等于并行执行的任务数量,比如当节点资源数量小于切分数量时。
在 Spark 中创建 RDD 的创建方式可以分为四种:

一、从集合(内存)中创建 RDD

1.从集合中创建RDD

使用makeRDD方法

    //*号表示本地环境中最大可用核数,采用多线程模拟集群环境
    var sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    //从内存中创建RDD,将内存中集合的数据作为处理的数据源
    val seq = Seq[Int](1,2,3,4)
    //parallelize并行化集合操作是根据一个已经存在的Scala集合和当前系统环境创建的RDD对象。
    // 集合的里面的元素将会被拷贝进入新创建出的一个可被并行操作的分布式数据集。
    //val rdd: RDD[Int] = sc.parallelize(seq)
    //makeRDD里调用的还是parallelize
    val rdd: RDD[Int] = sc.makeRDD(seq)
    rdd.collect().foreach(println)
    sc.stop()

2.分区数量是怎样确定的

从集合(内存)中创建 RDD时分区数量首先由makeRDD传入的第二个参数(可选)指定

  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

若没有传,则由defaultParallelism(配置对象(SparkConf)中spark.default.parallelism的值)指定,若此值没有事先设置,则取默认值totalCores(当前运行环境的最大可用核数)
在这里插入图片描述

    //*号表示本地环境中最大可用核数,采用多线程模拟集群环境
    var sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    sparkConf.set("spark.default.parallelism","5")
    val sc = new SparkContext(sparkConf)
    //RDD的并行度 & 分区
    //makeRDD方法可以传递第二个参数,这个参数表示分区的数量。如果不传则方法会使用默认并行度defaultParallelism
    //  scheduler.conf.getInt("spark.default.parallelism", totalCores)
    //  spark在默认情况下会从配置对象sparkConf中获取配置参数spark.default.parallelism
    //  如果获取不到,那么使用totalCores属性,这个属性值为当前运行环境的最大可用核数
    val rdd: RDD[Int] = sc.makeRDD(
      List(1, 2, 3, 4)2
    )
    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("output")
    sc.stop()

3.从集合(内存)中创建RDD时的数据划分

makeRDD中最终实现数据划分的方法核心源码如下

    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }

这个方法传入了两个参数,一个是数据集的长度length,一个是分区数量numSlices,结合可以看到一个编号为i分区取的数据集范围就是
[start,end),其中
start = ((i * length) / numSlices).toInt
end = (((i + 1) * length) / numSlices).toInt

二、从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。

1.从文件中创建RDD

path路径默认以当前环境的根路径为基准。所以可以写绝对路径,也可以写相对路径

//val rdd: RDD[String] = sc.textFile("D:\\IdeaProjects\\syc-classes\\datas\\1.txt")
val rdd: RDD[String] = sc.textFile("datas/1.txt")

另外path也可以是一个目录的路径,可一次读取多个文件

val rdd: RDD[String] = sc.textFile("datas")

path路径还可以使用通配符,只读取一些特定的文件

val rdd: RDD[String] = sc.textFile("datas/1*.txt")

path还可以是分布式存储系统路径:HDFS

val rdd: RDD[String] = sc.textFile("hdfs://linux1:8080/text.txt")

从文件中创建RDD的两个方法:testFile()与wholeTextFiles()
textFile:以行为单位读取数据,不考虑文件来自哪里
wholeTextFiles:以文件为单位读取数据,读取的结果表示为元组,其中第一个元素表示文件路径,第二个元素表示文件内容

2.分区数量是怎样确定的

Spark读取文件底层方法其实采用的是Hadoop的读取方法,因此这里的分区和数据划分可参考Hadoop的切片机制

(1)首先确认最小分区数量

从文件中创建RDD时分区数量首先看testFile()传入的第二个参数(可选),此参数为最小分区数量。

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

若没有传,则取defaultParallelism(上文已解释此值,此处同)和2中的较小值为最小分区数量
在这里插入图片描述

(2) 然后计算最终实际分区数量

Spark读取文件底层方法其实采用的是Hadoop的读取方法,在org.apache.hadoop.mapred.FileInputFormat类的getSplits()方法中,拿文件总的字节数除以最小分区数量然后取整,得到一个值goalSize,若剩余的字节数小于goalSize的10%,则最终分区数量就是上一步得出的最小分区数量,否则需再增加一个分区。

:以如下文件datas/1.txt文件为例
在这里插入图片描述
这里用@@表示回车换行符,占两个字节,故该文件共7个字节。

1@@
2@@
3

设创建RDD的代码如下,故得最小分区数为2

val rdd: RDD[String] = sc.textFile("datas/1.txt",2)

7/2=3…1,故得goalSize值为3,余数为1,余数大于goalSize的10%,故得最终分区数为3个,分区编号分别为0,1,2

3. 从外部存储(文件)创建 RDD时的数据划分

(1) 计算数据分区的偏移量范围

数据是以偏移量的形式来划分的,1.txt文件中每个字符对应的偏移量为

1@@ => 012
2@@ => 345
3 => 6

上一步计算出的goalSize值3即为偏移量数,故得每个分区的数据偏移量范围

分区号 偏移量范围
0 [0,3]
1 [3,6]
2 [6,7]

(2) 按行读取数据,已读取过的数据不重复读

Spark读取文件是按行读取的,对于一行数据要么不读要么整行全部读取,不存在一行只读取部分数据。因此在给0号分区划分数据时,读到偏移量为3的字符2后,还会继续把这行数据读完。并且已读取过的数据不重复读,故1号分区的数据只有偏移量为6的字符3所在的这一行,2号分区就没有分到数据了,故各个分区最终划分的数据如下
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

可以注意到数据读取到各个分区文件后回车换行符发生了一些变化,这是因为分区的过程中对这些符号进行了一些处理,这并不影响我们的实际数据

三、从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节

四、直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:42:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/5 2:12:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码