项目背景架构
数据源:数据源通过sqoop同步到hbase中。 标签存储:hbase:用于存储标签。mysql:存储标签元数据。es:标签检索。 标签开发:采用spark引擎进行开发。
一、数据导入 将数据同步至hbase中。有以下几种方式。1、sqoop直接迁移。2将数据迁移至hdfs中,然后通过hbase提供的importtsv工具将数据导入至hbase中,这里也有两种方式,默认的方式是直接put的形式,还有一种是bulkload的方式,将文件转换成hfile后再加载到hbase中。
二、标签开发 1、开发一个工具类 提供两个方法(1)向hbase写入数据。(2)读取hbase中的数据
object HbaseTools{
def read(spark:SparkSession,zkHost:String,zkPort:String,table:String,family:String,fields:Seq[String]):DataFrame={
val sc:SparkContext=spark.sparkContext
val conf=HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",zkHost)
conf.set("hbase.zookeeper.property.clientPort",zkPort)
conf.set(TableInputFormat.Input_Table,table)
val scan:Scan=new Scan()
val cfBytes:Array[Byte]=Bytes.toBytes(family)
scan.addFamily(cfBytes)
fields.foreach{
field=>
scan.addColumn(cfBytes,Bytes.toBytes(filed))
}
conf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
val hbaseRdd=sc.newApiHadoopRDD(conf,classOf(TableInputFormat),classOf(ImmutableBytesWritable),classOf(Result))
)
val rowsRdd=hbaseRdd.map{case (_,result)=>
val values:Seq[String]=fields.map{field=>
val value=result.getValue(cfBytes,Bytes.toBytes(field))
Bytes.toString(value)
}
Row.fromSeq(values)
}
val schema:StructType=StructType(fields.map{filed=>
StructField(filed,StringType,nullable=true)
})
spark.createDataFrame(rowsRdd,schema)
}
def write(dataframe:DataFrame,zkHost:String,zkPort:String,table:String,family:String,rowkeyColumn:String):Unit={
val conf=HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum",zkHost)
conf.set("hbase.zookeeper.property.clientPort",zkPort)
conf.set(TableOutputFormat.Output_Table,table)
val cfbytes=Bytes.toBytes(family)
val columns:Array[String]=datatrame.columns
val datasrdd=dataframe.rdd.map{row=>
val rowkey=row.getAs[String](rowkeyColumn)
val rkBytes:Array[Byte]=Bytes.toBytes(rowkey)
val put=new Put(rkBytes)
columns.foreach{column=>
val value=Bytes.toBytes(row.getAs[String](column))
put.addColumn(cfbytes,Bytes.toBytes(column),value))
}
(new ImmutableBytesWriteable(rkBytes,put))
}
datasrdd.saveAsNewApiHadoopFile(
"输出路径",
classOf(ImmutableBytesWriteable),
classOf(Put),
classOf(TableOutputFormat),
conf
)
}
}
2、开发第一个标签用户性别(核心代码)
val tableTage:String="(select id,name,rule,level from tbl_basic_tag where id =318
union
select id,name,rule,level from tbl_basic_tag where pid=318) AS tag_table "
val basicTageDf=spark.read.format("jdbc").option("driver","com.mysql.jdbc.Driver").option("url","jdbc:mysql://主机名:3306/?useUnicode=true&characterEncoding=UTF-8")
.option("dbtable",tagTable)
.option("user","root")
.option("password","123456")
.load()
val tagRule:String=basicTageDf.filter($"level"===4).head().getAs[String]("rule")
val tagRuleMap=tagRule.split("\\n")
.map{line=>
val array(key,value)=line.trim.split("=")
(key,value)
}.toMap
if("hbase".equals(tagRuleMap("inType").toLowerCase)){
val hbaseMeta=HbaseMeta.getMetaData(tagRuleMap)
businessDf=HbaseTools.read(spark,hbaseMeta.zkHosts,hbaseMeta.zkPort,hbaseMeta.hbaseTable,hbaseMeta.FieldName.split(","))
}
else{
System.exit(-1)
}
val attrTagRuleDf=basicTageDf.filter($"level"===5)
.select($"rule",$"name")
val modelDf=businessDf.join(attrTagRuleDf,businessDf("gender")===attrTagRuleDf("rule"),inner)
.select( $"id".as ("userId"),$"name".as("gender"))
HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")
spark.stop()
当我们开发第二个标签的时候,我们会发现在进行规则匹配的过程中会产生很多重复的代码,我们可以将规则匹配这个过程封装成一个工具类(TagTools)并且设计一个标签模板。
工具类:
object TagTools{
def convertMap(tagDf:DataFrame):Map(String,String){
import tagDf.sparkSession.implicits._
tagDf.filter($"level"===5)
.select($"rule",$"name")
.as[(String,String)]
.rdd
.collectAsMap
.toMap
}
def ruleMatchTag(dataframe:DataFrame,field:String,tagDf:DataFrame):dataFrame{
val spark=dataframe.sparkSession
import spark.implicits._
val attrTagRuleMap:Map[String,String]=convertMap(tagDf)
val attrTagRuleMapBroadCast =spark.sparkContext.broadcast(attrTagRuleMap:Map)
val field_to_tag=udf(
(field:String)=>attrTagRuleMapBroadCast.value(field)
)
}
val modelDf:DataFrame=dataframe
.select($"id".as("userId"),filed_to_tag(col(field).as(field)))
modelDf
}
标签模板:
采用模板设计模式,创建一个标签模型开发基类
trait BasicModel extends Logging{
val spark:SparkSession = _
def init():Unit={
val sparkConf=new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[*]")
.set("spark.sql.shuffle.partitions","4")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.rigisterKryoClasses(
Array(classOf[ImmutableBytesWritable],classOf[Result],classOf[Put])
spark=SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.config("hive.metastore.uris","thrift://bigdata-cdh01.itcast.cn:9083")
.config("spark.sql.warehouse.dir","xxxx")
.getOrCreate()
)
}
def getTagData(tagId:Long):dataFrame={
val tagTable=s"(select id,name,rule,level from tbl_basci_tag
where id=$tagId
union
select id,name,rule,level from tbl_basci_tag where pid=$tagId) as tag_table
"
val basicTagDf=spark.read
.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","数据库地址")
.option("dbtable",tagTable)
.option("user","root")
.option("password","123456")
.load()
basicTagDf
}
def getBusinessData(tagDf:DataFrame):dataFrame={
import tagDf.sparkSession.implicits._
val tagRule=tagDf.filter($"level"===4)
.head()
.getAs[String]("rule")
val tagRuleMap=tagRule.split("\\n")
.map(line=>
val array(key,value)=line.trim.split("=")
(key,value).toMap
)
val businessDf=null
if("hbase".equals(tagRuleMap("inType").toLowerCase)){
val hbaseMeta=HbaseMeta.getMetaData(tagRuleMap)
businessDf=HbaseTools.read(spark,hbaseMeta.zkHosts,hbaseMeta.zkPort,hbaseMeta.hbaseTable,hbaseMeta.FieldName.split(","))
}
else{
System.exit(-1)
}
businessDf
}
def doTag(getBusinessDf:DataFrame,tagDf:DataFrame):DataFrame={
}
def saveTag(modelDf:DataFrame):Unit={
HbaseTools.write(modelDf,"主机名","2181","tbl_profile","user","userId")
}
def close():Unit={
if(null!=spark){
spark.stop()
}
}
def executeModel(tagId:Long){
init()
try{
val tagDf:DataFrame=getTagData(tagId){
tagDf.persist(StorageLevel.MEMORY_AND_DISK)
tagDf.count()
val businessDf=getBusinessData(tagDf)
val modelDf=doTag(businessDf,tagDf)
saveTag(modelDf)
tagDf.unpersist()
}catch{
}
finally{
close()
}
}
}
}
}
开发政治面貌标签
class PoliticalModel extends BasicModel{
override def doTag(businessDf:DataFrame,tagDf:DataFrame):DataFrame={
val modelDf=Tagtools.ruleMatchTag(businessDf,"politicalface",tagDf)
modelDf
}
}
object PoliticalModel{
def main(agrs:Array[String]):Unit={
val tagModel=new PoliticalModel()
tagModel.execute(328l)
}
}
通过以上代码可以发现代码当中存在很多硬编码的部分,所以我们需要将以上代码进行重构
1、抽象出一个配置文件config.properties,将集群等配置信息写在配置文件里
2、重构sparksession 将是否是本地模式、是否继承hive也写在配置文件config.properties当中, 将spark应用参数放在单独的另一个配置文件当中spark.config(这里注意.config文件里的value值需要写双引号,.properties不需要)
val config=ConfigFactory.load("spark.conf")
for(entry->config.entrySet.asScala){
val resource=entry.getValue.origin().resource()
if("spark.conf".equals(resource)){
entry.getKey()
entry.getValue()
}
}
重构标签基类
为什么要重构标签基类?我们发现给到一个标签类,我们好像并不知道这个标签类是规则匹配类型还是统计类型标签。
public enum ModelType{
MATCH,
ML,
STATISTICS
}
abstract class AbstractModel(modelName:String,ModelType:String)extends Logging{
}
自定义外部数据源
什么意思?举个例子,自定义外部数据源可以让我们使用 spark.format(“hbase”) .option(“xxxx”) .option(“xxxx”) .load() 的方式进行读取或者写入数据,大家知道,默认的sparksql是提供mysql、csv、json等格式,默认是没有hbase的,我们需要自定义外部数据源。
自定义数据源的方法如下:我们要实现两个类,然后分别实现对应的接口。
class HbaseRelation extends BaseRelation with TableScan with InsertableRelation with Serializable{
override def sqlContext:SqlContext=
override def schema:StructType=
override def buildScan(): RDD[ROW]=
override def insert(data:DataFrame,override:Boolean):Unit=
}
class DefaultSource extends RelationProvider with CreatableRelationProvider{
override def createRelation(sqlContext:SQLContext,
parameters:Map[String,String]
):BaseRelation
override def createRelation (sqlContext:SQLContext,
mode:SaveMode,
parameters:Map[String,String]
data:DataFrame
):BaseRelation
}
我们只需要将之前的工具类读取、写入hbase的方法修改修改,添加到HbaseRelation中即可。
那么做完以上操作还需要注册数据源,让我们可以用spark.format(“hbase”)的形式进行操作,如果不注册数据源,format里面我们只能写(“HbaseRelation的完整类路径”)
那么怎么进行注册呢? 1、需要让我们的DefaultSource类继承DataSourceRegister接口,实现shortName方法。 2、在项目resource目录下创建META-INF/services 二级目录,在目录下创建一个文件,文件全名称就是datasourceregister这个类的全名称(文本文件),文件里拷贝defaultsouce类的全限定名。
统计类标签开发
年龄段标签开发 步骤 一、在标签平台进行注册 新建四级标签(年龄段) 新建五级标签(50后、60后、。。。。20后)
二、代码开发
class AgeRangeModel extends AbstractModel("年龄段标签",ModelType.STATISTICS){
override def doTag(businessDf:DataFrame,tagDf:DataFrame):DataFrame={
import businessDf.sparkSession.implicits._
import org.apache.spark.sql.functions._
val rule_to_tuple=udf(
(rule:String)=>{
val Array(start,end)=rule.split("-").map(_.toInt)
(start,end)
}
)
}
attrDf=tagDf.filter($"level"===5)
.select($"name",rule_to_tuple($"rule").as("rules"))
.select($"name",$"rules._1".as("start"),$"rules._2".as("end"))
val dataModel=bussinessDf
.select($"id",regexp_replace($"birthday","-","").cast(IntegerType).as("bornDate"))
.join(attDf)
.where($"bornDate".beetween($"start",$"end"))
.select($"id".as("userId"),$"name".as("ageRange"))
dataModel
}
object AgeRangeModel {
def main():Unit{
val tagModel=new AgeRangeModel().execute(338l)
}
}
|