IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark-核心编程(七)Spark案例实操即工程化代码 -> 正文阅读

[大数据]Spark-核心编程(七)Spark案例实操即工程化代码

Spark案例实操

数据如下:

在这里插入图片描述

数据解析如下:

# 以第一行为例
2019-07-17  日期
95  用户ID
26070e87-1ad7-49a3-8fb3-cc741facaddf  sessionID
37  页面ID
2019-07-17 00:00:02  动作时间
手机  搜索-关键字,如果该字段不为null说明当前是搜索操作
-1    点击-品类ID,如果该字段不为-1说明当前操作是点击
-1    点击-产品ID,如果该字段不为-1说明当前操作是点击
null  下单-品类ID,如果该字段不为null说明当前操作是下单操作,多个ID用,隔开
null  下单-产品ID,如果该字段不为null说明当前操作是下单操作,多个ID用,隔开
null  支付-品类ID,如果该字段不为null说明当前操作是支付操作,多个ID用,隔开
null  支付-产品ID,如果该字段不为null说明当前操作是支付操作,多个ID用,隔开
3     城市id

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

数据文件中每行数据采用下划线分隔数据

每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种

如果搜索关键字为 null,表示数据不是搜索数据

如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据

针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示

支付行为和下单行为类似

编号字段名称字段类型字段含义
1dateString用户点击行为的日期
2user_idLong用户的 ID
3session_idStringSession 的 ID
4page_idLong某个页面的 ID
5action_timeString动作的时间点
6search_keywordString用户搜索的关键词
7click_category_idLong某一个商品品类的 ID
8click_product_idLong某一个商品的 ID
9order_category_idsString一次订单中所有品类的 ID 集合
10order_product_idsString一次订单中所有商品的 ID 集合
11pay_category_idsString一次支付中所有品类的 ID 集合
12pay_product_idsString一次支付中所有商品的 ID 集合
13city_idLong城市 id

需求一:TOP10热门品类

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

鞋 点击数 下单数 支付数

例如, 综合排名 = 点击数20%+下单数30%+支付数*50%

本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

第一种实现方法:

object TestHostCategoryTop10T1 {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRdd: RDD[String] = rdd.filter(_.split("_")(6) != "-1")
        val clickCountRdd: RDD[(String, Int)] = clickActionRdd.map((action: String) => (action.split("_")(6), 1)).reduceByKey(_ + _)

        // 3、统计品类的下单数量:(品类ID,下单数量)
        val orderActionRdd: RDD[String] = rdd.filter(_.split("_")(8) != "null")
        val orderCountRdd: RDD[(String, Int)] = orderActionRdd.flatMap(action => {
            action.split("_")(8).split(",").map((_, 1))
        }).reduceByKey(_ + _)

        // 4、统计品类的支付数量:(品类ID,支付数量)
        val payActionRdd: RDD[String] = rdd.filter(_.split("_")(10) != "null")
        val payCountRdd: RDD[(String, Int)] = payActionRdd.flatMap(action => {
            action.split("_")(10).split(",").map((_, 1))
        }).reduceByKey(_ + _)

        // 5、讲品类进行排序,并且取前十名
        // 点击数量排序,下单数量排序,支付数量排序
        // 元祖排序:先比较第一个,再比较第二个,再比较第三个,一次类推
        // (品类ID, (点击数量, 下单数量, 支付数量))
        // 连接数据 cogroup = connect + group
        val cogrouprdd: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
            clickCountRdd.cogroup(orderCountRdd, payCountRdd)
        val analysisRDD: RDD[(String, (Int, Int, Int))] = cogrouprdd.mapValues {
            case (clickIter, orderIter, payIter) => {
                var clickCount = 0;
                if (clickIter.iterator.hasNext) {
                    clickCount = clickIter.iterator.next()
                }

                var orderCount = 0;
                if (orderIter.iterator.hasNext) {
                    orderCount = orderIter.iterator.next()
                }

                var payCount = 0;
                if (payIter.iterator.hasNext) {
                    payCount = payIter.iterator.next()
                }

                (clickCount, orderCount, payCount)
            }
        }

        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10)

        // 6、采集
        resultRDD.foreach(println)
//        (15,(6120,1672,1259))
//        (2,(6119,1767,1196))
//        (20,(6098,1776,1244))
//        (12,(6095,1740,1218))
//        (11,(6093,1781,1202))
//        (17,(6079,1752,1231))
//        (7,(6074,1796,1252))
//        (9,(6045,1736,1230))
//        (19,(6044,1722,1158))
//        (13,(6036,1781,1161))

        sc.stop()
    }
}

第二种方法:

object TestHostCategoryTop10T2 {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")
        rdd.cache()

        // Q1:数据源被重复使用的次数过多
        // Q2:cogroup有可能存在Shuffle,性能较低
        // (品类ID,点击数量) => (品类ID,(点击数量, 0, 0))
        // (品类ID,下单数量) => (品类ID,(0, 下单数量, 0))
        // (品类ID,支付数量) => (品类ID,(0, 0, 支付数量))
        // 然后两两聚合最终形成:(品类ID, (点击数量, 下单数量, 支付数量))

        // 2、统计品类的点击数量:(品类ID,点击数量)
        val clickActionRdd: RDD[String] = rdd.filter(_.split("_")(6) != "-1")
        val clickCountRdd: RDD[(String, (Int, Int, Int))] = clickActionRdd.map((action: String) => (action.split("_")(6), 1))
            .reduceByKey(_ + _)
            .mapValues((_, 0, 0))

        // 3、统计品类的下单数量:(品类ID,下单数量)
        val orderActionRdd: RDD[String] = rdd.filter(_.split("_")(8) != "null")
        val orderCountRdd: RDD[(String, (Int, Int, Int))] = orderActionRdd.flatMap(action => {
            action.split("_")(8).split(",").map((_, 1))
        }).reduceByKey(_ + _).mapValues((0, _, 0))

        // 4、统计品类的支付数量:(品类ID,支付数量)
        val payActionRdd: RDD[String] = rdd.filter(_.split("_")(10) != "null")
        val payCountRdd: RDD[(String, (Int, Int, Int))] = payActionRdd.flatMap(action => {
            action.split("_")(10).split(",").map((_, 1))
        }).reduceByKey(_ + _).mapValues((0, 0, _))

        // 5、讲品类进行排序,并且取前十名
        // 点击数量排序,下单数量排序,支付数量排序
        // 元祖排序:先比较第一个,再比较第二个,再比较第三个,一次类推
        // 将三个数据源合并在一起,统一进行聚合计算
        val analysisRdd: RDD[(String, (Int, Int, Int))] = clickCountRdd.union(orderCountRdd).union(payCountRdd).reduceByKey(
            (t1, t2) => {
                (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
            }
        )
        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRdd.sortBy(_._2, false).take(10)

        // 6、采集
        resultRDD.foreach(println)

        sc.stop()
    }
}

第三种方法实现:

object TestHostCategoryTop10T3 {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // Q1:存在大量的Shuffle操作(reduceByKey)
        // reduceByKey,聚合算子,spark会提供优化,缓存

        // 2、将数据转换结构
        // 点击场合:(品类, (1, 0, 0))
        // 下单场合:(品类, (0, 1, 0))
        // 支付场合:(品类, (0, 0, 1))
        val flatRDD: RDD[(String, (Int, Int, Int))] = rdd.flatMap(
            action => {
                val datas = action.split("_")
                if (datas(6) != "-1") {
                    // 点击场合
                    List((datas(6), (1, 0, 0)))
                } else if (datas(8) != "null") {
                    // 下单场合
                    datas(8).split(",").map((_, (0, 1, 0)))
                } else if (datas(10) != "null") {
                    // 支付场合
                    datas(10).split(",").map((_, (0, 0, 1)))
                } else {
                    Nil
                }
            }
        )


        // 3、将相同的品类ID的数据进行分区聚合
        // (品类ID, (点击数量, 下单数量, 支付数量))
        val analysisRdd: RDD[(String, (Int, Int, Int))] =flatRDD.reduceByKey(
            (t1, t2) => {
                (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
            }
        )

        // 4、降品类进行排序,并且取前十名
        val resultRDD: Array[(String, (Int, Int, Int))] = analysisRdd.sortBy(_._2, false).take(10)

        // 5、采集
        resultRDD.foreach(println)

        sc.stop()
    }
}

第四种实现方法

object TestHostCategoryTop10T4 {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // TOP10热门品类
        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        // Q1:存在Shuffle操作(reduceByKey)

        // 声明累加器
        val acc = new HotCategoryAccumulator;
        sc.register(acc, "hotCategory")


        // 2、将数据转换结构
        rdd.foreach(
            action => {
                val datas = action.split("_")
                if (datas(6) != "-1") {
                    // 点击场合
                    acc.add(datas(6), "click")
                } else if (datas(8) != "null") {
                    // 下单场合
                    datas(8).split(",").foreach(acc.add(_, "order"))
                } else if (datas(10) != "null") {
                    // 支付场合
                    datas(10).split(",").foreach(acc.add(_, "pay"))
                }
            }
        )

        val categories: mutable.Iterable[HotCategory] = acc.value.map(_._2)
        val sortList: List[HotCategory] = categories.toList.sortWith(
            (left, right) => {
                if (left.clickCnt > right.clickCnt) {
                    true
                } else if (left.clickCnt == right.clickCnt) {
                    if (left.orderCnt > right.orderCnt) {
                        true
                    } else if (left.orderCnt == right.orderCnt) {
                        left.payCnt > right.payCnt
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
        )

        // 4、降品类进行排序,并且取前十名
        val result = sortList.take(10)

        // 5、采集
        result.foreach(println)

        sc.stop()
    }

    /**
     * 自定义累加器
     * 1、继承AccumulatorV2,定义泛型
     *    IN:  (品类ID, 行为类型)
     *    OUT: mutable.Map[String, HotCategory]
     * 2、重写方法(6)
     */
    class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {

        private val hcMap = mutable.Map[String, HotCategory]()

        override def isZero: Boolean = hcMap.isEmpty

        override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] =
            new HotCategoryAccumulator

        override def reset(): Unit = hcMap.clear()

        override def add(v: (String, String)): Unit = {
            val cid: String = v._1
            val actionType: String = v._2
            val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
            if (actionType == "click") {
                category.clickCnt += 1
            } else if (actionType == "order") {
                category.orderCnt += 1
            } else if (actionType == "pay") {
                category.payCnt += 1
            }

            hcMap.update(cid, category)
        }

        override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
            val map1 = this.hcMap
            val map2 = other.value

            map2.foreach {
                case (cid, hc) => {
                    val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
                    category.clickCnt += hc.clickCnt
                    category.orderCnt += hc.orderCnt
                    category.payCnt += hc.payCnt

                    map1.update(cid, category)
                }
            }
        }

        override def value: mutable.Map[String, HotCategory] = hcMap
    }

    case class HotCategory(var cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
}

需求二:Top10 热门品类中每个品类的 Top10 活跃 Session 统计

需求简化:在需求一的基础上,增加每个品类用户 session 的点击统计

object TestHostCategoryTop10Session1 {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        // 1、读取原始日志数据
        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")
        rdd.cache()

        val top10: Array[String] = top10Category(rdd)

        // 1、过滤原始数据,保留点击和前10品类id
        val filterRdd: RDD[String] = rdd.filter(
            action => {
                val datas = action.split("_")
                if (datas(6) != "-1") {
                    top10.contains(datas(6))
                } else {
                    false
                }
            }
        )

        // 2、将品类ID和sessionid进行点击量的统计
        val reduceRdd: RDD[((String, String), Int)] = filterRdd.map(
            action => {
                val datas = action.split("_")
                ((datas(6), datas(2)), 1)
            }
        ).reduceByKey(_ + _)

        // 将统计的结果进行结构的转换
        // ((品类ID, sessionId), sum) => (品类ID, (sessionId, sum))
        val mapRDD: RDD[(String, (String, Int))] = reduceRdd.map {
            case ((cid, sid), sum) => (cid, (sid, sum))
        }

        // 4、相同的品类进行分组
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()

        // 5、讲分组后的数据进行点击量的排序,取前时
        val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
            iter => {
                iter.toList.sortBy(_._2).reverse.take(10)
            }
        )

        // 5、采集
        resultRDD.collect.foreach(println)

        sc.stop()
    }

    def top10Category (rdd: RDD[String]): Array[String] = {
        rdd.flatMap(
            (action: String) => {
                val datas: Array[String] = action.split("_")
                if (datas(6) != "-1") {
                    // 点击场合
                    List((datas(6), (1, 0, 0)))
                } else if (datas(8) != "null") {
                    // 下单场合
                    datas(8).split(",").map((_, (0, 1, 0)))
                } else if (datas(10) != "null") {
                    // 支付场合
                    datas(10).split(",").map((_, (0, 0, 1)))
                } else {
                    Nil
                }
            }
        ).reduceByKey(
            (t1, t2) => {
                (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
            }
        ).sortBy(_._2, false).take(10).map(_._1)
    }
}

需求三:页面单跳转换率统计

页面单跳转化率

计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。

比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
在这里插入图片描述

统计页面单跳转化率意义

产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。

数据分析师,可以此数据做更深一步的计算和分析。

企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。

object TestPageFlow {

    def main(args: Array[String]): Unit = {
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HostCategoryTop10")
        val sc = new SparkContext(sparkConf)

        val rdd: RDD[String] = sc.textFile("datas/spark-core/user_visit_action.txt")

        val dataRDD: RDD[UserVisitAction] = rdd.map(
            action => {
                val datas = action.split("_")
                UserVisitAction(
                    datas(0),
                    datas(1).toLong,
                    datas(2),
                    datas(3).toLong,
                    datas(4),
                    datas(5),
                    datas(6).toLong,
                    datas(7).toLong,
                    datas(8),
                    datas(9),
                    datas(10),
                    datas(11),
                    datas(12).toLong
                )
            }
        )
        dataRDD.cache()

        // 对指定页面的连续跳转进行统计
        // 1-2,2-3,3-4,4-5,5-6,6-7
        val ids = List(1L, 2L, 3L, 4L, 5L, 6L, 7L)
        val okFlowIds = ids.zip(ids.tail)

        // 计算分母
        // 分母过滤
        val pageIdToCountMap: Map[Long, Long] = dataRDD.filter(action => ids.init.contains(action.page_id))
            .map(action => (action.page_id, 1L))
            .reduceByKey(_ + _).collect.toMap
//        val pageIdToCountMap: Map[Long, Long] = dataRDD.map(action => (action.page_id, 1L)).reduceByKey(_ + _).collect.toMap

        // 计算分子
        // 根据session进行分组
        val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = dataRDD.groupBy(_.session_id)

        // 分组后,根据访问事件进行排序(升序)
        val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
            iter => {
                val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)

                // 【1, 2, 3, 4】
                // 【1, 2】,【2, 3】,【3, 4】
                // 【1-2, 2-3, 3-4】
                // Sliding:滑窗
                // 【1, 2, 3, 4】
                // 【2, 3, 4】
                // zip: 拉链
                val flowIds: List[Long] = sortList.map(_.page_id)
                val pageFlowIdList: List[(Long, Long)] = flowIds.zip(flowIds.tail)

                // 将不合法的页面跳转进行过滤
                pageFlowIdList.filter(
                    t => okFlowIds.contains(t)
                ).map((_, 1))
            }
        )
        // ((1,2), 1) => ((1, 2), sum)
        val flatRdd: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
        val reduceRDD: RDD[((Long, Long), Int)] = flatRdd.reduceByKey(_ + _)

        // 计算单挑转换率--分子除以分母
        reduceRDD.foreach{
            case ((page1, page2), sum) => {
                val lon: Long = pageIdToCountMap.getOrElse(page1, 0L)

                println(s"页面${page1}跳转到${page2}的单跳转换率为:${sum.toDouble / lon}")
            }
        }

        sc.stop()
    }

    //用户访问动作表
    case class UserVisitAction(
        date: String,//用户点击行为的日期
        user_id: Long,//用户的 ID
        session_id: String,//Session 的 ID
        page_id: Long,//某个页面的 ID
        action_time: String,//动作的时间点
        search_keyword: String,//用户搜索的关键词
        click_category_id: Long,//某一个商品品类的 ID
        click_product_id: Long,//某一个商品的 ID
        order_category_ids: String,//一次订单中所有品类的 ID 集合
        order_product_ids: String,//一次订单中所有商品的 ID 集合
        pay_category_ids: String,//一次支付中所有品类的 ID 集合
        pay_product_ids: String,//一次支付中所有商品的 ID 集合
        city_id: Long//城市 id
    )
}

// 过滤之后的结果:
页面2跳转到3的单跳转换率为:0.019949423995504357
页面4跳转到5的单跳转换率为:0.018323153803442533
页面1跳转到2的单跳转换率为:0.01510989010989011
页面3跳转到4的单跳转换率为:0.016884531590413945
页面5跳转到6的单跳转换率为:0.014594442885209093
页面6跳转到7的单跳转换率为:0.0192040077929307

工程化代码

TApplication

trait TApplication {

    def start(master: String = "local[*]", appName: String = "Application")(Op: => Unit) = {

        val sparkConf = new SparkConf().setMaster(master).setAppName(appName)

        val sc = new SparkContext(sparkConf)

        EnvUtil.put(sc)

        try {
            Op
        } catch {
            case ex => println(ex.getMessage)
        }

        sc.stop()
        EnvUtil.clear()
    }
}

TController

trait TController {

    def dispatch(): Any
}

TService

trait TService {

    def dataAnalysis(): Any
}

TDao

trait TDao {

    def readFile(path: String): RDD[String] = {

        EnvUtil.take.textFile(path)
    }

}

EnvUtil

object EnvUtil {

    // ThreadLocal可以对线程的内存进行控制,存储数据,共享数据,只能作为共享读数据,对于写数据是有线程安全问题的
    private val scLocal = new ThreadLocal[SparkContext]()

    def put(sc: SparkContext): Unit = {
        scLocal.set(sc)
    }

    def take(): SparkContext = {
        scLocal.get()
    }

    def clear(): Unit = {
        scLocal.remove()
    }
}

WordCountApplication

object WordCountApplication extends App with TApplication{

    start() {
        val controller = new WordCountController

        controller.dispatch()
    }

}

WordCountController

class WordCountController extends TController{

    private val service = new WordCountService

    // 调度
    def dispatch (): Unit = {

        val wordCount: Array[(String, Int)] = service.dataAnalysis()

        wordCount.foreach(println)
    }
}

WordCountService

class WordCountService extends TService{

    private val dao = new WordCountDao

    def dataAnalysis(): Array[(String, Int)] = {

        val lines = dao.readFile("datas/spark-core/wordCount")

        val words = lines.flatMap(_.split(" "))

        val wordToOne = words.map(word => (word, 1))

        val wordGroup = wordToOne.groupBy(word => word._1)

        wordGroup.map {
            case (word, list) => {
                val tuple = list.reduce((a, b) => {
                    (word, a._2 + b._2)
                })
                tuple
            }
        }.collect

    }
}

WordCountDao

class WordCountDao extends TDao{

}

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 15:57:19  更:2021-08-06 15:57:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 18:29:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码