| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink之双流Join原理解析 -> 正文阅读 |
|
[大数据]Flink之双流Join原理解析 |
? ? ? ? 之前研究了SparkSQL中Join的原理,这次来研究下Flink中的双流Join的原理。 ? ? Flink中的Join分为Window?Join?和?Interval join两种。前者是将数据缓存在Window中,然后再进行Join,所以感觉本质上其实和SparkSQL中的Join一样,算是个离线数据Join;Interval join不太一样,是一条流中的每个元素和另外一条流某个时间范围区间的所有元素进行Join;详细的分析我们后续马上就会讲到。Flink双流Join详细完整的demo可见参考中的链接,那位博主的Demo写的非常好。 Window Join:????????Flink中的Window Join支持Tumbling Window Join、Sliding Window Join和Session Window Join这三种方式。以Tumbling Window Join为例,Join时如下图所示: ? ? ?其他Join图示可见官网,参考中已给出链接。Window Join具体有两种实现的API:Join 和 coGroup。Join API只能实现Inner Join,如果要实现 Left Join、Right Join 、Full?Outer Join等操作,得使用coGroup API,后续可以看到Join底层也是基于coGroup实现的,Join只会输出Key能够匹配上的两侧数据,coGroup如果没有找到匹配的Key,单侧数据也会输出。Join代码示例如下:
? ? Join和coGroup操作其实是将两条流中的数据进行Shuffle,将相同key的数据划分到一起,然后调用用户编写的Function对数据进行操作。我们来看下Join的底层实现,可以看到,Join底层最终还是调用coGroup?API来实现的: ? ? ?JoinCoGroupFunction内部会对两条流中Join上的Key的数据进行遍历,使用二重for循环调用用户编写的函数对数据进行处理,然后输出到下游: ? ? coGroup底层的具体实现是给两条流分别打上标记,然后将两条流union成一条流,接一个keyBy将相同Key的数据划分到一起,接着调用CoGroupWindowFunction对数据进行处理: ? ? CoGroupWindowFunction内部会对数据进行分类,讲相同Key,不同流的数据拆分到两个不同的集合里面,然后调用coGroup()方法,这个方法会调用到外部用户自己实现的方法,从而实现Left Join等功能: Interval Join:? ? 该种Join方式是流种的一条数据和另外一条流种一段时间范围内的数据进行Join(inner ,left outer, right outer , full outer 等皆可),如下图所示: ? ? 其实这张图画的不全或者说意思没有解释到位,黄色的整条流种如果来了一条数据,那么就去绿色的那条流中找[?timestamp - lower bound ,?timestamp + upper bound ] 这段时间范围内去找对应的数据,找到了就进行Join。如果是绿色流中来了一条数据,对应的会去黄色这条流中找?[?timestamp - upper bound ,?timestamp + lower?bound ] 这段时间范围内的数据,找到了就进行Join。 代码示例如下:
底层实现时是将两条流先connect合并成一条,然后调用IntervalJoinOperator方法开始处理数据: ? ? IntervalJoinOperator方法初始化的时候会初始化两个MapState,分别存放左右流中的数据,而Key就是timestamp。迟到的数据会被丢弃,不迟到的数据则会被缓存,缓存好了之后,接下来就是Join的操作了:
processElement()内部具体实现如下: private <THIS, OTHER> void processElement( final StreamRecord<THIS> record, final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + "interval stream joins need to have timestamps meaningful timestamps."); } // 丢弃迟到的数据 if (isLate(ourTimestamp)) { return; } // 数据加入缓存 addToBuffer(ourBuffer, ourValue, ourTimestamp); // 从对方流中获取相应时间范围内的数据,然后进行Join for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) { final long timestamp = bucket.getKey(); // 不是对应时间范围内的数据就不进行Join了 if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; } for (BufferEntry<OTHER> entry: bucket.getValue()) { if (isLeft) { ? // collect()方法中会调用用户自定义的userFunction(),真正进行Join操作 collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } // 时间已致,清理无效的缓存数据。 以上面的图为例,如果水印时间完全超过那片黄色覆盖的范围,那么黄色时间范围内的数据就可以清理掉了 long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } } ? ? 至此,IntervalJoin就完成了。 哈哈哈,以上理论上的分析,生产环境还没有真正实践过,如果有不对的地方,欢迎各位大佬帮忙指正哈。 参考: ????????https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/joining/(Flink Join?官方文档介绍) ????????https://zhuanlan.zhihu.com/p/340560908(Flink Join Demo) ????????https://blog.csdn.net/lmalds/article/details/55259718(Flink中的allowedLateness) ? ? ? ??https://www.jianshu.com/p/11b482394c73(Flink Interval Join说明) |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 | -2024/11/23 13:29:20- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |