点击流模型的生成plus(解决数据倾斜问题)
如果某个用户访问记录过多,则生成page view模型的时候,会产生数据倾斜,此时该怎样处理呢? 思路:
- 通过Spark UI找到产生倾斜的shuffle算子
- 分析产生倾斜的原因
- 对倾斜的key进行处理
  - 对key增加随机数
  - 增加并行度
  - 单独处理倾斜的key
  - RangePartitioner + key转换处理
- 编写代码进行验证

本次倾斜解决具体思路:
- 转换key: guid -> guid+time_local
- 利用sortByKey对guid+time_local进行全局排序
- 使用RangePartitioner按照key的范围进行划分(保证每个分区数据大致均匀)
- 第一次生成sessionid,使用累加器收集边界处的数据
- 处理边界处数据获取正确的sessionid

代码实现:
package com.yyds.service
import java.util
import java.util.UUID
import com.itheima.bean.{PageViewsBeanCase, WebLogBean}
import com.itheima.util.DateUtil
import org.apache.spark.RangePartitioner
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
// Empty companion class of the PageViewService object below. It carries no state or
// behavior — presumably kept so a framework can instantiate it by reflection, or as a
// leftover from code generation. NOTE(review): confirm it is actually needed.
class PageViewService {
}
object PageViewService {

  /**
   * Builds the page-view (click-stream) model from the cleaned web log and writes it to HDFS,
   * working around data skew caused by hyper-active users.
   *
   * Skew strategy implemented here:
   *  1. key = guid + "&" + time_local (instead of guid alone), globally ordered via sortByKey;
   *  2. a RangePartitioner spreads the sorted keys evenly over `numPartitions` partitions, so a
   *     single heavy guid no longer lands entirely in one partition;
   *  3. sessionids are generated per partition; the first/last record of every partition is
   *     collected through an accumulator and sessions cut by a partition boundary are repaired
   *     in a second pass.
   *
   * @param filterStaticWeblogRdd cleaned log records (static resources already filtered out)
   * @param numPartitions         number of range partitions (default 100, as before)
   * @param questionOutputPath    HDFS path for the intermediate (pre-repair) session RDD
   * @param pageViewOutputPath    HDFS path for the final page-view model
   */
  def savePageViewToHdfs(filterStaticWeblogRdd: RDD[WebLogBean],
                         numPartitions: Int = 100,
                         questionOutputPath: String = "/questionSessionRdd",
                         pageViewOutputPath: String = "/pageviewrdd.txt") = {
    // 1. Re-key by guid&time_local so records of one guid can still be split across partitions.
    val uidTimeRdd: RDD[(String, WebLogBean)] =
      filterStaticWeblogRdd.map(bean => (bean.guid + "&" + bean.time_local, bean))
    // 2. Global sort + range partitioning: keys stay contiguous, partitions roughly equal-sized.
    val sortedUidTimeRdd: RDD[(String, WebLogBean)] = uidTimeRdd.sortByKey()
    val rangeRdd: RDD[(String, WebLogBean)] =
      sortedUidTimeRdd.partitionBy(new RangePartitioner(numPartitions, sortedUidTimeRdd))

    // FIX: SparkSession.getDefaultSession.get throws NoSuchElementException when no default
    // session has been registered; builder().getOrCreate() returns the active session instead
    // (and only builds a new one when none exists).
    val spark: SparkSession = SparkSession.builder().getOrCreate()
    val headTailList: CollectionAccumulator[(String, String)] =
      spark.sparkContext.collectionAccumulator[(String, String)]("headTailList")

    // 3. First pass: per-partition sessionids; partition-boundary rows go into the accumulator.
    val questionSessionRdd: RDD[(WebLogBean, String, Int, Long)] = generateSessionid(rangeRdd, headTailList)
    questionSessionRdd.cache()
    // Force materialization: accumulator values are only reliable after an action has run.
    questionSessionRdd.count()
    val headTailListValue: util.List[(String, String)] = headTailList.value
    questionSessionRdd.saveAsTextFile(questionOutputPath)

    import collection.JavaConverters._
    val buffer: mutable.Buffer[(String, String)] = headTailListValue.asScala
    val map: mutable.HashMap[String, String] = mutable.HashMap(buffer.toMap.toSeq: _*)
    // 4. Compute the corrections for sessions that straddle a partition boundary, then
    //    broadcast them and patch the affected rows.
    val correctMap: mutable.HashMap[String, String] = processBoundaryMap(map)
    val questionBroadCast: Broadcast[mutable.HashMap[String, String]] = spark.sparkContext.broadcast(correctMap)
    val correctRdd: RDD[(WebLogBean, String, Int, Long)] =
      repairBoundarySession(questionSessionRdd, questionBroadCast)

    // 5. Map (bean, sessionid, step, staylong) onto the output case class and persist.
    val pageviewRdd: RDD[PageViewsBeanCase] = correctRdd.map(
      t => {
        PageViewsBeanCase(
          t._2, t._1.remote_addr, t._1.time_local, t._1.request, t._3, t._4,
          t._1.http_referer, t._1.http_user_agent, t._1.body_bytes_sent, t._1.status, t._1.guid
        )
      }
    )
    pageviewRdd.saveAsTextFile(pageViewOutputPath)
  }

  /**
   * Second pass: patches the rows at partition boundaries using the corrections produced by
   * [[processBoundaryMap]].
   *
   * Correction encoding (values are "&"-joined strings):
   *  - "<idx>&last"  -> new stay time in ms for the LAST row of partition idx;
   *  - "<idx>&first" -> "newSessionid&stepOffset&oldSessionid": every row of partition idx whose
   *    sessionid equals oldSessionid adopts newSessionid and has its step shifted by stepOffset.
   */
  def repairBoundarySession(uidTimeSessionStepLongRdd: RDD[(WebLogBean, String, Int, Long)],
                            questionBroadCast: Broadcast[mutable.HashMap[String, String]]) = {
    val questionMap: mutable.HashMap[String, String] = questionBroadCast.value
    val correctRdd: RDD[(WebLogBean, String, Int, Long)] = uidTimeSessionStepLongRdd.mapPartitionsWithIndex(
      (index, iter) => {
        var originList = iter.toList
        val firstLine: String = questionMap.getOrElse(index + "&first", "")
        val lastLine: String = questionMap.getOrElse(index + "&last", "")
        if (lastLine != "") {
          // Replace the stay time of this partition's final row with the real time gap
          // to the first row of the next partition (same session continues there).
          val buffer: mutable.Buffer[(WebLogBean, String, Int, Long)] = originList.toBuffer
          val lastTuple: (WebLogBean, String, Int, Long) = buffer.remove(buffer.size - 1)
          buffer += ((lastTuple._1, lastTuple._2, lastTuple._3, lastLine.toLong))
          originList = buffer.toList
        }
        if (firstLine != "") {
          val firstArr: Array[String] = firstLine.split("&")
          originList = originList.map { t =>
            if (t._2.equals(firstArr(2))) {
              // Session continues from the previous partition: adopt its sessionid and
              // offset the step counter. (FIX: dropped the redundant `.toInt` on the Int step.)
              (t._1, firstArr(0), firstArr(1).toInt + t._3, t._4)
            } else {
              t
            }
          }
        }
        originList.iterator
      }
    )
    correctRdd
  }

  /**
   * Inspects the per-partition boundary records collected by [[generateSessionid]] and computes
   * the corrections for sessions that span a partition boundary.
   *
   * Input encoding in `map`:
   *  - "<idx>&first" -> "guid&time_local&sessionid" of the partition's first row;
   *  - "<idx>&last"  -> "guid&time_local&sessionid&step&partitionSize" of its last row; this
   *    entry is extended in-place with "&carriedSessionid&cumulativeStep" when one session
   *    chains through an entire partition, so the next iteration can keep propagating it.
   *
   * The returned map is consumed by [[repairBoundarySession]] (see encoding there).
   */
  def processBoundaryMap(map: mutable.HashMap[String, String]) = {
    val correctMap: mutable.HashMap[String, String] = new mutable.HashMap[String, String]()
    // FIX: the original iterated `1 until (map.size / 2)`, which miscounts partitions whenever a
    // partition contributed only a "first" entry (single-record partition) or nothing (empty
    // partition), and its unguarded `map.get(...).get` then crashed on the missing keys.
    // Derive the real partition indices from the keys and skip incomplete boundaries instead.
    val partitionIds: List[Int] =
      map.keys.map(_.split("&")(0).toInt).toList.distinct.sorted
    for (num <- partitionIds if num > 0) {
      // Must re-read inside the loop: "(num-1)&last" may have been extended by the
      // previous iteration (session chaining through whole partitions).
      val boundary = for {
        numFirstMsg     <- map.get(num + "&first")
        numLastMsg      <- map.get(num + "&last")
        lastPartLastMsg <- map.get((num - 1) + "&last")
      } yield (numFirstMsg, numLastMsg, lastPartLastMsg)
      boundary.foreach { case (numFirstMsg, numLastMsg, lastPartLastMsg) =>
        val numLastArr: Array[String] = numLastMsg.split("&")
        val lastPartLastArr: Array[String] = lastPartLastMsg.split("&")
        val numFirstArr: Array[String] = numFirstMsg.split("&")
        // Same guid on both sides of the boundary?
        if (lastPartLastArr(0).equals(numFirstArr(0))) {
          val timediff = DateUtil.getTimeDiff(lastPartLastArr(1), numFirstArr(1))
          if (timediff < 30 * 60 * 1000) { // within the 30-minute window: one session across partitions
            // Previous partition's last row gets the true dwell time instead of the default.
            correctMap.put((num - 1) + "&last", timediff.toString)
            if (lastPartLastArr.size > 5) {
              // Previous "last" entry was already extended: propagate the carried
              // sessionid and cumulative step (the last two fields).
              correctMap.put(num + "&first", lastPartLastArr(lastPartLastArr.size - 2) + "&"
                + lastPartLastArr(lastPartLastArr.size - 1) + "&" + numFirstArr(2))
            } else {
              correctMap.put(num + "&first", lastPartLastArr(2) + "&" + lastPartLastArr(3) + "&" + numFirstArr(2))
            }
            // Whole partition is a single session: extend its "last" entry so the chain can
            // continue into partition num + 1 on the next iteration.
            if (numFirstArr(2).equals(numLastArr(2))) {
              if (lastPartLastArr.size > 5) {
                map.put(num + "&last", numLastMsg + "&" + lastPartLastArr(lastPartLastArr.size - 2) + "&" +
                  (lastPartLastArr(lastPartLastArr.size - 1).toInt + numLastArr(4).toInt))
              } else {
                map.put(num + "&last", numLastMsg + "&" + lastPartLastArr(2) + "&" + (lastPartLastArr(3).toInt + numLastArr(4).toInt))
              }
            }
          }
        }
      }
    }
    correctMap
  }

  /**
   * First pass: assigns (sessionid, step, stayTimeMs) to every record, partition by partition.
   * A new session starts whenever the guid changes or two consecutive hits of the same guid
   * are >= 30 minutes apart. Because range partitioning may cut a session in two, the first
   * and last record of every partition are pushed into `headTailList` for later stitching.
   *
   * Accumulator encoding:
   *  - ("<idx>&first", "guid&time_local&sessionid")
   *  - ("<idx>&last",  "guid&time_local&sessionid&step&partitionSize")
   */
  def generateSessionid(rangeRdd: RDD[(String, WebLogBean)],
                        headTailList: CollectionAccumulator[(String, String)]) = {
    val sessionidStepPageRdd: RDD[(WebLogBean, String, Int, Long)] = rangeRdd.mapPartitionsWithIndex {
      (index, iter) => {
        val list: List[(String, WebLogBean)] = iter.toList
        val resultTupleList: ListBuffer[(WebLogBean, String, Int, Long)] = new ListBuffer[(WebLogBean, String, Int, Long)]()
        var sessionid = UUID.randomUUID().toString
        var step = 1
        // Default dwell time for the last page of a session (no next hit to diff against).
        var pagestaylong: Long = 60000
        import scala.util.control.Breaks._
        breakable {
          for (num <- 0 until (list.size)) {
            val currentTuple: (String, WebLogBean) = list(num)
            if (num == 0) {
              // Remember this partition's first record for boundary stitching.
              headTailList.add((index + "&first", currentTuple._1 + "&" + sessionid))
            }
            if (list.size == 1) {
              resultTupleList += ((currentTuple._2, sessionid, step, pagestaylong))
              // FIX: also record the matching "last" entry; the original omitted it here,
              // leaving an unpaired "first" in the boundary map (which broke the size-based
              // loop in processBoundaryMap and any session chaining through this partition).
              headTailList.add((index + "&last", currentTuple._1 + "&" + sessionid + "&" + step + "&" + list.size))
              sessionid = UUID.randomUUID().toString
              break()
            }
            breakable {
              if (num == 0) {
                break() // rows are emitted one step behind; nothing to flush for the first row yet
              }
              val lastTuple: (String, WebLogBean) = list(num - 1)
              val currentUidTime: String = currentTuple._1
              val lastUidTime: String = lastTuple._1
              val currentUidTimeArr: Array[String] = currentUidTime.split("&")
              val lastUidTimeArr: Array[String] = lastUidTime.split("&")
              val timeDiff = DateUtil.getTimeDiff(lastUidTimeArr(1), currentUidTimeArr(1))
              if (lastUidTimeArr(0).equals(currentUidTimeArr(0)) && timeDiff < 30 * 60 * 1000) {
                // Same guid within 30 minutes: previous row stays in the current session,
                // its dwell time is the gap to the current row.
                resultTupleList += ((lastTuple._2, sessionid, step, timeDiff))
                step += 1
              } else {
                // Session break: close the previous row with the default dwell time.
                resultTupleList += ((lastTuple._2, sessionid, step, pagestaylong))
                sessionid = UUID.randomUUID().toString
                step = 1
              }
              if (num == list.size - 1) {
                // Flush the final row and remember it for boundary stitching.
                resultTupleList += ((currentTuple._2, sessionid, step, pagestaylong))
                headTailList.add((index + "&last", currentTuple._1 + "&" + sessionid + "&" + step + "&" + list.size))
                sessionid = UUID.randomUUID().toString
              }
            }
          }
        }
        resultTupleList.toIterator
      }
    }
    sessionidStepPageRdd
  }
}
|