IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink之双流Join原理解析 -> 正文阅读

[大数据]Flink之双流Join原理解析

? ? ? ? 之前研究了SparkSQL中Join的原理,这次来研究下Flink中的双流Join的原理。

? ? Flink中的Join分为Window?Join?和?Interval join两种。前者是将数据缓存在Window中,然后再进行Join,所以感觉本质上其实和SparkSQL中的Join一样,算是个离线数据Join;Interval join不太一样,是一条流中的每个元素和另外一条流某个时间范围区间的所有元素进行Join;详细的分析我们后续马上就会讲到。Flink双流Join详细完整的demo可见参考中的链接,那位博主的Demo写的非常好。

Window Join:

????????Flink中的Window Join支持Tumbling Window Join、Sliding Window Join和Session Window Join这三种方式。以Tumbling Window Join为例,Join时如下图所示:

? ? ?其他Join图示可见官网,参考中已给出链接。Window Join具体有两种实现的API:Join 和 coGroup。Join API只能实现Inner Join,如果要实现 Left Join、Right Join 、Full?Outer Join等操作,得使用coGroup API,后续可以看到Join底层也是基于coGroup实现的,Join只会输出Key能够匹配上的两侧数据,coGroup如果没有找到匹配的Key,单侧数据也会输出。Join代码示例如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

? ? Join和coGroup操作其实是将两条流中的数据进行Shuffle,将相同key的数据划分到一起,然后调用用户编写的Function对数据进行操作。我们来看下Join的底层实现,可以看到,Join底层最终还是调用coGroup?API来实现的:

? ? ?JoinCoGroupFunction内部会对两条流中Join上的Key的数据进行遍历,使用二重for循环调用用户编写的函数对数据进行处理,然后输出到下游:

? ? coGroup底层的具体实现是给两条流分别打上标记,然后将两条流union成一条流,接一个keyBy将相同Key的数据划分到一起,接着调用CoGroupWindowFunction对数据进行处理:

? ? CoGroupWindowFunction内部会对数据进行分类,讲相同Key,不同流的数据拆分到两个不同的集合里面,然后调用coGroup()方法,这个方法会调用到外部用户自己实现的方法,从而实现Left Join等功能:

Interval Join:

? ? 该种Join方式是流种的一条数据和另外一条流种一段时间范围内的数据进行Join(inner ,left outer, right outer , full outer 等皆可),如下图所示:

? ? 其实这张图画的不全或者说意思没有解释到位,黄色的整条流种如果来了一条数据,那么就去绿色的那条流中找[?timestamp - lower bound ,?timestamp + upper bound ] 这段时间范围内去找对应的数据,找到了就进行Join。如果是绿色流中来了一条数据,对应的会去黄色这条流中找?[?timestamp - upper bound ,?timestamp + lower?bound ] 这段时间范围内的数据,找到了就进行Join。

代码示例如下:

stream.keyBy(<KeySelector>)
? ? ? .intervalJoin(otherStream.keyBy(<KeySelector>))
? ? ? .between(Time.seconds(-10), Time.seconds(10)) // 上下20s之内的数据
? ? ? .upperBoundExclusive() // 排除上界临界点那一批数据
? ? ? .process(new ProcessJoinFunction<>())

底层实现时是将两条流先connect合并成一条,然后调用IntervalJoinOperator方法开始处理数据:

? ? IntervalJoinOperator方法初始化的时候会初始化两个MapState,分别存放左右流中的数据,而Key就是timestamp。迟到的数据会被丢弃,不迟到的数据则会被缓存,缓存好了之后,接下来就是Join的操作了:

? ? 左右流的数据到达的时候,会分别调用不同的processElement()方法来处理数据,左流调用的是processElement1(),右流调用的是processElement2(),注意倒数第二和第三个参数,就是左右流来了一条数据之后,怎么去对方的流中找可以对应的元素:

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
   processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}


@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
   processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

processElement()内部具体实现如下:

private <THIS, OTHER> void processElement(
      final StreamRecord<THIS> record,
      final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
      final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
      final long relativeLowerBound,
      final long relativeUpperBound,
      final boolean isLeft) throws Exception {

   final THIS ourValue = record.getValue();
   final long ourTimestamp = record.getTimestamp();

   if (ourTimestamp == Long.MIN_VALUE) {
      throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
            "interval stream joins need to have timestamps meaningful timestamps.");
   }
   // 丢弃迟到的数据
   if (isLate(ourTimestamp)) {
      return;
   }
    // 数据加入缓存
   addToBuffer(ourBuffer, ourValue, ourTimestamp);
   // 从对方流中获取相应时间范围内的数据,然后进行Join
   for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
      final long timestamp  = bucket.getKey();
      // 不是对应时间范围内的数据就不进行Join了
      if (timestamp < ourTimestamp + relativeLowerBound ||
            timestamp > ourTimestamp + relativeUpperBound) {
         continue;
      }

      for (BufferEntry<OTHER> entry: bucket.getValue()) {
         if (isLeft) {
?           // collect()方法中会调用用户自定义的userFunction(),真正进行Join操作
            collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
         } else {
            collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
         }
      }
   }
   // 时间已致,清理无效的缓存数据。 以上面的图为例,如果水印时间完全超过那片黄色覆盖的范围,那么黄色时间范围内的数据就可以清理掉了
   long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
   if (isLeft) {
      internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
   } else {
      internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
   }
}

? ? 至此,IntervalJoin就完成了。 哈哈哈,以上理论上的分析,生产环境还没有真正实践过,如果有不对的地方,欢迎各位大佬帮忙指正哈。

参考:

????????https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/joining/(Flink Join?官方文档介绍)

????????https://zhuanlan.zhihu.com/p/340560908(Flink Join Demo)

????????https://blog.csdn.net/lmalds/article/details/55259718(Flink中的allowedLateness)

? ? ? ??https://www.jianshu.com/p/11b482394c73(Flink Interval Join说明)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:10:30 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:48:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码