IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop源码分析(九) -> 正文阅读

[大数据]Hadoop源码分析(九)

2021SC@SDUSC

研究内容简略介绍

上周我们对输入分片InputSplit进行了分析,对其分片大小进行了进一步的探究。随后展开了对与mapreduce中的核心类mapper的分析,在了解其作用的基础上,又对其几个重要的子类做了源码分析。本此我们将继续分析,首先从org.apache.hadoop.mapreduce.MarkableIterator<VALUE>开始。

在这里插入图片描述

org.apache.hadoop.mapreduce.MarkableIterator源码分析

首先附上文件的源代码:

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <code>MarkableIterator</code> is a wrapper iterator class that 
 * implements the {@link MarkableIteratorInterface}.
 * 
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MarkableIterator<VALUE> 
  implements MarkableIteratorInterface<VALUE> {

  MarkableIteratorInterface<VALUE> baseIterator;

  /**
   * Create a new iterator layered on the input iterator
   * @param itr underlying iterator that implements MarkableIteratorInterface
   */
  public MarkableIterator(Iterator<VALUE> itr)  {
    if (!(itr instanceof MarkableIteratorInterface)) {
      throw new IllegalArgumentException("Input Iterator not markable");
    }
    baseIterator = (MarkableIteratorInterface<VALUE>) itr;
  }

  @Override
  public void mark() throws IOException {
    baseIterator.mark();
  }

  @Override
  public void reset() throws IOException {
    baseIterator.reset();
  }

  @Override
  public void clearMark() throws IOException {
    baseIterator.clearMark();
  }

  @Override
  public boolean hasNext() { 
    return baseIterator.hasNext();
  }

  @Override
  public VALUE next() {
    return baseIterator.next();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("Remove Not Implemented");
  }
}

可以看出,MarkableIterator是一个包装迭代器类,它实现了MarkableIteratorInterface接口。
在这里插入图片描述
其中:
mark()方法能够标记当前记录,对 reset 的后续调用会将迭代器倒回到此记录。
reset()方法可以在调用上一个标记之前将迭代器重置为最后一条记录。
clearMark()可以清除任何先前设置的标记。

org.apache.hadoop.mapreduce.OutputCommitter源码分析

接下来分析OutputCommitter。

Map-Reduce 框架依赖于OutputCommitter作业:
1.在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录。
2.作业完成后清理作业。例如,在作业完成后删除临时输出目录。
3.设置任务临时输出。
4.检查任务是否需要提交。如果任务不需要提交,这是为了避免提交过程。
5.提交任务输出。
6.放弃任务提交。

新版本 MapReduce API中,OutputCommitter 由OutputFormat 通过getOutputCommitter() 方法确定。默认为FileOutputCommitter,适用于有文件输出的MapReduce任务。若是需要,也可以实现一个新的OutputCommitter类,以对作业的完成或任务做自定义设置或清理。

OutputCommiter 部分源码如下:

public abstract class OutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter {
     public OutputCommitter() {
     }
 
     public abstract void setupJob(JobContext var1) throws IOException;
 
     /** @deprecated */
     @Deprecated
     public void cleanupJob(JobContext jobContext) throws IOException {
     }
 
     public void commitJob(JobContext jobContext) throws IOException {
         this.cleanupJob(jobContext);
     }
 
     public void abortJob(JobContext jobContext, int status) throws IOException {
         this.cleanupJob(jobContext);
     }
 
     public abstract void setupTask(TaskAttemptContext var1) throws IOException;
 
     public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
 
     public abstract void commitTask(TaskAttemptContext var1) throws IOException;
 
     public abstract void abortTask(TaskAttemptContext var1) throws IOException;

其中 setupJob在作业运行前被调用,用于初始化操作。当OutputCommitter 被设置为 FileOutputCommitter时,它会创建最终的输出目录${mapreduce.output.fileoutputformat.outputdir},并为任务的输出创建一个临时文件夹 _temporary,作为最终输出目录的子目录。

FileOutputCommitter 中setupJob() 方法源码如下:

public void setupJob(JobContext context) throws IOException {
     if (this.hasOutputPath()) {
         Path jobAttemptPath = this.getJobAttemptPath(context);
         FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
         if (!fs.mkdirs(jobAttemptPath)) {
             LOG.error("Mkdirs failed to create " + jobAttemptPath);
         }
     } else {
         LOG.warn("Output Path is null in setupJob()");
     }
 
 }

其中 jobAttemptPath 由 getJobAttemptPath(context) 获取,一层层往下查看此方法调用,最终可以看到FileOutputCommitter 创建的临时目录为:目标输出目录下的_temporary 子目录:

private static Path getPendingJobAttemptsPath(Path out) {
     return new Path(out, "_temporary");
 }

如果作业成功,则调用 commitJob() 方法。此方法会做临时文件的清理(cleanupJob()),并在最终输出目录中创建名为_SUCCESS的文件,表示Job成功执行完成。若是Job 执行失败,则被状态对象调用abortJob(),默认会调用 cleanupJob() 的方法,对临时文件进行清理。

以上提到的是Job 级别的Committer。在 Task级别,同样也有上述几种方法:

public abstract void setupTask(TaskAttemptContext var1) throws IOException;
 
 public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
 
 public abstract void commitTask(TaskAttemptContext var1) throws IOException;
 
 public abstract void abortTask(TaskAttemptContext var1) throws IOException;

其中,在 task 执行之前会调用 setupTask(),但是默认并不做任何工作。因为创建临时任务的输出路径的工作已经在setupJob() 阶段完成。方法needsTaskCommit返回是否需要task 执行提交阶段。提交阶段的工作为:将临时目录下的输出(若有)移动到最终目录。若设置为 false,则执行框架不会为任务运行分布式提交协议,也就不会执行commitTask() 或 abortTask()。当此task没有写任何输出时,FileOutputCommitter会跳过 commit (提交)阶段。

如果task成功执行,并且有输出,则会调用commitTask() 方法,(默认的实现为)将临时目录下的输出文件移动到最终目录(mapreduce.output.fileoutputformat.outputdir)。若是执行失败,则调用abortTask(),删除任务输出的临时目录及文件。

执行框架会保证一个task在有多次尝试的情况下,仅有一个task会被提交。

FileOutputCommitter 有两个方法,commitTask 和 commitJob。Apache Spark 2.0 以及更高版本使用的是 Apache Hadoop 2。
Apache Hadoop 2 使用 mapreduce.fileoutputcommitter.algorithm.version 控制 commitTask 和 commitJob 如何工作。
在 Hadoop 2 中,默认的值是 1。在这种情况下,commitTask 会将 task 的输出文件从 task 的临时目录移动到 job 的临时目录下。在所有 task 任务完成后,commitJob 将生成的数据从 job 的临时目录移动到最终的 job 目录下。这个工作在 spark 中由 driver 完成。

若是使用的是云存储(如 s3),则这个操作会消耗较长时间。会看到所有 task 已结束,但是任务仍未结束。

在设置 mapreduce.fileoutputcommitter.algorithm.version 的值为 2 后,commitTask 会将 task 生成的输出文件从 task 临时目录直接移动到 job 的最终目录。

此时,commitJob 基本无操作。

org.apache.hadoop.mapreduce.OutputFormat<K,V>源码分析

分析了OutputCommitter之后,我们来看看OutputFormat。

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;


@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {

  public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;

 
  public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;


  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context
                                     ) throws IOException, InterruptedException;
}


首先我们看官方api的解释:
在这里插入图片描述
大致是说:
OutputFormat 描述 Map-Reduce 作业的输出规范。

Map-Reduce 框架依赖于OutputFormat作业:
1.验证作业的输出规范。例如,检查输出目录是否已经存在。
2.提供RecordWriter用于写出作业输出文件的实现。输出文件存储在 FileSystem.

在MapReduce框架中,OutputFormat负责把Reducer处理完成的Key-Value写出到本地磁盘或HDFS上,默认计算结果会以part-000的命名输出成多个文件,并且输出的文件数量与Reduce数量一致。000是关联到某个Reduce任务的分区的Id号。

MapReduce提供多种输出格式,用户可以灵活设置输出的路径、文件名、输出格式等。输出格式类实现OutputFormat接口,FileOutputFormat是实现的抽象类,常见的实现类还包括TextOutputFormat、SequenceFileOutputFormat、NullOutputFormat、DBOutputFormat,继承关系图如图所示。

在这里插入图片描述
1)TextOutputFormat, 默认输出字符串输出格式,key和value中间值用tab隔开;
2)SequenceFileOutputFormat, 序列化文件输出,将key和value以sequencefile格式输出;
3)MultipleOutputs,可以把输出数据输送到不同的目录;
4)NullOutputFormat, 把输出输出到/dev/null中,即不输出任何数据,这个应用场景是在MR中进行了逻辑处理,同时输出文件已经在MR中进行了输出,而不需要在输出的情况;
5)DBOutputFormat, 适用于将作业输出数据(数据量太大不适合)存到Mysql、Oracle等数据库,在写出数据时会并行连接数据库,需要设置合适的map、reduce个数以便将并行连接的数量控制在合理的范围之内。

在驱动程序中可以通过特定方法实现输出定义:
1)指定输出的格式化类
job.setOutputFormatClass(TextOutputFormat.class)
2)设置输出的文件名
TextOutputFormat.setOutputName(job, “foobar”)
3)设置输出路径
TextOutputFormat.setOutputPath()

自定义输出格式

实现自定义输出格式,我们需要
(1)继承OutputFormat的类,实现getRecordWriter方法,返回一个RecordWriter类型;
(2)继承RecordWriter的类,定义其write方法,针对每个<key,value>写入文件数据;

OutputFormat的接口定义如下,在具体实现时需要自定义RecordWriter和OutputCommitter类,其中OutputCommitter类由于不涉及到具体的输出目的地,所以一般情况下,不用重写,可直接使用FileOutputCommitter对象,RecordWriter类是具体的定义如何将数据写到目的地的。

public abstract class OutputFormat<K, V> { 
    // 获取具体的数据写出对象
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
     throws IOException, InterruptedException; 
    // 检查输出配置信息是否正确
    public abstract void checkOutputSpecs(JobContext context)
     throws IOException, InterruptedException; 
    // 获取输出job的提交者对象
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
     throws IOException, InterruptedException; 
}

自定义RecordWriter需要实现以下方法:

public abstract class RecordWriter<K, V> {  
    // 具体的写数据的方法
    public abstract void write(K key, V value)
     throws IOException, InterruptedException;  
     // 关闭资源
    public abstract void close(TaskAttemptContext context)
     throws IOException, InterruptedException; 
}

总结

本次我们分析了迭代器类MarkableIterator,OutputCommitter,OutputFormat以及其子类,并对自定义格式输出有了进一步的了解,为之后的源码分析打下了基础。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-01 17:46:35  更:2021-12-01 17:49:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:59:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码