系列文章目录
- 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
- 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
- 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
- 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
- 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
- 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
- ……
项目资源下载
- 电影推荐系统网站项目源码Github地址(可Fork可Clone)
- 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
- 电影推荐系统网站项目源码压缩包下载(直接使用)
- 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
- 电影推荐系统网站项目源数据(可直接使用)
- 电影推荐系统网站项目个人原创论文
- 电影推荐系统网站项目前端代码
- 电影推荐系统网站项目前端css代码
前言
??今天给大家带来的博文是关于代码项目的初始化以及整个项目所需数据的初始化,其中包括,在
I
D
E
A
IDEA
IDEA中创建
m
a
v
e
n
maven
maven项目、数据加载准备、数据初始化到
M
o
n
g
o
D
B
MongoDB
MongoDB、数据初始化到
E
l
a
s
t
i
c
S
e
a
r
c
h
ElasticSearch
ElasticSearch等内容,通过这篇博文我们就可以把整个项目的框架搭建起来了。另外有一点很重要,关于代码的内容大家要注意可能和我的命名不同,当然允许不同,但是要注意修改相关的位置,相信能做到这里的读者应该都是有一定基础的,但还是要提醒一下,需要注意。当然,读者还是要有Scala和Maven的基础。下面就开始今天的学习吧!
一、在
I
D
E
A
IDEA
IDEA中创建
M
a
v
e
n
Maven
Maven项目
??项目主体用
S
c
a
l
a
Scala
Scala编写,采用
I
D
E
A
IDEA
IDEA作为开发环境进行项目编写,采用
M
a
v
e
n
Maven
Maven作为项目构建和管理工具 ??首先打开
I
D
E
A
IDEA
IDEA,创建一个
M
a
v
e
n
Maven
Maven项目,命名为MovieRecommendSystem。为了方便后期的联调,会把业务系统的代码也添加进来,所以可以以MovieRecommendSystem作为父项目,并在其下建一个名为recommender的子项目,然后再在下面搭建多个子项目用于提供不同的推荐服务。
1.1 项目框架搭建
??在MovieRecommendSystem的pom.xml文件中加入元素<packaging>pom</packaging>,然后新建一个maven module。子项目的第一步是初始化业务数据,所以子项目命名为DataLoader ??父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以MovieRecommendSystem和recommender下的src文件夹都可以删掉 ??目前的整体项目框架如下:
1.2 声明项目中工具的版本信息
??整个项目需要用到多个工具,它们的不同版本可能会对程序运行造成影响,所以应该在最外层的MovieRecommendSystem中声明所有子项目共用的版本信息。在MovieRecommendSystem/pom.xml中加入以下配置:
<properties>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
<mongodb-spark.version>2.0.0</mongodb-spark.version>
<casbah.version>3.1.1</casbah.version>
<elasticsearch-spark.version>5.6.2</elasticsearch-spark.version>
<elasticsearch.version>5.6.2</elasticsearch.version>
<redis.version>2.9.0</redis.version>
<kafka.version>0.10.2.1</kafka.version>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<jblas.version>1.2.1</jblas.version>
</properties>
1.3 添加项目依赖
??首先,对于整个项目而言,应该有同样的日志管理,在MovieRecommendSystem中引入公有依赖:
<dependencies>
<!—引入共同的日志管理工具 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
??同样,对于maven项目的构建,可以引入公有的插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManageme>
</build>
??然后,在MovieRecommendSystem/recommender/ pom.xml模块中,可以为所有的推荐模块声明spark相关依赖(这里的dependencyManagement表示仅声明相关信息,子项目如果依赖需要自行导入)
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
??由于各推荐模块都是scala代码,还应该引入scala-maven-plugin插件,用于scala程序的编译。因为插件已经在父项目中声明,所以这里不需要再声明版本和具体配置:
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
??对于具体的DataLoader子项目,需要spark相关组件,还需要mongodb的相关依赖,在MovieRecommendSystem/recommender/DataLoader/pom.xml文件中引入所有依赖(在父项目中已声明的不需要再加详细信息):
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>${elasticsearch-spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
??至此,做数据加载需要的依赖都已配置好,可以开始写代码了
二、数据加载准备
??在src/main/目录下,可以看到已有的默认源文件目录是java,可以将其改名为scala。将数据文件movies.csv、ratings.csv、tags.csv复制到资源文件目录src/main/resources下,将从这里读取数据并加载到Mongodb和Elasticsearch中
2.1
M
o
v
i
e
s
Movies
Movies数据集
??数据格式如下:
mid,name,descri,timelong,issue,shoot,language,genres,actors,directors
??例子如下:
1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English ^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter
??Movie数据集有10个字段,每个字段之间通过“^”符号进行分割
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|
mid | Int | 电影的ID | | name | String | 电影的名称 | | descri | String | 电影的描述 | | timelong | String | 电影的时长 | | shoot | String | 电影拍摄时间 | | issue | String | 电影发布时间 | | language | Array[String] | 电影语言 | 每一项用“|”分割 | genres | Array[String] | 电影所属类别 | 每一项用“|”分割 | director | Array[String] | 电影的导演 | 每一项用“|”分割 | actors | Array[String] | 电影的演员 | 每一项用“|”分割 |
2.2
R
a
t
i
n
g
s
Ratings
Ratings数据集
??数据格式如下:
userId,movieId,rating,timestamp
??例子如下:
1,31,2.5,1260759144
??Rating数据集有4个字段,每个字段之间通过“,”符号进行分割
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|
uid | Int | 用户的ID | | mid | Int | 电影的ID | | score | Double | 电影的分值 | | timestamp | Long | 评分的时间 | |
2.3
T
a
g
Tag
Tag数据集
??数据格式如下:
userId,movieId,tag,timestamp
??例子如下:
1,31,action,1260759144
??Tag数据集有4个字段,每个字段之间通过“,”符号进行分割
字段名 | 字段类型 | 字段描述 | 字段备注 |
---|
uid | Int | 用户的ID | | mid | Int | 电影的ID | | tag | String | 电影的标签 | | timestamp | Long | 评分的时间 | |
2.4 日志管理配置文件
??log4j对日志的管理,需要通过配置文件来生效。在src/main/resources下新建配置文件log4j.properties,写入以下内容:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
三、数据初始化到
M
o
n
g
o
D
B
MongoDB
MongoDB
3.1 启动
M
o
n
g
o
D
B
MongoDB
MongoDB数据库
[bigdata@linux mongodb]$ bin/mongod -config data/mongodb.conf
3.2 数据加载程序主体实现
??为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入 ??在DataLoader/src/main/scala下新建package,命名为com.IronmanJay.recommender,新建名为DataLoader的scala class文件,也就是:DataLoader/src/main/scala/com.IronmanJay.recommerder/DataLoader.scala ??程序主体代码如下:
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)
case class MongoConfig(uri: String, db: String)
case class ESConfig(httpHosts: String, transportHosts: String, index: String, clustername: String)
object DataLoader {
val MOVIE_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\movies.csv"
val RATING_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
val TAG_DATA_PATH = "D:\\Software\\MovieRecommendSystem\\recommender\\DataLoader\\src\\main\\resources\\tags.csv"
val MONGODB_MOVIE_COLLECTION = "Movie"
val MONGODB_RATING_COLLECTION = "Rating"
val MONGODB_TAG_COLLECTION = "Tag"
val ES_MOVIE_INDEX = "Movie"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://linux:27017/recommender",
"mongo.db" -> "recommender",
"es.httpHosts" -> "linux:9200",
"es.transportHosts" -> "linux:9300",
"es.index" -> "recommender",
"es.cluster.name" -> "es-cluster"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
val movieDF = movieRDD.map(
item => {
val attr = item.split("\\^")
Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim, attr(9).trim)
}
).toDF()
val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
val ratingDF = ratingRDD.map(item => {
val attr = item.split(",")
Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
}).toDF()
val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)
val tagDF = tagRDD.map(item => {
val attr = item.split(",")
Tag(attr(0).toInt, attr(1).toInt, attr(2).trim, attr(3).toInt)
}).toDF()
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
storeDataInMongoDB(movieDF, ratingDF, tagDF)
import org.apache.spark.sql.functions._
val newTag = tagDF.groupBy($"mid")
.agg(concat_ws("|", collect_set($"tag")).as("tags"))
.select("mid", "tags")
val movieWithTagsDF = movieDF.join(newTag, Seq("mid"), "left")
implicit val esConfig = ESConfig(config("es.httpHosts"), config("es.transportHosts"), config("es.index"), config("es.cluster.name"))
storeDataInES(movieWithTagsDF)
spark.stop()
}
}
3.3 将数据写入
M
o
n
g
o
D
B
MongoDB
MongoDB
??接下来,实现storeDataInMongo方法,将数据写入Mongodb中:
def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame, tagDF:
DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()
movieDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
ratingDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
tagDF.write
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_TAG_COLLECTION)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
mongoClient.close()
}
四、数据初始化到
E
l
a
s
t
i
c
S
e
a
r
c
h
ElasticSearch
ElasticSearch
4.1 启动
E
l
a
s
t
i
c
S
e
a
r
c
h
ElasticSearch
ElasticSearch数据库
??这里有一个小坑,如果有的读者习惯使用root用户,那么启动ElasticSearch的时候要切换为非root用户,并且这个非root用户要被授权,关闭的时候也如此,否则会出现打不开或者其他问题的现象。启动ElasticSearch的命令如下:
[bigdata@linux elasticsearch-5.6.2]$ ./bin/elasticsearch -d
4.2 将数据写入
E
l
a
s
t
i
c
S
e
a
r
c
h
ElasticSearch
ElasticSearch
??与上节类似,同样主要通过Spark SQL提供的write方法进行数据的分布式插入,实现storeDataInES方法:
def storeDataInES(movieDF: DataFrame)(implicit eSConfig: ESConfig): Unit
= {
val settings: Settings = Settings.builder().put("cluster.name", eSConfig.clustername).build()
val esClient = new PreBuiltTransportClient(settings)
val REGEX_HOST_PORT = "(.+):(\\d+)".r
eSConfig.transportHosts.split(",").foreach {
case REGEX_HOST_PORT(host: String, port: String) => {
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
}
}
if (esClient.admin().indices().exists(new IndicesExistsRequest(eSConfig.index))
.actionGet()
.isExists
) {
esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
}
esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))
movieDF.write
.option("es.nodes", eSConfig.httpHosts)
.option("es.http.timeout", "100m")
.option("es.mapping.id", "mid")
.option("es.nodes.wan.only", "true")
.mode("overwrite")
.format("org.elasticsearch.spark.sql")
.save(eSConfig.index + "/" + ES_MOVIE_INDEX)
}
总结
??最近有点忙,考研复试结束之后再到拟录取之后,一直在摆(bushi),所以把这个系列给搁置了,刚刚抽出空给这次补上了,后面我争取一周一篇(立一个小小的flag)。也请大家一起努力,加油,我在下篇博客等你!哦,对了,这篇博客因为有代码,所以有些小细节大家一定要注意,比如命名之类的,一定注意!
|