IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 用户画像实践(一) -> 正文阅读

[大数据]用户画像实践(一)

项目背景架构

数据源:数据源通过sqoop同步到hbase中。
标签存储:hbase:用于存储标签。mysql:存储标签元数据。es:标签检索。
标签开发:采用spark引擎进行开发。

一、数据导入
将数据同步至hbase中。有以下几种方式。1、sqoop直接迁移。2将数据迁移至hdfs中,然后通过hbase提供的importtsv工具将数据导入至hbase中,这里也有两种方式,默认的方式是直接put的形式,还有一种是bulkload的方式,将文件转换成hfile后再加载到hbase中。

二、标签开发
1、开发一个工具类 提供两个方法(1)向hbase写入数据。(2)读取hbase中的数据

object HbaseTools{

	def read(spark:SparkSession,zkHost:String,zkPort:String,table:String,family:String,fields:Seq[String]):DataFrame={

	val sc:SparkContext=spark.sparkContext
	//1、设置hbase配置信息 zk地址和端口号
	val conf=HbaseConfiguration.create()
	conf.set("hbase.zookeeper.quorum",zkHost)
	conf.set("hbase.zookeeper.property.clientPort",zkPort)
	
	//2、设置表的名称
	conf.set(TableInputFormat.Input_Table,table)

	//3、设置读取的列簇和列名称
	val scan:Scan=new Scan()
	//4、设置列簇
	val cfBytes:Array[Byte]=Bytes.toBytes(family)
	scan.addFamily(cfBytes)
	//5、设置列名称
	fields.foreach{
		field=>
		scan.addColumn(cfBytes,Bytes.toBytes(filed))
	}
	//6、设置scan过滤数据
	conf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))

	val hbaseRdd=sc.newApiHadoopRDD(conf,classOf(TableInputFormat),classOf(ImmutableBytesWritable),classOf(Result))
)

//将rdd转化成dataframe
	val rowsRdd=hbaseRdd.map{case (_,result)=>

	val values:Seq[String]=fields.map{field=>
		val value=result.getValue(cfBytes,Bytes.toBytes(field))
		Bytes.toString(value)
		}
	//将values转换成row
	Row.fromSeq(values)
	}

//自定义schema
	val schema:StructType=StructType(fields.map{filed=>
	StructField(filed,StringType,nullable=true)
	})

	//返回df
	spark.createDataFrame(rowsRdd,schema)
}



	def write(dataframe:DataFrame,zkHost:String,zkPort:String,table:String,family:String,rowkeyColumn:String):Unit={

	val conf=HbaseConfiguration.create()
	conf.set("hbase.zookeeper.quorum",zkHost)
	conf.set("hbase.zookeeper.property.clientPort",zkPort)
	
	//2、设置表的名称
	conf.set(TableOutputFormat.Output_Table,table)
	
	//将datafram转换为rdd
	val cfbytes=Bytes.toBytes(family)
	val columns:Array[String]=datatrame.columns
	val datasrdd=dataframe.rdd.map{row=>
		val rowkey=row.getAs[String](rowkeyColumn)
		val rkBytes:Array[Byte]=Bytes.toBytes(rowkey)
	//构建put对象
		val put=new Put(rkBytes)
		columns.foreach{column=>
				val value=Bytes.toBytes(row.getAs[String](column))
				put.addColumn(cfbytes,Bytes.toBytes(column),value))
				
			}
	
		(new ImmutableBytesWriteable(rkBytes,put))
		
		}

	datasrdd.saveAsNewApiHadoopFile(
	"输出路径",
	classOf(ImmutableBytesWriteable),
	classOf(Put),
	classOf(TableOutputFormat),
	conf
)
	
	}





}

2、开发第一个标签用户性别(核心代码)

//1、获取sparksession对象

//2、根据标签id从mysql中读取标签数据
//依据标签id获取对应mysql中标签元数据
val tableTage:String="(select id,name,rule,level from tbl_basic_tag where id =318
union 
select id,name,rule,level from tbl_basic_tag where pid=318) AS tag_table  "

val basicTageDf=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://主机名:3306/?useUnicode=true&characterEncoding=UTF-8")
.option("dbtable",tagTable)
.option("user","root")
.option("password","123456")
.load()


//3、解析rule,从hbase中读取业务数据
val tagRule:String=basicTageDf.filter($"level"===4).head().getAs[String]("rule")
//解析rule,存入map中
val tagRuleMap=tagRule.split("\\n")
.map{line=>
	val array(key,value)=line.trim.split("=")
	(key,value)
}.toMap

//判断数据源
if("hbase".equals(tagRuleMap("inType").toLowerCase)){
	val hbaseMeta=HbaseMeta.getMetaData(tagRuleMap)//这里自己封装一个hebasemeta类 写一个伴生对象 这里省略
	businessDf=HbaseTools.read(spark,hbaseMeta.zkHosts,hbaseMeta.zkPort,hbaseMeta.hbaseTable,hbaseMeta.FieldName.split(","))
}
else{
	System.exit(-1)
}


//获取属性标签
val attrTagRuleDf=basicTageDf.filter($"level"===5)
		   .select($"rule",$"name")

//将属性标签与业务数据进行关联
val modelDf=businessDf.join(attrTagRuleDf,businessDf("gender")===attrTagRuleDf("rule"),inner)
.select( $"id".as ("userId"),$"name".as("gender"))
		   



//4、结果写入hbase
HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")

//结束
spark.stop()

当我们开发第二个标签的时候,我们会发现在进行规则匹配的过程中会产生很多重复的代码,我们可以将规则匹配这个过程封装成一个工具类(TagTools)并且设计一个标签模板。

工具类:

object TagTools{

	//将dataframe转换成map 为了后续使用广播变量避免join
	def convertMap(tagDf:DataFrame):Map(String,String){
	import tagDf.sparkSession.implicits._
	tagDf.filter($"level"===5)
	.select($"rule",$"name")
	.as[(String,String)]
	.rdd
	.collectAsMap
	.toMap
	}
	
	//打标签,使用广播变量
	def ruleMatchTag(dataframe:DataFrame,field:String,tagDf:DataFrame):dataFrame{
	val spark=dataframe.sparkSession
	import spark.implicits._
	//获取规则map
	val attrTagRuleMap:Map[String,String]=convertMap(tagDf)
	//将map集合数据封装成广播变量
	val attrTagRuleMapBroadCast =spark.sparkContext.broadcast(attrTagRuleMap:Map)
	//自定义UDF函数打标签
	val field_to_tag=udf(
			(field:String)=>attrTagRuleMapBroadCast.value(field)
		)
	}
	//计算标签
	val modelDf:DataFrame=dataframe
	.select($"id".as("userId"),filed_to_tag(col(field).as(field)))
	
	modelDf
	
}

标签模板:

采用模板设计模式,创建一个标签模型开发基类

trait BasicModel extends Logging{

	//变量声明
	val spark:SparkSession = _
	//1.初始化,构建sparksession对象
	def init():Unit={
		val sparkConf=new SparkConf()
		.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
		.setMaster("local[*]")
		.set("spark.sql.shuffle.partitions","4")
		.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
		.rigisterKryoClasses(
		Array(classOf[ImmutableBytesWritable],classOf[Result],classOf[Put])
		spark=SparkSession.builder()
		.config(sparkConf)
		//启用与hive集成
		.enableHiveSupport()
		.config("hive.metastore.uris","thrift://bigdata-cdh01.itcast.cn:9083")
		.config("spark.sql.warehouse.dir","xxxx")
		.getOrCreate()
		)
	}
	//2.获取标签数据
	def getTagData(tagId:Long):dataFrame={
		
		val tagTable=s"(select id,name,rule,level from tbl_basci_tag
						where id=$tagId 
						union
						select id,name,rule,level from tbl_basci_tag where pid=$tagId) as tag_table
		
					"
		val basicTagDf=spark.read
			.format("jdbc")
			.option("driver","com.mysql.jdbc.Driver")
			.option("url","数据库地址")
			.option("dbtable",tagTable)
			.option("user","root")
			.option("password","123456")
			.load()
			
			basicTagDf
			
	}
	//3.获取业务数据
	def getBusinessData(tagDf:DataFrame):dataFrame={
		
			import tagDf.sparkSession.implicits._
			//获取业务标签规则
			val tagRule=tagDf.filter($"level"===4)
							 .head()
							 .getAs[String]("rule")
			//解析rule封装成map			
			val tagRuleMap=tagRule.split("\\n")
						   .map(line=>
						   val array(key,value)=line.trim.split("=")
						   (key,value).toMap
						   				 
						   )
			//判断数据源,读取业务数据 
			val businessDf=null
			if("hbase".equals(tagRuleMap("inType").toLowerCase)){
			val hbaseMeta=HbaseMeta.getMetaData(tagRuleMap)
				businessDf=HbaseTools.read(spark,hbaseMeta.zkHosts,hbaseMeta.zkPort,hbaseMeta.hbaseTable,hbaseMeta.FieldName.split(","))
			}
			else{
				System.exit(-1)
			}
				businessDf
		}
			
			
			
	
	//4.打标签
	def doTag(getBusinessDf:DataFrame,tagDf:DataFrame):DataFrame={
	}
	//5.标签结果保存至hbase
	def saveTag(modelDf:DataFrame):Unit={
	
		HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")

	}
	//6、关闭资源
	def close():Unit={
		if(null!=spark){
			spark.stop()
		}
	}
	//执行顺序
	def executeModel(tagId:Long){
	init()
	try{
		//获取标签数据
		val tagDf:DataFrame=getTagData(tagId){
		tagDf.persist(StorageLevel.MEMORY_AND_DISK)
		tagDf.count()
		
		//获取业务数据
		val businessDf=getBusinessData(tagDf)
		//计算标签
		val modelDf=doTag(businessDf,tagDf)
		//保存标签
		saveTag(modelDf)
		tagDf.unpersist()
		}catch{
		}
		finally{
		close()
		}
		
				
			
		}
	}
	
	}
}

开发政治面貌标签

//用户的政治面貌标签
class PoliticalModel extends BasicModel{
	
	
	override def doTag(businessDf:DataFrame,tagDf:DataFrame):DataFrame={
		//调用工具类,计算标签
		val modelDf=Tagtools.ruleMatchTag(businessDf,"politicalface",tagDf)
		modelDf
		
	}
}

object PoliticalModel{
	def main(agrs:Array[String]):Unit={
		//创建标签模型
		val tagModel=new PoliticalModel()
		tagModel.execute(328l)
	}
}

通过以上代码可以发现代码当中存在很多硬编码的部分,所以我们需要将以上代码进行重构

1、抽象出一个配置文件config.properties,将集群等配置信息写在配置文件里

2、重构sparksession
将是否是本地模式、是否继承hive也写在配置文件config.properties当中,
将spark应用参数放在单独的另一个配置文件当中spark.config(这里注意.config文件里的value值需要写双引号,.properties不需要)

//通过以下方式遍历配置文件的属性,利用typesafe的config库
val config=ConfigFactory.load("spark.conf")
for(entry->config.entrySet.asScala){
	
	val resource=entry.getValue.origin().resource()
	//判断文件的来源
	if("spark.conf".equals(resource)){
		
		//获取key
		entry.getKey()
		//获取value
		entry.getValue()
	
		
	}
}

重构标签基类

为什么要重构标签基类?我们发现给到一个标签类,我们好像并不知道这个标签类是规则匹配类型还是统计类型标签。

public enum ModelType{
	MATCH,//规则匹配
	ML,//机器学习类
	STATISTICS//统计类
}


abstract class AbstractModel(modelName:String,ModelType:String)extends Logging{

	//将之前写的BasicModel类所有代码拷贝到此处即可
	
}

自定义外部数据源

什么意思?举个例子,自定义外部数据源可以让我们使用
spark.format(“hbase”)
.option(“xxxx”)
.option(“xxxx”)
.load()
的方式进行读取或者写入数据,大家知道,默认的sparksql是提供mysql、csv、json等格式,默认是没有hbase的,我们需要自定义外部数据源。

自定义数据源的方法如下:我们要实现两个类,然后分别实现对应的接口。

在这里插入图片描述

class HbaseRelation extends BaseRelation with TableScan with InsertableRelation with Serializable{
	
	//相当于sparksession
	override def sqlContext:SqlContext=
	
	//schema信息
	override def schema:StructType=
	
	//加载数据
	override def buildScan(): RDD[ROW]=
	
	//写入数据
	override def insert(data:DataFrame,override:Boolean):Unit=


}




class DefaultSource extends RelationProvider with CreatableRelationProvider{

	override def createRelation(sqlContext:SQLContext,
	parameters:Map[String,String]
	):BaseRelation


	override def createRelation (sqlContext:SQLContext,
	mode:SaveMode,
	parameters:Map[String,String]
	data:DataFrame
	):BaseRelation
}

我们只需要将之前的工具类读取、写入hbase的方法修改修改,添加到HbaseRelation中即可。

那么做完以上操作还需要注册数据源,让我们可以用spark.format(“hbase”)的形式进行操作,如果不注册数据源,format里面我们只能写(“HbaseRelation的完整类路径”)

那么怎么进行注册呢?
1、需要让我们的DefaultSource类继承DataSourceRegister接口,实现shortName方法。
2、在项目resource目录下创建META-INF/services 二级目录,在目录下创建一个文件,文件全名称就是datasourceregister这个类的全名称(文本文件),文件里拷贝defaultsouce类的全限定名。

统计类标签开发

年龄段标签开发
步骤
一、在标签平台进行注册
新建四级标签(年龄段)
新建五级标签(50后、60后、。。。。20后)

二、代码开发

class AgeRangeModel extends AbstractModel("年龄段标签"ModelType.STATISTICS){

	override def doTag(businessDf:DataFrame,tagDf:DataFrame):DataFrame={
		import businessDf.sparkSession.implicits._
		import org.apache.spark.sql.functions._
		
		//自定义udf函数,解析tagDf的rule 将“1990.1.1-1999.12.31”解析为二元组
		val rule_to_tuple=udf(
			
			(rule:String)=>{
				val Array(start,end)=rule.split("-").map(_.toInt)
				(start,end)

			}

		)
	}
	//针对属性标签中的规则rule使用udf函数提取start和end
	attrDf=tagDf.filter($"level"===5)
	.select($"name",rule_to_tuple($"rule").as("rules"))
	.select($"name",$"rules._1".as("start"),$"rules._2".as("end"))

	//使用业务数据与属性标签规则数据进行join关联
	val dataModel=bussinessDf
	.select($"id",regexp_replace($"birthday","-","").cast(IntegerType).as("bornDate"))
	.join(attDf)
	.where($"bornDate".beetween($"start",$"end"))
	.select($"id".as("userId"),$"name".as("ageRange"))
	
	dataModel
	
}

object AgeRangeModel {

	def main():Unit{
		val tagModel=new AgeRangeModel().execute(338l)
		
	}

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-04 15:39:43  更:2022-03-04 15:43:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:54:17-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码