Data source: https://grouplens.org/datasets/movielens/
The files used are users.dat, movies.dat, and ratings.dat.
[Sample rows of the three tables omitted here.]
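For reference, each file is "::"-delimited with the following record layout (this is the MovieLens 1M format that the split("::") indices in the code below assume):

users.dat:   UserID::Gender::Age::Occupation::Zip-code
ratings.dat: UserID::MovieID::Rating::Timestamp
movies.dat:  MovieID::Title::Genres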
1. The 10 movies rated most often, with their rating counts (title, rating count).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_01").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (movieId, 1) for every rating row
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(1), 1))
    // (movieId, title)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // join on movieId, count ratings per title, keep the 10 largest counts
    val top10_movie = movies.join(ratings)
      .map(x => (x._2._1, x._2._2))
      .reduceByKey(_ + _)
      .takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
    for (elem <- top10_movie) println(elem)
    sc.stop()
  }
}
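Since the join above pairs the movie table with one row per rating, a slightly leaner variant is to aggregate the counts per movieId first, so the join only sees one row per movie. A sketch (counts and top10 are new local names; ratings and movies are the RDDs defined above):

// Aggregate first: one row per movie enters the join instead of one per rating.
val counts = ratings.reduceByKey(_ + _)            // (movieId, count)
val top10 = counts.join(movies)                    // (movieId, (count, title))
  .map { case (_, (count, title)) => (title, count) }
  .takeOrdered(10)(Ordering[Int].reverse.on(_._2))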
2. For men and for women separately, the 10 highest-rated movies (gender, title, average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_02").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (userId, gender)
    val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // (userId, (movieId, score))
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), (x(1), x(2).toDouble)))
    // (movieId, title)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // in users.dat "M" marks male users and "F" female users
    val maleUsers = users.filter(x => x._2 == "M")
    val femaleUsers = users.filter(x => x._2 == "F")
    // average score per movieId for male raters
    val maleMovie = maleUsers.join(ratings)
      .map(x => (x._2._2._1, (x._2._2._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    // average score per movieId for female raters
    val femaleMovie = femaleUsers.join(ratings)
      .map(x => (x._2._2._1, (x._2._2._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    // attach titles: (title, avgScore)
    val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))
    val femaleMovieName = femaleMovie.join(movies).map(x => (x._2._2, x._2._1))
    val top10_male = maleMovieName.takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))
    val top10_female = femaleMovieName.takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))
    println("Male:")
    for (elem <- top10_male) println(elem)
    println("Female:")
    for (elem <- top10_female) println(elem)
    sc.stop()
  }
}
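One caveat: ranking by raw average lets a movie with a single 5-star rating top the list. A minimal sketch of a guard for the male side (the cutoff of 50 ratings is a hypothetical choice, not part of the assignment):

// Drop movies with fewer than 50 male ratings before averaging
// (the threshold 50 is an assumption for illustration).
val maleMovieFiltered = maleUsers.join(ratings)
  .map(x => (x._2._2._1, (x._2._2._2, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .filter(x => x._2._2 >= 50)
  .map(x => (x._1, x._2._1 / x._2._2))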
3. For men and for women separately, the 10 most-watched (most often rated) movies (gender, title).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_03").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (userId, gender)
    val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // (userId, movieId)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // (movieId, title)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // in users.dat "M" marks male users and "F" female users
    val maleUsers = users.filter(x => x._2 == "M")
    val femaleUsers = users.filter(x => x._2 == "F")
    // rating count per movieId for each gender
    val maleMovie = maleUsers.join(ratings).map(x => (x._2._2, 1)).reduceByKey(_ + _)
    val femaleMovie = femaleUsers.join(ratings).map(x => (x._2._2, 1)).reduceByKey(_ + _)
    // attach titles: (title, count)
    val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))
    val femaleMovieName = femaleMovie.join(movies).map(x => (x._2._2, x._2._1))
    val top10_male = maleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
    val top10_female = femaleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
    println("Male:")
    for (elem <- top10_male) println(elem._1)
    println("Female:")
    for (elem <- top10_female) println(elem._1)
    sc.stop()
  }
}
4. The 10 movies that men in the "18-24" age bracket like best (i.e. rate most often). In users.dat this bracket is stored as the age code 18.
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_04 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_04").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // keep only users whose age code is 18, i.e. the 18-24 bracket: (userId, gender)
    val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
      .map(x => x.split("::"))
      .filter(x => x(2) == "18")
      .map(x => (x(0), x(1)))
    // (userId, movieId)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // (movieId, title)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // men are marked "M" in users.dat
    val maleUsers = users.filter(x => x._2 == "M")
    // rating count per movieId, then attach titles: (title, count)
    val maleMovie = maleUsers.join(ratings).map(x => (x._2._2, 1)).reduceByKey(_ + _)
    val maleMovieName = maleMovie.join(movies).map(x => (x._2._2, x._2._1))
    val top10_male = maleMovieName.takeOrdered(10)(Ordering[Int].reverse.on(x => x._2))
    for (elem <- top10_male) println(elem._1)
    sc.stop()
  }
}
5. For the movie with movieid = 2116, the average rating within each age group (there are only 7 age codes, so just group by those) (age group, average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_05 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_05").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (userId, ageCode)
    val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(2)))
    // ratings for movie 2116 only: (userId, score)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .filter(x => x(1) == "2116")
      .map(x => (x(0), x(2)))
    // sum and count per age code, then divide: (ageCode, avgScore)
    val score = users.join(ratings).map(x => (x._2._1.toInt, (x._2._2.toInt, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, 1.0 * x._2._1 / x._2._2))
    for (elem <- score.collect()) println(elem)
    sc.stop()
  }
}
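The keys printed above are the raw age codes. The MovieLens 1M README defines the seven codes; a small sketch that decodes the output into readable bracket labels:

// Age codes as documented in the MovieLens 1M README.
val ageLabel = Map(
  1 -> "Under 18", 18 -> "18-24", 25 -> "25-34", 35 -> "35-44",
  45 -> "45-49", 50 -> "50-55", 56 -> "56+")
for (elem <- score.collect())
  println((ageLabel.getOrElse(elem._1, elem._1.toString), elem._2))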
6. Take the woman who watches the most movies (has the most ratings) and find the overall average rating of the 10 movies she scored highest (viewer, title, average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_06 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_06").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (userId, gender)
    val users = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\users.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // raw rating rows: Array(userId, movieId, score, timestamp)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
    // (movieId, title)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // the female user with the most ratings
    val top = ratings.map(x => (x(0), 1))
      .join(users.filter(x => x._2 == "F"))   // keep female raters only
      .map(x => (x._1, 1))
      .reduceByKey(_ + _)
      .takeOrdered(1)(Ordering[Int].reverse.on(x => x._2))
    val id = top(0)._1
    // her 10 highest-scored movies: (movieId, score)
    val top10_movesId = ratings.filter(x => x(0) == id)
      .map(x => (x(1), x(2)))
      .takeOrdered(10)(Ordering[Int].reverse.on(x => x._2.toInt))
    for (elem <- top10_movesId) {
      val moveId = elem._1
      // one lookup job per movie; see the broadcast sketch below
      val moveName = movies.filter(x => x._1 == moveId).collect()(0)._2
      // average rating of this movie over all users
      val (sum, n) = ratings.filter(x => x(1) == moveId)
        .map(x => (x(2).toInt, 1))
        .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
      println((id, moveName, 1.0 * sum / n))
    }
    sc.stop()
  }
}
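Each iteration of the loop above launches a separate Spark job just to look up one title (movies.filter(...).collect()). A sketch of the usual fix: collect the small movie table once and broadcast it as a read-only lookup map (titleById is a new local name):

// Ship (movieId -> title) to the executors once, then look titles up locally.
val titleById = sc.broadcast(movies.collect().toMap)
// inside the loop, the lookup becomes a plain map access:
// val moveName = titleById.value(moveId)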
7. Find the year with the most good movies (rating >= 4.0, taken here as average rating >= 4.0), then that year's 10 best movies (best = highest average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_07 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_07").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (movieId, score)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(1), x(2).toDouble))
    // (movieId, title); every title ends in "(YYYY)"
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .map(x => (x(0), x(1)))
    // average rating per title
    val avgByTitle = movies.join(ratings)
      .map(x => (x._2._1, (x._2._2, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    // a movie counts as "good" when its average rating is >= 4.0;
    // count good movies per release year and keep the year with the most
    val topYear = avgByTitle.filter(x => x._2 >= 4.0)
      .map(x => (x._1.substring(x._1.length - 5, x._1.length - 1), 1))
      .reduceByKey(_ + _)
      .takeOrdered(1)(Ordering[Int].reverse.on(x => x._2))(0)._1
    // the 10 highest-averaging movies released in that year
    val score = avgByTitle.filter(x => x._1.endsWith("(" + topYear + ")"))
      .takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))
    for (elem <- score) println(elem)
    sc.stop()
  }
}
8. Among movies released in 1997, the 10 highest-rated Comedy movies.
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_08 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_08").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (movieId, score)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(1), x(2).toDouble))
    // Comedy movies released in 1997: (movieId, title).
    // endsWith("(1997)") avoids matching a title that merely contains "1997".
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
      .filter(x => x(2).split('|').contains("Comedy") && x(1).endsWith("(1997)"))
      .map(x => (x(0), x(1)))
    // average rating per title, top 10
    val score = ratings.join(movies)
      .map(x => (x._2._2, (x._2._1, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, x._2._1 / x._2._2))
      .takeOrdered(10)(Ordering[Double].reverse.on(x => x._2))
    for (elem <- score) println(elem)
    sc.stop()
  }
}
9. For every genre in the library, the 5 highest-rated movies (genre, title, average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_09 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_09").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (movieId, score)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(1), x(2).toDouble))
    // raw movie rows: Array(movieId, title, genres)
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
    // the distinct genre names (the genres field is '|'-separated)
    val moveType = movies.map(x => x(2)).flatMap(x => x.split('|')).distinct().collect()
    for (elem <- moveType) {
      // exact genre match; a substring test could match the wrong genre
      val top5_movie = movies.filter(x => x(2).split('|').contains(elem))
        .map(x => (x(0), x(1))).join(ratings)
        .map(x => (x._2._1, (x._2._2, 1)))
        .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(x => (x._1, x._2._1 / x._2._2))
        .takeOrdered(5)(Ordering[Double].reverse.on(x => x._2))
      println(elem + ":")
      for (movie <- top5_movie) println(movie)
    }
    sc.stop()
  }
}
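The loop runs one full join per genre (18 jobs on MovieLens 1M). A sketch of a single-pass alternative that explodes each movie into one record per genre and groups at the end (top5ByGenre is a new local name):

// One pass: ((genre, title), score) pairs, averaged, then top 5 per genre.
val top5ByGenre = movies.map(x => (x(0), (x(1), x(2))))
  .join(ratings)
  .flatMap { case (_, ((title, genres), score)) =>
    genres.split('|').map(g => ((g, title), (score, 1))) }
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .map { case ((g, title), (sum, n)) => (g, (title, sum / n)) }
  .groupByKey()
  .mapValues(_.toList.sortBy(x => -x._2).take(5))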
10. For each year, the genre with the highest average rating (year, genre, average rating).
package com.spark.homework.movie

import org.apache.spark.{SparkConf, SparkContext}

object code_10 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("code_10").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // (movieId, score)
    val ratings = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\ratings.dat")
      .map(x => x.split("::"))
      .map(x => (x(1), x(2).toDouble))
    // raw movie rows: Array(movieId, title, genres); titles end in "(YYYY)"
    val movies = sc.textFile("E:\\BigData\\homework\\Spark作业\\实验八\\movies.dat")
      .map(x => x.split("::"))
    // the distinct release years, read from the end of each title
    val years = movies.map(x => x(1)).map(x => x.substring(x.length - 5, x.length - 1)).distinct().collect()
    for (elem <- years) {
      // split "Comedy|Romance" into both genres, then average per genre
      val top_type = movies.filter(x => x(1).endsWith("(" + elem + ")"))
        .map(x => (x(0), x(2)))
        .join(ratings)
        .flatMap(x => x._2._1.split('|').map(g => (g, (x._2._2, 1))))
        .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
        .map(x => (x._1, x._2._1 / x._2._2))
        .takeOrdered(1)(Ordering[Double].reverse.on(x => x._2))
      println(elem + ":" + top_type(0))
    }
    sc.stop()
  }
}
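The substring arithmetic above assumes every title ends in "(YYYY)". A sketch of a more defensive year extractor using a regex (yearOf is a hypothetical helper, not part of the assignment code):

// Pull a trailing "(YYYY)" out of a movie title; None if the pattern is absent.
val YearRe = """\((\d{4})\)\s*$""".r
def yearOf(title: String): Option[String] =
  YearRe.findFirstMatchIn(title).map(_.group(1))

// usage: movies.flatMap(x => yearOf(x(1))).distinct() replaces the substring trick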