第9章 Spark基础编程
9.1 RDD的创建
- 在
Spark 中创建 RDD 的创建方式可以分为四种:
9.1.1 从集合(内存)中创建 RDD
- 从
集合 中创建 RDD ,Spark 主要提供了两个方法:parallelize 和 makeRDD ,但后续创建RDD时推荐直接使用makeRDD 。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 = sparkContext.parallelize(
List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)
sparkContext.stop()
- 注:
makeRDD 方法其实就是调用的 parallelize 方法。因此后续创建RDD时可以直接选用makeRDD
9.1.2 从外部存储(文件)创建 RDD
- 使用
textFile 函数从外部存储(文件)创建 RDD - 由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如
HDFS 、HBase 等。
val rdd1: RDD[String] = sc.textFile("datas/1.txt")
val rdd2: RDD[String] = sc.textFile("datas")
val rdd3 = sc.textFile("hdfs://hadoop100:8020/hello.txt")
rdd1.collect().foreach(println)
- 如果想知道输出的数据来自于那个文件,可以使用
wholeTextFiles
val rdd = sc.wholeTextFiles( path = "datas")
9.1.3 从其他 RDD 创建
- 主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节
9.1.4 直接创建 RDD(new)
- 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
9.2 RDD 并行度与分区
- 默认情况下,
Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度 。这个数量可以在构建 RDD 时指定。 - 注:这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
sparkConf.set("spark.default.parallelism", 5)
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
rdd.saveAsTextFile( path ="output")
- 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
- 读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
long totalSize = 0;
for (FileStatus file: files) {
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
}
}
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
9.3 transformation算子
transformation算子 又称为 转换算子 ,transformation算子 通过对现有 RDD 中的每个元素应用转换逻辑来生成新的 RDD 。一些转换函数可以对元素进行拆分、过滤掉某些元素或按某种排序计算。可以按顺序执行多个transformation算子 。但在transformation 阶段,不会执行任何真正的任务。
transformation算子 清单: 9.3.1-----map 9.3.2-----mapPartitions 9.3.3-----mapPartitionsWithIndex 9.3.4-----flatMap 9.3.5-----glom 9.3.6-----groupBy 9.3.7-----filter 9.3.8-----sample 9.3.9-----distinct 9.3.10-----coalesce 9.3.11-----9.3.12----- 9.3.13-----9.3.14----- 9.3.15-----9.3.16----- 9.3.17-----9.3.18----- 9.3.19-----9.3.20----- 9.3.21-----9.3.22----- 9.3.23-----9.3.24----- 9.3.25-----``
9.3.1—map
map(func)
- (2) 函数说明
map 将源数据中的每个元素都传递给函数func, 返回一个新的分布式数据集。
val dataRDD: RDD[Int] = sparkContext.textFile("wiki.txt")
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
9.3.2—mapPartitions
mapPartitions(func)
- (2) 函数说明
mapPartitions 与map 类似,将待处理的数据以分区 为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。如果RDD 的类型为T ,则返回的func 的类型为Iteratoe<T> => Iteratoe<T>
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_==2)
}
)
注1:map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。 注2:在内存有限的情况下,不推荐使用mapPartitions ,应使用map 操作。
9.3.3—mapPartitionsWithIndex
mapPartitionsWithIndex(func)
- (2) 函数说明
mapPartitionsWithIndex 与mapPartitions 类似,但会为fun函数提供一个整数值来表示分区分区的索引。当RDD 的类型为T ,则返回的func 的类型为Iteratoe<T> => Iteratoe<T>
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
(index, datas) => {
datas.map(index, _)
}
)
9.3.4 flatMap (重要)
flatMap(func)
- (2) 函数说明
flatMap 与map 类似,但每个输入元素都可映射到0或多个输出元素(即这里的 func 会返回一个 seq 而非单个元素)。也即将处理的数据进行扁平化后再进行映射处理
val dataRDD = sparkContext.makeRDD(
List(List(1,2), List(3,4)), 1)
val dataRDD1 = dataRDD.flatMap(
list => list
)
9.3.5—glom
glom()
- (2) 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
9.3.6—groupBy
groupBy(func)
- (2) 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
val dataRDD1 = dataRDD.groupBy(
_%2
)
9.3.7—filter
filter(func)
- (2) 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现 数据倾斜 。
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
9.3.8—sample
sample(withReplacemment, fraction, seed)
- (2) 函数说明
根据指定的规则从数据集中抽取数据。数据的部分采样,包含或者不包含替换,使用了给定的随机数字生成器 seed 。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
val dataRDD1 = dataRDD.sample(false, 0.5)
val dataRDD2 = dataRDD.sample(true, 2)
9.3.9—distinct
distinct([numTasks])
- (2) 函数说明
返回一个新的数据集,其中包含源数据集中的不同元素。
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
9.1.10—coalesce
coalesce(numPartitions)
- (2) 函数说明
将 RDD 的分区数量减至 numPartitions ,这样可将一个大数据集过滤成小数据集,从而执行更高效的操作。 注:当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少 分区的个数,减小任务调度成本
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
声明:本文是学习时记录的笔记,如有侵权请告知删除! 原视频地址:https://www.bilibili.com/video/BV11A411L7CK
|