学习致谢:
https://www.bilibili.com/video/BV1Xz4y1m7cv?p=36
一、 数据
数据网站: http://www.sogou.com/labs/resource/q.php
二、需求
针对SogouQ用户查询日志数据中的不同字段,使用SparkContext读取日志数据并封装到RDD数据集中,调用Transformation函数和Action函数完成不同业务的统计分析。
三、分词工具测试
使用比较流行好用的中文分词工具:HanLP。HanLP是面向生产环境的自然语言处理工具包,由一系列模型与算法组成的Java工具包,目标是普及自然语言处理在生产环境中的应用。官方网站:http://www.hanlp.com/ 。添加Maven依赖:
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>portable-1.7.7</version>
</dependency>
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import scala.collection.JavaConverters._
/**
 * Small smoke test for the HanLP segmenter, run before using it in the Spark job.
 *
 * Segments a bracketed query as-is, then again with the brackets stripped, and finally
 * segments the query field extracted from one sample SogouQ log line.
 */
object HanLPTest {
  def main(args: Array[String]): Unit = {
    val words = "[HanLP入门案例]"

    // Raw segmentation result (a java.util.List of HanLP Term objects).
    val terms: java.util.List[Term] = HanLP.segment(words)
    println(terms)
    // Keep only the word text of each term.
    println(terms.asScala.map(_.word))

    // Query words in the Sogou log are wrapped in '[' and ']'; strip them before segmenting.
    val cleanWords1: String = words.replaceAll("\\[|\\]", "")
    println(cleanWords1)
    println(HanLP.segment(cleanWords1).asScala.map(_.word))

    // One sample log line; fields are whitespace-separated and the third one is the query.
    val log = """00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20036/179"""
    val cleanWords2: String = log.split("\\s+")(2).replaceAll("\\[|\\]", "")
    println(HanLP.segment(cleanWords2).asScala.map(_.word))
  }
}
运行结果
四、代码实现
import com.hankcs.hanlp.HanLP
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Spark RDD analysis of the SogouQ user search log.
 *
 * Reads the log (path from the first command-line argument, defaulting to
 * `data/SogouQ.sample`), parses each line into a [[SogouRecord]] and prints three top-10 reports:
 *  1. the most frequent query words (segmented with HanLP),
 *  2. the most frequent (userId, word) pairs,
 *  3. the most frequent hour-and-minute prefixes (first five characters) of the query time.
 */
object SouGouSearchAnalysis {

  /** Input used when no path is given on the command line. */
  private val DefaultInputPath = "data/SogouQ.sample"

  /** Segmenter tokens excluded from the word statistics as noise. */
  private val IgnoredWords: Set[String] = Set(".", "+")

  /** Number of entries kept for each report. */
  private val TopN = 10

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // Always release the SparkContext, even when a job fails.
    try {
      val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
      val lines: RDD[String] = sc.textFile(inputPath)

      // Malformed lines are dropped instead of failing the whole job. The records feed
      // three separate jobs below, so cache them to read and parse the file only once.
      val sogouRecordRDD: RDD[SogouRecord] =
        lines.flatMap(line => parseRecord(line).toList).cache()

      // Report 1: most frequent words over all queries.
      val wordsRDD: RDD[String] =
        sogouRecordRDD.flatMap(record => segmentQuery(record.queryWords))
      val result1: Array[(String, Int)] =
        topCounts(wordsRDD.filter(word => !IgnoredWords.contains(word)), TopN)

      // Report 2: most frequent (userId, word) pairs.
      val userIdAndWordRDD: RDD[(String, String)] = sogouRecordRDD.flatMap { record =>
        segmentQuery(record.queryWords).map(word => (record.userId, word))
      }
      val result2: Array[((String, String), Int)] = topCounts(
        userIdAndWordRDD.filter { case (_, word) => !IgnoredWords.contains(word) },
        TopN
      )

      // Report 3: busiest time slots. take(5) gives the same prefix as substring(0, 5)
      // but does not throw on values shorter than five characters.
      val result3: Array[(String, Int)] =
        topCounts(sogouRecordRDD.map(record => record.queryTime.take(5)), TopN)

      result1.foreach(println)
      result2.foreach(println)
      result3.foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
   * Parses one whitespace-separated log line into a [[SogouRecord]].
   *
   * Fields beyond the sixth are ignored.
   *
   * @return `None` when the line has fewer than six fields or either rank field is not an
   *         integer, so one bad line is skipped rather than aborting the job.
   */
  private def parseRecord(line: String): Option[SogouRecord] =
    line.split("\\s+") match {
      case Array(queryTime, userId, queryWords, resultRank, clickRank, clickUrl, _*) =>
        scala.util.Try(
          SogouRecord(queryTime, userId, queryWords, resultRank.toInt, clickRank.toInt, clickUrl)
        ).toOption
      case _ => None
    }

  /** Strips the '[' and ']' around a query and segments it into words with HanLP. */
  private def segmentQuery(queryWords: String): List[String] = {
    import scala.collection.JavaConverters._
    HanLP.segment(queryWords.replaceAll("\\[|\\]", "")).asScala.map(_.word).toList
  }

  /** Counts each key and returns the `n` most frequent keys, highest count first. */
  private def topCounts[K: scala.reflect.ClassTag](keys: RDD[K], n: Int): Array[(K, Int)] =
    keys
      .map(key => (key, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(n)
}
/**
 * One parsed line of the SogouQ search log.
 *
 * @param queryTime  time of the query, as the raw text of the first log field
 * @param userId     user identifier, as the raw text of the second log field
 * @param queryWords query text, as the raw third field (may still include '[' and ']')
 * @param resultRank integer from the fourth field (presumably the URL's rank in the results; confirm against the dataset docs)
 * @param clickRank  integer from the fifth field (presumably the user's click order; confirm against the dataset docs)
 * @param clickUrl   clicked URL, as the raw text of the sixth field
 */
case class SogouRecord(
  queryTime: String,
  userId: String,
  queryWords: String,
  resultRank: Int,
  clickRank: Int,
  clickUrl: String
)
|