Spark处理特征和样本优势
- 既然选择用Tensorflow框架进行模型训练,为什么不用Tensorflow处理特征和样本呢?
- 答:Spark的专长就是数据处理,而Tensorflow不擅长分布式的并行数据处理,在并行数据处理能力上远不及成百上千节点的Spark的。在面对海量数据的时候,利用Spark进行数据清洗、数据预处理、特征提取的话,可以为Tensorflow减轻负担。
物品和用户特征
MovieLens数据集中,可供我们提取特征的数据表有两个,分别是movies表和ratings表,数据格式如下图:
- 根据两张表我们可以整理以下特征:
- 特征部分代码如下:
val movieRatingFeatures = samplesWithMovies3.groupBy(col("movieId"))
.agg(count(lit(1)).as("movieRatingCount"),
avg(col("rating")).as("movieAvgRating"),
stddev(col("rating")).as("movieRatingStddev"))
-
计算统计型特征的典型方法,就是利用Spark中的groupBy操作,将原始评分数据按照movieId分组,然后用agg聚合操作来计算一些统计型特征。我们就分别使用了count内置聚合函数来统计电影评价次数(movieRatingCount),用avg函数来统计评分均值(movieAvgRating),以及使用stddev函数来计算评价分数的标准差(movieRatingStddev)。 -
总结:一般来说,我们不会人为预设哪个特征有用或无用,而是让模型自己去判断,如果一个特征的加入没有提升模型效果,我们再去除这个特征。就像我刚才虽然提取了不少特征,但并不是说每个模型都会使用全部的特征,而是根据模型结构、模型效果有针对性地部分使用它们。
最终的训练样本
- 对于一个推荐模型来说,它的根本任务是预测一个用户U对一个物品I在场景C下的喜好分数。所以在训练时,我们要为模型生成一组包含U、I、C的特征,以及最终真实得分的样本。在SparrowRecsys中,这样的样本就是基于评分数据ratings,联合用户、物品特征得来的。
- 用户特征和物品特征都需要我们提前生成好,然后让它们与ratings数据进行join后,生成最终的训练样本。代码如下:
val ratingSamples = spark.read.format("csv").option("header", "true").load(ratingsResourcesPath.getPath)
val ratingSamplesWithLabel = addSampleLabel(ratingSamples)
val samplesWithMovieFeatures = addMovieFeatures(movieSamples, ratingSamplesWithLabel)
val samplesWithUserFeatures = addUserFeatures(samplesWithMovieFeatures)
- 对于MovieLens数据集来说,用户对电影的评分是最直接的标签数据,因为它就是我们想要预测的用户对电影的评价,所以ratings表中的0-5的评分数据自然可以作为样本的标签。
- 但对于很多应用来说,我们基本上不可能拿到它们的评分数据,更多的是点击、观看、购买这些隐性的反馈数据,所以业界更多使用 CTR 预估这类解决二分类问题的模型去解决推荐问题。
- MovieLens数据集处理:把评分大于等于3.5分的样本标签标识为1,意为“喜欢”,评分小于3.5分的样本标签标识为0,意为“不喜欢”。这样一来,我们可以完全把推荐问题转换为CTR预估问题。
- 相关代码如下:
def addSampleLabel(ratingSamples:DataFrame): DataFrame ={
ratingSamples.show(10, truncate = false)
ratingSamples.printSchema()
val sampleCount = ratingSamples.count()
ratingSamples.groupBy(col("rating")).count().orderBy(col("rating"))
.withColumn("percentage", col("count")/sampleCount).show(100,truncate = false)
ratingSamples.withColumn("label", when(col("rating") >= 3.5, 1).otherwise(0))
}
避免引入未来信息
- 什么叫未来信息?
- 答:如果在t时刻进行模型预测,那么t+1时刻的信息就是未来信息。这个问题在模型线上服务的时候是不存在的,因为未来的事情还未发生,我们不可能知道。但在离线训练的时候,我们就容易犯这样的错误。比如说,我们利用t时刻的样本进行训练,但是使用了全量的样本生成特征,这些特征就包含了t+1时刻的未来信息,这就是一个典型的引入未来信息的错误例子。
- 在Spark中,处理这些跟历史行为相关的特征呢需要用到window函数了。请参考部分代码:
withColumn("userAvgRating", avg(col("rating"))
.over(Window.partitionBy("userId")
.orderBy(col("timestamp")).rowsBetween(-100, -1)))
- “over(Window.partitionBy(“userId”).orderBy(col(“timestamp”)))”操作就是做rating平均这个操作的时候,不对userId下面的所有评分取平均值,而是要创建一个滑动窗口,先把这个用户下面的评分按照时间排序,再让这个滑动窗口滑动,滑动窗口的位置始终在当前rating前一个rating的位置。
特征数据存入线上
- 我们还是将其存入特征数据库Redis,线上推断的时候,再把所需的用户特征和物品特征分别取出,拼接成模型所需的特征向量就可以了。
- 相关部分代码如下:
val userKey = userFeaturePrefix + sample.getAs[String]("userId")
val valueMap = mutable.Map[String, String]()
valueMap("userRatedMovie1") = sample.getAs[String]("userRatedMovie1")
valueMap("userRatedMovie2") = sample.getAs[String]("userRatedMovie2")
...
valueMap("userAvgRating") = sample.getAs[String]("userAvgRating")
valueMap("userRatingStddev") = sample.getAs[String]("userRatingStddev")
redisClient.hset(userKey, JavaConversions.mapAsJavaMap(valueMap))
- 代码中使用了Redis一个新的操作hset,它的作用是把一个Map存入Redis。优势:对于这里的用户特征来说,Map中存储的就是特征的键值对,又因为这个Map本身是userId的值,所以,每个userId都拥有一组用户特征。这样一来,我们就可以在推荐服务器内部,通过userId来取出所有对应的用户特征了。
|