Hadoop HADOOP是apache旗下的一套开源软件平台 提供的功能:利用服务器集群,根据用户的自定义业务逻辑,对海量数据进行分布式处理 HADOOP的核心组件有 HDFS(分布式文件系统) ARN(运算资源调度系统) MAPREDUCE(分布式运算编程框架)
重点组件: HDFS:分布式文件系统 MAPREDUCE:分布式运算程序开发框架 HIVE:基于大数据技术(文件系统+运算框架)的SQL数据仓库工具 HBASE:基于HADOOP的分布式海量数据库 ZOOKEEPER:分布式协调服务基础组件 Mahout:基于mapreduce/spark/flink等分布式运算框架的机器学习算法库 Oozie:工作流调度框架 Sqoop:数据导入导出工具 Flume:日志数据采集框架
重要特性如下: (1)HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M
(2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
(3)目录结构及文件分块信息(元数据)的管理由namenode节点承担 ——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)
(4)文件的各个block的存储管理由datanode节点承担 ---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)
(5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改 1.HDFS集群分为两大角色:NameNode、DataNode、 2.NameNode负责管理整个文件系统的元数据(block日志,镜像文件fsimage ) 3.DataNode 负责管理用户的文件数据块 4.文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上 5.每一个文件块可以有多个副本,并存放在不同的datanode上 6.Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量 7.HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行
A、内存中有一份完整的元数据(内存meta data) B、磁盘有一个“准完整”的元数据镜像(fsimage)文件(在namenode的工作目录中) C、用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件)
元数据的checkpoint 每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint)
secondary Namenode 的作用:为namenode提供服务: 合并日志和镜像文件. namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据
为什么会产生yarn,它解决了什么问题,有什么优势? Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn最主要的功能就是解决运行的用户程序与yarn框架完全解耦。 Yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、(流式计算)storm程序,(流式计算框架)spark程序……
1.2 MapReduce优缺点 1.2.1 优点 1.MapReduce?易于编程 它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。 2.良好的扩展性 当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。 3.高容错性 MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。 4.适合PB级以上海量数据的离线处理 这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。 1.2.2 缺点 MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算。 1.实时计算 MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。 2.流式计算 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。 3.DAG(有向图)计算 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
Shuffer map task 从split中读取数据,进行处理后,输出key/value,对键值对进行Partitioner后,存入到缓存中,缓存默认大小是100M,当缓存内容达到80M时,启动溢写操作,把缓存区数据写入一个溢写文件,在写入文件之前,会对键值对进行分区排序和合并(如果设置的话),当该map task处理完所有数据后,需要对该map生成的所有溢写文件进行merger操作,生成一个文件作为该maptask的成果,reduce task接受到通知后,就回拉取各个map task的成果数据,放到缓存区中,当缓存区内容达到阀值时,同样执行溢写操作,生成溢写文件,当把所有的map task的成果数据读取完毕后,会把生成的所有溢写文件进行merge操作,生成一个文件作为reduce task的输出数据。
MapReduce优化方法 MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。 数据输入 (1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而任务的装载比较耗时,从而导致mr运行较慢。 (2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。 Map阶段 1)减少溢写(spill)次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发spill的内存上限,减少spill次数,从而减少磁盘IO。 2)减少合并(merge)次数:通过调整io.sort.factor参数,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。 3)在map之后,不影响业务逻辑前提下,先进行combine处理,减少 I/O。 Reduce阶段 1)合理设置map和reduce数:两个都不能设置太少,也不能设置太多。太少,会导致task等待,延长处理时间;太多,会导致 map、reduce任务间竞争资源,造成处理超时等错误。 2)设置map、reduce共存:调整slowstart.completedmaps参数,使map运行到一定程度后,reduce也开始运行,减少reduce的等待时间。 3)规避使用reduce:因为reduce在用于连接数据集的时候将会产生大量的网络消耗。 4)合理设置reduce端的buffer:默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给reduce使用。这样一来,设置buffer需要内存,读取数据需要内存,reduce计算也要内存,所以要根据作业的运行情况进行调整。
Zookeeper概述 Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。 Zookeeper=文件系统+通知机制+选举机制 特点 1)Zookeeper:一个领导者(leader),多个跟随者(follower)组成的集群。 2)Leader负责进行投票的发起和决议,更新系统状态 3)Follower用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票 4)集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。 5)全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。 6)更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行。 7)数据更新原子性,一次数据更新要么成功,要么失败。 8)实时性,在一定时间范围内,client能读到最新数据。 选举机制 1)半数机制(Paxos 协议):集群中半数以上机器存活,集群可用。所以zookeeper适合装在奇数台机器上。 2)Zookeeper虽然在配置文件中并没有指定master和slave。但是,zookeeper工作时,是有一个节点为leader,其他则为follower,Leader是通过内部的选举机制临时产生的
节点类型 1.Znode有两种类型 短暂(ephemeral):客户端和服务器端断开连接后,创建的节点自己删除 持久(persistent):客户端和服务器端断开连接后,创建的节点不删除 2.Znode有四种形式的目录节点(默认是persistent ) (1)持久化目录节点(PERSISTENT) 客户端与zookeeper断开连接后,该节点依旧存在 (2)持久化顺序编号目录节点(PERSISTENT_SEQUENTIAL) 客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号 (3)临时目录节点(EPHEMERAL) 客户端与zookeeper断开连接后,该节点被删除 (4)临时顺序编号目录节点(EPHEMERAL_SEQUENTIAL) 客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
Hive 什么是Hive
Hive:由Facebook开源用于解决海量结构化日志的数据统计。 mapreduce : 海量数据的分布式计算框架. Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能。 本质是:将HQL转化成MapReduce程序 1)Hive处理的数据存储在HDFS 2)Hive分析数据底层的实现是MapReduce 3)执行程序运行在Yarn上 Hive的优缺点 优点 1)操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手)。 2)避免了去写MapReduce,减少开发人员的学习成本。 3)Hive的执行延迟比较高,因此Hive常用于数据分析,对实时性要求不高的场合。 4)Hive优势在于处理大数据,对于处理小数据没有优势,因为Hive的执行延迟比较高。 5)Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数。 缺点 1.Hive的HQL表达能力有限 (1)迭代式算法无法表达(第二个计算需要第一个结果) (2)数据挖掘方面不擅长 2.Hive的效率比较低 (1)Hive自动生成的MapReduce作业,通常情况下不够智能化 (2)Hive调优比较困难,粒度较粗 Hive和数据库比较 由于?Hive?采用了类似SQL?的查询语言?HQL(Hive Query Language),因此很容易将?Hive?理解为数据库。其实从结构上来看,Hive?和数据库除了拥有类似的查询语言,再无类似之处。本文将从多个方面来阐述?Hive?和数据库的差异。数据库可以用在?Online?的应用中,但是Hive?是为数据仓库而设计的,清楚这一点,有助于从应用角度理解?Hive?的特性。 1.4.1 查询语言 由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发的开发者可以很方便的使用Hive进行开发。 1.4.2 数据存储位置 Hive?是建立在?Hadoop?之上的,所有?Hive?的数据都是存储在?HDFS?中的。而数据库则可以将数据保存在块设备或者本地文件系统中。 1.4.3 数据更新 由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不建议对数据的改写,所有的数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用?INSERT?INTO?…??VALUES?添加数据,使用?UPDATE?…?SET修改数据。 1.4.4 索引 hive不建立索引 Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于?MapReduce?的引入,?Hive?可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive?仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了?Hive?不适合在线数据查询。 1.4.5 执行 Hive中大多数查询的执行是通过?Hadoop?提供的?MapReduce?来实现的。而数据库通常有自己的执行引擎。 1.4.6 执行延迟 Hive?在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致?Hive?执行延迟高的因素是?MapReduce框架。由于MapReduce?本身具有较高的延迟,因此在利用MapReduce?执行Hive查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。 1.4.7 可扩展性 由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的(世界上最大的Hadoop?集群在?Yahoo!,2009年的规模在4000?台节点左右)。而数据库由于?ACID?语义的严格限制,扩展行非常有限。目前最先进的并行数据库?Oracle?在理论上的扩展能力也只有100台左右。 1.4.8 数据规模 由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。
基本数据类型 表6-1 Hive数据类型 Java数据类型 长度 例子 TINYINT byte 1byte有符号整数 20 SMALINT short 2byte有符号整数 20 INT int 4byte有符号整数 20 BIGINT long 8byte有符号整数 20 BOOLEAN boolean 布尔类型,true或者false TRUE FALSE FLOAT float 单精度浮点数 3.14159 DOUBLE double 双精度浮点数 3.14159 STRING string 字符系列。可以指定字符集。可以使用单引号或者双引号。 ‘now is the time’ “for all good men” TIMESTAMP 时间类型 BINARY 字节数组 对于Hive的String类型相当于数据库的varchar类型,该类型是一个可变的字符串,不过它不能声明其中最多能存储多少个字符,理论上它可以存储2GB的字符数。 集合数据类型 表6-2 数据类型 描述 语法示例 STRUCT 和c语言中的struct类似,都可以通过“点”符号访问元素内容。例如,如果某个列的数据类型是STRUCT{first STRING, last STRING},那么第1个元素可以通过字段.first来引用。 struct() MAP MAP是一组键-值对元组集合,使用数组表示法可以访问数据。例如,如果某个列的数据类型是MAP,其中键->值对是’first’->’John’和’last’->’Doe’,那么可以通过字段名[‘last’]获取最后一个元素 map() ARRAY 数组是一组具有相同类型和名称的变量的集合。这些变量称为数组的元素,每个数组元素都有一个编号,编号从零开始。例如,数组值为[‘John’, ‘Doe’],那么第2个元素可以通过数组名[1]进行引用。 Array() Hive有三种复杂数据类型ARRAY、MAP 和 STRUCT。ARRAY和MAP与Java中的Array和Map类似,而STRUCT与C语言中的Struct类似,它封装了一个命名字段集合,复杂数据类型允许任意层次的嵌套。 类型转化 Hive的原子数据类型是可以进行隐式转换的,类似于Java的类型转换,例如某表达式使用INT类型,TINYINT会自动转换为INT类型,但是Hive不会进行反向转化,例如,某表达式使用TINYINT类型,INT不会自动转换为TINYINT类型,它会返回错误,除非使用CAST操作。 1.隐式类型转换规则如下 (1)任何整数类型都可以隐式地转换为一个范围更广的类型,如TINYINT可以转换成INT,INT可以转换成BIGINT。 (2)所有整数类型、FLOAT和STRING类型都可以隐式地转换成DOUBLE。 (3)TINYINT、SMALLINT、INT都可以转换为FLOAT。 (4)BOOLEAN类型不可以转换为任何其它的类型。 2.可以使用CAST操作显示进行数据类型转换 例如CAST(‘1’ AS INT)将把字符串’1’ 转换成整数1;如果强制类型转换失败,如执行CAST(‘X’ AS INT),表达式返回空值 NULL。 管理表 1.理论 默认创建的表都是所谓的管理表,有时也被称为内部表。因为这种表,Hive会(或多或少地)控制着数据的生命周期。Hive默认情况下会将这些表的数据存储在由配置项hive.metastore.warehouse.dir(例如,/user/hive/warehouse)所定义的目录的子目录下。 当我们删除一个管理表时,Hive也会删除这个表中数据。管理表不适合和其他工具共享数据。 外部表 1.理论 因为表是外部表,所以Hive并非认为其完全拥有这份数据。删除该表并不会删除掉这份数据,不过描述表的元数据信息会被删除掉。 2.管理表和外部表的使用场景 每天将收集到的网站日志定期流入HDFS文本文件。在外部表(原始日志表)的基础上做大量的统计分析,用到的中间表、结果表使用内部表存储,数据通过SELECT+INSERT进入内部表。 向表中装载数据(Load) 1.语法 hive> load data [local] inpath ‘/opt/module/datas/student.txt’ [overwrite] into table student [partition (partcol1=val1,…)]; (1)load data:表示加载数据 (2)local:表示从本地加载数据到hive表(复制);否则从HDFS加载数据到hive表(移动) (3)inpath:表示加载数据的路径 (4)overwrite into:表示覆盖表中已有数据,否则表示追加 (5)into table:表示加载到哪张表 (6)student:表示具体的表 (7)partition:表示上传到指定分区 算术运算符 表6-3 运算符 描述 A+B A和B 相加 A-B A减去B AB A和B 相乘 A/B A除以B A%B A对B取余 A&B A和B按位取与 A|B A和B按位取或 A^B A和B按位取异或 ~A A按位取反 比较运算符(Between/In/ Is Null) 1)下面表中描述了谓词操作符,这些操作符同样可以用于JOIN…ON和HAVING语句中。 表6-4 操作符 支持的数据类型 描述 A=B 基本数据类型 如果A等于B则返回TRUE,反之返回FALSE A<=>B 基本数据类型 如果A和B都为NULL,则返回TRUE,其他的和等号(=)操作符的结果一致,如果任一为NULL则结果为NULL A<>B, A!=B 基本数据类型 A或者B为NULL则返回NULL;如果A不等于B,则返回TRUE,反之返回FALSE A<B 基本数据类型 A或者B为NULL,则返回NULL;如果A小于B,则返回TRUE,反之返回FALSE A<=B 基本数据类型 A或者B为NULL,则返回NULL;如果A小于等于B,则返回TRUE,反之返回FALSE A>B 基本数据类型 A或者B为NULL,则返回NULL;如果A大于B,则返回TRUE,反之返回FALSE A>=B 基本数据类型 A或者B为NULL,则返回NULL;如果A大于等于B,则返回TRUE,反之返回FALSE A [NOT] BETWEEN B AND C 基本数据类型 如果A,B或者C任一为NULL,则结果为NULL。如果A的值大于等于B而且小于或等于C,则结果为TRUE,反之为FALSE。如果使用NOT关键字则可达到相反的效果。 A IS NULL 所有数据类型 如果A等于NULL,则返回TRUE,反之返回FALSE A IS NOT NULL 所有数据类型 如果A不等于NULL,则返回TRUE,反之返回FALSE IN(数值1, 数值2) 所有数据类型 使用 IN运算显示列表中的值 A [NOT] LIKE B STRING 类型 B是一个SQL下的简单正则表达式,如果A与其匹配的话,则返回TRUE;反之返回FALSE。B的表达式说明如下:‘x%’表示A必须以字母‘x’开头,‘%x’表示A必须以字母’x’结尾,而‘%x%’表示A包含有字母’x’,可以位于开头,结尾或者字符串中间。如果使用NOT关键字则可达到相反的效果。 A RLIKE B, A REGEXP B STRING 类型 B是一个正则表达式,如果A与其匹配,则返回TRUE;反之返回FALSE。匹配使用的是JDK中的正则表达式接口实现的,因为正则也依据其中的规则。例如,正则表达式必须和整个字符串A相匹配,而不是只需与其字符串匹配。 空字段赋值 1.函数说明 NVL:给值为NULL的数据赋值,它的格式是NVL( string1, replace_with)。它的功能是如果string1为NULL,则NVL函数返回replace_with的值,否则返回string1的值,如果两个参数都为NULL ,则返回NULL。 行转列 1.相关函数说明 CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串; CONCAT_WS(separator, str1, str2,…):它是一个特殊形式的 CONCAT()。第一个参数剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接的字符串之间; COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段。 COLLECT_LIST(col) 不去除重复. 列转行 1.函数说明 EXPLODE(col):将hive一列中复杂的array或者map结构拆分成多行。 LATERAL VIEW 用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias 解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。 窗口函数 1.相关函数说明 OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化 CURRENT ROW:当前行 n PRECEDING:往前n行数据 n FOLLOWING:往后n行数据 UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点 LAG(col,n):往前第n行数据 LEAD(col,n):往后第n行数据 NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。 select name,orderdate,cost, sum(cost) over() as sample1,–所有行相加 sum(cost) over(partition by name) as sample2,–按name分组,组内数据相加 sum(cost) over(partition by name order by orderdate) as sample3,–按name分组,组内数据累加 sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as sample4 ,–和sample3一样,由起点到当前行的聚合 sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING and current row) as sample5, --当前行和前面一行做聚合 sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING AND 1 FOLLOWING ) as sample6,–当前行和前边一行及后面一行 sum(cost) over(partition by name order by orderdate rows between current row and UNBOUNDED FOLLOWING ) as sample7 --当前行及后面所有行 from business; (1)查看顾客上次的购买时间 select name,orderdate,cost, lag(orderdate,1,‘1900-01-01’) over(partition by name order by orderdate ) as time1, lag(orderdate,2) over (partition by name order by orderdate) as time2 from business; (2)查询前20%时间的订单信息 select * from ( select name,orderdate,cost, ntile(3) over(order by orderdate) sorted from business ) t where sorted = 1 or sorted = 2; Rank 1.函数说明 RANK() 排序相同时会重复,总数不会变 DENSE_RANK()排序相同时会重复,总数会减少 ROW_NUMBER() 会根据顺序计算 函数 7.1 系统内置函数 1.查看系统自带的函数 hive> show functions; 2.显示自带的函数的用法 hive> desc function upper; 3.详细显示自带的函数的用法 hive> desc function extended upper; 7.2 自定义函数 1)Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。 2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。 3)根据用户自定义函数类别分为以下三种: (1)UDF(User-Defined-Function) 一进一出 (2)UDAF(User-Defined Aggregation Function) 聚集函数,多进一出 类似于:count/max/min (3)UDTF(User-Defined Table-Generating Functions) 一进多出 如lateral view explore() 4)官方文档地址 https://cwiki.apache.org/confluence/display/Hive/HivePlugins 5)编程步骤: (1)继承org.apache.hadoop.hive.ql.UDF (2)需要实现evaluate函数;evaluate函数支持重载; (3)在hive的命令行窗口创建函数 a)添加jar add jar linux_jar_path b)创建function, create [temporary] function [dbname.]function_name AS class_name; (4)在hive的命令行窗口删除函数 Drop [temporary] function [if exists] [dbname.]function_name; 6)注意事项 (1)UDF必须要有返回类型,可以返回null,但是返回类型不能为void; 数据倾斜 9.4.1 合理设置Map数 1)通常情况下,作业会通过input的目录产生一个或者多个map任务。 主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。 2)是不是map数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。 3)是不是保证每个map处理接近128m的文件块,就高枕无忧了? 答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。 针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数; 9.4.2 小文件进行合并 在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。 set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 9.4.3 复杂文件增加Map数 当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。 增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。 合理设置Reduce数 1.调整reduce个数方法一 (1)每个Reduce处理的数据量默认是256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每个任务最大的reduce数,默认为1009 hive.exec.reducers.max=1009 (3)计算reducer数的公式 N=min(参数2,总输入数据量/参数1) 2.调整reduce个数方法二 在hadoop的mapred-default.xml文件中修改 设置每个job的Reduce个数 set mapreduce.job.reduces = 15; 3.reduce个数并不是越多越好 1)过多的启动和初始化reduce也会消耗时间和资源; 2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题; 在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适; 9.5 并行执行 Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。 通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加。 set hive.exec.parallel=true;?? //打开任务并行执行 set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。 当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。 9.6 严格模式 防止例如笛卡尔积,例如limit 。 Hive提供了一个严格模式,可以防止用户执行那些可能意向不到的不好的影响的查询。 通过设置属性hive.mapred.mode值为默认是非严格模式nonstrict 。开启严格模式需要修改hive.mapred.mode值为strict,开启严格模式可以禁止3种类型的查询。 select e.,d.* from emp e,dept d; hive.mapred.mode strict The mode in which the Hive operations are being performed. In strict mode, some risky queries are not allowed to run. They include: Cartesian Product. No partition being picked up for a query. Comparing bigints and strings. Comparing bigints and doubles. Orderby without limit. 1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。 2)对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。 3)限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。 9.7 JVM重用 JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。 Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。 mapreduce.job.jvm.numtasks 10 How many tasks to run per jvm. If set to -1, there is no limit. 这个功能的缺点是,开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。 9.8 推测执行 在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。 设置开启推测执行参数:Hadoop的mapred-site.xml文件中进行配置 mapreduce.map.speculative true If true, then multiple instances of some map tasks may be executed in parallel. mapreduce.reduce.speculative true If true, then multiple instances of some reduce tasks may be executed in parallel. 不过hive本身也提供了配置项来控制reduce-side的推测执行: hive.mapred.reduce.tasks.speculative.execution true Whether speculative execution for reducers should be turned on. 关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。
Flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。 Agent Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,是Flume数据传输的基本单元。 相当于一个job mapreduce Agent主要有3个部分组成,Source、Channel、Sink。 1.2.2 Source Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。 source :收集的数据源: exec: 日志文件。 spooling directory :目录 netcat : ip:port 网络请求来源 avro : 上一个flume的sink下沉的数据。 1.2.3 Channel Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全(线程队列)的,可以同时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。 File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 1.2.4 Sink Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统Hbase、或者被发送到另一个Flume Agent。 Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件event。 Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、kafka自定义。 1.2.5 Event 传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。 agent; 事件:每运行一个flume,就会开启一个agent. source: 数据源 channel: 传输管道 sink: 下沉 event: 数据基本单元
kafka kafka是什么 消息队列,消息中间件。 1)Apache Kafka是一个开源消息系统,由Scala(斯卡拉)写成。是由Apache软件基金会开发的一个开源消息系统项目。 2)Kafka最初是由LinkedIn(猎聘网求职)(领英)公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。 3)Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic(集合)(主题)进行归类,发送消息者称为Producer(生产者)(数据源),消息接受者称为Consumer(strom,sparkStream),此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。 4)无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。(消费者要读取:offset . 数据的记录号)
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: ?以时间复杂度为O(1)的方式提供消息持久化能力,并保证即使对TB级以上数据也能保证常数时间的访问性能 ?高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输 ?支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个partition内的消息顺序传输 ?同时支持离线数据(一周)处理和实时数据处理 ? (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。 (2)发布/订阅模式(一对多,数据生产后,推送给所有订阅者) 发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。 为什么需要消息队列 1)解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2)冗余(kafka特性):重复,多余,副本。 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。 3)扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。 4)灵活性 & 峰值处理能力: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 5)可恢复性: 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 6)顺序保证: 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性) 7)缓冲: 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 8)异步通信: 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。 SparkStreaming。按秒处理数据。 strom,纯实时组件,按毫秒处理数据。 Kafka生产过程分析 写入方式 producer采用推(push)模式将消息发布到broker, consumer采用拉()的方式将kafkatipic中的数据进行读取分析。 每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。 分区(Partition) 消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了; (2)可以提高并发,因为可以以Partition为单位读写了。 2)分区的原则 (1)指定了patition,则直接使用; (2)未指定patition但指定key,通过对key的value进行hash出一个patition (3)patition和key都未指定,使用轮询选出一个patition。 副本(Replication) 同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。 写入流程 ?producer写入消息流程如下:
1)producer先从zookeeper的 "/brokers/…/state"节点找到该partition的leader 2)producer将消息发送给该leader 3)leader将消息写入本地log 4)followers从leader pull消息,写入本地log后向leader发送ACK 5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK 副本写完之后,回调给leader的一个value。 0 无论是否副本正确写入。 不等待从机回应。 kafka写入速度极快。 1 一台follower,正确写入。 all 所有的follower,正确写入。速度较慢,能保证数据的安全性。 Broker 保存消息 3.2.1 存储方式 物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下: [atguigu@hadoop102 logs]$ ll drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-0 drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:35 first-1 drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-2 [atguigu@hadoop102 logs]$ cd first-0 [atguigu@hadoop102 first-0]$ ll -rw-rw-r–. 1 atguigu atguigu 10485760 8月 6 14:33 00000000000000000000.index -rw-rw-r–. 1 atguigu atguigu 219 8月 6 15:07 00000000000000000000.log -rw-rw-r–. 1 atguigu atguigu 10485756 8月 6 14:33 00000000000000000000.timeindex -rw-rw-r–. 1 atguigu atguigu 8 8月 6 14:37 leader-epoch-checkpoint 3.2.2 存储策略 无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据: 1)基于时间:log.retention.hours=168 2)基于大小:log.retention.bytes=1073741824 需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。 3.2.3 Zookeeper存储结构
注意:producer不在zk中注册,消费者在zk中注册。 高级API 1)高级API优点 高级API 写起来简单 不需要自行去管理offset,系统通过zookeeper自行管理。 不需要管理分区,副本等情况,.系统自动管理。 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset) 可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响) 2)高级API缺点 不能自行控制offset(对于某些特殊需求来说) 不能细化控制如分区、副本、zk等 3.3.2 低级API 1)低级 API 优点 能够让开发者自己控制offset,想从哪里读取就从哪里读取。 自行控制连接分区,对分区自定义进行负载均衡 对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) 2)低级API缺点 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。 拦截器原理 Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。 过滤敏感字段。 修改消息的格式 定制化。 对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括: (1)configure(configs) 获取配置信息和初始化数据时调用。 (2)onSend(ProducerRecord): 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算 (3)onAcknowledgement(RecordMetadata, Exception): success 消息量。 failed 失败消息量。 该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率 (4)close: 关闭interceptor,主要用于执行一些资源清理工作 如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。 大数据技术之HBase Hbase为什么要用? mysql首先磁盘级数据库,存储和读取能力较弱,上百万数据的查询就会造成延时。 hive : hive只能做分析,不能做实时查询(即时性业务处理)处理。 hbase:PB级数据1024G=T 1024T =P 毫秒级结果。查询的速度。针对于分布式文件系统进行数据的存储和即时查询。 什么是HBase
Apache HBase是一个开源的、分布式的、版本化的、非关系型数据库,
HBase的原型是Google的BigTable论文,受到了该论文思想的启发,目前作为Hadoop google 和 hadoop 有什么关系? hadoop 源于google的论文。 Hbase特点 1)海量存储 Hbase适合存储PB级别的海量数据,在PB级别的数据以及采用廉价PC存储的情况下,能在几十到百毫秒内返回数据。这与Hbase的极易扩展性息息相关。正式因为Hbase良好的扩展性,才为海量数据的存储提供了便利。 2)列式存储 这里的列式存储其实说的是列族存储,Hbase是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。orc 列式存储。 3)极易扩展 Hbase的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩展,一个是基于存储的扩展(HDFS)。 通过横向添加RegionSever的机器,进行水平扩展,提升Hbase上层的处理能力,提升Hbsae服务更多Region的能力。 备注:RegionServer的作用是管理region、承接业务的访问,这个后面会详细的介绍通过横向添加Datanode的机器,进行存储层扩容,提升Hbase的数据存储能力和提升后端存储的读写能力。 4)高并发 由于目前大部分使用Hbase的架构,都是采用的廉价PC,因此单个IO的延迟其实并不小,一般在几十到上百ms之间。这里说的高并发,主要是在并发的情况下,Hbase的单个IO延迟下降并不多。能获得高并发、低延迟的服务。 5)稀疏 稀疏主要是针对Hbase列的灵活性,在列族中,你可以指定任意多的列,在列数据为空的情况下,是不会占用存储空间的。 HBase架构 Hbase架构如图1所示: 图1 HBase架构图 从图中可以看出Hbase是由Client、Zookeeper、Master、HRegionServer、HDFS等几个组件组成,下面来介绍一下几个组件的相关功能: 1)Client Client包含了访问Hbase的接口,另外Client还维护了对应的cache来加速Hbase的访问,比如cache的.META.元数据的信息。 2)Zookeeper HBase通过Zookeeper来做master的高可用、RegionServer的监控、元数据的入口以及集群配置的维护等工作。具体工作如下: 通过Zoopkeeper来保证集群中只有1个master在运行,如果master异常,会通过竞争机制产生新的master提供服务 server1 执行这个命令: hbase.demon.sh start master 通过Zoopkeeper来监控RegionServer的状态,当RegionSevrer有异常的时候,通过回调的形式通知Master RegionServer上下线的信息 通过Zoopkeeper存储元数据的统一入口地址 3)Hmaster master节点的主要职责如下: 为RegionServer分配Region 维护整个集群的负载均衡 维护集群的元数据信息 发现失效的Region,并将失效的Region分配到正常的RegionServer上 当RegionSever失效的时候,协调对应Hlog的拆分 4)HregionServer HregionServer直接对接用户的读写请求,是真正的“干活”的节点。它的功能概括如下: 管理master为其分配的Region 处理来自客户端的读写请求 负责和底层HDFS的交互,存储数据到HDFS 负责Region变大以后的拆分 mysql 百万 表:鞋。 mycat mysql mysql emp emp 负责Storefile的合并工作 5)HDFS HDFS为Hbase提供最终的底层数据存储服务,同时为HBase提供高可用(Hlog存储在HDFS)的支持,具体功能概括如下: 提供元数据和表数据的底层分布式存储服务 数据多副本,保证的高可靠和高可用性 HMaster 功能 1.监控RegionServer 2.处理RegionServer故障转移 3.处理元数据的变更 4.处理region的分配或转移 5.在空闲时间进行数据的负载均衡 6.通过Zookeeper发布自己的位置给客户端 1.3.2 RegionServer 功能 1.负责存储HBase的实际数据 2.处理分配给它的Region 3.刷新缓存到HDFS 4.维护Hlog 5.执行压缩 6.负责处理Region分片 1.2.3 其他组件 1.Write-Ahead logs HBase的修改记录,当对HBase读写数据的时候,数据不是直接写进磁盘,它会在内存中保留一段时间(时间以及数据量阈值可以设定)。但把数据保存在内存中可能有更高的概率引起数据丢失,为了解决这个问题,数据会先写在一个叫做Write-Ahead logfile的文件中,然后再写入内存中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。 2.Region Hbase表的分片,HBase表会根据RowKey值被切分成不同的region存储在RegionServer中,在一个RegionServer中可以有多个不同的region。 3.Store HFile存储在Store中,一个Store对应HBase表中的一个列族。 4.MemStore 顾名思义,就是内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在WAL中之后,RegsionServer会在内存中存储键值对。 5.HFile 这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件。StoreFile是以Hfile的形式存储在HDFS的。orc 的文件格式。
HBase数据结构 4.1 RowKey 与nosql数据库们一样,RowKey是用来检索记录的主键。访问HBASE table中的行,只有三种方式: 1.通过单个RowKey访问 2.通过RowKey的range(正则) 3.全表扫描 RowKey行键 (RowKey)可以是任意字符串(最大长度是64KB,实际应用中长度一般为 10-100bytes),在HBASE内部,RowKey保存为字节数组。存储时,数据按照RowKey的字典序(byte order)排序存储。设计RowKey时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性) 4.2 Column Family 列族:HBASE表中的每个列,都归属于某个列族。列族是表的schema的一部 分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如 courses:history,courses:math都属于courses 这个列族。 4.3 Cell 由{rowkey, column Family:columu, version} 唯一确定的单元。cell中的数据是没有类型的,全部是字节码形式存贮。 关键字:无类型、字节码 4.4 Time Stamp HBASE 中通过rowkey和columns确定的为一个存贮单元称为cell。每个 cell都保存 着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由HBASE(在数据写入时自动 )赋值,此时时间戳是精确到毫秒 的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版 本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。 为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,HBASE提供 了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段 时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。 4.5 命名空间 命名空间的结构:
- Table:表,所有的表都是命名空间的成员,即表必属于某个命名空间,如果没有指定,则在default默认的命名空间中。
- RegionServer group:一个命名空间包含了默认的RegionServer Group。
- Permission:权限,命名空间能够让我们来定义访问控制列表ACL(Access Control List)。例如,创建表,读取表,删除,更新等等操作。
- Quota:限额,可以强制一个命名空间可包含的region的数量。
第5章 HBase原理 5.1 读流程 HBase读数据流程如图3所示
图3所示 HBase读数据流程 1)Client先访问zookeeper,从meta表读取region的位置,然后读取meta表中的数据。meta中又存储了用户表的region信息; 2)根据namespace、表名和rowkey在meta表中找到对应的region信息; 3)找到这个region对应的regionserver; 4)查找对应的region; 5)先从MemStore找数据,如果没有,再到BlockCache里面读; 6)BlockCache还没有,再到StoreFile上读(为了读取的效率); 7)如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。 5.2 写流程 Hbase写流程如图2所示
图2 HBase写数据流程 1)Client向HregionServer发送写请求; 2)HregionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复; 3)HregionServer将数据写到内存(MemStore); 4)反馈Client写成功。 5.3 数据flush过程 BufferOputPutStream out.flush(); 1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据; 2)并将数据存储到HDFS中; 5.4 数据合并过程 1)当数据块达到4块,Hmaster将数据块加载到本地,进行合并; 2)当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理; 3)当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.; 4)注意:HLog会同步到HDFS。
Oozie简介 Oozie英文翻译为:驯象人。一个基于工作流引擎的开源框架,由Cloudera公司贡献给Apache,提供对Hadoop Mapreduce、Pig Jobs的任务调度与协调。Oozie需要部署到Java Servlet容器中运行。主要用于定时调度任务,多任务可以按照执行的逻辑顺序调度。 二、Oozie的功能模块介绍 2.1、模块
- Workflow
顺序执行流程节点,支持fork(分支多个节点),join(合并多个节点为一个) - Coordinator
定时触发workflow - Bundle Job
绑定多个Coordinator 2.2、常用节点 - 控制流节点(Control Flow Nodes)
控制流节点一般都是定义在工作流开始或者结束的位置,比如start,end,kill等。以及提供工作流的执行路径机制,如decision,fork,join等。 - 动作节点(Action Nodes)
负责执行具体动作的节点,比如:拷贝文件,执行某个Shell脚本等等。
Azkaban 任务调度工具:和oozie 是同样的工具类型。 为什么需要工作流调度系统 1)一个完整的数据分析系统通常都是由大量任务单元组成: shell脚本程序,java程序,mapreduce程序、hive脚本等 2)各任务单元之间存在时间先后及前后依赖关系 3)为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行; 例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 1)通过Hadoop先将原始数据上传到HDFS上(HDFS的操作); 2)使用MapReduce对原始数据进行清洗(MapReduce的操作); 3)将清洗后的数据导入到hive表中(hive的导入操作); 4)对Hive中多个表的数据进行JOIN处理,得到一张hive的明细表(创建中间表); 5)通过对明细表的统计和分析,得到结果报表信息(hive的查询操作); 如果你每天都干这样的操作。那么你的5年工作经验,就是一个月经验干5年
原始数据-》HDFS–>mapreduce–》hive–> spark->结果报表。
Azkaban的适用场景 根据以上业务场景: (2)任务依赖(1)任务的结果,(3)任务依赖(2)任务的结果,(4)任务依赖(3)任务的结果,(5)任务依赖(4)任务的结果。一般的做法是,先执行完(1)再执行(2),再一次执行(3)(4)(5)。 这样的话,整个的执行过程都需要人工参加,并且得盯着各任务的进度。但是我们的很多任务都是在深更半夜执行的,通过写脚本设置crontab执行。其实,整个过程类似于一个有向无环图(DAG)。每个子任务相当于大任务中的一个节点,也就是,我们需要的就是一个工作流的调度器,而Azkaban就是能解决上述问题的一个调度器。
1.3 什么是azkaban Azkaban是由Linkedin公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的key:value对的方式,通过配置中的dependencies 来设置依赖关系。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。 1.4 Azkaban特点 1)兼容任何版本的hadoop 2)易于使用的Web用户界面 3)简单的工作流的上传(你自己写一个工作流的job文件。) 4)方便设置任务之间的关系 5)调度工作流 6)模块化和可插拔的插件机制 7)认证/授权(权限的工作) 8)能够杀死并重新启动工作流 9)有关失败和成功的电子邮件提醒(只支持163邮箱) 1.5 常见工作流调度系统 1)简单的任务调度:直接使用crontab实现; 2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如ooize、azkaban等 单一job案例 1)创建job描述文件 [atguigu@hadoop102 jobs]$ vim first.job #first.job type=command command=echo ‘this is my first job’ 2) 将job资源文件打包成zip文件 [atguigu@hadoop102 jobs]$ zip first.zip first.job adding: first.job (deflated 15%) [atguigu@hadoop102 jobs]$ ll 总用量 8 -rw-rw-r–. 1 atguigu atguigu 60 10月 18 17:42 first.job -rw-rw-r–. 1 atguigu atguigu 219 10月 18 17:43 first.zip 注意: 目前,Azkaban上传的工作流文件只支持xxx.zip文件。zip应包含xxx.job运行作业所需的文件和任何文件(文件名后缀必须以.job结尾,否则无法识别)。作业名称在项目中必须是唯一的。 3)通过azkaban的web管理平台创建project并上传job的zip包 首先创建project
上传zip包
4)启动执行该job
点击执行工作流
点击继续
5)Job执行成功
6)点击查看job日志
3.2多job工作流案例 1)创建有依赖关系的多个job描述 第一个job:start.job [atguigu@hadoop102 jobs]$ vim start.job #start.job type=command command=touch /opt/module/kangkang.txt 第二个job:step1.job依赖start.job [atguigu@hadoop102 jobs]$ vim step1.job #step1.job type=command dependencies=start command=echo “this is step1 job” 第三个job:step2.job依赖start.job [atguigu@hadoop102 jobs]$ vim step2.job #step2.job type=command dependencies=start command=echo “this is step2 job” 第四个job:finish.job依赖step1.job和step2.job [atguigu@hadoop102 jobs]$ vim finish.job #finish.job type=command dependencies=step1,step2 command=echo “this is finish job” 2)将所有job资源文件打到一个zip包中 [atguigu@hadoop102 jobs]$ zip jobs.zip start.job step1.job step2.job finish.job updating: start.job (deflated 16%) adding: step1.job (deflated 12%) adding: step2.job (deflated 12%) adding: finish.job (deflated 14%) 3)在azkaban的web管理界面创建工程并上传zip包
5)启动工作流flow
6)查看结果
思考: 将student.txt文件上传到hdfs,根据所传文件创建外部表,再将表中查询到的结果写入到本地文件
Sqoop简介 Apache Sqoop?是一种旨在有效地在Apache Hadoop和诸如关系数据库等结构化数据存储之间传输大量数据的工具。数据迁移工具. Sqoop于2012年3月孵化出来,现在是一个顶级的Apache项目。 Sqoop原理 将导入或导出命令翻译成mapreduce程序来实现。 在翻译出的mapreduce中主要是对inputformat和outputformat进行定制。 Sqoop的简单使用案例 4.1、导入数据 sqoop 做数据迁移: 数据导入, 数据导出 import : 从关系型数据库导入到HDFS HIVE HBASE export : 从 HDFS HIVE 导出到mysql 里. 与命令的顺序无关. 在Sqoop中,“导入”概念指:从非大数据集群(RDBMS)向大数据集群(HDFS,HIVE,HBASE)中传输数据,叫做:导入,即使用import关键字。 4.1.1、RDBMS到HDFS
-
确定Mysql服务开启正常 -
在Mysql中新建一张表并插入一些数据 $ mysql -uroot -p123456 mysql> create database company; mysql> create table company.staff(id int(4) primary key not null auto_increment, name varchar(255), sex varchar(255)); mysql> insert into company.staff(name, sex) values(‘Thomas’, ‘Male’); mysql> insert into company.staff(name, sex) values(‘Catalina’, ‘FeMale’); -
导入数据 (1)全部导入 $ bin/sqoop import –connect jdbc:mysql://master:3306/company –username root –password 123456 –table staff –target-dir /user/company –delete-target-dir –num-mappers 1 –fields-terminated-by “\t” (2)查询导入 $ bin/sqoop import –connect jdbc:mysql://master:3306/company –username root –password 123456 –target-dir /user/company –delete-target-dir –num-mappers 1 –fields-terminated-by “\t” –query ‘select name,sex from staff where id <=1 and
C
O
N
D
I
T
I
O
N
S
;
′
尖
叫
提
示
:
m
u
s
t
c
o
n
t
a
i
n
′
CONDITIONS;' 尖叫提示:must contain '
CONDITIONS;′尖叫提示:mustcontain′CONDITIONS’ in WHERE clause. 尖叫提示:如果query后使用的是双引号,则$CONDITIONS前必须加转移符,防止shell识别为自己的变量。 尖叫提示:–query选项,不能同时与–table选项使用 (3)导入指定列 $ bin/sqoop import –connect jdbc:mysql://master:3306/company –username root –password 123456 –target-dir /user/company –delete-target-dir –num-mappers 1 –fields-terminated-by “\t” –columns id,sex –table staff 尖叫提示:columns中如果涉及到多列,用逗号分隔,分隔时不要添加空格 (4)使用sqoop关键字筛选查询导入数据 $ bin/sqoop import –connect jdbc:mysql://master:3306/stu –username root –password 123456 –target-dir /user/company –delete-target-dir –num-mappers 1 –fields-terminated-by “\t” –table staff –where “id=1” 尖叫提示:在Sqoop中可以使用sqoop import -D property.name=property.value这样的方式加入执行任务的参数,多个参数用空格隔开。 4.1.2、RDBMS到Hive $ bin/sqoop import –connect jdbc:mysql://master:3306/company –username root –password 123456 –table staff –num-mappers 1 –hive-import –fields-terminated-by “\t” –hive-overwrite –hive-table staff_hive 尖叫提示:该过程分为两步,第一步将数据导入到HDFS,第二步将导入到HDFS的数据迁移到Hive仓库 尖叫提示:第一步默认的临时目录是/user/admin/表名 4.2、导出数据 在Sqoop中,“导出”概念指:从大数据集群(HDFS,HIVE,HBASE)向非大数据集群(RDBMS)中传输数据,叫做:导出,即使用export关键字。 4.2.1、HIVE/HDFS到RDBMS $ bin/sqoop export –connect jdbc:mysql://master:3306/company –username root –password 123456 –table staff –num-mappers 1 –export-dir /user/hive/warehouse/staff_hive –input-fields-terminated-by “\t” 尖叫提示:Mysql中如果表不存在,不会自动创建 思考:数据是覆盖还是追加 4.3、脚本打包 使用opt格式的文件打包sqoop命令,然后执行 -
创建一个.opt文件 $ mkdir opt $ touch opt/job_HDFS2RDBMS.opt -
编写sqoop脚本 $ vi opt/job_HDFS2RDBMS.opt
export –connect jdbc:mysql://master:3306/company –username root –password 123456 –table staff –num-mappers 1 –export-dir /user/hive/warehouse/staff_hive –input-fields-terminated-by “\t” 3) 执行该脚本 $ bin/sqoop --options-file opt/job_HDFS2RDBMS.opt CM loudera manager的概念 简单来说,Cloudera Manager是一个拥有集群自动化安装、中心化管理、集群监控、报警功能的一个工具(软件),使得安装集群从几天的时间缩短在几个小时内,运维人员从数十人降低到几人以内,极大的提高集群管理的效率。
1.2 cloudera manager的功能 1)管理:对集群进行管理,如添加、删除节点等操作。 2)监控:监控集群的健康情况,对设置的各种指标和系统运行情况进行全面监控。 3)诊断:对集群出现的问题进行诊断,对出现的问题给出建议解决方案。 4)集成:多组件进行整合。
1.3 cloudera manager的架构
1)Server:负责软件安装、配置,启动和停止服务,管理服务运行的群集。 2)Agent:安装在每台主机上。负责启动和停止的过程,配置,监控主机。 3)Management Service:由一组执行各种监控,警报和报告功能角色的服务。 4)Database:存储配置和监视信息。 5)Cloudera Repository:软件由Cloudera 管理分布存储库。(有点类似Maven的中心仓库) 6)Clients:是用于与服务器进行交互的接口(API和Admin Console) Kylin Kylin定义:麒麟 Apache Kylin是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。 1.2 Kylin架构
1)REST Server REST Server是一套面向应用程序开发的入口点,旨在实现针对Kylin平台的应用开发工作。 此类应用程序可以提供查询、获取结果、触发cube构建任务、获取元数据以及获取用户权限等等。另外可以通过Restful接口实现SQL查询。 2)查询引擎(Query Engine) 当cube准备就绪后,查询引擎就能够获取并解析用户查询。它随后会与系统中的其它组件进行交互,从而向用户返回对应的结果。? 3)路由器(Routing) 在最初设计时曾考虑过将Kylin不能执行的查询引导去Hive中继续执行,但在实践后发现Hive与Kylin的速度差异过大,导致用户无法对查询的速度有一致的期望,很可能大多数查询几秒内就返回结果了,而有些查询则要等几分钟到几十分钟,因此体验非常糟糕。最后这个路由功能在发行版中默认关闭。 4)元数据管理工具(Metadata) Kylin是一款元数据驱动型应用程序。元数据管理工具是一大关键性组件,用于对保存在Kylin当中的所有元数据进行管理,其中包括最为重要的cube元数据。其它全部组件的正常运作都需以元数据管理工具为基础。 Kylin的元数据存储在hbase中。? 5)任务引擎(Cube Build Engine) 这套引擎的设计目的在于处理所有离线任务,其中包括shell脚本、Java API以及Map Reduce任务等等。任务引擎对Kylin当中的全部任务加以管理与协调,从而确保每一项任务都能得到切实执行并解决其间出现的故障。 1.3 Kylin特点 Kylin的主要特点包括支持SQL接口、支持超大规模数据集、亚秒级响应、可伸缩性、高吞吐率、BI工具集成等。 1)标准SQL接口:Kylin是以标准的SQL作为对外服务的接口。 2)支持超大数据集:Kylin对于大数据的支撑能力可能是目前所有技术中最为领先的。早在2015年eBay的生产环境中就能支持百亿记录的秒级查询,之后在移动的应用场景中又有了千亿记录秒级查询的案例。 3)亚秒级响应:Kylin拥有优异的查询相应速度,这点得益于预计算,很多复杂的计算,比如连接、聚合,在离线的预计算过程中就已经完成,这大大降低了查询时刻所需的计算量,提高了响应速度。 4)可伸缩性和高吞吐率:单节点Kylin可实现每秒70个查询,还可以搭建Kylin的集群。 5)BI工具集成 Kylin可以与现有的BI工具集成,具体包括如下内容。 ODBC:与Tableau、Excel、PowerBI等工具集成 JDBC:与Saiku、BIRT等Java工具集成 RestAPI:与JavaScript、Web网页集成 Kylin开发团队还贡献了Zepplin的插件,也可以使用Zepplin来访问Kylin服务。 Impala 什么是Impala Cloudera公司推出,提供对HDFS、Hbase数据的高性能、低延迟的交互式SQL查询功能。 基于Hive,使用内存计算,兼顾数据仓库、具有实时、批处理、多并发等优点。 是CDH平台首选的PB级大数据实时查询分析引擎。
1.2 Impala的优缺点 1.2.1 优点 1)基于内存运算,不需要把中间结果写入磁盘,省掉了大量的I/O开销。 2)无需转换为Mapreduce,直接访问存储在HDFS,HBase中的数据进行作业调度,速度快。 3)使用了支持Data locality的I/O调度机制,尽可能地将数据和计算分配在同一台机器上进行,减少了网络开销。 4)支持各种文件格式,如TEXTFILE?、SEQUENCEFILE 、RCFile、Parquet。 5)可以访问hive的metastore,对hive数据直接做数据分析。
1.2.2 缺点 1)对内存的依赖大,且完全依赖于hive。 2)实践中,分区超过1万,性能严重下降。 3)只能读取文本文件,而不能直接读取自定义二进制文件。 4)每当新的记录/文件被添加到HDFS中的数据目录时,该表需要被刷新。 1.3 Impala的架构
从上图可以看出,Impala自身包含三个模块:Impalad、Statestore和Catalog,除此之外它还依赖Hive Metastore和HDFS。 1)Impalad: 接收client的请求、Query执行并返回给中心协调节点; 子节点上的守护进程,负责向statestore保持通信,汇报工作。 2)Catalog: 分发表的元数据信息到各个impalad中; 接收来自statestore的所有请求。 3)Statestore: 负责收集分布在集群中各个impalad进程的资源信息、各节点健康状况,同步节点信息; 负责query的协调调度。 Impala的外部shell 选项 描述 -h, --help 显示帮助信息 -v or --version 显示版本信息 -i hostname, --impalad=hostname 指定连接运行 impalad 守护进程的主机。默认端口是 21000。 -q query, --query=query 从命令行中传递一个shell 命令。执行完这一语句后 shell 会立即退出。 -f query_file, --query_file= query_file 传递一个文件中的 SQL 查询。文件内容必须以分号分隔 -o?filename?or --output_file?filename 保存所有查询结果到指定的文件。通常用于保存在命令行使用 -q 选项执行单个查询时的查询结果。 -c 查询执行失败时继续执行 -d?default_db?or --database=default_db 指定启动后使用的数据库,与建立连接后使用use语句选择数据库作用相同,如果没有指定,那么使用default数据库 -r or --refresh_after_connect 建立连接后刷新 Impala 元数据 -p, --show_profiles 对 shell 中执行的每一个查询,显示其查询执行计划? -B(–delimited) 去格式化输出 –output_delimiter=character 指定分隔符 –print_header 打印列名 1.连接指定hadoop103的impala主机 [root@hadoop102 datas]# impala-shell -i hadoop103 2.使用-q查询表中数据,并将数据写入文件中 [hdfs@hadoop103 ~]$ impala-shell -q ‘select * from student’ -o output.txt 3.查询执行失败时继续执行 [hdfs@hadoop103 ~]$ vim impala.sql select * from student; select * from stu; select * from student; [hdfs@hadoop103 ~]$ impala-shell -f impala.sql; [hdfs@hadoop103 ~]$ impala-shell -c -f impala.sql; 4.在hive中创建表后,使用-r刷新元数据 hive> create table stu(id int, name string); [hadoop103:21000] > show tables; Query: show tables ±--------+ | name | ±--------+ | student | ±--------+ [hdfs@hadoop103 ~]$ impala-shell -r [hadoop103:21000] > show tables; Query: show tables ±--------+ | name | ±--------+ | stu | | student | ±--------+ 5.显示查询执行计划 [hdfs@hadoop103 ~]$ impala-shell -p [hadoop103:21000] > select * from student; 6.去格式化输出 [root@hadoop103 ~]# impala-shell -q ‘select * from student’ -B --output_delimiter="\t" -o output.txt [root@hadoop103 ~]# cat output.txt 1001 tignitgn 1002 yuanyuan 1003 haohao 1004 yunyun 3.2 Impala的内部shell 选项 描述 help 显示帮助信息 explain 显示执行计划 profile (查询完成后执行) 查询最近一次查询的底层信息 shell 不退出impala-shell执行shell命令 version 显示版本信息(同于impala-shell -v) connect 连接impalad主机,默认端口21000(同于impala-shell -i) refresh 增量刷新元数据库 invalidate metadata 全量刷新元数据库(慎用)(同于 impala-shell -r) history 历史命令 1.查看执行计划 explain select * from student; 2.查询最近一次查询的底层信息 [hadoop103:21000] > select count() from student; [hadoop103:21000] > profile; 3.查看hdfs及linux文件系统 [hadoop103:21000] > shell hadoop fs -ls /; [hadoop103:21000] > shell ls -al ./; 4.刷新指定表的元数据 hive> load data local inpath ‘/opt/module/datas/student.txt’ into table student; [hadoop103:21000] > select * from student; [hadoop103:21000] > refresh student; [hadoop103:21000] > select * from student; 5.查看历史命令 [hadoop103:21000] > history; Impala的数据类型 Hive数据类型 Impala数据类型 长度 TINYINT TINYINT 1byte有符号整数 SMALINT SMALINT 2byte有符号整数 INT INT 4byte有符号整数 BIGINT BIGINT 8byte有符号整数 BOOLEAN BOOLEAN 布尔类型,true或者false FLOAT FLOAT 单精度浮点数 DOUBLE DOUBLE 双精度浮点数 STRING STRING 字符系列。可以指定字符集。可以使用单引号或者双引号。 TIMESTAMP TIMESTAMP 时间类型 BINARY 不支持 字节数组 注意:Impala虽然支持array,map,struct复杂数据类型,但是支持并不完全,一般处理方法,将复杂类型转化为基本类型,通过hive创建表。 HUE HUE=Hadoop User Experience(Hadoop用户体验),直白来说就一个开源的Apache Hadoop UI系统,由Cloudera Desktop演化而来,最后Cloudera公司将其贡献给Apache基金会的Hadoop社区,它是基于Python Web框架Django实现的。通过使用HUE我们可以在浏览器端的Web控制台上与Hadoop集群进行交互来分析处理数据。 HUE与其他框架的集成 3.1、HUE与HDFS 3.1.1、梳理集群环境 hadoop01 hadoop02 hadoop03 hadoop04 oozie zk zk zk hbase hbase hbase Hadoop Hadoop Hadoop 3.1.2、配置HDFS 修改:hdfs-site.xml 属性:dfs.webhdfs.enabled 属性值:true 解释:Enable WebHDFS (REST API) in Namenodes and Datanodes. 修改:core-site.xml 添加 hadoop.proxyuser.hue.hosts hadoop.proxyuser.hue.groups *
#设置代理用户 hadoop.proxyuser.huaqiang.hosts hadoop.proxyuser.huaqiang.groups 如果你的Hadoop配置了高可用,则必须通过httpfs来访问,需要添加如下属性,反则则不必须。(如果HUE服务与Hadoop服务不在同一节点,则必须配置) hadoop.proxyuser.httpfs.hosts hadoop.proxyuser.httpfs.groups 区别:WebHDFS是HDFS内置的组件,已经运行于NameNode和DataNode中。对HDFS文件的读写,将会重定向到文件所在的DataNode,并且会完全利用HDFS的带宽。HttpFS是独立于HDFS的一个服务。对HDFS文件的读写,将会通过它进行中转,它能限制带宽占用。 修改:httpfs-site.xml httpfs.proxyuser.hue.hosts httpfs.proxyuser.hue.groups 解释:以上两个属性主要用于HUE服务与Hadoop服务不在同一台节点上所必须的配置。 提示:
-
如果没有配置NameNode的HA,HUE可以用WebHDFS来管理HDFS -
如果配置了NameNodeHA,则HUE只可用HttpFS来管理HDFS 3.1.3、scp同步配置 //打包 $ tar cvf conf.tar.gz hadoop/ //发送 $ scp -r conf.tar.gz hadoop@192.168.56.87:/home/hadoop/apps/hadoop26/etc/ $ scp -r conf.tar.gz hadoop@192.168.56.87:/home/hadoop/apps/hadoop26/etc/ 3.1.4、启动httpfs服务 $ /home/hadoop/apps/hadoop26/sbin/httpfs.sh start 3.1.5、配置hue.ini文件 找到[hadoop]标签 [[hdfs_clusters]] HA support by using HttpFs [[[default]]] # Enter the filesystem uri ##HDFS服务器地址 fs_defaultfs=hdfs://192.168.56.86:8020 # fs_defaultfs=hdfs://mycluster # NameNode logical name.
# 如果开启了高可用,需要配置如下
## logical_name=mycluster
# Use WebHdfs/HttpFs as the communication mechanism.
# Domain should be the NameNode or HttpFs host.
# Default port is 14000 for HttpFs.
## webhdfs_url=http://localhost:50070/webhdfs/v1
##向HDFS发送命令的请求地址
webhdfs_url=http://192.168.56.86:14000/webhdfs/v1
# Change this if your HDFS cluster is Kerberos-secured
## security_enabled=false
# Default umask for file and directory creation, specified in an octal value.
## umask=022
# Directory of the Hadoop configuration
## hadoop_conf_dir=$HADOOP_CONF_DIR when set or '/etc/hadoop/conf'
#HADOOP的一些配置
hadoop_conf_dir=/home/hadoop/apps/hadoop26/etc/hadoop
hadoop_hdfs_home=/home/hadoop/apps/hadoop26
hadoop_bin=/home/hadoop/apps/hadoop26/bin
3.1.6、测试 开启HUE服务: $ build/env/bin/supervisor 打开HUE的页面,进行HDFS管理。
提示1(不修改也可以): 如果提示错误根目录应该归属于hdfs,如下图:
请修改python变量,位置如下 cd desktop/libs/hadoop/src/hadoop/fs/ vi webhdfs.py 修改其中的变量值为: DEFAULT_HDFS_SUPERUSER = ‘hadoop’ #设置的hue超级用户 然后重启HUE服务即可。 提示2: 启动HUE服务时,请先kill掉之前的HUE服务,如果提示地址被占用,请使用如下命令查看占用8888端口的进程并kill掉: $ netstat -tunlp | grep 8888
3.2、HUE与YARN 3.2.1、配置hue.ini 找到[[yarn_clusters]]标签,涉及修改配置如下: [[yarn_clusters]] [[[default]]] #yarn服务的配置 resourcemanager_host=192.168.56.86 resourcemanager_port=8032 #是否将作业提交到此群集,并监控作业执行情况 submit_to=True #logical_name=cluster-yarn1(如果开高可用的话) #配置yarn资源管理的访问入口 resourcemanager_api_url=http://192.168.56.86:8088 proxy_api_url=http://192.168.56.86:8088 #历史服务器管理的入口,查看作业的历史运行情况 history_server_api_url=http://192.168.56.87:19888 Hadoop启动jobhistoryserver来实现web查看作业的历史运行情况:./mr-jobhistory-daemon.sh start historyserver 3.2.2、重启HUE测试 $ build/env/bin/supervisor //测试 ./bin/yarn jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.1.jar wordcount /input/ /output
3.3、HUE与Hive 3.3.1、修改Hive配置文件apache-hive-2.3.3-bin/conf/hive-site.xml HUE与hive集成需要hive开启HiveServer2服务,相关配置如下: 属性:hive.server2.thrift.port 属性值:10000 描述:?TCP 的监听端口
属性:hive.server2.thrift.bind.host 属性值:192.168.56.88 描述:TCP绑定的主机
属性:hive.server2.long.polling.timeout 属性值:5000 描述:HiveServer2在响应使用长轮询的异步调用之前等待的时间(毫秒)
属性:hive.metastore.uris 属性值:thrift://192.168.56.88:9083 描述:指向的是运行metastore服务的主机 3.3.2、启动Hive相关服务 $ bin/hive --service metastore & $ bin/hive --service hiveserver2 & 提示1:如果设置了uris,在今后使用Hive时,那么必须启动如上两个命令,否则Hive无法正常启动。 提示2:如果如法正确启动,请尝试kill -9 RunJar ,在重新启动 3.3.3、配置hue.ini 找到[beeswax]属性标签,涉及修改如下: [beeswax] hive_server_host=192.168.56.88 hive_server_port=10000 hive_conf_dir=/home/hadoop/apps/apache-hive-2.3.3-bin/conf 3.3.4、重启HUE测试 $ build/env/bin/supervisor
3.4、HUE与Mysql 3.4.1、配置hue.ini 找到[[[mysql]]]标签,涉及修改如下: [[[mysql]]] nice_name=db_mysql engine=mysql host=192.168.56.88 port=3306 user=root password=123456 3.4.2、重启hue.ini测试 启动后即可测试是否成功连接Mysql服务,并且测试是否可以看到数据 $ build/env/bin/supervisor
3.5.3、重启HUE测试 $ build/env/bin/supervisor 提示:如果提示无法关联oozie的share/lib,请使用hdfs命令创建该目录即可: $ bin/hdfs dfs -mkdir -p /user/oozie/share/lib
3.5、HUE与Zookeeper 3.7.1、配置hue.ini 找到[zookeeper]标签,涉及修改如下: [zookeeper] [[clusters]] [[[default]]] host_ports=192.168.56.86:2181,192.168.56.87:2181,192.168.56.88:2181 3.7.2、启动zk测试 启动: zkServer.sh start 状态: zkServer.sh status 3.7.3、重启HUE测试 $ build/env/bin/supervisor
3.6、HUE与HBase(先启动zk) 3.6.1、修改hue.ini配置 找到[hbase]标签,涉及修改内容如下: [hbase] hbase_clusters=(Cluster|192.168.56.87:9090) hbase_conf_dir=/home/hadoop/apps/hbase/conf 3.6.2、启动HBase服务和Hbase的thrift服务 启动: $ bin/start-hbase.sh $ bin/hbase-daemon.sh start thrift 3.6.3、重启HUE进行测试 $ build/env/bin/supervisor 3.7、HUE与Oozie 3.5.1、配置hue.ini 找到[liboozie]标签以及[oozie]标签涉及修改如下: [liboozie]: [liboozie] #运行Oozie服务的URL。 oozie_url=http://192.168.56.87:11000/oozie #HDFS上的位置,提交时部署工作流。 remote_deployement_dir=/user/hadoop/oozie-apps [oozie]: [oozie] #本地存储oozie示例的位置。 local_data_dir=/home/hadoop/apps/oozie-4.0.0-cdh5.3.6/examples #本地存储oozie示例数据的位置。 sample_data_dir=/home/hadoop/apps/oozie-4.0.0-cdh5.3.6/oozie-apps #HDFS上存储oozie示例和工作流的位置。 remote_data_dir=/user/hadoop/oozie-apps #启用Cron来定义协调器的频率替换旧的频率数/单位。 enable_cron_scheduling=true 3.5.2、启动Oozie相关服务 $ bin/oozied.sh start
四、总结 在此我们总结一下集成HUE时,我们开启的后台服务项 4.1、Hadoop(hdfs和yarn) 启动: $ ./bin/start-hdfs.sh $ ./bin/start-hdfs.sh 或 $ ./bin/start-all.sh 注意: $ ~/modules/cdh/hadoop-2.5.0-cdh5.3.6/sbin/httpfs.sh start
4.2、Hive 启动: $ ~/modules/cdh/hive-0.13.1-cdh5.3.6/bin/hive --service metastore $ ~/modules/cdh/hive-0.13.1-cdh5.3.6/bin/hive --service hiveserver2 或 $ ./bin/hive 4.3、zk启动 启动: zkServer.sh start 状态: zkServer.sh status 4.4、HBase 启动: bin/start-hbase.sh 注意:在192.168.56.87 thrift $ bin/hbase-daemon.sh start thrift &
4.5、Oozie 启动: $ bin/oozied.sh start & 注意: 先启动hadoop 和zookpeer Scala 1.数据类型
2.变量和常量的声明 ?定义变量或者常量的时候,也可以写上返回的类型,一般省略,如:val a:Int = 10 ?常量不可再赋值
- /**
-
-
-
- */
- var name = “zhangsan”
- println(name)
- name =“lisi”
- println(name)
- val gender = “m”
- // gender = “m”//错误,不能给常量再赋值
3.类和对象 ?创建类
- class Person{
- val name = “zhangsan”
- val age = 18
- def sayName() = {
- "my name is "+ name
- }
7.} ?创建对象 - object Lesson_Class {
- def main(args: Array[String]): Unit = {
- val person = new Person()
- println(person.age);
- println(person.sayName())
- }
7.}
?对象中的apply方法 object中不可以传参,当创建一个object时,如果传入参数,那么会自动寻找object中的相应参数个数的apply方法。 1./** 2.* Object 相当于单例对象,不能传参 3.* 如果在创建Object时传入参数,那么运行代码时会自动寻找Object中相应参数个数的apply方法 4.*/ 5.object LessonObj { 6. def apply(name:String) = { 7. println(s"name is $name") 8. } 9. def apply(name:String,age:Int) = { 10. println(s"name is $name,age is $age") 11. } 12. def main(args: Array[String]): Unit = { 13. LessonObj(“zhangsan”) 14. LessonObj(“zhangsan”,18) 15. } 16.}
?伴生类和伴生对象 1.class Person(xname :String , xage :Int){ 2. var name = Person.name 3. val age = xage 4. var gender = “m” 5. def this(name:String,age:Int,g:String){ 6. this(name,age) 7. gender = g 8. } 9. 10. def sayName() = { 11. "my name is "+ name 12. } 13. 14.} 15. 16.object Person { 17. val name = “zhangsanfeng” 18. 19. def main(args: Array[String]): Unit = { 20. val person = new Person(“wagnwu”,10,“f”) 21. println(person.age); 22. println(person.sayName()) 23. println(person.gender) 24. } 25.}
注意点: ?建议类名首字母大写 ,方法首字母小写,类和方法命名建议符合驼峰命名法。 ?scala 中的object是单例对象,相当于java中的工具类,可以看成是定义静态的方法的类。object不可以传参数。另:Trait不可以传参数 ?scala中的class类默认可以传参数,默认的传参数就是默认的构造函数。 重写构造函数的时候,必须要调用默认的构造函数。 ?class 类属性自带getter ,setter方法。 ?使用object时,不用new,使用class时要new ,并且new的时候,class中除了方法不执行,其他都执行。 ?如果在同一个文件中,object对象和class类的名称相同,则这个对象就是这个类的伴生对象,这个类就是这个对象的伴生类。可以互相访问私有变量。 ?方法定义语法 用def来定义 ?可以定义传入的参数,要指定传入参数的类型 ?方法可以写返回值的类型也可以不写,会自动推断,有时候不能省略,必须写,比如在递归方法中或者方法的返回值是函数类型的时候。 ?scala中方法有返回值时,可以写return,也可以不写return,会把方法中最后一行当做结果返回。当写return时,必须要写方法的返回值。 ?如果返回值可以一行搞定,可以将{}省略不写 ?传递给方法的参数可以在方法中使用,并且scala规定方法的传过来的参数为val的,不是var的。 ?如果去掉方法体前面的等号,那么这个方法返回类型必定是Unit的。这种说法无论方法体里面什么逻辑都成立,scala可以把任意类型转换为Unit.假设,里面的逻辑最后返回了一个string,那么这个返回值会被转换成Unit,并且值会被丢弃。 1.参数有默认值的方法 ?默认值的函数中,如果传入的参数个数与函数定义相同,则传入的数值会覆盖默认值。 ?如果不想覆盖默认值,传入的参数个数小于定义的函数的参数,则需要指定参数名称。 2.匿名函数 1.有参匿名函数 2.无参匿名函数 3.有返回值的匿名函数 ?可以将匿名函数返回给val定义的值 3.高阶函数 函数的参数是函数,或者函数的返回类型是函数,或者函数的参数和函数的返回类型是函数的函数。 ?函数的参数是函数 ?函数的返回是函数 ?函数的参数和函数的返回是函数 4.柯里化函数 ?高阶函数的简化 1./** 2.* 柯里化函数 3.*/ 4.def fun7(a :Int,b:Int)(c:Int,d:Int) = { 5. a+b+c+d 6.} 7.println(fun7(1,2)(3,4)) Spark Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。项目是用Scala进行编写。 目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark?Streaming、GraphX、MLib、SparkR等子项目,Spark是基于内存计算的大数据并行计算框架。
spark中的RDD是什么,有哪些特性?
答:RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合
Dataset:就是一个集合,用于存放数据的
Destributed:分布式,可以并行在集群计算
Resilient:表示弹性的,弹性表示
1.RDD中的数据可以存储在内存或者磁盘中;
2.RDD中的分区是可以改变的;
五大特性:
1.A list of partitions:一个分区列表,RDD中的数据都存储在一个分区列表中
2.A function for computing each split:作用在每一个分区中的函数
3.A list of dependencies on other RDDs:一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
4.Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可选的,针对于kv类型的RDD才有这个特性,作用是决定了数据的来源以及数据处理后的去向
5.可选项,数据本地性,数据位置最优 RDD特点 RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
RDD的弹性 1)自动进行内存和磁盘数据存储的切换 存储弹性。 Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换 2)基于血统的高效容错机制 (过程弹性) 在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。 3)Task如果失败会自动进行特定次数的重试 RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。 4)Stage如果失败会自动进行特定次数的重试(是否执行shuffle ) 如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。 5)Checkpoint和Persist可主动或被动触发 RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行检查点,检查点会将数据存储在HDFS中,该RDD的所有父RDD依赖都会被移除。 6)数据调度弹性 Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。 7)数据分片的高度弹性 可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。 RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建,为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区,总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统,简单来讲,Spark-RDD的关系类似于Hadoop-MapReduce关系。 RDD的缓存 Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存(cache),是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果 希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。(监测点:checkpoint) RDD缓存方式 RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空 间中。 但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。 RDD检查点机制 Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。 cache 和 checkpoint 是有显著区别的, ?缓存把 RDD 计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了,上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。 如果存在以下场景,则比较适合使用检查点机制: 1)DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。 2)在宽依赖上做Checkpoint获得的收益更大。 为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。 窄依赖 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女 2.6.2 宽依赖 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生 DAG的生成 DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。 累加器 累加器用来对信息进行聚合,通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱 动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本, 更新这些副本的值也不会影响驱动器中的对应变量。 如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。 自定义累加器 自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法,下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。 累加器(accumulator):Accumulator是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它们可用于实现计数器(如MapReduce)或总和计数。 Accumulator是存在于Driver端的,集群上运行的task进行Accumulator的累加,随后把值发到Driver端,在Driver端汇总(Spark UI在SparkContext创建时被创建,即在Driver端被创建,因此它可以读取Accumulator的数值),由于Accumulator存在于Driver端,从节点读取不到Accumulator的数值。 Spark提供的Accumulator主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对于同一个变量并行操作的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。
广播变量 广播变量用来高效分发较大的对象。向所有工作节点发送一个 较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发 送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起 来都很顺手。 传统方式下,Spark 会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但也很低效。原因有二:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。 (1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。 (2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。 (3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。 Spark SQL 是Spark用来处理`(ETL)数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。 1.1.1RDD RDD是一个懒执行的不可变的可以支持Lambda表达式的并行数据集合。 RDD的最大好处就是简单,API的人性化程度很高。 RDD的劣势是性能限制,它是一个JVM驻内存对象,这也就决定了存在GC的限制和数据增加时Java序列化成本的升高。 RDD 例子如下: 1.1.2Dataframe var rdd = sc.textFile("") var dataframe = spark.read.json("***.json") dataframe.rdd; import?spark.implicits._ rdd.toDF:
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,Spark DataFrame很好地继承了传统单机数据分析的开发体验。
上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。 DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待 DataFrame也是懒执行的。 性能上比RDD要高,主要有两方面原因: 定制化内存管理 数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。
优化的执行计划 Json 连接两个文件。Filter(“过滤”) RDD: 两个大文件。 再过滤。比较消耗内存。 查询计划通过Spark catalyst optimiser进行优化.
有一个需求:两个大表.两个超大数据文件. 两个表关联查询,其中一个表中,要通过条件,过滤掉一部分数据.
rdd1.join(rdd2).filter(rdd2)
比如下面一个例子: select * from table t,emp e where t.id = e.id and e.empno>100 为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。 得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。 对于普通开发者而言,查询优化器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。 Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错. 1.1.3Dataset 1)是Dataframe API的一个扩展,是Spark最新的数据抽象 2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。 3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。 当我需要查询某个对象的某个字段时,不需要加载整个对象(文件)信息。只加载某一列。 4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。 5) Dataframe是Dataset的特列,DataFrame=Dataset[Row]?,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。 6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person]. 7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。
RDD让我们能够决定怎么做,而DataFrame和DataSet让我们决定做什么,控制的粒度不一样。
1.1.4三者的共性 1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利 2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。 val?sparkconf?=?new?SparkConf().setMaster(“local”).setAppName(“test”).set(“spark.port.maxRetries”,“1000”) val?spark?=?SparkSession.builder().config(sparkconf).getOrCreate() val?rdd=spark.sparkContext.parallelize(Seq((“a”,?1), (“b”,?1), (“a”,?1))) // map不运行 rdd.map{line=> ??println(“运行”) ??line.1 } 3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出 4、三者都有partition的概念 5、三者有许多共同的函数,如filter,排序等 6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持 import?spark.implicits. 7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型 DataFrame: testDF.map{ ??????case?Row(col1:String,col2:Int)=> ????????println(col1);println(col2) ????????col1 ??????case?_=> ????????“” ????} Dataset: case?class?Coltest(col1:String,col2:Int)extends?Serializable?//定义字段名和类型 ????testDS.map{ ??????case?Coltest(col1:String,col2:Int)=> ????????println(col1);println(col2) ????????col1 ??????case?_=> ????????“” ????} 1.1.5三者的区别 RDD: 1、RDD一般和spark mlib同时使用 2、RDD不支持sparksql操作 DataFrame: 1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如 testDF.foreach{ ??line?=> ????val?col1=line.getAsString ????val?col2=line.getAsString } 每一列的值没法直接访问 2、DataFrame与Dataset一般不与spark ml同时使用 3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如 dataDF.createOrReplaceTempView(“tmp”) spark.sql(“select? ROW,DATE from tmp where DATE is not null order by DATE”).show(100,false) 4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然 //保存 val?saveoptions?=?Map(“header”?->?“true”,?“delimiter”?->?“\t”,?“path”?->?“hdfs://master01:9000/test”) datawDF.write.format(“com.atguigu.spark.csv”).mode(SaveMode.Overwrite).options(saveoptions).save() //读取 val?options?=?Map(“header”?->?“true”,?“delimiter”?->?“\t”,?“path”?->?“hdfs://master01:9000/test”) val?datarDF=?spark.read.options(options).format(“com.atguigu.spark.csv”).load() 利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定。 Dataset: Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段 而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息 case?class?Coltest(col1:String,col2:Int)extends?Serializable?//定义字段名和类型 /** ?rdd (“a”, 1) ?(“b”, 1) ?(“a”, 1) **/ val?test:?Dataset[Coltest]=rdd.map{line=> ??????Coltest(line._1,line._2) ????}.toDS test.map{ ??????line=> ????????println(line.col1) ????????println(line.col2) ????} 可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题 Spark Streaming 流式数据的处理 如果一个项目除了实时计算,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,应首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性。 Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列 有状态转化操作 特殊的Transformations
1.1.1.1 追踪状态变化UpdateStateByKey UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。 updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。 背压机制
默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。 Spark Streaming Backpressure: ?根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。 1.1.1YARN Cluster模式
图2-4 YARN Cluster模式 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。 Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。 ShuffleMapStage与ResultStage
ShuffleMapStage的结束伴随着shuffle文件的写磁盘。 ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。 Shuffle中的任务个数 Spark Shuffle分为map阶段和reduce阶段 初始RDD分区个数由该文件的split个数决定 reduce端的stage默认取spark.default.parallelism这个配置项的值作为分区数,如果没有配置,则以map端的最后一个RDD的分区数作为其分区数(也就是N),那么分区数就决定了reduce端的task的个数。 reduce端数据的读取 1.在reduce task开始执行之前会先让本进程中的MapOutputTrackerWorker向Driver进程中的MapoutPutTrakcerMaster发动请求,请求磁盘小文件位置信息; 当所有的Map task执行完毕后,Driver进程中的MapOutPutTrackerMaster就掌握了所有的磁盘小文件的位置信息 2.完成之前的操作之后,由BlockTransforService去Executor0所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的20%内存中)。 HashShuffle解析 shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。 下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。 此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。 shuffle read的拉取过程是一边拉取一边进行聚合的。 优化后的HashShuffleManager 开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。 当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。 SortShuffle解析 ,数据会先写入一个内存数据结构中 每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。 1.bypass运行机制 每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。 而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。 堆内和堆外内存规划 作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。 堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
|