Spark Custom Partitioner
Custom partitioner: create a class that extends Partitioner and implement its two methods, numPartitions and getPartition.
The code below places keys with even values in partition 0 and keys with odd values in partition 1.
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext, TaskContext}

object TestPartitioner {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TestPartitioner").setMaster("local"))
    sc.setLogLevel("ERROR")
    val rdd: RDD[String] = sc.textFile("file//day1014TestPartitioner", 5)
    rdd.flatMap(_.split(" "))
      .map((_, 1))
      .partitionBy(new MyPartitioner(2)) // redistribute records by key parity
      .foreachPartition(fp => {
        println("Partition ID: " + TaskContext.get.partitionId)
        fp.foreach(println)
      })
  }
}

class MyPartitioner(n: Int) extends Partitioner {
  // total number of partitions
  override def numPartitions: Int = n

  // even keys go to partition 0, odd keys to partition 1
  override def getPartition(key: Any): Int = {
    if (key.toString.toInt % 2 == 0) {
      0
    } else {
      1
    }
  }
}
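If you prefer to inspect partition contents on the driver rather than printing from inside foreachPartition on the executors, mapPartitionsWithIndex is a handy alternative. A minimal sketch, assuming the same rdd and MyPartitioner defined above:

// Tag each (key, count) pair with the index of the partition it landed in,
// then collect everything back to the driver for printing.
val tagged = rdd.flatMap(_.split(" "))
  .map((_, 1))
  .partitionBy(new MyPartitioner(2))
  .mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
  .collect()
tagged.foreach(println) // prints pairs such as (0,(2,1)) and (1,(1,1))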
Source file (space-separated):
1 2 3 4 5 6 7 8 9 10
Output:
Partition ID: 0
(2,1)
(4,1)
(6,1)
(8,1)
(10,1)
Partition ID: 1
(1,1)
(3,1)
(5,1)
(7,1)
(9,1)
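One refinement worth knowing: Spark compares partitioners with equals when deciding whether an RDD is already partitioned the way a downstream operation needs, so a custom partitioner that overrides equals and hashCode can let Spark skip redundant shuffles. Below is a minimal sketch of how the class above could be extended; the name MyParityPartitioner and the overrides are illustrative additions, not part of the original code.

import org.apache.spark.Partitioner

class MyParityPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n

  override def getPartition(key: Any): Int =
    if (key.toString.toInt % 2 == 0) 0 else 1

  // Two parity partitioners with the same partition count assign every key
  // identically, so Spark can treat them as equal and avoid re-shuffling.
  override def equals(other: Any): Boolean = other match {
    case p: MyParityPartitioner => p.numPartitions == numPartitions
    case _                      => false
  }

  override def hashCode: Int = numPartitions
}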