IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink状态计算TTL -> 正文阅读

[大数据]Flink状态计算TTL

在Flink开发过程当中经常会遇到类似:统计当一个用户从进入商品页面到下单的时间,一个用户从编辑到保存的时间,

分析

这种问题大多是通过前端打点来获取用户行为,这就会造成有很多用户点击了商品或者编辑了,但是最终并没有购买或者保存,所以这时候我们就要有一个对点击商品或者编辑的开始打点数据设置一个过期时间,最开始想的是用redis来存储开始的行为信息,在redis对数据设置过期时间,理论上是没什么问题,但是考虑一般能不依赖外部系统就不依赖的原则,另外不确定redis能否扛住非常大的QPS,而且每天的数据量很大,过期时间要求是24小时,所以存在redis代价也有点儿大,所以也就找了其他方式的实现

这种问题一般第一时间想到的是用CEP来解决,我这两种都有做了实现,今天主要分享一下Flink基于状态的实现

flink本身支持的状态类型有以下几种:

  • ValueState<T>: :保存一个可以更新和检索的值(上述,每个值都到当前输入数据的关键,因此算子接收到的关键都可能对应一个值)。这个值可以通过update(T)更新,通过T value()进行检索。

  • ListState<T>:保存一个元素的列表可以往这个列表中追加数据,并在当前的列表上进行检索可以通过。?add(T)或者addAll(List<T>)进行添加元素,通过Iterable<T> get()获得整个列表还可以通过。update(List<T>)覆盖当前的列表。

  • ReducingState<T>:保存一个单值,显示添加到状态的所有值的聚合。接口与ListState类似,但使用add(T)增加的元素,会使用提供的ReduceFunction进行聚合。

  • AggregatingState<IN, OUT>:保留一个单值,显示添加到状态的所有值的聚合。和ReducingState相反的是,聚合类型可能与添加到状态的元素的类型不同。 接口与ListState类似,但使用add(IN)添加的元素会指定AggregateFunction进行聚合。

  • MapState<UK, UV>: 你维护了一个映射列表。可以添加对到状态的映射,也可以创建当前所有映射的替代映射。使用put(UK,UV)或者putAll(Map<UK,UV>)添加映射。使用get(UK)检索特定的键。使用entries()keys()values()分别搜索映射、键和值的可取你也可以通过isEmpty()来判断是否有任何键值对。

模拟数据:

第一列代表id,第二列代表动作,第三列代表时间戳

("a", "add", 1000),
("b", "add", 1000),
("a", "add", 2000),
("a", "save", 3000),
("c", "save", 2000),
("c", "add", 3000),
("b", "save", 2000)

代码:

自定义Flatmap函数

class MyFlatMap extends RichFlatMapFunction[(String, String, Int),(String, Int)]{

  var mapState:MapState[String,Int] = _

  override def open(parameters: Configuration): Unit = {
    val ttlConfig = StateTtlConfig
      .newBuilder(Time.seconds(5)) //TTl过期时间
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//仅在创建和写入时更新
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//不返回过期数据
      .build
    val my = new MapStateDescriptor[String, Int]("my", createTypeInformation[String], createTypeInformation[Int])
    my.enableTimeToLive(ttlConfig)
    mapState = getRuntimeContext.getMapState(
      my
    )
  }

  override def flatMap(in: (String, String, Int), collector: Collector[(String, Int)]): Unit = {
    //如果是动作的开始即向mapState添加
    if (in._2.equals("add")){
      mapState.put(in._1,in._3)
    //如果是动作的结束即去mapState获取是否存在相同id的开始动作,如果有则进行时间减法并清除mapState中当前id的元素
    }else if(in._2.equals("save")){
      if(mapState.contains(in._1)){
        collector.collect(in._1,in._3 - mapState.get(in._1))
        mapState.remove(in._1)
      }
    }
  }
}

主函数:

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val lines: DataStream[(String, String, Int)] = env.fromCollection(List(
      ("a", "add", 1000),
      ("b", "add", 1000),
      ("a", "add", 2000),
      ("a", "save", 3000),
      ("c", "save", 2000),
      ("c", "add", 3000),
      ("b", "save", 2000)
    ))

//  主要用来测试TTL超时是否真的将数据从mapState删除
//    val lines: DataStream[(String, String, Int)] = env.socketTextStream("localhost",9999).map(line =>{
//      val strings = line.split(",")
//      (strings(0),strings(1),strings(2).toInt)
//    })
    val keyed: KeyedStream[(String, String, Int), String] = lines.keyBy(x=>x._1)

    val result: DataStream[(String, Int)] = keyed.flatMap(new MyFlatMap)

    result.print()



    env.execute()

输出结果:

3> (a,1000)
1> (b,1000)

总结:

其实就是在自定义的flatmap内定义了一个带TTL的map,当连续数据的开始数据到来时即向这个map内插入数据,待连续数据的结束数据到来时,就去这个map里查看相同id的数据数据是否存在(不存在可能是已经超过TTL设置的时间或者延迟数据,数据乱序,1.11版本目前官网说的是只支持处理时间的TTL),存在即做时间减法得到点击商品到购买或者编辑到保存的时间,然后用collector收集并返回。

参考资料:Flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/state.html

欢迎大佬指教

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:50:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 10:04:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码