6. import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object _06TestAggregateByKey_exercise { ? ? def main(args: Array[String]): Unit = { ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? val sc = new SparkContext(conf) ? ? ? ? /** ? ? ? ? ?* 使用aggreateByKey计算每个key出现的次数,与value之和 ? 从而可以计算平均值 ? ? ? ? ?*/ ? ? ? ? val rdd1: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)
? ? ? ? /** ? ? ? ? ?* 从需求分析中,可知,返回的数据应该是次数与value和,那么能存这样的数据,元组是比较合适的 ? ? ? ? ?*/ ? ? ? ? val result: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))( ? ? ? ? ? ? (x, y) => (x._1 + 1, x._2 + y), ? ? ? ? ? ? (x, y) => (x._1 + y._1, x._2 + y._2) ? ? ? ? ) ? ? ? ? //继续求平均值 ? ? ? ? val result1: RDD[(String, Double)] = result.map(x => { ? ? ? ? ? ? var t = x._2 ? ? ? ? ? ? var avg = t._2 / t._1.toDouble ? ? ? ? ? ? (x._1, avg) ? ? ? ? })
? ? ? ? result1.collect().foreach(println) ? ? ? ? // (b,4.0) ? ? ? ? //(a,1.6666666666666667) ? ? } }
7. import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD
//作用: 将kv对形式的RDD的v映射成别的类型 object _07MapValueDemo { ? ? def main(args: Array[String]): Unit = { ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? val sc = new SparkContext(conf)
? ? ? ? val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)
? ? ? ? //需求,按key先分组 ,再求和 ? ? ? ? val value: RDD[(String, Iterable[Int])] = rdd.groupByKey() ? ? ? ? val value1: RDD[(String, Int)] = value.mapValues(_.sum) ? ? ? ? value1.collect().foreach(println) ? ? ? ? //(b,12) ? ? ? ? //(a,5)
? ? ? ? println("*****************") ? ? ? ? //将rdd*10进行输出 ? ? ? ? val value2: RDD[(String, Int)] = rdd.mapValues(_ * 10) ? ? ? ? value2.collect().foreach(println) ? ? ? ? //(a,10) ? ? ? ? //(a,20) ? ? ? ? //(b,30) ? ? ? ? //(a,20) ? ? ? ? //(b,40) ? ? ? ? //(b,50) ? ? }
}
8. import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} // ? ? ? ? * ? 第一个参数:就是第一个value的转换操作,使之当成默认值 // ? ? ? ? * ? 第二个参数:用于指定分区内的计算逻辑: // ? ? ? ? * ? 第三个参数:用于指定分区间的计算逻辑 object _08ConbineByKeyDemo { ? ? def main(args: Array[String]): Unit = { ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? val sc = new SparkContext(conf)
? ? ? ? val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2) ? ? ? ? // ?由于第一个参数是一个函数,而不是一个普通的值,因此对于其他两个参数来说,是动态获取的,那么应该指定一下 ? ? ? ? val result: RDD[(String, Int)] = rdd.combineByKey(x => x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y) ? ? ? ? result.collect().foreach(println) ? ? ? ? sc.stop() ? ? ? ? // (b,8) ? ? ? ? //(a,4)
? ? } }
9. import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object _09ReduceAggregateFlodCombineByKeyDemo { ? ? ? ? def main(args: Array[String]): Unit = { ? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? ? ? val sc = new SparkContext(conf)
? ? ? ? ? ? val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)
? ? ? ? ? ? rdd.reduceByKey(_+_) ? ? ? ? ? ? rdd.aggregateByKey(0)(_+_,_+_) ? ? ? ? ? ? rdd.foldByKey(0)(_+_) ? ? ? ? ? ? val value: RDD[(String, Int)] = rdd.combineByKey(x=>x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y) ? ? ? ? ? ? value.collect().foreach(println)
? ? ? ? ? ? //(b,8) ? ? ? ? ? ? //(a,4) ? ? ? ? } ? ? }
10. import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object _10JoinDemo { ? ? ? ? def main(args: Array[String]): Unit = { ? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? ? ? val sc = new SparkContext(conf)
? ? ? ? ? ? val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2) ? ? ? ? ? ? val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2) ? ? ? ? ? ? /** ? ? ? ? ? ? ?* join/leftOuterJoin/rightOuterJoin算子 ? ? ? ? ? ? ?* 作用:就是让两个pairRDD进行内连接/左外连接/右外连接 ? ? ? ? ? ? ?* ? ? ?通过key连接 ? ? ? ? ? ? ?* ? ? ? ? ? ? ?*/ ? ? ? ? ? ? val value1: RDD[(String, (Int, Int))] = rdd1.join(rdd2) ? ? ? ? ? ? val value2: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2) ? ? ? ? ? ? val value3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2) ? ? ? ? ? ? value1.collect().foreach(println) ? ? ? ? ? ? println("----------------------------") ? ? ? ? ? ? value2.collect().foreach(println) ? ? ? ? ? ? println("----------------------------") ? ? ? ? ? ? value3.collect().foreach(println)
? ? ? ? ? ? //(b,(3,6)) ? ? ? ? ? ? //(b,(3,7)) ? ? ? ? ? ? //(a,(1,5)) ? ? ? ? ? ? //(a,(2,5)) ? ? ? ? ? ? //(c,(4,8)) ? ? ? ? ? ? //---------------------------- ? ? ? ? ? ? //(d,(5,None)) ? ? ? ? ? ? //(b,(3,Some(6))) ? ? ? ? ? ? //(b,(3,Some(7))) ? ? ? ? ? ? //(a,(1,Some(5))) ? ? ? ? ? ? //(a,(2,Some(5))) ? ? ? ? ? ? //(c,(4,Some(8))) ? ? ? ? ? ? //---------------------------- ? ? ? ? ? ? //(b,(Some(3),6)) ? ? ? ? ? ? //(b,(Some(3),7)) ? ? ? ? ? ? //(e,(None,9)) ? ? ? ? ? ? //(a,(Some(1),5)) ? ? ? ? ? ? //(a,(Some(2),5)) ? ? ? ? ? ? //(c,(Some(4),8)) ? ? ? ? } ? ? }
11. import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
object _11CogroupDemo { ? ? ? ? def main(args: Array[String]): Unit = { ? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]") ? ? ? ? ? ? val sc = new SparkContext(conf)
? ? ? ? ? ? val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2) ? ? ? ? ? ? val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2)
? ? ? ? ? ? /** ? ? ? ? ? ? ?* ? ? ? ? ? ? ?* cogroup: ? ? ? ? ? ? ?* ? ? ? ? ? ? ?* 作用: ? ? ? ? ? ? ?* ? ?相当于两个rdd先各自分组(groupByKey),再进行全外jion。 ? ? ? ? ? ? ?* ? ?参考输出结果 ? ? ? ? ? ? ?* ? ? ? ? ? ? ?* (a,(List(1,2),List(5))) ? ? ? ? ? ? ?* (b,(List(3),List(6,7))) ? ? ? ? ? ? ?* (c,(List(4),List(8))) ? ? ? ? ? ? ?* ? ? ? ? ? ? ?*/ ? ? ? ? ? ? val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2) ? ? ? ? ? ? value.collect().foreach(println) ? ? ? ? ? ? // (d,(CompactBuffer(5),CompactBuffer())) ? ? ? ? ? ? //(b,(CompactBuffer(3),CompactBuffer(6, 7))) ? ? ? ? ? ? //(e,(CompactBuffer(),CompactBuffer(9))) ? ? ? ? ? ? //(a,(CompactBuffer(1, 2),CompactBuffer(5))) ? ? ? ? ? ? //(c,(CompactBuffer(4),CompactBuffer(8))) ? ? ? ? } ? ? }
|