系列文章目录
- 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
- 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
- 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
- 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
- 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
- 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
- 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
- ……
项目资源下载
- 电影推荐系统网站项目源码Github地址(可Fork可Clone)
- 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
- 电影推荐系统网站项目源码压缩包下载(直接使用)
- 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
- 电影推荐系统网站项目源数据(可直接使用)
- 电影推荐系统网站项目个人原创论文
- 电影推荐系统网站项目前端代码
- 电影推荐系统网站项目前端css代码
前言
??本节博客的内容十分重要,是对之前理论知识的实践运用,所以涉及到代码的编写,还是那句话,读者一定要注意命名、路径等问题,要把我代码中的相关内容替换为您自己的相关内容。当然,在一些晦涩难懂的理论部分我仍会在此篇博文进行讲解。此博客主要是离线推荐服务的建设,其中包括:离线推荐服务、离线统计服务、基于隐语义模型的协同过滤推荐。并且通过离线推荐我们也可以计算出相关数据,为后面的实时推荐打下基础。另外,要明白为什么离线推荐的算法不能用在实时推荐(因为离线推荐算法速度较慢,而实时推荐对时间有要求),还要理解离线推荐算法的精髓。下面就开始今天的学习吧!
一、离线推荐服务
??离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率 ??离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务响应提供数据支撑 ??离线推荐服务主要分为统计性算法、基于ALS的协同过滤推荐算法以及基于ElasticSearch的内容推荐算法 ??在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入Spark、Scala和Mongodb的相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
??在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala 单例对象com.IronmanJay.statistics.StatisticsRecommender ??同样,应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。代码如下:
package com.IronmanJay.statistics
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
case class Recommendation(mid: Int, score: Double)
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])
object StatisticsRecommender {
val MONGODB_MOVIE_COLLECTION = "Movie"
val MONGODB_RATING_COLLECTION = "Rating"
val RATE_MORE_MOVIES = "RateMoreMovies"
val RATE_MORE_RECENTLY_MOVIES = "RateMoreRecentlyMovies"
val AVERAGE_MOVIES = "AverageMovies"
val GENRES_TOP_MOVIES = "GenresTopMovies"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://linux:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommeder")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Rating]
.toDF()
val movieDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.toDF()
ratingDF.createOrReplaceTempView("ratings")
spark.stop()
}
二、离线统计服务
2.1 历史热门电影统计
??根据所有历史评分数据,计算历史评分次数最多的电影 ??实现思路:通过Spark SQL读取评分数据集,统计所有评分中评分数最多的电影,然后按照从大到小排序,将最终结果写入MongoDB的RateMoreMovies数据集中
val rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings group by mid")
storeDFInMongoDB(rateMoreMoviesDF, RATE_MORE_MOVIES)
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
2.2 最近热门电影统计
??根据评分,按月为单位计算最近时间的月份里面评分数最多的电影集合 ??实现思路:通过Spark SQL读取评分数据集,通过UDF函数将评分数据时间修改为月,然后统计每月电影的评分数。统计完成之后将数据写入到MongoDB的RateMoreRecentlyMovies数据集中
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth from ratings")
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyMoviesDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")
storeDFInMongoDB(rateMoreRecentlyMoviesDF, RATE_MORE_RECENTLY_MOVIES)
2.3 电影平均得分统计
??根据历史数据中所有用户对电影的评分,周期性的计算每个电影的平均得分 ??实现思路:通过Spark SQL读取保存在MongDB中的Rating数据集,通过执行以下SQL语句实现对于电影的平均分统计
val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)
??统计完成之后将生成的新的DataFrame写入到MongoDB的AverageMoviesScore集合中
2.4 类别优质电影统计
??根据提供的所有电影类别, 分别计算每种类型的电影集合中评分最高的 10 个电影 ??实现思路:在计算完整个电影的平均得分之后,将影片集合与电影类型做笛卡尔积,然后过滤掉电影类型不符合的条目,将DataFrame 输出到 MongoDB 的 GenresTopMovies 集合中
val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery"
, "Romance", "Science", "Tv", "Thriller", "War", "Western")
val movieWithScore = movieDF.join(averageMoviesDF, "mid")
val genresRDD = spark.sparkContext.makeRDD(genres)
val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
.filter {
case (genre, movieRow) => movieRow.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase)
}
.map {
case (genre, movieRow) => (genre, (movieRow.getAs[Int]("mid"), movieRow.getAs[Double]("avg")))
}
.groupByKey()
.map {
case (genre, items) => GenresRecommendation(genre, items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))
}
.toDF()
storeDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)
三、基于隐语义模型的协同过滤推荐
??项目采用ALS作为协同过滤算法,分别根据MongoDB中的用户评分表和电影数据集计算用户电影推荐矩阵以及电影相似度矩阵
3.1 用户电影推荐矩阵
??通过ALS训练出来的Model来计算所有当前用户电影的推荐矩阵,主要思路如下: ??①:UserId和MovieId做笛卡尔积,产生(uid,mid)的元组 ??②:通过模型预测(uid,mid)的元组 ??③:将预测结果通过预测分值进行排序 ??③:返回分值最大的K个电影,作为当前用户的推荐列表 ??最后生成的数据结构如下,并将数据保存到MongoDB的UserRecs表中 ??新建recommender的子项目OfflineRecommender,引入Spark、Scala、Mongodb和Jblas的依赖如下:
<dependencies>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
??同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。在src/main/scala/com.IronmanJay.offline/OfflineRecommender.scala中的核心代码如下:
package com.IronmanJay.offline
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
case class Recommendation(mid: Int, score: Double)
case class UserRecs(uid: Int, recs: Seq[Recommendation])
case class MovieRecs(mid: Int, recs: Seq[Recommendation])
object OfflineRecommender {
val MONGODB_RATING_COLLECTION = "Rating"
val USER_RECS = "UserRecs"
val MOVIE_RECS = "MovieRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://linux:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating]
.rdd
.map(rating => (rating.uid, rating.mid, rating.score))
.cache()
val userRDD = ratingRDD.map(_._1).distinct()
val movieRDD = ratingRDD.map(_._2).distinct()
val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
val (rank, iterations, lambda) = (200, 5, 0.1)
val model = ALS.train(trainData, rank, iterations, lambda)
val userMovies = userRDD.cartesian(movieRDD)
val preRatings = model.predict(userMovies)
val userRecs = preRatings
.filter(_.rating > 0)
.map(rating => (rating.user, (rating.MOVIE, rating.rating)))
.groupByKey()
.map {
case (uid, recs) => UserRecs(uid, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
}
.toDF()
userRecs.write
.option("uri", mongoConfig.uri)
.option("collection", USER_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
3.2 电影相似度矩阵
??通过ALS计算电影相似度矩阵,该矩阵用于查询当前电影的相似电影并为实时推荐系统服务 ??离线计算的ALS算法最终会为用户、电影分别生成最终的特征矩阵,分别是表示用户特征矩阵的
U
(
m
×
k
)
U(m×k)
U(m×k)矩阵,每个用户由
k
k
k个特征描述;表示物品特征矩阵的
V
(
n
×
k
)
V(n×k)
V(n×k)矩阵,每个物品也由
k
k
k个特征描述 ??
V
(
n
×
k
)
V(n×k)
V(n×k)表示物品特征矩阵,每一行是一个
k
k
k维向量,虽然并不知道每一个维度的特征意义是什么,但是
k
k
k个维度的数学向量表示了该行对应电影的特征 ??所以,每个电影用
V
(
n
×
k
)
V(n×k)
V(n×k)每一行的
<
t
1
,
t
2
,
t
3
,
…
…
t
k
>
<t_1,t_2,t_3,……t_k>
<t1?,t2?,t3?,……tk?>向量表示其特征,于是任意两个电影p:特征向量为
<
V
p
=
t
p
1
,
t
p
2
,
t
p
3
,
…
…
t
p
k
>
<V_p=t_{p1},t_{p2},t_{p3},……t_{pk}>
<Vp?=tp1?,tp2?,tp3?,……tpk?>,电影q:特征向量为
<
V
q
=
t
q
1
,
t
q
2
,
t
q
3
,
…
…
t
q
k
>
<V_q=t_{q1},t_{q2},t_{q3},……t_{qk}>
<Vq?=tq1?,tq2?,tq3?,……tqk?>,并且之间的相似度
S
i
m
(
p
,
q
)
Sim(p,q)
Sim(p,q)可以使用
V
p
V_p
Vp?和
V
q
V_q
Vq?的余弦值来表示:
S
i
m
(
p
,
q
)
=
∑
i
=
0
k
(
t
p
i
×
t
q
i
)
∑
i
=
0
k
t
p
i
2
×
∑
i
=
0
k
t
q
i
2
Sim(p,q)=\frac{\sum_{i=0}^{k}(t_{pi}\times t_{qi})}{\sqrt{\sum_{i=0}^{k}t_{pi}^2}\times \sqrt{\sum_{i=0}^{k}t_{qi}^2}}
Sim(p,q)=∑i=0k?tpi2?
?×∑i=0k?tqi2?
?∑i=0k?(tpi?×tqi?)? ??数据集中任意两个电影间的相似度都可以由公式计算得到,电影与电影之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的MOVIERecs表中 ??核心代码如下:
val movieFeatures = model.MOVIEFeatures.map {
case (mid, features) => (mid, new DoubleMatrix(features))
}
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter {
case (a, b) => a._1 != b._1
}
.map {
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
(a._1, (b._1, simScore))
}
}
.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (mid, items) => MovieRecs(mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
??其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())
}
3.3 模型评估和参数选取
??在上述模型训练的过程中,直接给定了隐语义模型的rank、iterations、lambda三个参数。对于模型来说这并不一定是最优的参数选取,所以需要对模型进行评估。通常的做法是计算均方根误差(
R
M
S
E
RMSE
RMSE),考察预测评分与实际评分之间的误差
R
M
S
E
=
1
N
∑
t
=
1
N
(
o
b
s
e
r
v
e
d
t
?
p
r
e
d
i
c
t
e
d
t
)
2
RMSE=\sqrt{\frac{1}{N}\sum_{t=1}^{N}(observed_t-predicted_t)^2}
RMSE=N1?t=1∑N?(observedt??predictedt?)2
? ??有了
R
M
S
E
RMSE
RMSE,就可以通过多次调整参数值,来选取
R
M
S
E
RMSE
RMSE最小的一组作为模型的优化选择 ??在scala/com.IronmanJay.offline/下新建单例对象ALSTrainer,代码主体架构如下:
package com.IronmanJay.offline
import breeze.numerics.sqrt
import com.IronmanJay.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object ALSTrainer {
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://linux:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
import spark.implicits._
val ratingRDD = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[MovieRating]
.rdd
.map(rating => Rating(rating.uid, rating.mid, rating.score))
.cache()
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testRDD = splits(1)
adjustALSParam(trainingRDD, testRDD)
spark.close()
}
}
??其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:
def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {
val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1))
yield {
val model = ALS.train(trainData, rank, 5, lambda)
val rmse = getRMSE(model, testData)
(rank, lambda, rmse)
}
println(result.minBy(_._3))
}
??计算
R
M
S
E
RMSE
RMSE的函数getRMSE代码实现如下:
def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
val userMOVIEs = data.map(item => (item.user, item.MOVIE))
val predictRating = model.predict(userMOVIEs)
val observed = data.map(item => ((item.user, item.MOVIE), item.rating))
val predict = predictRating.map(item => ((item.user, item.MOVIE), item.rating))
sqrt(
observed.join(predict).map {
case ((uid, mid), (actual, pre)) =>
val err = actual - pre
err * err
}.mean()
)
}
??运行代码,就可以得到目前数据的最优模型参数
总结
??此篇博客也宣告结束啦,这篇博文的代码相对于来说有点多,所以需要读者仔细阅读,要记住这篇博文写的代码不仅仅是作为离线推荐服务使用,在后面的实时推荐服务部分也有需求,所以这篇博客很重要。下篇博文将为大家带来实时推荐服务,那就下篇博客见啦!
|