6. SparkCore项目实战
6.1 数据准备
本项目的数据是采集电商网站的用户行为数据,主要包含用户的4种行为:搜索、点击、下单和支付。
1)数据格式 (1)数据采用_分割字段 (2)每一行表示用户的一个行为,所以每一行只能是四种行为中的一种。 (3)如果搜索关键字是null,表示这次不是搜索 (4)如果点击的品类id和产品id是-1表示这次不是点击 (5)下单行为来说一次可以下单多个产品,所以品类id和产品id都是多个,id之间使用逗号,分割。如果本次不是下单行为,则他们相关数据用null来表示 (6)支付行为和下单行为类似
2)数据字段详细说明
编号 | 字段名称 | 字段类型 | 字段含义 |
---|
1 | date | String | 用户点击行为的日期 | 2 | user_id | Long | 用户的ID | 3 | session_id | String | Session的ID | 4 | page_id | Long | 某个页面的ID | 5 | action_time | String | 动作的时间点 | 6 | search_keyword | String | 用户搜索的关键词 | 7 | click_category_id | Long | 某一个商品品类的ID | 8 | click_product_id | Long | 某一个商品的ID | 9 | order_category_ids | String | 一次订单中所有品类的ID集合 | 10 | order_product_ids | String | 一次订单中所有商品的ID集合 | 11 | pay_category_ids | String | 一次支付中所有品类的ID集合 | 12 | pay_product_ids | String | 一次支付中所有商品的ID集合 | 13 | city_id | Long | 城市id |
6.2 需求1:Top10热门品类
需求说明:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
6.2.1 需求分析一:
分别统计每个产品点击的次数,下单的次数,支付的次数
6.2.2 代码实现一:
package com.atguigu.spark.day07
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
object Spark01_TopN_req1 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val dataRDD: RDD[String] = sc.textFile("E:\\spark-0701\\input\\user_visit_action.txt")
val actionRDD: RDD[UserVisitAction] = dataRDD.map {
line => {
val fields: Array[String] = line.split("_")
UserVisitAction(
fields(0),
fields(1).toLong,
fields(2),
fields(3).toLong,
fields(4),
fields(5),
fields(6).toLong,
fields(7).toLong,
fields(8),
fields(9),
fields(10),
fields(11),
fields(12).toLong
)
}
}
val infoRDD: RDD[CategoryCountInfo] = actionRDD.flatMap {
userAction => {
if (userAction.click_category_id != -1) {
List(CategoryCountInfo(userAction.click_category_id.toString, 1, 0, 0))
} else if (userAction.order_category_ids != "null") {
val ids: Array[String] = userAction.order_category_ids.split(",")
val categoryCountInfoList: ListBuffer[CategoryCountInfo] = ListBuffer[CategoryCountInfo]()
for (id <- ids) {
categoryCountInfoList.append(CategoryCountInfo(id, 0, 1, 0))
}
categoryCountInfoList
} else if (userAction.pay_category_ids != "null") {
val ids: Array[String] = userAction.pay_category_ids.split(",")
val categoryCountInfoList: ListBuffer[CategoryCountInfo] = ListBuffer[CategoryCountInfo]()
for (id <- ids) {
categoryCountInfoList.append(CategoryCountInfo(id, 0, 0, 1))
}
categoryCountInfoList
} else {
Nil
}
}
}
val groupRDD: RDD[(String, Iterable[CategoryCountInfo])] = infoRDD.groupBy(_.categoryId)
val reduceRDD: RDD[(String, CategoryCountInfo)] = groupRDD.mapValues {
datas => {
datas.reduce {
(info1, info2) => {
info1.clickCount = info1.clickCount + info2.clickCount
info1.orderCount = info2.clickCount + info2.clickCount
info1.payCount = info1.payCount + info2.payCount
info1
}
}
}
}
val mapRDD: RDD[CategoryCountInfo] = reduceRDD.map(_._2)
val res: Array[CategoryCountInfo] = mapRDD.sortBy(info => (info.clickCount, info.orderCount, info.payCount), false).take(10)
res.foreach(println)
sc.stop()
}
}
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long)
case class CategoryCountInfo(categoryId: String,
var clickCount: Long,
var orderCount: Long,
var payCount: Long)
输出:
CategoryCountInfo(15,6120,0,1259)
CategoryCountInfo(2,6119,2,1196)
CategoryCountInfo(20,6098,0,1244)
CategoryCountInfo(12,6095,2,1218)
CategoryCountInfo(11,6093,0,1202)
CategoryCountInfo(17,6079,2,1231)
CategoryCountInfo(7,6074,2,1252)
CategoryCountInfo(9,6045,2,1230)
CategoryCountInfo(19,6044,2,1158)
CategoryCountInfo(13,6036,2,1161)
6.2.3 需求分析二
采用累加器,避免shuffle过程。 最好的办法应该是遍历一次能够计算出来上述的3个指标。使用累加器可以达成我们的需求。 (1)遍历全部日志数据,根据品类id和操作类型分别累加,需要用到累加器 定义累加器,当碰到订单和支付业务的时候注意拆分字段才能得到品类 id (2)遍历完成之后就得到每个品类 id 和操作类型的数量. (3)按照点击下单支付的顺序来排序 (4)取出 Top10
6.3 需求2:Top10热门品类中每个品类的Top10活跃Session统计
6.3.1 需求分析
- 需求描述
对于排名前10的品类,分别获取每个品类点击次数排名前10的sessionId。(注意: 这里我们只关注点击次数,不关心下单和支付次数) 这个就是说,对于top10的品类,每一个都要获取对它点击次数排名前10的sessionId。这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的session的行为。 - 分析思路
通过需求1,获取TopN热门品类的id 将原始数据进行过滤(1.保留热门品类 2.只保留点击操作) 对session的点击数进行转换 (category-session,1) 对session的点击数进行统计 (category-session,sum) 将统计聚合的结果进行转换 (category,(session,sum)) 将转换后的结构按照品类进行分组 (category,Iterator[(session,sum)]) 对分组后的数据降序 取前10
|