数据获取和处理以及环境的配置详见上一篇文章: https://blog.csdn.net/qq_42754919/article/details/119679441
基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广。
1. 基于物品的协同过滤(Item-CF)
基于物品的协同过滤(Item-CF)主要思想:对于同一个用户购买的不同商品内部应该存在某种相关性。因此可以用现存的行为数据判断两个物品的相似程度,分析商品受众的相似程度,进而得出商品间的相似度。当一个新的用户购买某个商品后,可以用商品相似度推荐给当前用户其他某种类型的商品,未必和之前购买的商品统一类型。我们把这种方法定义为物品的“同现相似度”,公式如下:我们把这种方法定义为物品的“同现相似度”,公式如下: 其中,Ni 是购买商品 i的数量,Nj 是购买商品 j 的数量,分子表示同一个用户同时购买两个商品的数量。
2. 代码
主要思路:
- 首先统计每个商品出现的频次,然后按用户ID进行内连接
- 使用sparkSQL获取每个商品出现的次数和用户的数量
- 带入公式计算出同现相似度,并过滤出相同两个商品
spark.sql()代码解析:从joined表中获取数据,按照 productId1,productId2进行分类。因此就可能存在多个userId,统计userId个数就是购买过两个商品的次数,并且count1和count2都是相同的,只需要获取第一个即可。最终count(userId) as cocount作为公式的分子,count1和count2作为公式的分母。
package com.root.itemcf
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
case class ProductRating(UserId:Int, ProductId:Int, Score:Double, Time:Int)
case class MongoConfig(uri:String, db:String)
case class Recommenderdation(productId:Int, score:Double)
case class ProductRecs(productId:Int,recs:Seq[Recommenderdation])
object ItemCFRecommender {
val MongoDB_Rating="Rating"
val ItemCF_PRODUCT_RECS = "ItemCFProductRecs"
val USER_MAX_RECOMMENDATION = 10
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ItemCFRecommender")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
val ratingDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MongoDB_Rating)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(x => (x.ProductId, x.UserId))
.toDF("userId", "productId")
.cache()
val productRatingCountDF = ratingDF.groupBy("productId").count()
val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")
val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
.toDF("userId", "productId1", "count1", "productId2", "count2")
joinedDF.show()
joinedDF.createOrReplaceTempView("joined")
val cooccurrenceDF = spark.sql(
"""
|select productId1,productId2,
|count(userId) as cocount,
|first(count1) as count1,
|first(count2) as count2
|from joined
|group by productId1,productId2
|""".stripMargin).cache()
cooccurrenceDF.show()
val simDF = cooccurrenceDF.map {
row =>
val coocSim = cooccurrenceSim(row.getAs[Long]("cocount"), row.getAs[Long]("count1"), row.getAs[Long]("count1"))
(row.getInt(0), (row.getInt(1), coocSim))
}
.rdd
.groupByKey()
.map { case (productId, recs) =>
ProductRecs(productId, recs.toList
.filter(x => x._1 != productId)
.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION)
.map(x => Recommenderdation(x._1, x._2)))
}
.toDF()
simDF.write
.option("uri",mongoConfig.uri)
.option("collection",ItemCF_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
def cooccurrenceSim(coCount: Long, count1: Long, count2: Long): Double = {
coCount / math.sqrt(count1 * count2)
}
}
3. 结果展示
|