IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 人工智能 -> 推荐系统中常用的特征工程 -> 正文阅读

[人工智能]推荐系统中常用的特征工程

这篇文章我们来讲讲推荐系统中必不可少的特征工程。

  1. 用户行为数据

    用户行为在推荐系统中一般分为显性反馈行为(Explicit Feedback)和隐性反馈行为(Implicit Feedback)两种。

img

? 能够反映用户行为特点的隐性反馈是目前特征挖掘的重点。

  1. 用户关系数据

    用户与用户之间可以通过“关注”“好友关系”等连接建立“强关系”,也可以通过“互相点赞”“同处一个社区”,甚至“同看一部电影”建立“弱关系”。

    一般是通过 Multi-hot 编码的方式将其转换成特征向量,一些重要的属性标签类特征也可以先转换成 Embedding,比如业界最新的做法是将标签属性类数据与其描述主体一起构建成知识图谱(Knowledge Graph),在其上施以 Graph Embedding 或者 GNN(Graph Neural Network,图神经网络)生成各节点的 Embedding,再输入推荐模型。

  2. 属性、标签类数据

img

  1. 内容类数据

    内容类数据往往是大段的描述型文字、图片,甚至视频。

    在图片类、视频类或是带有图片的信息流推荐场景中,我们往往会利用计算机视觉模型进行目标检测,抽取图片特征,再把这些特征(要素)转换成标签类数据供推荐系统使用。而文字信息则更多是通过自然语言处理的方法提取关键词、主题、分类等信息,一旦这些特征被提取出来,就跟处理属性、标签类特征的方法一样,通过 Multi-hot 编码,Embedding 等方式输入推荐系统进行训练。

  2. 场景信息(上下文信息)

    最常用的上下文信息是“时间”和通过 GPS、IP 地址获得的“地点”信息。根据推荐场景的不同,上下文信息的范围极广,除了我们上面提到的时间和地点,还包括“当前所处推荐页面"季节"、 “月份”、 “是否节假日”、 “天气”、 “空气质量” 、"社会大事件"等等。

使用PySpark特征处理

1. 类别型编码

代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/featureeng/FeatureEngineering.py

One-Hot编码

def oneHotEncoderExample(movieSamples):
  	# 把电影ID转为Int类型
    samplesWithIdNumber = movieSamples.withColumn("movieIdNumber", F.col("movieId").cast(IntegerType()))
    # OneHot编码,输入字段:movieIdNumber,输出字段:movieIdVector
    # dropLast=True,表示删除最后一列字段
    encoder = OneHotEncoder(inputCols=["movieIdNumber"], outputCols=['movieIdVector'], dropLast=True)
    oneHotEncoderSamples = encoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
    # 输出Schema
    oneHotEncoderSamples.printSchema()
    oneHotEncoderSamples.show(10)

Multi-Hot编码

def array2vec(genreIndexes, indexSize):
  	"""
  	转为稀疏向量形式
  	1. 排序
  	2. 按照类别数,都设置为1
  	3. 
  	"""
  	# 转为向量形式
    # 1. 排序
    # 2. 创建一个稀疏向量
    genreIndexes.sort()
    fill_list = [1.0 for _ in range(len(genreIndexes))]
    return Vectors.sparse(indexSize, genreIndexes, fill_list)
def multiHotEncoderExample(movieSamples):
  	# 将genres特征,通过|划分开,将一行转化为多行,输出字段为genre,字符串类型。
    samplesWithGenre = movieSamples.select("movieId", "title", explode(
        split(F.col("genres"), "\\|").cast(ArrayType(StringType()))).alias('genre'))
    
    # 将字符串类型的值转化为Index形式,并输出为Int类型
    genreIndexer = StringIndexer(inputCol="genre", outputCol="genreIndex")
    StringIndexerModel = genreIndexer.fit(samplesWithGenre)
    genreIndexSamples = StringIndexerModel.transform(samplesWithGenre).withColumn("genreIndexInt", F.col("genreIndex").cast(IntegerType()))
    # 获取不同类别数
    indexSize = genreIndexSamples.agg(max(F.col("genreIndexInt"))).head()[0] + 1
    # 聚合成列表形式,并增加一列 不同类别数
    processedSamples = genreIndexSamples.groupBy('movieId').agg(
        F.collect_list('genreIndexInt').alias('genreIndexes')).withColumn("indexSize", F.lit(indexSize))
    # 增加稀疏向量纬度
    finalSample = processedSamples.withColumn("vector", udf(array2vec, VectorUDT())		 (F.col("genreIndexes"),F.col("indexSize"))
    finalSample.printSchema()
    finalSample.show(10)

2. 数值型编码

代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/featureeng/FeatureEngineering.py

归一化和分桶

def ratingFeatures(ratingSamples):
    ratingSamples.printSchema()
    ratingSamples.show()
    # 计算电影评分人数和电影平均得分
    movieFeatures = ratingSamples.groupBy('movieId').agg(F.count(F.lit(1)).alias('ratingCount'),
                                                         F.avg("rating").alias("avgRating"),
                                                         F.variance('rating').alias('ratingVar')) \
        .withColumn('avgRatingVec', udf(lambda x: Vectors.dense(x), VectorUDT())('avgRating'))
    movieFeatures.show(10)
    # 分桶,分桶数量100
    ratingCountDiscretizer = QuantileDiscretizer(numBuckets=100, inputCol="ratingCount", outputCol="ratingCountBucket")
    # 0-1标准化
    ratingScaler = MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating")
    pipelineStage = [ratingCountDiscretizer, ratingScaler]
    # pipeline
    featurePipeline = Pipeline(stages=pipelineStage)
    movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
    movieProcessedFeatures.show(10)

3. Embedding

Word2Vec

经典的Embedding方法是Word2Vec,包括CBOW 模型(图 3 左)和 Skip-gram 模型(图 3 右)。

img

Item2Vec

之后又出现了Word2Vec的扩展——Item2Vec 。Item2Vec 模型的技术细节几乎和 Word2vec 完全一致,只要能够用序列数据的形式把我们要表达的对象表示出来,再把序列数据“喂”给 Word2vec 模型,我们就能够得到任意物品的 Embedding 了。

Item2vec 的提出对于推荐系统来说当然是至关重要的,因为它使得“万物皆 Embedding”成为了可能。对于推荐系统来说,Item2vec 可以利用物品的 Embedding 直接求得它们的相似性,或者作为重要的特征输入推荐模型进行训练,这些都有助于提升推荐系统的效果。

实战:观看电影序列转化为Embedding向量。

代码见:RecPySpark/src/com/sparrowrecsys/offline/pyspark/embedding/Embedding.py

  1. 过滤出评分>=3.5的数据(考虑到低评分不是用户喜欢的电影)

  2. 根据timestamp排序,形成观影序列。

  3. 采用Word2Vec()函数,构建Embedding向量。

class UdfFunction:
    @staticmethod
    def sortF(movie_list, timestamp_list):
        """
        sort by time and return the corresponding movie sequence
        eg:
            input: movie_list:[1,2,3]
                   timestamp_list:[1112486027,1212546032,1012486033]
            return [3,1,2]
        """
        pairs = []
        for m, t in zip(movie_list, timestamp_list):
            pairs.append((m, t))
        # sort by time
        pairs = sorted(pairs, key=lambda x: x[1])
        return [x[0] for x in pairs]


def processItemSequence(spark, rawSampleDataPath):
  """
  形成观影序列
  """
    # rating data
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    # ratingSamples.show(5)
    # ratingSamples.printSchema()
    sortUdf = udf(UdfFunction.sortF, ArrayType(StringType()))
    # 把原始的rating数据处理成序列数据
    userSeq = ratingSamples \
        .where(F.col("rating") >= 3.5) \
        .groupBy("userId") \
        .agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds')) \
        .withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
    # userSeq.select("userId", "movieIdStr").show(10, truncate = False)
    # 把序列数据筛选出来,丢掉其他过程数据
    return userSeq.select('movieIdStr').rdd.map(lambda x: x[0].split(' '))
  
  
def trainItem2vec(spark, samples, embLength, embOutputPath):
  """
  训练Item2Vec
  """
  	# word2Vec设置向量维度10,窗口5,迭代次数10
    word2vec = Word2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
    model = word2vec.fit(samples)
    # 查看与电影ID为158最相近的前20部电影
    synonyms = model.findSynonyms("158", 20)
    for synonym, cosineSimilarity in synonyms:
        print(synonym, cosineSimilarity)
    # 如果不存在文件夹就创建
    embOutputDir = '/'.join(embOutputPath.split('/')[:-1])
    if not os.path.exists(embOutputDir):
        os.makedirs(embOutputDir)
    # 将电影Embedding后的向量写入文件中
    with open(embOutputPath, 'w') as f:
        for movie_id in model.getVectors():
            vectors = " ".join([str(emb) for emb in model.getVectors()[movie_id]])
            f.write(movie_id + ":" + vectors + "\n")
    # 局部敏感哈希
    embeddingLSH(spark, model.getVectors())
    return model

  
if __name__ == "__main__":
  	conf = SparkConf().setAppName('ctrModel').setMaster('local')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # Change to your own filepath
    file_path = 'file:///Users/XXX/SparrowRecSys/src/main/resources'
    rawSampleDataPath = file_path + "/webroot/sampledata/ratings.csv"
    embLength = 10
    # 形成用户观影序列
    samples = processItemSequence(spark, rawSampleDataPath)
    # 训练Item2Vec
    model = trainItem2vec(spark, samples, embLength, 
                          embOutputPath=file_path[7:] + "/webroot/modeldata2/item2vecEmb.csv")

在深度学习推荐系统中,我们经常采用 Embedding 召回这一准确又便捷的方法。但是,在面对百万甚至更高量级的候选集时,线性地逐一计算 Embedding 间的相似度,往往会造成极大的服务延迟。这个时候,我们要解决的问题就是,如何快速找到与一个 Embedding 最相似的 Embedding?这直接决定了召回层的执行速度,进而会影响推荐服务器的响应延迟。

业界解决近似 Embedding 搜索的主要方法是局部敏感哈希。

局部敏感哈希能在常数时间得到最近邻的结果吗?

答案是可以的,如果我们能够精确地控制每个桶内的点的规模是 C,假设每个 Embedding 的维度是 N,那么找到最近邻点的时间开销将永远在 O(C?N) 量级。采用多桶策略之后,假设分桶函数数量是 K,那么时间开销也在 O(K?C?N) 量级,这仍然是一个常数。

def embeddingLSH(spark, movieEmbMap):
    movieEmbSeq = []
    # 将电影embedding数据转换成dense Vector的形式,便于之后处理
    for key, embedding_list in movieEmbMap.items():
        embedding_list = [np.float64(embedding) for embedding in embedding_list]
        movieEmbSeq.append((key, Vectors.dense(embedding_list)))
    movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
    # 利用Spark MLlib创建LSH分桶模型。BucketLength:分桶宽度;numHashTables:多桶策略的分桶次数
    bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId", bucketLength=0.1,
                                                      numHashTables=3)
    bucketModel = bucketProjectionLSH.fit(movieEmbDF)
    embBucketResult = bucketModel.transform(movieEmbDF)
    # 打印分桶结果
    print("movieId, emb, bucketId schema:")
    embBucketResult.printSchema()
    print("movieId, emb, bucketId data result:")
    embBucketResult.show(10, truncate=False)
    # 尝试对一个示例Embedding查找最近邻
    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)

Graph Embedding

用户和物品之间的相互行为生成了行为关系图。借助这样的关系图,我们自然能够利用 Embedding 技术发掘出物品和物品之间、用户和用户之间,以及用户和物品之间的关系,从而应用于推荐系统的进一步推荐。

img

def generate_pair(x):
    # eg:
    # watch sequence:['858', '50', '593', '457']
    # return:[['858', '50'],['50', '593'],['593', '457']]
    pairSeq = []
    previousItem = ''
    for item in x:
        if not previousItem:
            previousItem = item
        else:
            pairSeq.append((previousItem, item))
            previousItem = item
    return pairSeq


def generateTransitionMatrix(samples):
  """
  转移概率矩阵
  """
  	# 通过flatMap操作把观影序列打碎成一个个影片对
    pairSamples = samples.flatMap(lambda x: generate_pair(x))
    # 统计影片对的数量
    pairCountMap = pairSamples.countByValue()
    pairTotalCount = 0
    # 
    transitionCountMatrix = defaultdict(dict)
    itemCountMap = defaultdict(int)
    for key, cnt in pairCountMap.items():
        key1, key2 = key
        transitionCountMatrix[key1][key2] = cnt
        itemCountMap[key1] += cnt
        pairTotalCount += cnt
    # 跳转边权重
    transitionMatrix = defaultdict(dict)
    # 相关出边权重
    itemDistribution = defaultdict(dict)
    for key1, transitionMap in transitionCountMatrix.items():
        for key2, cnt in transitionMap.items():
            transitionMatrix[key1][key2] = transitionCountMatrix[key1][key2] / itemCountMap[key1]
    for itemid, cnt in itemCountMap.items():
        itemDistribution[itemid] = cnt / pairTotalCount
    return transitionMatrix, itemDistribution


def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
  """
  单次随机游走函数
  """
    sample = []
    # pick the first element
    # 随机选择起始点
    randomDouble = random.random()
    firstItem = ""
    accumulateProb = 0.0
    for item, prob in itemDistribution.items():
        accumulateProb += prob
        if accumulateProb >= randomDouble:
            firstItem = item
            break
    sample.append(firstItem)
    curElement = firstItem
    i = 1
    while i < sampleLength:
        if (curElement not in itemDistribution) or (curElement not in transitionMatrix):
            break
        probDistribution = transitionMatrix[curElement]
        randomDouble = random.random()
        accumulateProb = 0.0
        for item, prob in probDistribution.items():
            accumulateProb += prob
            if accumulateProb >= randomDouble:
                curElement = item
                break
        sample.append(curElement)
        i += 1
    return sample


def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
  """
  随机游走采样函数
  """
    samples = []
    for i in range(sampleCount):
        samples.append(oneRandomWalk(transitionMatrix, itemDistribution, sampleLength))
    return samples
  

def graphEmb(samples, spark, embLength, embOutputFilename):
  	# 转移概率矩阵
    transitionMatrix, itemDistribution = generateTransitionMatrix(samples)
    # 随机游走采样次数
    sampleCount = 20000
    # 随机游走长度
    sampleLength = 10
    # 随机游走函数,获得20000*10的游走矩阵
    newSamples = randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength)
    rddSamples = spark.sparkContext.parallelize(newSamples)
    # Item2Vec训练
    trainItem2vec(spark, rddSamples, embLength, embOutputFilename)

上述代码最终结果是形成电影Embedding,而用户Embedding则可以采用用户观看的所有电影的Embedding之和,代码如下:

def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
  """
  用户Embedding向量
  """
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    # 物品Embedding,eg: [movieid1: [v1, v2, v3,...,v10], movieid2: [v1, v2, v3,...,v10], ...]
    Vectors_list = []
    for key, value in model.getVectors().items():
        Vectors_list.append((key, list(value)))
    fields = [
        StructField('movieId', StringType(), False),
        StructField('emb', ArrayType(FloatType()), False)
    ]
    schema = StructType(fields)
    Vectors_df = spark.createDataFrame(Vectors_list, schema=schema)
    ratingSamples = ratingSamples.join(Vectors_df, on='movieId', how='inner')
    # 根据userId聚合,emb累加
    result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], x[1])) \
        .reduceByKey(lambda a, b: [a[i] + b[i] for i in range(len(a))]).collect()
    with open(embOutputPath, 'w') as f:
        for row in result:
            vectors = " ".join([str(emb) for emb in row[1]])
            f.write(row[0] + ":" + vectors + "\n")
  人工智能 最新文章
2022吴恩达机器学习课程——第二课(神经网
第十五章 规则学习
FixMatch: Simplifying Semi-Supervised Le
数据挖掘Java——Kmeans算法的实现
大脑皮层的分割方法
【翻译】GPT-3是如何工作的
论文笔记:TEACHTEXT: CrossModal Generaliz
python从零学(六)
详解Python 3.x 导入(import)
【答读者问27】backtrader不支持最新版本的
上一篇文章      下一篇文章      查看所有文章
加:2021-09-24 10:34:02  更:2021-09-24 10:36:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/11 16:52:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码