第10章 Spark案例实操
10.1 数据说明
 上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索 ,点击 ,下单 ,支付 。数据规则如下:
- 数据文件中每行数据采用下划线分隔数据。
- 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
- 如果搜索关键字为 null,表示数据不是搜索数据。
- 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据。
- 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示。
- 支付行为和下单行为类似。
详细字段说明:
编号 | 字段名称 | 字段类型 | 字段含义 |
---|
1 | date | String | 用户点击行为的日期 | 2 | user_id | Long | 用户的 ID | 3 | session_id | String | Session 的 ID | 4 | page_id | Long | 某个页面的 ID | 5 | action_time | String | 动作的时间点 | 6 | search_keyword | String | 用户搜索的关键词 | 7 | click_category_id | String | 某一个商品品类的 ID | 8 | click_product_id | Long | 某一个商品的 ID | 9 | order_category_ids | String | 一次订单中所有品类的 ID 集合 | 10 | order_product_ids | String | 一次订单中所有商品的 ID 集合 | 11 | pay_category_ids | String | 一次支付中所有品类的 ID 集合 | 12 | pay_product_ids | String | 一次支付中所有商品的 ID 集合 | 13 | city_id | Long | 城市 id |
case class UserVisitAction(
date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long
)
10.2 需求 1:Top10 热门品类

10.2.1 需求说明
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
- 鞋 点击数 下单数 支付数
- 衣服 点击数 下单数 支付数
- 电脑 点击数 下单数 支付数
例如:
- 综合排名 = 点击数×20% + 下单数×30% + 支付数×50%
本项目需求优化为:先按照**点击数**排名,靠前的就排名高;如果点击数相同,再比较**下单数**;下单数再相同,就比较**支付数**。
10.2.2 实现方案一
10.2.2.1 需求分析
- 分别统计每个品类点击的次数,下单的次数和支付的次数: (品类, 点击总数) (品类, 下单总数) (品类, 支付总数)。
10.2.2.2 功能实现
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf ().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparkConf)
val actionRDD = sc.textFile("datas/user_visit_action.txt")
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_+_)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
val payCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))]
= clickCountRDD.cogroup(orderCountRDD, payCountRDD)
val analysisRDD = cogroupRDD.mapValues {
case (clickIter, orderIter, payIter) => {
var clickCnt = 0
val iter1 = clickIter.iterator
if (iter1.hasNext) {
clickCnt = iter1.next()
}
var orderCnt = 0
val iter2 = orderIter.iterator
if (iter2.hasNext) {
orderCnt = iter2.next()
}
var payCnt = 0
val iter3 = payIter.iterator
if (iter3.hasNext) {
payCnt = iter3.next()
}
(clickCnt, orderCnt, payCnt)
}
}
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
resultRDD.foreach(println)
sc.stop()
}
}
分析:上述实现方式可能存在以下问题:
- 问题一 :
actionRDD 重复使用 - 问题二 :
cogroup 性能可能较低
object Spark02_Req1_HotCategoryTop10Analysis1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf ().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparkConf)
val actionRDD = sc.textFile("datas/user_visit_action.txt")
actionRDD.cache()
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_+_)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
val payCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id=>(id, 1))
}
).reduceByKey(_+_)
val rdd1 = clickCountRDD.map{
case (cid, cnt) => {
(cid,(cnt, 0, 0))
}
}
val rdd2 = orderCountRDD.map{
case (cid,cnt) => {
(cid, (0, cnt, 0))
}
}
val rdd3 = payCountRDD.map{
case (cid, cnt) => {
(cid,(0, 0, cnt))
}
}
val soruceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
val analysisRDD = soruceRDD.reduceByKey(
( t1,t2 ) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
resultRDD.foreach(println)
sc.stop()
}
}
10.2.3 实现方案二
10.2.3.1 需求分析
- 一次性统计每个品类点击的次数,下单的次数和支付的次数: (品类, (点击总数, 下单总数, 支付总数))。
10.2.3.2 功能实现
…
10.2.4 实现方案三
10.2.4.1 需求分析
10.2.4.2 功能实现
…
10.3 需求 2:Top10 热门品类中每个品类的 Top10 活跃 Session 统计
10.3.1 需求说明
- 在需求一的基础上,增加每个品类用户 session 的点击统计。
10.3.2 需求分析
…
10.3.3 功能实现
…
10.4 需求 3:页面单跳转换率统计
10.4.1 需求说明
…
10.4.2 需求分析
…
10.4.3 功能实现
…
声明:本文是学习时记录的笔记,如有侵权请告知删除! 原视频地址:https://www.bilibili.com/video/BV11A411L7CK
|