Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于 处理不同的应用场景。三大数据结构分别是:
- RDD : 弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量
1. RDD
1.1. 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
- 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片;
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
1.2. 核心属性
1.2.1. 分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性
1.2.2. 分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算
1.2.3. RDD 之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
1.2.4. 分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
1.2.5. 首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
1.3. 执行原理
- 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合
- Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果
RDD 是 Spark 框架中用于数据处理的核心模型,接下来我们看看,在 Yarn 环境中,RDD 的工作原理
1.3.1. 启动Yarn集群环境
1.3.2. Spark 通过申请资源创建调度节点和计算节点
1.3.3. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务
1.3.4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算,接下来我们就一起看看 Spark 框架中 RDD 是具体是如何进行数据 处理的
1.4. 基础编程
1.4.1. RDD创建
在 Spark 中创建 RDD 的创建方式可以分为四种
1.4.1.1. 从集合(内存)中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
package com.michael.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object RDDCreate {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(List(1,2,3,4))
val rdd2 = sparkContext.makeRDD(List(1,2,3,4))
rdd1.collect().foreach(print)
println()
rdd2.collect().foreach(print)
sparkContext.stop()
}
}
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
1.4.1.1. 从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase、S3等
package com.michael.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDDCreateFromFile {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD:RDD[String] = sparkContext.textFile("spark-core/src/main/resources/michael.txt")
fileRDD.collect().foreach(print)
sparkContext.stop()
}
}
1.4.1.1. 从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD
1.4.1.1. 直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用
1.4.2. RDD并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了
package com.michael.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RDDParallelize {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val dataRDD:RDD[Int] = sparkContext.makeRDD(
List(1,2,3,4),
4)
val fileRDD:RDD[String] = sparkContext.textFile(
"spark-core/src/main/resources/michael.txt",
2)
dataRDD.collect().foreach(print)
println()
fileRDD.collect().foreach(print)
sparkContext.stop()
}
}
读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数 据读取的规则有些差异,具体 Spark 核心源码如下
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
long totalSize = 0;
for (FileStatus file: files) {
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
for (FileStatus file: files) {
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
}
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
1.4.3. RDD转换算子
RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 型
1.4.3.1 Value类型
1.4.3.1.1 map
def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
val dataRDD:RDD[Int] = sparkContext.makeRDD(List(1,2,3,4,5,6))
val dataRDD1:RDD[Int] = dataRDD.map(_*2)
val dataRDD2:RDD[String] = dataRDD1.map(_+"")
1.4.3.1.2 mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据
val dataRDD:RDD[Int] = sparkContext.makeRDD(List(1,2,3,4,5,6))
val dataRDD1:RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_%2 == 0)
})
- Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作
- Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
- Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,推荐使用 map 操作。完成比完美更重要
1.4.3.1.3 mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
1.4.3.1.4 flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
1.4.3.1.5 glom
1.4.3.1.6 groupBy
1.4.3.1.7 filter
1.4.3.1.8 sample
1.4.3.1.9 distinct
1.4.3.1.10 coalesce
1.4.3.1.11 repartition
1.4.3.1.12 sortBy
1.4.3.2 双Value类型
1.4.3.2.1 intersection
1.4.3.2.2 union
1.4.3.2.3 subtract
1.4.3.2.4 zip
1.4.3.3 Key - Value 类型
1.4.3.3.1 partitionBy
1.4.3.3.2 reduceByKey
1.4.3.3.3 groupByKey
1.4.3.3.4 aggregateByKey
1.4.3.3.5 foldByKey
1.4.3.3.6 combineByKey
1.4.3.3.7 join
1.4.3.3.8 leftOuterJoin
1.4.3.3.9 cogroup
|