大数据课程——Spark RDD 编程
?
实验内容以及要求
现有大约500万条搜索引擎产生的记录,数据格式如下: 每一行包含6个字段: 字段1代表数据产生的时间; 字段2代表用户,即UID; 字段3代表用户搜索关键词; 字段4代表URL超链接在返回结果中的排名; 字段5代表用户单击超链接的顺序号; 字段6代表用户单击的URL超链接的地址。 请编写Scala程序,实现如下功能: (1)统计用户数量,输出格式如下: (2)统计搜索次数在20次及以上的用户UID及搜索次数,输出格式(按照搜索次数降序排列,搜索次数相同时按UID升序排列)如下: 注意: (1)数据请上传至HDFS; (2)统计结果保存至HDFS。
?
问题总结
首次使用Scala语言进行编程
本次实验对于任务的思路来说,其实并不难,跟之前的某次实验是一样的。但难点主要在于对于Scala语言编程的首次使用,由于对Scala的语法不熟悉成为了本次实验的难点,比如不知道怎么写函数,怎么调用函数等。
?
节点拒绝连接
在实验过程中,本人猜测是zookeeper的问题,可能某个节点宕掉,导致运行结果不完整,会报“拒绝连接”的错误。如下图所示。后来重启了几次就解决了。
? ?
任务一
实现思路
得到数据后,首先进行数据分割,提取字段1,得到所有用户RDD。但其中会存在重复的用户(因为一个用户会进行多次搜索),所以要进行去重操作,除去重复的用户数据。最后添加行号,调整排序得到最终结果,最后一名用户的编号即为总人数。 ?
核心代码(详细代码放在文末)
?
运行结果
?
任务二
实现思路
得到数据后,进行分割,提取字段1,并且将字段1和数字1组成一个元组作为一条新的记录(因为每个记录代表该用户进行了一次搜索)。 合并相同key值的记录,将其value值相加,统计该用户的总共的搜索次数。 使用二次排序,将统计结果进行排序,最后输出搜索次数大于等于20的记录。
?
核心代码
?
运行结果
运行中的图片
?
?
具体代码
MySortClass.scala
package spark.demo
class MySortClass(val x:String, val y:Int) extends Serializable with Ordered[MySortClass] {
override def compare(that: MySortClass): Int = {
if (!this.y.equals(that.y)) {
this.y - that.y
}
else {
this.x.hashCode - that.x.hashCode
}
}
}
?
SearchCount.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import spark.demo.MySortClass
object SearchCount{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My-Spark-Work")
conf.setMaster("spark://centos01:7077");
val sc = new SparkContext();
val linesRDD:RDD[String] = sc.textFile(args(0))
val usersRDD:RDD[String] = linesRDD.map(_.split("\t")(1))
val paresRDD:RDD[(String, Int)] = usersRDD.map((_,1))
val userCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey((x,y)=>x+y)
val tmpRDD = userCountsRDD.map(a =>(new MySortClass(a._1, a._2), a))
val userCountsSortRDD = tmpRDD.sortByKey()
val resultRDD = userCountsSortRDD.map(a=>{if(a._2._2>=20)a._2})
resultRDD.saveAsTextFile(args(1))
sc.stop();
}
}
?
UserCount.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object UserCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My-Spark-Work")
conf.setMaster("spark://centos01:7077");
val sc = new SparkContext();
val linesRDD:RDD[String] = sc.textFile(args(0))
val test = "qwe"
val usersRDD:RDD[String] = linesRDD.map(_.split("\t")(1))
val tmpRDD:RDD[(String, Int)] = usersRDD.map((_, 1))
val reduceRDD:RDD[(String, Int)] = tmpRDD.reduceByKey((x,y)=>x+y)
val reduceUserRDD:RDD[String] = reduceRDD.map(line=>{line._1})
val indexRDD:RDD[(String, Long)] = reduceUserRDD.zipWithIndex().map{case(x, y)=>(x, y+1)}
val resultRDD:RDD[(Long, String)] = indexRDD.map(line => {(line._2, line._1)})
resultRDD.saveAsTextFile(args(1))
sc.stop();
}
}
?
WordCount.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("My-Spark-Work")
conf.setMaster("spark://centos01:7077");
val sc = new SparkContext();
val linesRDD:RDD[String] = sc.textFile(args(0))
val wordsRDD:RDD[String] = linesRDD.flatMap(_.split(" "))
val paresRDD:RDD[(String, Int)] = wordsRDD.map((_, 1))
val wordCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey(_+_)
val wordCountsSortRDD:RDD[(String, Int)] = wordCountsRDD.sortBy(_._2, false)
wordCountsSortRDD.saveAsTextFile(args(1))
sc.stop();
}
}
|