第3章: Hadoop分布式文件系统
? 如果文件太大,那么一台机器肯定存不下,所以需要进行分块存储到不同的机器上。这就需要用到网络通信,同时保证文件不丢失。
? Hadoop的HDFS则实现了分布式存储。
? 本章具体介绍HDFS,以及其他的存储系统(本地文件系统、Amazon S3系统)
3.1 HDFS的设计
? HDFS以流数据访问模式来存储超大文件,运行于商业硬件集群上
? 下面具体解释上述句子中的各个词语的含义
(1)超大文件: MB,GB,TB甚至PB级别的文件。Hadoop都可以存储
(2)流式数据访问:
? 其基本思路为:一次写入、多次读取。所以读取更加频繁,所以读取的速度直接影响用户的体验。
(3)商用硬件:
? Hadoop并不需要运行在昂贵的硬件上,只需要运行在商用硬件(普通硬件)上。
? 这因为如此,故障率会很高,所以需要容错机制,让用户感觉不到故障。
(4)非低时间延迟的数据访问:
? HDFS不适合低延迟访问,无法在短时间(几十毫秒)内完成访问
? 对于低延迟访问,HBase是一个更好的选择
(5)大量的小文件:
? namenode的花名册记录了该集群所有分块文件的信息。
? 因此集群中分块的数量由namenode文件系统的容量决定。
? 如果有100万个数据库,那么namenode至少需要300MB内存,如果是数十亿,那么就无法完成了。
(6)不支持多用户写入,任意修改文件
? HDFS只允许单用户写入
? HDFS只允许添加,不允许修改。
3.2 HDFS的概念
3.2.1 数据块
? 在操作系统中,每个磁盘都会有默认的块大小,是磁盘写入(读取)数据的最小单位。
? HDFS也有块的概念,但是大得多,默认为128MB。将一个大文件分为若干块(chunk),作为独立的存储单元。
? 如果这个文件小于128MB,那么他是不会占用128MB的空间,只会是原来的大小。(如一个1MB的文件存储在128MB的块中,也是1MB)
? 使用块的好处一:任意的大文件都可以拆分成多个块,然后存储到不同的磁盘上。
? 使用块的好处二:使用固定大小的块,简化系统的设计,更容易管理和存储。
? 使用块的好处三:固定的块适合于备份提高容错能力。一般来说同一个块会存在3台不同的机器上
? 查看块信息的命令% hdfs fsck / -files -blocks
3.2.2 namenode 和 datanode
-
HDFS集群有两类节点,一个namenode(管理节点),多个datanode(工作节点) -
namenode:管理文件系统的命名空间。 维护文件系统树,该树内的所有文件和目录。 -
namenode的系统信息以2个文件的形式保存,分别是(1)命名空间镜像文件(2)编辑日志文件 -
namenode记录每个文件的块信息,但并不永久保存,系统重新启动后就会重新构建。 -
namenode十分重要,如果namenode损坏,那么文件系统的所有文件将丢失。 -
客户端并不需要知道namenode是哪一个,根据接口就可以执行访问系统 -
datanode:文件系统的工作节点,根据需要存储和检索数据块,并定期向namenode发送当前节点的块信息。 -
Hadoop为namenode提供两种容错机制,防止namenode损坏。 (1)备份namenode,在多个文件系统中保存元信息。 (2)辅助namenode(secondaryNameNode),该辅助定期合并编辑日志与命名空间镜像两个文件,防止编辑日志过大。 ? 该辅助一般在单独的计算机上运行。该辅助会保存命名空间的副本,并在namenode发送故障时启用。 ? 但是,辅助的状态落后于namenode,所以namenode失效后难免会丢失部分数据。将远程挂载的网络文件系统(NFS)上的namenode信息复制到辅助namenode(secondaryNameNode)作为新的namenode运行 (3)使用高可用集群,在3.2.5 有详细的讨论
3.2.3 块缓存
? 通常,datanode都是从磁盘读取文件
? 但是如果某文件访问频繁,则可以被缓存在datanode的内存中,以堆外存缓存的形式存在。
? 通常情况,一个块只会缓存在一个datanode内存中,也可以通过配置文件修改缓存在datanode的数量。
? MapReduce、Spark等 可以通过缓存块,来提高运行速度。比如连接(join)就是一个很好的候选
? 用户通过缓存池(cache poole) 增加一个cache directive 来告诉namenode需要缓存哪些文件,以及缓存的时间。
3.2.4 联邦HDFS
? namenode的内存中保存着元数据,这个数据指明系统中的文件以及对于的块。
? 这也意味着namenode的内存大小限制了集群的数量。
? Hadoop2.x引入了联邦HDFS,将多个namenode组成一个联邦,联邦中的每个namenode管理命名空间的一部分,比如一个namenode可能管理/user下的所有文件,另一个namenode可能管理/share下的所有文件。
? 联邦中的每个namenode维护一个命名空间卷(namespace volume)。命名空间卷之间相互独立,互不通信。
? 每个命名空间卷由两部分组成(1)命名空间的元数据(2)一个数据块池(block pool)
? 数据池块不再切分,集群中的datanode需要注册到每个namenode,并且存储着来自多个数据池块中的数据块。换句话说,一个datanode由多个namenode管着,同样一个namenode管理多个datanode。
? 若要访问联邦HDFS集群,则需要使用客户端挂载数据将文件路径映射到namenode,通过ViewFileSystem和viewfs://URI 进行配置和管理
3.2.5 HDFS的高可用性
高可用HA之前存在的问题
? 通过备份namenode元数据和通过备用secondaryNameNode依然无法实现系统的高可用。
? 在上述情况,如果namenode失效了,那么所有的客户端,MapReduce都将停止。必须要重新配置一个namenode,再启动才行。该过程称为冷启动
? 重新配置namenode(冷启动)需要的过程(1)将命名空间导入内存(2)重新执行编辑日志(3)接收多个来自datanode的数据块报告,并退出安全模式。
HA高可用
? 针对上述问题,Hadoop2增加了对HDFS的高可用性(HA)。
? 在高可用中配置了active(活动)和standby(备用)两个namenode。当active的namenode失效,standby的namenode会立马接管。 这过程用户无法察觉到明显的中断。
HA在架构上的修改
? (1)namenode之间的编辑日志时共享的,所以当standby的namenode接管后,直接读取共享的编辑日志,以实现namenode的状态同步。
? (2)datanaode需要同时向两个namenode发送块信息。因为块信息存储在namenode的内存中
? (3)客户端要使用特定的方式处理namenode失效问题,且该方式对用户是透明的
? (4)standby的namenode同样包含辅助namenode(SecondaryNameNode)
? (5)standy的namenode为active的namenode进行定期检查命名空间。
? 对于上述点(1)编辑日志的共享存储,可以有两种选择。一种是NFS过滤器,另一种是日志管理器(QJM)。QJM是用HDFS实现,并且专门为共享编辑日志而设计的。
? 对于QJM,以一组日志节点(journal)的形式运行,每次编辑都写入多个日志节点,这样就可以防止数据彻底丢失。QJM的实现没有使用Zookeeper,但是工作方式与Zookeeper类似。
备用namenode为什么快速接管
? 活动(active)的namenode失效后,备用(standby)的namenode能够在几十秒内接管任务。
? 因为最小的状态存储在内存中:包括最新的编辑日志条目和最新的数据块映射信息。
? namenode失效后,备用并不会立马顶上,因为系统要确保namenode是否真的失效了。
? 若活动(active)的namenode和备用(standby)的namenode都失效后,依然可以声明一个namenode进行冷启动。
故障切换与规避
? 系统中有一个新实体,称为故障转移控制器(failover controller),管理着将活动namenode转移为备用namenode的过程
? 有多种故障转移控制器,但默认的是使用Zookeeper来确保有且仅有一个活动的namenode。
? 每个namenode都运行一个轻量级故障转移器,监视宿主namenode是否失效,并在失效的时候进行故障切换。
? 管理员可以手动发起故障转移,称之为”平稳的故障“,来致使namenode有序地切换角色。
当系统误以为namenode失效
? 如果当前网速非常慢,导致系统误以为namenode已经停止工作,从而引起故障转移。但之前的namenode并没有停止工作,高可用用规避的方式来确保运行的namenode不会危害系统。
? 同一时间QJM只允许一个namenode向编辑日志写入数据。为了防止之前活动的namenode继续运行,可以使用SSH杀死namenode的进程。
? 客户端的故障转移可以通过客户端类库实现透明处理。通过配置文件等信息来实现故障转移的控制。
3.3 命令行接口
? 通过命令行接口进一步认识HDFS
配置伪分布式配置文件
? (1)属性一:fs.defaultFS 设置为 hdfs://localhost/
? 用户设置Hadoop默认文件系统。 HDFS的守护进程通过该属性确定namenode所在的位置和端口。
? (2)属性二:dfs.replication 设置为1
? Hadoop每一块的副本个数,默认为3。即一个数据块会被存储到3个datanode上。因为是伪分布式,所以只需要设置为1即可。
文件系统的基本操作
? 基本操作包括读取文件、新建目录、移动文件、删除数据、列出目录等。
? 通过hadoop fs -help 查看每个命令的详细帮助文档
? (1)从本地文件复制到HDFS
hadoop fs -copyFromLocal 本地路径 \ hdfs路径
hadoop fs -copyFromLocal input/docs/quangle.txt \ hdfs://localhost/user/tom/quangle.txt
? 因为在core-site.xml中已经指定了hdfs://localhost 所以可以省略为
hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt
? 在hdfs上新建一个目录
hadoop fs -mkdir books
? 在hdfs上查看当前文件夹里的文件
hadoop fs -ls .
3.4 文件系统
? Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。
? Java中定义抽象类 org.apache.hadoop.fs.FileSystem 定义了Hadoop文件系统客户端的接口,并且该抽象类有几个具体的实现。
文件系统 | URI方案 | Java实现(都在org.apache.hadoop包中) | 描述 |
---|
Local | file | fs.LocalFileSystem | 本地文件系统 | HDFS | hdfs | hdfs.DistributedFileSystem | HDFS分布式文件系统 | WebHDFS | Webhdfs | hdfs.web.WebHdfsFileSystem | 基于Http的文件系统 | SecureWebHDFS | swebhdfs | fs.HarFileSystem | | HAR | har | fs.HarFileSystem | | View | viewfs | viewfs.ViewFileSystem | | FTP | ftp | fs.ftp.FTPFileSystem | | S3 | S3a | fs.s3a.S3AFileSystem | | Azure | wasb | fs.azure.NativeAzureFileSystem | | Swift | swift | fs.swift.snative.SwiftNativeFileSystem | |
列出本地文件系统根目录下的文件,可以使用命令
hadoop fs -ls file:///
接口
? Hadoop是用Java写的,可以通过JavaAPI调用Hadoop文件系统的操作。
? 文件系统的命令就是一个Java应用。
? Java使用FileSystem类来提供文件系统操作
? 除了使用Java外,还可以使用其他接口来访问文件系统,接下来具体介绍其他接口。
1、 HTTP
? 由WebHDFS协议提供了HTTP REST API来实现与HDFS进行交互。
? 由于HTTP接口比原生Java要慢,不到万不得已不要用来传输大文件。
? 通过HTTP访问HDFS有两种方法
? (1)直接访问
? (2)通过代理进行访问
? 两种方法都使用了WebHDFS协议
? 对于第(1)种方法,文件元数据由namenode管理,文件读写会首先发往namenode,由namenode重定向到datanode,再写入datanode
? 对于第(2)种方法,由于使用了代理,所以不直接访问namenode和datanode。使用代理可以用防火墙和带宽限制等策略,从而更加的安全。
2、C语言
? hadoop提供了一个名为libhdfs的C语言类库。
? 这个C语言API与Java的API非常类似,但开发滞后于Java API,因此有一些新的特性还不支持。
? 可以通过include头文件hdfs.h
3、NFS
? 使用Hadoop的NFSv3网关将HDFS挂载为本地客户端的文件系统。
? 然后就可以使用Unix的程序(ls或者cat)与文件系统进行交互
4、FUSE
? 用户空间文件系统(FUSE,Filesystem in Userspace)允许用户空间实现的文件系统作为Unix文件系统进行基础。
3.5 Java接口
? 深入探索Hadoop的Filesystem抽象类。主要聚焦于HDFS实例,即DistributedFileSystem实现鳄梨。
3.5.1 从Hadoop URL读取数据
? 从Hadoop文件系统读取文件。使用java.net.URL对象打开数据流
InputStream in = null;
try{
in = new URL("hdfs://host/path").openStream();
}finally{
IOUtils.closeStream(in);
}
用Java实现类似于Linux中的cat命令
public class URLCat{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception{
InputStream in = null;
try{
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
3.5.2 通过FileSystem API读取数据
使用FileSystem以标准输出格式显示Hadoop文件系统中的文件
public class FileSystemCat{
public static void main(String[] args) throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
FSDataImputStream对象
? FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。 这个类继承了java.io.DataInputStream的一个特殊类,并支持随机访问。
public class FileSystemDoubleCat{
public static void main(String[] args) throws Exception{
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
FSDataInputStream in = null;
try{
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
in.seek(0);
IOUtils.copyBytes(in,System.out.4096,false);
}finally{
IOUtils.closeStream(in);
}
}
}
3.5.3 写入数据
将本地文件复制到Hadoop文件系统
public class FileCopyWithProgress{
public static void main(String[] args) throws Exception{
String localStr = args[0];
String dst = args[1];
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst),conf);
OutputStream out = fs.create(new Path(dst), new Progressable(){
public void progress(){
System.out.println(".")
}
});
IOUtils.copyBytes(in,out,4096,true);
}
}
3.5.4 目录
? Filesystem实例提供了创建目录的方法
public boolean mkdirs(Path f) throws IOException
3.5.5 查询文件系统
展示文件状态信息
public class ShowFileStatusTest{
private MiniDFSCluster cluster;
private FileSystem fs;
@Before
public void setUp throws IOException{
Configuration conf = new Configuration();
if(System.getProperty("test.build.data") == null){
System.setProperty("test.build.data","/tmp");
}
cluster = new MiniDFSCluster.Builder(conf).build();
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close;
}
@After
public void tearDown() throws IOException{
if (fs !=null){
fs.close();
}
if (cluster != null){
cluster.shutdown();
}
}
@Test
public void fileStatusForfile() throws IOException{
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
assertThat(stat.getPath().toUri().getPath(),is("/dir/file"))
}
}
3.6 数据流
3.6.1 剖析文件读取
? 本章具体介绍了HDFS、namenode和datanode之间的数据流是什么样的。
? (1)客户端通过调用FileSystem对象的open()方法打开文件,该对象HDFS的具体实现类为DistributedFileSystem
? (2)DistributedFileSystem通过远程过程调用(RPC)来调用namenode
? (3)namenode返回该文件所有块的datanode地址,因为一个块有多个副本,所以其返回的规则是就近原则(返回距离近的datanode)。
? (4)如果namenode本身也是datanode,就将其自身返回
? (5)返回的datanode地址被封装进FSDataInputStream对象,再封装进DFSInputStream,返回给客户端。
? (6)客户端接收到第一个datanode的地址后,调用read()方法读取具体信息。读取完毕后,再请求下一个datanode地址,循环至全部读取完毕。
? (7)如果客户端在与datanode连接出现错误,则会去连接另一个最近邻的datanode,并且将之前故障的datanode记录,将损坏记录告诉datanode。
注! 正是因为namenode只告诉客户端datanode地址,而不是具体的内容,才能保证namenode能够接收更多客户端的连接。
3.6.2 剖析文件写入
? (1)客户端通过对DistributedFileSystem对象调用create()方法来创建文件。
? (2)DistributedFileSystem通过RPC调用,申请在namenode创建一个新的文件。
? (3)namenode会执行这种检查,包括文件是否存在,客户端是否有权限。检查通过后则创建一条成功的记录,否则就抛出IOException异常。
? (4)DistributedFileSystem给客户端返回一个FSDataOutputStream对象,客户端开始写入数据
? (5)同样,FSDataOutputStream封装进DFSOutputStream对象,该对象负责datanode和namenode的通信。
? (6)DFSOutputStream将数据分成一个一个数据包,并放入队列中。
? (7)DataStreamer处理数据队列,挑选适合存储的一组datanode(组内的datanode数量由副本数决定)。
? (8)若副本数设置为3,则会先写入第1个datanode,然后第1个datanode发送给第2个datanode,第2个datanode发送给第3个datanode。
? (9)一组中的每个datanode都会将写入成功的结果返回给客户端(DFSOutputStream)
? (10)若写入的时候发生故障,首先关闭管线,把还没有写入的数据包都返回给正常datanode指定的新标识中,然后告诉namenode发送故障的地点,namenode则删除故障节点的残余数据。删除后,正常的datanode沿着另一条管线继续走。
? (11)当设置dfs.namenode.replication.min=1时,只要写入1个datnode就算写入成功,剩下的两个则由namenode自己复制。
? (12)写入完成后,调用close()方法。该操作将剩余的所有数据写入datanode管线,并告知namenode这些datanode的地址。namenode等最小量(min)写入成功后,就由namenode进行复制。
3.6.3 一致模型
? 一致模型保证文件读和写的可见性。
? 但是在HDFS中为了性能,牺牲了一部分一致性模型。
? 在HDFS中写入的文件,并非立马就能看见。
? 所以,HDFS提供了刷缓存(hflush)的方法,缓存刷新后,写入的数据就可以看见了。
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(),is(((long) "content".length())));
? hflush()不保证datanode在磁盘中,而是保证在datanode内存中。可以使用hsync()替代。
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush();
out.getFD().sync();
assertThat(localFile.length(),is(((long) "content".length())));
3.7 通过distcp进行复制
? Hadoop自带的应用程序distcp。
? distcp程序可以并行从Hadoop文件系统复制大量数据,也可以将数据复制到Hadoop中。
复制文件
% hadoop distcp file1 file2
复制目录
% hadoop distcp dir1 dir2
更新目录
%hadoop distcp -update dir1 dir2
distcp是一个MapReduce程序,该复制作业是通过集群并行的map完成,并没有reducer。
保持HDFS集群的均衡
? 向HDFS复制数据时,考虑集群的均衡性也是相当重要的。
? 如果只由一个map来执行复制作业,那么一个map会把单一的节点磁盘给塞满。就无法达到均衡。
? 所以设定多个map可以避免这个问题,默认是使用20个map来执行distcp命令。
|