1 概述
HDFS(Hadoop Distributed File System)是一个文件系统,用于存储文件,通过目录树来定位文件;随着数据量激增,单个操作系统无法对海量数据进行存储,因此将数据分散到多个系统中,而为了方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS就是其中的一种。
它具有以下优点
- 适合海量数据处理:能够处理百万规模以上的文件数量,数据存储可达TB、PB级别
- 高容错性:数据保存有多个副本,当某一节点的数据丢失后可利用其他副本恢复。
- 成本低:可通过廉价机器组成节点
相应的也有缺点:
- 不适合数据的频繁修改和快速读取。仅支持数据的追加,无法进行随即修改。而且无法满足数据毫秒级的存取
- 不适合大量小文件的存储。每个文件都会在NameNode中占用相同的内存来存储文件目录和块信息,当小文件数量过多时会耗尽NameNode内存;并且小文件的寻址时间会超过读取时间,这也不是HDFS设计的初衷。
- 不支持并发写入,不允许多个线程同时写操作
1.1 组成结构
HDFS的组成结构如下所示:
- NameNode可以看作是整个HDFS系统的主管(Master),它主要负责管理HDFS的名称空间和数据块Block的映射信息,配置保存副本的策略,以及对客户端的读写请求进行响应。
- SecondaryNameNode是NameNode的辅助,分担NameNode工作量,比如定期合并FSimage和edits推送给NameNode。它并不能替代NameNode,在紧急情况下只是帮助恢复NameNode,并不能马上替换并提供服务。
- DataNode是数据操作的实际执行者(Slave),它主要负责实际存储数据块,完成数据块的读写操作
- Client客户端,可以在文件上传时进行切分为Block,与NameNode交互,获取文件位置,然后再与DataNode交互完成数据读写。此外Client还包括一些命令来管理HDFS,例如对NameNode进行格式化、对HDFS进行增删改查等。
HDFS中的文件在物理上是分块处理的,块(Block)的大小可以通过参数dfs.blocksize来规定,其默认值为128M,当文件大小超过128M后会被切分为多个Block。块大小规定的是数据存储的最大值而不是实际值,例如1KB文件在块中实际空间就是1KB而不是128M。
文件块大小的选取和硬盘的传输速度有关,研究表明寻址时间最好为传输时间的1%,若寻址时间为10ms,那么传输时间最好为1s,当前普通机械硬盘的传输速率为100MB/s,因此选取块大小为较为接近的128M;若使用固态硬盘传输速率大约为200MB/s,那么Block大小可以设为256M。因此文件块不能设置得太大。另一方面,如果文件块设置得过小,那么文件数量就会变多,寻址时间就会变大,也不利于数据读取。
1.2 HDFS数据的读写
如下所示为客户端向HDFS读取文件内容的过程
- 客户端通过Distributed FileSystem模块向NameNode请求上传文件,
- NameNode检查目标文件是否已存在,父目录是否存在,然后返回是否可以上传。
- 客户端查询第一个 Block上传到哪几个DataNode服务器上。
- NameNode通过一定策略选出3个数据存放的DataNode节点,dn1、dn2、dn3返回给客户端。
- 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成
- dn1、dn2、dn3逐级应答客户端。
- 客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。
- 数据传输完成后客户端关闭资源和连接。
节点距离 在HDFS写数据的过程中,NameNode会选择距离待上传数据最近距离的DataNode接收数据。而节点距离是指两个节点到达最近的共同祖先的距离总和。
节点选择
如下所示,不同的数据节点n-0、n-1、n-2放在一个服务器机架r1上,多个机架一起构成了一个大的集群
- Hadoop一般会根据就近原则将第一个副本放在Client所处节点上
- 为了保证数据的可用性,第二个副本放在随机另一个机架的节点上,防止同一个机架上的节点都失效
- 同时为了兼顾数据的传输,第三个副本放在第二个副本相同机架的另一个节点上
读数据流程
如下所示为从HDFS中读取数据的流程
- 客户端通过DistributedFileSystem向NameNode请求下载文件,
- NameNode通过查询元数据,找到文件块所在的DataNode地址。
- 挑选一台DataNode(就近原则,然后随机)服务器,请求读取第一个数据块blk_1。
- DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
- 同理,继续读取第二个数据块blk_2
- 客户端以Packet为单位接收,将收到的数据块在本地缓存,,待接收到所有的数据块后写入目标文件。
1.3 NameNode工作机制
FsImage
为了对数据的随机访问作出快速响应,NameNode将元数据信息加载到内存中,但是为了防止内存中的数据断电丢失,HDFS会在磁盘中保存一份元数据备份文件FsImage 。在NameNode格式化时会创建FsImage,之后每次启动会将其中的内容加载到内存。
在Hadoop的data目录下可以看到fsimage文件,但是镜像文件无法直接打开,可以通过oiv(offline image viewer)来转换成可以查看的格式,其语法为
hdfs oiv -p XML -i fsimage_0000000000000000025 -o ./fsimage.xml
可以看到fsimage.xml文件中保存着namenode元数据信息
<inode>
<id>16386</id>
<type>DIRECTORY</type>
<name>user</name>
<mtime>1512722284477</mtime>
<permission>atguigu:supergroup:rwxr-xr-x</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
Edits文件
为了保证内存中的元数据和FsImage中的一致性,通过Edits文件以追加操作的方式记录内存中元数据的变化,每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,FsImage可以根据Edits中的记录进行更新,从而保证数据的一致性。
同理可以对Edits文件进行查看
hdfs oev -p XML -i edits_0000000000000000012-0000000000000000013 -o ./edits.xml
查看edits.xml,其中记录了对于数据的更改操作
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
<TXID>130</TXID>
<LENGTH>0</LENGTH>
<INODEID>16407</INODEID>
<PATH>/hello7.txt</PATH>
<REPLICATION>2</REPLICATION>
<MTIME>1512943607866</MTIME>
<ATIME>1512943607866</ATIME>
<BLOCKSIZE>134217728</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1544295051_1</CLIENT_NAME>
<CLIENT_MACHINE>192.168.10.102</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
<USERNAME>atguigu</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>908eafd4-9aec-4288-96f1-e8011d181561</RPC_CLIENTID>
<RPC_CALLID>0</RPC_CALLID>
</DATA>
</RECORD>
文件更新过程
NameNode数据更新过程如下:
- 第一次启动NameNode格式化后,创建Fsimage和Edits文件。否则直接加载编辑日志和镜像文件到内存。
- 客户端对元数据进行增删改的请求。
- NameNode记录操作日志,更新滚动日志。
- NameNode在内存中对元数据进行增删改。
Edits文件会随着时间逐渐增大,因此需要定期更新FsImage并清空Edits,更新操作由SecondaryNameNode辅助完成,其过程如下:
- Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
- Secondary NameNode请求执行CheckPoint。
- NameNode滚动正在写的Edits日志。
- 将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
- Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
- 生成新的镜像文件fsimage.chkpoint。
- 拷贝fsimage.chkpoint到NameNode。
- NameNode将fsimage.chkpoint重新命名成fsimage。
CheckPoint的更新有定期和存满了两种情况,默认每隔一小时SecondaryNameNode会执行一次清理合并。另一方面,2nn默认如果操作次数满了一百万次也会触发清理,同时每一分钟会检查一次是否够了一百万次。
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600s</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60s</value>
<description> 1分钟检查一次操作次数</description>
</property>
1.4 DataNode
数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身和元数据.meta文件,其中包括数据块的长度,块数据的校验和,以及时间戳等信息。
DataNode启动后向NameNode注册,之后会周期性(6小时)地向NameNode上报所有的块信息。DataNode会每3秒一次进行心跳反应,心跳返回结果带有NameNode给该DataNode的命令,如复制块数据或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。
2 Shell命令
通过hdfs的命令行可以对文件进行简单的增删改查等操作
上传下载
hadoop fs -mkdir /document
hadoop fs -moveFromLocal ./word.txt /document
hadoop fs -put ./word.txt /document
hadoop fs -copyFromLocal ./word.txt /document
hadoop fs -get /document/word.txt ./localword.txt
hadoop fs -copyToLocal /document/word.txt ./localword.txt
hadoop fs -cp /document/word.txt /newdoc
hadoop fs -mv /document/word.txt /newdoc
删除
hadoop fs -rm -r /document
修改
hadoop fs -appendToFile new.txt /document/word.txt
hadoop fs -chmod 666 /document/script.sh
hadoop fs -setrep 3 /document/word.txt
查看
hadoop fs -ls /document
hadoop fs -du /document
52 156 /document/word.txt
hadoop fs -cat /document/word.txt
hadoop fs -tail /document/word.txt
3 API操作
如果在Windows环境下和Hadoop集群进行交互,还需要配置Windows的环境,首先需要下载相关的工具包winutils:https://github.com/cdarlint/winutils,选择相应的版本解压,然后配置到Windows系统环境变量HADOOP_HOME中
双击依赖包中的winutils.ext,如无报错代表环境变量配置正常。
在IDEA中创建一个Maven工程HdfsClientDemo,并导入相应的依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
在src/main/resources目录下,创建log4j的配置文件log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
操作HDFS
如下所示创建HdfsClient类用于操作Hadoop集群,使用方式大致分为三步,第一步通过FileSystem.get()获取文件系统fs,其中可以传入集群地址,集群配置和登录用户作为参数;第二部通过fs对象的方法完成相关操作;第三步关闭资源。
如下所示,首先使用mkdirs() 方法在hdfs系统上创建目录,然后使用copyFromLocalFile() 方法将本地的example.py上传到了hdfs的/code目录,并且通过configuration.set() 设置副本数为2
package hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class HdfsClient {
@Test
public void makeDirectory() throws URISyntaxException, IOException, InterruptedException {
URI uri = new URI("hdfs://hadoop102:8020");
Configuration configuration = new Configuration();
configuration.set("dfs.replication", "2");
FileSystem fs = FileSystem.get(uri, configuration, "tory");
fs.mkdirs(new Path("/code"));
fs.copyFromLocalFile(new Path("d:/temp/example.py"), new Path("/code"));
fs.close();
}
}
文件下载
通过如下方法从HDFS下载文件到本地,HDFS使用循环冗余校验码来保证数据正确性,如果开启文件校验,会另外下载.CRC校验文件
copyToLocalFile(false, new Path("/code/example.py"), new Path("d:/code.test.py"), true);
文件移动
在HDFS上进行文件移动和名称修改都可以通过rename()函数实现,第一个参数为原文件路径,第二个为新文件路径
fs.rename(new Path("/tmp/example.java"), new Path("/code/example.java"));
文件删除 通过delete()方法可以对文件进行删除,第一个为要删除的文件路径,第二个参数为是否级联删除,当删除某个不为空的文件夹是需要将其设为true
fs.delete(new Path("/tmp"), true);
集群设置
除了在代码中通过Configuration对象对Hadoop集群进行设置外,还可以通过配置文件对集群进行设置,例如在resource目录下新建hdfs-site.xml,在其中设置文件副本为1份,但上述代码执行结果依旧为2份副本,这是由于参数设置存在优先级: 代码中的设置 > resource配置文件 > Hadoop自定义配置xxx-site.xml > Hadoop默认文件xxx-default.xml 因此文件副本数是2,而不是resource中的1和默认文件的3
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
文件信息
如下所示,通过listFiles() 方法获取根目录下的所有文件夹,第二个参数true代表递归遍历。然后通过迭代器依次对其进行遍历,并且通过fileStatus对象获得具体的文件信息
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
if (fileStatus.isFile()) {
System.out.println("这是一个文件");
}else {
System.out.println("这是一个文件夹");
}
System.out.println("文件路径" + fileStatus.getPath());
System.out.println("执行权限:" + fileStatus.getPermission());
System.out.println("所属者:" + fileStatus.getOwner());
System.out.println("所属组:" + fileStatus.getGroup());
System.out.println("文件大小" + fileStatus.getLen());
System.out.println("修改时间:" + fileStatus.getModificationTime());
System.out.println("副本数:" + fileStatus.getReplication());
System.out.println("块大小" + fileStatus.getBlockSize());
System.out.println("文件名" + fileStatus.getPath().getName());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println("文件副本块位置:" + Arrays.toString(blockLocations));
}
|