IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【用户画像】标签任务开发流程(源码之动态建表、常量类、配置信息、本地调试、发布运行、任务调度) -> 正文阅读

[大数据]【用户画像】标签任务开发流程(源码之动态建表、常量类、配置信息、本地调试、发布运行、任务调度)

一 代码实现

0 开发主线

(1)动态建表

2 如果没有表,需要根据标签定义规则建立标签表。

每一个标签一个表

  • 自动生成建表语句(表名、字段名、分区、格式、存储位置)
    使用tag_info中的tagcode作为表名

  • 字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】

  • 分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量

  • 压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算

    create  table tableName (uid string,tag_value tag_valueType)
    comment tagName
    partition by (dt string)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
    location '/hdfs_path/dbName/tableName'
    
//2 如果没有表,需要根据标签定义规则建立标签表
    //    每一个标签一个表
    //    自动生成建表语句(表名、字段名、分区、格式、存储位置)
    //                   使用tag_info中的tagcode作为表名
    //    字段类型:可以根据tag_valueType得到tag_value的值【文本,数字,浮点,日期】
    //    分区:标签每天晚上进行计算,计算完成后会产生一批新的数据,可以理解为每日全量
    //    压缩:文本格式,不采用压缩,因为以人和标签作为单位,且后续需要进行计算
    //    create table tableName (uid string,tag_value tag_valueType)
    //    comment tagName
    //    partition by (dt string)
    //    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
    //    location '/hdfs_path/dbName/tableName'

    // 获取建表语句中的元素
    // 获取表名
    val tableName = tagInfo.tagCode.toLowerCase()

    // 获取tagValueType字段类型(1整数 2浮点 3文本 4日期)
    // 常量值一般不写在代码中,所以创建一个常量类,对应标题1
    val tagValueType = tagInfo.tagValueType match {
      case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
      case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
      case ConstCode.TAG_VALUE_TYPE_STRING => "string"
      case ConstCode.TAG_VALUE_TYPE_DATE => "string"
    }

    //config.properties文件中
    //hdfs-store.path=hdfs://hadoop101:8020/user_profile:存储位置
    //data-warehouse.dbname=gmall:数仓库
    //user-profile.dbname=user_profile2022:画像库
    val properties: Properties = MyPropertiesUtil.load("config.properties")
    val hdfsPath: String = properties.getProperty("hdfs-store.path")
    val dwDBName: String = properties.getProperty("data-warehouse.dbname")
    val upDBName: String = properties.getProperty("user-profile.dbname")

    // 创建sql
    val createTableSQL =
      s"""
         | create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
         | comment '${tagInfo.tagName}'
         | partitioned by (dt string)
         | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
         | location '$hdfsPath/$upDBName/$tableName'
         |""".stripMargin

    println(createTableSQL)
    sparkSession.sql(createTableSQL)

(2)插入数据

3 根据标签定义和规则查询数据仓库

4 把数据写入到对应的标签表中

  • 核心:围绕task_info中的task_sql语句进行处理,查出来的数据为UFM,想插入的数据为男女未知,中间需要进行转换,使用case when

    select uid, case query_value
        when 'F' then '女'
        when 'xx' then ''
        when 'xx' then '' end as tag_value
    from ($sql)
    
//3 根据标签定义和规则查询数据仓库
    //
    // select uid, case query_value
    //      when 'F' then '女'
    //      when 'xx' then ''
    //      when 'xx' then '' end as tag_value
    //  from ($sql)
    // 动态生成case when语句
    // 3.1 先生成case when语句

    //将list中的数据转换成一个一个的when then
    //将TaskTagRule(1,7,1,F,8,男), TaskTagRule(2,7,1,M,9,女), TaskTagRule(3,7,1,U,10,未知)
    //转换为List( when F then 男,  when M then 女,  when U then 未知)
    val whenThenList: List[String] = taskTagRuleList.map {
      taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
    }
    //将List转换为字符串,按照空格进行分割
    val whenThenSQL: String = whenThenList.mkString(" ")
    val selectSQL =
      s"""
         | select uid,
         |   case query_value
         |  $whenThenSQL end as tag_value
         | from (${taskInfo.taskSql}) tv
         |""".stripMargin
    println(selectSQL)

    //4 把数据写入到对应的标签表中
    //insert执行时,需要跨库操作,从数仓库写入到数仓库,需要添加库名
    //画像库的库名容易添加,但数仓的sql是直接传进来的,怎么解决?
    //可以定义sql在哪里执行
    sparkSession.sql(s"use $dwDBName")
    val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
    println(insertSQL)
    sparkSession.sql(insertSQL)

(3)完整语句

package com.hzy.userprofile.app

import java.util.Properties

import com.hzy.userprofile.bean.{TagInfo, TaskInfo, TaskTagRule}
import com.hzy.userprofile.constants.ConstCode
import com.hzy.userprofile.dao.{TagInfoDAO, TaskInfoDAO, TaskTagRuleDAO}
import com.hzy.userprofile.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object TaskSQLApp {
  def main(args: Array[String]): Unit = {

    //0 添加执行环境
    val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app").setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()


    //1 获得标签定义、标签任务的SQL、字标签的匹配规则,存储在MySQL中
    val taskId: String = args(0)
    val taskDate: String = args(1)
    
    val tagInfo: TagInfo = TagInfoDAO.getTagInfoByTaskId(taskId)
    val taskInfo: TaskInfo = TaskInfoDAO.getTaskInfo(taskId)
    val taskTagRuleList: List[TaskTagRule] = TaskTagRuleDAO.getTaskTagRuleListByTaskId(taskId)

    println(tagInfo)
    println(taskInfo)
    println(taskTagRuleList)

    //2 如果没有表,需要根据标签定义规则建立标签表
    val tableName = tagInfo.tagCode.toLowerCase()
    
    val tagValueType = tagInfo.tagValueType match {
      case ConstCode.TAG_VALUE_TYPE_LONG => "bigint"
      case ConstCode.TAG_VALUE_TYPE_DECIMAL => "decimal(16,2)"
      case ConstCode.TAG_VALUE_TYPE_STRING => "string"
      case ConstCode.TAG_VALUE_TYPE_DATE => "string"
    }

    val properties: Properties = MyPropertiesUtil.load("config.properties")
    val hdfsPath: String = properties.getProperty("hdfs-store.path")
    val dwDBName: String = properties.getProperty("data-warehouse.dbname")
    val upDBName: String = properties.getProperty("user-profile.dbname")

    // 创建sql
    val createTableSQL =
      s"""
         | create table if not exists $upDBName.$tableName (uid string,tag_value $tagValueType)
         | comment '${tagInfo.tagName}'
         | partitioned by (dt string)
         | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
         | location '$hdfsPath/$upDBName/$tableName'
         |""".stripMargin

    println(createTableSQL)
    sparkSession.sql(createTableSQL)


    //3 根据标签定义和规则查询数据仓库,生成case when语句,将list中的数据转换成一个一个的when then
    val whenThenList: List[String] = taskTagRuleList.map {
      taskTagRule => s" when '${taskTagRule.queryValue}' then '${taskTagRule.subTagValue}'"
    }
    val whenThenSQL: String = whenThenList.mkString(" ")
    val selectSQL =
      s"""
         | select uid,
         |   case query_value
         |  $whenThenSQL end as tag_value
         | from (${taskInfo.taskSql}) tv
         |""".stripMargin
    println(selectSQL)

    //4 把数据写入到对应的标签表中
    sparkSession.sql(s"use $dwDBName")
    val insertSQL = s"insert overwrite table $upDBName.$tableName partition (dt='$taskDate') $selectSQL"
    println(insertSQL)
    sparkSession.sql(insertSQL)
  }
}

1 常量类

在这里插入图片描述

package com.hzy.userprofile.constants

object ConstCode {
  val TAG_VALUE_TYPE_LONG="1"
  val TAG_VALUE_TYPE_DECIMAL="2"
  val TAG_VALUE_TYPE_STRING="3"
  val TAG_VALUE_TYPE_DATE="4"

  val TASK_PROCESS_SUCCESS="1"
  val TASK_PROCESS_ERROR="2"

  val TASK_STAGE_START="1"
  val TASK_STAGE_RUNNING="2"
}

2 配置信息

在task-sql的resources下添加以下几条配置信息

(1)hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop101:3306/metastore?createDatabaseIfNotExist=true&amp;characterEncoding=utf-8&amp;useSSL=false</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <!--> 填写mysql数据库账户密码<-->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>password to use against metastore database</description>
    </property>
    <property>
        <name>hive.cli.print.header</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

</configuration>

(2)hdfs-site.xml

<configuration>

    <!-- nn web端访问地址-->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop101:9870</value>
    </property>
    <!-- 2nn web端访问地址-->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop101:9868</value>
    </property>
    <property>
        <name>dfs.client.use.datanode.hostname</name>
        <value>true</value>

    </property>

</configuration>

(3)log4j.properties

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n

将task-common\src\main\resources\config.properties剪切到此目录下

原因:不能保证配置信息是公用的,所以在每个任务中单独存放配置信息

(4)config.properties

hdfs-store.path=hdfs://hadoop101:8020/user_profile
#数仓库
data-warehouse.dbname=gmall
#画像库
user-profile.dbname=user_profile1009

#mysql配置
mysql.url=jdbc:mysql://127.0.0.1:3306/user_profile_manager_1009?characterEncoding=utf-8&useSSL=false
mysql.username=root
mysql.password=root

3 本地调试

启动hadoop,hive

# 进入hive创建库
create database user_profile1009;
use user_profile1009;

执行程序会抛出以下异常

Caused by: MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=ASUS, access=WRITE, inode=“/”:hzy:supergroup:drwxr-xr-x

原因:HDFS文件ower为‘hzy‘,而目前进行调试使用的是windows主机名

解决方案如下图:

在这里插入图片描述

其中控制台输出黑色的error可以忽略,运行成功后在hive中可以看到tg_base_persona_gender。

查询可看到统计数据。

4 发布运行

(1)环境配置

以上为在本地运行,现需要将程序打包到服务器,自动运行。

在这里插入图片描述

将上述代码注释掉,使用yarn方式部署

//0 添加执行环境
val sparkConf: SparkConf = new SparkConf().setAppName("task_sql_app")//.setMaster("local[*]")

在task-sql中的pom文件中,添加一些配置,目的:将所有没有标记成provided的依赖打到jar包中

<build>
    <plugins>
        <!-- 该插件用于将Scala代码编译成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- 声明绑定到maven的compile阶段 -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

将user-profile-task1009 工程 install

成功标志如下图:

在这里插入图片描述

完成后会出现tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar

(2)标签定义

打开画像管理平台,添加年龄段标签:

  • 上级标签:自然属性
  • 上级标签编码:TG_BASE_PERSONA
  • 标签编码:TG_BASE_PERSONA_AGEGROUP
  • 标签名称:年龄段
  • 标签类型:统计
  • 标签值类型:文本

继续添加4级标签(60、70、80、90、00):

  • 上级标签:年龄段
  • 上级标签编码:TG_BASE_PERSONA_AGEGROUP
  • 标签编码:TG_BASE_PERSONA_AGEGROUP_60
  • 标签名称:60后
  • 标签类型:统计
  • 标签值类型:文本

在3级标签年龄段上添加任务:

  • 启用任务

  • 执行方式:SQL

  • 任务SQL:select id as uid,substr(birthday,3,1) as query_value from dim_user_info ui where dt='9999-99-99'

  • 任务参数:

    --driver-memory=1G 
    --num-executors=3 
    --executor-memory=2G 
    --executor-cores=2
    --conf spark.default.parallelism=12
    
  • 标签规则配置

    • 6 60后
    • 7 70后
    • 8 80后
    • 9 90后
    • 0 00后

(3)上传jar包

进入流程任务管理 – 上传SQL通用任务jar包

  • 任务主类:com.hzy.userprofile.app.TaskSQLApp
  • jar包上传:上传tasksql-1.0-SNAPSHOT-jar-with-dependencies.jar

点击上传,在数据库tag_common_task中可以查看到相应数据,如下图

在这里插入图片描述

其中id对应 file_info表中的id,其中重要的是file path,去HDFS上查看是否有这个文件

5 启动调度

(1)远程提交器部署

上传所需文件到服务器,修改配置文件

配置文件中callback.http.url 为回调地址,用于更新状态,可以将任务提交的状态实时抓取,并发送给画像管理平台,如果使用的是虚拟机,此处选项直接填windows的虚拟地址即可,一般为192.168.XX.101。

如果使用阿里云服务器,服务器需要能够访问windows的80端口,可以使用内网穿透,这里以花生壳为例。

具体配置如下:

在这里插入图片描述

结果:

在这里插入图片描述

说明:外网可以通过随机外网域名访问到内网主机,相当于外网域名和内网主机做了一个映射,没有映射,阿里云无法访问到内网。

callback.type=https
callback.http.url=http://m23o108551.zicp.fun/callback/task-status

启动脚本

#! /bin/bash

case $1 in
"start"){
                echo " --------启动 $i 远程提交器-------"
                nohup java -jar spark-rest-submitter-0.0.3-SNAPSHOT.jar -conf ./application.properties >submitter.log 2>&1 &
};;
"stop"){
                echo " --------停止 $i 远程提交器-------"
                ps -ef | grep rest-submitter| grep -v grep |awk  '{print $2}' | xargs -n1 kill -9

};;
esac

赋予执行权限,执行

./rest-submit.sh start

启动起来之后会出现一个进程 7701 jar

(2)任务调度

修改E:\develop\MyWork\19用户画像\代码\1\user_profile_manager_0224\src\main\resources\application.properties中的信息,如下,用于画像平台去找spark提交器的位置

# 画像平台上传到hdfs的文件地址和路径
hdfs.url=hdfs://hadoop101:8020
hdfs.username=hzy
hdfs.filedir=/user_profile_manage

# 提交远程服务的配置,之后部署远程spark提交器的地址
spark.rest.submitter.url=http://hadoop101:8266/spark-submit

此时如果不手动调度,程序就会在夜里指定时间自动运行

点击画像平台【流程任务管理】的【手动调度任务】

业务日期,一般选择数仓最后一天的日期,也就是最新的时间,选择业务数据的时间:2020-06-15

在任务进程中可以查看到以下信息

在这里插入图片描述

执行完成,任务状态会变为FINISHED

查看yarn服务器结果

在这里插入图片描述

hive的user_profile1009库下会产生两张表

在这里插入图片描述

查看数据是否正常

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-31 12:04:09  更:2022-10-31 12:07:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 17:49:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码