前言
好久没有更新 “好” 文章了,内心很过意不去,怎么变的这么懒了,哈哈哈哈哈
正好,最近数据湖的概念火的一塌糊涂,特别是 Hudi , 与 Flink 的结合越来越好,可以说 Flink + Hudi 就是未来的趋势,这不,我就来简单讲讲,给 “小白”当个引路人,让知识传播给大家,毕竟我也是从小白看着别人的文章过来的
Hudi
概述
Apache Hudi (简称:Hudi) 使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理 ,分别是 :Update/Delete 和 变更流
这是官方对Hudi描述的中文翻译
简单来说,就是构建在分布式存储系统上的一个服务,可以实现对底层存储系统数据的批流处理
此时你肯定很懵逼,HDFS不是不支持update操作吗,是的,在之前,我们想要更新HDFS里面存储的数据,哪怕是一个字段的值,我们都只能使用诸如Hive这样的工具,对表的数据进行覆写(overwrite)操作来实现数据的更新,所以就体现了HDFS的特性,一次写入,多次读取 。但是随着时代的发展 , HDFS的局限性就出来了 , Flink 的兴起标志着 “流” 才是未来 , 但是由于HDFS设计之初就是为了满足大数据场景的存储,就不会去迎合流处理,但是呢,业务的发展会促进技术的诞生,既然HDFS不能去迎合流处理,那基于HDFS的上层抽象可以吗?比如Hive , HBase ? 其实也是不行的 ,Hive 不论是使用MR、 Spark 其实都还是 覆写整个文件来达到数据更新的效果,而且这种方式时效性也是不行的,HBase虽然依靠合理的RowKey设计,可以让查询效率还不错,但是并不适合与Flink这样的流计算任务对接,而且也不适合OLAP的分析的场景。So ,结合以上问题点,Hudi的一些特性就产生了!
首先 , 我们需要一款基于HDFS的抽象,因为HDFS的分布式存储是Hadoop的核心 ,然后还要支持流式写入,方便与实时计算框架对接,那么数据写入后,对用户来说,应该是一个什么样的表现形式呢 ? 没错,就是表 ,万物皆可SQL!那么既然是一张表了,就应该实现对表的CRUD操作,并且还要支持事务,确保数在写入中途出错可以回滚,那么既然可以回滚,就可以有版本概念,支持MVCC,既然有了版本概念,那么在读取数据的时候,也有可以有不同的形式了
其实现在看来,Hudi这样的产品出现的出现一定是种必然,因为它的出现补全了现阶段实时流程的一块拼图
那么Hudi到底是如何去实现表的CRUD以及版本控制呢?让我们一探究竟吧!
设计原则
流式读/写:Hudi借鉴了数据库设计的原理,从零设计,应用于大型数据集记录流的输入和输出。为此,Hudi提供了索引实现,可以将记录的键快速映射到其所在的文件位置。同样,对于流式输出数据,Hudi通过其特殊列添加并跟踪记录级的元数据,从而可以提供所有发生变更的精确增量流 自管理: Hudi注意到用户可能对数据新鲜度(写友好)与查询性能(读/查询友好)有不同的期望,它支持了三种查询类型,这些类型提供实时快照,增量流以及稍早的纯列数据。在每一步,Hudi都努力做到自我管理(例如自动优化编写程序的并行性,保持文件大小)和自我修复(例如:自动回滚失败的提交),即使这样做会稍微增加运行时成本(例如:在内存中缓存输入数据已分析工作负载)。如果没有这些内置的操作杠杆/自我管理功能,这些大型流水线的运营成本通常会翻倍 物皆日志: Hudi还具有 append only、云数据友好的设计,该设计实现了日志结构化存储系统的原理,可以无缝管理所有云提供商的数据 键-值数据模型: 在写方面,Hudi表被建模为键值对数据集,其中每条记录都有一个唯一的记录键。此外,一个记录键还可以包括分区路径,在该路径下,可以对记录进行分区和存储。这通常有助于减少索引查询的搜索空间
表设计
了解了Hudi项目的关键技术动机后,现在让我们更深入地研究Hudi系统本身的设计。在较高的层次上,用于写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上生成代表Hudi表的一组文件。然后,在具有一定保证的情况下,诸如Apache Spark、Presto、Apache Hive之类的查询引擎可以查询该表 Hudi表的三个主要组件:
1)有序的时间轴元数据:类似于数据库事务日志
2)分层布局的数据文件:实际写入表中的数据。
3)索引(多种实现方式):映射包含指定记录的数据集
Hudi提供了以下功能来对基础数据进行写入、查询,这使其成为大型数据湖的重要模块:
1)支持快速,可插拔索引的upsert()
2)高效、只扫描新数据的增量查询
3)原子性的数据发布和回滚,支持恢复的Savepoint
4)使用mvcc(多版本并发控制)风格设计的读和写快照隔离
5)使用统计信息管理文件大小
6)已有记录update/delta的自管理压缩
7)审核数据修改的时间轴元数据
8)满足GDPR(通用数据保护条例)、数据删除功能
时间轴
在其核心,Hudi维护了一条包含在不同的即时时间(instant time)对数据集做的所有instant操作的timeline,从而提供表的即时视图,同时还有效支持按到达顺序进行数据检索 时间轴类似于数据库的redo/transaction日志,由一组时间轴实例组成 Hudi保证在时间轴上执行的操作的原子性和基于即时时间的时间轴一致性 时间轴被实现为表基础路径下.hoodie 元数据文件夹下的一组文件。具体来说,最新的instant被保存为单个文件,而较旧的instant被存档到时间轴归档文件夹中,以限制writers和queries列出的文件数量 一个Hudi 时间轴instant由下面几个组件构成: 1)操作类型:对数据集执行的操作类型 2)即时时间:即时时间通常是一个时间戳(例如:20190117010349),该时间戳按操作开始时间的顺序单调增加 3)即时状态:instant的当前状态;每个instant都有avro或者json格式的元数据信息,详细的描述了该操作的状态以及这个即时时刻instant的状态 关键的Instant操作类型有:
1)COMMIT:一次提交表示将一组记录原子写入到数据集中
2)CLEAN: 删除数据集中不再需要的旧文件版本的后台活动
3)DELTA_COMMIT:将一批记录原子写入到MergeOnRead存储类型的数据集中,其中一些/所有数据都可以只写到增量日志中
4)COMPACTION: 协调Hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩表现为时间轴上的特殊提交
5)ROLLBACK: 表示提交/增量提交不成功且已回滚,删除在写入过程中产生的所有部分文件;
6)SAVEPOINT: 将某些文件组标记为"已保存",以便清理程序不会将其删除。
在发生灾难/数据恢复的情况下,它有助于将数据集还原到时间轴上的某个点;任何给定的即时都会处于以下状态之一:
a)REQUESTED:表示已调度但尚未初始化
b)INFLIGHT: 表示当前正在执行该操作
c)COMPLETED: 表示在时间轴上完成了该操作.
数据文件
Hudi将表组织成DFS上基本路径下的文件夹结构中 如果表是分区的,则在基本路径下还会有其他的分区,这些分区是包含该分区数据的文件夹,与Hive表非常类似 每个分区均由相对于基本路径的分区路径唯一标识 在每个分区内,文件被组织成文件组,由文件ID唯一标识。其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(*.parquet) 以及一组日志文件(*.log*) ,该文件包含自生成基本文件以来对基本文件的插入/更新。 Hudi采用了MVCC设计,压缩操作会将日志和基本文件合并以产生新的文件片,而清理操作则将未使用的/较旧的文件片删除以回收DFS上的空间
索引
Hudi通过索引机制提供高效的upsert操作,该机制会将一个记录键+分区路径组合一致性的映射到一个文件ID.这个记录键和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变 简而言之,这个映射文件组包含了一组文件的所有版本 Hudi当前提供了3种索引实现(HBaseIndex,、HoodieBloomIndex(HoodieGlobalBloomIndex)、InMemoryHashIndex)来映射一个记录键到包含该记录的文件ID 这将使我们无需扫描表中的每条记录,就可显著提高upsert速度。 Hudi索引可以根据其查询分区记录的能力进行分类: 1)全局索引:不需要分区信息即可查询记录键映射的文件ID。比如,写程序可以传入null或者任何字符串作为分区路径(partitionPath),但索引仍然会查找到该记录的位置。全局索引在记录键在整张表中保证唯一的情况下非常有用,但是查询的消耗随着表的大小呈函数式增加 2)非全局索引:与全局索引不同,非全局索引依赖分区路径(partitionPath),对于给定的记录键,它只会在给定分区路径下查找该记录。这比较适合总是同时生成分区路径和记录键的场景,同时还能享受到更好的扩展性,因为查询索引的消耗只与写入到该分区下数据集大小有关系
表类型
Copy On Write
COW表写的时候数据直接写入basefile,(parquet)不写log文件。所以COW表的文件片只包含basefile(一个parquet文件构成一个文件片)。这种的存储方式的Spark DAG相对简单。关键目标是是使用partitioner将tagged Hudi记录RDD(所谓的tagged是指已经通过索引查询,标记每条输入记录在表中的位置)分成一些列的updates和inserts.为了维护文件大小,我们先对输入进行采样,获得一个工作负载profile,这个profile记录了输入记录的insert和update、以及在分区中的分布等信息。把数据从新打包,这样: 1)对于updates, 该文件ID的最新版本都将被重写一次,并对所有已更改的记录使用新值 2)对于inserts,记录首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小。之后的所有剩余记录将再次打包到新的文件组,新的文件组也会满足最大文件大小要求
Merge On Read
MOR表写数据时,记录首先会被快速的写进日志文件,稍后会使用时间轴上的压缩操作 将其与基础文件合并。根据查询是读取日志中的合并快照流还是变更流,还是仅读取未合并的基础文件,MOR表支持多种查询类型。在高层次上,MOR writer在读取数据时会经历与COW writer 相同的阶段。这些更新将追加到最新文件篇的最新日志文件中,而不会合并。 对于insert,Hudi支持两种模式: 1)插入到日志文件:有可索引日志文件的表会执行此操作(HBase索引) 2)插入parquet文件:没有索引文件的表(例如布隆索引)与写时复制(COW)一样,对已标记位置的输入记录进行分区,以便将所有发往相同文件id的upsert分到一组。这批upsert会作为一个或多个日志块写入日志文件。Hudi允许客户端控制日志文件大小。对于写时复制(COW)和读时合并(MOR)writer来说,Hudi的WriteClient是相同的。几轮数据的写入将会累积一个或多个日志文件。这些日志文件与基本的parquet文件(如有)一起构成一个文件片,而这个文件片代表该文件的一个完整版本。这种表是用途最广、最高级的表。为写(可以指定不同的压缩策略,吸收突发写流量)和查询(例如权衡数据的新鲜度和查询性能)提供了很大的灵活性。同时它包含一个学习曲线,以便在操作上掌控他
写设计
写操作
了解Hudi数据源或者deltastreamer工具提供的3种不同写操作以及如何最好的利用他们可能会有所帮助。这些操作可以在对数据集发出的每个commit/delta commit中进行选择/更改
1)upsert操作:这是默认操作,在该操作中,首先通过查询索引将数据记录标记为插入或更新,然后再运行试探法确定如何最好地将他们打包到存储,以对文件大小进行优化,最终将记录写入。对于诸如数据库更改捕获之类的用例,建议在输入几乎肯定包含更新的情况下使用此操作
2)insert操作:与upsert相比,insert操作也会运行试探法确定打包方式,优化文件大小,但会完全跳过索引查询。因此对于诸如日志重复数据删除(结合下面提到的过滤重复项选项)的用例而言,它比upsert的速度快得多。这也适用于数据集可以容忍重复项,但只需要Hudi具有事务性写/增量拉取/存储管理功能的用例
3)bulk insert操作:upsert 和insert操作都会将输入记录保留在内存中,以加快存储启发式计算速度,因此对于最初加载/引导Hudi数据集的用例而言可能会很麻烦。Bulk insert提供了与insert相同的语义,同时实现了基于排序的数据写入算法,该算法可以很好的扩展数百TB的初始负载。但是这只是在调整文件大小方面进行的最大努力,而不是像insert/update那样保证文件大小
压缩
压缩是一个 instant操作,它将一组文件片作为输入,将每个文件切片中的所有日志文件与其basefile文件(parquet文件)合并,以生成新的压缩文件片,并写为时间轴上的一个commit。压缩仅适用于读时合并(MOR)表类型,并且由压缩策略(默认选择具有最大未压缩日志的文件片)决定选择要进行压缩的文件片。这个压缩策略会在每个写操作之后评估。 从高层次上讲,压缩有两种方式: 1)同步压缩:这里的压缩由写程序进程本身在每次写入之后同步执行的,即直到压缩完成后才能开始下一个写操作。就操作而言,这个是最简单的,因为无需安排单独的压缩过程,但保证的数据新鲜度最低。不过,如果可以在每次写操作中压缩最新的表分区,同时又能延迟迟到/较旧分区的压缩,这种方式仍然非常有用。 2)异步压缩:使用这种方式,压缩过程可以与表的写操作同时异步运行。这样具有明显的好处,即压缩不会阻塞下一批数据写入,从而产生近实时的数据新鲜度。Hudi DeltaStreamer之类的工具支持边界的连续模式,其中的压缩和写入操作就是以这种方式在单个Spark运行时集群中进行的
清理
清理是一项基本的即时操作,其执行的目的是删除旧的文件片,并限制表占用的存储空间。清理会在每次写操作之后自动执行,并利用时间轴服务器上缓存的时间轴元数据来避免扫描整个表来评估清理时机。 Hudi支持两种清理方式: 1)按commits / deltacommits清理:这是增量查询中最常见且必须使用的模式。以这种方式,Cleaner会保留最近N次commit/delta commit提交中写入的所有文件切片,从而有效提供在任何即时范围内进行增量查询的能力。尽管这对于增量查询很有帮助,但由于保留了配置范围内所有版本的文件片,因此,在某些高写入负载的场景下可能需要更大的存储空间 2)按保留的文件片清理:这是一种更为简单的清理方式,这里我们仅保存每个文件组中的最后N个文件片。诸如Apache Hive之类的某些查询引擎会处理非常大的查询,这些查询可能需要几个小时才能完成,在这种情况下,将N设置为足够大以至于不会删除查询仍然可以访问的文件片是很有用的。此外,清理操作会保证每个文件组下面会一直只有一个文件片(最新的一片)
DFS访问优化
Hudi还对表中存储的数据执行了几种秘钥存储管理功能。在DFS上存储数据的关键是管理文件大小和计数以及回收存储空间。例如,HDFS在处理小文件问题上臭名昭著–在NameNode上施加内存/RPC压力,可能破坏整个集群的稳定性。通常,查询引擎可在适当大小的列文件上提供更好的性能,因为它们可以有效地摊销获取列统计信息等的成本。即使在某些云数据存储上,列出包含大量小文件的目录也会产生成本。 下面是一些Hudi高效写,管理数据存储的方法: 1)小文件处理特性会剖析输入的工作负载,并将内容分配到现有的文件组,而不是创建新文件组(这会导致生成小文件) 2)在writer中使用一个时间轴缓存,这样只要Spark集群不每次都重启,后续的写操作就不需要列出DFS目录来获取指定分区路径下的文件片列表 3)用户还可以调整基本文件和日志文件大小之间的比值系数以及期望的压缩率,以便将足够数量的insert分到统一文件组,从而生成大小合适的基本文件 4)智能调整bulk insert并行度,可以再次调整大小合适的初始文件组。实际上,正确执行此操作非常关键,因为文件组一旦创建就不能被删除,而只能如前面所述对其进行扩展
查询
鉴于这种灵活而全面的数据布局和丰富的时间线,Hudi能够支持三种不同的查询表方式,具体取决于表的类型。查询类型COW/MOR 快照查询,查询在给定表或表分区中所有文件片中的最新基本文件上执行,将查看到最新提交的记录。通过并到给定表或表分区中的所有文件切片中最新的基本文件及其日志文件合来执行查询,将看到最新的delta-commit操作写入的的记录。增量查询在给定的开始,结束即时时间范围内,对最新的基本文件执行查询(称为增量查询窗口),同时仅使用Hudi指定的列提取在此窗口中写入的记录。查询是在增量查询窗口中对最新的文件片执行的,具体取决于窗口本身,读取基本块或日志块中读取记录的组合。读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩操作以来的数据。通常查询数据的最新程度的保证取决于压缩策略
a)快照查询 可查看给定delta commit或者commit即时操作后表的最新快照。在读时合并(MOR)表的情况下,它通过即时合并最新文件片的基本文件和增量文件来提供近实时表(几分钟)。对于写时复制(COW),它可以替代现有的parquet表(或相同基本文件类型的表),同时提供upsert/delete和其他写入方面的功能
b)增量查询 可查看自给定commit/delta commit即时操作以来新写入的数据。有效的提供变更流来启用增量数据管道
c)读优化查询 可查看给定的commit/compact即时操作的表的最新快照。仅将最新文件片的基本/列文件暴露给查询,并保证与非Hudi表相同的列查询性能。指标读优化查询快照查询数据延迟高低查询延迟低高
未完待遇…
|