IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 实验四 Spark程序设计进阶 -> 正文阅读

[大数据]实验四 Spark程序设计进阶

数据来源:https://grouplens.org/datasets/movielens/

使用的文件内容如下:

用户表:
在这里插入图片描述
电影表:
在这里插入图片描述

评分表:
在这里插入图片描述

三个表数据详情如下:

在这里插入图片描述

1、求被评分次数最多的 10 部电影,并给出评分次数(电影名,评分次数)

package com.spark.homework.movie


import org.apache.spark.{SparkConf, SparkContext}

object code_01{
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        //1、读取文件,提取需要的数据

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        // 电影id 1
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(1),1))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))


        //(电影id,(电影名,1)) => (电影名,N)
        val top10_movie = movies.join(ratings)
                .map(x => (x._2._1, x._2._2))
                .reduceByKey(_+_)
                .takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))

        for (elem <- top10_movie) {println(elem)}

        // TODO 关闭连接
        sc.stop();
    }

}

2、分别求男性,女性当中评分最高的 10 部电影(性别,电影名,影评分)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_02 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        //1、读取文件,提取需要的数据

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        // 用户id  (电影id,评分)
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),(x(1),x(2).toDouble)))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //分别男女
        val maleUsers = users.filter(x => x._2 == "F")
        val femaleUsers = users.filter(x => x._2 == "M")

        //(用户id,(性别,(电影id,评分))) => (电影id,(评分,1)) => 求和 =>(电影id,平均评分)
        val maleMovie = maleUsers.join(ratings)
                        .map(x => (x._2._2._1,(x._2._2._2,1)))
                        .reduceByKey((a,b)=>(a._1+b._1,a._2+b._2))
                        .map(x => (x._1,x._2._1/x._2._2))

        val femaleMovie = femaleUsers.join(ratings)
                        .map(x => (x._2._2._1,(x._2._2._2,1)))
                        .reduceByKey((a,b)=>(a._1+b._1,a._2+b._2))
                        .map(x => (x._1,x._2._1/x._2._2))

        //(电影id,平均评分) => (电影id,(平均评分,电影名)) => (电影名,平均评分)
        val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))
        val femaleMovieName = femaleMovie.join(movies).map(x => (x._2._2, x._2._1))

        //取点击量最高的前10个
        val top10_male = maleMovieName.takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))
        val top10_female = femaleMovieName.takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))

        println("男性:")
        for (elem <- top10_male) {println(elem)}

        println("女性: ")
        for (elem <- top10_female) {println(elem)}

        // TODO 关闭连接
        sc.stop();
    }

}

3、分别求男性,女性看过评分次数最多的 10 部电影(性别,电影名)

package com.spark.homework.movie


import org.apache.spark.{SparkConf, SparkContext}

object code_03{
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        //1、读取文件,提取需要的数据

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        // 用户id  电影id
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //分别男女
        val maleUsers = users.filter(x => x._2 == "F")
        val femaleUsers = users.filter(x => x._2 == "M")

        //(用户id,(性别,电影id)) => (电影id,1) => (电影id,N)
        val maleMovie = maleUsers.join(ratings).map(x => (x._2._2,1)).reduceByKey(_+_)
        val femaleMovie = femaleUsers.join(ratings).map(x => (x._2._2,1)).reduceByKey(_+_)

        //(电影id,(N,电影名)) => (电影名,N)
        val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))
        val femaleMovieName = femaleMovie.join(movies).map(x => (x._2._2, x._2._1))

        //取点击量最高的前10个
        val top10_male = maleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
        val top10_female = femaleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))

        println("男性:")
        for (elem <- top10_male) {println(elem._1)}

        println("女性: ")
        for (elem <- top10_female) {println(elem._1)}

        // TODO 关闭连接
        sc.stop();
    }

}

4、年龄段在“18-24”的男人,最喜欢看(评分次数最多的)10部电影

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_04 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .filter(x => x.toList(2) == "18") //年龄18~24
                .map(x => (x(0),x(1)))

        // 用户id  电影id
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //取男性
        val maleUsers = users.filter(x => x._2 == "F")

        //(用户id,(性别,电影id)) => (电影id,1) => (电影id,N)
        val maleMovie = maleUsers.join(ratings).map(x => (x._2._2,1)).reduceByKey(_+_)

        //(电影id,(N,电影名)) => (电影名,N)
        val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))

        //取点击量最高的前10个
        val top10_male = maleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
        
        for (elem <- top10_male) {println(elem._1)}
        
        // TODO 关闭连接
        sc.stop();
    }

}

5、求 movieid = 2116 这部电影各年龄段(因为年龄就只有 7 个,就按这个 7 个分就好了)的平均影评(年龄段,影评分)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_05 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 年龄
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(2)))

        // 用户id  评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .filter(x => x(1) == "2116")
                .map(x => (x(0),x(2)))

        //(用户id,(年龄,评分)) => (年龄,(评分,1)) => (年龄,(sum评分,sum)) => (年龄,均值)
        val score = users.join(ratings).map(x => (x._2._1.toInt, (x._2._2.toInt, 1)))
                .reduceByKey((a,b) =>(a._1+b._1,a._2+b._2))
                .map(x => (x._1, 1.0 * x._2._1 / x._2._2))

        for (elem <- score.collect()) {
            println(elem)
        }

        // TODO 关闭连接
        sc.stop();
    }

}

6、求最喜欢看电影(影评次数最多)的那位女性评最高分的 10 部电影的平均影评分(观影者,电影名,影评分)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_06 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        // 用户id 电影id 评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //(用户id,1) => (用户id,(1,评分)) => (用户id,1) => 聚合(用户id,N) => 排序取最大的N
        val top = ratings.map(x => (x(0), 1))
                .join(users.filter(x => x._2 == "F"))
                .map(x => (x._1, 1))
                .reduceByKey(_ + _)
                .takeOrdered(1)(Ordering[Int].reverse.on(x => x._2))

        //最喜欢看电影的女性的id
        val id = top(0)._1

        //过滤id => (电影id,评分) => 取评分前10
        val top10_movesId = ratings.filter(x => x(0) == id)
                .map(x => (x(1), x(2)))
                .takeOrdered(10)(Ordering[Int].reverse.on(x => x._2.toInt))

        for (elem <- top10_movesId) {
            // 当前电影id
            val moveId = elem._1

            // 当前电影名
            val moveName = movies.filter(x => x._1 == moveId).collect()(0)._2

            // 过滤电影 => 统计得分(moveName,(评分,1)) => (moveName (sum评分,sum)) => (moveName,平均评分)
            val score = ratings.filter(x => x(1) == moveId)
                    .map(x => (moveName, (x(2).toInt, 1)))
                    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
                    .map(x => (x._1, 1.0 * x._2._1 / x._2._2))

            println(score.collect().toList)
        }

        // TODO 关闭连接
        sc.stop();
    }

}

7、求好片(评分>=4.0)最多的那个年份的最好看的 10 部电影(好看定义为平均评分最高)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_07 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id 评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(1),x(2).toDouble))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //(电影id,(电影名,评分)) => 过滤大于大于4评分电影 => (电影id,1) => 计算数量 => 取评分数量最多的电影的id
        val top1_moveId = movies.join(ratings.filter(x => x._2 >= 4))
                .map(x => (x._1, 1))
                .reduceByKey(_ + _)
                .takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))(0)._1

        //id为top1_moveId 的电影名
        val top1_moveName = movies.filter(x => x._1 == top1_moveId).collect()(0)._2

        //取电影播放年份
        val year = top1_moveName.substring(top1_moveName.length - 5, top1_moveName.length - 1)

        //过滤年份 => 合并(电影id,(评分,电影名)) => (电影名,(评分,1))=>求和(电影名,(sum评分,sum)) => 求平均 => 取平均最高的10个
        val score = ratings.join(movies.filter(x => x._2.contains(year)))
                .map(x => (x._2._2, (x._2._1, 1)))
                .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
                .map(x => (x._1, x._2._1 / x._2._2))
                .takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))

        for (elem <- score) {println(elem)}

        // TODO 关闭连接
        sc.stop();
    }

}

8、求 1997 年上映的电影中,评分最高的 10 部 Comedy 类电影

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_08 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id 评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(1),x(2).toDouble))

        //电影id  电影名
        //过滤类型 and 年份
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))
                .filter(x => x(2).contains("Comedy") && x(1).contains("1997"))
                .map(x => (x(0),x(1)))


        //合并(电影id,(评分,电影名)) => (电影名,(评分,1))=>求和(电影名,(sum评分,sum)) => 求平均 => 取平均最高的10个
        val score = ratings.join(movies)
                .map(x => (x._2._2, (x._2._1, 1)))
                .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
                .map(x => (x._1, x._2._1 / x._2._2))
                .takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))

        for (elem <- score) {println(elem)}

        // TODO 关闭连接
        sc.stop();
    }

}

9、该影评库中各种类型电影中评价最高的 5 部电影(类型,电影名,平均影评分)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_09 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id 评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(1),x(2).toDouble))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))

        //取得所有电影的类型
        val moveType = movies.map(x => x(2)).flatMap(x => x.split('|')).distinct().collect()

        for (elem <- moveType) {
            //对于每种类型,获取评价最高的
            
            //(电影id,(电影名,评分)) => (电影名,(评分,1)) => 求和,求平均 => 取最高的5个
            val top5_movie = movies.filter(x => x(2).contains(elem))
                    .map(x => (x(0), x(1))).join(ratings)
                    .map(x => (x._2._1, (x._2._2, 1)))
                    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
                    .map(x => (x._1, x._2._1 / x._2._2))
                    .takeOrdered(5)((Ordering[Double].reverse.on(x => x._2)))
            println(elem + ":")
            for (movie <- top5_movie) {println(movie)}
        }
        // TODO 关闭连接
        sc.stop();
    }

}

10、各年评分最高的电影类型(年份,类型,影评分)

package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_10 {
    def main(args: Array[String]): Unit = {
        // TODO 建立和Spark框架的连接

        val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)

        // TODO 执行业务操作1

        // 用户id 性别
        val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
                .map(x => x.split("::"))
                .map(x => (x(0),x(1)))

        //电影id 评分
        val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
                .map(x => x.split("::"))
                .map(x => (x(1),x(2).toDouble))

        //电影id  电影名
        val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
                .map(x => x.split("::"))

        //取得所有电影上映时间
        val years = movies.map(x => x(1)).map(x => x.substring(x.length - 5,x.length - 1)).distinct().collect()

        for (elem <- years) {
            //对于每年,获取评价最高的

            //(电影id,(电影名,评分)) => (电影名,(评分,1)) => 求和,求平均 => 取最高的5个
            val top_movie = movies.filter(x => x(1).contains(elem))
                    .map(x => (x(0), x(2))) //(电影id,类型)
                    .join(ratings) //(电影id,(类型,评分))
                    .map(x => (x._2._1, (x._2._2, 1))) //(类型,(评分,1))
                    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) //求和
                    .map(x => (x._1, x._2._1 / x._2._2)) //求平均
                    .takeOrdered(1)((Ordering[Double].reverse.on(x => x._2))) //取最大
            println(elem + ":" + top_movie(0))
        }
        // TODO 关闭连接
        sc.stop();
    }

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-06 23:15:00  更:2022-04-06 23:18:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:40:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码