IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 6.Spark核心编程—RDD的创建、RDD 并行度与分区、transformation算子 -> 正文阅读

[大数据]6.Spark核心编程—RDD的创建、RDD 并行度与分区、transformation算子

第9章 Spark基础编程

9.1 RDD的创建

  • Spark 中创建 RDD 的创建方式可以分为四种:

9.1.1 从集合(内存)中创建 RDD

  • 集合中创建 RDD,Spark 主要提供了两个方法:parallelizemakeRDD,但后续创建RDD时推荐直接使用makeRDD
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)

// TODO 创建RDD
// 从内存中创建RDD,将内存中集合的数据作为处理的数据源
val rdd1 = sparkContext.parallelize(
	List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
	List(1,2,3,4)
)

rdd1.collect().foreach(println)
rdd2.collect().foreach(println)

// TODO 关闭环境
sparkContext.stop()
  • 注:makeRDD 方法其实就是调用的 parallelize 方法。因此后续创建RDD时可以直接选用makeRDD

9.1.2 从外部存储(文件)创建 RDD

  • 使用textFile函数从外部存储(文件)创建 RDD
  • 由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFSHBase 等。
// TODO 创建RDD
// 从文件中创建RDD,将文件中的数据作为处理的数据源
// path路径默认以当前环境的根路径为基准。可以写绝对路径,也可以写相对路径
val rdd1: RDD[String] = sc.textFile("datas/1.txt")
// path路径可以是文件的具体路径,也可以目录名称
val rdd2: RDD[String] = sc.textFile("datas")
// path还可以是分布式存储系统路径: HDFS
val rdd3 = sc.textFile("hdfs://hadoop100:8020/hello.txt")

rdd1.collect().foreach(println)
  • 如果想知道输出的数据来自于那个文件,可以使用wholeTextFiles
// textFile :以行为单位来读取数据,读取的数据都是字符串
// wholeTextFiles :以文件为单位读取数据
//读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
val rdd = sc.wholeTextFiles( path = "datas")

9.1.3 从其他 RDD 创建

  • 主要是通过一个 RDD 运算完后,再产生新的 RDD。详情请参考后续章节

9.1.4 直接创建 RDD(new)

  • 使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

9.2 RDD 并行度与分区

  • 默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。
  • 注:这里的并行执行的任务数量,并不是指的切分任务的数量,不要混淆了。
// TODO 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
sparkConf.set("spark.default.parallelism", 5)
val sc = new SparkContext(sparkConf)

// TODO 创建RDD
// RDD的并行度&分区
// makeRDD方法可以传递第二个参数,这个参数表示分区的数量
// 第二个参数可以不传递的,那么makeRDD方法会使用默认值 : defaultParallelism(默认并行度)
//      scheduler. conf.getInt("spark.default.parallelism ", totalCores)
//      spark在默认情况下,从配置对象中获取配置参数: spark.default.parallelism
//      如果获取不到,那么使用totalCores属性,这个属性取值为当前环境的最大可用核数
// val rdd = sc.makeRDD(List(1,2,3,4), 2)
val rdd = sc.makeRDD(List(1,2,3,4))

//将处理的数据保存成分区文件
rdd.saveAsTextFile( path ="output")
  • 读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
	(0 until numSlices).iterator.map { i =>
		val start = ((i * length) / numSlices).toInt
		val end = (((i + 1) * length) / numSlices).toInt
		(start, end)
	}
}
  • 读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
	long totalSize = 0; // compute total size
	for (FileStatus file: files) { // check we have valid files
		if (file.isDirectory()) {
			throw new IOException("Not a file: "+ file.getPath());
		}
		totalSize += file.getLen();
	}
	long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
	long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
	FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
 
	...
 
	for (FileStatus file: files) {
 
		...
 
 		if (isSplitable(fs, path)) {
 			long blockSize = file.getBlockSize();
 			long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 			...
		}
	}
}
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
	return Math.max(minSize, Math.min(goalSize, blockSize));
}

9.3 transformation算子

  • transformation算子 又称为 转换算子transformation算子通过对现有 RDD 中的每个元素应用转换逻辑来生成新的 RDD。一些转换函数可以对元素进行拆分、过滤掉某些元素或按某种排序计算。可以按顺序执行多个transformation算子。但在transformation阶段,不会执行任何真正的任务。

transformation算子 清单:
9.3.1-----map
9.3.2-----mapPartitions
9.3.3-----mapPartitionsWithIndex
9.3.4-----flatMap
9.3.5-----glom
9.3.6-----groupBy
9.3.7-----filter
9.3.8-----sample
9.3.9-----distinct
9.3.10-----coalesce
9.3.11-----9.3.12-----
9.3.13-----9.3.14-----
9.3.15-----9.3.16-----
9.3.17-----9.3.18-----
9.3.19-----9.3.20-----
9.3.21-----9.3.22-----
9.3.23-----9.3.24-----
9.3.25-----``


9.3.1—map

  • (1) 函数签名
map(func)
  • (2) 函数说明
    map将源数据中的每个元素都传递给函数func, 返回一个新的分布式数据集。
// val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD: RDD[Int] = sparkContext.textFile("wiki.txt")
val dataRDD1: RDD[Int] = dataRDD.map(
	num => {
		num * 2
	}
)

9.3.2—mapPartitions

  • (1) 函数签名
mapPartitions(func)
  • (2) 函数说明
    mapPartitionsmap类似,将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。如果RDD的类型为T,则返回的func的类型为Iteratoe<T> => Iteratoe<T>
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
	datas => {
		datas.filter(_==2)
	}
)

注1:map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions算子是以分区为单位进行批处理操作。
注2:在内存有限的情况下,不推荐使用mapPartitions,应使用map操作。


9.3.3—mapPartitionsWithIndex

  • (1) 函数签名
mapPartitionsWithIndex(func)
  • (2) 函数说明
    mapPartitionsWithIndexmapPartitions类似,但会为fun函数提供一个整数值来表示分区分区的索引。当RDD的类型为T,则返回的func的类型为Iteratoe<T> => Iteratoe<T>
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
	(index, datas) => {
		datas.map(index, _)
	}
)

9.3.4 flatMap(重要)

  • (1) 函数签名
flatMap(func)
  • (2) 函数说明
    flatMapmap类似,但每个输入元素都可映射到0或多个输出元素(即这里的 func 会返回一个 seq 而非单个元素)。也即将处理的数据进行扁平化后再进行映射处理
val dataRDD = sparkContext.makeRDD(
	List(List(1,2), List(3,4)), 1)
val dataRDD1 = dataRDD.flatMap(
	list => list
)

9.3.5—glom

  • (1) 函数签名
glom()
  • (2) 函数说明
    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()

9.3.6—groupBy

  • (1) 函数签名
groupBy(func)
  • (2) 函数说明
    将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
    一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
val dataRDD1 = dataRDD.groupBy(
	_%2
)

9.3.7—filter

  • (1) 函数签名
filter(func)
  • (2) 函数说明
    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现 数据倾斜
val dataRDD = sparkContext.makeRDD(List(1,2,3,4), 1)
 val dataRDD1 = dataRDD.filter(_%2 == 0)

9.3.8—sample

  • (1) 函数签名
sample(withReplacemment, fraction, seed)
  • (2) 函数说明
    根据指定的规则从数据集中抽取数据。数据的部分采样,包含或者不包含替换,使用了给定的随机数字生成器 seed
val dataRDD = sparkContext.makeRDD(List(
	1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)

// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

9.3.9—distinct

  • (1) 函数签名
distinct([numTasks])
  • (2) 函数说明
    返回一个新的数据集,其中包含源数据集中的不同元素。
val dataRDD = sparkContext.makeRDD(List(
	1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)

9.1.10—coalesce

  • (1) 函数签名
coalesce(numPartitions)
  • (2) 函数说明
    RDD 的分区数量减至 numPartitions,这样可将一个大数据集过滤成小数据集,从而执行更高效的操作。
    注:当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少
    分区的个数,减小任务调度成本
val dataRDD = sparkContext.makeRDD(List(
	1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

声明:本文是学习时记录的笔记,如有侵权请告知删除!
原视频地址:https://www.bilibili.com/video/BV11A411L7CK

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-25 11:45:16  更:2021-07-25 11:45:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 0:12:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码