基本概念
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发触发作业(Job)的执行。其底层代码调用的就是runJob的方法,底层会创建ActiveJob,并提交执行。
算子介绍
1. reduce
函数定义
def reduce(f: (T, T) => T): T
说明 聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
2. collect
函数定义
def collect(): Array[T]
说明 在驱动程序中,以数组Array 的形式返回数据集的所有元素 。
3. count
函数定义
def count(): Long
说明 返回RDD 中元素的个数。
4. first
函数定义
def first(): T
说明 返回RDD 中的第一个元素 。
5. take
函数定义
def take(num: Int): Array[T]
说明 返回一个由RDD 的前 n 个元素组成的数组。
6. takeOrdered
函数定义
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
说明 返回该RDD 排序后的前 n 个元素组成的数组。
案例实操1-6
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
val cnt = rdd.count()
println(cnt)
val first = rdd.first()
println(first)
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
val rdd1 = sc.makeRDD(List(4,2,3,1))
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
7. aggregate
函数定义
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
说明 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
8. fold
函数定义
def fold(zeroValue: T)(op: (T, T) => T): T
说明 折叠操作,aggregate 的简化版操作。
案例实操7-8
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result: Int = rdd.fold(10)(_ + _)
println(result)
sc.stop()
}
}
9. countByKey
函数定义
def countByKey(): Map[K, Long]
说明 统计每种key 的个数。
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,1,3,4),2)
val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
val stringToLong: collection.Map[String, Long] = rdd1.countByKey()
println(stringToLong)
sc.stop()
}
}
案例实操
10. save相关算子
函数定义
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
说明 将数据保存到不同格式的文件中。
案例实操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
11. foreach
函数定义
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
说明 分布式遍历RDD 中的每一个元素,调用指定函数。
案例实操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4) )
rdd.collect().foreach(println)
println("*************")
rdd.foreach(println)
sc.stop()
}
}
|