文件的读取流程
我们一般使用FileSystem来打开HDFS中的一个文件,使用open()方法。
对于HDFS来说,FileSystem是由其子类DistributeFileSystem的一个实例来完成的:
我们可以进入其open()方法中:
首先,DistributeFileSystem对象会对当前的读线程进行计数,这就跳过哈。然后就会将输入的路径进行转化一下,变成绝对路径,然后,会通过一个RPC远程服务调用和NameNode进行通讯,NameNode会返回文件块的起始位置,还有包含当前文件块的副本的位置DataNode地址。
客户端调用open()方法后,会返回一个FSDataInputStream对象供用户读取数据,而FSDataInputStream会在一个FileSystemLinkResolver中封装成DFSInputStream对象,而这就是关键的对象,它管理着HDFS的IO流程。
之后,在demo代码中:
Hadoop提供的IOUtils.copyBytes() 方法会去调用输入流中的read()方法,获得数据
实际调用的read()方法:
在readWithStrategy 中,会调用blockSeekTo() 方法,会去寻找当前需要访问的DataNode节点信息。
我们进入到blockSeekTo() 方法中:
Open a DataInputStream to a DataNode so that it can be read from.
We get block ID and the IDs of the destinations at startup, from the namenode.
会去开启一个连接到DataNode的DataInputStream流,在启动的时候已经从NameNode中得到block 的ID以及每个终点的起始位置。
启动时从NameNode中获取到的元数据是存放在LocatedBlock对象 中的:
接着会根据偏移量来得到一个目标DataNode,具体的方法为getBlockAt():
经过这个方法,我们就可以得到目标DataNode:这里和我的集群192.168.10.102对应上了!
建立连接之后,就可以读取到对应的数据了,但是他不是仅仅读取一次就可以了,当数据块的个数比较多的时候,当前的数据块读取完毕后会关闭和当前DataNode的连接,然后会去寻找下一块最佳的DataNode,进行类似的操作,直到所有的数据都读取完毕。
读取完一个DataNode的数据后关闭该DataNode的连接,然后和下一个DataNode建立连接的过程是对客户透明的,在客户看来,他就是在从他持有的DFSInputStream对象中不断的读取数据,是一个连续不间断的流。
以上就是从HDFS中打开一个数据输入流的过程。
最佳DataNode选择
我们知道,HDFS会将数据生成副本,对于同一个数据块(block),会生成不同的副本放在不同的DataNode上,那到底选择哪一个DataNode来作为实际选择的数据读取对象呢?
在HDFS中,会有一个chooseDataNode()的方法,会从nodes的列表里选择一个最佳的node作为目标DataNode。
DatanodeInfo 对象是对DataNode信息进行了封装,我们可以进去看看:
这个里面会有很多DataNode的信息,比如容量以及地址等信息。
在HDFS的客户端中,最佳node的选择很简单:
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
直接从NameNode返回的DataNode列表中选择第一符合要求的即可,在代码有几个需要注意的变量:
在读取数据时,DFSInputStream会不断确认DataNode的状态,一旦与DataNode通信发生异常,会将其放置在对应的列表中,dead、ignored,确保下次不会将其选中;然后会从下一个最近的DataNode继续读取,并通知NameNode。
- deadNodes——datanodes that already dedad(not working)
hadoop3.1.3版本中:(代码贡献点啊,老铁们,嘻嘻嘻)——开源代码贡献者指日可待,嘻嘻嘻
- ignoredNodes——datanodes that we don’t care.
??为啥选择第一个满足条件的呢
因为NameNode返回的DataNode是按照优先级排好序的,我们只要选择第一个正常工作的DataNode建立即可,响应的代价最小。
为chosenNode赋值之后,就可以和该DataNode建立socket网络连接了。
|