算子分区数、分区器如何确定?
如何确定新的RDD的分区数?
??一、如果设置了并行度,则使用并行度为分区数;
??二、若没设置并行度,则取上游最大分区数,作为下游分区数。
??三、若上游没有分区器,且没设置并行度,则使用默认并行度作为分区数(默认机器总核数)
如何确定分区器?
??一、默认先取上游最大分区数的分区器;
????①如果有分区器,且分区数"大于或等于 "设置的并行度,则使用该分区器和分区数;
????②若分区数"小于 "设置的并行度数,则还是使用HashPartitioner,且分区数为设置的并行度数;
????③如果有分区器,但没有设置并行度,则直接使用该分区器和分区数
??二、否则,就使用HashPartitioner,且分区数为设置的并行度数!
四种情况:
一、设置了并行度,上游没有分区器
??则直接使用HashPartitioner,分区数为并行度
二、设置了并行度,上游有分区器
??①并行度> 上游最大分区数,则使用HashPartitioner,分区数为并行度
??②并行度<= 上游最大分区数,则使用该分区器和分区数
三、没设置并行度,上游有分区器
??则直接使用上游分区数最大的 分区器和分区数
四、没设置并行度,上游没有分区器
??则使用HashPartitioner,分区数为并行度默认值(机器总核数)
总结:
??默认不设置并行度,取上游最大分区数,作为下游分区数。
??默认取上游最大分区数的分区器,如果没有,就使用HashPartitioner!
Value 类型
1. map() 改变结构就用map
函数签名:
map[U: ClassTag](f: T => U): RDD[U]
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
函数说明
参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
将处理的数据逐条进行映射转换,这里的转换可以是值的转换,也可以是类型的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
)
图片中的说明:先把一个数据拿过来以后进行 *2 操作,例如拿1 过来后 *2 = 2 后,1这个数据就离开这块区域,然后进行第二个数据的处理…
2. mapPartitions() 以分区为单位执行Map
函数签名
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
函数说明
f: Iterator[T] => Iterator[U] :f函数把每个分区的数据分别放入到迭代器中(批处理)。
preservesPartitioning: Boolean = false :是否保留RDD的分区信息。
功能:一次处理一个分区数据。
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
每一个分区的数据会先到内存空间,然后才进行逻辑操作,整个分区操作完之后,拿到分区的数据才会释放掉。
从性能方面讲:批处理效率高,从内存方面:需要内存空间较大
val dataRDD: RDD[Int] = sc.makeRDD(1 to 4, 2)
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_ == 2)
}
)
小功能:获取每个数据分区的最大值
val dataRDD: RDD[Int] = sc.makeRDD(List(1,3,6,2,5),2)
val rdd: RDD[Int] = dataRDD.mapPartitions(
iter => {
List(iter.max).iterator
}
)
println(rdd.collect().mkString(","))
思考一个问题:map和mapPartitions的区别?
map算子每一次处理一条数据,而mapPartitions算子每一次将一个分区的数据当成一个整体进行数据处理。
如果一个分区的数据没有完全处理完,那么所有的数据都不会释放,即使前面已经处理完的数据也不会释放。容易出现内存溢出,所以当内存空间足够大时,为了提高效率,推荐使用mapPartitions算子
有些时候,完成比完美更重要
3. mapPartitionsWithIndex() 带分区号
函数签名
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
函数说明
f: (Int, Iterator[T]) => Iterator[U] :f函数把每个分区的数据分别放入到迭代器中(批处理)并且加上分区号
参数Int :为分区号
参数Iterator[T] :为一个迭代器,内容为一个分区中所有的数据;
函数的返回Iterator[U] :分区内每个数据经过转换以后数据形成的迭代器。
作用:比mapPartitions多一个整数参数表示分区号,在处理数据同时可以获取当前分区索引。
val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
val mapRdd = rdd.mapPartitionsWithIndex(
(index, items) => {
items.map((index, _))
}
)
mapRdd.collect().foreach(println)
结果:
(0,1)
(0,2)
(1,3)
(1,4)
小功能:获取第二个数据分区的数据
val dataRDD: RDD[Int] = sc.makeRDD(List(1,3,6,2,5,4),3)
val rdd = dataRDD.mapPartitionsWithIndex(
(index, iter) => {
if ( index == 1 ) {
iter
} else {
Nil.iterator
}
}
)
println(rdd.collect().mkString(","))
4. flatMap() 扁平化
函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
函数说明
与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
本质:要求返回一个集合,会自动将集合压平(得到里面的元素)
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
val listRDD=sc.makeRDD(List(List(1,2),List(3,4)), 2)
val mapRdd: RDD[Int]= listRDD.flatMap(
item=>item
)
mapRdd.collect().foreach(println)
结果:
1
2
3
4
小功能:将List(List(1,2),3,List(4,5)) 进行扁平化操作
val dataRDD = sc.makeRDD( List(List(1,2),3,List(4,5)) )
val rdd = dataRDD.flatMap(
data => {
data match {
case list: List[_] =>list
case d => List(d)
}
}
)
5. glom() 分区转换数组
函数签名
def glom(): RDD[Array[T]]
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
函数说明
作用:将同一个分区内的数据转换成数组,分区不变
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
该操作将RDD中每一个分区的数据变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致。
val rdd = sc.makeRDD(1 to 4, 2)
val mapRdd = rdd.glom().map(_.max)
mapRdd.collect().foreach(println)
结果:
2
4
6. groupBy() 分组
函数签名
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
函数说明
分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
groupBy 可以决定数据的分类,但是分类后的数据去哪个分区此算子无法决定
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为shuffle 。极限情况下,数据可能被分在同一个分区中
注意:一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
小功能:将List("Hello", "hive", "hbase", "Hadoop") 根据单词首写字母进行分组。
val dataRDD = sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2)
dataRDD.groupBy(word=>{
word(0)
})
小功能:WordCount。
val dataRDD = sc.makeRDD(List("Hello World", "Hello", "Hello"))
println(dataRDD
.flatMap(_.split(" "))
.groupBy(word => word)
.map(kv => (kv._1, kv._2.size))
.collect().mkString(",")
)
7. filter() 过滤
函数签名
def filter(f: T => Boolean): RDD[T]
函数说明
按照传入函数的返回值进行筛选过滤,符合规则的数据保留(保留为true的数据 ),不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。
val rdd = sc.makeRDD(1 to 4, 2)
val filterRdd = rdd.filter(_%2 == 0)
filterRdd.collect().foreach(println)
结果:
2
4
8. sample() 采样
函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
函数说明
withReplacement :true 为有放回的抽样,false 为无放回的抽样。
fraction 表示:以指定的随机种子随机抽样出数量为fraction的数据。
seed 表示:指定随机数生成器种子。
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.sample(false, 0.5)
val dataRDD2 = dataRDD.sample(true, 2)
思考一个问题:sample有啥用,抽奖吗?
在实际开发中,往往会出现数据倾斜的情况,那么可以从数据倾斜的分区中抽取数据,查看数据的规则,分析后,可以进行改善处理,让数据更加均匀
9. distinct() 去重
函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
函数说明
对内部的元素去重,distinct 后会生成与原RDD分区个数不一致的分区数。上面的函数还可以对去重后的修改分区个数。
val distinctRdd: RDD[Int] = sc.makeRDD(List(1,2,1,5,2))
distinctRdd.distinct(2)
.collect().foreach(println)
结果
1
2
5
思考一个问题:如果不用该算子,你有什么办法实现数据去重?
可以通过reduceByKey的方式去重
dataRDD
.map(x => (x, null))
.reduceByKey((v1, v2) => v1, numPartitions)
.map(_._1)
10. coalesce() 合并分区
函数签名
def coalesce(numPartitions: Int,
shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
函数说明
该算子重点在减少分区,我们在重置分区的个数的时候,参数值不要比原有分区数量多,因为该算子默认是不会打乱数据重新,没有shuffle,所以分区设置多了,多余的分区不会有数据。
我们在使用这个算子的时候,只需要传递重置的分区数量即可,其他的参数使用默认值;
如果想扩大分区,有新的算子可以实现,不过底层还是调用coalesce,只是将参数2设置为true
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
当spark程序中,存在过多的小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本
val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4),4)
val mapRdd: RDD[Int] = rdd.coalesce(2)
mapRdd.mapPartitionsWithIndex(
(index,values)=>values.map((index,_))
)
.collect().foreach(println)
结果
(0,1)
(0,2)
(1,3)
(1,4)
无shuffle
设置2个分区后的结果:
(0,1) (0,2) (1,3) (1,4)
设置3个分区后的结果:
(0,1) (1,2) (2,3) (2,4)
设置4个或者以上
(0,1) (1,2) (2,3) (3,4)
设置true后开启shuffle
设置1 ,2后的结果
(0,1) (0,2) (0,3) (0,4)
设置3后的结果
(0,1) (1,2) (1,3) (2,4)
设置4后的结果
(3,1) (3,2) (3,3) (3,4)
....
源码
for (i <- 0 until maxPartitions) {
val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
(rangeStart until rangeEnd).foreach{ j => groupArr(i).partitions += prev.partitions(j) }
}
解释说明:
maxPartitions:传进来的新分区数
prev.partitions:之前RDD的分区数
分区i
开始 = 分区号*前一个分区数 / 新的分区数
结束 = (分区号+1)*前一个分区数 / 新的分区数
11. repartition() 重新分区(执行Shuffle)
函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
函数说明
该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true 。
这个参数即可以扩大分区,也可以缩小分区的数量,但是我们一般用来扩大分区,缩小分区可以使用coalesce算子
无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6,7,8),4)
val mapRdd: RDD[Int] = rdd.repartition(8)
mapRdd.mapPartitionsWithIndex(
(index,values) =>values.map((index,_))
).collect().foreach(println)
结果
(6,1)
(6,3)
(6,5)
(6,7)
(7,2)
(7,4)
(7,6)
(7,8)
思考一个问题:coalesce和repartition区别?
repartition方法其实就是coalesce方法,只不过肯定使用了shuffle操作。让数据更均衡一些,可以有效防止数据倾斜问题。
如果缩减分区,一般就采用coalesce;如果扩大分区,就采用repartition
这两个算子只是决定分区数,并不能决定分区的数据如何分区,即只针对分区数,并不针对数据
12. sortBy() 排序
函数签名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
函数说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。
val rdd: RDD[Int] = sc.makeRDD(Array(1,5,3,2))
val mapRdd = rdd.sortBy(item=>item, false)
mapRdd.collect().foreach(println)
结果
5
3
2
1
13. pipe() 管道,调用shell脚本
函数签名
def pipe(command: String): RDD[String]
函数说明
该方法可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要它能读写Unix标准的流就行,例如Python 、shell 等脚本
管道,针对每个分区,都调用一次脚本,返回输出的RDD。
注意:脚本需要放在计算节点可以访问到的位置
实际案例
- 有2万个文件,每个10G,放在HDFS上了,总量200TB的数据需要分析。
- 分析程序本身已经写好了,程序接受一个参数:文件路径
- 如何用spark完成集群整个分析任务?
想到了Spark Pipe 应该可以完成:
总的来说就是Spark有一个pipe的编程接口,用的是Unix的标准输入和输出,类似于 Unix的 | 管道,例如: ls | grep ^d
第一步:创建RDD
第二步:创建一个Shell脚本启动分析任务
第三步:RDD对接到启动脚本
总结一下
dataRDD里面包含了我们要分析的文件列表,这个列表会被分发到spark集群,然后spark的工作节点会分别启动一个launch.sh 脚本,接受文件列表作为输入参数,在launch.sh 脚本的循环体用这些文件列表启动具体的分析任务
好处
-
既有程序analysis_program.sh 不需要任何修改,做到了重用,这是最大的好处 -
使用集群来做分析,速度比以前更快了(线性提升) -
提高了机器的利用率(以前可能是一台机器分析)
key-value 类型算子
一个类中,隐式变量的类型只能有一种
1、Spark中有很多方法都是基于Key 进行操作,所以数据格式应该为键值对(对偶元素 )才能使用这些方法
2、如果数据类型是k-v 类型,那么Spark会将RDD自动转换补充很多新的功能——>功能的扩展
3、那么是如果实现的?
-
通过隐式转换 -
如果数据类型为k-v 类型(即RDD[k,v] ),在RDD的伴生对象中,会将当前的RDD转换为PairRDDFunctions 对象 -
所有的k-v 类型的扩展方法,都来自PairRDDFunctions 类中的方法
只有K-V类型的算子才有分区器
1. partitionBy() 按照K重新分区
函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
函数说明
将RDD[K,V] 中的K按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner
如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。
思考一个问题:如果重分区的分区器和当前RDD的分区器一样怎么办?
答:不进行任何的处理。不会再次重分区。
思考一个问题:Spark还有其他分区器吗?
答:有一个RangePartitioner ,在sortBy中使用
思考一个问题:如果想按照自己的方法进行数据分区怎么办?
答:自定义分区器
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
mapRdd.mapPartitionsWithIndex{
(index,values)=>values.map((index,_))
}.collect().foreach(println)
结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))
自定义分区规则
要实现自定义分区器,需要继承org.apache.spark.Partitioner 类,并实现下面三个方法。
(1)numPartitions:Int :返回创建出来的分区数。
(2)getPartition(key: Any):Int :返回给定键的分区编号(0 到 numPartitions-1 )。
(3)equals() :Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val mapRdd: RDD[(Int, String)] = rdd.partitionBy(new MyPartition(2))
mapRdd.mapPartitionsWithIndex{
(index,values)=>values.map((index,_))
}.collect().foreach(println)
class MyPartition(num:Int) extends Partitioner{
override def numPartitions: Int = num
override def getPartition(key: Any): Int = {
if(key.isInstanceOf[Int]){
val i: Int = key.asInstanceOf[Int]
if(i%2==0){
0
}else{
1
}
}else{
0
}
}
}
结果
(0,(2,bbb))
(1,(1,aaa))
(1,(3,ccc))
2. reduceByKey() 按照K聚合V
函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
函数说明
该操作可以将RDD[K,V] 中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
形参func :表示对相同Key的value处理的逻辑
形参numPartitions :聚合结果后的,分区的数量
函数返回值:聚合以后的结果,返回值数据类型和原数据value类型一致
触发一个shuffle 就会划分一个新阶段
val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
val mapRdd: RDD[(String, Int)] = rdd.reduceByKey((v1,v2)=>v1+v2)
mapRdd.collect().foreach(println)
结果
(a,6)
(b,7)
3. groupByKey() 按照K重新分组
函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
函数说明
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。该操作可以指定分区器或者分区数(默认使用HashPartitioner)
val rdd = sc.makeRDD(List(("a",1),("b",5),("a",5),("b",2)))
val mapRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey(2)
mapRdd.mapPartitionsWithIndex{
(index,values)=>values.map((index,_))
}.collect().foreach(println)
结果
(0,(b,CompactBuffer(5, 2)))
(1,(a,CompactBuffer(1, 5)))
思考一个问题:reduceByKey和groupByKey的区别?
reduceByKey :先根据数据的Key进行分组,然后再对相同Key的value进行数据聚合处理,此算子会对value再shuffle之前进行预聚合处理(聚合逻辑和参数逻辑一致),返回值类型RDD[(K, V)]
groupByKey :根据数据的key进行分组,返回值类型RDD[(K, Iterable[V])]
reduceByKey与groupByKey相比,多了对value的聚合操作,
注意,并且reduceByKey 会对value再shuffle之前进行预聚合,如果最后的结果不需要对value进行一些聚合操作,那一定不能使用reduceByKey ,否则会出现结果数据不准的情况
两个算子在实现相同的业务功能时,reduceByKey存在预聚和功能,所以性能比较高,推荐使用。但是,不是说一定就采用这个方法,需要根据场景来选择
思考一个问题:groupBy 和 groupByKey 的区别?
groupBy :根据指定的规则对数据进行分组,有逻辑形参,返回值类型RDD[(K, Iterable[T])]
groupByKey :直接根据数据的key进行分组,没有逻辑形参,返回值类型RDD[(K, Iterable[T])]
4. aggregateByKey() 按照K处理分区内和分区间逻辑
函数签名
def aggregateByKey[U: ClassTag]
(zeroValue: U)
(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
函数说明
1)zeroValue (初始值):给每一个分区中的每一种key一个初始值。
这个初始值的理解:这个初始值就是与第一个值进行比较,保证第一次对比下去。
(2)seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value。
(3)combOp(分区间):函数用于合并每个分区中的结果。
使用场景:分区内 和 分区间 的计算逻辑不同时
根据key进行聚合,将数据根据不同的规则进行分区内计算和分区间计算,计算逻辑均是针对于value的操作
取出每个分区内相同key的最大值然后分区间相加
val rdd = sc.makeRDD(List(
("a",1),("a",2),("c",3),
("b",4),("c",5),("c",6)
),2)
val resultRDD = rdd.aggregateByKey(10)(
(x, y) => math.max(x,y),
(x, y) => x + y
)
resultRDD.collect().foreach(println)
5. foldByKey() 分区内和分区间相同的aggregateByKey()
函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
函数说明
使用场景:分区内 和 分区间 的计算逻辑相同时
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey
val rdd = sc.makeRDD(List(("a",1),("b",1),("b",1),("a",1)),2)
rdd.foldByKey(0)(_+_)
.collect().foreach(println)
结果
(b,2)
(a,2)
6. combineByKey() 转换结构后分区内和分区间操作
函数签名
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C
): RDD[(K, C)]
函数说明
(1)createCombiner(转换数据的结构): combineByKey()
会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。
(2)mergeValue(分区内):
如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
(3)mergeCombiners(分区间):
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners()方法将各个分区的结果进行合并。
使用场景:当计算时发现key的value不符合计算规则的格式时,可以选择conbineByKey
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
小练习:将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
combineRdd.map{
case ( key, ( total, count ) ) => (key, total / count )
}.collect().foreach(println)
思考一个问题:reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
从源码的角度来讲,四个算子的底层逻辑是相同的。
-
reduceByKey :不会对第一个value进行处理,分区内和分区间计算规则相同。 -
aggregateByKey :会将初始值和第一个value使用分区内计算规则进行计算。 -
foldByKey :会将初始值和第一个value使用分区内计算规则,分区内和分区间的计算规则相同。 -
combineByKey :第一个参数就是对第一个value进行处理,所有无需初始值
从源码的角度发现,如上4个算子底层逻辑是相同,唯一不同的区别是参数不同。
参数1: createCombiner,分区内相同key的第一个v的转换逻辑
参数2: mergeValue,分区内部的计算逻辑
参数3: mergeCombiners,分区间的计算逻辑
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)
reduceByKey
源码如下:
参数1:没有任何的转换,对key的第一个value没有转换
参数2和参数3相同,即分区内和分区间的计算逻辑保持一致。
combineByKeyWithClassTag[V](
(v: V) => v,
func,
func )
aggregateByKey
源码如下:
参数1:传递的初始值会和每一个不同key的第一个value按照分区内计算逻辑进行计算
参数2:分区内计算逻辑
参数3:分区间的计算逻辑
combineByKeyWithClassTag[U](
(v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp,
combOp )
foldByKey
源码如下:
参数1:传递的初始值会和每一个不同key的第一个value按照分区内计算逻辑进行计算
参数2和参数3一致:分区内和分区间的计算逻辑保持一致
combineByKeyWithClassTag[V](
(v: V) => cleanedFunc(createZero(), v),
cleanedFunc,
cleanedFunc )
combineByKey
源码如下:
参数1:分区内每个相同key的第一个v的转换逻辑,所以无需传递初始值
参数2:分区内计算逻辑
参数3:分区间的计算逻辑
combineByKeyWithClassTag(
createCombiner,
mergeValue,
mergeCombiners )
7. sortByKey() 排序
函数签名
def sortByKey(
ascending: Boolean = true,
numPartitions: Int = self.partitions.length
): RDD[(K, V)] {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
函数说明
有两个形参,均有默认值:
形参1:ascending: Boolean = true
排序的顺序,默认是升序,如果需要降序,则输入false
形参2:numPartitions: Int = self.partitions.length
排序以后分区的数量,默认等于上一个rdd的分区的数量。
还可以自定义分区的规则。步骤:
-
继承与ordered,并混入serializable -
重写compare方法,指定排序比较的规则
在一个(K,V)的RDD上调用,K必须实现Ordered 接口,返回一个按照key进行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
小功能:设置key为自定义类User
class User extends Ordered[User] with Serializable{
override def compare(that: User): Int = {
if (this.name > that.name){
1
}else if (this.name == that.name){
this.age - that.age
}else{
-1
}
}
}
8. mapValues() 只对V进行操作
函数签名
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
函数说明
针对于(K,V)形式的类型只对V进行操作
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
val mapRdd: RDD[(Int, String)] = rdd.mapValues(_+">>>>")
mapRdd.collect().foreach(println)
结果
(1,a>>>>)
(1,d>>>>)
(2,b>>>>)
(3,c>>>>)
9. join() 连接
函数签名
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
函数说明
在类型为(K,V) 的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V1,V2)) 的RDD
情况1:如果当前RDD中key在连接的RDD中没有,那么这条数据就不会被关联,数据则没有
情况2:如果当前RDD中相同的Key有多条数据,且另外一个RDD与子相同的key也有多条数据,那么就出现了笛卡尔积错误
正常情况
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
少key的情况
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2)))
rdd.join(rdd1).collect().mkString(",")
多重复key的情况
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("a",2)))
val rdd3: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("c",2),("a",2)))
rdd2.join(rdd3).collect().mkString(",")
会出现数据重复,笛卡尔乘积的现象
思考一个问题:如果key存在不相等呢?
如果key不相等,对应的数据无法连接;如果key有重复的,那么数据会多次连接
10. leftOuterJoin() 左外连接
函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
函数说明
类似于SQL语句的左外连接
注意返回值:
-
如果两个RDD有相同的key,则为:(a,(1,Some(21))) -
如果主RDD中的key,在从RDD没有对应的key,则为:(d,(2,None))
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("d",2),("a",2)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("a",2)))
rdd.leftOuterJoin(rdd1).collect().mkString(",")
11. cogroup() 联合
函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
函数说明
在类型为(K,V) 和(K,W) 的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>)) 类型的RDD
返回值:(K,(Iterable<V>,Iterable<W>)) ,是一个元组
第一个元素:RDD的key
第二个元素:还是一个元组
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("b",1),("c",1)))
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a",21),("b",2),("b",2),("d",1)))
rdd.cogroup(rdd1).collect().foreach(println)
双Value 类型
双Value:表示是两个RDD之间进行操作,类似sacla中集合的并集(union )、交集(intersect )、差集(diff )、拉链(zip )
1. intersection() 交集
函数签名
def intersection(other: RDD[T]): RDD[T]
函数说明
1、数据打乱重组,有shuffle过程
2、返回的RDD的分区数量,为两个RDD最大的分区数量
3、两个RDD的数据类型必须保持一致,否者编译时报错
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.intersection(dataRDD2)
2. union() 并集
函数签名
def union(other: RDD[T]): RDD[T]
函数说明
1、分区:分区合并
2、数据:数据合并
3、两个RDD的数据类型必须保持一致,否者编译不通过
对源RDD和参数RDD求并集后返回一个新的RDD
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.union(dataRDD2)
3. subtract() 差集
函数签名
def subtract(other: RDD[T]): RDD[T]
函数说明
1、分区:返回的RDD的分区数量,等于调用这个方法的RDD的分区数量
2、有数据打乱重组过程,有shuffle过程
3、数据:返回当前RDD除去和参数RDD共同的数据集
4、两个RDD的数据类型必须保持一致,否者编译时报错
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[Int] = dataRDD1.subtract(dataRDD2)
4. zip() 拉链
函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
函数说明
1、分区数量相同,每个分区的数据量不相等,
报错:Can only zip RDDs with same number of elements in each partition
??只有两个RDD的每个分区数据量相同才能拉链
2、分区数量不相同,每个分区的数量量相同,
报错:Can’t zip RDDs with unequal numbers of partitions
??RDD的分区数量不同不能拉链
综上:只有两个RDD的分区数量和每个分区数据量相等,才能拉链(拉链的规则需要对应)
3、返回的RDD的数据是元组
将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD: RDD[(Int, Int)] = dataRDD1.zip(dataRDD2)
思考一个问题:如果两个RDD数据类型不一致怎么办?
编译时报错,会发生错误
思考一个问题:如果两个RDD数据分区不一致怎么办?
如果数据分区不一致,会发生错误
思考一个问题:如果两个RDD分区数据数量不一致怎么办?
如果数据分区中数据量不一致,也会发生错误。
|