Connection 和 Admin,HTable 的理解
一个标准的 HBase 客户端程序的写法,来看具体实现:
HBaseConfuration conf = HBaseConfuration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
HTable table = connection.getTable();
admin.createTable();
table.put(Put put);
table.delete(Delete delete);
Result result = table.get(Get get);
ResultScanner rs = table.getScanner(Scan scan);
在第二步中,会创建 RpcClient。来看具体实现:
ConnectionFactory.createConnection(conf){
String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, ConnectionImplementation.class.getName());
Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
(Connection) constructor.newInstance(conf, pool, user){
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
this.metaCache = new MetaCache(this.metrics);
this.registry = ConnectionRegistryFactory.getRegistry(conf){
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
}
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
}
}
}
客户端的核心四大组件: AsyncProcess,异步处理器,负责提交请求 MetaCache,存在于客户端用于缓存 Meta 表的相关信息(一个 Map + 两个方法) ConnectionRegistryFactory,HBase 的 ZK 客户端,客户端会跟 ZK 打交道获取 Meta 的信息 RpcClient,RPC 客户端
Procedure 和 ProcedureExecutor 详解
当 HMaster 接收到一个 createTable() 的 RPC 请求的时候,封装成一个 CreateTableProcedure 提交到 ProcedureExecutor。ProcedureExecutor 在 HMaster 启动的时候,会初始化启动。 ProcedureExecutor 创建和初始化 ProcedureExecutor 启动 ProcedureExecutor 接收 Procedure 执行处理
HMaster.finishActiveMasterInitialization(){
createProcedureExecutor(){
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
procedureStore = new RegionProcedureStore();
procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
procedureStore.start(numThreads);
procedureExecutor.init(numThreads, abortOnCorruption){
workerThreads = new CopyOnWriteArrayList<>();
for(int i = 0; i < corePoolSize; ++i) {
workerThreads.add(new WorkerThread(threadGroup));
}
scheduler.start();
load(abortOnCorruption);
}
}
}
HMaster.finishActiveMasterInitialization(){
startServiceThreads(){
startProcedureExecutor(){
procedureExecutor.startWorkers(){
for(WorkerThread worker : workerThreads) {
worker.start();
}
}
}
}
}
class WorkerThread{
public void run() {
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS){
poll(unit.toNanos(timeout)){
final Procedure pollResult = dequeue(){
Procedure<?> pollResult = doPoll(metaRunQueue);
if(pollResult = = null) {
pollResult = doPoll(serverRunQueue);
} if(pollResult = = null) {
pollResult = doPoll(peerRunQueue);
} if(pollResult == null) {
pollResult = doPoll(tableRunQueue);
} return pollResult;
} return pollResult;
}
} this.activeProcedure = proc;
executeProcedure(proc);
}
}
HMaster.createTable(TableDescriptor tableDescriptor, final byte[][] splitKeys, final long nonceGroup, final long nonce){
TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
String namespace = desc.getTableName().getNamespaceAsString();
RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
submitProcedure(new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch)){
getProcedureExecutor().submitProcedure(proc, nonceKey){
prepareProcedure(proc);
store.insert(proc, null);
return pushProcedure(proc){
procedures.put(currentProcId, proc);
scheduler.addBack(proc){
enqueue(procedure, addFront){
if(isMetaProcedure(proc)) {
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
} else if(isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if(isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if(isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
throw new UnsupportedOperationException("");
}
}
}
class WorkerThread{
public void run() {
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
executeProcedure(proc){
execProcedure(procStack, proc){
procedure.doExecute(getEnvironment()){
execute(env){
stateFlow = executeFromState(env, state){
CreateTableProcedure.executeFromState(final MasterProcedureEnv env, final CreateTableState state);
}
}
}
DDL 创建表服务端处理,CreateTableProcedure
CreateTableProcedure核心逻辑executeFromStat
CreateTableProcedure.executeFromState(final MasterProcedureEnv env, final CreateTableState state){
switch(state) {
case CREATE_TABLE_PRE_OPERATION:
boolean exists = !prepareCreate(env);
releaseSyncLatch();
if(exists) {
assert isFailed() : "the delete should have an exception here";
return Flow.NO_MORE_STATE;
} preCreate(env);
setNextState(CreateTableState.CREATE_TABLE_WRITE_FS_LAYOUT);
break;
case CREATE_TABLE_WRITE_FS_LAYOUT:
DeleteTableProcedure.deleteFromFs(env, getTableName(), newRegions, true);
newRegions = createFsLayout(env, tableDescriptor, newRegions);
env.getMasterServices().getTableDescriptors().update(tableDescriptor, true);
setNextState(CreateTableState.CREATE_TABLE_ADD_TO_META);
break;
case CREATE_TABLE_ADD_TO_META:
newRegions = addTableToMeta(env, tableDescriptor, newRegions);
setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
break;
case CREATE_TABLE_ASSIGN_REGIONS:
setEnablingState(env, getTableName());
addChildProcedure(env.getAssignmentManager().createRoundRobinAssignProcedures(...));
setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
break;
case CREATE_TABLE_UPDATE_DESC_CACHE:
setEnabledState(env, getTableName());
setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
break;
case CREATE_TABLE_POST_OPERATION:
postCreate(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
Meta 表初始化 InitMetaProcedure
如果 HBase 集群是第一次启动,初始化 Meta 表
HMaster.finishActiveMasterInitialization(){
InitMetaProcedure temp = new InitMetaProcedure();
procedureExecutor.submitProcedure(temp);
}
InitMetaProcedure.executeFromState(MasterProcedureEnv env, InitMetaState state){
switch(state) {
case INIT_META_WRITE_FS_LAYOUT:
TableDescriptor td = writeFsLayout(rootDir, conf);
setNextState(InitMetaState.INIT_META_ASSIGN_META);
return Flow.HAS_MORE_STATE;
case INIT_META_ASSIGN_META:
addChildProcedure(env.getAssignmentManager().createAssignProcedures(....));
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
InitMetaProcedure.writeFsLayout(){
FileSystem fs = rootDir.getFileSystem(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, TableName.META_TABLE_NAME);
if(fs.exists(tableDir) && !fs.delete(tableDir, true)) {
LOG.warn("Can not delete partial created meta table, continue...");
}
TableDescriptor metaDescriptor = FSTableDescriptors.tryUpdateAndGetMetaTableDescriptor(conf, fs, rootDir);
HRegion.createHRegion(RegionInfoBuilder.FIRST_META_REGIONINFO, rootDir, conf, metaDescriptor, null){
createRegionDir(conf, info, rootDir);
HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, rsRpcServices);
if(initialize) {
region.initialize(null){
initializeRegionInternals(reporter, status){
initializeStores(reporter, status){
for(final ColumnFamilyDescriptor family : htableDescriptor.getColumnFamilies()) {
completionService.submit(new Callable<HStore>() {
@Override
public HStore call() throws IOException {
return instantiateHStore(family, warmup){
if(family.isMobEnabled()) {
if(HFile.getFormatVersion(this.conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
return new HMobStore(this, family, this.conf, warmup);
}
}
return new HStore(this, family, this.conf, warmup){
this.dataBlockEncoder = new HFileDataBlockEncoderImpl(....);
this.memstore = getMemstore(){
MemStore ms = null;
switch(inMemoryCompaction) {
case NONE:
ms = DefaultMemStore.class;
break;
default:
ms = CompactingMemStore.class;
} return ms;
} this.storeEngine = createStoreEngine(....);
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
}
}
}
});
}
}
if(!isRestoredRegion && ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
Collection<HStore> stores = this.stores.values();
stores.forEach(HStore::startReplayingFromWAL);
replayRecoveredEditsIfAny(maxSeqIdInStores, reporter, status);
loadRecoveredHFilesIfAny(stores);
stores.forEach(HStore::stopReplayingFromWAL);
}
this.splitPolicy = RegionSplitPolicy.create(this, conf);
splitRestriction = RegionSplitRestriction.create(getTableDescriptor(), conf);
this.flushPolicy = FlushPolicyFactory.create(this, conf);
}
}
}
} return metaDescriptor;
}
env.getAssignmentManager().createAssignProcedures(){
new TransitRegionStateProcedure(env, region, targetServer, forceNewPlan, TransitionType.ASSIGN);
}
***
TransitRegionStateProcedure.executeFromState(MasterProcedureEnv env, RegionStateTransitionState state){
switch(state) {
case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
queueAssign(env, regionNode);
return Flow.HAS_MORE_STATE;
case REGION_STATE_TRANSITION_OPEN:
openRegion(env, regionNode);
return Flow.HAS_MORE_STATE;
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
return confirmOpened(env, regionNode);
case REGION_STATE_TRANSITION_CLOSE:
closeRegion(env, regionNode);
return Flow.HAS_MORE_STATE;
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
return confirmClosed(env, regionNode);
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
从来不完美,一直不放弃,先模仿
|