| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> 笔记:分布式大数据技术原理(二)构建在 Hadoop 框架之上的 Hive 与 Impala -> 正文阅读 |
|
[大数据]笔记:分布式大数据技术原理(二)构建在 Hadoop 框架之上的 Hive 与 Impala |
”“” 有了 MapReduce,Tez 和 Spark 之后,程序员发现,MapReduce 的程序写起来真麻烦。他们希望简化这个过程。这就好比你有了汇编语言,虽然你几乎什么都能干了,但是你还是觉得繁琐。你希望有个更高层更抽象的语言层来描述算法和数据处理流程。于是就有了 Pig 和 Hive。Pig 是接近脚本方式去描述 MapReduce,Hive 则用的是 SQL。它们把脚本和 SQL语言翻译成 MapReduce 程序,丢给计算引擎去计算,而你就从繁琐的 MapReduce 程序中解脱出来,用更简单更直观的语言去写程序了。 “”“ ------摘自知乎《如何用形象的比喻描述大数据的技术生态?Hadoop、Hive、Spark 之间是什么关系?》,Xiaoyu Ma,对敏感词做了屏蔽处理 HiveHive 是一个构建于 Hadoop 顶层的数据仓库工具(注意区别分布式计算框架 MapReduce、Storm、Spark,不同的引擎在SQL翻译的逻辑和底层的程序是不一样的,比如 MR 引擎会把 SQL 翻译成 MR 、Spark 引擎会把 SQL 翻译成 RDD 程序、Tez 引擎可以理解为在 MR 的基础上做了 DAG 方向的基于内存的 shuffle 优化),支持大规模数据存储、分析,具有良好的可扩展性。某种程度上可以看作是用户编程接口,本身不存储和处理数据。它依赖分布式文件系统 HDFS 存储数据,依赖分布式并行计算模型 MapReduce 处理数据(底层计算的引擎默认是 MapReduce,可以将引擎更换为 Spark/Tez)。Hive 定义了简单的类似 SQL 的查询语言——HiveQL,用户可以通过编写的 HiveQL 语句运行 MapReduce 任务,可以很容易把原来构建在关系数据库上的数据仓库应用程序移植到 Hadoop 平台上。 Hive 是通过构建元数据,映射 HDFS 文件构建成表,本质还是 HDFS,实现离线大数据仓库。注意,Hive 并不是一个关系数据库。 Hive 中没有定义专门的数据格式,由用户指定,需要指定三个属性:列分隔符、行分隔符 、读取文件数据的方法(Hive 中默认有三个文件格式 TextFile,SequenceFile 以及 RCFile)。 Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高;另外一个导致 Hive 执行延迟高的因素是 MapReduce 框架,由于 MapReduce 本身具有较高的延迟,因此在利用 MapReduce 执行 Hive 查询时,也会有较高的延迟(相对的,数据库如 MySQL 的执行延迟较低,当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive 的并行计算显然能体现出优势)。 – db:在 hdfs 中表现为 ${hive.metastore.warehouse.dir} 目录下一个文件夹 – table:在 hdfs 中表现为所属 db 目录下一个文件夹 – external table:与 table 类似,不过其数据存放位置可以在任意指定路径 – partition:在 hdfs 中表现为 table 目录下的子目录 – bucket:在 hdfs 中表现为同一个表目录下根据 hash 散列之后的多个文件 优点
缺点
架构Hive 的系统架构分为用户接口模块、驱动模块、元数据存储模块。 更一般的架构为: 从架构图上可以很清楚地看出 Hive 和 Hadoop(MapReduce,HDFS)的关系:
用户接口模块包括 CLI、HWI、JDBC、ODBC、Thrift Server;CLI 即 Shell 命令行;Thtift Server 是 Facebook 提供的一种软件框架,可以用来进行可扩展跨语言服务的框架,Hive 集成了该服务,能让不用的语言调用该服务;JDBC/ODBC 是 Hive 的 Java 接口,通过 Thrift Server 接入,然后发送给 Driver;WebGUI 是通过浏览器访问 Hive 驱动模块(Driver)包括解释器、编译器、优化器、执行器,负责把 HiveQL 语句转换成一系列 MapReduce 作业 元数据存储模块(Metastore)是一个独立的关系型数据库(自带 derby 数据库,或 MySQL 数据库),由 Cloudera 公司开发的新型查询系统,它提供 SQL 语义,能查询存储在 Hadoop 的 HDFS 和 HBase 上的 PB 级大数据,在性能上比 Hive 高出3~30倍。元数据包括表的名字、表的列和分区及其属性、表的属性(是否为外部表等)、表的数据所在目录等;?Metastore 默认存在自带的 derby 数据库中,缺点就是不适合多用户操作,并且数据存储目录不固定,数据库跟着Hive走,极度不方便管理,解决方案就是存在我们自己创建的 MySQL 库(本地或远程,远程模式的特点是:Hive 服务和 MetaStore 在不同的进程内,可能是不同的机器,“远”指的是 MetaStore 和 Hive 服务离得“远”) 工作原理下图描述了 Hive 和 Hadoop 之间的工作流程:
Hive,MySql 和 HDFS 之间的关系
MapReduce 实现原理可以通过 Explain 查看一个 SQL 如何变成 MapReduce 作业的过程,例如在 Hive Cli 中执行: Hive 把复杂 SQL 分解成多个 MapReduce Chain 执行,各 MR 的中间结果存为 HDFS 的临时文件,然后链式跑完即可获得最终结果。因此,只需明白其核心即可见微知著,下面介绍 join、group by 和 distinct 的原理(摘抄自美团技术团队的总结) Join select u.name, o.orderid from order o join user u on o.uid = u.uid; 在 Map 的输出 Value 中为不同表的数据打上 Tag 标记,在 Reduce 阶段根据 Tag 判断数据来源。MapReduce的过程如下(这里只是说明最基本的 Join 的实现,还有其他的实现方式): Group By select rank, isonline, count(*) from city group by rank, isonline; 将 GroupBy 的字段组合为 Map 的输出 Key 值,利用 MapReduce 的排序,在 Reduce 阶段保存 LastKey 区分不同的 Key。MapReduce 的过程如下(当然这里只是说明 Reduce 端的非 Hash 聚合过程): Distinct select dealid, count(distinct uid) num from order group by dealid; 当只有一个 Distinct 字段时,如果不考虑 Map 阶段的 Hash GroupBy,只需要将 GroupBy 字段和 Distinct 字段组合为 Map 输出 key,利用 MapReduce 的排序,同时将 GroupBy 字段作为 Reduce 的 Key,在 Reduce 阶段保存 LastKey 即可完成去重: 文件格式(1) Textfile 文本格式,Hive 的默认格式,数据不压缩,磁盘开销大、数据解析开销大。行式存储 对应的 hive API 为:org.apache.hadoop.mapred.TextInputFormat 和?org.apache.hive.ql.io.HiveIgnoreKeyTextOutputFormat (2)SequenceFile Hadoop 提供的一种二进制文件格式,是 Hadoop 支持的标准文件格式(其他生态系统并不适用),可以直接将对序列化到文件中,所以 sequencefile 文件不能直接查看,可以通过 Hadoop fs -text 查看。具有使用方便,可分割,可压缩,可进行切片。行式存储 对应 hive API 为:org.apache.hadoop.mapred.SequenceFileInputFormat 和?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat (3)RCFile 是一种行列存储相结合的存储方式,先将数据按行进行分块再按列式存储,保证同一条记录在一个块上,避免读取多个块,有利于数据压缩和快速进行列存储。列式存储 对应的 hive API 为:org.apache.hadoop.hive.ql.io.RCFileInputFormat 和 org.apache.hadoop.hive.ql.io.RCFileOutputFormat (4)ORCFile orcfile 是对 rcfile 的优化,可以提高 hive 的读写、数据处理性能,提供更高的压缩效率(目前主流选择之一)。列式存储 (5)Parquet 一种列格式,可提供对其他 hadoop 工具的可移植性, 包括 Hive, Drill, Impala, Crunch, and Pig 对应的 hive API 为:org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat 和?org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat (6)Avro Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据 对应的 hive API 为:org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat 和?org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat 几种文件存储格式的性能测试结果:
数据倾斜定义: 由于数据分布不均匀,造成数据热点。 现象: 一个或几个 key 的记录数与平均记录数差异过大,最长时长远大于平均时长。任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce 子任务未完成。 数据倾斜优化:一般分为 join 引起和 group by 引起分别解决。
Group by 引起的数据倾斜分两方面优化: 第一个: set hive.groupby.mapaggr.checkinterval=100000; set hive.map.aggr.hash.min.reduction=0.5; hive.map.aggr=true(默认)参数控制在 group by 的时候是否 map 局部聚合,但也不是都会局部聚合,如果聚合前后差别不是很大,聚合也就没什么意义了。 后两个设置是判断是否需要做 map 局部聚合,即:预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合。 第二个: set Hive.groupby.skewindata=true; 控制启动两个 MR Job 完成,第一个 Job 先不按 GroupBy 字段分发,而是随机分发做一次聚合,然后启动第二个 Job,拿前面聚合过的数据按 GroupBy 字段分发计算出最终结果。但是否生效还存在限制,详情见?Hive-hive.groupby.skewindata 配置相关问题调研 Join 引起的数据倾斜优化主要分两个方向:skew join 和 重写业务逻辑 Skew join: set hive.optimize.skewjoin=true; set hive.skewjoin.key=100000; 记录超过 hive.skewjoin.key(默认100000)阈值的 key 值先写入 hdfs,然后再进行一个 map join 的 job 任务,最终和其他 key 值的结果合并为最终结果。其过程如下图: 重写业务逻辑 这个需要结合具体的场景重写,例如:在日志表与用户表关联时候(通过 user_id 关联),直接关联可能导致 user_id 为 null 的发生数据倾斜,此时可以把日志表中 user_id 为 null 的单独处理,如下: SELECT a.xx, b.yy 常见问题请说明 hive 中 Sort By,Order By,Cluster By,Distrbute By 各代表什么意思 order by:会对输入做全局排序,因此只有一个 reducer(多个 reducer 无法保证全局有序)。只有一个 reducer,会导致当输入规模较大时,需要较长的计算时间 sort by:对分区内的数据进行排序,不是全局排序,其在数据进入 reducer 前完成排序 distribute by:对 map 输出进行分区,按照指定的字段对数据进行划分输出到不同的 reducer 中。常和 sort by 一起使用,例如:select mid, money, name from store distribute by mid sort by mid asc, money asc cluster by:当 distribute by 和 sort by 字段相同时,可使用 cluster by 方式替代,cluster by 具有 distribute by 和 sort by 的组合功能。但是排序只能是升序排序,不能指定排序规则为 ASC 或者 DESC NULL 在 hive 的一般处理 NULL 默认的存储都是 \N,可以在建表时通过 serialization.null.format 设置 NULL 值的过滤,一般是 is null 和 is not null multi-group 新特性的好处 multi group by 可以将查询中的多个 group by 操作组装到一个 MapReduce 任务中,起到优化作用 ImpalaImpala 是一个运行在 Hadoop 之上的大规模并行处理(MPP)查询引擎,提供对 Hadoop 集群数据的高性能、低延迟的 SQL 查询,使用 HDFS 作为底层存储。它对查询的快速响应使交互式查询和对分析查询的调优成为可能,而这些在针对处理长时间批处理作业的 SQL-on-Hadoop 传统技术上是无法完成的。Impala 与 Hive 元数据存储数据库相结合,在这两个组件之间共享数据库表,并且 Impala 与 HiveQL 的语法兼容。因此既可以使用 Impala 也可以使用 Hive 进行建立表、发布查询、装载数据等操作。Impala 可以在已经存在的 Hive 表上执行交互式实时查询。 起源Impala 最开始是参照 Dremel 系统进行设计的。Dremel 是 Google 的交互式数据分析系统,它构建于 Google 的 GFS(Google File System)等系统之上,支撑了 Google 的数据分析服务 BigQuery 等诸多服务。 Dremel 的技术亮点主要有两个:一是实现了嵌套型数据的列存储;二是使用了多层查询树,使得任务可以在数千个节点上并行执行和聚合结果。 列存储在关系型数据库中并不陌生,它可以减少查询时处理的数据量,有效提升查询效率。Dremel 的列存储的不同之处在于它针对的并不是传统的关系数据,而是嵌套结构的数据。Dremel 可以将一条条的嵌套结构的记录转换成列存储形式,查询时根据查询条件读取需要的列,然后进行条件过滤,输出时再将列组装成嵌套结构的记录输出,记录的正向和反向转换都通过高效的状态机实现。 另外,Dremel 的多层查询树则借鉴了分布式搜索引擎的设计,查询树的根节点负责接收查询,并将查询分发到下一层节点,底层节点负责具体的数据读取和查询执行,然后将结果返回上层节点。 Impala 在受到 Google 的 Dremel 启发下开发的实时交互 SQL 大数据查询工具,Impala 没有再使用缓慢的 Hive+MapReduce 批处理,而是通过使用与商用并行关系数据库中类似的分布式查询引擎(由 Query Planner、Query Coordinator 和 Query Exec Engine 三部分组成),可以直接从 HDFS 或 HBase 中用 SELECT、JOIN 和统计函数查询数据,从而大大降低了延迟。 因此可以说,Impala 其实就是 Hadoop 的 Dremel。 为什么使用 Impala
Impala 缺省使用 Parquet 文件格式,这种列式存储对于典型数据仓库场景下的大查询是最优的(Parquet 是一种列式存储,它不像普通数据仓库那样水平存储数据,而是垂直存储数据;当查询在数值列上应用聚合函数时,这种存储方式将带来巨大的性能提升,原因是只需要读取文件中该列的数据,而不是像 Hive 需要读取整个数据集;Parquet文件格式支持高效的压缩编码方式,例如 Hadoop 和 Hive 缺省使用的 snappy 压缩;Parquet 文件也可用 Hive 和 Pig 处理 劣势
适用场景
架构Impala 由不同的守护进程组成,每种守护进程运行在 Hadoop 集群中的特定主机上,其中 Impalad、Statestored、Catalogd 三个守护进程在其架构中扮演主要角色。 Impala 守护进程(Impala Daemon)Impala 的核心组件是一个运行在集群中每个数据节点上的守护进程,物理表现为 Impalad 进程,由三个模块组成:Query Planner、Query Coordinator 和 Query Executor,前两个模块组成前端,负责接收 SQL 查询请求,解析 SQL 并转换成执?计划,交由后端执?。 该进程读写数据文件,接收从 Impala-Shell 命令行、Hue、JDBC、ODBC (Java或其它语言的交互接口)提交的查询请求,将查询工作并行分布到集群的数据节点上,并将查询的中间结果返回给中心协调节点。 可以将查询提交至任意一个数据节点上运行的 Impala 守护进程,此守护进程实例担任该查询的协调器,其它节点提交部分中间结果返给协调器,协调器构建查询的最终结果集。 当在试验环境使用 Impala-Shell 命令行运行 SQL 时,出于方便性,通常总是连接同一个 Impala 守护进程。而在生产环境负载的集群中,可以采用循环的方式,通过 JDBC 或 ODBC 接口,将每个查询轮流提交至不同的 Impala 守护进程,以达到负载均衡。 Impala守护进程持续与 Statestore 进行通信,以确认每个节点健康状况以及是否可以接收新的任务。 当集群中的任何 Impala 节点建立、修改、删除任何类型的对象,或者通过 Impala 处理一个 insert 或 load data 语句时,Catalogd 守护进程(Impala 1.2引入)都会发出广播消息。Impala 守护进程会接收这种从 Catalogd 守护进程发出的广播消息。这种后台通信减少了对 refresh 或 invalidate metadata 语句的需要,而在 Impala 1.2 版本前,这些语句被用于在节点间协调元数据信息。 Impala Statestore叫做 Statestore 的 Impala 组件检查集群中所有数据节点上 Impala 守护进程的健康状况,并将这些信息持续转发给每个 Impala 守护进程,其物理表现为一个名为 Statestored 的守护进程,该进程只需要在集群中的一台主机上启动。 如果 Impala 守护进程由于硬件、软件、网络或其它原因失效,Statestore 会通知所有其它的 Impala 守护进程,这样以后的查询就不会再向不可到达的节点发出请求。 Statestore 的目的只是在发生某种错误时提供帮助,因此在正常操作一个 Impala 集群时,它并不是一个关键组件。即使 Statestore 没有运行或者不可用,Impala 守护进程依然会运行,并像通常一样在它们中分法任务。 这时如果一个 Impala 守护进程失效,仅仅是降低了集群的鲁棒性。当 Statestore 恢复可用后,它会重建与 Impala 守护进程之间的通信并恢复监控功能。 在 Impala 中,所有负载均衡和高可用的考虑都是用于 Impala 守护进程的。Statestored 和 Catalogd 进程没有高可用的需求,因为这些进程即使出现问题也不会引起数据丢失。当这些进程由于所在的主机停机而变成不可用时,可以这样处理:先停止 Impala 服务,然后删除 Impala StateStore 和 Impala Catalog 服务器角色,再在另一台主机上添加这两个角色,最后重启 Impala 服务。 Impala Catalog 服务称为 Catalog 服务的 Impala 组件将 Impala SQL 语句产生的元数据改变转发至集群中的所有数据节点,其物理表现为一个名为 Catalogd 的守护进程,该进程只需要在集群中的一台主机上启动,而且应该与 Statestored 进程在同一台主机上(因为 Catalogd 进程所有请求都是经过 Statestored 进程发送)。 由于 Catalog 服务的存在,当通过执行 Impala 语句而改变元数据时,不需要再发出 refresh 或 invalidate metadata 语句。然而,当通过 Hive 执行建立表、装载数据等操作后,在一个 Impala 节点上执行查询前,仍然需要先发出 refresh 或 invalidate metadata 语句。 例如,通过 Impala 执行的 create table、insert 或其它改变表或改变数据的操作,无需执行 refresh and invalidate metadata 语句。而如果这些操作是用过 Hive 执行的,或者是直接操纵的 HDFS 数据文件,仍需执行 refresh and invalidate metadata 语句(只需在一个 Impala 节点执行,而不是全部节点)。 缺省情况下,元数据在 Impala 启动时异步装载并缓存,这样 Impala 可以立即接收查询请求。如果想让 Impala 等所有元数据装载后再接收查询请求,需要设置 Catalogd 的配置选项 load_catalog_in_background=false Impala 的元数据和元数据存储Impala 使用一个叫做 Metastore 的数据库维护它的表定义信息,同时 Impala 还跟踪其它数据文件底层特性的元数据,如 HDFS 中数据块的物理位置信息。 对于一个有很多分区或很多数据的大表,获取它的元数据可能很耗时,有时需要花上几分钟的时间。因此每个 Impala 节点都会缓存这些元数据,当后面再查询该表时,就可以复用缓存中的元数据。 如果表定义或表中的数据更新了,集群中所有其它的 Impala 守护进程在查询该表前, 都必须能收到最新的元数据,并更新自己缓存的元数据。在 Impala 1.2或更高版本中,这种元数据的更新是自动的,由 Catalogd 守护进程为所有通过 Impala 发出的 DDL 和 DML 语句进行协调。 对于通过 Hive 发出的 DDL 和 DML,或者手工改变了 HDFS 文件的情况,还是需要在 Impala 中使用 refresh 语句(当新的数据文件被加到已有的表上)或 invalidate metadata 语句(新建表、删除表、执行了 HDFS 的 rebalance 操作,或者删除了数据文件)。 Invalidate metadata 语句获取 Metastore 中存储的所有表的元数据。如果能够确定在 Impala 外部只有特定的表被改变,可以为每一个受影响的表使用 refresh 表名,该语句只获取特定表的最新元数据。 工作原理1.Client 提交任务 生成查询计划分为两个阶段:(1)生成单机查询计划,单机执行计划与关系数据库执行计划相同,所用查询优化方法也类似(2)生成分布式查询计划。 根据单机执行计划, 生成真正可执行的分布式执行计划,降低数据移动, 尽量把数据和计算放在一起;注意,单机查询计划不能直接执行, 必须转换成分布式查询计划。
3.任务调度和分发 单机执行计划以?个SQL例?来展示查询计划: select t1.n1, t2.n2, count(1) as c from t1 join t2 on t1.id = t2.id join t3 on t1.id = t3.id where t3.n3 between ‘a’ and ‘f’ group by t1.n1, t2.n2 order by c desc limit 100;
分布式执行计划?分布式执?计划中涉及到多表的 Join,Impala 会根据表的??来决定 Join 的?式,主要有两种分别是 Hash Join 与 Broadcast Join 上?分布式执?计划中可以看出 T1、T2 表??些,? T3 表??些,所以对于 T1 与 T2 的 Join,Impala选择使? Hash Join,对于 T3 表选择使? Broadcast ?式,直接把 T3 表?播到需要 Join 的节点上:
与 Hive 的关系相对于 Hive 使用的优化技术
对比相同点
不同点
Via 煜哥: Impala(常和 Kudu 搭配使用)能实现 transaction 数据摄入实时性、响应实时查询的完整性 Hive 没有 transaction,因此加入 partition,比如 by hour,那么就可以按小时实时更新 Hive 主要公司内部用,不追求实时响应速度 稳健性 Hive on MapReduce > Spark > Impala,因为 Hive 每一步 MapReduce 都有一个入盘的操作,Impala 是 MPP,所以没有 速度 Impala > Spark > Hive 因为 Impala 基于 MPP,默认使用所有资源,省了很多资源分配的操作,但会存在一个人查询要1s,100个人分别同时查询要100s ,同时如果一个地方出错,由于是基于 MPP,所以不能”断点续传“ Hive 和 Impala 是批处理,Spark 是流处理,streaming 阅读材料 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 | -2024/11/23 22:47:53- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |