Analysis of the fsck Command
HDFS provides the fsck command for checking various inconsistencies. fsck reports problems with files, such as missing blocks or under-replicated blocks. Usage of the fsck command is as follows:
$HADOOP_HOME/bin/hdfs fsck [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
<path> the path at which to start checking
-move move corrupted files to /lost+found
-delete delete corrupted files
-openforwrite print files currently opened for write
-files print every file being checked
-blocks print the block report
-locations print the location of every block
-racks print the network topology of the DataNodes
By default, fsck skips files that are currently open for writing; the -openforwrite option makes it report them as well.
A sample run of the fsck command looks like this:
[hadoop@master3 hadoop-current]$ bin/hdfs fsck /home/hadoop
Connecting to namenode via http://master3:50070/fsck?ugi=hadoop&path=%2Fhome%2Fhadoop
FSCK started by hadoop(null) (auth:SIMPLE) from /XX.XX.XX.XX for path /home/hadoop at Tue Aug 24 11:41:44 CST 2021
Status: HEALTHY
Number of data-nodes: 12
Number of racks: 1
Total dirs: 1
Total symlinks: 0
Replicated Blocks:
Total size: 0 B
Total files: 2
Total blocks (validated): 0
Minimally replicated blocks: 0
Over-replicated blocks: 0
Under-replicated blocks: 0
Mis-replicated blocks: 0
Default replication factor: 3
Average block replication: 0.0
Missing blocks: 0
Corrupt blocks: 0
Missing replicas: 0
Erasure Coded Block Groups:
Total size: 0 B
Total files: 0
Total block groups (validated): 0
Minimally erasure-coded block groups: 0
Over-erasure-coded block groups: 0
Under-erasure-coded block groups: 0
Unsatisfactory placement block groups: 0
Average block group size: 0.0
Missing block groups: 0
Corrupt block groups: 0
Missing internal blocks: 0
FSCK ended at Tue Aug 24 11:41:44 CST 2021 in 1 milliseconds
The filesystem under path '/home/hadoop' is HEALTHY
Source Code Analysis
The test code is as follows:
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

@Test
public void fsckShell() throws Exception {
    // Load the cluster configuration from local copies of the config files.
    Configuration conf = new Configuration();
    conf.addResource(new Path("/Users/didi/hdfs-site.xml"));
    conf.addResource(new Path("/Users/didi/core-site.xml"));

    // Capture the fsck report in memory instead of writing it to stdout.
    String[] args = {"-openforwrite", "/test"};
    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bStream, true);

    // DFSck is the Tool behind "hdfs fsck"; ToolRunner parses the generic
    // options and then invokes DFSck.run(args).
    DFSck dfsck = new DFSck(conf, out);
    int errCode = ToolRunner.run(dfsck, args);
    System.out.println(errCode);            // 0 means HEALTHY
    System.out.println(bStream.toString()); // the full fsck report
}
Step into ToolRunner.run(). The fsck tool's entry point is the org.apache.hadoop.hdfs.tools.DFSck class, and the main logic lives in its doWork() method:
private int doWork(final String[] args) throws IOException {
final StringBuilder url = new StringBuilder();
url.append("/fsck?ugi=").append(ugi.getShortUserName());
String dir = null;
boolean doListCorruptFileBlocks = false;
for (int idx = 0; idx < args.length; idx++) {
if (args[idx].equals("-move")) { url.append("&move=1"); }
else if (args[idx].equals("-delete")) { url.append("&delete=1"); }
else if (args[idx].equals("-files")) { url.append("&files=1"); }
else if (args[idx].equals("-openforwrite")) { url.append("&openforwrite=1"); }
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
else if (args[idx].equals("-storagepolicies")) { url.append("&storagepolicies=1"); }
else if (args[idx].equals("-list-corruptfileblocks")) {
url.append("&listcorruptfileblocks=1");
doListCorruptFileBlocks = true;
} else if (args[idx].equals("-includeSnapshots")) {
url.append("&includeSnapshots=1");
} else if (args[idx].equals("-blockId")) {
StringBuilder sb = new StringBuilder();
idx++;
while(idx < args.length && !args[idx].startsWith("-")){
sb.append(args[idx]);
sb.append(" ");
idx++;
}
url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
} else if (!args[idx].startsWith("-")) {
if (null == dir) {
dir = args[idx];
} else {
System.err.println("fsck: can only operate on one path at a time '"
+ args[idx] + "'");
printUsage(System.err);
return -1;
}
} else {
System.err.println("fsck: Illegal option '" + args[idx] + "'");
printUsage(System.err);
return -1;
}
}
if (null == dir) {
dir = "/";
}
Path dirpath = null;
URI namenodeAddress = null;
try {
dirpath = getResolvedPath(dir);
namenodeAddress = getCurrentNamenodeAddress(dirpath);
} catch (IOException ioe) {
System.err.println("FileSystem is inaccessible due to:\n"
+ StringUtils.stringifyException(ioe));
}
Here is the getCurrentNamenodeAddress() method, which resolves the HTTP info server of the active NameNode for the target path:
private URI getCurrentNamenodeAddress(Path target) throws IOException {
Configuration conf = getConf();
final FileSystem fs = target.getFileSystem(conf);
if (!(fs instanceof DistributedFileSystem)) {
System.err.println("FileSystem is " + fs.getUri());
return null;
}
return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf,
DFSUtil.getHttpClientScheme(conf));
}
DFSUtil.getInfoServer() is shown below; it mainly assembles a URI:
public static URI getInfoServer(InetSocketAddress namenodeAddr,
Configuration conf, String scheme) throws IOException {
String[] suffixes = null;
if (namenodeAddr != null) {
suffixes = getSuffixIDs(conf, namenodeAddr,
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
String authority;
if ("http".equals(scheme)) {
authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
} else if ("https".equals(scheme)) {
return URI.create(scheme + "://" + authority);
}
We continue to step into getSuffixIDs():
static String[] getSuffixIDs(final Configuration conf,
final InetSocketAddress address, final String... keys) {
AddressMatcher matcher = new AddressMatcher() {
@Override
public boolean match(InetSocketAddress s) {
return address.equals(s);
}
};
for (String key : keys) {
String[] ids = getSuffixIDs(conf, key, null, null, matcher);
if (ids != null && (ids [0] != null || ids[1] != null)) {
return ids;
}
}
return null;
}
In this method, the dfs.namenode.rpc-address entries in the local configuration are matched against the RPC address captured in the matcher; on a successful match, it returns the owning nameservice and NameNode IDs:
static String[] getSuffixIDs(final Configuration conf, final String addressKey,
String knownNsId, String knownNNId,
final AddressMatcher matcher) {
String nameserviceId = null;
String namenodeId = null;
int found = 0;
Collection<String> nsIds = getNameServiceIds(conf);
for (String nsId : emptyAsSingletonNull(nsIds)) {
if (knownNsId != null && !knownNsId.equals(nsId)) {
continue;
}
Collection<String> nnIds = getNameNodeIds(conf, nsId);
for (String nnId : emptyAsSingletonNull(nnIds)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
addressKey, nsId, nnId));
}
if (knownNNId != null && !knownNNId.equals(nnId)) {
continue;
}
String key = addKeySuffixes(addressKey, nsId, nnId);
String addr = conf.get(key);
if (addr == null) {
continue;
}
InetSocketAddress s = null;
try {
s = NetUtils.createSocketAddr(addr);
} catch (Exception e) {
LOG.warn("Exception in creating socket address " + addr, e);
continue;
}
if (!s.isUnresolved() && matcher.match(s)) {
nameserviceId = nsId;
namenodeId = nnId;
found++;
}
}
}
if (found > 1) {
String msg = "Configuration has multiple addresses that match "
+ "local node's address. Please configure the system with "
+ DFS_NAMESERVICE_ID + " and "
+ DFS_HA_NAMENODE_ID_KEY + ". Choose the last address.";
throw new HadoopIllegalArgumentException(msg);
}
return new String[] { nameserviceId, namenodeId };
}
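To make this key matching concrete, here is a minimal sketch (with a hypothetical nameservice ID ns1, NameNode IDs nn1/nn2, and hostnames) of how the suffixed keys probed by getSuffixIDs() are formed and resolved:

import org.apache.hadoop.conf.Configuration;

// Minimal sketch with hypothetical HA settings: shows which suffixed keys
// getSuffixIDs() probes when matching an RPC address to an NS/NN pair.
public class SuffixKeyDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Equivalent hdfs-site.xml entries for a (hypothetical) HA cluster:
    conf.set("dfs.nameservices", "ns1");
    conf.set("dfs.ha.namenodes.ns1", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.ns1.nn1", "master1:8020");
    conf.set("dfs.namenode.rpc-address.ns1.nn2", "master2:8020");

    // addKeySuffixes("dfs.namenode.rpc-address", "ns1", "nn1") builds:
    String key = "dfs.namenode.rpc-address" + "." + "ns1" + "." + "nn1";
    // If this address matches the active NameNode's RPC address,
    // getSuffixIDs() returns {"ns1", "nn1"}.
    System.out.println(key + " = " + conf.get(key));
  }
}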
Here is the final part of doWork(): it prepends the NameNode address to the URL, issues the HTTP request, and maps the last line of the report to an exit code:
if (namenodeAddress == null) {
System.err.println("DFSck exiting.");
return 0;
}
url.insert(0, namenodeAddress.toString());
url.append("&path=").append(URLEncoder.encode(
Path.getPathWithoutSchemeAndAuthority(dirpath).toString(), "UTF-8"));
System.err.println("Connecting to namenode via " + url.toString());
if (doListCorruptFileBlocks) {
return listCorruptFileBlocks(dir, url.toString());
}
URL path = new URL(url.toString());
URLConnection connection;
try {
connection = connectionFactory.openConnection(path, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
InputStream stream = connection.getInputStream();
BufferedReader input = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
String line = null;
String lastLine = null;
int errCode = -1;
try {
while ((line = input.readLine()) != null) {
out.println(line);
lastLine = line;
}
} finally {
input.close();
}
if (lastLine.endsWith(NamenodeFsck.HEALTHY_STATUS)) {
errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.CORRUPT_STATUS)) {
errCode = 1;
} else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
errCode = 0;
} else if (lastLine.contains("Incorrect blockId format:")) {
errCode = 0;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
errCode = 2;
} else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
errCode = 3;
}
return errCode;
}
A global search in IDEA finds httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true) in the setupServlets() method of the NameNodeHttpServer class, which confirms that the NameNode serves fsck requests through a servlet.
In FsckServlet's doGet() method we find new NamenodeFsck(conf, nn, bm.getDatanodeManager().getNetworkTopology(), pmap, out, totalDatanodes, remoteAddress).fsck(); this fsck() method is what actually handles the request:
public void fsck() {
final long startTime = Time.monotonicNow();
try {
if(blockIds != null) {
String[] blocks = blockIds.split(" ");
StringBuilder sb = new StringBuilder();
sb.append("FSCK started by " +
UserGroupInformation.getCurrentUser() + " from " +
remoteAddress + " at " + new Date());
out.println(sb.toString());
sb.append(" for blockIds: \n");
for (String blk: blocks) {
if(blk == null || !blk.contains(Block.BLOCK_FILE_PREFIX)) {
out.println("Incorrect blockId format: " + blk);
continue;
}
out.print("\n");
blockIdCK(blk);
sb.append(blk + "\n");
}
LOG.info(sb.toString());
namenode.getNamesystem().logFsckEvent("/", remoteAddress);
out.flush();
return;
}
String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
+ " from " + remoteAddress + " for path " + path + " at " + new Date();
LOG.info(msg);
out.println(msg);
namenode.getNamesystem().logFsckEvent(path, remoteAddress);
if (snapshottableDirs != null) {
SnapshottableDirectoryStatus[] snapshotDirs = namenode.getRpcServer()
.getSnapshottableDirListing();
if (snapshotDirs != null) {
for (SnapshottableDirectoryStatus dir : snapshotDirs) {
snapshottableDirs.add(dir.getFullPath().toString());
}
}
}
final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
if (file != null) {
if (showCorruptFileBlocks) {
listCorruptFileBlocks();
return;
}
if (this.showStoragePolcies) {
storageTypeSummary = new StoragePolicySummary(
namenode.getNamesystem().getBlockManager().getStoragePolicies());
}
Result res = new Result(conf);
check(path, file, res);
out.println(res);
out.println(" Number of data-nodes:\t\t" + totalDatanodes);
out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
if (this.showStoragePolcies) {
out.print(storageTypeSummary.toString());
}
out.println("FSCK ended at " + new Date() + " in "
+ (Time.monotonicNow() - startTime + " milliseconds"));
if (internalError) {
throw new IOException("fsck encountered internal errors!");
}
if (res.isHealthy()) {
out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
} else {
out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
}
} else {
out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
}
} catch (Exception e) {
String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
LOG.warn(errMsg, e);
out.println("FSCK ended at " + new Date() + " in "
+ (Time.monotonicNow() - startTime + " milliseconds"));
out.println(e.getMessage());
out.print("\n\n" + errMsg);
} finally {
out.close();
}
}
Summary: fsck is an interface exposed by the NameNode itself and invoked through a servlet. The access method is up to you; anything that submits a request to this endpoint works. Hadoop's shell command submits it from Java through a utility class, and you can just as well assemble the URL directly in a browser, for example:
http://10.4.19.42:50070/fsck?ugi=hadoop&path=/tmp/hadoop/wordcountjavain&files=1&blocks=1&locations=1&racks=1
This is equivalent to hadoop fsck /tmp/hadoop/wordcountjavain -files -blocks -locations -racks (the -racks flag is optional; rack information is printed even without it).
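Because the endpoint is plain HTTP, a minimal sketch that submits the same request from Java might look like this (host, port, user, and path are placeholders; a Kerberos/SPNEGO-secured cluster would need an authenticated connection rather than a bare HttpURLConnection):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

// Minimal sketch: calling the NameNode's fsck servlet directly over HTTP.
public class RawFsck {
  public static void main(String[] args) throws Exception {
    String ugi = "hadoop";                            // short user name
    String path = URLEncoder.encode("/tmp", "UTF-8"); // target path
    URL url = new URL("http://master3:50070/fsck?ugi=" + ugi
        + "&path=" + path + "&files=1&blocks=1&locations=1");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // the same report text the shell prints
      }
    }
  }
}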
In essence, fsck works off the namespace managed by the NameNode's FSNamesystem (and its FSDirectory) together with the block information DataNodes have reported. Inode attributes, block lists, replication factors, block counts, and corruption status are all read from NameNode memory; fsck does not ask the DataNodes to go and verify blocks. Because these "ready-made" in-memory results are what fsck returns, they can be stale: after you change a file, it may take a while before fsck reflects the accurate state. For example, if you replace a block file on a DataNode with a corrupted one, fsck will not show the corruption until the NameNode receives the next block report.
Deletions, by contrast, show up in fsck immediately. As discussed earlier, whether through trash or -skipTrash, deletion marks the files for removal (recording them in a ledger, with a cleanup thread periodically issuing RPCs that tell the corresponding DataNodes to delete the blocks), so it directly updates the NameNode's memory and namespace (including the reported block information, which the delete also modifies).
Partially based on: https://blog.csdn.net/tracymkgld/article/details/18044577