基本函数
RDD中的map、filter、flatMap以及foreach等函数作为最基本的函数,都是RDD中的每个元素进行操作,将元素传递到函数中进行转换。
函数名 | 用法 | 解释 |
---|
map 函数 | map(f:T=>U) : RDD[T]=>RDD[U] | 表示将RDD经由某一函数f后,转变成另一个RDD。 | flatMap 函数 | flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]) | 表示将RDD由某一函数f后,转变为一个新的RDD,但是与 map 不同,RDD中每一个元素会被映射成新的 0 到多个元素(f 函数返回的是一个序列 Seq)。 | filter 函数 | filter(f:T=>Bool) : RDD[T]=>RDD[T] | 表示将 RDD 经由某一函数 f 后,只保留 f 返回为 true 的数据,组成新的 RDD。 | foreach 函数 | foreach(func) | 将函数func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如Redis。 | saveAsTextFile函数 | saveAsTextFile(path:String) | 数据集内部的元素会调用其toString方法,转换为字符串形式,然后根据传入的路径保存成文件,即可以是本地文件系统,也可以是HDFS等。 |
分区函数
每个RDD由多个分区组成,实际开发建议对每个分区数据进行操作,map函数使用mapPartitions代替、foreach函数使用foreachPartitoin代替。
//分区映射函数 mapPartitions
def mapPartitions[U: ClassTag](
//迭代器,将每个分区数据放在迭代器中
f:Iterator[T] => Iterator[U],
preservesPartitioning:Boolean = false
):RDD[U]
//分区输出函数
def foreachPartition(f: Iterator[T] => Unit):Unit
注意:分区函数非常重要,举个例子,比如我们针对四万个元素,如果我们不使用分区函数,那么我们就得对四万个元素一一处理,可能要创建四万个对象,内存顶不住,可如果我们分区将四万个元素分成四个分区,那我们只需要创建四个对象操控每个分区就行了。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object SparkIterTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
val resultRDD: RDD[(String, Int)] = inputRDD
.filter(line => null != line && line.trim.length != 0)
.flatMap(line => line.trim.split("\\s+"))
.mapPartitions{iter =>
iter.map(word => word -> 1)
}
.reduceByKey((tmp, item) => tmp + item)
resultRDD
.foreachPartition{iter =>
val partitionId: Int = TaskContext.getPartitionId()
iter.foreach(tuple => println(s"${partitionId}: $tuple"))
}
sc.stop()
}
}
运行结果:
0: (scala,2)
1: (spark,5)
0: (hive,3)
1: (hadoop,1)
0: (zookeeper,2)
1: (flume,3)
0: (pig,1)
1: (hdfs,2)
0: (yarn,1)
0: (azkaban,2)
0: (kafka,2)
0: (sqoop,2)
Process finished with exit code 0
重分区函数
如何对RDD中分区数目进行调整(增加或者减少分区数目),在RDD函数中主要有如下三个函数。
1. reparation 增加分区函数
函数名称:reparation ,此函数使用必须谨慎,会产生 Shuffle。
// 此函数用于增加RDD分区数目
def repatation(numPatations: Int)(implicit ord: Ordering[T] = null): RDD[T]
2. coalesce 减少分区函数
函数名称:coalesce,此函数不会产生Shuffle,当且仅当降低RDD分区数目。 比如RDD的分区数目为10个分区,此时调用rdd.coalesce(12),不会对RDD进行任何操作。
def coalesce(
numPatition: Int,
shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty
)
(implicit ord: Ordering[T] = null): RDD[T]
3. demo 演示
PairRDDFunctions 调整分区函数 在PairRDDFunctions(此类专门针对RDD中数据类型为KeyValue对提供函数)工具类中 partitionBy 函数:
// 此函数通过传递分区器 Parititoner 改变RDD的分区数目
def partitonBy(partitoner: Partitioner): RDD[(K, V)]
代码演示:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object SparkPartitionTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
println(s"inputRDD 分区数目:${inputRDD.getNumPartitions}")
val etlRDD: RDD[String] = inputRDD.repartition(3)
println(s"etlRDD 分区数目:${etlRDD.getNumPartitions}")
val resultRDD: RDD[(String, Int)] = etlRDD
.filter(line => null != line && line.trim.length != 0)
.flatMap(line => line.trim.split("\\s+"))
.mapPartitions{iter =>
iter.map(word => word -> 1)
}
.reduceByKey((tmp, item) => tmp + item)
resultRDD
.coalesce(1)
.foreachPartition{iter =>
val partitionId: Int = TaskContext.getPartitionId()
iter.foreach(tuple => println(s"${partitionId}: $tuple"))
}
sc.stop()
}
}
运行结果:
inputRDD 分区数目:2
etlRDD 分区数目:3
0: (scala,2)
0: (flume,3)
0: (kafka,2)
0: (hadoop,1)
0: (zookeeper,2)
0: (yarn,1)
0: (hdfs,2)
0: (sqoop,2)
0: (spark,5)
0: (hive,3)
0: (pig,1)
0: (azkaban,2)
Process finished with exit code 0
注意:代码中开始是规定了最小分区数目为2,然后通过repartition函数将分区数目增加至3,最后又通过coalesce函数将分区数目降低至1,所以运行结果中显示只有一个分区,都是0 。
聚合函数
在数据分析领域,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是聚合函数的使用。
1. Scala集合中的聚合函数
回顾列表list中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量,查看列表List中聚合函数reduce和fold源码如下:
def reduce[A1 >: A](op : scala.Function2[A1, A1, A1]) : A1 = { /* compiled code */ }
def fold[A1 >: A](z : A1)(op : scala.Function2[A1, A1, A1]) : A1 = { /* compiled code */ }
z:表示中间临时变量的初始值,fold聚合函数比reduce聚合函数:可以设置聚合中间变量初始值
通过代码,看看列表list聚合函数的使用:
object Reduce_test {
def main(args: Array[String]): Unit = {
val list:List[Int] = (1 to 10).toList
val result = list.reduce((tmp, item) => {
println(s"tep = $tmp, item = $item, sum = ${tmp + item}")
tmp + item
})
}
}
运行结果如下:
聚合操作时,往往聚合过程中需要中间临时变量(到底用几个临时变量,由具体业务定)
2. RDD中的聚合函数
在RDD中提供类似Scala中列表List中的聚合函数reduce和fold。
reduce 函数聚合
原理示意图 datasRDD中有俩个分区,然后对每个分区做局部聚合,通过排序临时变量拿到最大的俩个值,然后对将俩个分区的结果进行全局聚合,得到最后结果。
代码如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable.ListBuffer
object SparkAggTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
val datasRDD: RDD[Int] = sc.parallelize(1 to 10, numSlices = 2)
datasRDD.foreachPartition{iter =>
val partitionId: Int = TaskContext.getPartitionId()
iter.foreach(item => println(s"$partitionId: $item"))
}
println("===============================================")
val result: Int = datasRDD
.reduce{(tmp, item) =>
val partitionId: Int = TaskContext.getPartitionId()
println(s"$partitionId: tmp = $tmp, item = $item, sum = ${tmp + item}")
tmp + item
}
println(s"RDD Reduce = $result")
sc.stop()
}
}
运行结果:
1: 6
1: 7
1: 8
0: 1
0: 2
0: 3
1: 9
1: 10
0: 4
0: 5
===============================================
0: tmp = 1, item = 2, sum = 3
1: tmp = 6, item = 7, sum = 13
0: tmp = 3, item = 3, sum = 6
1: tmp = 13, item = 8, sum = 21
0: tmp = 6, item = 4, sum = 10
1: tmp = 21, item = 9, sum = 30
0: tmp = 10, item = 5, sum = 15
1: tmp = 30, item = 10, sum = 40
0: tmp = 15, item = 40, sum = 55
RDD Reduce = 55
Process finished with exit code 0
运行原理分析:
aggregate 高级聚合函数
与 RDD reduce 函数不同,aggregate 函数更加底层,功能更加强大。aggregate对于局部和全局可以设置不同的聚合函数,局部有局部的操作,全局有全局的操作,相对于reduce,更加灵活。
val rest: ListBuffer[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
(tmp: ListBuffer[Int], item: Int) => {
tmp += item
tmp.sorted.takeRight(2)
},
(tmp: ListBuffer[Int], item: ListBuffer[Int]) => {
tmp ++= item
tmp.sorted.takeRight(2)
}
)
println(s"Top2: ${rest.toList.mkString(", ")}")
运行结果原理剖析:
完整 Demo 上述内容完整代码如下。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable.ListBuffer
object SparkAggTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
val datasRDD: RDD[Int] = sc.parallelize(1 to 10, numSlices = 2)
datasRDD.foreachPartition{iter =>
val partitionId: Int = TaskContext.getPartitionId()
iter.foreach(item => println(s"$partitionId: $item"))
}
println("===============================================")
val result: Int = datasRDD
.reduce{(tmp, item) =>
val partitionId: Int = TaskContext.getPartitionId()
println(s"$partitionId: tmp = $tmp, item = $item, sum = ${tmp + item}")
tmp + item
}
println(s"RDD Reduce = $result")
println("===============================================")
val rest: ListBuffer[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
(tmp: ListBuffer[Int], item: Int) => {
tmp += item
tmp.sorted.takeRight(2)
},
(tmp: ListBuffer[Int], item: ListBuffer[Int]) => {
tmp ++= item
tmp.sorted.takeRight(2)
}
)
println(s"Top2: ${rest.toList.mkString(", ")}")
sc.stop()
}
}
运行结果:
1: 6
0: 1
1: 7
0: 2
1: 8
0: 3
1: 9
0: 4
1: 10
0: 5
===============================================
1: tmp = 6, item = 7, sum = 13
0: tmp = 1, item = 2, sum = 3
0: tmp = 3, item = 3, sum = 6
1: tmp = 13, item = 8, sum = 21
0: tmp = 6, item = 4, sum = 10
0: tmp = 10, item = 5, sum = 15
1: tmp = 21, item = 9, sum = 30
1: tmp = 30, item = 10, sum = 40
0: tmp = 40, item = 15, sum = 55
RDD Reduce = 55
===============================================
Top2: 9, 10
Process finished with exit code 0
PairRDDFunctions 聚合函数
在 Spark 中有一个 object 对象 PairRDDFunctions ,主要针对RDD中的数据类型是 Key、Value 对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey 函数:将相同Key的Value进行聚合操作,省去先分组再聚合。
(1) groupByKey 函数(数据倾斜、内存溢出)
// 将相同的Value合在一起,所有的Value存储在迭代器Iterable中
def groupByKey(): RDD[(K, Iterable[V])]
此函数出现的性能问题:
- 1.数据倾斜
- 当某个Key对应的Value值非常多的时候,迭代器中的数据Value非常多。
- 2.OOM
- 所有在SparkCore开发中,原则:能不使用groupByKey就不要使用,不得已而为之,可以完全不适用。
(2) reduceByKey、foldByKey 分组聚合函数
// 将相同的Key的Value进行聚合操作,类似RDD中reduce函数
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
// 此函数比reduceByKey多了聚合时中间临时变量初始值,类似RDD中的fold函数
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
在SparkCore开发中,建议使用reduceByKey函数。
但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。
(3) aggregateByKey 分组聚合函数
def aggregateByKey[U: ClassTag]
(zeroValue: U)
(
seqOp: (U, V) => U,
combOp: (U, U) => U
): RDD[(K, U)]
在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。
完整 Demo
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkAggByKeyTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
new SparkContext(sparkConf)
}
val linesSeq: Seq[String] = Seq(
"hadoop scala hive spark scala sql sql",
"hadoop scala spark hdfs hive spark",
"spark hdfs spark hdfs scala hive spark"
)
val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
val wordsRDD: RDD[(String, Int)] = inputRDD
.flatMap(line => line.split("\\s+"))
.map(word => word -> 1)
val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()
val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map{ case (word, values) =>
val count: Int = values.sum
word -> count
}
println(resultRDD.collectAsMap())
val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item)
println(resultRDD2.collectAsMap())
val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item)
println(resultRDD3.collectAsMap())
val resultRDD4 = wordsRDD.aggregateByKey(0)(
(tmp: Int, item: Int) => {
tmp + item
},
(tmp: Int, result: Int) => {
tmp + result
}
)
println(resultRDD4.collectAsMap())
Thread.sleep(1000000)
sc.stop()
}
}
运行结果如下:
注意 reduceByKey 和 groupByKey 都产生了 shuffle ,并且reducebyKey的shuffle值更小,原因就是reduceByKey产生了局部聚合。
groupByKey 和 reduceByKey 的区别
最大的区别就是reducebyKey函数有局部聚合,groupByKey函数没有。而对于aggregateByKey函数,则更加底层,更加灵活。
reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。
3. 聚合函数总结
针对 RDD 数据的聚合函数要点:
- 聚合操作,分为俩层:分区内数据聚合和分区间聚合数据聚合。
- 聚合操作,往往需要聚合中间临时变量,依据需求而定。
- reduce/fold/aggregate
- reduce函数:只需要传递一个聚合操作函。
- fold函数:不仅需要聚合操作函数,还需要初始化中间临时变量的值。
- aggregate函数:初始化中间临时变量的值,分区内聚合操作函数和分区间聚合操作函数。
关联函数
当俩个RDD的数据类型为二元组 Key/Value 对时,可以依据Key进行关联Join。
1、在SQL中JOIN时
指定 关联字段 a join b on a.xx = b.yy
2、在RDD中数据JOIN,要求RDD中数据类型必须是二元组 (Key,Value)
依据Key进行关联的
1. SQL JOIN
如下图是 SQL JOIN
2. RDD JOIN
RDD中关联JOIN函数都在PairRDDFunctions中,具体截图如下: 具体看一下join(等值连接)函数说明:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
案例代码演示:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkJoinTest {
def main(args: Array[String]): Unit = {
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
val empRDD: RDD[(Int, String)] = sc.parallelize(
Seq((101, "zhangsan"), (102, "lisi"), (103, "wangwu"), (104, "zhangliu"))
)
val deptRDD: RDD[(Int, String)] = sc.parallelize(
Seq((101, "sales"), (102, "tech"))
)
val joinRDD: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
joinRDD.foreach{case(deptNo, (empName, deptName)) =>
println(s"deptNo = $deptNo, empName = $empName, deptName = $deptName")
}
val leftJoinRDD: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
leftJoinRDD.foreach{case(deptNo, (empName, option)) =>
val deptName: String = option.getOrElse("no")
println(s"deptNo = $deptNo, empName = $empName, deptName = $deptName")
}
sc.stop()
}
}
运行结果如下:
deptNo = 102, empName = lisi, deptName = tech
deptNo = 101, empName = zhangsan, deptName = sales
deptNo = 104, empName = zhangliu, deptName = no
deptNo = 101, empName = zhangsan, deptName = sales
deptNo = 102, empName = lisi, deptName = tech
deptNo = 103, empName = wangwu, deptName = no
Process finished with exit code 0
|