一、RDD的 Action 操作
1、reduce(func)
通过func函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
2、collect
以数组的形式返回 RDD 中的所有元素. 所有的数据都会被拉到 driver 端, 所以要慎用
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
rdd.collect()
sc.stop()
}
}
3、count()
返回 RDD 中元素的个数
4、take(n)
返回 RDD 中前 n 个元素组成的数组. take 的数据也会拉到 driver 端, 应该只对小数据集使用
5、first
返回 RDD 中的第一个元素. 类似于take(1).
6、takeOrdered(n, [ordering])
返回排序后的前 n 个元素, 默认是升序排列 数据也会拉到 driver 端
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
val cnt = rdd.count()
println(cnt)
val first = rdd.first()
println(first)
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
val rdd1 = sc.makeRDD(List(4,2,3,1))
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
7、aggregate
aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致
8、 fold
折叠操作,aggregate的简化操作,seqop和combop一样的时候,可以使用fold
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)
println(result)
sc.stop()
}
}
9、 saveAsTextFile(path)
作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用toString方法,将它装换为文件中的文本
10、saveAsSequenceFile(path)
作用:将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统
11、saveAsObjectFile(path)
作用:用于将 RDD 中的元素序列化成对象,存储到文件中。
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1),("a", 2),("a", 3)
))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
12、countByKey()
作用:针对(K,V)类型的 RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 应用: 可以用来查看数据是否倾斜
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("a", 1),("a", 2),("a", 3)
))
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
println(stringToLong)
sc.stop()
}
}
13、foreach(func)
作用: 针对 RDD 中的每个元素都执行一次func 每个函数是在 Executor 上执行的, 不是在 driver 端执行的.
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
rdd.collect().foreach(println)
println("******************")
rdd.foreach(println)
sc.stop()
}
}
package com.atguigu.bigdata.spark.core.rdd.operator.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark07_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(
num => {
println("age = " + (user.age + num))
}
)
sc.stop()
}
class User {
var age : Int = 30
}
}
|