分布式文件系统的操作涉及 master(namenode),slave(datanode),client之间的各种调用连接,HDFS将这些rpc通信抽象成不同接口
- Hadoop RPC:
ClientProtocal :client和namenode ClientDatanodeProtocal: client和datanode DatanodeProtocal: datanode和namenode InterDatanodeProtocal:datanode和datanode
1.ClientProtocal
client发起namenode响应 主要分成几个部分
- HDFS文件读操作
- HDFS文件写操作
- HDFS命名空间操作
1.1 HDFS文件读操作
- getBlockLocations方法,得到块的位置信息
- reportBadBlocks方法,上报错误的block
这是一个hdfs的拷贝文件的常规api操作,把hdfs根路径下的kaka.txt拷贝到本地D盘下,得到块的位置信息这步肯定发生在copyToLocalFile这步
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException{
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "red");
fs.copyToLocalFile(false, new Path("/kaka.txt"), new Path("d:/kaka.txt"), true);
fs.close();
}
ClientProtocal 接口里面的getBlockLocations方法,返回的LocatedBlocks封装了List,也就是namenode按照block位置远近排好序放进list的。
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;
顺着api的方法,Debug进入copyToLocalFile,一步一步,终于看到取得块信息的方法
最终指向namenode包FSNamesystem类的getBlockLocationsInt方法(这里面Int是internal的意思):srcArg是拷贝文件的名字(文件夹/文件),iip是这个路径下的节点,LocatedBlocks是通过inode.getBlocks得到的
private GetBlockLocationsResult getBlockLocationsInt(
final String srcArg, long offset, long length, boolean needBlockToken)
throws IOException {
String src = srcArg;
FSPermissionChecker pc = getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
if (isPermissionEnabled) {
dir.checkPathAccess(pc, iip, FsAction.READ);
checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
}
final long fileSize = iip.isSnapshot()
? inode.computeFileSize(iip.getPathSnapshotId())
: inode.computeFileSizeNotIncludingLastUcBlock();
boolean isUc = inode.isUnderConstruction();
if (iip.isSnapshot()) {
length = Math.min(length, fileSize - offset);
isUc = false;
}
final FileEncryptionInfo feInfo =
FSDirectory.isReservedRawName(srcArg) ? null
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
inode.getBlocks(iip.getPathSnapshotId()), fileSize,
isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
cacheManager.setCachedLocations(lb);
}
final long now = now();
boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
&& !iip.isSnapshot()
&& now > inode.getAccessTime() + getAccessTimePrecision();
return new GetBlockLocationsResult(updateAccessTime ? iip : null, blocks);
}
同理reportBadBlocks方法实现了客户端在读取block时候发现数据块校验和不正确就会去汇报
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
1.2 HDFS文件写操作
- create()
客户端写一个新文件的后,用这个方法创建新的空文件 - append()
打开一个已有文件,如果文件最后一个数据块没有写满就返回locatedBlock继续追加,否则就addBlock() - addBlock()
通过这个方法给文件添加新的块 - complete()
客户端完成文件写入之后通知namenode
以下都是在写入block发生错误时候调用的方法,目前不做具体深入
- abandonBlock()
- getAdditionalDatanodes()
- upDtateBlocksForPipeLine()
- updatePipeline()
1.3 HDFS文件命名空间操作
对应了客户端对hdfs文件的删改查,获取属性,修改副本数
比如想把根目录的test.txt复制系数改下
fs.setReplication(new Path("/test.txt"), (short) 4);
最终调用的是namenode包下FSDirAttrOp类的setReplication方法
2.ClientDatanodeProtocal
client和datanode之间的通信
- getReplicaVisibleLength() 返回数据块的真实长度,因为hdfs文件最后一个块的长度和namenode元数据可能不一样。client创建输入流的时候需要知道这个datanode的的长度
- getBlockLocalPathInfo() HDFS本地读取,如果client和datanode在同一个机器,会调用这个方法执行本地读取
3.DatanodeProtocal
datanode和namenode通信,这包含了hdfs主从所有的互动逻辑,主要有三个方面,一个是心跳机制,一个是启动机制,一个是数据块读写
3.1 心跳机制
分布式一般用心跳来检测从节点健康度,默认datanode 3 秒汇报一次
- sendHeartbeat() datanode向namenode汇报,并得到HeartbeatResponse类返回,里面包含了DatanodeCommand[],也就是namenode对datanode的命令
3.2 启动机制
启动后datanode和namenode沟通流程
step1 versionRequest() datanode和从节点握手,得到NamespaceInfo对象(版本号,集群id etc…),核对版本
step2 registerDatanode() datanode向namenode注册信息
step3 blockReport() datanode向namenode报告自己的块信息 启动时候一次,间隔时间一次(默认六小时)
step4 cacheReport() datanode向namenode报告自己的块缓存信息
3.3 数据块读写
reportBadBlocks() datanode定期扫描数据块向namenode汇报校验码错误的数据块/数据流管道写数据datanaode发现接受的数据块校验码错误/进行数据块复制时候发现副本长度和namenode记录长度不同,namenode收到错误副本的位置后,会下发删除指令
4. InterDatanodeProtocal
租约相关,这里暂时不展开 s() datanode定期扫描数据块向namenode汇报校验码错误的数据块/数据流管道写数据datanaode发现接受的数据块校验码错误/进行数据块复制时候发现副本长度和namenode记录长度不同,namenode收到错误副本的位置后,会下发删除指令
4. InterDatanodeProtocal
租约相关,这里暂时不展开
|