IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark实现item2Vec算法-附scala代码 -> 正文阅读

[大数据]spark实现item2Vec算法-附scala代码

本文记录了使用spark实现item2vec算法的相关内容,欢迎做相关工作的同学与我联系zhaoliang19960421@outlook.com

/**
* 本代码以做脱敏处理,与原公司、原业务无关,特此声明
/
package *

import *.SparkContextUtils.createSparkSession
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.ml.linalg._

/**
 * @author zhaoliang6 on 20220406
 *         基于word2vec算法构造item2vec
 *         生成item向量,用户侧使用average pooling 构造user向量
 */
object Item2Vec {
    def main(args: Array[String]): Unit = {
        val Array(locale: String, startDate: String, endDate: String) = args
        val sparkSession: SparkSession = createSparkSession(this.getClass.getSimpleName)
        val userItemSeqDf = getUserItemSeq(sparkSession, startDate, endDate)
        val model = getWord2VecModel(userItemSeqDf, "usage_seq", "vector")
        val itemVec = getItemVec(model)
        val userVec = getUserVec(sparkSession, userItemSeqDf, itemVec)

    }

    /**
     * 给定的item下的最相似的前topN个结果
     */
    def getItemSim(model: Word2VecModel, item: String, topN: Int): Unit = {
        try {
            println(s"$item 最相似的前${topN}个结果是:")
            model.findSynonyms(item, topN).show(truncate = false)
        } catch {
            case ex: Exception => println(s"$item 不存在")
        }
    }

    /**
     * 将用户序列下的item求平均得到用户向量
     */
    def getUserVec(sparkSession: SparkSession, orgDf: DataFrame, itemVec: DataFrame): DataFrame = {
        val arrayDefaultVec = new Array[Double](200)

        def itemVecAagPoolingUDF(map: scala.collection.Map[String, Array[Double]]): UserDefinedFunction = udf((seq: mutable.WrappedArray[String]) => {
            val res = ArrayBuffer[Array[Double]]()
            res.appendAll(seq.map(map.getOrElse(_, arrayDefaultVec)))
            val tmp: (Array[Double], Int) = res.map(e => (e, 1)).reduce((x, y) => {
                (x._1.zip(y._1).map(a => a._1 + a._2), x._2 + y._2)
            })
            if (tmp._2 > 0) tmp._1.map(e => e / tmp._2)
            else arrayDefaultVec
        })

        val itemVecBC = sparkSession.sparkContext.broadcast(itemVec.rdd.map(r => (r.getString(0), r.getSeq[Double](1).toArray)).collectAsMap())
        val userVecDf = orgDf
            .withColumn("vector", itemVecAagPoolingUDF(itemVecBC.value)(col("usage_seq")))
            .select("gaid", "vector")
        userVecDf
    }

    /**
     * 基于w2v 得到item向量
     */
    def getItemVec(model: Word2VecModel): DataFrame = {
        def vector2ArrayUDF(): UserDefinedFunction = udf((vec: Vector) => {
            val norm = Vectors.norm(vec, 2)
            vec.toArray.map(e => if (norm != 0) e / norm else 0.0)
        })

        val itemVec = model.getVectors
            .select(col("word").as("pkg"), col("vector").as("org_vector"))
            .withColumn("vectorArray", vector2ArrayUDF()(col("vector")))
            .selectExpr("word as item", "vectorArray")
        itemVec
    }

    /**
     * 获得user-itemSeq
     */
    def getUserItemSeq(sparkSession: SparkSession, startDate: String, endDate: String): DataFrame = {
        def getSeqUDF(): UserDefinedFunction = udf((seq: mutable.WrappedArray[GenericRowWithSchema]) => {
            val listSeq = ArrayBuffer[String]()
            seq.sortBy(e => e.getAs[Long]("timestamp"))
            var pkg = seq.head.getAs[String]("pkg")
            var open = seq.head.getAs[Long]("timestamp")
            var dura = seq.head.getAs[Double]("duration")
            listSeq.append(pkg)
            seq.drop(0).foreach(e => {
                val tmp_pkg = e.getAs[String]("pkg")
                val tmp_open = e.getAs[Long]("timestamp")
                val tmp_dura = e.getAs[Double]("duration")
                if (!tmp_pkg.equals(pkg) || (tmp_pkg.equals(pkg) && ((tmp_open - open) / 1000 - dura > 10)))
                    listSeq.append(tmp_pkg)
                pkg = tmp_pkg
                open = tmp_open
                dura = tmp_dura
            })
            listSeq
        })

        val dfAppUsage = sparkSession.read.parquet("hdfs://***")
            .where(s"date between $startDate and $endDate")
            .groupBy("gaid")
            .agg(collect_list(struct("pkg", "timestamp", "duration")).as("seq"))
            .withColumn("usage_seq", getSeqUDF()(col("seq")))
            .withColumn("seq_len", size(col("usage_seq")))
            .where("seq_len > 10") // 最短的路径
            .selectExpr("gaid", "usage_seq")
        dfAppUsage
    }

    /**
     * 获得word2vec模型
     */
    def getWord2VecModel(orgDf: DataFrame, inputCol: String, outputCol: String): Word2VecModel = {
        val model: Word2VecModel = new Word2Vec()
            .setInputCol(inputCol)
            .setOutputCol(outputCol)
            .setSeed(1024)
            .setMaxIter(10)
            .setMinCount(5)
            .setVectorSize(200)
            .setWindowSize(5)
            .setNumPartitions(1000)
            .setMaxSentenceLength(100)
            .fit(orgDf)
        model
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-07 22:46:53  更:2022-04-07 22:47:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:05:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码