[Big Data] Hadoop fsck Command Analysis + Source Code Walkthrough

fsck Command Analysis

HDFS provides the fsck command to check the file system for inconsistencies. fsck reports problems with files, such as missing or under-replicated blocks. The usage of the fsck command is as follows:

$HADOOP_HOME/bin/hdfs fsck <path> [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]

<path>          the path to start checking from
-move           move corrupted files to /lost+found
-delete         delete corrupted files
-openforwrite   print files currently open for write
-files          print every file being checked
-blocks         print the block report
-locations      print the location of every block
-racks          print the network topology of the datanodes

By default, fsck skips files that are open for write; the -openforwrite option makes fsck report them as well.

The official documentation describes the fsck command as follows:
[Figure: fsck command reference from the official Hadoop documentation]

A sample fsck run looks like this:


[hadoop@master3 hadoop-current]$ bin/hdfs fsck /home/hadoop
Connecting to namenode via http://master3:50070/fsck?ugi=hadoop&path=%2Fhome%2Fhadoop
FSCK started by hadoop(null) (auth:SIMPLE) from /XX.XX.XX.XX for path /home/hadoop at Tue Aug 24 11:41:44 CST 2021

Status: HEALTHY
 Number of data-nodes:  12
 Number of racks:               1
 Total dirs:                    1
 Total symlinks:                0

Replicated Blocks:
 Total size:    0 B
 Total files:   2
 Total blocks (validated):      0
 Minimally replicated blocks:   0
 Over-replicated blocks:        0
 Under-replicated blocks:       0
 Mis-replicated blocks:         0
 Default replication factor:    3
 Average block replication:     0.0
 Missing blocks:                0
 Corrupt blocks:                0
 Missing replicas:              0

Erasure Coded Block Groups:
 Total size:    0 B
 Total files:   0
 Total block groups (validated):        0
 Minimally erasure-coded block groups:  0
 Over-erasure-coded block groups:       0
 Under-erasure-coded block groups:      0
 Unsatisfactory placement block groups: 0
 Average block group size:      0.0
 Missing block groups:          0
 Corrupt block groups:          0
 Missing internal blocks:       0
FSCK ended at Tue Aug 24 11:41:44 CST 2021 in 1 milliseconds


The filesystem under path '/home/hadoop' is HEALTHY
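Note that in this run Total files is 2 but Total size is 0 B and no blocks were validated: the files under /home/hadoop are empty, so there are no blocks for fsck to check.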

Source Code Analysis

The test code is as follows:

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

    @Test
    public void fsckShell() throws Exception {
        // Load the cluster configuration from local copies of the config files.
        Configuration conf = new Configuration();
        conf.addResource(new Path("/Users/didi/hdfs-site.xml"));
        conf.addResource(new Path("/Users/didi/core-site.xml"));

        // Equivalent to: hdfs fsck /test -openforwrite
        String[] args = {"-openforwrite", "/test"};

        // Capture the fsck report in a buffer instead of stdout.
        ByteArrayOutputStream bStream = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(bStream, true);
        DFSck dfsck = new DFSck(conf, out);
        int errCode = ToolRunner.run(dfsck, args);
        System.out.println(errCode);
        System.out.println(bStream.toString());
    }
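Run against a healthy path, this test should print 0 followed by the captured report; as the status checks at the end of doWork() show further below, DFSck maps HEALTHY to exit code 0 and CORRUPT to 1.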

Step into ToolRunner.run(). The fsck tool's entry point is the org.apache.hadoop.hdfs.tools.DFSck class, which follows Hadoop's standard Tool/ToolRunner pattern: ToolRunner parses the generic options (-D, -conf, ...) into the Configuration and then invokes the tool's run() method. A minimal sketch of that pattern (MyTool is a made-up name, not the actual DFSck source):
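
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical tool illustrating the pattern; DFSck is wired up the same way.
public class MyTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // In DFSck, run() delegates to doWork(args).
    System.out.println("remaining args: " + String.join(" ", args));
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner.run() parses generic options into the Configuration,
    // then invokes run() with the remaining arguments.
    System.exit(ToolRunner.run(new MyTool(), args));
  }
}

In DFSck, run() delegates to doWork(), which is where the main logic lives: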


 private int doWork(final String[] args) throws IOException {
    final StringBuilder url = new StringBuilder();
   
    url.append("/fsck?ugi=").append(ugi.getShortUserName());
    String dir = null;
    boolean doListCorruptFileBlocks = false;
    for (int idx = 0; idx < args.length; idx++) {
      if (args[idx].equals("-move")) { url.append("&move=1"); }
      else if (args[idx].equals("-delete")) { url.append("&delete=1"); }
      else if (args[idx].equals("-files")) { url.append("&files=1"); }
      else if (args[idx].equals("-openforwrite")) { url.append("&openforwrite=1"); }
      else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
      else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
      else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
      else if (args[idx].equals("-storagepolicies")) { url.append("&storagepolicies=1"); }
      else if (args[idx].equals("-list-corruptfileblocks")) {
        url.append("&listcorruptfileblocks=1");
        doListCorruptFileBlocks = true;
      } else if (args[idx].equals("-includeSnapshots")) {
        url.append("&includeSnapshots=1");
      } else if (args[idx].equals("-blockId")) {
        StringBuilder sb = new StringBuilder();
        idx++;
        while(idx < args.length && !args[idx].startsWith("-")){
          sb.append(args[idx]);
          sb.append(" ");
          idx++;
        }
        url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
      } else if (!args[idx].startsWith("-")) {
        if (null == dir) {
          dir = args[idx];
        } else {
          System.err.println("fsck: can only operate on one path at a time '"
              + args[idx] + "'");
          printUsage(System.err);
          return -1;
        }

      } else {
        System.err.println("fsck: Illegal option '" + args[idx] + "'");
        printUsage(System.err);
        return -1;
      }
    }
    // The entire for loop does just one thing: parse the arguments, prepend the
    // user name taken from the conf, and splice everything into a URL.

    // At this point, url is /fsck?ugi=XXX&openforwrite=1
    
    if (null == dir) {
      dir = "/";
    }

    Path dirpath = null;
    URI namenodeAddress = null;
    try {
      // Note: this makes an RPC call. DistributedFileSystem.getFileStatus()
      // talks to the namenode and resolves the argument path to an absolute
      // address on the HDFS cluster.
      dirpath = getResolvedPath(dir);
      // At this point, dirpath = hdfs://mycluster/test
      // Obtains the namenode address; the method body is shown below.
      namenodeAddress = getCurrentNamenodeAddress(dirpath);
    } catch (IOException ioe) {
      System.err.println("FileSystem is inaccessible due to:\n"
          + StringUtils.stringifyException(ioe));
    }
    // ... (the final part of doWork() is shown further below)

Here is the getCurrentNamenodeAddress() method:

private URI getCurrentNamenodeAddress(Path target) throws IOException {
    //String nnAddress = null;
    Configuration conf = getConf();

    //get the filesystem object to verify it is an HDFS system
    final FileSystem fs = target.getFileSystem(conf);
    if (!(fs instanceof DistributedFileSystem)) {
      System.err.println("FileSystem is " + fs.getUri());
      return null;
    }

    return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf,
        DFSUtil.getHttpClientScheme(conf));
    // Here, getAddressOfActive() makes another RPC to the namenode to find out
    // which namenode is currently in the active state.
  }

The DFSUtil.getInfoServer() method is shown below; it mainly assembles a URI:

public static URI getInfoServer(InetSocketAddress namenodeAddr,
      Configuration conf, String scheme) throws IOException {
    String[] suffixes = null;
    if (namenodeAddr != null) {
      // if non-default namenode, try reverse look up 
      // the nameServiceID if it is available
      suffixes = getSuffixIDs(conf, namenodeAddr,  // matches the RPC address (here master1/XX.XX.XX.XX:8020) and returns its (nsId, nnId) suffixes
          DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
          DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
    }

    String authority;
    if ("http".equals(scheme)) {
      authority = getSuffixedConf(conf, DFS_NAMENODE_HTTP_ADDRESS_KEY,
          DFS_NAMENODE_HTTP_ADDRESS_DEFAULT, suffixes);
    } else if ("https".equals(scheme)) {
      //...
      //...
    return URI.create(scheme + "://" + authority);
  }

We step in again:

static String[] getSuffixIDs(final Configuration conf,
      final InetSocketAddress address, final String... keys) {
    AddressMatcher matcher = new AddressMatcher() {
     @Override
      public boolean match(InetSocketAddress s) {
        return address.equals(s);
      } 
    };
    
    for (String key : keys) {
      String[] ids = getSuffixIDs(conf, key, null, null, matcher);
      if (ids != null && (ids [0] != null || ids[1] != null)) {
        return ids;
      }
    }
    return null;
  }

In this method, the dfs.namenode.rpc-address values from the local conf are matched against the address obtained via RPC (wrapped in the matcher); on a successful match, the enclosing nameservice ID and namenode ID are returned:

static String[] getSuffixIDs(final Configuration conf, final String addressKey,
      String knownNsId, String knownNNId,
      final AddressMatcher matcher) {
    String nameserviceId = null;
    String namenodeId = null;
    int found = 0;
    
    Collection<String> nsIds = getNameServiceIds(conf);
    for (String nsId : emptyAsSingletonNull(nsIds)) {
      if (knownNsId != null && !knownNsId.equals(nsId)) {
        continue;
      }
      
      Collection<String> nnIds = getNameNodeIds(conf, nsId);
      for (String nnId : emptyAsSingletonNull(nnIds)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
              addressKey, nsId, nnId));
        }
        if (knownNNId != null && !knownNNId.equals(nnId)) {
          continue;
        }
        String key = addKeySuffixes(addressKey, nsId, nnId);
        String addr = conf.get(key);
        if (addr == null) {
          continue;
        }
        InetSocketAddress s = null;
        try {
          s = NetUtils.createSocketAddr(addr);
        } catch (Exception e) {
          LOG.warn("Exception in creating socket address " + addr, e);
          continue;
        }
        if (!s.isUnresolved() && matcher.match(s)) {
          nameserviceId = nsId;
          namenodeId = nnId;
          found++;
        }
      }
    }
    if (found > 1) { // Only one address must match the local address
      String msg = "Configuration has multiple addresses that match "
          + "local node's address. Please configure the system with "
          + DFS_NAMESERVICE_ID + " and "
          + DFS_HA_NAMENODE_ID_KEY + ". Choose the last address.";
      throw new HadoopIllegalArgumentException(msg);
    }
    return new String[] { nameserviceId, namenodeId };
  }
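To make the key-suffix matching concrete, below is a small self-contained sketch of the HA configuration entries that getSuffixIDs() iterates over (the nameservice ID "mycluster" and namenode IDs "nn1"/"nn2" are made-up examples, not values from this article's cluster):

import org.apache.hadoop.conf.Configuration;

public class SuffixDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Made-up HA configuration; "mycluster", "nn1", "nn2" are example IDs only.
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    // getSuffixIDs() builds keys like these via addKeySuffixes() and compares
    // their values against the RPC address obtained from the namenode.
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "master1:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "master2:8020");
    // With the (nsId, nnId) suffixes found, getInfoServer() then resolves the
    // matching HTTP address to build the base of the fsck URL.
    conf.set("dfs.namenode.http-address.mycluster.nn1", "master1:50070");
    conf.set("dfs.namenode.http-address.mycluster.nn2", "master2:50070");

    System.out.println(conf.get("dfs.namenode.rpc-address.mycluster.nn1"));
  }
}

With entries like these, addKeySuffixes("dfs.namenode.rpc-address", "mycluster", "nn1") yields the key dfs.namenode.rpc-address.mycluster.nn1, whose value master1:8020 is what the matcher compares against.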

Here is the final part of the doWork() method:

    if (namenodeAddress == null) {
      //Error message already output in {@link #getCurrentNamenodeAddress()}
      System.err.println("DFSck exiting.");
      return 0;
    }

    url.insert(0, namenodeAddress.toString()); // prepend the namenode address to the url
    // At this point, url is http://master1:50070/fsck?ugi=XXX&openforwrite=1
    url.append("&path=").append(URLEncoder.encode(
      Path.getPathWithoutSchemeAndAuthority(dirpath).toString(), "UTF-8"));
    // Now the dir is appended: http://master1:50070/fsck?ugi=XXX&openforwrite=1&path=%2Ftest
    // In fact, the namenode implements fsck as a servlet; this command-line
    // tool merely submits a request to that endpoint.
    System.err.println("Connecting to namenode via " + url.toString());

    if (doListCorruptFileBlocks) {
      // if -list-corruptfileblocks was given, report corrupt blocks and return
      return listCorruptFileBlocks(dir, url.toString());
    }
    URL path = new URL(url.toString());
    URLConnection connection;
    try {
      connection = connectionFactory.openConnection(path, isSpnegoEnabled);
    } catch (AuthenticationException e) {
      throw new IOException(e);
    }
    InputStream stream = connection.getInputStream();
    BufferedReader input = new BufferedReader(new InputStreamReader(
                                              stream, "UTF-8"));
    String line = null;
    String lastLine = null;
    int errCode = -1;
    try {
      while ((line = input.readLine()) != null) {
        out.println(line);
        lastLine = line;
      }
    } finally {
      input.close();
    }
    if (lastLine.endsWith(NamenodeFsck.HEALTHY_STATUS)) {
      errCode = 0;
    } else if (lastLine.endsWith(NamenodeFsck.CORRUPT_STATUS)) {
      errCode = 1;
    } else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
      errCode = 0;
    } else if (lastLine.contains("Incorrect blockId format:")) {
      errCode = 0;
    } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
      errCode = 2;
    } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
      errCode = 3;
    }
    return errCode;
  }
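Since the namenode exposes fsck as a plain HTTP endpoint, you do not even need the Hadoop client to call it. Below is a minimal sketch, assuming SIMPLE authentication (no Kerberos/SPNEGO) and the namenode web UI at master1:50070; both values match this article's cluster but may differ on yours:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class RawFsck {
  public static void main(String[] args) throws Exception {
    // Assumed values: adjust to your active namenode's HTTP address and user.
    String nnHttp = "http://master1:50070";
    String path = URLEncoder.encode("/test", "UTF-8");
    URL url = new URL(nnHttp + "/fsck?ugi=hadoop&openforwrite=1&path=" + path);

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
      String line;
      String lastLine = null;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
        lastLine = line;
      }
      // Mirroring doWork(): the last line of the report carries the
      // HEALTHY/CORRUPT marker that determines the exit code.
      System.out.println("healthy: " + (lastLine != null && lastLine.endsWith("HEALTHY")));
    }
  }
}

This is exactly what doWork() does after assembling the URL: open the connection, stream the report line by line, and inspect the last line for the status marker.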

Using IDEA's global search, we find httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true) in the setupServlets method of the NameNodeHttpServer class, confirming that the namenode does serve fsck requests through a servlet.

In the doGet method of the FsckServlet class we find new NamenodeFsck(conf, nn, bm.getDatanodeManager().getNetworkTopology(), pmap, out,
totalDatanodes, remoteAddress).fsck().
The fsck() method is what actually handles the request:

 public void fsck() {
    final long startTime = Time.monotonicNow();
    try {
      if(blockIds != null) {
        String[] blocks = blockIds.split(" ");
        StringBuilder sb = new StringBuilder();
        sb.append("FSCK started by " +
            UserGroupInformation.getCurrentUser() + " from " +
            remoteAddress + " at " + new Date());
        out.println(sb.toString());
        sb.append(" for blockIds: \n");
        for (String blk: blocks) {
          if(blk == null || !blk.contains(Block.BLOCK_FILE_PREFIX)) {
            out.println("Incorrect blockId format: " + blk);
            continue;
          }
          out.print("\n");
          blockIdCK(blk);
          sb.append(blk + "\n");
        }
        LOG.info(sb.toString());
        namenode.getNamesystem().logFsckEvent("/", remoteAddress);
        out.flush();
        return;
      }

      String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
          + " from " + remoteAddress + " for path " + path + " at " + new Date();
      LOG.info(msg); // the user's fsck operation is written to the namenode's log
      out.println(msg);
      namenode.getNamesystem().logFsckEvent(path, remoteAddress);
		
      if (snapshottableDirs != null) {
        SnapshottableDirectoryStatus[] snapshotDirs = namenode.getRpcServer()
            .getSnapshottableDirListing();
        if (snapshotDirs != null) {
          for (SnapshottableDirectoryStatus dir : snapshotDirs) {
            snapshottableDirs.add(dir.getFullPath().toString());
          }
        }
      }
      // Here the inode information for the path is looked up
      final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
      if (file != null) {

        if (showCorruptFileBlocks) {
          listCorruptFileBlocks();
          return;
        }
        
        if (this.showStoragePolcies) {
          storageTypeSummary = new StoragePolicySummary(
              namenode.getNamesystem().getBlockManager().getStoragePolicies());
        }

        Result res = new Result(conf);

        check(path, file, res);

        out.println(res);
        out.println(" Number of data-nodes:\t\t" + totalDatanodes);
        out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());

        if (this.showStoragePolcies) {
          out.print(storageTypeSummary.toString());
        }

        out.println("FSCK ended at " + new Date() + " in "
            + (Time.monotonicNow() - startTime + " milliseconds"));

        // If there were internal errors during the fsck operation, we want to
        // return FAILURE_STATUS, even if those errors were not immediately
        // fatal.  Otherwise many unit tests will pass even when there are bugs.
        if (internalError) {
          throw new IOException("fsck encountered internal errors!");
        }

        // DFSck client scans for the string HEALTHY/CORRUPT to check the status
        // of file system and return appropriate code. Changing the output
        // string might break testcases. Also note this must be the last line 
        // of the report.
        if (res.isHealthy()) {
          out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
        } else {
          out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
        }

      } else {
        out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
      }
    } catch (Exception e) {
      String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
      LOG.warn(errMsg, e);
      out.println("FSCK ended at " + new Date() + " in "
          + (Time.monotonicNow() - startTime + " milliseconds"));
      out.println(e.getMessage());
      out.print("\n\n" + errMsg);
    } finally {
      out.close();
    }
  }
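Note that the actual scan happens inside check(path, file, res): it recursively walks the namespace under the given path and, for each file, inspects its blocks against the namenode's in-memory block map, accumulating the statistics printed via the Result object; no datanode is contacted during the scan.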

Summary:
fsck is an interface exposed by the namenode itself and is invoked through a servlet. The access method does not matter, as long as something submits a request to this endpoint: the hadoop shell command does it from Java through a utility class, and you can just as well assemble the URL directly in a browser, for example:

http://10.4.19.42:50070/fsck?ugi=hadoop&path=/tmp/hadoop/wordcountjavain&files=1&blocks=1&locations=1&racks=1

This is equivalent to hadoop fsck /tmp/hadoop/wordcountjavain -files -blocks -locations -racks (-racks can be omitted; the rack information still shows up).

In essence, fsck reads from the namespace managed by the namenode's FSNamesystem (and its FSDirectory), together with the block information reported up by the datanodes. The inode attributes, block lists, replica counts, and corruption state are all read straight out of the namenode's memory; the namenode does not need to go find the blocks on the datanodes or ask the datanodes to check them. The flip side of these ready-made results, i.e. information already sitting in namenode memory, is that fsck output can be stale: after you change a file, it can take a while before fsck reflects the accurate state. For example, if you swap a block file on a datanode for a corrupt one, the namenode has not yet received a block report about it, so fsck will not immediately tell you the block is bad.

Deletions, however, do show up in fsck right away. As introduced earlier, whether you delete via trash or with skiptrash, the files are only marked for deletion (recorded in a ledger, with a cleanup thread periodically issuing RPCs that tell the corresponding datanodes to remove the blocks), but the delete directly updates the namenode's memory and namespace (including the block-report state: the block information the datanodes sent up is also modified by the delete).

Partly based on: https://blog.csdn.net/tracymkgld/article/details/18044577
