本文实战案例数据来自尚硅谷大数据技术之Spark教程,资源下载链接见评论区。
环境版本信息
操作系统:Windows 10家庭中文版 (21H1) 编程语言:Scala 集成开发环境:IntelliJ IDEA 2021.1.1 (Ultimate Edition) JDK版本:13.0.1 Maven版本:3.8.1 Scala版本:2.12.11 Spark版本:3.0.0
数据集说明
 数据集是尚硅谷的电商网站用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:
- 数据文件中每行数据采用下划线分隔。
- 每一行数据表示用户的一次行为。
- 如果搜索关键字为null,表示数据不是搜索数据。
- 如果点击的品类ID和产品ID为-1,表示数据不是点击数据。
- 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,ID之间采用逗号分隔;如果本次不是下单行为,则数据采用null表示。
- 支付行为和下单行为类似。
详细字段说明如下:
编号 | 字段名称 | 字段类型 | 字段含义 |
---|
1 | date | String | 用户点击行为的日期 | 2 | user_id | Long | 用户ID | 3 | session_id | String | Session ID | 4 | page_id | Long | 页面ID | 5 | action_time | String | 动作的时间点 | 6 | search_keyword | String | 用户搜索关键字 | 7 | click_category_id | Long | 商品品类ID | 8 | click_product_id | Long | 商品ID | 9 | order_category_ids | String | 一次订单中所有品类的ID集合 | 10 | order_product_ids | String | 一次订单中所有商品的ID集合 | 11 | pay_category_ids | String | 一次支付中所有品类的ID集合 | 12 | pay_product_ids | String | 一次支付中所有商品的ID集合 | 13 | city_id | Long | 城市ID |
需求说明
统计热门品类的Top 10。这里对热门品类的定义是:按照每个品类的点击数量、下单数量、支付数量从高到低排列,即先按照点击数量排名,若相同则比较下单数量,若下单数量相同则比较支付数量。
实现方式1
读取文件中的数据,转换数据结构,其中点击行为转换为(商品品类ID, (1, 0, 0)) ,下单行为转换为(商品品类ID, (0, 1, 0)) ,支付行为转换为(商品品类ID, (0, 0, 1)) 。将商品品类ID相同的数据进行分组聚合,对其降序排序并取Top 10即可满足需求。 在IDEA中新建Maven工程,导入Scala插件支持。 
创建data目录,用于存放数据集。项目结构如下: 
在pom.xml文件中导入Spark相关依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cch</groupId>
<artifactId>user_visit_action</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>13</maven.compiler.source>
<maven.compiler.target>13</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
在Maven项目的target/classes/目录下新建log4j.properties文件,输入以下内容,过滤掉除ERROR以外的信息:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.org.apache.spark.repl.Main=ERROR
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
Scala代码如下:
package com.cch.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparkConf)
val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt").cache()
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val data: Array[String] = action.split("_")
if (data(6) != "-1") {
List((data(6), (1, 0, 0)))
} else if (data(8) != "null") {
val ids: Array[String] = data(8).split(",")
ids.map(id => (id, (0, 1, 0)))
} else if (data(10) != "null") {
val ids: Array[String] = data(10).split(",")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
val analysisRDD: RDD[(String, (Int, Int, Int))] = flatRDD.reduceByKey(
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
)
val result: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, ascending = false).take(10)
result.foreach(println)
sc.stop()
}
}
实现方式2
在使用reduceByKey() 方法时会引入Shuffle操作,可以使用累加器对数据进行聚合,提高程序性能。 Scala代码如下:
package com.cch.bigdata.spark.core.req
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Req1_HotCategoryTop10Analysis_Acc {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10AnalysisAcc"))
val actionRDD: RDD[String] = sc.textFile("data/user_visit_action.txt").cache()
val acc = new HotCategoryAccumulator
sc.register(acc, "hotCategory")
actionRDD.foreach(
action => {
val data: Array[String] = action.split("_")
if (data(6) != "-1") {
acc.add((data(6), "click"))
} else if (data(8) != "null") {
val ids: Array[String] = data(8).split(",")
ids.foreach(id => acc.add((id, "order")))
} else if (data(10) != "null") {
val ids: Array[String] = data(10).split(",")
ids.foreach(id => acc.add((id, "pay")))
}
}
)
val accVal: mutable.Map[String, HotCategory] = acc.value
val categories: Iterable[HotCategory] = accVal.values
val result: List[(String, (Int, Int, Int))] = categories.toList.sortWith(
(left, right) => {
if (left.clickCnt > right.clickCnt) {
true
} else if (left.clickCnt == right.clickCnt) {
if (left.orderCnt > right.orderCnt) {
true
} else if (left.orderCnt == right.orderCnt) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}
).take(10).map(hc => (hc.cid, (hc.clickCnt, hc.orderCnt, hc.payCnt)))
result.foreach(println)
sc.stop()
}
case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
private val hcMap = mutable.Map[String, HotCategory]()
override def isZero: Boolean = hcMap.isEmpty
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = new HotCategoryAccumulator
override def reset(): Unit = hcMap.clear()
override def add(v: (String, String)): Unit = {
val cid = v._1
val actionType = v._2
val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
if (actionType == "click") {
category.clickCnt += 1
} else if (actionType == "order") {
category.orderCnt += 1
} else if (actionType == "pay") {
category.payCnt += 1
}
hcMap.update(cid, category)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1: mutable.Map[String, HotCategory] = this.hcMap
val map2: mutable.Map[String, HotCategory] = other.value
map2.foreach {
case (cid, hc) =>
val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
override def value: mutable.Map[String, HotCategory] = hcMap
}
}
运行结果
(15,(6120,1672,1259))
(2,(6119,1767,1196))
(20,(6098,1776,1244))
(12,(6095,1740,1218))
(11,(6093,1781,1202))
(17,(6079,1752,1231))
(7,(6074,1796,1252))
(9,(6045,1736,1230))
(19,(6044,1722,1158))
(13,(6036,1781,1161))
|