Problem 1:
Given the key-value pairs ("spark",2), ("hadoop",6), ("hadoop",4), ("spark",6), where the key is a book title and the value is that book's sales on a given day, compute the average value per key, i.e. the average daily sales of each book.
val rdd = sc.makeRDD(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Work03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Work3")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)))
    // Pair each value with a count of 1, sum values and counts per key,
    // then divide to get the average (toDouble avoids integer truncation).
    val averages: Array[(String, Double)] = rdd
      .map(x => (x._1, (x._2, 1)))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map(x => (x._1, x._2._1.toDouble / x._2._2))
      .collect()
    averages.foreach(println)
    sc.stop()
  }
}
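The same average can also be computed in a single pass with combineByKey, which builds the (sum, count) pair directly. The sketch below is an illustrative alternative, not part of the original assignment; it assumes an existing SparkContext named sc, and the names pairs and avgByKey are made up for the example.

import org.apache.spark.rdd.RDD

// Sketch only: per-key average via combineByKey; assumes `sc` is an existing SparkContext.
val pairs: RDD[(String, Int)] =
  sc.makeRDD(Array(("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)))

val avgByKey: RDD[(String, Double)] = pairs
  .combineByKey(
    (v: Int) => (v, 1),                                          // first value -> (sum, count)
    (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // fold a value into (sum, count)
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // merge partial (sum, count) pairs
  )
  .mapValues { case (sum, count) => sum.toDouble / count }

avgByKey.collect().foreach(println)   // (spark,4.0), (hadoop,5.0)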
2. Super WordCount
Requirements: show the top 50 rows sorted by count in descending order, convert each word to lowercase, remove punctuation, and remove stop words. Note: the punctuation and stop-word lists are user-defined (if you are unsure what a stop word is, look it up, e.g. on Baidu).
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Work03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Work3")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("datas/words.txt")
    // Custom stop-word list (extend as needed).
    val stopWords = List("a", "an")
    // Replace the custom punctuation characters with spaces.
    val cleaned: RDD[String] = rdd.map(_.replace(",", " ").replace(".", " "))
    val counts: RDD[(String, Int)] = cleaned
      .flatMap(_.split(" "))
      .map(_.toLowerCase)                                  // lowercase before filtering stop words
      .filter(w => w.nonEmpty && !stopWords.contains(w))   // drop empty tokens and stop words
      .map((_, 1))
      .reduceByKey(_ + _)
    // Sort by count descending and keep only the top 50 rows.
    val top50: Array[(String, Int)] = counts.sortBy(_._2, ascending = false).take(50)
    top50.foreach(println)
    sc.stop()
  }
}
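Because the assignment leaves the punctuation and stop-word lists open, one way to generalize the cleanup is to split on every non-letter character with a regular expression. This is only a sketch of that idea; it assumes rdd holds the raw text lines, and the stopWords set here is an arbitrary illustrative choice.

// Sketch only: regex-based cleanup; assumes `rdd: RDD[String]` holds the raw lines.
val stopWords = Set("a", "an", "the", "of", "and")     // illustrative stop-word set
val top50Regex = rdd
  .flatMap(_.toLowerCase.split("[^a-z]+"))             // split on anything that is not a letter
  .filter(w => w.nonEmpty && !stopWords.contains(w))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(50)
top50Regex.foreach(println)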
3. Ad Click Statistics
Data format: timestamp province city userid adid
             (time point, province, city, user, ad)
ID ranges: userid 0-99; province and city IDs share the range 0-9; adid 0-19 (a parsing sketch follows the requirements below).
Requirements:
1. For each province, report the top-3 ad IDs by click count.
2. For each province and each hour, report the top-3 ad IDs by click count.
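Each record is one tab-separated line; the sketch below shows how the field indices used in the solutions map onto the format. The sample line is purely illustrative (not taken from Advert.log), and its values are invented for the example.

// Sketch only: parsing one hypothetical log line (values invented for illustration).
val line = "1516609143867\tHunan\t2\t39\t13"   // timestamp \t province \t city \t userid \t adid
val fields = line.split("\t")
val timestamp = fields(0).toLong               // fields(0): click time in milliseconds
val province  = fields(1)                      // fields(1): province, used as the grouping key
val adid      = fields(4)                      // fields(4): ad ID, the value being counted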
Reference output for requirement 1:
(Hunan,List((5,2273), (1,2202), (2,2193)))
(Hebei,List((7,2250), (8,2240), (3,2234)))
(Henan,List((6,2287), (0,2237), (4,2201)))
(Hubei,List((8,2289), (6,2241), (2,2237)))
(Jiangsu,List((7,2250), (3,2199), (6,2192)))
Reference output for requirement 2:
((0,Hubei),List((8,2289), (6,2241), (2,2237)))
((0,Hunan),List((5,2273), (1,2202), (2,2193)))
((0,Henan),List((6,2287), (0,2237), (4,2201)))
((0,Jiangsu),List((7,2250), (3,2199), (6,2192)))
((0,Hebei),List((7,2250), (8,2240), (3,2234)))
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Work03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("aaa")
    val sc = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("datas/Advert.log")
    val result = lines
      .map { line =>
        val fields: Array[String] = line.split("\t")
        (fields(1), (fields(4), 1))          // (province, (adid, 1))
      }
      .groupByKey()                           // province -> all of its (adid, 1) pairs
      .map { case (province, clicks) =>
        // Count clicks per adid within the province, then keep the top 3 by count.
        val counts = clicks.groupBy(_._1).map { case (adid, hits) => (adid, hits.size) }
        (province, counts.toList.sortBy(-_._2).take(3))
      }
      .collect()
    result.foreach(println)
    sc.stop()
  }
}
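groupByKey ships every individual (adid, 1) pair across the network before anything is counted. A common alternative, shown only as a sketch (it assumes lines is the raw log RDD and is not part of the original answer), is to aggregate on a composite (province, adid) key with reduceByKey first, so only the per-ad totals are grouped afterwards.

// Sketch only: the same top-3 per province via reduceByKey; assumes `lines: RDD[String]`.
val top3PerProvince = lines
  .map { line =>
    val f = line.split("\t")
    ((f(1), f(4)), 1)                        // ((province, adid), 1)
  }
  .reduceByKey(_ + _)                        // click count per (province, adid)
  .map { case ((province, adid), cnt) => (province, (adid, cnt)) }
  .groupByKey()                              // only the aggregated totals are shuffled here
  .mapValues(_.toList.sortBy(-_._2).take(3))
top3PerProvince.collect().foreach(println)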
Requirement 2:
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Work03 {
  def main(args: Array[String]): Unit = {
    // "H" extracts the hour of day, matching the (hour, province) keys in the reference output.
    // SimpleDateFormat is Serializable, so each task receives its own copy via the closure.
    val format = new SimpleDateFormat("H")
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("abc")
    val sc = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("datas/Advert.log")
    val result: Array[((String, String), List[(String, Int)])] = lines
      .map { line =>
        val fields: Array[String] = line.split("\t")
        // ((hour, province), (adid, 1))
        ((format.format(new Date(fields(0).toLong)), fields(1)), (fields(4), 1))
      }
      .groupByKey()
      .map { case (hourProvince, clicks) =>
        val counts = clicks.groupBy(_._1).map { case (adid, hits) => (adid, hits.size) }
        (hourProvince, counts.toList.sortBy(-_._2).take(3))
      }
      .collect()
    result.foreach(println)
    sc.stop()
  }
}
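Serializing one formatter from the driver into every task works, but an alternative worth noting is mapPartitions, which builds a single SimpleDateFormat per partition instead of relying on the closure copy. The sketch below assumes lines is the raw log RDD and reuses the imports from the solution above; it is not part of the original answer.

// Sketch only: one SimpleDateFormat per partition; assumes `lines: RDD[String]` and the imports above.
val hourProvinceClicks = lines.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("H")        // created once per partition, not per record
  iter.map { line =>
    val f = line.split("\t")
    ((fmt.format(new Date(f(0).toLong)), f(1)), (f(4), 1))
  }
}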