Spark Study Notes: Core Operators (2)
The distinct operator
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
val map = new ExternalAppendOnlyMap[T, Null, Null](
createCombiner = _ => null,
mergeValue = (a, b) => a,
mergeCombiners = (a, b) => a)
map.insertAll(partition.map(_ -> null))
map.iterator.map(_._1)
}
partitioner match {
case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
}
When an RDD has no suitable partitioner, distinct goes through the reduceByKey branch above. The same idea written by hand (sc and the input nums are assumed for this example):
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 3, 3))   // example input, not from the original notes
val result: RDD[Int] = nums.map((_, null)).reduceByKey((x, y) => x).map(_._1)
result.foreach(println)
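The first branch matters when the RDD already carries a compatible partitioner: the deduplication then happens inside each partition with no extra shuffle. A minimal check of that path (a sketch, assuming the same sc; the variable names are illustrative):
import org.apache.spark.HashPartitioner
// the pair RDD already has a HashPartitioner with 4 partitions
val prePartitioned = sc.makeRDD(List(1, 2, 2, 3, 3, 3)).map((_, null)).partitionBy(new HashPartitioner(4))
// numPartitions matches the existing partitioner, so distinct takes the mapPartitions branch
val deduped = prePartitioned.distinct(4)
println(deduped.toDebugString)   // no new ShuffledRDD is added on top of the partitionBy shuffle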
The cogroup operator
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
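A minimal usage sketch (assuming an existing SparkContext named sc): for every key, cogroup keeps the full list of values from each side, which is why the joins below can all be built on top of it.
val left = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3)))
val right = sc.makeRDD(List(("a", 10), ("c", 20)))
left.cogroup(right).collect().foreach(println)
// output (order may vary):
// (a,(CompactBuffer(1, 2),CompactBuffer(10)))
// (b,(CompactBuffer(3),CompactBuffer()))
// (c,(CompactBuffer(),CompactBuffer(20)))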
The intersection operator
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
Implementing intersection with cogroup
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
val rdd2: RDD[Int] = sc.makeRDD(List(4, 5, 6, 7, 8))
val rdd3: RDD[(Int, Null)] = rdd1.map((_, null))
val rdd4: RDD[(Int, Null)] = rdd2.map((_, null))
val grouped: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd3.cogroup(rdd4)
val res: RDD[Int] = grouped.filter(
t => t._2._1.nonEmpty && t._2._2.nonEmpty
).keys
val resultRDD: RDD[Int] = rdd1.intersection(rdd2)
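For the two inputs above, both the hand-rolled version and the built-in intersection should yield the set {4, 5}:
println(res.collect().toList.sorted)         // List(4, 5)
println(resultRDD.collect().toList.sorted)   // List(4, 5)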
Implementing join with cogroup
val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("tom", 1), ("tom", 2), ("jerry", 3), ("ketty", 2)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("jerry", 1), ("tom", 2), ("shuke", 2)))
val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val rdd4: RDD[(String, (Int, Int))] = rdd3.flatMapValues(t => {
for (x <- t._1.iterator; y <- t._2.iterator) yield (x, y)
})
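For the sample data above, the built-in join (result) and the cogroup-based version (rdd4) should hold the same pairs, in some order:
rdd4.collect().foreach(println)
// (tom,(1,2))
// (tom,(2,2))
// (jerry,(3,1))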
val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
leftJoinRDD.collect().foreach(println)
val rdd5: RDD[(String, (Int, Option[Int]))] = rdd3.flatMapValues((t: (Iterable[Int], Iterable[Int])) => {
if (t._2.isEmpty) {
t._1.map((_, None))
} else {
for (x <- t._1.iterator; y <- t._2.iterator) yield (x, Some(y))
}
})
val value: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
val value1: RDD[(String, (Option[Int], Int))] = rdd3.flatMapValues(
t => {
if (t._1.isEmpty) {
t._2.map((None, _))
} else {
for (x <- t._1.iterator; y <- t._2.iterator) yield (Some(x), y)
}
}
)
value.collect().foreach(println)
val fullOuterJoinRDD: RDD[(String, (Option[Int], Option[Int]))] = rdd3.flatMapValues {
case (i1, Seq()) => i1.iterator.map(x => (Some(x), None))
case (Seq(), i2) => i2.iterator.map(x => (None, Some(x)))
case (i1, i2) => for (a <- i1.iterator; b <- i2.iterator) yield (Some(a), Some(b))
}
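A quick check of the full outer join: every key from either side shows up, with None filling the missing side (output order may vary):
fullOuterJoinRDD.collect().foreach(println)
// (tom,(Some(1),Some(2)))
// (tom,(Some(2),Some(2)))
// (jerry,(Some(3),Some(1)))
// (ketty,(Some(2),None))
// (shuke,(None,Some(2)))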
The count operator
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
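This is the SparkContext.runJob overload that count builds on: it runs a function over every partition and returns one result per partition. A minimal sketch (assuming an existing SparkContext sc; names are illustrative) that reproduces count by summing per-partition sizes:
val data = sc.makeRDD(1 to 100, 4)
// one job; one Long per partition
val perPartitionSizes: Array[Long] = sc.runJob(data, (it: Iterator[Int]) => it.size.toLong)
println(perPartitionSizes.sum)   // 100, the same value as data.count()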
The cache operation on RDDs
- When one application triggers multiple actions, caching lets later actions reuse data that earlier RDD computations already produced, instead of re-reading the source (e.g. HDFS) or recomputing it.
- Data can be cached in memory or on disk (the disks of the executors). It is only materialized the first time an action runs; later actions read the cached RDD data and reuse it.
- Caching only pays off when the same RDD is used by more than one action.
- If data is cached in memory and memory runs short, caching works at partition granularity, so only some partitions end up cached.
- Several StorageLevels are supported. By default data is kept in memory as deserialized Java objects, which is fast but takes a lot of space; serialized storage (and other levels) can be used instead.
- cache() just calls persist(), which lets you pick a different storage level (see the sketch after this list).
- Strictly speaking, cache and persist are not transformations: they do not create a new RDD, they only mark the current RDD to be cached/persisted.
- Caching works best after the raw data has been cleaned and filtered, so that less data has to be stored.
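A minimal sketch of the points above (a sketch only; the input path and names are illustrative, not from the original notes):
import org.apache.spark.storage.StorageLevel
val cleaned = sc.textFile("hdfs://node1:9000/access.log").filter(_.nonEmpty).map(_.trim)   // hypothetical input path
cleaned.persist(StorageLevel.MEMORY_AND_DISK_SER)   // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
println(cleaned.count())   // first action: computes the RDD and fills the cache
println(cleaned.count())   // later actions reuse the cached partitions
cleaned.unpersist()        // release the cache when it is no longer needed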
The checkpoint operation on RDDs
- Use case: complex, expensive computations (machine learning, iterative algorithms). To avoid recomputing everything when data is lost, valuable intermediate results can be written to HDFS, which keeps them safe.
- Before calling checkpoint on an RDD, a checkpoint directory must be set with sc.setCheckpointDir (see the sketch after this list).
- Saving the data to HDFS, a distributed file system, guarantees the intermediate results will not be lost.
- The checkpoint is only written when the first action runs; it triggers an extra job whose purpose is to save the intermediate result to HDFS.
- Once an RDD has been checkpointed, its earlier lineage (dependencies) is no longer used.
- Checkpointing only pays off when multiple actions are triggered; it is mostly used in iterative computations.
- Strictly speaking, checkpoint is not a transformation either; it only marks the current RDD to be checkpointed.
- If the RDD is cached before checkpointing, the data does not have to be recomputed. When reading, cached data is used first, then the checkpoint; if the checkpointed data in HDFS has been lost, later operations on it will fail with an error.
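A minimal sketch of the checkpoint workflow (a sketch only; paths and names are illustrative):
sc.setCheckpointDir("hdfs://node1:9000/ckp")   // hypothetical checkpoint directory; must be set before checkpoint() is called
val mid = sc.textFile("hdfs://node1:9000/words").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
mid.cache()        // optional: lets the extra checkpoint job read the cache instead of recomputing the lineage
mid.checkpoint()   // only marks the RDD; nothing is written yet
mid.count()        // first action: runs the job plus an extra job that writes the data to HDFS
println(mid.toDebugString)   // the lineage is now cut off at the checkpointed RDD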
Finding users with three or more consecutive login days
This problem generalizes to many similar ones: consecutive months of membership payments, consecutive days with at least one sale, consecutive ride-hailing orders, consecutive overdue payments.
Test data: user id, login date.
Raw data:
guid01,2018-02-28
guid01,2018-03-01
guid01,2018-03-02
guid01,2018-03-04
guid01,2018-03-05
guid01,2018-03-06
guid01,2018-03-07
guid02,2018-03-01
guid02,2018-03-02
guid02,2018-03-03
guid02,2018-03-06
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UserContinuedLogin {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("data/login.log")
val mapRDD: RDD[(String, String)] = rdd.map(line => {
val strings: Array[String] = line.split(",")
(strings(0), strings(1))
})
val groupRDD: RDD[(String, Iterable[String])] = mapRDD.groupByKey()
val flatMapRDD: RDD[(String, (String, String))] = groupRDD.flatMapValues(it => {
// deduplicate and sort this user's login dates
val sorted: List[String] = it.toSet.toList.sortBy((x: String) => x)
val calendar: Calendar = Calendar.getInstance()
val sdf = new SimpleDateFormat("yyyy-MM-dd")
var index = 0
// subtract the position index from each date: all dates in one consecutive run
// collapse to the same "anchor" date, which then becomes part of the grouping key
sorted.map(dateStr => {
val date: Date = sdf.parse(dateStr)
calendar.setTime(date)
calendar.add(Calendar.DATE, -index)
index += 1
(dateStr, sdf.format(calendar.getTime))
})
})
// regroup by (user id, anchor date): each group is one run of consecutive days
val result: RDD[(String, Int, String, String)] = flatMapRDD.map(t => ((t._1, t._2._2), t._2._1)).groupByKey().mapValues(it => {
val list = it.toList.sorted
val times = list.size
val beginTime = list.head
val endTime = list.last
(times, beginTime, endTime)
}).filter(t => t._2._1 >= 3).map(t => {
(t._1._1, t._2._1, t._2._2, t._2._3)
})
println(result.collect().toBuffer)
}
}
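For the sample log above, the job should print three streaks of at least three days, as (user id, length, start date, end date), in some order:
ArrayBuffer((guid01,3,2018-02-28,2018-03-02), (guid01,4,2018-03-04,2018-03-07), (guid02,3,2018-03-01,2018-03-03))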
Top three most popular teachers per subject
The raw data looks like this:
http://bigdata.edu360.cn/laozhang
http://bigdata.edu360.cn/laozhang
http://bigdata.edu360.cn/laozhao
http://bigdata.edu360.cn/laozhao
http://bigdata.edu360.cn/laozhao
http://bigdata.edu360.cn/laozhao
http://bigdata.edu360.cn/laozhao
Sorting directly with toList
val sc: SparkConf = new SparkConf().setMaster("local").setAppName("TopTeacher");
val context = new SparkContext(sc)
val rdd: RDD[String] = context.textFile("data/teacher.log")
val mapRdd: RDD[((String, String), Int)] = rdd.map({
line: String => {
// a line looks like http://bigdata.edu360.cn/laozhang
// after split("/"): index 2 is the host, whose first dot-separated part is the subject; index 3 is the teacher
val strings: Array[String] = line.split("/")
val teacher: String = strings(3)
val course: String = strings(2).split("\\.")(0)
((course, teacher), 1)
}
})
val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)
val groupRdd: RDD[(String, Iterable[((String, String), Int)])] = reduceRdd.groupBy(_._1._1)
val result: RDD[(String, List[((String, String), Int)])] = groupRdd.mapValues(_.toList.sortBy(_._2).reverse.take(3))
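For the seven sample lines above there is only one subject, so collecting result should give something like:
println(result.collect().toBuffer)
// ArrayBuffer((bigdata,List(((bigdata,laozhao),5), ((bigdata,laozhang),2))))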
Filtering first, then taking the top N per subject
val sc: SparkConf = new SparkConf().setMaster("local").setAppName("TopTeacher");
val context = new SparkContext(sc)
val rdd: RDD[String] = context.textFile("data/teacher.log")
val mapRdd: RDD[((String, String), Int)] = rdd.map({
line: String => {
val strings: Array[String] = line.split("/")
val teacher: String = strings(3)
val course: String = strings(2).split("\\.")(0)
((course, teacher), 1)
}
})
val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)
val subject = List("bigdata", "javaee", "kafka", "hive")
for (sb <- subject) {
// keep only the current subject, then take its top 3 by count
val filtered: RDD[((String, String), Int)] = reduceRdd.filter(_._1._1 == sb)
val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, ascending = false).take(3)
println(favTeacher.toBuffer)
// alternative: top with an Ordering on the count (top only keeps the N largest elements)
implicit val orderRules: Ordering[((String, String), Int)] = Ordering[Int].on(t => t._2)
val res = filtered.top(2)
println(res.toBuffer)
}
Using a custom partitioner so that each subject ends up in its own partition
val subjects: Array[String] = reduceRdd.map(_._1._1).distinct().collect()   // the subject is the first element of the key tuple
val subjectPartitioner = new SubjectPartitioner(subjects)
val partitioned: RDD[((String, String), Int)] = reduceRdd.partitionBy(subjectPartitioner)
import scala.collection.mutable
import org.apache.spark.Partitioner

// one partition per subject; the partition index is looked up from the subject name
class SubjectPartitioner(val subjects: Array[String]) extends Partitioner{
val nameToNum = new mutable.HashMap[String,Int]()
var i = 0
for (sub <- subjects){
nameToNum(sub) = i
i += 1
}
override def numPartitions: Int = subjects.length
override def getPartition(key: Any): Int = {
val tuple: (String, String) = key.asInstanceOf[(String, String)]
nameToNum(tuple._1)
}
}
// each partition now holds exactly one subject, so a per-partition top 2 is a per-subject top 2
val partitionedRDD: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
it.toList.sortBy(-_._2).take(2).iterator
})
val value2: RDD[((String, String), Int)] = partitioned.mapPartitions(
it => {
// keep at most 2 elements at a time instead of materializing the whole partition;
// the ordering sorts by count descending and breaks ties on the teacher name,
// otherwise the TreeSet would treat equal counts as duplicates
val descendingByCount: Ordering[((String, String), Int)] =
Ordering[(Int, String)].on[((String, String), Int)](t => (-t._2, t._1._2))
val sorter = new mutable.TreeSet[((String, String), Int)]()(descendingByCount)
it.foreach(
e => {
sorter.add(e)
if (sorter.size > 2) {
sorter -= sorter.last
}
}
)
sorter.iterator
}
)
println(value2.collect().toBuffer)
Passing a custom partitioner into reduceByKey to reduce the number of shuffles
val sc: SparkConf = new SparkConf().setMaster("local").setAppName("TopTeacher");
val context = new SparkContext(sc)
val rdd: RDD[String] = context.textFile("data/teacher.log")
val mapRdd: RDD[((String, String), Int)] = rdd.map({
line: String => {
val strings: Array[String] = line.split("/")
val teacher: String = strings(3)
val course: String = strings(2).split("\\.")(0)
((course, teacher), 1)
}
})
// the distinct subjects must be known up front to build the partitioner; collecting them triggers one extra job
val subjects: Array[String] = mapRdd.map(_._1._1).distinct().collect()
val subjectPartitioner = new SubjectPartitioner(subjects)
// passing the partitioner to reduceByKey means aggregation and per-subject partitioning share a single shuffle
val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(subjectPartitioner, _ + _)
val value2: RDD[((String, String), Int)] = reduceRdd.mapPartitions(
it => {
// same bounded TreeSet as above: keep at most 2 elements at a time,
// ordered by count descending with the teacher name as a tie-breaker
val descendingByCount: Ordering[((String, String), Int)] =
Ordering[(Int, String)].on[((String, String), Int)](t => (-t._2, t._1._2))
val sorter = new mutable.TreeSet[((String, String), Int)]()(descendingByCount)
it.foreach(
e => {
sorter.add(e)
if (sorter.size > 2) {
sorter -= sorter.last
}
}
)
sorter.iterator
}
)
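Printing the per-subject top 2, as in the previous variant:
println(value2.collect().toBuffer)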