Database Table Preparation
All the data for this Spark SQL exercise comes from Hive, so the first step is to create the tables in Hive and load the data. There are three tables: a user-behavior table, a city table, and a product table.
The user-behavior table records four kinds of user actions: search, click, order, and payment. The data follows these rules:
- Fields on each line of the data file are separated by a delimiter (tab, matching the `fields terminated by '\t'` clause in the DDL below)
- Each line represents a single user action, and that action is exactly one of the four types
- If the clicked category ID and product ID are -1, the record is not a click record
The key fields for this requirement:
- user_visit_action: click_product_id, city_id
- city_info: the city name and the region (area) it belongs to
- product_info: the product name and its source (self-operated or third-party)
Requirement Description
Requirement: Top 3 popular products in each region
- Query all click records, join them with the city_info table to get the region each city belongs to, and join with the product_info table to get the product name.
- Group by region and product id and count the total clicks of each product in each region.
- Within each region, sort by click count in descending order.
- Keep only the top three.
- The city remark column requires a custom UDAF function.
"Popular" here is measured by click count: compute the top three products in each region, and annotate each product with how its clicks are distributed over its main cities; everything beyond the top two cities is shown as 其他 (other).
| Region | Product | Clicks | City remark |
| --- | --- | --- | --- |
| North China | Product A | 100000 | Beijing 21.2%, Tianjin 13.2%, other 65.6% |
| North China | Product P | 80200 | Beijing 63.0%, Taiyuan 10%, other 27.0% |
| North China | Product M | 40000 | Beijing 63.0%, Taiyuan 10%, other 27.0% |
| Northeast | Product J | 92000 | Dalian 28%, Liaoning 17.0%, other 55.0% |
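As a quick orientation before the Hive-backed implementations below, here is a rough sketch of the core ranking steps (join, per-region click counts, top 3) using the DataFrame API. It is only illustrative: it assumes the three Hive tables described above already exist, leaves out the city-remark column, and the names such as clickCnt are made up for this sketch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Keep only click records and attach region and product name
val clicks = spark.table("user_visit_action")
  .filter(col("click_product_id") > -1)
  .join(spark.table("city_info"), "city_id")
  .join(spark.table("product_info"), col("click_product_id") === col("product_id"))

// Count clicks per (area, product) and rank within each area
val ranked = clicks
  .groupBy("area", "product_name")
  .agg(count(lit(1)).as("clickCnt"))
  .withColumn("rank", rank().over(Window.partitionBy("area").orderBy(col("clickCnt").desc)))

ranked.filter(col("rank") <= 3).orderBy(col("area"), col("rank")).show(false)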
Code Implementation
1. Create the tables:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  System.setProperty("HADOOP_USER_NAME", "root")
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
  val spark = SparkSession.builder()
    .enableHiveSupport()
    .config(sparkConf)
    .config("dfs.client.use.datanode.hostname", "true")
    .config("dfs.replication", "2")
    .getOrCreate()
  spark.sql("use hive")

  // Prepare the data: create the three tables and load the local data files
  spark.sql(
    """
      |CREATE TABLE `user_visit_action`(
      |  `date` string,
      |  `user_id` bigint,
      |  `session_id` string,
      |  `page_id` bigint,
      |  `action_time` string,
      |  `search_keyword` string,
      |  `click_category_id` bigint,
      |  `click_product_id` bigint,
      |  `order_category_ids` string,
      |  `order_product_ids` string,
      |  `pay_category_ids` string,
      |  `pay_product_ids` string,
      |  `city_id` bigint)
      |row format delimited fields terminated by '\t'
    """.stripMargin)
  spark.sql(
    """
      |load data local inpath './datas/user_visit_action.txt' into table hive.user_visit_action
    """.stripMargin)
  spark.sql(
    """
      |CREATE TABLE `product_info`(
      |  `product_id` bigint,
      |  `product_name` string,
      |  `extend_info` string)
      |row format delimited fields terminated by '\t'
    """.stripMargin)
  spark.sql(
    """
      |load data local inpath './datas/product_info.txt' into table hive.product_info
    """.stripMargin)
  spark.sql(
    """
      |CREATE TABLE `city_info`(
      |  `city_id` bigint,
      |  `city_name` string,
      |  `area` string)
      |row format delimited fields terminated by '\t'
    """.stripMargin)
  spark.sql(
    """
      |load data local inpath './datas/city_info.txt' into table hive.city_info
    """.stripMargin)

  spark.sql("""select * from city_info""").show
  spark.close()
}
2. Implement the requirement
The city-remark text is the tricky part; it can be built either with a custom aggregate function (UDAF) or in pure SQL.
SQL approach:
spark.sql(
"""
|select
|*
|from
|
|(
| select
| t2.*,
| row_number() over(partition by t2.area order by t2.cli_cnt desc) rank
| from
| (
| select
| t1.area,
| t1.product_name,
| count(*) as cli_cnt
| from
| (select
| c.area,
| c.city_name,
| p.product_name,
| u.click_product_id
| from
| city_info c
| left join user_visit_action u on c.city_id = u.city_id
| left join product_info p on u.click_product_id = p.product_id) t1
| where click_product_id >=0
| group by t1.area, t1.product_name
| ) t2
| ) t3 where rank <= 3
|""".stripMargin
).createOrReplaceTempView("d1")
// City remark: join products with cities and take the click counts of each product's top-2 cities.
// Build the text with concat; for the multi-row aggregation, group_concat is not supported in Spark SQL,
// so use concat_ws over collect_set instead. (A worked example of the 其他 percentage arithmetic follows this listing.)
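// For example (hypothetical values): if one (area, product_name) group ends up with the info1
// values "北京50.0%", "天津33.3%" and "其他16.7%", then concat_ws(',', collect_set(info1))
// collapses them into the single string "北京50.0%,天津33.3%,其他16.7%". Note that collect_set
// does not guarantee element order, so the cities in the remark are not strictly guaranteed
// to appear in rank order.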
spark.sql(
"""
|select t6.product_name,t6.area, concat_ws(',', collect_set(info1)) as info
|from
|(select
|*,
|concat(city,round((case when rank1 = 3 then total-sum_+cnt else cnt end)*100/total,1),'%') info1
|from
|(select *,
|(case when rank1 =3 then '其他' else city_name end) city,
|sum(cnt) over (partition by product_name,area order by rank1 asc) sum_
| from
|(select
|t3.*,
|row_number() over(partition by t3.area, t3.product_name order by cnt desc) rank1
|
| from
| (select
| t2.*,
| sum(cnt) over(partition by area,product_name) total
| from
| (select
| t1.city_name,
| t1.product_name,
| t1.area,
| count(*) as cnt
| from
| (select
| c.area,
| c.city_name,
| p.product_name,
| u.click_product_id
| from
| city_info c
| left join user_visit_action u on c.city_id = u.city_id
| left join product_info p on u.click_product_id = p.product_id) t1
| where click_product_id >=0
| group by product_name,area,city_name) t2)t3
|join d1 on d1.product_name = t3.product_name and d1.area = t3.area)t4
| where t4.rank1 <=3)t5 order by area,product_name,rank1 asc
|)t6
|group by area,product_name
|order by area,product_name asc
|""".stripMargin
).createOrReplaceTempView("d2")
spark.sql(
"""
|select
| d1.area,
| d1.product_name,
| d1.cli_cnt,
| d2.info
|from
| d1
| left join d2
| on d1.product_name = d2.product_name and d1.area = d2.area
| order by area,rank asc
|""".stripMargin
).show
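To see what the expression `case when rank1 = 3 then total-sum_+cnt else cnt end` is doing, take hypothetical click counts for one (area, product_name): 北京 400, 上海 300, 深圳 120, and 180 clicks spread over other cities, so total = 1000. For the rank-3 row, the running sum sum_ = 400 + 300 + 120 = 820 and cnt = 120, so total - sum_ + cnt = 1000 - 820 + 120 = 300, and the rank-3 row is relabelled 其他 with 30.0%, exactly the share of everything outside the top two cities.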
Custom UDAF approach
- Join the three tables to get the complete data (click records only).
- Group the data by region and product name.
- Count the total clicks of each product and take the Top 3.
select
*
from (
select
*,
rank() over( distribute by area order by clickCnt desc ) as rank
from (
select
area,
product_name,
count(*) as clickCnt
from (
select
a.*,
p.product_name,
c.area,
c.city_name
from user_visit_action a
join product_info p on a.click_product_id = p.product_id
join city_info c on a.city_id = c.city_id
where a.click_product_id > -1
) t1 group by area, product_name
) t2
) t3 where rank <= 3
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
object Spark06_SparkSQL_Test2 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("use hive")
// Query the base data: click records joined with product and city info
spark.sql(
"""
|select
| a.*,
| p.product_name,
| c.area,
| c.city_name
|from user_visit_action a
|join product_info p on a.click_product_id = p.product_id
|join city_info c on a.city_id = c.city_id
|where a.click_product_id > -1
""".stripMargin).createOrReplaceTempView("t1")
// Aggregate by region and product, using the custom UDAF for the city remark
spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
spark.sql(
"""
|select
| area,
| product_name,
| count(*) as clickCnt,
| cityRemark(city_name) as city_remark
|from t1 group by area, product_name
""".stripMargin).createOrReplaceTempView("t2")
// Rank products by click count within each region
spark.sql(
"""
|select
| *,
| rank() over( partition by area order by clickCnt desc ) as rank
|from t2
""".stripMargin).createOrReplaceTempView("t3")
// Keep the top three
spark.sql(
"""
|select
| *
|from t3 where rank <= 3
""".stripMargin).show(false)
// truncate = false: keep long cell contents from being cut off in the output
spark.close()
}
case class Buffer(var total: Long, var cityMap: mutable.Map[String, Long])
// Custom aggregate function implementing the city-remark column
// 1. Extend Aggregator and fix the type parameters:
//    IN  : city name
//    BUF : Buffer(total click count, Map[city -> click count])
//    OUT : remark string
// 2. Override the six required methods
class CityRemarkUDAF extends Aggregator[String, Buffer, String] {
// Initialize the buffer
override def zero: Buffer = {
Buffer(0, mutable.Map[String, Long]())
}
// Update the buffer with a single input city
override def reduce(buff: Buffer, city: String): Buffer = {
buff.total += 1
val newCount = buff.cityMap.getOrElse(city, 0L)+ 1
buff.cityMap.update(city, newCount)
buff
}
override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
buff1.total += buff2.total
val map1 = buff1.cityMap
val map2 = buff2.cityMap
// buff1.cityMap = map1.foldLeft(map2) {
// case (map, (city, cnt)) => {
// val newCount = map.getOrElse(city, 0L) + cnt
// map.update(city, newCount)
// map
// }
// }
// The foldLeft version above is hard to read, so merge the maps with a simple foreach instead
map2.foreach{
case (city, cnt) => {
val newCount = map1.getOrElse(city, 0L) + cnt
map1.update(city, newCount)
}
}
buff1.cityMap = map1
buff1
}
// Turn the accumulated counts into the remark string
override def finish(buff: Buffer): String = {
val remarkList = ListBuffer[String]()
val totalcnt = buff.total
val cityMap = buff.cityMap
// Sort cities by click count in descending order and keep the top two
val cityCntList = cityMap.toList.sortWith(
(left, right) => {
left._2 > right._2
}
).take(2)
val hasMore = cityMap.size > 2
var rsum = 0L
cityCntList.foreach {
case (city, cnt) => {
val r = cnt * 100 / totalcnt
remarkList.append(s"${city} ${r}%")
rsum += r
}
}
if (hasMore) {
remarkList.append(s"其他${100 - rsum}%")
}
remarkList.mkString(",")
}
override def bufferEncoder: Encoder[Buffer] = Encoders.product
override def outputEncoder: Encoder[String] = Encoders.STRING
}
}
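The Aggregator can also be exercised by hand, which makes the buffer lifecycle (zero → reduce → finish) easy to follow. The snippet below is not part of the original program; it uses made-up city names and could be dropped temporarily into main for a quick check.

val remarkAgg = new CityRemarkUDAF()
var buf = remarkAgg.zero
// Feed six hypothetical clicks: 北京 x3, 天津 x2, 上海 x1
Seq("北京", "北京", "北京", "天津", "天津", "上海").foreach(city => buf = remarkAgg.reduce(buf, city))
// total = 6; integer percentages: 北京 300/6 = 50, 天津 200/6 = 33, remainder 17
println(remarkAgg.finish(buf)) // prints "北京 50%,天津 33%,其他17%"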