IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark从入门到精通02之Spark核心编程 -> 正文阅读

[大数据]Spark从入门到精通02之Spark核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于
处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

1. RDD

1.1. 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合

  • 弹性
    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片;
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

1.2. 核心属性

在这里插入图片描述

1.2.1. 分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
在这里插入图片描述

1.2.2. 分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算
在这里插入图片描述

1.2.3. RDD 之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
在这里插入图片描述

1.2.4. 分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
在这里插入图片描述

1.2.5. 首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
在这里插入图片描述

1.3. 执行原理

  • 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合
  • Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果

RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD
的工作原理

1.3.1. 启动Yarn集群环境

在这里插入图片描述

1.3.2. Spark 通过申请资源创建调度节点和计算节点

在这里插入图片描述

1.3.3. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

在这里插入图片描述

1.3.4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

在这里插入图片描述

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给
Executor 节点执行计算,接下来我们就一起看看 Spark 框架中 RDD 是具体是如何进行数据
处理的

1.4. 基础编程

1.4.1. RDD创建

在 Spark 中创建 RDD 的创建方式可以分为四种

1.4.1.1. 从集合(内存)中创建 RDD

从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

package com.michael.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object RDDCreate {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)

    val rdd1 = sparkContext.parallelize(List(1,2,3,4))
    val rdd2 = sparkContext.makeRDD(List(1,2,3,4))

    rdd1.collect().foreach(print) // 1234
    println()
    rdd2.collect().foreach(print) // 1234
    sparkContext.stop()
  }
}

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

def makeRDD[T: ClassTag](
	seq: Seq[T],
	numSlices: Int = defaultParallelism): RDD[T] = withScope {
	parallelize(seq, numSlices)
}

1.4.1.1. 从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase、S3等

package com.michael.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDCreateFromFile {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)

    val fileRDD:RDD[String] = sparkContext.textFile("spark-core/src/main/resources/michael.txt")
    fileRDD.collect().foreach(print) // michael testjoy testspark testscala testhello sparkhi scalasparkcore learnhere sparkSQL
    sparkContext.stop()
  }
}

1.4.1.1. 从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD

1.4.1.1. 直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用

1.4.2. RDD并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了

package com.michael.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDParallelize {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)

    val dataRDD:RDD[Int] = sparkContext.makeRDD(
      List(1,2,3,4),
      4)
    val fileRDD:RDD[String] = sparkContext.textFile(
      "spark-core/src/main/resources/michael.txt",
      2)

    dataRDD.collect().foreach(print) // 1234
    println()
    fileRDD.collect().foreach(print) // michael testjoy testspark testscala testhello sparkhi scalasparkcore learnhere sparkSQL
    sparkContext.stop()
  }
}

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的
Spark 核心源码如下

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
	(0 until numSlices).iterator.map { i =>
		val start = ((i * length) / numSlices).toInt
		val end = (((i + 1) * length) / numSlices).toInt
		(start, end)
	}
}

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数
据读取的规则有些差异,具体 Spark 核心源码如下

public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
	long totalSize = 0; // compute total size
	for (FileStatus file: files) { // check we have valid files
		if (file.isDirectory()) {
		throw new IOException("Not a file: "+ file.getPath());
		}
		totalSize += file.getLen();
	}
	long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
	long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
	FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

	for (FileStatus file: files) {
	
		if (isSplitable(fs, path)) {
		long blockSize = file.getBlockSize();
		long splitSize = computeSplitSize(goalSize, minSize, blockSize);
	
		}
		protected long computeSplitSize(long goalSize, long minSize,
		long blockSize) {
		return Math.max(minSize, Math.min(goalSize, blockSize));
}

1.4.3. RDD转换算子

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value

1.4.3.1 Value类型

1.4.3.1.1 map
def map[U: ClassTag](f: T => U): RDD[U]

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

    val dataRDD:RDD[Int] = sparkContext.makeRDD(List(1,2,3,4,5,6))
    val dataRDD1:RDD[Int] = dataRDD.map(_*2)
    val dataRDD2:RDD[String] = dataRDD1.map(_+"")
1.4.3.1.2 mapPartitions
def mapPartitions[U: ClassTag](
	f: Iterator[T] => Iterator[U],
	preservesPartitioning: Boolean = false): RDD[U]

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据

    val dataRDD:RDD[Int] = sparkContext.makeRDD(List(1,2,3,4,5,6))
    val dataRDD1:RDD[Int] = dataRDD.mapPartitions(
      datas => {
        datas.filter(_%2 == 0) // 246
      })
  • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作
  • Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,推荐使用 map 操作。完成比完美更重要
1.4.3.1.3 mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
	f: (Int, Iterator[T]) => Iterator[U],
	preservesPartitioning: Boolean = false): RDD[U]

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

// todo
1.4.3.1.4 flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

// todo
1.4.3.1.5 glom

1.4.3.1.6 groupBy

1.4.3.1.7 filter

1.4.3.1.8 sample

1.4.3.1.9 distinct

1.4.3.1.10 coalesce

1.4.3.1.11 repartition

1.4.3.1.12 sortBy

1.4.3.2 双Value类型

1.4.3.2.1 intersection

1.4.3.2.2 union

1.4.3.2.3 subtract

1.4.3.2.4 zip

1.4.3.3 Key - Value 类型

1.4.3.3.1 partitionBy

1.4.3.3.2 reduceByKey

1.4.3.3.3 groupByKey

1.4.3.3.4 aggregateByKey

1.4.3.3.5 foldByKey

1.4.3.3.6 combineByKey

1.4.3.3.7 join

1.4.3.3.8 leftOuterJoin

1.4.3.3.9 cogroup

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-12 23:31:26  更:2021-10-12 23:33:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:05:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码