| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink 第8.8章 Flink FlinkSink 流式数据落盘 FileSink -> 正文阅读 |
|
[大数据]Flink 第8.8章 Flink FlinkSink 流式数据落盘 FileSink |
序Flink版本: 1.15.0 内容: FileSink?的使用方法及实现。 阅读时间:9~30min,取决于是否跟着看源码。word字数5k,csdn字数1w。 前言在 Flink 中,大多数场景都是 kafkaSource & kafkaSink,但不乏需要将数据写一部分到离线中的场景。 对此, Flink 提供了 StreamingFileSink 来处理 Steaming 写出到 FileSystem 接口的文件系统中,并且支持 Exactly-Once 语义。 基于 check point 来实现。并且很好兼容 Hive 等分区场景。今天主要讲讲 FileSink 的使用方法及实现。 FileSink 简单的示例比如一个实时流数据落盘到本地磁盘中: 通过 DataGeneratorSource 产生一个以 "|" 分割的数据源,其中第二项为模拟数据产生时间(watermark)。这里代码并未包含 checkpoint 操作 这样将这个数据推送到了我们本地磁盘中,落盘后的样例是这样的: 可以看到当我们并发度为 2 的时候,两个的前缀标识不同,并且分开写出文件,文夹名为:2022-05-31--09 文件命名规则:.part-<subtaskIndex>-<partFileIndex>.<status>.<UUID> 并且在输出的时候,是有数据到达就输出,而不是一个文件直接落盘。这里的文件都是 inprogress 状态,但其实已经写出完毕。但这是为什么? FileSink在 Flink 官网中,有详细介绍 FileSink 的内容: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/filesystem/ 其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。 所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后: 写出的文件就会变为: 写出中: 写完毕后: 可以看到文件将 dot 去掉,并且将 inprogress 及后面的唯一标识也给去掉了。而且并不是说我们文件写完就进行 rename 操作,而是在 checkpoint 后才会将写完毕的文件进行 rename 操作。 其实整文件变化的过程为: In-progress :当前文件正在写入中 Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态 Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态 RollingPolicy写出时,我们可以指定数据 part 滚动的策略,如: 上面写了 1.15 之后的新 API 和之前的 API 的差异。 注意在 1.15之后的时候使用 FileSink 需要导入包: 之前不需要 根据 org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 可以查看到上面主要的需要实现的方法。 OutputFileConfig我们的文件名还记得吗?.part-<subtaskIndex>-<partFileIndex>.<status>.<UUID> 写出完毕后变为:part-<subtaskIndex>-<partFileIndex> 但我们有时候还是想自定义文件名的嘛,有方法吗?有的,Flink 提供了一个 OutputFileConfig 来自定义前缀和后缀。 所以我们还是不能完全自定义输出的文件。 示例: .withOutputFileConfig(new?OutputFileConfig("ourPrefix","ourSuffix")) 就一行代码,然后变更后的效果: 里面包含了跟之前的对比,可以看到前缀 Flink 帮我们添加了拼接字符 "-",但是在后缀的时候缺没有添加,因为默认的 OutputFileConfig 未包含后缀。 后缀一般设置为文件名后缀,比如“.txt.gz” FileSinkBucket在业务上我们一般会要求数据写出格式(文件夹路径,文件名,还可能包含 dot 文件的 rename 操作)。 所以我们继续看看 Flink 给我们提供怎么样的手段进行配置,或者是否可以自己写一个 FlieSinkBucket。 对于什么是 Bucket,其实官网有给一个图: Bucket 就类似一个路径,通过路径进行 Bucket 的指定 在查看文档接口后,我们发现我们还能指定 Bucket,只需要实现接口 org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 就能控制 Bucket 的命名方式。 BucketAssigner查看下 BucketAssigner 的结构: 跟以前一样,传入了 Context 提供给用户调用,并且需要用户实现两个方法:getBucketId 和 getSerializer。这里直接给一个示例: 将 element 中的时间戳转换成对应的 Hive 分区后的样式进行测试,得到一下的 bucket 样式: DateTimeBucketAssigner上面除了看文档以外还有什么地方可以了解?一般这种时候,我们可以看他原生默认实现的当时,对于 BucketAssigner 来说有两个,DateTimeBucketAssigner?和?BasePathBucketAssigner。 其中 BasePathBucketAssigner 都是空实现,方法都返回最基础的,所以我们就来看看 DateTimeBucketAssigner的实现方法。 从上面可以看到,DateTimeBucketAssigner 有很多构造方法,最重要的就是两个参数 formatString 和 zoneId。 formatString 决定了我们 bucket 的生成规则,默认为 yyyy-MM-dd--HH。 zoneId 决定时间域,默认为 ZoneId.systemDefault()。 通过构造函数都可以进行更改。 然后在 BucketAssigner 中,我们更希望看到的是 getBucketId() 方法的实现: 所以默认的 DateTimeBucketAssigner 是使用当前的系统处理时间来判断数据写入那个 bucket 中。与我们自定义使用的 element 中的数据的时间还是有很大出入的。但这样能减少同时写出的 bucket 的数量,避免一些其他问题。 Compaction在 1.15 之后为了快速滚动,并且避免小文件的操作,添加了 compact 功能,可以在 checkpoint 的时候进行合并。最简单的示例为: setNumcCompactThreads ????????设置合并的线程数 setSizeThreshold ????????设置大小的门限(小于这个大小的会被合并) enableCompactionOnCheckpoint ????????多少个 checkpoint 信号来了,会进行一次 compact 如果开启了 Compaction,那么必须在 source.sinkTo(fileSink)的时候添加 uid: ????????source.sinkTo(fileSink).uid("fileSink"); 写入时候的变化: 可以看到变成了.. ????????..<prefix>-<subtaskIndex>-<partFileIndex><suffix>.<status>.<UUID> 而且文件大小是持续在增加,在开启后会生成新的压缩后的 pending 文件,然后讲这些 pending 文件发送给提交者,以提交给给正式的文件,最后删除源文件。而且除非完毕,文件将一直保持 dot 文件,意味着这会导致文件对外可见会更慢。 结合图看就是一样开始生成文件,但是 double dot 的形式,等到满足 FileSink的条件后,经过 checkpoint 会变成 dot Finished File,然后如果这个时候触发了 compact,那么就会将可以 compact 的文件进行合并,首先创建一个.compacted 的文件,然后合并完成后变为正式文件 compacted,并删除 dot Finished File。 学会了配置,但流程到底是怎么样的?上面我们走过了整体配置 FileSink 在 STREAMING 模式下的配置和使用方式,能够自定义一些方法和函数解决生产中的各类问题,但这只是开始,我们要梳理清楚整体的 Flink 调用流程才算结束。 还记得官网强调我们必须进行 checkpoint 和 watermark 的设置吗,但在哪里实现的呢? FileSink我们看看 FileSink 的结构: FileSink 实现了四个接口: StatefulSink TwoPhaseCommittingSink WithCompatibleSink WithPreCommitTopology StatefulSink一个安全的写出 Sink,需要实现 serializable;并且需要安全的配置,所有的 writers 都是在 subtasks 中创建。 实现这个接口的还有我们熟悉的: KafkaSink/RuducingUpsertSink等。 主要包含: createWriter:创建一个 Writer return?bucketsBuilder.createWriter(context); restoreWriter: 从状态中恢复一个 Writer ????????FileWriter<IN>?writer?=?bucketsBuilder.createWriter(context); ????????writer.initializeState(recoveredState); ????????return?writer; getWriterStateSerializer:返回序列化方法。 ????????try?{ ????????????return?bucketsBuilder.getWriterStateSerializer(); ????????}?catch?(IOException?e)?{ ????????????//?it's?not?optimal?that?we?have?to?do?this?but?creating?the?serializers?for?the ????????????//?FileSink?requires?(among?other?things)?a?call?to?FileSystem.get()?which?declares ????????????//?IOException. ????????????throw?new?FlinkRuntimeException("Could?not?create?writer?state?serializer.",?e); ????????} TwoPhaseCommittingSink使用两阶段提交的 Sink,我们也在 Kafka 中看到过了。 主要就是由一个执行预提交的 SinkWriter? & 实际提交数据的 Committer 组成。 为了便于分离,SinkWriter 在检查点或者结束输入时进行创建可提交表,并发送给 Committer。 这个就需要实现他的方法进行一个 createCommitter & getCommittableSerializer 的特有方法: createCommitter: 创建 Committer return?bucketsBuilder.createCommitter(); getCommittableSerializer: 返回序列化方法 ????????try?{ ????????????return?bucketsBuilder.getCommittableSerializer(); ????????}?catch?(IOException?e)?{ ????????????//?it's?not?optimal?that?we?have?to?do?this?but?creating?the?serializers?for?the ????????????//?FileSink?requires?(among?other?things)?a?call?to?FileSystem.get()?which?declares ????????????//?IOException. ????????????throw?new?FlinkRuntimeException("Could?not?create?committable?serializer.",?e); ????????} WithCompatibleSink就是在 StatefulSInk 中的安全保证,要求返回一个接收器的状态名称列表。将新的 FileSink 可以从旧的 StreamingFileSink 的状态中回复,并且作为 checkpoint/savepoint 回复直接的替换。 这里 FileSink 直接指定为: return?Collections.singleton("bucket-states"); WithPreCommitTopology可以让用户进行提交信息的更改: CreateWriterorg.apache.flink.connector.file.sink.FileSink.RowFormatBuilder#createWriter org.apache.flink.connector.file.sink.FileSink.BulkFormatBuilder#createWriter 在 FileSInk 中的 CreateWrite 有两种实现方式,我们这里看我们示例中的 BulkFormatBuilder。 可以看到,根据我们的各种参数,最终返回了一个 FileWriter。那么这个 FileWriter 大概率就是我们最终的实现了。 FileWriterorg.apache.flink.connector.file.sink.writer.FileWriter#FileWriter 通过注释可以看到,这个类实现两个功能:
实现了四个接口:
重要方法讲一下。 initializeState当从故障中恢复后,怎么加载状态。 updateActiveBucketId如果单个 bucket 有多个状态,那么会进行合并操作。 其实这个大多数时候是因为两次Flink任务的并发不同导致的,其实我们在实现各种 state 后端的时候都应该考虑这个状态合并问题。 write从 SinkWriter 实现的方法,写数据使用。 首先计算进来的 element 对应的 BucketId,然后通过 Id 获取/创建对应的 bucket,调用 bucket 的写方法来写入数据。 并且将 numRecordsSendCounter + 1。 所以目标更明确了,我们通过这个方法就能了解到 element 可以计算出 bucketId,然后找到对应 bucket 来写入数据,整个操作有两个重要的对象: bucketAssigner & FileWriterBucket。后续我们专门讲讲这两个对象。 flush这个方法从 SinkWriter 实现,当输入结束或者 checkpoint 时调用,用来将 pending 的文件刷新。 为什么要 checkpoint ? prepareCommit准备 commit 阶段方法。 将不活跃的 buckets 遗弃。 特别是使用了 OnCheckpointingRollingPolicy,避免每次都创建 bucket。 然后将活跃的调用 FileWriterBucket#prepareCommit() 方法,这个类后面我们会重点说说。 可以理解为,FileWriter 管理了所有 buckets,这里准备提交所有 buckets,然后调用每个一个 bucket 的 prepareCommit()。 snapshotState对 活跃的 buckets 进行 state 存储。 getOrCreateBucketForBucketId通过 bucketId 来获取或创建 bucket。 Id 是通过每个 element 来获得的。 其实在我们的示例中,创建了一个 BucketAssigner,重写了 getBucketId, 这个路径 String 就是 bucketId: close关闭所有 bucket,停止写入。 将 所有 bucket onProcessingTime调用每个活跃 buckets 的 onProcessingTime。 org.apache.flink.connector.file.sink.writer.FileWriterBucket#onProcessingTime 最后会调用 FileWriteBucket 的 onProcessingTime 方法,来判断是否需要进行滚动了。 并且注册下一次视察的 timer。 activeBuckets活跃的 buckets。 装载在 HashMap 中,key 为 bucketId,value 为 FileWriterBucket。 BucketAssignerorg.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 现在我们已经差不多完全理解 Bucket 的含义了,其实就是一个包装盒,盒子里可以有多个文件,包装盒有 Id,可以根据 element 和 context 计算出 Id。 所以一共有两个方法和一个 context 对象。 ????????getBucketId & getSerializer 对于不同的实现有不同的作用:
其中 BasePath 的 Id 只有一种为 ""。 DataTime 的 Id 通过处理数据的当前时间转换为 yyyy-MM-dd-HH 的 Id。 Table 的 Id 是通过 rowData 获取 partitionSpec 拼接获得: FileWriterBucketorg.apache.flink.connector.file.sink.writer.FileWriterBucket 真正的 bucket 载体,被管理在 FileWriter 中,可以通过 bucketFactory 构建。先讲讲 bucketFactory 的实现。 bucketFactoryorg.apache.flink.connector.file.sink.writer.FileWriterBucketFactory 主要申明了两个接口,getNewBucket & restoreBucket。 目前实现只有 DefaultFileWriterBucketFactory,直接使用 FileWriterBucket#getNew() & restore() 两个方法对应。 所以跟我们指定 watermark 规则基本一致,通过 FileSink.forRowFormat() 构建了一个基本的 FileSink,然后不断 .withXXX() / .enbaleXXX() 来附加各种属性,来构建一个 bucketFactory,通过 bucketFactory 来创建附带各种属性的 FileWriteBucket 供 FileWriter 使用,也被 FileWriter 所管理。 其中 bucketPath 是由两部分组成 basePath + bucketId,当 bucketId 为空时,直接返回 basePath。 FileWriterBucket 构造方法这里面参数基本我们都已经很熟悉了,除了一个 bucketWriter。 这个参数通过 FileSink.createBucketWriter() 来传递,这个后面再说。 FileWriterBucket 属性重点关注 rollingPolicy,uniqueId,pendingFiles,partCounter,inProgressFileToCleanup,inProgressPart。 rollingPolicy ????????org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy ????????就是我们的滚动策略,这个策略指的是在 bucket 中文件的滚动策略,主要包含三种模式: oncheckpoint/onEvent/onProcessingTime ????????比如我们的经常使用的 DefaultRollingPolicy.builder().withXXX().build() 返回的就是 DefaultRollingPolicy。 uniqueId ????????生成的唯一 UUID。 partCounter ????????当前 bucket 的文件数。 ????????初始化为 0。 ????????这个主要就是我们写出文件的时候的标识:<prefix>-<subtaskIndex>-<partFileIndex><suffix> 中的 partFileIndex 就是 partCounter 这个值: pendingFiles ????????pending 文件列表, ????????当 closePartFile 的时候,会将 inProgressPart 中已经写完待成为 pending 的文件 add 到? pendingFiles 中: 在 prepareCommit 阶段,会将所有 pendingFiles 中的文件提交出去,并进行 clear() 操作: inProgressFileToCleanup ????????主要是在状态恢复的时候使用。 ????????当状态恢复了,将之前的 inProgressPart 文件进行清理转换。 inProgressPart ????????org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter ????????当前正在写入的 Part 文件,包含 bucketId,path,outpuStream,creationTime, BulkWriter(以BulkPartWriter为例)。 在写入数据时,会判断是否有正在写入的文件,如果不存在,或者当前需要进行 shouldRollOnEvent 的时候,就会关闭当前 Part文件,并创建新的 InProgressPart进行写入 ????????org.apache.flink.connector.file.sink.writer.FileWriterBucket#write 当调用 closePartFile 的时候,org.apache.flink.connector.file.sink.writer.FileWriterBucket#closePartFile?会将当前 Part 文件进行 closeForCommit() 操作,并且将结果添加到 pendingFiles 中。 总结所以, Flink 使用 FileSink 来对外进行一整套 conf 的配置,包括基础的输出设定、bucketAssigner、RollingPolicy、outputFileConfig 和 Compact 相关的配置。 本文中 Compact 部分没有讲解实现。 然后在流程实现中包含:
Demo代码
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/12 4:00:14- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |