数据湖定义
业界对于数据湖的定义存在一定争议,个人认为数据湖就是针对传统hive数仓不支持acid、upsert、schema evolution等痛点上,提出的一种数据存储库。 hive的痛点:hive主要特性是提供了sql解析和元数据管理的功能,统一管理了存储在hdfs上数据的shcmea信息。但是设计之初hive并没有考虑支持upsert,schema evolution等特性,基于这些业务痛点,数据湖应运而生。
湖仓湖的核心特点
1、ACID特性的支持
ACDID即数据库中事务特性,但是数据湖的事务和oltp数据库的数据特性不同,数据湖是粗粒度的事务控制(filegroup级别)。所谓事务,本质就是一个并发问题,本质在于解决读写冲突和写写冲突。以hudi为例,支持行级并发控制和列级别并发控制(通过TimeLine实现)。
并发控制:hudi中的每个commit都被抽象为TimeLine上的一个instance,instance记录了本次操作的行为、时间戳和状态。冲突检查会在 instant 状态变换的两个节点进行,一个是 requested 转 inflight 状态,一个是 inflight 转 completed 状态。其中,后者状态变换时,会进行加锁操作,以实现版本隔离。 冲突检查即是对 instant 创建到状态变化的过程中其他已经完成/正在执行的 instant 之间的进行冲突检查,检查策略分为行列两种。
- 行级别的冲突检查即是不能同时有两个 instant 往同一个 file group 写。
- 列级别的冲突检查即是可以有两个 instant 往同一个 file group 写,但是两个 instant 写入的schema 不可以存在交集。
- 每个 instant 只写入 schema 中的部分列,log 文件中的数据只包含 schema 中的部分
- Compaction 按主键拼接不同列下的数据,Parquet 文件中存储的数据拥有完整的 schema
2、upsert支持
hudi支持对数据的upsert操作,对于upsert操作的支持是通过hudi的文件组织特性保证的。hudi中分为COW和MOR两种类型的表,upsert操作的时候,会根据每条数据的record key进行定位。
- COW表:使用专门的列式文件格式存储数据,更新时保存多版本,并且在写的过程中通过异步的Merge来实现重写(Rewrite)数据文件。
COW表只包含列式格式的Base文件,每次执行COMMIT操作会生成新版本的Base文件,最终执行COMPACTION操作时还是会生成列式格式的Base文件。每次执行INSERT或UPDATE操作,都会在Timeline上生成一个的COMMIT,同时对应着一个文件分片(File Slice)。如果是INSERT操作则生成文件分组的第一个新的文件分片,如果是UPDATE操作则会生成一个新版本的文件分片。 - MOR表:使用列式和行式文件格式混合的方式来存储数据,列式文件格式比如Parquet,行式文件格式比如Avro。更新时写入到增量(Delta)文件中,之后通过同步或异步的COMPACTION操作,生成新版本的列式格式文件。
Merge-On-Read表存在列式格式的Base文件,也存在行式格式的增量(Delta)文件,新到达的更新都会写到增量日志文件中,根据实际情况进行COMPACTION操作来将增量文件合并到Base文件上。通常,需要有效的控制增量日志文件的大小,来平衡读放大和写放大的影响。 下图中,每个文件分组都对应一个增量日志文件(Delta Log File)。COMPACTION操作在后台定时执行,会把对应的增量日志文件合并到文件分组的Base文件中,生成新版本的Base文件。
3、增量查询
Hudi支持三种查询类型:
- Snapshot Query:只能查询到给定COMMIT或COMPACTION后的最新快照数据。对于Copy-On-Write表,Snapshot Query能够查询到,已经存在的列式格式文件(Parquet文件);对于Merge-On-Read表,Snapshot Query能够查询到,通过合并已存在的Base文件和增量日志文件得到的数据。
- Incremental Query:可以根据commit时间查询固定时间之后的数据。即只能查询到最新写入Hudi表的数据,也就是给定的COMMIT/COMPACTION之后的最新数据。
- Read Optimized Query:只能查询到给定的COMMIT/COMPACTION之前所限定范围的最新数据。也就是说,只能看到列式格式Base文件中的最新数据。
4、schema evolution
hive更改表schema后需要全表回溯数据,是一种很重的操作。而iceberg的schema evolution特性可以支持修改表的schema。 基本原理是将底层parquet文件中的schema信息和iceberg中的schema建立ID映射。parquet文件的footer中会存储文件中的列信息,将parquet文件中的列信息和iceberg metastore中的列信息通过一个唯一ID建立映射关系。当读取文件时,根据iceberg metastore中列的ID信息,在parquet文件filter出对应列数据。写数据时将column ID和数据一起写入文件中,新列赋新ID,删除的ID不复。 读取数据时,用ID做映射,如果数据文件中没有,如果:
- 数据文件中没有,而metadata中有,说明表进行了add column操作,没有的column赋null值
- 数据文件中有,而metadata中有,说明表进行了delete column操作,读取parquet文件时过滤删除的列数据。
- 数据文件和metadate有相同的column ID但是column name不一样,说明表进行了rename操作。
partition evolution:iceberg支持更改表的分区字段,如开始为date分区,之后可以改为date、hour分区。因为iceberg数据中包含timestamp列,通过设置partition transform方式,iceberg会记录转换关系,并按需要进行partition evolution
5、timeline service
Hudi内部对每个表都维护了一个Timeline,这个Timeline是由一组作用在某个表上的Instant对象组成。Instant表示在某个时间点对表进行操作的,从而达到某一个状态的表示,所以Instant包含Instant Action,Instant Time和Instant State这三个内容,它们的含义如下所示:
- Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型。
- Instant Time:表示一个时间戳,这个时间戳必须是按照Instant Action开始执行的时间顺序单调递增的。
- Instant State:表示在指定的时间点(Instant Time)对Hudi表执行操作(Instant Action)后,表所处的状态,目前包括REQUESTED(已调度但未初始化)、INFLIGHT(当前正在执行)、COMPLETED(操作执行完成)这3种状态。
根据上图,说明如下:
- 例子场景是,在10:00~10.20之间,要对一个Hudi表执行Upsert操作,操作的频率大约是5分钟执行一次。
- 每次操作执行完成,会看到对应这个Hudi表的Timeline上,有一系列的COMMIT元数据生成。
- 当满足一定条件时,会在指定的时刻对这些COMMIT进行CLEANS和COMPACTION操作,这两个操作都是在后台完成,其中在10:05之后执行了一次CLEANS操作,10:10之后执行了一次COMPACTION操作。
我们看到,从数据生成到最终到达Hudi系统,可能存在延迟,如图中数据大约在07:00、08:00、09:00时生成,数据到达大约延迟了分别3、2、1小时多,最终生成COMMIT的时间才是Upsert的时间。对于数据到达时间(Arrival Time)和事件时间(Event Time)相关的数据延迟性(Latency)和完整性(Completeness)的权衡,Hudi可以将数据Upsert到更早时间的Buckets或Folders下面。通过使用Timeline来管理,当增量查询10:00之后的最新数据时,可以非常高效的找到10:00之后发生过更新的文件,而不必根据延迟时间再去扫描更早时间的文件,比如这里,就不需要扫描7:00、8:00或9:00这些时刻对应的文件(Buckets)。
Hudi文件管理
1、文件版本
一个新的 base commit time 对应一个新的 FileSlice,实际就是一个新的数据版本。HUDI 通过 TableFileSystemView 抽象来管理 table 对应的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 读)或者 base + log files(Merge On Read 读)。 通过 Timeline 和 TableFileSystemView 抽象,HUDI 实现了非常便捷和高效的表文件查找。
2、文件格式
Hoodie 的每个 FileSlice 中包含一个 base file (merge on read 模式可能没有)和多个 log file (copy on write 模式没有)。 每个文件的文件名都带有其归属的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通过文件名的 group id 组织 FileGroup 的 logical 关系;通过文件名的 base commit time 组织 FileSlice 的逻辑关系。 HUDI 的 base file (parquet 文件) 在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。只有不在 BloomFilter 的 key 才需要扫描整个文件消灭假阳。 HUDI 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包含 magic number、size、content、footer 等信息,用于数据读、校验和过滤。
3、Index
Hoodie key (record key + partition path) 和 file id (FileGroup) 之间的映射关系,数据第一次写入文件后保持不变,所以,一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT还是 UPDATE。 Index创建过程:
- 新增 records 找到映射关系:record key => target partition
- 当前最新的数据 找到映射关系:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)
- 新增 records 找到需要搜索的映射关系:fileID => HoodieKey(record key + partition path) LIST,key 是候选的 fileID
- 通过 HoodieKeyLookupHandle 查找目标文件(通过 BloomFilter 加速)
Hudi写入流程
1、COW
- 先对 records 按照 record key 去重
- 首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
- 对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice)
- 对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice
2、MOR - 先对 records 按照 record key 去重(可选)
- 首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
- 如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup + FileSlice + base file
- 如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)log file 大小达到阈值会 roll over 一个新的
Hudi Compact操作
- 没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file
- 有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file
Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。
总结
通过对写流程的梳理我们了解到 HUDI 相对于其他数据湖方案的核心优势:
- 写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。
- 对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。
|