HBase源码分析(二) 2021SC@SDUSC
前言
在 HBase中,大部分的操作都是在RegionServer完成的,Client端想要插入、删除、查询数据都需要先找到相应的 RegionServer。什么叫相应的RegionServer?就是管理你要操作的那个Region的RegionServer。Client本身并 不知道哪个RegionServer管理哪个Region,那么它是如何找到相应的RegionServer的?此篇文章可以分析此问题。
一、HRegionServer作用
HRegionServer作用如下:
使得被它管理的一系列HRegion能够被客户端来使用,每个HRegion对应了Table中的一个Region,HRegion中由多个HStore组成。 主要负责响应用户I/O请求,向HDFS文件系统中读写数据。
二、对Client端代码分析
1.put方法:
可以一次put一行记录也可以一次put多行记录,两个方法内部都会调用doPut方法,最后再来根据autoFlush(默认为true)判断是否需要flushCommits ,在autoFlush为false的时候,如果当前容量超过了缓冲区大小(默认值为:2097152=2M),也会调用flushCommits方法。也就是说,在自动提交情况下,你可以手动控制通过一次put多条记录(这时候缓冲区不会满),然后将这些记录flush,以提高写操作tps。
`
@Override public void put(final Put put) throws IOException { validatePut(put); ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
public void put(final List<Put> puts) throws IOException {
for (Put put : puts) {
validatePut(put);
}
2.用doput代码判断
validatePut(put); //验证Put有效,主要是判断kv的长度 writeBuffer.add(put); //写入缓存 currentWriteBufferSize += put.heapSize(); //计算缓存容量 if (currentWriteBufferSize > writeBufferSize) { flushCommits(); //如果超过缓存容量,则调用flushCommits()
3.flushCommits方法如下:
`
public void flushCommits() throws IOException { // no-op } ``
其核心是调用this.connection的processBatch方法,其参数有:writeBuffer、tableName、pool、results
4.ConnectionImplementation的processBatch方法:
public void processBatch(List<? extends Row> list,
final byte[] tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
if (results.length != list.size()) {
throw new IllegalArgumentException("argument results must be the same size as argument list");
}
processBatchCallback(list, tableName, pool, results, null);
}
最后是调用的processBatchCallback方法,第五个参数为空,即没有回调方法。
processBatchCallback方法内部可以失败后进行重试,重试次数为hbase.client.retries.number控制,默认为10,每一次重试直接都会休眠一下
过程如下:
第一步: 查找HRegion所在位置过程关键在private HRegionLocation locateRegion(final byte [] tableName,final byte [] row, boolean useCache)方法中,并且为递归方法,过程如下:调用locateRegionInMeta方法到.META.表中查找tableName的row所对应的HRegion所在位置,先从本地缓存查找,如果没有,则进行下一步; 调用locateRegionInMeta方法到-ROOT-表中查找.META.所对应的HRegion所在位置,先从本地缓存查找,如果没有,则进行下一步 通过rootRegionTracker(即从zk上)获取RootRegionServer地址,即找到-ROOT-表所在的RegionServer地址,然后获取到.META.所在位置,最后在获取.META.表上所有HRegion,并将其加入到本地缓存。 说明:
当我们创建一个表时,不管是否预建分区,该表创建之后,在.META.上会有一条记录的。 在客户端第一次连接服务端时,会两次查询缓存并没有查到结果,最后在通过-ROOT-–>.META.–>HRegion找到对应的HRegion所在位置。 第二步: 第二步中,先是创建到RegionServer的连接,后是调用RegionServer上的multi方法,显然这是远程调用的过程。
三.对Server端代码分析
客户端的相关代码分析完后,对服务器端的代码继续分析。
1.multi方法:
对于客户端写操作,最终会调用HRegionServer的multi方法。
因为传递到RegionServer都是按regionName分组的,故最后的操作实际上都是调用的HRegion对象的方法。
该方法主要就是遍历multi并对actionsForRegion按rowid进行排序,然后分类别对action进行处理,Put和Delete操作会放到一起然后调用batchMutate方法批量提交。
OperationStatus[] codes =region.batchMutate(mutationsWithLocks.toArray(new Pair[]{}));
对于Put和Delete操作(保存在mutations中),在处理之前,先通过cacheFlusher检查memstore大小吃否超过限定值,如果是,则进行flush。
接下来遍历mutations,为每个Mutation添加一个锁lock,然后再调用region的batchMutate方法。
2.batchMutate:
batchMutate方法内部,依次一个个处理:
先检查是否只读 检查当前资源是否支持update操作,会比较memstoreSize和blockingMemStoreSize大小,然后会阻塞线程 调用startRegionOperation,给lock.readLock()加锁 调用doPreMutationHook执行协作器里的一些方法 计算其待添加的大小 计算加入memstore之后的memstore大小 写完之后,释放lock.readLock()锁 判断是否需要flush memstore,如果需要,则调用requestFlush方法,其内部实际是通过RegionServerServices中的FlushRequester(其实现类为MemStoreFlusher)来执行flush操作
MemStoreFlusher flush过程: HRegion中的requestFlush方法:
@Override
public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
requestFlush0(tracker);
}
/**
* This method modifies the region's configuration in order to inject replication-related
* features
* @param conf region configurations
*/
static void decorateRegionConfiguration(Configuration conf) {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
String plugins = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
if (!plugins.contains(replicationCoprocessorClass)) {
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
(plugins.equals("") ? "" : (plugins + ",")) + replicationCoprocessorClass);
}
}
}
上面this.rsServices.getFlushRequester()其实际上返回的是MemStoreFlusher类。 MemStoreFlusher构造方法:
初始化threadWakeFrequency,该值由hbase.server.thread.wakefrequency设置,默认为10 * 1000 初始化globalMemStoreLimit,该值为最大堆内存乘以hbase.regionserver.global.memstore.upperLimit的值,hbase.regionserver.global.memstore.upperLimit参数默认值为0.4 初始化globalMemStoreLimitLowMark,该值为最大堆内存乘以hbase.regionserver.global.memstore.lowerLimit的值,hbase.regionserver.global.memstore.lowerLimit参数默认值为0.35 初始化blockingWaitTime,该值由hbase.hstore.blockingWaitTime设置,默认为90000 MemStoreFlusher实现了Runnable接口,在RegionServer启动过程中会启动一个线程,其run方法逻辑如下:
只要RegionServer一直在运行,该线程就不会停止运行 每隔threadWakeFrequency时间从flushQueue中取出一个对象 如果取出的对象为空或者WakeupFlushThread,则判断:如果当前RegionServer的总大小大于globalMemStoreLimit值,则找到没有太多storefiles(只个数小于hbase.hstore.blockingStoreFiles的,该参数默认值为7)的最大的region和不管有多少storefiles的最大region,比较两个大小找出最大的一个,然后flush该region,并休眠1秒;最后在唤醒flush线程 先flush region上的memstore,这部分代码通过HRegion的internalFlushcache方法来完成,其内部使用了mvcc 判断是否该拆分,如果是则拆分 - 判断是否该压缩合并,如果是则合并 如果如果取出的对象为FlushRegionEntry,则flush该对象。 如果当前region不是meta region并且当前region的storefiles数大于hbase.hstore.blockingStoreFiles,先判断是否要拆分,然后再判断是否需要合并小文件。这个过程会阻塞blockingWaitTime值定义的时间。 - 否则, 直接flush该region上的memstore(调用HRegion的internalFlushcache方法),然后再判断是否需要拆分和合并
四.总结
HRegion定位过程:
client -> zookeeper -> -ROOT- -> .META -> HRegion地址 -> HRegionServer-> HRegion
在这个过程中客户端先通过zk找到Root表所在的RegionServer(通过zk上的/hbase/root-region-server节点获取),然后找到Meta表对应的HRegion地址,最后在Meta表里找到目标表所在的HRegion地址,这个过程客户端并没有和HMaster进行交互。
Client端并不会每次数据操作都做这整个路由过程,因为HRegion的相关信息会缓存到本地,当有变化时,通过zk监听器能够及时感知。
|