IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021SC@SDUSC HBase(十三)项目代码分析——WAL写入 -> 正文阅读

[大数据]2021SC@SDUSC HBase(十三)项目代码分析——WAL写入

2021SC@SDUSC

一、简述

Hbase 的 WAL 机制是保证 hbase 使用 lsm 树存储模型把随机写转化成顺序写,并从内存 read 数据,从而提高大规模读写效率的关键一环。wal 的多生产者单消费者的线程模型让wal的写入变得安全而高效。
WAL(Write-Ahead Logging)是数据库系统中保障原子性和持久性的技术,通过使用WAL可以将数据的随机写入变为顺序写入,可以提高数据写入的性能。在hbase中写入数据时,会将数据写入内存同时写wal日志,为防止日志丢失,日志是写在hdfs上的。

二、机制

WAL(write ahead log)类似oracle的归档日志,提供了一种高并发,持久化的日志保存和回放机制。数据的写入操作(PUT/DELETE) 执行前,都会先写Hlog。

  1. client 向regionserver端提交数据的时候,会优先写WAL日志(HLog),只有当WAL日志写成功以后,client才会被告诉提交数据成功,如果写WAL失败会告知客户端提交失败
  2. 一个regionserver上所有的region共享一个HLog,一次数据的提交是先写WAL,再写memstore
    在这里插入图片描述

HLog类
实现了WAL的类叫做HLog,当hregion被实例化时,HLog实例会被当做一个参数传到HRegion的构造器中,当一个Region接收到一个更新操作时,它可以直接把数据保存到一个共享的WAL实例中去
在这里插入图片描述
HLogKey类
1、当前的WAL使用的是hadoop的sequencefile格式,其key是HLogKey实例。HLogKey中记录了写入数据的归属信息,,除了table和region名字外,同时还包括sequence number和timestamp,timestamp是“写入时间“,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number
2、HLog sequence File的value是HBase的KeyValue对象,即对应HFile中的KeyValue
WALEdit类
客户端发送的每个修改都会封装成WALEdit类,一个WALEdit类包含了多个更新操作,可以说一个WALEdit就是一个原子操作,包含若干个操作的集合
LogSyncer类
1、Table在创建的时候,有一个参数可以设置,是否每次写Log日志都需要往集群的其他机器同步一次,默认是每次都同步,同步的开销是比较大的,但不及时同步又可能因为机器宕而丢日志。同步的操作现在是通过pipeline的方式来实现的,pipeline是指datanode接收数据后,再传给另外一台datanode,是一种串行的方式,n-Way writes是指多datanode同时接收数据,最慢的一台结束就是整个结束,差别在于一个延迟大,一个开发高,hdfs现在正在开发中,以便可以选择是按pipeline还是n-way writes来实现写操作
2、Table如果设置每次不同步,则写操作会被RegionServer缓存,并启动一个LogSyncer线程来定时同步日志,定时时间默认是一秒也可由hbase.regionserver.optionallogflushinterval设置
LogRoller类
日志写入的大小是有限制的,LogRoller类会作为一个后台线程运行,在特定的时间间隔内滚动日志,通过hbase.regionserver.logroll.period属性控制,默认1小时

三、线程模型

在这里插入图片描述
这个图主要描述了HRegion中调用append和sync后,hbase的wal线程流转模型。最左边是有多个client提交到HRegion的append和sync操作。
当调用append后WALEdit和WALKey会被封装成FSWALEntry类进而再封装成RinbBufferTruck类放入一个线程安全的Buffer(LMAX Disruptor RingBuffer)中。
当调用sync后会生成一个SyncFuture进而封装成RinbBufferTruck类同样放入这个Buffer中,然后工作线程此时会被阻塞等待被notify()唤醒。在最右边会有一个且只有一个线程专门去处理这些RinbBufferTruck,如果是FSWALEntry则写入hadoop sequence文件。因为文件缓存的存在,这时候很可能client数据并没有落盘。所以进一步如果是SyncFuture会被批量的放到一个线程池中,异步的批量去刷盘,刷盘成功后唤醒工作线程完成wal。

四、具体实现

工作线程中当HRegion准备好一个行事务“写”操作的,WALEdit,WALKey后就会调用FSHLog的append方法。

if (walEdit.isReplay()) {
      walKey.setOrigLogSeqNum(origLogSeqNum);
    }
    //don't call the coproc hook for writes to the WAL caused by
    //system lifecycle events like flushes or compactions
    if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
      this.coprocessorHost.preWALAppend(walKey, walEdit);
    }
    WriteEntry writeEntry = null;
    try {
      long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
      // Call sync on our edit.
      if (txid != 0) {
        sync(txid, durability);
      }
      writeEntry = walKey.getWriteEntry();
    } catch (IOException ioe) {
      if (walKey != null && walKey.getWriteEntry() != null) {
        mvcc.complete(walKey.getWriteEntry());
      }
      throw ioe;
    }

FSHLog的append方法首先会从LAMX Disruptor RingbBuffer中拿到一个序号作为txid(sequence),然后把WALEdit,WALKey和sequence等构建一个FSALEntry实例entry,并把entry放到ringbuffer中。而entry以truck(RingBufferTruck,ringbuffer实际存储类型)通过sequence和ringbuffer一一对应。

 public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
      final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners,
      final boolean failIfWALExists, final String prefix, final String suffix) throws IOException {
    super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
    this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION,
      CommonFSUtils.getDefaultReplication(fs, this.walDir));
    this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT);
    this.closeErrorsTolerated = conf.getInt(ROLL_ERRORS_TOLERATED, DEFAULT_ROLL_ERRORS_TOLERATED);
    String hostingThreadName = Thread.currentThread().getName();
    this.disruptor = new Disruptor<>(RingBufferTruck::new,
        getPreallocatedEventCount(),
        Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
        ProducerType.MULTI, new BlockingWaitStrategy());
   this.disruptor.getRingBuffer().next();
    int syncerCount = conf.getInt(SYNCER_COUNT, DEFAULT_SYNCER_COUNT);
    int maxBatchCount = conf.getInt(MAX_BATCH_COUNT,
        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, DEFAULT_MAX_BATCH_COUNT));
    this.ringBufferEventHandler = new RingBufferEventHandler(syncerCount, maxBatchCount);
    this.disruptor.setDefaultExceptionHandler(new RingBufferExceptionHandler());
    this.disruptor.handleEventsWith(new RingBufferEventHandler[] { this.ringBufferEventHandler });
    this.disruptor.start();
  }
protected long getSequenceOnRingBuffer() {
    return this.disruptor.getRingBuffer().next();
  }

  private SyncFuture publishSyncOnRingBuffer(boolean forceSync) {
    long sequence = getSequenceOnRingBuffer();
    return publishSyncOnRingBuffer(sequence, forceSync);
  }

  @VisibleForTesting
  protected SyncFuture publishSyncOnRingBuffer(long sequence, boolean forceSync) {
    // here we use ring buffer sequence as transaction id
    SyncFuture syncFuture = getSyncFuture(sequence, forceSync);
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
      truck.load(syncFuture);
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
    }
    return syncFuture;
  }

如果client设置的持久化等级是USER_DEFAULT,SYNC_WAL或FSYNC_WAL,那么工作线程的HRegion还将调用FSHLog的sync()方法:

  private void sync(long txid, Durability durability) throws IOException {
    if (this.getRegionInfo().isMetaRegion()) {
      this.wal.sync(txid);
    } else {
      switch(durability) {
      case USE_DEFAULT:
        // do what table defaults to
        if (shouldSyncWAL()) {
          this.wal.sync(txid);
        }
        break;
      case SKIP_WAL:
        // nothing do to
        break;
      case ASYNC_WAL:
        // nothing do to
        break;
      case SYNC_WAL:
          this.wal.sync(txid, false);
          break;
      case FSYNC_WAL:
          this.wal.sync(txid, true);
          break;
      default:
        throw new RuntimeException("Unknown durability " + durability);
      }
    }
  }

Sync()方法会往ringbuffer中放入一个SyncFuture对象,并阻塞等待完成(唤醒)。
在FSHLog中有一个私有内部类RingBufferEventHandler类实现了LAMX Disruptor的EventHandler接口,也即是实现了OnEvent方法的ringbuffer的消费者。Disruptor通过 java.util.concurrent.ExecutorService 提供的线程来触发 Consumer 的事件处理,可以看到hbase的wal中只启了一个线程,从源码注释中也可以看到RingBufferEventHandler在运行中只有单个线程。由于消费者是按照sequence的顺序刷数据,这样就能保证WAL日志并发写入时只有一个线程在真正的写入日志文件的可感知的全局唯一顺序。
RingBufferEventHandler类的onEvent()(一个回调方法)是具体处理append和sync的方法。在前面说明过wal使用RingBufferTruck来封装WALEntry和SyncFuture(如下图源码),在消费线程的实际执行方法onEvent()中就是被ringbuffer通知一个个的从ringbfer取出RingBufferTruck,如果是WALEntry则使用当前HadoopSequence文件writer写入文件(此时很可能写的是文件缓存),如果是SyncFuture则简单的轮询处理放入SyncRunner线程异步去把文件缓存中数据刷到磁盘。

/**
 * A 'truck' to carry a payload across the ring buffer from Handler to WAL. Has EITHER a
 * {@link FSWALEntry} for making an append OR it has a {@link SyncFuture} to represent a 'sync'
 * invocation. Truck instances are reused by the disruptor when it gets around to it so their
 * payload references must be discarded on consumption to release them to GC.
 */
final class RingBufferTruck {
  public enum Type {
    APPEND, SYNC, EMPTY
  }

这部分源码可以看到RingBufferTruck类的结构,从注释可以看到选择SyncFuture和FSWALEntry一个放入ringbuffer中。
可以看到append的最终归属就是根据sequence有序的把FSWALEntry实例entry写入HadoopSequence文件。这里有序的原因是多工作线程写之前通过ringbuffer线程安全的CAS得到一个递增的sequence,ringbuffer会根据sequence取出FSWALEntry并落盘。这样做其实只有在得到递增的sequence的时候需要保证线程安全,而java的CAS通过轮询并不用加锁,所以效率很高。

 public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
        throws Exception {
      // Appends and syncs are coming in order off the ringbuffer. We depend on this fact. We'll
      // add appends to dfsclient as they come in. Batching appends doesn't give any significant
      // benefit on measurement. Handler sync calls we will batch up. If we get an exception
      // appending an edit, we fail all subsequent appends and syncs with the same exception until
      // the WAL is reset. It is important that we not short-circuit and exit early this method.
      // It is important that we always go through the attainSafePoint on the end. Another thread,
      // the log roller may be waiting on a signal from us here and will just hang without it.

      try {
        if (truck.type() == RingBufferTruck.Type.SYNC) {
          this.syncFutures[this.syncFuturesCount.getAndIncrement()] = truck.unloadSync();
          // Force flush of syncs if we are carrying a full complement of syncFutures.
          if (this.syncFuturesCount.get() == this.syncFutures.length) {
            endOfBatch = true;
          }
        } else if (truck.type() == RingBufferTruck.Type.APPEND) {
          FSWALEntry entry = truck.unloadAppend();
          //TODO handle htrace API change, see HBASE-18895
          //TraceScope scope = Trace.continueSpan(entry.detachSpan());
          try {
            if (this.exception != null) {
              // Return to keep processing events coming off the ringbuffer
              return;
            }
            append(entry);
          } catch (Exception e) {
            // Failed append. Record the exception.
            this.exception = e;

这部分源码是说明sync操作的SyncFuture会被提交到SyncRunner中,这里可以注意SyncFuture实例其实并不是一个个提交到SyncRunner中执行的,而是以syncFutures(数组,多个SyncFuture实例)方式提交的。下面这部分源码是注释中说明批量刷盘的决策。

  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
    writer.append(entry);
  }

SyncRunner是一个线程,wal实际有一个SyncRunner的线程组,专门负责之前append到文件缓存的刷盘工作。

 private class SyncRunner extends Thread {
    private volatile long sequence;
    // Keep around last exception thrown. Clear on successful sync.
    private final BlockingQueue<SyncFuture> syncFutures;
    private volatile SyncFuture takeSyncFuture = null;
class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
    private final SyncRunner[] syncRunners;
    private final SyncFuture[] syncFutures;

SyncRunner的线程方法(run())负责具体的刷写文件缓存到磁盘的工作。首先去之前提交的synceFutues中拿到其中sequence最大的SyncFuture实例,并拿到它对应ringbuffer的sequence。再去比对当前最大的sequence,如果发现比当前最大的sequence则去调用releaseSyncFuture()方法释放synceFuture,实际就是notify通知正被阻塞的sync操作,让工作线程可以继续往下继续。
前面解释了sequence是根据提交顺序过来的,并且解释了append到文件缓存的时候也是全局有序的,所以这里取最大的去刷盘,只要最大sequence已经刷盘,那么比这个sequence的也就已经刷盘成功。最后调用当前HadoopSequence文件writer刷盘,并notify对应的syncFuture。这样整个wal写入也完成了。

五、总结

Hbase的WAL机制是保证hbase使用lsm树存储模型把随机写转化成顺序写,并从内存read数据,从而提高大规模读写效率的关键一环。wal的多生产者单消费者的线程模型让wal的写入变得安全而高效。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-09 11:45:16  更:2021-12-09 11:46:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:36:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码