IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkSQL——各区域热门商品TOP3 -> 正文阅读

[大数据]SparkSQL——各区域热门商品TOP3

数据库表准备

我们这次 Spark-SQL 操作中所有的数据均来自 Hive,首先在 Hive 中创建表,并导入数据。一共有 3 张表: 1 张用户行为表,1 张城市表,1 张产品表

  • 用户行为表user_visit_action:

主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

? 数据文件中每行数据采用下划线分隔数据

? 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

? 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

本次需求关键的字段:

click_product_id,city_id

  • 城市表city_info:

包含城市及地区

  • 产品表product_info:

包含商品名称和渠道(自营或第三方)

需求说明

需求:各区域热门商品 Top3

  • 查询出来所有的点击记录,并与?city_info?表连接,得到每个城市所在的地区,与product_info?表连接得到产品名称。
  • 按照地区和商品?id?分组,统计出每个商品在每个地区的总点击次数。
  • 每个地区内按照点击次数降序排列。
  • 只取前三名。
  • 城市备注需要自定义?UDAF?函数。

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

地区商品名称点击次数城市备注
华北商品A100000北京21.2%,天津 13.2%,其他 65.6%
华北商品 P80200北京 63.0%,太原 10%,其他 27.0%
华北商品 M40000北京 63.0%,太原 10%,其他 27.0%
东北商品 J92000大连 28%,辽宁 17.0%,其他 55.0%

代码实现

1.建表:

 def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
 
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
        val spark = 
            SparkSession
            .builder()
            .enableHiveSupport()
            .config(sparkConf)
            .config("dfs.client.use.datanode.hostname", "true")
            .config("dfs.replication", "2")
            .getOrCreate()
 
        spark.sql("use hive")
 
        // 准备数据
        spark.sql(
            """
              |CREATE TABLE `user_visit_action`(
              |  `date` string,
              |  `user_id` bigint,
              |  `session_id` string,
              |  `page_id` bigint,
              |  `action_time` string,
              |  `search_keyword` string,
              |  `click_category_id` bigint,
              |  `click_product_id` bigint,
              |  `order_category_ids` string,
              |  `order_product_ids` string,
              |  `pay_category_ids` string,
              |  `pay_product_ids` string,
              |  `city_id` bigint)
              |row format delimited fields terminated by '\t'
            """.stripMargin)
 
        spark.sql(
            """
              |load data local inpath './datas/user_visit_action.txt' into table hive.user_visit_action
            """.stripMargin)
 
        spark.sql(
            """
              |CREATE TABLE `product_info`(
              |  `product_id` bigint,
              |  `product_name` string,
              |  `extend_info` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)
 
        spark.sql(
            """
              |load data local inpath './datas/product_info.txt' into table hive.product_info
            """.stripMargin)
 
        spark.sql(
            """
              |CREATE TABLE `city_info`(
              |  `city_id` bigint,
              |  `city_name` string,
              |  `area` string)
              |row format delimited fields terminated by '\t'
            """.stripMargin)
 
        spark.sql(
            """
              |load data local inpath './datas/city_info.txt' into table hive.city_info
            """.stripMargin)
 
        spark.sql("""select * from city_info""").show
 
 
        spark.close()
    }

2.需求实现

文字部分比较麻烦,可以用udf也可以使用sql。

Sql方法:

    spark.sql(
      """
        |select
        |*
        |from
        |
        |(
        | select
        |     t2.*,
        |     row_number() over(partition by t2.area order by t2.cli_cnt desc) rank
        |     from
        |     (
        |     select
        |       t1.area,
        |       t1.product_name,
        |       count(*) as cli_cnt
        |       from
        |       (select
        |         c.area,
        |         c.city_name,
        |         p.product_name,
        |         u.click_product_id
        |         from
        |         city_info c
        |         left join user_visit_action u on c.city_id =  u.city_id
        |         left join product_info p on u.click_product_id = p.product_id) t1
        |         where click_product_id >=0
        |         group by t1.area, t1.product_name
        |         ) t2
        |      )t3 where rank <= 3
        |
        |
        |
        |""".stripMargin
    ).createOrReplaceTempView("d1")
 
    //    城市备注部分,将商品和城市连接,需要取得每个商品TOP2城市的点击次数,文字部分用concat,多行聚合可以使用group_concat(sparksql不支持)/concat_ws
    spark.sql(
      """
        |select t6.product_name,t6.area, concat_ws(',', collect_set(info1)) as info
        |from
        |(select
        |*,
        |concat(city,round((case when rank1 = 3 then total-sum_+cnt else cnt end)*100/total,1),'%') info1
        |from
        |(select *,
        |(case when rank1 =3 then '其他' else city_name end) city,
        |sum(cnt) over (partition by product_name,area order by rank1 asc) sum_
        | from
        |(select
        |t3.*,
        |row_number() over(partition by t3.product_name order by cnt desc) rank1
        |
        | from
        | (select
        |   t2.*,
        |   sum(cnt) over(partition by area,product_name) total
        |   from
        |   (select
        |      t1.city_name,
        |      t1.product_name,
        |      t1.area,
        |      count(*) as cnt
        |      from
        |       (select
        |         c.area,
        |         c.city_name,
        |         p.product_name,
        |         u.click_product_id
        |         from
        |         city_info c
        |         left join user_visit_action u on c.city_id =  u.city_id
        |         left join product_info p on u.click_product_id = p.product_id) t1
        |       where click_product_id >=0
        |       group by product_name,area,city_name) t2)t3
        |join d1 on d1.product_name = t3.product_name and d1.area = t3.area)t4
        |  where  t4.rank1 <=3)t5  order by area,product_name,rank1 asc
        |)t6
        |group by area,product_name
        |order by area,product_name asc
        |""".stripMargin
    ).createOrReplaceTempView("d2")
 
    spark.sql(
      """
        |select
        |   d1.area,
        |   d1.product_name,
        |   d1.cli_cnt,
        |   d2.info
        |from
        |   d1
        |   left join d2
        |   on d1.product_name = d2.product_name and d1.area = d2.area
        |   order by area,rank asc
        |""".stripMargin
    ).show

自定义UDF方法

  • 连接三张表的数据,获取完整的数据(只有点击)。
  • 将数据根据地区,商品名称分组。
  • 统计商品点击次数总和,取?Top3
select
	*
from (
	select 
		*,
		rank() over( distribute by area order by clickCnt desc ) as rank
	from (
		select
			area,
			product_name,
			count(*) as clickCnt 
		from (
			select
				a.*,
				p.product_name,
				c.area,
				c.city_name
			from user_visit_action a
			join product_info p on a.click_product_id = p.product_id 
			join city_info c on a.city_id = c.city_id
			where a.click_product_id > -1
		) t1 group by area, product_name
	) t2
) t3 where rank <= 3
  • 实现自定义聚合函数显示备注。
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object Spark06_SparkSQL_Test2 {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "xqzhao")

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    spark.sql("use xinge")

    // 查询基本数据
    spark.sql(
      """
        |select
        |  a.*,
        |	 p.product_name,
        |	 c.area,
        |	 c.city_name
        |from user_visit_action a
        |join product_info p on a.click_product_id = p.product_id
        |join city_info c on a.city_id = c.city_id
        |where a.click_product_id > -1
      """.stripMargin).createOrReplaceTempView("t1")

    // 根据区域、商品进行数据聚合
    spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF()))
    spark.sql(
      """
        |select
        |  area,
        |  product_name,
        |  count(*) as clickCnt,
        |  cityRemark(city_name) as city_remark
        |from t1 group by area, product_name
        """.stripMargin).createOrReplaceTempView("t2")

    // 区域内对点击数量进行排行
    spark.sql(
      """
        |select
        |	 *,
        |	 rank() over( partition by area order by clickCnt desc ) as rank
        |from  t2
        """.stripMargin).createOrReplaceTempView("t3")

    // 取前三名
    spark.sql(
      """
        |select
        |	 *
        |from  t3 where rank <= 3
      """.stripMargin).show(false)
    // truncate = false : 避免因内容过长被截取

    spark.close()
  }

  case class Buffer(var total: Long, var cityMap: mutable.Map[String, Long])
  // 自定义聚合函数: 实现城市备注功能
  // 1.继承Aggregator,定义泛型
  //    IN : 城市名称
  //    BUF :【总点击数量, Map[(city, cnt), (city, cnt)]】
  //    OUT : 备注信息
  // 2.重写方法 (6)
  class CityRemarkUDAF extends Aggregator[String, Buffer, String] {
    // 缓冲区初始化
    override def zero: Buffer = {
      Buffer(0, mutable.Map[String, Long]())
    }

    // 更新缓冲区
    override def reduce(buff: Buffer, city: String): Buffer = {
      buff.total += 1
      val newCount = buff.cityMap.getOrElse(city, 0L)+ 1
      buff.cityMap.update(city, newCount)
      buff
    }
    
    override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
      buff1.total += buff2.total
      val map1 = buff1.cityMap
      val map2 = buff2.cityMap

//      buff1.cityMap = map1.foldLeft(map2) {
//        case (map, (city, cnt)) => {
//          val newCount = map.getOrElse(city, 0L) + cnt
//          map.update(city, newCount)
//          map
//        }
//      }
      // 上面的写法不太容易看懂,因此换一种合并方法
      map2.foreach{
        case (city, cnt) => {
          val newCount = map1.getOrElse(city, 0L) + cnt
          map1.update(city, newCount)
        }
      }
      buff1.cityMap = map1
      buff1
    }

    // 将统计的结果生成字符串信息
    override def finish(buff: Buffer): String = {
      val remarkList = ListBuffer[String]()
      val totalcnt = buff.total
      val cityMap = buff.cityMap

      // 降序排列
      var cityCntList = cityMap.toList.sortWith(
        (left, right) => {
          left._2 > right._2
        }
      ).take(2)

      val hasMore = cityMap.size > 2
      var rsum = 0L
      cityCntList.foreach {
        case (city, cnt) => {
          val r = cnt * 100 / totalcnt
          remarkList.append(s"${city} ${r}%")
          rsum += r
        }
      }
      if (hasMore) {
        remarkList.append(s"其他${100 - rsum}%")
      }

      remarkList.mkString(",")
    }

    override def bufferEncoder: Encoder[Buffer] = Encoders.product

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-25 18:11:43  更:2022-06-25 18:13:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/19 23:29:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码