2021SC@SDUSC
Hash索引创建
上一篇博客我们讲了如何Hash索引的相关构成和原理,这一篇我们会根据源码详细分析Hash索引的创建过程。 构建一个Hash索引时,我们要进行相关的初始化,同时会用到扫描函数将该索引插入到Hash表中。接下来我们看一下与Hash索引创建的重要函数。
hashbuild函数
hashbuild函数时间里Hash索引的入口函数,我们来分析一下它的相关源码
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
IndexBuildResult *result;
BlockNumber relpages;
double reltuples;
double allvisfrac;
uint32 num_buckets;
long sort_threshold;
HashBuildState buildstate;
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);
num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);
sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
sort_threshold = Min(sort_threshold, NBuffers);
else
sort_threshold = Min(sort_threshold, NLocBuffer);
if (num_buckets >= (uint32) sort_threshold)
buildstate.spool = _h_spoolinit(heap, index, num_buckets);
else
buildstate.spool = NULL;
buildstate.indtuples = 0;
buildstate.heapRel = heap;
reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
hashbuildCallback,
(void *) &buildstate, NULL);
pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL,
buildstate.indtuples);
if (buildstate.spool)
{
_h_indexbuild(buildstate.spool, buildstate.heapRel);
_h_spooldestroy(buildstate.spool);
}
result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
result->heap_tuples = reltuples;
result->index_tuples = buildstate.indtuples;
return result;
}
_hash_init函数
该函数是专用于初始化Hash表所用,其源码在hashpage.c文件中
uint32
_hash_init(Relation rel, double num_tuples, ForkNumber forkNum)
{
Buffer metabuf;,
Buffer buf;
Buffer bitmapbuf;
Page pg;
HashMetaPage metap;
RegProcedure procid;
int32 data_width;
int32 item_width;
int32 ffactor;
uint32 num_buckets;
uint32 i;
bool use_wal;
if (RelationGetNumberOfBlocksInFork(rel, forkNum) != 0)
elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
RelationGetRelationName(rel));
use_wal = RelationNeedsWAL(rel) || forkNum == INIT_FORKNUM;
data_width = sizeof(uint32);
item_width = MAXALIGN(sizeof(IndexTupleData)) + MAXALIGN(data_width) +
sizeof(ItemIdData);
ffactor = RelationGetTargetPageUsage(rel, HASH_DEFAULT_FILLFACTOR) / item_width;
if (ffactor < 10)
ffactor = 10;
procid = index_getprocid(rel, 1, HASHSTANDARD_PROC);
*/
metabuf = _hash_getnewbuf(rel, HASH_METAPAGE, forkNum);
_hash_init_metabuffer(metabuf, num_tuples, procid, ffactor, false);
MarkBufferDirty(metabuf);
pg = BufferGetPage(metabuf);
metap = HashPageGetMeta(pg);
if (use_wal)
{
xl_hash_init_meta_page xlrec;
XLogRecPtr recptr;
xlrec.num_tuples = num_tuples;
xlrec.procid = metap->hashm_procid;
xlrec.ffactor = metap->hashm_ffactor;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHashInitMetaPage);
XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_META_PAGE);
PageSetLSN(BufferGetPage(metabuf), recptr);
}
num_buckets = metap->hashm_maxbucket + 1;
LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
for (i = 0; i < num_buckets; i++)
{
BlockNumber blkno;
CHECK_FOR_INTERRUPTS();
blkno = BUCKET_TO_BLKNO(metap, i);
buf = _hash_getnewbuf(rel, blkno, forkNum);
_hash_initbuf(buf, metap->hashm_maxbucket, i, LH_BUCKET_PAGE, false);
MarkBufferDirty(buf);
if (use_wal)
log_newpage(&rel->rd_node,
forkNum,
blkno,
BufferGetPage(buf),
true);
_hash_relbuf(rel, buf);
}
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
bitmapbuf = _hash_getnewbuf(rel, num_buckets + 1, forkNum);
_hash_initbitmapbuffer(bitmapbuf, metap->hashm_bmsize, false);
MarkBufferDirty(bitmapbuf);
if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("out of overflow pages in hash index \"%s\"",
RelationGetRelationName(rel))));
metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
metap->hashm_nmaps++;
MarkBufferDirty(metabuf);
if (use_wal)
{
xl_hash_init_bitmap_page xlrec;
XLogRecPtr recptr;
xlrec.bmsize = metap->hashm_bmsize;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHashInitBitmapPage);
XLogRegisterBuffer(0, bitmapbuf, REGBUF_WILL_INIT);
XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);
recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INIT_BITMAP_PAGE);
PageSetLSN(BufferGetPage(bitmapbuf), recptr);
PageSetLSN(BufferGetPage(metabuf), recptr);
}
_hash_relbuf(rel, bitmapbuf);
_hash_relbuf(rel, metabuf);
return num_buckets;
}
|