/**
 * Sogou search-log analysis.
 *
 * Computes three metrics from the SogouQ sample log:
 *  1. the most frequent search words;
 *  2. the most frequent search words per user (keyed by user id);
 *  3. search volume per time slot (`HH:mm`).
 *
 * @author itcast
 */

import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// NOTE(review): `record` and `tuples` are not referenced in this file (likely IDE auto-imports); confirm before removing.
import shapeless.record
import spire.std.tuples

import scala.collection.immutable.StringOps
import scala.collection.mutable
import scala.util.Try

object SougouSearchLogAnalysis {

  /** Number of entries printed for each ranking. */
  private val TopN: Int = 10

  /** Segmentation tokens that are excluded from the word rankings. */
  private val IgnoredTokens: Set[String] = Set(".", "+")

  def main(args: Array[String]): Unit = {
    // 0. Prepare the environment.
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Always release the SparkContext, even when a job fails.
    try {
      // 1. Load the data.
      val lines: RDD[String] = sc.textFile("data/input/SogouQ.sample")

      // 2. Parse every line into a record. Malformed lines are dropped instead of failing the job.
      //    The parsed RDD feeds three independent jobs below, so cache it to avoid re-reading the file.
      val sogouRecordRDD: RDD[SogouRecord] = lines.flatMap(line => parseRecord(line).toList).cache()

      // 3a. Most frequent search words.
      val wordsRDD: RDD[String] = sogouRecordRDD.flatMap(record => segment(record.queryWords))
      val result1: Array[(String, Int)] = wordsRDD
        .filter(word => isMeaningful(word)) // filter words *before* pairing them with a count
        .map((_, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(TopN)

      // 3b. Most frequent search words per user, keyed by (user id, word).
      val userIdAndWordRDD: RDD[(String, String)] = sogouRecordRDD.flatMap { record =>
        segment(record.queryWords).map(word => (record.userID, word))
      }
      val result2: Array[((String, String), Int)] = userIdAndWordRDD
        .filter { case (_, word) => isMeaningful(word) }
        .map((_, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(TopN)

      // 3c. Search volume per time slot: "HH:mm:ss" is truncated to "HH:mm".
      val result3: Array[(String, Int)] = sogouRecordRDD
        .map(record => (record.queryTime.take(5), 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(TopN)

      // 4. Print the results.
      result1.foreach(println)
      result2.foreach(println)
      result3.foreach(println)
    } finally {
      // 5. Release resources.
      sc.stop()
    }
  }

  /**
   * Parses one whitespace-separated log line into a [[SogouRecord]].
   *
   * The first two fields are the time and user id; the last three are the result rank,
   * the click rank and the clicked URL. Everything in between is the query, rejoined with a
   * single space, so a query that itself contains whitespace does not shift the numeric fields.
   *
   * @param line raw log line
   * @return the parsed record, or `None` if the line has fewer than six fields or a rank is not an integer
   */
  def parseRecord(line: String): Option[SogouRecord] = {
    val fields: Array[String] = line.trim.split("\\s+")
    val n: Int = fields.length
    if (n < 6) None
    else
      for {
        resultRank <- Try(fields(n - 3).toInt).toOption
        clickRank  <- Try(fields(n - 2).toInt).toOption
      } yield SogouRecord(
        queryTime = fields(0),
        userID = fields(1),
        queryWords = fields.slice(2, n - 3).mkString(" "),
        resultRank = resultRank,
        clickRank = clickRank,
        clickUrl = fields(n - 1)
      )
  }

  /**
   * Removes the `[` / `]` around a raw query and splits it into words with HanLP,
   * e.g. `[360安全卫士]` becomes `360`, `安全卫士`.
   */
  private def segment(queryWords: String): List[String] = {
    import scala.collection.JavaConverters._ // convert HanLP's java.util.List to a Scala collection
    val cleaned: String = queryWords.replaceAll("\\[|\\]", "")
    HanLP.segment(cleaned).asScala.map(_.word).toList
  }

  /** Returns true unless the token is one of [[IgnoredTokens]]. */
  private def isMeaningful(word: String): Boolean = !IgnoredTokens.contains(word)

  /**
   * One search/click record from the user search log.
   *
   * @param queryTime  access time, formatted as HH:mm:ss
   * @param userID     user id
   * @param queryWords query text
   * @param resultRank rank of the URL in the returned results
   * @param clickRank  order in which the user clicked
   * @param clickUrl   URL the user clicked
   */
  case class SogouRecord(
    queryTime: String,
    userID: String,
    queryWords: String,
    resultRank: Int,
    clickRank: Int,
    clickUrl: String
  )
}