这篇文章我们来讲讲推荐系统中必不可少的特征工程。
-
用户行为数据 用户行为在推荐系统中一般分为显性反馈行为(Explicit Feedback)和隐性反馈行为(Implicit Feedback)两种。
? 能够反映用户行为特点的隐性反馈是目前特征挖掘的重点。
-
用户关系数据 用户与用户之间可以通过“关注”“好友关系”等连接建立“强关系”,也可以通过“互相点赞”“同处一个社区”,甚至“同看一部电影”建立“弱关系”。 一般是通过 Multi-hot 编码的方式将其转换成特征向量,一些重要的属性标签类特征也可以先转换成 Embedding,比如业界最新的做法是将标签属性类数据与其描述主体一起构建成知识图谱(Knowledge Graph),在其上施以 Graph Embedding 或者 GNN(Graph Neural Network,图神经网络)生成各节点的 Embedding,再输入推荐模型。 -
属性、标签类数据
-
内容类数据 内容类数据往往是大段的描述型文字、图片,甚至视频。 在图片类、视频类或是带有图片的信息流推荐场景中,我们往往会利用计算机视觉模型进行目标检测,抽取图片特征,再把这些特征(要素)转换成标签类数据供推荐系统使用。而文字信息则更多是通过自然语言处理的方法提取关键词、主题、分类等信息,一旦这些特征被提取出来,就跟处理属性、标签类特征的方法一样,通过 Multi-hot 编码,Embedding 等方式输入推荐系统进行训练。 -
场景信息(上下文信息) 最常用的上下文信息是“时间”和通过 GPS、IP 地址获得的“地点”信息。根据推荐场景的不同,上下文信息的范围极广,除了我们上面提到的时间和地点,还包括“当前所处推荐页面"季节"、 “月份”、 “是否节假日”、 “天气”、 “空气质量” 、"社会大事件"等等。
使用PySpark特征处理
1. 类别型编码
代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/featureeng/FeatureEngineering.py
One-Hot编码
def oneHotEncoderExample(movieSamples):
samplesWithIdNumber = movieSamples.withColumn("movieIdNumber", F.col("movieId").cast(IntegerType()))
encoder = OneHotEncoder(inputCols=["movieIdNumber"], outputCols=['movieIdVector'], dropLast=True)
oneHotEncoderSamples = encoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
oneHotEncoderSamples.printSchema()
oneHotEncoderSamples.show(10)
Multi-Hot编码
def array2vec(genreIndexes, indexSize):
"""
转为稀疏向量形式
1. 排序
2. 按照类别数,都设置为1
3.
"""
genreIndexes.sort()
fill_list = [1.0 for _ in range(len(genreIndexes))]
return Vectors.sparse(indexSize, genreIndexes, fill_list)
def multiHotEncoderExample(movieSamples):
samplesWithGenre = movieSamples.select("movieId", "title", explode(
split(F.col("genres"), "\\|").cast(ArrayType(StringType()))).alias('genre'))
genreIndexer = StringIndexer(inputCol="genre", outputCol="genreIndex")
StringIndexerModel = genreIndexer.fit(samplesWithGenre)
genreIndexSamples = StringIndexerModel.transform(samplesWithGenre).withColumn("genreIndexInt", F.col("genreIndex").cast(IntegerType()))
indexSize = genreIndexSamples.agg(max(F.col("genreIndexInt"))).head()[0] + 1
processedSamples = genreIndexSamples.groupBy('movieId').agg(
F.collect_list('genreIndexInt').alias('genreIndexes')).withColumn("indexSize", F.lit(indexSize))
finalSample = processedSamples.withColumn("vector", udf(array2vec, VectorUDT()) (F.col("genreIndexes"),F.col("indexSize"))
finalSample.printSchema()
finalSample.show(10)
2. 数值型编码
代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/featureeng/FeatureEngineering.py
归一化和分桶
def ratingFeatures(ratingSamples):
ratingSamples.printSchema()
ratingSamples.show()
movieFeatures = ratingSamples.groupBy('movieId').agg(F.count(F.lit(1)).alias('ratingCount'),
F.avg("rating").alias("avgRating"),
F.variance('rating').alias('ratingVar')) \
.withColumn('avgRatingVec', udf(lambda x: Vectors.dense(x), VectorUDT())('avgRating'))
movieFeatures.show(10)
ratingCountDiscretizer = QuantileDiscretizer(numBuckets=100, inputCol="ratingCount", outputCol="ratingCountBucket")
ratingScaler = MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating")
pipelineStage = [ratingCountDiscretizer, ratingScaler]
featurePipeline = Pipeline(stages=pipelineStage)
movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
movieProcessedFeatures.show(10)
3. Embedding
Word2Vec
经典的Embedding方法是Word2Vec,包括CBOW 模型(图 3 左)和 Skip-gram 模型(图 3 右)。
Item2Vec
之后又出现了Word2Vec的扩展——Item2Vec 。Item2Vec 模型的技术细节几乎和 Word2vec 完全一致,只要能够用序列数据的形式把我们要表达的对象表示出来,再把序列数据“喂”给 Word2vec 模型,我们就能够得到任意物品的 Embedding 了。
Item2vec 的提出对于推荐系统来说当然是至关重要的,因为它使得“万物皆 Embedding”成为了可能。对于推荐系统来说,Item2vec 可以利用物品的 Embedding 直接求得它们的相似性,或者作为重要的特征输入推荐模型进行训练,这些都有助于提升推荐系统的效果。
实战:观看电影序列转化为Embedding向量。
代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/embedding/Embedding.py
-
过滤出评分>=3.5的数据(考虑到低评分不是用户喜欢的电影) -
根据timestamp排序,形成观影序列。 -
采用Word2Vec()函数,构建Embedding向量。
class UdfFunction:
@staticmethod
def sortF(movie_list, timestamp_list):
"""
sort by time and return the corresponding movie sequence
eg:
input: movie_list:[1,2,3]
timestamp_list:[1112486027,1212546032,1012486033]
return [3,1,2]
"""
pairs = []
for m, t in zip(movie_list, timestamp_list):
pairs.append((m, t))
pairs = sorted(pairs, key=lambda x: x[1])
return [x[0] for x in pairs]
def processItemSequence(spark, rawSampleDataPath):
"""
形成观影序列
"""
ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
sortUdf = udf(UdfFunction.sortF, ArrayType(StringType()))
userSeq = ratingSamples \
.where(F.col("rating") >= 3.5) \
.groupBy("userId") \
.agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds')) \
.withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
return userSeq.select('movieIdStr').rdd.map(lambda x: x[0].split(' '))
def trainItem2vec(spark, samples, embLength, embOutputPath):
"""
训练Item2Vec
"""
word2vec = Word2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
model = word2vec.fit(samples)
synonyms = model.findSynonyms("158", 20)
for synonym, cosineSimilarity in synonyms:
print(synonym, cosineSimilarity)
embOutputDir = '/'.join(embOutputPath.split('/')[:-1])
if not os.path.exists(embOutputDir):
os.makedirs(embOutputDir)
with open(embOutputPath, 'w') as f:
for movie_id in model.getVectors():
vectors = " ".join([str(emb) for emb in model.getVectors()[movie_id]])
f.write(movie_id + ":" + vectors + "\n")
embeddingLSH(spark, model.getVectors())
return model
if __name__ == "__main__":
conf = SparkConf().setAppName('ctrModel').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
file_path = 'file:///Users/XXX/SparrowRecSys/src/main/resources'
rawSampleDataPath = file_path + "/webroot/sampledata/ratings.csv"
embLength = 10
samples = processItemSequence(spark, rawSampleDataPath)
model = trainItem2vec(spark, samples, embLength,
embOutputPath=file_path[7:] + "/webroot/modeldata2/item2vecEmb.csv")
在深度学习推荐系统中,我们经常采用 Embedding 召回这一准确又便捷的方法。但是,在面对百万甚至更高量级的候选集时,线性地逐一计算 Embedding 间的相似度,往往会造成极大的服务延迟。这个时候,我们要解决的问题就是,如何快速找到与一个 Embedding 最相似的 Embedding?这直接决定了召回层的执行速度,进而会影响推荐服务器的响应延迟。
业界解决近似 Embedding 搜索的主要方法是局部敏感哈希。
局部敏感哈希能在常数时间得到最近邻的结果吗?
答案是可以的,如果我们能够精确地控制每个桶内的点的规模是 C,假设每个 Embedding 的维度是 N,那么找到最近邻点的时间开销将永远在 O(C?N) 量级。采用多桶策略之后,假设分桶函数数量是 K,那么时间开销也在 O(K?C?N) 量级,这仍然是一个常数。
def embeddingLSH(spark, movieEmbMap):
movieEmbSeq = []
for key, embedding_list in movieEmbMap.items():
embedding_list = [np.float64(embedding) for embedding in embedding_list]
movieEmbSeq.append((key, Vectors.dense(embedding_list)))
movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId", bucketLength=0.1,
numHashTables=3)
bucketModel = bucketProjectionLSH.fit(movieEmbDF)
embBucketResult = bucketModel.transform(movieEmbDF)
print("movieId, emb, bucketId schema:")
embBucketResult.printSchema()
print("movieId, emb, bucketId data result:")
embBucketResult.show(10, truncate=False)
print("Approximately searching for 5 nearest neighbors of the sample embedding:")
sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)
Graph Embedding
用户和物品之间的相互行为生成了行为关系图。借助这样的关系图,我们自然能够利用 Embedding 技术发掘出物品和物品之间、用户和用户之间,以及用户和物品之间的关系,从而应用于推荐系统的进一步推荐。
def generate_pair(x):
pairSeq = []
previousItem = ''
for item in x:
if not previousItem:
previousItem = item
else:
pairSeq.append((previousItem, item))
previousItem = item
return pairSeq
def generateTransitionMatrix(samples):
"""
转移概率矩阵
"""
pairSamples = samples.flatMap(lambda x: generate_pair(x))
pairCountMap = pairSamples.countByValue()
pairTotalCount = 0
transitionCountMatrix = defaultdict(dict)
itemCountMap = defaultdict(int)
for key, cnt in pairCountMap.items():
key1, key2 = key
transitionCountMatrix[key1][key2] = cnt
itemCountMap[key1] += cnt
pairTotalCount += cnt
transitionMatrix = defaultdict(dict)
itemDistribution = defaultdict(dict)
for key1, transitionMap in transitionCountMatrix.items():
for key2, cnt in transitionMap.items():
transitionMatrix[key1][key2] = transitionCountMatrix[key1][key2] / itemCountMap[key1]
for itemid, cnt in itemCountMap.items():
itemDistribution[itemid] = cnt / pairTotalCount
return transitionMatrix, itemDistribution
def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
"""
单次随机游走函数
"""
sample = []
randomDouble = random.random()
firstItem = ""
accumulateProb = 0.0
for item, prob in itemDistribution.items():
accumulateProb += prob
if accumulateProb >= randomDouble:
firstItem = item
break
sample.append(firstItem)
curElement = firstItem
i = 1
while i < sampleLength:
if (curElement not in itemDistribution) or (curElement not in transitionMatrix):
break
probDistribution = transitionMatrix[curElement]
randomDouble = random.random()
accumulateProb = 0.0
for item, prob in probDistribution.items():
accumulateProb += prob
if accumulateProb >= randomDouble:
curElement = item
break
sample.append(curElement)
i += 1
return sample
def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
"""
随机游走采样函数
"""
samples = []
for i in range(sampleCount):
samples.append(oneRandomWalk(transitionMatrix, itemDistribution, sampleLength))
return samples
def graphEmb(samples, spark, embLength, embOutputFilename):
transitionMatrix, itemDistribution = generateTransitionMatrix(samples)
sampleCount = 20000
sampleLength = 10
newSamples = randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength)
rddSamples = spark.sparkContext.parallelize(newSamples)
trainItem2vec(spark, rddSamples, embLength, embOutputFilename)
上述代码最终结果是形成电影Embedding,而用户Embedding则可以采用用户观看的所有电影的Embedding之和,代码如下:
def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
"""
用户Embedding向量
"""
ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
Vectors_list = []
for key, value in model.getVectors().items():
Vectors_list.append((key, list(value)))
fields = [
StructField('movieId', StringType(), False),
StructField('emb', ArrayType(FloatType()), False)
]
schema = StructType(fields)
Vectors_df = spark.createDataFrame(Vectors_list, schema=schema)
ratingSamples = ratingSamples.join(Vectors_df, on='movieId', how='inner')
result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], x[1])) \
.reduceByKey(lambda a, b: [a[i] + b[i] for i in range(len(a))]).collect()
with open(embOutputPath, 'w') as f:
for row in result:
vectors = " ".join([str(emb) for emb in row[1]])
f.write(row[0] + ":" + vectors + "\n")
|