1)函数签名:
def glom():RDD[Array[T]]
2)功能说明
该操作将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中的元素的类型与原分区中的元素类型一致
3)需求说明:创建一个2个分区的RDD,并将每个分区的数据放到一个数组,求出每个分区的最大值
4)具体实现:
package com.huc.Spark1.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test05_glom {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.使用Scala进行spark编程
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
val value: RDD[Array[Int]] = rdd.glom()
// println(value.map(list => list.max).collect().toList)
// println(value.map(_.toList).mapPartitionsWithIndex((index, datas) => datas.map(data => (index, data))).collect().mkString(","))
val value1: RDD[String] = sc.textFile("input/2.txt", 1)
value1.glom().map(_.toList).collect().foreach(println)
println(value1.glom().map(_.toList).collect().toList)
value1.glom().map(_.toList.mkString("\n")).collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
结果:
List(select, name,, age,, sex, from, emp_info, where, age < 10;)
List(List(select, name,, age,, sex, from, emp_info, where, age < 10;))
select
name,
age,
sex
from
emp_info
where
age < 10;
package com.huc.Spark.value
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test05_glom {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.使用Scala进行spark编程
// 创建一个RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
// 求每个分区的最大值
val maxRDD: RDD[Int] = rdd.glom().map((l: Array[Int]) => l.max)
// 求出所有分区的最大值的和2+4
println(rdd.collect().toList)
println(maxRDD.collect().sum)
//4.关闭连接
sc.stop()
}
}
|