一 代码实现
0 开发主线
(1)动态建表
2 如果没有表,需要根据标签定义规则建立标签表。
每一个标签一个表
-
自动生成建表语句(表名、字段名、分区、格式、存储位置) 使用tag_info中的tagcode作为表名 -
字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】 -
分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量 -
压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算 create table tableName (uid string,tag_value tag_valueType)
comment tagName
partition by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
location '/hdfs_path/dbName/tableName'
val tableName = tagInfo.tagCode.toLowerCase()
val tagValueType = tagInfo.tagValueType match {
case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
case ConstCode.TAG_VALUE_TYPE_STRING => "string"
case ConstCode.TAG_VALUE_TYPE_DATE => "string"
}
val properties: Properties = MyPropertiesUtil.load("config.properties")
val hdfsPath: String = properties.getProperty("hdfs-store.path")
val dwDBName: String = properties.getProperty("data-warehouse.dbname")
val upDBName: String = properties.getProperty("user-profile.dbname")
val createTableSQL =
s"""
| create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
| comment '${tagInfo.tagName}'
| partitioned by (dt string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
| location '$hdfsPath/$upDBName/$tableName'
|""".stripMargin
println(createTableSQL)
sparkSession.sql(createTableSQL)
(2)插入数据
3 根据标签定义和规则查询数据仓库
4 把数据写入到对应的标签表中
-
核心:围绕task_info中的task_sql语句进行处理,查出来的数据为UFM,想插入的数据为男女未知,中间需要进行转换,使用case when select uid, case query_value
when 'F' then '女'
when 'xx' then ''
when 'xx' then '' end as tag_value
from ($sql)
val whenThenList: List[String] = taskTagRuleList.map {
taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
}
val whenThenSQL: String = whenThenList.mkString(" ")
val selectSQL =
s"""
| select uid,
| case query_value
| $whenThenSQL end as tag_value
| from (${taskInfo.taskSql}) tv
|""".stripMargin
println(selectSQL)
sparkSession.sql(s"use $dwDBName")
val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
println(insertSQL)
sparkSession.sql(insertSQL)
(3)完整语句
package com.hzy.userprofile.app
import java.util.Properties
import com.hzy.userprofile.bean.{TagInfo, TaskInfo, TaskTagRule}
import com.hzy.userprofile.constants.ConstCode
import com.hzy.userprofile.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
import com.hzy.userprofile.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object TaskSQLApp {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val taskId: String = args(0)
val taskDate: String = args(1)
val tagInfo: TagInfo = TagInfoDAO.getTagInfoByTaskId(taskId)
val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)
val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleListByTaskId(taskId)
println(tagInfo)
println(taskInfo)
println(taskTagRuleList)
val tableName = tagInfo.tagCode.toLowerCase()
val tagValueType = tagInfo.tagValueType match {
case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
case ConstCode.TAG_VALUE_TYPE_STRING => "string"
case ConstCode.TAG_VALUE_TYPE_DATE => "string"
}
val properties: Properties = MyPropertiesUtil.load("config.properties")
val hdfsPath: String = properties.getProperty("hdfs-store.path")
val dwDBName: String = properties.getProperty("data-warehouse.dbname")
val upDBName: String = properties.getProperty("user-profile.dbname")
val createTableSQL =
s"""
| create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
| comment '${tagInfo.tagName}'
| partitioned by (dt string)
| ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
| location '$hdfsPath/$upDBName/$tableName'
|""".stripMargin
println(createTableSQL)
sparkSession.sql(createTableSQL)
val whenThenList: List[String] = taskTagRuleList.map {
taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
}
val whenThenSQL: String = whenThenList.mkString(" ")
val selectSQL =
s"""
| select uid,
| case query_value
| $whenThenSQL end as tag_value
| from (${taskInfo.taskSql}) tv
|""".stripMargin
println(selectSQL)
sparkSession.sql(s"use $dwDBName")
val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
println(insertSQL)
sparkSession.sql(insertSQL)
}
}
1 常量类
package com.hzy.userprofile.constants
object ConstCode {
val TAG_VALUE_TYPE_LONG="1"
val TAG_VALUE_TYPE_DECIMAL="2"
val TAG_VALUE_TYPE_STRING="3"
val TAG_VALUE_TYPE_DATE="4"
val TASK_PROCESS_SUCCESS="1"
val TASK_PROCESS_ERROR="2"
val TASK_STAGE_START="1"
val TASK_STAGE_RUNNING="2"
}
2 配置信息
在task-sql的resources下添加以下几条配置信息
(1)hive-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop101:3306/metastore?createDatabaseIfNotExist=true&characterEncoding=utf-8&useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
(2)hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop101:9870</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop101:9868</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>
(3)log4j.properties
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
将task-common\src\main\resources\config.properties剪切到此目录下
原因:不能保证配置信息是公用的,所以在每个任务中单独存放配置信息
(4)config.properties
hdfs-store.path=hdfs://hadoop101:8020/user_profile
#数仓库
data-warehouse.dbname=gmall
#画像库
user-profile.dbname=user_profile1009
#mysql配置
mysql.url=jdbc:mysql://127.0.0.1:3306/user_profile_manager_1009?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=root
3 本地调试
启动hadoop,hive
create database user_profile1009;
use user_profile1009;
执行程序会抛出以下异常
Caused by: MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=ASUS, access=WRITE, inode=“/”:hzy:supergroup:drwxr-xr-x
原因:HDFS文件ower为‘hzy‘,而目前进行调试使用的是windows主机名
解决方案如下图:
其中控制台输出黑色的error可以忽略,运行成功后在hive中可以看到tg_base_persona_gender。
查询可看到统计数据。
4 发布运行
(1)环境配置
以上为在本地运行,现需要将程序打包到服务器,自动运行。
将上述代码注释掉,使用yarn方式部署
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")
在task-sql中的pom文件中,添加一些配置,目的:将所有没有标记成provided的依赖打到jar包中
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
将user-profile-task1009 工程 install
成功标志如下图:
完成后会出现tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar
(2)标签定义
打开画像管理平台,添加年龄段标签:
- 上级标签:自然属性
- 上级标签编码:TG_BASE_PERSONA
- 标签编码:TG_BASE_PERSONA_AGEGROUP
- 标签名称:年龄段
- 标签类型:统计
- 标签值类型:文本
继续添加4级标签(60、70、80、90、00):
- 上级标签:年龄段
- 上级标签编码:TG_BASE_PERSONA_AGEGROUP
- 标签编码:TG_BASE_PERSONA_AGEGROUP_60
- 标签名称:60后
- 标签类型:统计
- 标签值类型:文本
在3级标签年龄段上添加任务:
-
启用任务 -
执行方式:SQL -
任务SQL:select id as uid,substr(birthday,3,1) as query_value from dim_user_info ui where dt='9999-99-99' -
任务参数: --driver-memory=1G
--num-executors=3
--executor-memory=2G
--executor-cores=2
--conf spark.default.parallelism=12
-
标签规则配置
- 6 60后
- 7 70后
- 8 80后
- 9 90后
- 0 00后
(3)上传jar包
进入流程任务管理 – 上传SQL通用任务jar包
- 任务主类:com.hzy.userprofile.app.TaskSQLApp
- jar包上传:上传tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar
点击上传,在数据库tag_common_task中可以查看到相应数据,如下图
其中id对应 file_info表中的id,其中重要的是file path,去HDFS上查看是否有这个文件
5 启动调度
(1)远程提交器部署
上传所需文件到服务器,修改配置文件
配置文件中callback.http.url 为回调地址,用于更新状态,可以将任务提交的状态实时抓取,并发送给画像管理平台,如果使用的是虚拟机,此处选项直接填windows的虚拟地址即可,一般为192.168.XX.101。
如果使用阿里云服务器,服务器需要能够访问windows的80端口,可以使用内网穿透,这里以花生壳为例。
具体配置如下:
结果:
说明:外网可以通过随机外网域名访问到内网主机,相当于外网域名和内网主机做了一个映射,没有映射,阿里云无法访问到内网。
callback.type=https
callback.http.url=http://m23o108551.zicp.fun/callback/task-status
启动脚本
#! /bin/bash
case $1 in
"start"){
echo " --------启动 $i 远程提交器-------"
nohup java -jar spark-rest-submitter-0.0.3-SNAPSHOT.jar -conf ./application.properties >submitter.log 2>&1 &
};;
"stop"){
echo " --------停止 $i 远程提交器-------"
ps -ef | grep rest-submitter| grep -v grep |awk '{print $2}' | xargs -n1 kill -9
};;
esac
赋予执行权限,执行
./rest-submit.sh start
启动起来之后会出现一个进程 7701 jar
(2)任务调度
修改E:\develop\MyWork\19用户画像\代码\1\user_profile_manager_0224\src\main\resources\application.properties中的信息,如下,用于画像平台去找spark提交器的位置
hdfs.url=hdfs://hadoop101:8020
hdfs.username=hzy
hdfs.filedir=/user_profile_manage
spark.rest.submitter.url=http://hadoop101:8266/spark-submit
此时如果不手动调度,程序就会在夜里指定时间自动运行
点击画像平台【流程任务管理】的【手动调度任务】
业务日期,一般选择数仓最后一天的日期,也就是最新的时间,选择业务数据的时间:2020-06-15
在任务进程中可以查看到以下信息
执行完成,任务状态会变为FINISHED
查看yarn服务器结果
hive的user_profile1009库下会产生两张表
查看数据是否正常
|