IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> hadoop存储与分析 -> 正文阅读

[大数据]hadoop存储与分析

Apache Hadoop

## 背景

随着信息化互联网|物联网发展要求,万物互联趋势势在必行。随之引发架构的演变由单一架构向高并发分布式架构演变。数据的存储也开始由原始的单机存储演变为分布式存储。

  • JavaWeb:为了应对高并发和分布式提出的是LNMP :(Linux、Nginx、MySQL、PHP )思想。

  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ONellWHG-1628160864356)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210301152450238.png)]

  • 海量数据存储|数据分析 :存储方案(HDFS)、计算方案(Map Reduce、StormSparkFlink)

大数据背景

分布式:服务间出现跨机器、跨进程通信同一称为分布式

  • 存储
    • 单机存储:容量限制、扩展性差、数据灾备问题
    • 分布式存储:使用存储的集群实现对海量数据的并行化的读写,提升系统写的吞吐能力。目前针对传统业务领域的分布式文件存储方案有 文件存储、块存储。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KzXMGhOJ-1628160864359)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210301152426291.png)]

  • 计算(分析)
    • 单机分析/计算:慢,受限于单机存储的内存、CPU、网络限制
    • 分布式计算:将计算任务交给专门的计算集群负责任务的计算。打破单机计算的瓶颈,实现并行计算,模拟多核CPU的计算能力。可以实现在一定的时间内达到对数据的有效分析。

Hadoop 出现

为了解决海量数据引发的一系列问题,人们通过借鉴Googl发布Google File System和simple Data processing on large cluster的论文在早期的Nutch项目中实现了Hadoop的雏形,早期在Nutch中有两个板块:NDFS(Nutch Distribute File System)、MapReduce分别解决该工程的存储和计算两个问题,而后将该板块冲Nutch中剥离形成独立的模块,最后更名为Hadoop。

  • HDFS :Hadoop Distribute File System 分布式文件系统

  • MapReduce :分发-->汇总,MapReduce是hadoop中通用的分布式并行计算框架。

人称Hadoop之父的Doug Cutting,Apache软件基金会主席,是Lucene、Nutch 、Hadoop等项目的发起人。最开始Hadoop只是Apache Lucene的子项目Nutch的一部分。Lucene 是全球第一个开源的全文检索引擎工具包, Nutch基于Lucene,并具有网页抓取和解析的功能,可以实现一个搜索引擎的开发,但是如果投入使用的话就必须在极短时间内做出反应,并且能够实现短时间内对亿级数量的网页进行分析处理,这就需要考虑分布式任务处理、故障恢复、负载均衡这些问题。后来Doug Cutting 借鉴谷歌的Google File SystemMapReduce:Simplified Data Processing On Large Clusters两篇论文,移植了其中的技术,并将其命名为:Hadoop。

下载地址:https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.9.2/hadoop-2.9.2.tar.gz

环境搭建

环境准备

  • 安装虚拟器并且安装CentOS-7 64位

  • 安装JDK,并且配置环境变量

    ①安装jdk

[root@CentOS ~]# rpm -ivh  jdk-8u191-linux-x64.rpm
警告:jdk-8u191-linux-x64.rpm: 头V3 RSA/SHA256 Signature, 密钥 ID ec551f03: NOKEY
准备中...                          ################################# [100%]
正在升级/安装...
   1:jdk1.8-2000:1.8.0_191-fcs        ################################# [100%]
Unpacking JAR files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...

默认情况乱下,JDK安装在/usr/java路径下

②配置环境变量

[root@CentOS ~]# vi .bashrc 
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
# 使用source命令重新加载变量
[root@CentOS ~]# source ~/.bashrc
  • 配置主机名
[root@CentOS ~]# vi /etc/hostname
CentOS

修改完成主机名之后需要reboot重启

  • 配置主机名与IP的映射关系

    ①查看ip

[root@CentOS ~]# ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host
       valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 00:0c:29:3c:e6:31 brd ff:ff:ff:ff:ff:ff
    inet 192.168.73.130/24 brd 192.168.73.255 scope global noprefixroute dynamic ens33
       valid_lft 1427sec preferred_lft 1427sec
    inet6 fe80::fffe:2129:b1f8:2c9b/64 scope link noprefixroute
       valid_lft forever preferred_lft forever

可以看到ens33这一块网卡的地址为192.168.73.130然后再/etc/hosts中映射主机名和IP的关系.

[root@CentOS ~]# vi /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.73.130 CentOS
  • 配置SSH免密码认证

    ①生成认证所需的公私钥对

[root@CentOS ~]# ssh-keygen -t rsa -P ''
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Created directory '/root/.ssh'.
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:KIx5N+++qLzziq6LaCBT2g5Dqcq0j+TxGV3jJXs7cwc root@CentOS
The key's randomart image is:
+---[RSA 2048]----+
|                 |
|                 |
|  .              |
| o.+   .         |
|o+o + ++S.       |
|O....ooo=  E     |
|*B.. . o..  .    |
|*+=o+  o.o.. .   |
|**++=*o.+o+ .    |
+----[SHA256]-----+

②添加信任列表,继而实现免密码认证

[root@CentOS ~]# ssh-copy-id CentOS
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub"
The authenticity of host 'centos (192.168.73.130)' can't be established.
ECDSA key fingerprint is SHA256:WnqQLGCjyJjgb9IMEUUhz1RLkpxvZJxzEZjtol7iLac.
ECDSA key fingerprint is MD5:45:05:12:4c:d6:1b:0c:1a:fc:58:00:ec:12:7e:c1:3d.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@centos's password:
Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'CentOS'"
and check to make sure that only the key(s) you wanted were added.

③测试是否设置ssh免密码成功

[root@CentOS ~]# ssh root@CentOS
Last failed login: Fri Sep 25 14:19:39 CST 2020 from centos on ssh:notty
There was 1 failed login attempt since the last successful login.
Last login: Fri Sep 25 11:58:52 2020 from 192.168.73.1

如果无需输入密码说明SSH免密码认证成功!

  • 关闭防火墙
[root@CentOS ~]# systemctl stop firewalld.service # 关闭服务
[root@CentOS ~]# systemctl disable firewalld.service # 关闭开机自启
[root@CentOS ~]# firewall-cmd --state # 查看防火墙状态
not running

HADOOP安装

  • 解压并安装Hadoop,下载https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.9.2/hadoop-2.9.2.tar.gz
[root@CentOS ~]# tar -zxf hadoop-2.9.2.tar.gz -C /usr/
  • 配置HADOOP_HOME环境变量
[root@CentOS ~]# vi .bashrc
JAVA_HOME=/usr/java/latest
HADOOP_HOME=/usr/hadoop-2.9.2/
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
# 重新加载HADOOP_HOME环境变量
[root@CentOS ~]# source .bashrc
  • 配置hadoop配置文件etc/hadoop/{core-site.xml|hdfs-site.xml|slaves}

①配置core-site.xml

[root@CentOS ~]# cd /usr/hadoop-2.9.2/
[root@CentOS hadoop-2.9.2]# vi etc/hadoop/core-site.xml
<!--NameNode访问入口-->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://CentOS:9000</value>
</property>
<!--hdfs工作基础目录-->
<property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/hadoop-2.9.2/hadoop-${user.name}</value>
</property>

②配置hdfs-site.xml

[root@CentOS ~]# cd /usr/hadoop-2.9.2/
[root@CentOS hadoop-2.9.2]# vi etc/hadoop/hdfs-site.xml
<!--block副本因子-->
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
<!--配置Sencondary namenode所在物理主机-->
<property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>CentOS:50090</value>
</property>

③配置slaves文本文件

vi etc/hadoop/slaves

CentOS
  • 启动HDFS系统

①在第一次启动HDFS系统的时候,需要对系统做一次格式化,主要为后续的启动做准备,这里需要注意只有再第一次启动的时候才需要这么做,以后再次启动HDFS的时候可以忽略这一步骤!

[root@CentOS ~]# hdfs namenode -format
...
20/09/25 14:31:23 INFO common.Storage: Storage directory /usr/hadoop-2.9.2/hadoop-root/dfs/name has been successfully formatted.
...

在创建HDFS中NameNode服务启动时候需要加载的镜像文件。

②启动HDFS服务

启动脚本放置再sbin目录下,因为我们已经将sbin目录设置到PATH中了,所以可以直接使用start-dfs.sh脚本启动HDFS,如果想关闭HDFS系统可以使用stop-dfs.sh

[root@CentOS ~]# start-dfs.sh
Starting namenodes on [CentOS]
CentOS: starting namenode, logging to /usr/hadoop-2.9.2/logs/hadoop-root-namenode-CentOS.out
CentOS: starting datanode, logging to /usr/hadoop-2.9.2/logs/hadoop-root-datanode-CentOS.out
Starting secondary namenodes [CentOS]
CentOS: starting secondarynamenode, logging to /usr/hadoop-2.9.2/logs/hadoop-root-secondarynamenode-CentOS.out

启动成功以后,用户可以使用JDK自带的jsp指令查看java进程,正常可以看到DataNode、NameNode、SecondaryNameNode三个服务。

[root@CentOS ~]# jps
3457 DataNode
3691 SecondaryNameNode
3325 NameNode
4237 Jps
[root@CentOS ~]#

最后用户可以访问NameNode服务内嵌的WEB页面,查看HDFS的运行状态,默认该服务的监听端口是50070,访问效果如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-77sS5d9y-1628160864362)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210302153513387.png)]

HDFS架构

简介

Hadoop分布式文件系统(HDFS)是指被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统(Distributed File System)。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。

HDFS有着高容错性(fault-tolerant)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。

架构

NameNode & DataNodes

HDFS是一个master/slave架构,一个HDFS的集群包含一个NameNode,该服务是主服务负责管管文件系统的Namespace和响应客户端的常规访问。另外,有很多个DataNode节点,每个DataNode负责管理存储在DataNode运行所在主机上得文件。HDFS暴露了一个文件系统Namespace以及允许将用户数据存储到文件里。HDFS底层会将文件切分成1~N个块,这些块被存储在一系列的DataNode上,NameNode负责修改Namespace的DDL操作例如:打开、关闭、修改文件或者文件夹。NameNode决定了数据块到DataNode的映射。DataNode负责响应客户端的读写请求,同时在接收到来自于NameNode的指令DataNode还要进行块的创建、删除、复制等操作。

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

在这里插入图片描述

NameNode:使用内存存储集群中的元数据(文件命名空间-文件目录结构、数据块到DataNode映射)

DataNode:负责响应客户端对数据块的读写请求,向NameNode汇报自身状态信息。

Block:是HDFS切分文件的尺度,默认是128MB,一个文件最多只有 一个不足128MB块

副本因子:HDFS为了防止DataNode宕机导致块的丢失,允许一个块又多个备份,默认备份是3

HDFS不擅长存储小文件

因为Namenode使用单机的内存存储,因此由于小文件会占用更多的内存空间,导致了Namenode内存浪费。

案例NameNodeDataNode
1文件128MB1条数据块映射元数据128MB磁盘存储*(副本因子)
1000文件总计128MB1000*1条数据块映射元数据128MB磁盘存储*(副本因子)

HDFS机架感知

分布式的集群通常包含非常多的机器,由于受到机架槽位和交换机网口的限制,通常大型的分布式集群都会跨好几个机架,由多个机架上的机器共同组成一个分布式集群。机架内的机器之间的网络速度通常都会高于跨机架机器之间的网络速度,并且机架之间机器的网络通信通常受到上层交换机间网络带宽的限制。

Hadoop在设计时考虑到数据的安全与高效,数据文件默认在HDFS上存放三份,存储策略为:

第一个block副本放在客户端所在的数据节点里(如果客户端不在集群范围内,则从整个集群中随机选择一个合适的数据节点来存放)。

第二个副本放置在与第一个副本所在节点相同机架内的其它数据节点上

第三个副本放置在不同机架的节点上

这样如果本地数据损坏,节点可以从同一机架内的相邻节点拿到数据,速度肯定比从跨机架节点上拿数据要快;同时,如果整个机架的网络出现异常,也能保证在其它机架的节点上找到数据。为了降低整体的带宽消耗和读取延时,HDFS会尽量让读取程序读取离它最近的副本。如果在读取程序的同一个机架上有一个副本,那么就读取该副本。如果一个HDFS集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。

参考:https://www.cnblogs.com/zwgblog/p/7096875.html

SecondaryNameNode & NameNode

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-421Xa6yc-1628160864369)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210302171532758.png)](https://img-blog.csdnimg.cn/62f5b900e0ae46e8b2ad2f1f3a44fe59.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

fsimage:存储在Namenode服务所在物理主机磁盘上的一个二进制文本文件。记录了元数据信息

edits:存储在Namenode服务所在物理主机磁盘上的一个二进制文本文件,记录了对元数据修改操作。

当第一次启动Namenode服务的时候,系统会加载fsimage和edits文件进行合并得到最新元数据信息,并且更新fsimage和edits,一旦服务启动成功后,在服务运行期间不再更新fsimage,只是将操作记录在edits中。导致namenode在长期运行之后重启导致namenode启动时间过长,还可能导致edits文件过大。因此Hadoop HDFS引入Secondary Namenode 辅助Namenode在运行期间完成对元数据的整理。

The NameNode stores modifications to the file system as a log appended to a native file system file, edits. When a NameNode starts up, it reads HDFS state from an image file, fsimage, and then applies edits from the edits log file. It then writes new HDFS state to the fsimage and starts normal operation with an empty edits file. Since NameNode merges fsimage and edits files only during start up, the edits log file could get very large over time on a busy cluster. Another side effect of a larger edits file is that next restart of NameNode takes longer.

The secondary NameNode merges the fsimage and the edits log files periodically and keeps edits log size within a limit. It is usually run on a different machine than the primary NameNode since its memory requirements are on the same order as the primary NameNode.

The start of the checkpoint process on the secondary NameNode is controlled by two configuration parameters.

  • dfs.namenode.checkpoint.period, set to 1 hour by default, specifies the maximum delay between two consecutive checkpoints, and
  • dfs.namenode.checkpoint.txns, set to 1 million by default, defines the number of uncheckpointed transactions on the NameNode which will force an urgent checkpoint, even if the checkpoint period has not been reached.

The secondary NameNode stores the latest checkpoint in a directory which is structured the same way as the primary NameNode’s directory. So that the check pointed image is always ready to be read by the primary NameNode if necessary.

NameNode启动过程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Q5LY9Iyh-1628160864371)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210302171551432.png)]

NameNode的SafeMode

在启动过程中,NameNode会进入一个特殊的状态称为Safemode.HDFS在处于安全模式下不会进行数据块的复制。NameNode在安全模式下接收来自DataNode的心跳和Blockreport信息,每个DataNode的块的汇报信息中包含了该物理主机上所持有的所有的数据块的信息。Name会在启动时候检查所有汇报的块是否满足设置的最小副本数(默认值1),只要块达到了最小副本数,才认得当前块是安全的。NameNode等待30秒然后尝试检查汇报的所谓的安全的块的比例有没有达到99.9%,如果达到该阈值,NameNode自动退出安全模式。然后开始检查块的副本数有没有低于配置的副本数,然后发送复制指令,进行块的复制。

On startup, the NameNode enters a special state called Safemode. Replication of data blocks does not occur when the NameNode is in the Safemode state. The NameNode receives Heartbeat and Blockreport messages from the DataNodes. A Blockreport contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the Safemode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.

注意:HDFS在启动的时候会自动进入和退出安全模式,一般在生产一般有时候也会让HDFS强制进入安全模式,进而对服务器进行维护。

[root@CentOS ~]# hdfs dfsadmin -safemode get
Safe mode is OFF
[root@CentOS ~]# hdfs dfsadmin -safemode enter
Safe mode is ON
[root@CentOS ~]# hdfs dfs -put hadoop-2.9.2.tar.gz /
put: Cannot create file/hadoop-2.9.2.tar.gz._COPYING_. Name node is in safe mode.
[root@CentOS ~]# hdfs dfsadmin -safemode leave
Safe mode is OFF
[root@CentOS ~]# hdfs dfs -put hadoop-2.9.2.tar.gz /

SSH免密码认证

SSH 为建立在应用层基础上的安全协议。SSH 是较可靠,专为远程登录会话和其他网络服务提供安全性的协议。利用 SSH 协议可以有效防止远程管理过程中的信息泄露问题。提供的登录方式有两种:

  • 基于口令的安全验证 - 有可能远程主机冒充目标主机,截获用户信息。
  • 密匙的安全验证 -需要认证的是机器的身份

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5DLonvYX-1628160864373)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210303165034628.png)](https://img-blog.csdnimg.cn/2237283e1aad45549dcfc9b9bfacacbf.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

①产生公私钥对,可选RSA或者DSA算法

[root@CentOS ~]# ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:qWX5zumy1JS1f1uxPb3Gr+5e8F0REVueJew/WYrlxwc root@CentOS
The key's randomart image is:
+---[RSA 2048]----+
|             ..+=|
|              .o*|
|            .. +.|
|         o o .E o|
|        S o .+.*+|
|       + +  ..o=%|
|      . . o   o+@|
|       ..o .   ==|
|        .+=  +*+o|
+----[SHA256]-----+

默认会在~/.ssh目录下产生id_rsa(私钥)和id_rsa.pub(公钥)

②将本机的公钥添加到目标主机的授信列表文件

[root@CentOS ~]# ssh-copy-id root@CentOS
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub"
The authenticity of host 'centos (192.168.73.130)' can't be established.
ECDSA key fingerprint is SHA256:WnqQLGCjyJjgb9IMEUUhz1RLkpxvZJxzEZjtol7iLac.
ECDSA key fingerprint is MD5:45:05:12:4c:d6:1b:0c:1a:fc:58:00:ec:12:7e:c1:3d.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@centos's password:

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'root@CentOS'"
and check to make sure that only the key(s) you wanted were added.

默认会将本机的公钥添加到远程目标主机的~/.ssh/authorized_keys文件中。

Trash回收站

HDFS为了规避由于用户的误操作,导致的数据删除丢失,用户可以在构建HDFS的时候,配置HDFS的垃圾回收功能。所谓的垃圾回收,本质上是在用户删除文件的时候,系统并不会立即删除文件,仅仅是将文件移动到垃圾回收的目录。然后更具配置的时间,一旦超过该时间,系统会删除该文件,用户需要在到期之前,将回收站的文件移除垃圾站,即可避免删除。

  • 开启垃圾回收,需要在core-site.xml中添加如下配置,然后重启hdfs即可
<!--垃圾回收,5 minites-->
<property>
  <name>fs.trash.interval</name>
  <value>5</value>
</property>
[root@CentOS hadoop-2.9.2]# hdfs dfs -rm -r -f /jdk-8u191-linux-x64.rpm
20/09/25 20:09:24 INFO fs.TrashPolicyDefault: Moved: 'hdfs://CentOS:9000/jdk-8u191-linux-x64.rpm' to trash at: hdfs://CentOS:9000/user/root/.Trash/Current/jdk-8u191-linux-x64.rpm

目录结构

[root@CentOS ~]# tree -L 1 /usr/hadoop-2.9.2/
/usr/hadoop-2.9.2/
├── bin  # 系统脚本,hdfs、hadoop、yarn
├── etc  # 配置目录xml、文本文件
├── include # 一些C的头文件,无需关注
├── lib  # 第三方native实现C实现
├── libexec # hadoop运行时候,加载配置的脚本
├── LICENSE.txt
├── logs # 系统运行日志目录,排查故障!
├── NOTICE.txt
├── README.txt
├── sbin  # 用户脚本,通常用于启动服务例如:start|stop-dfs.sh、
└── share # hadoop运行的依赖jars、内嵌webapp 

HDFS实践

HDFS Shell 命令(经常用)

√打印hadoop类路径

[root@CentOS ~]# hdfs classpath

√格式化NameNode

[root@CentOS ~]# hdfs namenode -format

dfsadmin命令

①可以使用-report -live或者-dead查看集群中dataNode节点状态

[root@CentOS ~]# hdfs dfsadmin -report  -live 

②使用-safemode enter|leave|get等操作安全模式

[root@CentOS ~]# hdfs dfsadmin -safemode get
Safe mode is OFF

③查看集群网络拓扑

[root@CentOS ~]# hdfs dfsadmin -printTopology
Rack: /default-rack
   192.168.73.130:50010 (CentOS)

更多请参考:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#dfsadmin

检查某个目录状态

[root@CentOS ~]# hdfs fsck /

√DFS命令

[root@CentOS ~]# hdfs dfs -命令 选项 
或者老版本写法
[root@CentOS ~]# hadoop fs -命令 选项  
-appendToFile

将anaconda-ks.cfg 追加到aa.log中

[root@CentOS ~]# hdfs dfs -appendToFile /root/anaconda-ks.cfg /aa.log
[root@CentOS ~]# hdfs dfs -appendToFile /root/anaconda-ks.cfg /aa.log
-cat

查看文件内容

[root@CentOS ~]# hdfs dfs -cat /aa.log
等价
[root@CentOS ~]# hdfs dfs -cat hdfs://CentOS:9000/aa.log
-chmod

修改文件权限

[root@CentOS ~]# hdfs dfs -chmod -R u+x  /aa.log
[root@CentOS ~]# hdfs dfs -chmod -R o+x  /aa.log
[root@CentOS ~]# hdfs dfs -chmod -R a+x  /aa.log
[root@CentOS ~]# hdfs dfs -chmod -R a-x  /aa.log
-copyFromLocal/-copyToLocal

copyFromLocal本地上传到HDFS;copyToLocal从HDFS上下载文件

[root@CentOS ~]# hdfs dfs -copyFromLocal jdk-8u191-linux-x64.rpm /
[root@CentOS ~]# rm -rf jdk-8u191-linux-x64.rpm
[root@CentOS ~]# hdfs dfs -copyToLocal /jdk-8u191-linux-x64.rpm /root/
[root@CentOS ~]# ls
anaconda-ks.cfg  hadoop-2.9.2.tar.gz  jdk-8u191-linux-x64.rpm
-mvToLocal/mvFromLocal

mvToLocal先下载文件,然后删除远程数据;mvFromLocal:先上传,再删除本地

[root@CentOS ~]# hdfs dfs -moveFromLocal jdk-8u191-linux-x64.rpm /dir1
[root@CentOS ~]# ls
anaconda-ks.cfg  hadoop-2.9.2.tar.gz
[root@CentOS ~]# hdfs dfs -moveToLocal /dir1/jdk-8u191-linux-x64.rpm /root
moveToLocal: Option '-moveToLocal' is not implemented yet.
-put/get

文件上传/下载

[root@CentOS ~]# hdfs dfs -get /dir1/jdk-8u191-linux-x64.rpm /root
[root@CentOS ~]# ls
anaconda-ks.cfg  hadoop-2.9.2.tar.gz  jdk-8u191-linux-x64.rpm
[root@CentOS ~]# hdfs dfs -put hadoop-2.9.2.tar.gz /dir1

更多命令请使用

[root@CentOS ~]# hdfs dfs -help 命令

例如,想知道touchz该如何使用

[root@CentOS ~]# hdfs dfs -touchz /dir1/Helloworld.java
[root@CentOS ~]# hdfs dfs -ls /dir1/
Found 5 items
-rw-r--r--   1 root supergroup          0 2020-09-25 23:47 /dir1/Helloworld.java
drwxr-xr-x   - root supergroup          0 2020-09-25 23:07 /dir1/d1
drwxr-xr-x   - root supergroup          0 2020-09-25 23:09 /dir1/d2
-rw-r--r--   1 root supergroup  366447449 2020-09-25 23:43 /dir1/hadoop-2.9.2.tar.gz
-rw-r--r--   1 root supergroup  176154027 2020-09-25 23:41 /dir1/jdk-8u191-linux-x64.rpm

Java API操作HDFS(了解)

①搭建开发步骤,创建一个Maven的项目(不用选择任何模板),在pom.xml文件中添加如下依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
</dependency>

②配置windows开发环境(很重要)

  • 需要将hadoop-2.9.2解压在window的指定目录下,比如这里我们解压在C:/目录下
  • 在Windows系统的环境变量中添加HADOOP_HOME环境变量
  • 将hadoop-window-master.zip中的bin目录下的文件全部拷贝到%HADOOP_HOME%/bin目录下进行覆盖
  • 重启IDEA否则集成开发环境不识别配置HADOOP_HOME环境变量

③建议将core-site.xml和hdfs-site.xml文件拷贝到项目的resources目录下

④在Windows上配置主机名和IP的映射关系(略)

⑤创建FileSystem和Configuration对象

public static FileSystem fs=null;
public static Configuration conf=null;
static {
    try {
        conf= new Configuration();
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        fs=FileSystem.get(conf);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

文件上传

Path src = new Path("file://xx路径");
Path dst = new Path("/");
fs.copyFromLocalFile(src,dst);
-
InputStream in=new FileInputStream("file://xx路径");
Path dst = new Path("/xx路径");
OutputStream os=fs.create(dst);
IOUtils.copyBytes(in,os,1024,true);

文件下载

Path dst = new Path("file://xx路径");
Path src = new Path("/xx路径");
fs.copyToLocalFile(src,dst);
- 
Path dst = new Path("/xx路径");
InputStream in= fs.open(dst);
OutputStream os=new FileOutputStream("file://xx路径");
IOUtils.copyBytes(in,os,1024,true);    

删除文件

Path dst = new Path("/system");
fs.delete(dst,true);

回收站

Path dst = new Path("/aa.log");
Trash trash=new Trash(fs,conf);
trash.moveToTrash(dst);

所有文件

Path dst = new Path("/");
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(dst, true);
while (listFiles.hasNext()){
    LocatedFileStatus fileStatus = listFiles.next();
    System.out.println(fileStatus.getPath()+" "+fileStatus.isDirectory()+" "+fileStatus.getLen());
}

所有文件或文件夹

Path dst = new Path("/");
FileStatus[] fileStatuses = fs.listStatus(dst);
for (FileStatus fileStatus : fileStatuses) {
    System.out.println(fileStatus.getPath()+" "+fileStatus.isDirectory());
}

MapReduce

概述

MapReduce是一个 Hadoop 的并行计算框架,借鉴了函数式编程思想和矢量编程。Hadoop 中是充分利用了存储节点/Data Node运行所在主机的计算资源(CPU、内存、网络、少许磁盘)完成对任务的并行计算。Map Reduce框架会在所有的DataNode所在的物理主机启动一个计算资源管理者Node Manager用于管理本地的计算资源,默认系统会将计算资源均分8个等份,每个等份抽象成一个Container,该Container主要作为资源隔离。还会再找一些其他的主机启动一个资源管理中心Resource Manager,用于管理集群的计算资源。

流程分析

当用户提交一个计算任务给MapReduce框架,框架会将任务拆分成Map阶段和Reduce阶段(矢量编程思想将任务拆分成两个阶段),框架会根据Map/Reduce阶段的任务并行度.在任务提交初期会启动一个任务管理者(每个任务都有自己的任务管理者)-MRAppMaster(该进程会浪费掉1个计算资源)用于管理Map阶段和Reduce阶段任务执行。在任务执行时期,每个阶段会根据阶段任务的并行度分配计算资源(每个计算资源启动一个Yarn Child),由MRAppMaster完成对阶段任务的检测管理。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GEVKa1ud-1628160864375)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210304163703741.png)]

ResourceManager:负责任务资源的统一调度,管理NodeManager资源,启动MRAppMaster

NodeManager:用于管理本机上的计算资源,默认会将本机的计算资源拆分为8个等份,每个等份抽象成Container

MRAppMaster:任何一个执行的任务都会有一个MRAppMaster负责YarnChild任务的执行和监测。

YarnChild:是具体执行的MapTask或者是ReduceTask的统称。

任务执行期间系统会启动MRAppmaster和YarnChild负责任务的执行,一旦任务执行结束MRAppMaster和YarnChild会自动退出。

环境搭建

①配置资源管理器

[root@CentOS ~]# vi /usr/hadoop-2.9.2/etc/hadoop/yarn-site.xml
<!--配置MapReduce计算框架的核心实现Shuffle-洗牌-->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<!--配置资源管理器所在的目标主机-->
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>CentOS2</value>
</property>
<!--关闭物理内存检查-->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--关闭虚拟内存检查-->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

②配置MapReduce计算框架

[root@CentOS ~]# mv /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml.template  /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml
[root@CentOS ~]# vi /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml
<!--MapRedcue框架资源管理器的实现-->
<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
</property>

③启动计算服务

[root@CentOS ~]# start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /usr/hadoop-2.9.2/logs/yarn-root-resourcemanager-CentOS.out
CentOS: starting nodemanager, logging to /usr/hadoop-2.9.2/logs/yarn-root-nodemanager-CentOS.out
[root@CentOS ~]# jps
13078 SecondaryNameNode
12824 DataNode
1080 ResourceManager
12681 NameNode
1195 NodeManager
1262 Jps

④可以访问ResourManager内嵌WebUI页面:http://CentOS2:8088

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7c1tQMZr-1628160864378)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210304163602138.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7OsM64Hx-1628160864379)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210805183520365.png)]

MapReduce任务开发

背景

假设我们有如下的一张表,需要统计出每个版块被点击次数。

日志级别类别点击日期
INFO/product/xxxx12020-09-28 10:10:00
INFO/product/xxxx22020-09-28 12:10:00
INFO/cart/xxxx22020-09-28 12:10:00
INFO/order/xxxx2020-09-28 12:10:00

如果我们可以将以上的日志看做成是数据库中的一张表,这个问题就可以使用以下SQL解决:

select category,sum(1) from t_click group by category

如果使用上面提到的MapReduce计算模型,我们可以使用Map完成group的功能,使用Reduce完成sum的功能。有如下数据格式

INFO /product/xxx/1?name=zhangsan 2020-09-28 10:10:00
INFO /product/xxx/1?name=zhangsan 2020-09-28 10:10:00
INFO /cart/xxx/1?name=lisi 2020-09-28 10:10:00
INFO /order/xxx/1?name=zhangsan 2020-09-28 10:10:00
INFO /product/xxx/1?name=zhaoliu 2020-09-28 10:10:00
INFO /cart/xxx/1?name=win7 2020-09-28 10:10:00

实现

①写Mapper逻辑

package com.baizhi.click;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 1、用户必须首先清楚 数据格式、存储位置  -- 读取数据方式   --  Mapper写法?
 *    TextInputFormat<LongWritable,Text>  : 读取文件系统中文件 本地系统、HDFS
 *                    字节偏移量     文本行
 * 2、必须清楚的知道自己想干嘛? 按 __类别__ 统计  __点击次数__ 值
 *                               key           value
 */
public class URLMapper extends Mapper<LongWritable, Text,Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         String line=value.toString();
         String url=line.split(" ")[1];
         //获取类别
         int endIndex=url.indexOf("/",1);
         String category=url.substring(0,endIndex);

         //将转换的结果输出
        context.write(new Text(category),new IntWritable(1));
    }

}

②Reducer逻辑

package com.baizhi.click;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * 1、你要对哪种Mapper的输出结果汇总  --  决定Reducer输入的Key和Value类型
 *
 * 2、你要知道最终输出结果以哪种格式写出去,输出Key/Value格式用户只需要关注他的toString即可
 *
 *   TextOutputFormat<key,value>  将结果写出到文件系统:本地、HDFS
 */
public class URLReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total=0;
        for (IntWritable value : values) {
            total+=value.get();
        }
        context.write(key,new IntWritable(total));
    }
}

③封装Job对象

public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();
        Job job= Job.getInstance(conf,"URLCountApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        TextInputFormat.addInputPath(job,new Path("/demo/click"));
        //系统自动创建,如果在执行前存在,则放弃执行
        TextOutputFormat.setOutputPath(job,new Path("/demo/result"));

        //4.设置处理逻辑
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

任务发布

远程部署
  • 需要在job当中添加如下代码
job.setJarByClass(URLCountApplication.class);

设置程序的类加载路径,因为任务是打好jar包以后使用hadoop jar命令提交的.

[root@CentOS ~]# yarn jar MapReduce-1.0-SNAPSHOT.jar com.baizhi.click.URLCountApplication
或者
[root@CentOS ~]# hadoop jar MapReduce-1.0-SNAPSHOT.jar com.baizhi.click.URLCountApplication

tips:如果大家感觉这种打包,然后提交比较复杂,我们可以使用maven提供的ssh远程登录插件,先登录系统后自动执行后续提交任务。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
    </dependencies>
    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.10</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>upload-deploy</id>
                        <!-- 运行package打包的同时运行upload-single和sshexec -->
                        <phase>package</phase>
                        <goals>
                            <goal>upload-single</goal>
                            <goal>sshexec</goal>
                        </goals>
                        <configuration>
                            <!-- 需要部署的文件 -->
                            <fromFile>target/${project.artifactId}-${project.version}.jar</fromFile>
                            <!-- 部署目录  用户:密码@ip+部署地址:端口 -->
                            <url>
                                <![CDATA[ scp://root:123456@CentOS/root/ ]]>
                            </url>
                            <!--shell 执行脚本 -->
                            <commands>
                                <command> hadoop fs -rm -r -f /demo/result </command>
                                <command> hadoop jar MapReduce-1.0-SNAPSHOT.jar com.baizhi.click.URLCountApplication </command>
                            </commands>
                            <displayCommandOutputs>true</displayCommandOutputs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
本地仿真

无需任何yarn环境,直接通过本地仿真的方式实现.一般需要更改NativeIO源码。这里面由于下载不到2.9.2的源码包,大家可以尝试使用2.6.0代替,将源码的557行代码修改如下:

public static boolean access(String path, AccessRight desiredAccess)
    throws IOException {
    return true;
}

在resource资源目录下添加log4j.proeprties

log4j.rootLogger=INFO,CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender 
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout 
log4j.appender.CONSOLE.layout.ConversionPattern=%p %d{yyyy-MM-dd HH:mm:ss,SSS} %C -%m%n
跨平台提交

①将core|hdfs|yarn|mapred-site.xml拷贝到项目的resources目录下

②在mapred-site.xml添加如下配置

<!--开启跨平台-->
<property>
    <name>mapreduce.app-submission.cross-platform	</name>
    <value>true</value>
</property>

③修改job代码

conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
conf.addResource("yarn-site.xml");
conf.addResource("mapred-site.xml");
conf.set("mapreduce.job.jar","file:///xxx.jar");

InputForamt&OutputFormat

整体设计

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YYplPnjf-1628160864381)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210804115918216.png)](https://img-blog.csdnimg.cn/dc1d87f1fba84b88810d7e7704714d8c.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

InputFormat

该类是Hadoop提供的顶层抽象类,该类主要定制切片计算逻辑和切片数据的读取逻辑。

public abstract class InputFormat<K, V> {
    public InputFormat() {
    }
    //计算切片/ 数据拆分逻辑 区间
    public abstract List<InputSplit> getSplits(JobContext var1) 
        throws IOException, InterruptedException;
    //实现逻辑区间的读取逻辑,将读取的数据传递给Mapper
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, 
                                                          TaskAttemptContext var2) 
        throws IOException, InterruptedException;
}

在Hadoop的实现包中提供了InputFormat接口预实现主要有:

  • CompositeInputFormat - 主要实现在Map端实现大规模数据集的join
  • DBInputFormat - 主要提供针对RDBMS数据库的读取实现,主要针对Oracle和MySQL数据库。
  • FileInputFormat -主要针对分布式文件系统提供的预实现。

TextInputFormat

默认将文件按照128MB的大小为单位进行切割,切割的区间称为一个Split ,然后使用LineRecordReader对区间数据进行读取,该LineRecordReader会给Mapper提供每一行文本数据作为value,同时提供该value在文本行中的字节偏移量该偏移量是一个Long类型的参数,通常没有什么作用。

注意:默认FileInputFormat 的所有子类,在没有覆盖 getSplits方法的时候,默认计算的切片大小的区间(0,140.8MB]因为底层在计算文件切片的时候是通过(文件的大小/128MB > 1.1)?切分新块:不切分

代码参考上面案例。

NLineInputFormat

默认将文件按照N行对文件进行切割,切割的区间称为一个Split ,然后使用LineRecordReader对区间数据进行读取,该LineRecordReader会给Mapper提供每一行文本数据作为value,同时提供该value在文本行中的字节偏移量该偏移量是一个Long类型的参数,通常没有什么作用。

覆盖了FileInputFormat的getSplits,因此我们在使用NLineInputFormat的时候一般需要设置行数。

 NLineInputFormat.setNumLinesPerSplit(job,1000);
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();

        Job job= Job.getInstance(conf,"URLCountApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job,1000);
        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        TextInputFormat.addInputPath(job,new Path("D:/data/click"));
        //系统自动创建,如果在执行前存在,则放弃执行
        TextOutputFormat.setOutputPath(job,new Path("D:/data/result"));

        //4.设置处理逻辑
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

KeyValueTextInputFormat

默认将文件按照128MB的大小为单位进行切割,切割的区间称为一个Split ,然后使用KeyValueLineRecordReader对区间数据进行读取,该KeyValueLineRecordReader会给Mapper提供key和value都是Text类型,该格式输入默认按照制表符\t进行分隔Key/Value如果没有正确拆分,会将整行作为key,value为null

conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");

        Job job= Job.getInstance(conf,"AvgCostAplication");

        //2.告诉job数据格式
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        TextInputFormat.addInputPath(job,new Path("file:///D:/data/keyvalue"));
        //系统自动创建,如果在执行前存在,则放弃执行
        TextOutputFormat.setOutputPath(job,new Path("file:///D:/data/result"));

        //4.设置处理逻辑
        job.setMapperClass(AvgCostMapper.class);
        job.setReducerClass(AvgCostReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

MultipleInputs

这是一个复合的输入格式,主要适用于将多个不同格式的InputFormat组合使用,要求Map段的输出格式还必须保持一致。

public class SumCostApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
        Job job= Job.getInstance(conf,"SumCostCountApplication");

        //2.告诉job数据格式
        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        TextOutputFormat.setOutputPath(job,new Path("file:///D:/data/result"));
        //4.设置处理逻辑
        MultipleInputs.addInputPath(job,new Path("file:///D:/data/mul/keyvalue"), KeyValueTextInputFormat.class,KeyVlaueCostMapper.class);
        MultipleInputs.addInputPath(job,new Path("file:///D:/data/mul/text"), TextInputFormat.class,TextCostMapper.class);
        job.setReducerClass(CostSumReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new SumCostApplication(),args);
    }
}

CombineFileInputFormat

上述的所有FileInputFormat都是以文件为单位计算文件切片,也就意味着如果计算的目录下有很多小文件,就会导致第一 阶段的Map任务过多。因此默认FileInputFormat对小文件处理不是太友好,因此Hadoop提供了CombineFileInputFormat格式类,该类专门用于处理小文件场景下的切片计算,会将多个小文件对应同一个切片。但是要求这些小文件的格式必须一致。我们可以使用CombineTextInputFormat该类和TextInputFormat用法一致,不同的是在于切片的计算上。

public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();

        Job job= Job.getInstance(conf,"URLCountApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(CombineTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        CombineTextInputFormat.addInputPath(job,new Path("file:///D:/data/click"));
        //系统自动创建,如果在执行前存在,则放弃执行
        TextOutputFormat.setOutputPath(job,new Path("file:///D:/data/result"));

        //4.设置处理逻辑
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

DBInputFormat

主要负责读取RDBMS中的数据目前仅仅支持MySQL/Oracle数据库

public class UserDBWritable implements DBWritable {
    private Boolean sex;
    private Double salary;
    /**
     * DBOutputFormat使用的
     * @param statement
     * @throws SQLException
     */
    public void write(PreparedStatement statement) throws SQLException {

    }

    public void readFields(ResultSet resultSet) throws SQLException {
        this.sex=resultSet.getBoolean("sex");
        this.salary=resultSet.getDouble("salary");
    }

    public Boolean getSex() {
        return sex;
    }

    public void setSex(Boolean sex) {
        this.sex = sex;
    }

    public Double getSalary() {
        return salary;
    }

    public void setSalary(Double salary) {
        this.salary = salary;
    }
}
public class DBAvgSalaryApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();
        //设置并行度
        conf.setInt(MRJobConfig.NUM_MAPS,5);
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
                                    "jdbc:mysql://localhost:3306/test","root","123456");

        Job job= Job.getInstance(conf,"DBAvgSalaryApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(DBInputFormat.class);

        String query="select sex,salary from t_user";
        String countQuery="select count(*) from t_user";
        DBInputFormat.setInput(job,UserDBWritable.class,query,countQuery);

        job.setOutputFormatClass(TextOutputFormat.class);

        //3.设置数据路径
        //系统自动创建,如果在执行前存在,则放弃执行
        TextOutputFormat.setOutputPath(job,new Path("D:/data/result"));

        //4.设置处理逻辑
        job.setMapperClass(UserAvgMapper.class);
        job.setReducerClass(UserAvgReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(BooleanWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DBAvgSalaryApplication(),args);
    }
}

OutputFormat

该类是Hadoop提供的顶层抽象类,该类主要实现写的逻辑负责将Reduce端的输出写出到外围系统,同时也提供了输出检查(仅仅限于文件系统),负责返回Committer,确保系统能正常的输出。

public abstract class OutputFormat<K, V> {

  //创建RecordWriter
  public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;

  //检查输出目录是否有效
  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;

  //返回一个提交器
  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context
                                     ) throws IOException, InterruptedException;
}


TextoutputFormat

将Reducer端的输出直接写入到文件系统中,其中在写入的时候会调用key、value的toString方法。

DBOutputFormat

将Reducer端的输出直接写入到数据库系统中。

public class URLCountDBWritable implements DBWritable {
    private String category;
    private Integer count;

    public URLCountDBWritable(String category, Integer count) {
        this.category = category;
        this.count = count;
    }

    public URLCountDBWritable() {
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1,category);
        statement.setInt(2,count);
    }

    public void readFields(ResultSet resultSet) throws SQLException {

    }
}
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test",
                "root","123456");
        Job job= Job.getInstance(conf,"URLCountApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        //3.设置数据路径
        TextInputFormat.addInputPath(job,new Path("file:///D:/data/click"));
        DBOutputFormat.setOutput(job,"url_click","url_category","url_count");

        //4.设置处理逻辑
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(URLCountDBWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

JedisOutputFormat

public class JedisOutputFormat extends OutputFormat<String,String> {
    public final static String JEDIS_HOST="jedis.host";
    public final static String JEDIS_PORT="jedis.port";

    public static void setOutput(Job job, String host, Integer port) {
        job.getConfiguration().set(JEDIS_HOST,host);
        job.getConfiguration().setInt(JEDIS_PORT,port);
    }

    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String host=config.get(JEDIS_HOST);
        Integer port=config.getInt(JEDIS_PORT,6379);
        return new JedisRecordWriter(host,port);
    }

    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {}

    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
                context);
    }
}
public class JedisRecordWriter  extends RecordWriter<String,String> {
    private Jedis jedis=null;

    public JedisRecordWriter(String host, Integer port) {
        jedis=new Jedis(host,port);
    }

    public void write(String key, String value) throws IOException, InterruptedException {
        jedis.set(key,value);
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        jedis.close();
    }
}

public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        //1.创建一个Job对象
        Configuration conf = getConf();

        Job job= Job.getInstance(conf,"URLCountApplication");

        //2.告诉job数据格式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(JedisOutputFormat.class);

        //3.设置数据路径
        TextInputFormat.addInputPath(job,new Path("file:///D:/data/click"));
        JedisOutputFormat.setOutput(job,"CentOS",6379);
        //4.设置处理逻辑
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);

        //5.设置输出的Key,value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(String.class);
        job.setOutputValueClass(String.class);

        //6.提交job
        return job.waitForCompletion(true)?1:0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(),args);
    }
}

依赖解决

  • 运行时依赖(Yarn Child依赖)

方案1

要求用户将依赖的jar包拷贝给所有的计算节点(NodeManager运行所在主机)

[root@CentOS ~]# hadoop jar  xxx.jar 入口类  -libjars 依赖jar包1,依赖jar包2,....

方案2

[root@CentOS ~]# hdfs dfs -mkdir /libs
[root@CentOS ~]# hdfs dfs -put mysql-connector-java-5.1.46.jar /libs
conf.setStrings("tmpjars","/libs/xxx1.jar,/libs/xxx2.jar,...");
  • 提交时依赖(client )

需要用户配置HADOOP_CLASSPATH环境变量(/root/.bashrc),通常这种依赖发生在切片计算阶段。

HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.46.jar
export HADOOP_CLASSPATH
[root@CentOS ~]# source .bashrc 
[root@CentOS ~]# hadoop classpath #查看hadoop的类路径
/usr/hadoop-2.6.0/etc/hadoop:/usr/hadoop-2.6.0/share/hadoop/common/lib/*:/usr/hadoop-2.6.0/share/hadoop/common/*:/usr/hadoop-2.6.0/share/hadoop/hdfs:/usr/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/usr/hadoop-2.6.0/share/hadoop/hdfs/*:/usr/hadoop-2.6.0/share/hadoop/yarn/lib/*:/usr/hadoop-2.6.0/share/hadoop/yarn/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/*:`/root/mysql-connector-java-5.1.46.jar`:/usr/hadoop-2.6.0/contrib/capacity-scheduler/*.jar

InputFormat/OuputFormat与Mapper和Reducer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZaCb70iC-1628160864383)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210804172517404.png)]

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xkrrjKCe-1628160864385)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210804172611934.png)](https://img-blog.csdnimg.cn/3e598436154c4b3e989069ad0e37caad.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

MapReduce Shuffle

定义

MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle。总体来说shuffle核心流程主要包括以下几个方面:数据分区、排序、局部聚合/Combiner、缓冲区、溢写、抓取/Fetch、归并排序等。

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2FkjxkGI-1628160864387)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210308171221736.png)](https://img-blog.csdnimg.cn/1e6868f1d7504339a636edde45cbc12f.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

常见问题

1、MapReduce能否实现全局排序?

默认情况下MapReduce是无法实现全局有序的,因为底层MapReduce使用的是HashPartitioner实现,仅仅只能保证数据分区内部的数据以key的自然顺序排列,因此无法实现全局有序。但是可以有一下思路完成全局排序:

  • 设置NumReduceTask的个数为1,这样会导致所有的数据落入到同一个分区,即可实现全排序,但是只适用于小批量数据集
  • 自定义分区策略让数据按照区间分区,不按照hash策略,此时只需要保证区间之间有序即可实现全局有序。但是这种做法会出现区间数据分布不均匀,导致计算过程中出现数据倾斜。
  • 使用Hadoop提供的TotalOrderPartitioner,先对目标进行采样,然后推算出分区区间。

参考:https://blog.csdn.net/lalaguozhe/article/details/9211919

2、如何干预MapReduce的分区策略?

一般来说在实际的开发中,很少去干预分区策略,因为基于大数据首先要考虑的是数据的均匀分布,防止数据倾斜。因此Hash的散列往往是最佳的选择,如果需要覆盖原有分区,可以调用:

job.setPartitionerClass(分区实现类信息即可)
public class CustomHashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

3、如何解决在MapReduce计算过程中的数据倾斜问题(面试热点问题)?

场景:统计亚洲国家人口,以中国、日本为例。自然会使用国家作为key、公民信息作为value。在进行MapReduce计算的时候,中国的公民因为国籍都是China自然会落入到一个分区中。这样就出现数据严重倾斜。

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SA3Hgtun-1628160864401)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210308171703780.png)](https://img-blog.csdnimg.cn/86355fa86b724058a5f664b6b666e07d.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

4、Map、Reduce并行度是靠什么决定的?

Map端并行度是通过计算任务切片决定的,Reduce端是通过job.setNumReduceTask(n)

5、MapReduce调优策略

1)避免小文件计算,线下合并成大文件之后,在进行MapReduce分析或者CombineTextInputFormat。

2)调整环状缓冲区的参数,减少Map任务的IO操作,不能无限制调大,还要考虑到系统GC问题。

3)开启Map压缩,将溢写文件压缩成GZIP格式,减少ReduceShuffle过程中的网络带宽占用,消耗CPU为代价

//开启解压缩,必须真实的环境下运行
conf.setBoolean("mapreduce.map.output.compress",true);
conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);

4)如果条件允许,我们可以开启Map端预处理机制,提前在Map端执行Reduce逻辑,进行局部计算,这样既可极大提升计算性能,但是这种优化手段并不是所有场景都适合。例如:求均值,这种场景就不适合在Map端执行Reduce逻辑。

1)Combiner/Reduce的输入和输出类型必须一致,也就是说预计算逻辑不可以更改Map端输出类型/Reduce端输入类型。

2)不可以改变原有的业务逻辑,比如 求平均值,虽然类型兼容但是业务计算不正确。

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C3R2ZHQl-1628160864402)(assets/image-20200930142652073.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ebvFZep0-1628160864404)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210308173904104.png)](https://img-blog.csdnimg.cn/7291b1851ecd4951a11aa3eb5fdfaef5.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

优点:减少Key数量,节省排序所占用内存空间,极大削减了在ReduceShuffle时候的数据下载量,节省带宽。

5)适当的调整NodeManager管理的资源数

yarn.nodemanager.resource.memory-mb=32G
yarn.nodemanager.resource.cpu-vcores = 16

或者开启硬件资源监测

yarn.nodemanager.resource.detect-hardware-capabilities=true

6)如果顺序执行多个小任务,我们可以考虑使用JVM重用机制,可以使用一个JVM顺序执行多个任务,无需重启新的jvm。

mapreduce.job.jvm.numtasks=2

Hadoop HA构建

概述

  • NameNode HA构建 存储、ResourceManager HA构建 计算

![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VPVAs1PE-1628160864405)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20210310102724193.png)](https://img-blog.csdnimg.cn/5ec5f801befc4b7692ab817131ad53b4.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQyMDc0OTQ5,size_16,color_FFFFFF,t_70)

准备工作

  • 安装三台CentOS-6.5 64 bit操作系统(完成JDK、SSH免密码认证、IP主机名映射、关闭防火墙等工作)

主机和服务启动映射表

主机服务
CentOSANameNode、zkfc、DataNode、JournalNode、Zookeeper、NodeManager
CentOSBNameNode、zkfc、DataNode、JournalNode、Zookeeper、NodeManager、ResourceManager
CentOSCDataNode、JournalNode、Zookeeper、NodeManager、ResourceManager

主机信息

主机名IP信息
CentOSA192.168.234.133
CentOSB192.168.234.134
CentOSC192.168.234.135

JDK安装和配置

[root@CentOSX ~]# rpm -ivh jdk-8u171-linux-x64.rpm
[root@CentOSX ~]# vi .bashrc
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.
export JAVA_HOME
export CLASSPATH
export PATH
[root@CentOSX ~]# source .bashrc

IP主机名映射

[root@CentOSX ~]# vi /etc/hosts

192.168.234.133 CentOSA
192.168.234.134 CentOSB
192.168.234.135 CentOSC

关闭防火墙

[root@CentOSX ~]# systemctl stop firewalld
[root@CentOSX ~]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
[root@CentOSX ~]# firewall-cmd --state
not running

SSH免密码认证

[root@CentOSX ~]# ssh-keygen -t rsa
[root@CentOSX ~]# ssh-copy-id CentOSA
[root@CentOSX ~]# ssh-copy-id CentOSB
[root@CentOSX ~]# ssh-copy-id CentOSC

Zookeeper

[root@CentOSX ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@CentOSX ~]# mkdir /root/zkdata

[root@CentOSA ~]# echo 1 >> /root/zkdata/myid
[root@CentOSB ~]# echo 2 >> /root/zkdata/myid
[root@CentOSC ~]# echo 3 >> /root/zkdata/myid

[root@CentOSX ~]# touch /usr/zookeeper-3.4.6/conf/zoo.cfg
[root@CentOSX ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
initLimit=5
syncLimit=2
server.1=CentOSA:2887:3887
server.2=CentOSB:2887:3887
server.3=CentOSC:2887:3887

[root@CentOSX ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg
[root@CentOSX ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: `follower|leader`
[root@CentOSX ~]# jps
5879 `QuorumPeerMain`
7423 Jps

搭建Hadoop 集群(HDFS)

解压并配置HADOOP_HOME

[root@CentOSX ~]# tar -zxf hadoop-2.9.2.tar.gz -C /usr/
[root@CentOSX ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
[root@CentOSX ~]# source .bashrc

配置core-site.xml vi /usr/hadoop-2.9.2/etc/hadoop/core-site.xml

<!--配置Namenode服务ID-->
<property>		
      <name>fs.defaultFS</name>		
      <value>hdfs://mycluster</value>	
</property>
<property>		
     <name>hadoop.tmp.dir</name>		
     <value>/usr/hadoop-2.9.2/hadoop-${user.name}</value>    
</property>
<property>		
     <name>fs.trash.interval</name>		
     <value>30</value>    
</property>
<!--配置机架脚本-->
<property>		
     <name>net.topology.script.file.name</name>		
     <value>/usr/hadoop-2.9.2/etc/hadoop/rack.sh</value>    
</property>
<!--配置ZK服务信息-->
<property>   
	<name>ha.zookeeper.quorum</name>
	<value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value> 
</property>
<!--配置SSH秘钥位置-->
<property>
     <name>dfs.ha.fencing.methods</name>
     <value>sshfence</value>
</property>
<property>
     <name>dfs.ha.fencing.ssh.private-key-files</name>
     <value>/root/.ssh/id_rsa</value>
</property>

配置机架脚本

[root@CentOSX ~]# touch /usr/hadoop-2.9.2/etc/hadoop/rack.sh
[root@CentOSX ~]# chmod u+x /usr/hadoop-2.9.2/etc/hadoop/rack.sh
[root@CentOSX ~]# vi /usr/hadoop-2.9.2/etc/hadoop/rack.sh
while [ $# -gt 0 ] ; do
	  nodeArg=$1
	  exec</usr/hadoop-2.9.2/etc/hadoop/topology.data
	  result="" 
	  while read line ; do
		ar=( $line ) 
		if [ "${ar[0]}" = "$nodeArg" ] ; then
		  result="${ar[1]}"
		fi
	  done 
	  shift 
	  if [ -z "$result" ] ; then
		echo -n "/default-rack"
	  else
		echo -n "$result "
	  fi
done
[root@CentOSX ~]# touch /usr/hadoop-2.9.2/etc/hadoop/topology.data
[root@CentOSX ~]# vi /usr/hadoop-2.9.2/etc/hadoop/topology.data
192.168.234.133 /rack01
192.168.234.134 /rack01
192.168.234.135 /rack03

/usr/hadoop-2.9.2/etc/hadoop/rack.sh 192.168.234.133

配置hdfs-site.xml vi /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml

<property>
	<name>dfs.replication</name>
	<value>3</value>
</property> 
<!--开启自动故障转移-->
<property>
	<name>dfs.ha.automatic-failover.enabled</name>
	<value>true</value>
</property>
<!--解释core-site.xml内容-->
<property>
	<name>dfs.nameservices</name>
	<value>mycluster</value>
</property>
<property>
	<name>dfs.ha.namenodes.mycluster</name>
	<value>nn1,nn2</value>
</property>
<property>
	<name>dfs.namenode.rpc-address.mycluster.nn1</name>
	<value>CentOSA:9000</value>
</property>
<property>
	 <name>dfs.namenode.rpc-address.mycluster.nn2</name>
	 <value>CentOSB:9000</value>
</property>
<!--配置日志服务器的信息-->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://CentOSA:8485;CentOSB:8485;CentOSC:8485/mycluster</value>
</property>
<!--实现故障转切换的实现类-->
<property>
	<name>dfs.client.failover.proxy.provider.mycluster</name>
	<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

配置slaves vi /usr/hadoop-2.9.2/etc/hadoop/slaves

CentOSA
CentOSB
CentOSC

这里因为是CentOS7,需要我们额外安装一个小插件,psmic否则NameNode无法实现故障自动切换。

[root@CentOSX ~]# yum install -y psmisc 

启动HDFS(集群初始化启动)

[root@CentOSX ~]# hadoop-daemon.sh start journalnode (等待10s钟)
[root@CentOSA ~]# hdfs namenode -format
[root@CentOSA ~]# hadoop-daemon.sh start namenode
[root@CentOSB ~]# hdfs namenode -bootstrapStandby
[root@CentOSB ~]# hadoop-daemon.sh start namenode
#注册Namenode信息到zookeeper中,只需要在CentOSA或者B上任意一台执行一下指令
[root@CentOSA|B ~]# hdfs zkfc -formatZK
[root@CentOSA ~]# hadoop-daemon.sh start zkfc
[root@CentOSB ~]# hadoop-daemon.sh start zkfc
[root@CentOSX ~]# hadoop-daemon.sh start datanode

查看机架信息

[root@CentOSC ~]# hdfs dfsadmin -printTopology
Rack: /rack01
   192.168.73.131:50010 (CentOSA)
   192.168.73.132:50010 (CentOSB)

Rack: /rack03
   192.168.73.133:50010 (CentOSC)

Resource Manager搭建

yarn-site.xml vi /usr/hadoop-2.9.2/etc/hadoop/yarn-site.xml

<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
</property>
<property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster</value>
</property>
<property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>CentOSB</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>CentOSC</value>
</property>
<property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value>
</property>
<!--关闭物理内存检查-->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!--关闭虚拟内存检查-->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

mapred-site.xml mv /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml

vi /usr/hadoop-2.9.2/etc/hadoop/mapred-site.xml

<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
</property>

启动|关闭Yarn服务

[root@CentOSB ~]# yarn-daemon.sh start|stop resourcemanager
[root@CentOSC ~]# yarn-daemon.sh start|stop resourcemanager
[root@CentOSX ~]# yarn-daemon.sh start|stop nodemanger
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:53:50  更:2021-08-06 09:54:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 18:08:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码