IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink实时项目电商用户行为分析(3)---实时流量统计之PV统计 -> 正文阅读

[大数据]flink实时项目电商用户行为分析(3)---实时流量统计之PV统计

1.知识点

  • scala样例类
  • 计算pv时的key的处理方式。

? ? ? ? 1)并行处理? (解决数据倾斜问题)

      先map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key

? ? ? ? ? ? ? 再以窗口结束时间做key分组求和? ? ? ? ? ??

? ? ? ? 2) 并行度为1时,结合timeWindowAll如何处理(容易数据倾斜)

  • MapFunction的的使用
  • AggregateFunction预聚合函数的使用
  • WindowFunction的使用
  • KeyedProcessFunction的使用

2.业务目标

3.流程心法

4.模块详解

4.1 创建输入输出样例类

4.2 主object实现

4.2.1 创建执行环境并添加数据源

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(6)

    // 从文件中读取数据,获取resources中的文件,相对路径
    val resource = getClass.getResource("/UserBehavior.csv")
    print(resource.getPath)
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

4.2.2 Datastream map转换为输入样例类

  // 转换成样例类类型并提取时间戳和watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })   //No implicits found for parameter evidence$8:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
      .assignAscendingTimestamps(_.timestamp * 1000L)

4.2.3 处理逻辑(1)----filter类型,自定义mapFunction

 val pvStream  = dataStream
      .filter(_.behavior == "pv")
//      .map( data => ("pv",1L))  //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
      .map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1小时滚动窗口
      .aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可

自定义Mapfunction


class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
  override def map(value: UserBehavior): (String, Long) = {
    ( Random.nextString(10), 1L )
  }
}

预聚合函数

// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

? 自定义窗口函数

//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd,input.head))
  }
}

4.2.4?处理逻辑(2)---以窗口结束时间分组计算PV之和

自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy
val totalPvStream =  pvStream
      .keyBy(_.windowEnd)
//     .sum("count" )   //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
                            // 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
     .process(new TotalPvCountResult())

    totalPvStream.print()

KeyedProcessFunction实现:

? ?每次求和放在一个ValueState中,每来一条数据就状态value+count值。那么什么时候判断收集完了呢。定义个计时器延迟1ms触发,触发后清空状态

class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{

  //求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态

  lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))

  override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
    // 每来一个数据,将count值叠加在当前的状态上
    val currentTotalCount = totalPvCountResultState.value()
    totalPvCountResultState.update(currentTotalCount + i.count)  //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态

    //注册定时器1ms后触发
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
//    collector.collect( )

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    val totalPvCount = totalPvCountResultState.value()
    out.collect(PvCount(ctx.getCurrentKey, totalPvCount))  //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
    totalPvCountResultState.clear() //发送之后清空状态

  }
}

4.3 完整代码

package com.iqyi.bi.networkflow_analysis

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

import scala.util.Random



// 定义输入数据样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// 定义输出pv统计的样例类
case class PvCount(windowEnd: Long, count: Long)




object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(6)

    // 从文件中读取数据,获取resources中的文件,相对路径
    val resource = getClass.getResource("/UserBehavior.csv")
    print(resource.getPath)
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

    // 转换成样例类类型并提取时间戳和watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })   //No implicits found for parameter evidence$8:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val pvStream  = dataStream
      .filter(_.behavior == "pv")
//      .map( data => ("pv",1L))  //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
      .map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1小时滚动窗口
      .aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可


    //自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy

   val totalPvStream =  pvStream
      .keyBy(_.windowEnd)
//     .sum("count" )   //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
                            // 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
     .process(new TotalPvCountResult())

    totalPvStream.print()

    env.execute("pv stream")
  }




}

// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd,input.head))
  }
}


class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{

  //求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态

  lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))

  override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
    // 每来一个数据,将count值叠加在当前的状态上
    val currentTotalCount = totalPvCountResultState.value()
    totalPvCountResultState.update(currentTotalCount + i.count)  //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态

    //注册定时器1ms后触发
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
//    collector.collect( )

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    val totalPvCount = totalPvCountResultState.value()
    out.collect(PvCount(ctx.getCurrentKey, totalPvCount))  //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
    totalPvCountResultState.clear() //发送之后清空状态

  }
}


class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
  override def map(value: UserBehavior): (String, Long) = {
    ( Random.nextString(10), 1L )
  }
}


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:49:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 11:28:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码