Write In Front
或许是因为我几乎没写过比较大且完整的项目的原因,从编程环境配置到C++语法掌握,再到整个项目组织设计模式,我已经记不得多少次拍手称赞。 期间遇到各种问题,很多时候想过要不要放弃了,但是经过一次次内心的挣扎之后,耗时一个月满分完成了这个项目。
收获很多,感谢Bustub教学组!!!
PROJECT #1 - BUFFER POOL
TASK #1 - LRU REPLACEMENT POLICY
核心概念
-
page_id
磁盘被划分为若干块(页),每个块都用唯一的 page_id 来标识,disk_manager 就是根据 page_id 来读取和写入磁盘页
-
frame_id
磁盘页加载到内存,内存中也需要有相应的内存页(人为划分的内存页),BufferPoolManager(BMP)管理的正是这些内存块,和磁盘类似,内存页也需要有相应的标识来唯一标记这些内存页,frame_id 就是唯一标识内存页的标记
-
lru_hash
lru_hash 记录可以被置换出内存的那些内存页的 frame_id_t,用于在 O(1)时间查询某个 frame_id 是否可以被置换出内存
-
lru_list
lru_list 是一个双向链表,记录能够被置换出内存的页面的frame_id, 后加入链表的元素被放到链表头,尾部则是最近最久未使用的页面对应的 frame_id。
-
max_size
max_size 是 lru_replacer 可以管理的最大的置换页面数量,本文和 BPM 大小一致
-
Pin
把 frame_id 对应页面剔除 lru_list, 表示该页面被线程占用,不可以被置换出去
-
Unpin
把 frame_id 对应的页面加入 lru_list,表示该页面可以被加入lru_replacer
-
Victim
查看 lru_replacer 是否有可以被置换出的页面,有的话则取出 frame_id
数据结构
- 主要成员
std::mutex mtx; // 互斥锁
std::list<frame_id_t> lru_List; // 保存能够置换的页面信息,这里用 frame_id 标记相应页面
std::unordered_map<frame_id_t, std::list<frame_id_t>::iterator> lru_hash; // 便于快速判断某个 frame_id 对应页面是否在 lru中
size_t max_size; // lru_replacer 最大可以管理的置换页面数量
- 主要函数
/**
* @brief 使用 LRU 策略选择出待从内存移除的页面对应的页帧编号(frame_id)
*
* @param frame_id 用于记录被替换出的页面对应的 frame_id,如果没有可被替换出的页面,则返回 false, frame_id 指向 nullptr
* @return true 如果存在待移除的页面,返回 true
* @return false 当不存在待移除的页面时,返回 false
*/
bool LRUReplacer::Victim(frame_id_t *frame_id);
/**
* @brief 固定 frame_id 对应的页面,表示该页面不可以被置换,即从 replacer 中移除该页
*
* @param frame_id 将要从 replacer 中移除的页对应的 frame_id
*/
void LRUReplacer::Pin(frame_id_t frame_id);
/**
* @brief 加入一个新的可以置换的页面对应的 frame_id 到 replacer
*
* @param frame_id 待加入的 frame_id
*/
void LRUReplacer::Unpin(frame_id_t frame_id);
Other
- Victim 函数为什么不把选中的 frame_id 页面移动到 lru_List 头部(最新访问的移动到头部)
Victim 这里只用于选中 frame_id, Victim 一般搭配 Unpin 操作使用,而 Unpin 操作会把 frame_id 移动到头部,相当于把选择 frame_id 和 更新 frame_id 拆分了,由更上层进行调用
TASK #2 - BUFFER POOL MANAGER INSTANCE
核心概念
主要函数
-
函数理解
需要注意的是以下函数都是 BPM 调用的,主要是对 BPM 的理解,本质上 BPM 的以下操作都是管理的内存页面 Page 数组(除了AllocatePage、DeallocatePage、FindVictimPage),以 FetchPgImp 为例,站在系统角度,需要从主存取 page_id 对应的页面,显示查看内存是否已经调入了该页面,如果没有,再考虑从磁盘读入。
-
FetchPgImp VS NewPgImp
- FetchPgImp是从 buffer pool 取 page_id 对应页面,如果不存在,则从磁盘调入,会把数据一起写入 buffer pool 内存页
- NewPgImp 是从磁盘创建一个新的物理页,然后在内存 buffer pool 找到一个位置放入,因为新申请的物理页没有有效的 data。因此, NewPgImp 不需要 ReadPage 操作,只需要完成元数据更新,ResetMemory 即可
- 主要函数
// Fetch the requested page from the buffer pool.
Page *FetchPgImp(page_id_t page_id);
// Creates a new page in the buffer pool.
Page *NewPgImp(page_id_t *page_id);
// Deletes a page from the buffer pool.
bool DeletePgImp(page_id_t page_id);
// Unpin the target page from the buffer pool.
bool UnpinPgImp(page_id_t page_id, bool is_dirty);
// Flushes the target page to disk.
bool FlushPgImp(page_id_t page_id);
// Flushes all the pages in the buffer pool to disk.
void FlushAllPgsImp();
// Allocate a page on disk.?
page_id_t AllocatePage();
// Deallocate a page on disk.
void DeallocatePage(__attribute__((unused)) page_id_t page_id)
// 自定义函数,查找空闲的 buffer pool Page,优先从 free_list_查找,其次采用 LRU 策略置换页面
bool FindVictimPage(frame_id_t *frame_id);
PROJECT #2 - EXTENDIBLE HASH INDEX
Task #1 PAGE LAYOUTS
HashTableDirectoryPage
- 成员变量
page_id_t page_id_; // 目录页自身的页编号
lsn_t lsn_; // 页面的 lsm 用于 recovery
uint32_t global_depth_{0}; // 目录全局深度
uint8_t local_depths_[DIRECTORY_ARRAY_SIZE]; // 每个数据桶(bucket_page) 的局部深度
page_id_t bucket_page_ids_[DIRECTORY_ARRAY_SIZE]; // 每个数据桶所在的物理页面编号
- 获取镜像桶编号
uint32_t HashTableDirectoryPage::GetSplitImageIndex(uint32_t bucket_idx) {
int n = local_depths_[bucket_idx]; // local_depth 记录分裂过后的局部深度
if (n == 0) { // 整个 hash 表只剩下最后一个桶了,这个就是最初的那个深度为 0 的桶,该桶不能进行合并了
return 0;
}
return (1 << (n - 1)) ^ bucket_idx; // 希望获得由分裂前深度 +0 or +1 得到的镜像桶的 index,则需要1 << (n - 1)
}
- 当局部深度和全局深度相同的桶需要分裂时,增加全局深度,降低时不需要这么复杂
/**
* 以 global_depth = 1 grow 到 global_depth = 2 为例:
* 0 -> b1
* 1 -> b1
* after grow :
* 00 -> b1
* 01 -> b2
* 10 -> b1
* 11 -> b2
* 本质上是在 grow 前的每个 bucket_idx 的二进制表示形式最高位的再高一位加上 1, 如原先的 0 + (1 << gd) -> 10 , 1 + (1 <<
* gd) -> 11 这样就可以保证所有的数在加上一个数后末尾的二进制表示形式总是不变的,也就使得 i + orig_max_bucket 与 i
* 的二进制表示形式具有相同的低位二进制表示 本质上 i + orig_max_bucket 就是该桶分裂后的新桶的 bucket_idx
*/
void HashTableDirectoryPage::IncrGlobalDepth() {
int orig_max_bucket = 1 << global_depth_; // 原来总的桶的数量
for (int i = 0; i < orig_max_bucket; ++i) { // 遍历原来的每个桶索引 bucket_idx
bucket_page_ids_[i + orig_max_bucket] =
bucket_page_ids_[i]; // 编号为 i 和编号为 i + orig_max_bucket 的索引指向同一个桶
local_depths_[i + orig_max_bucket] =
local_depths_[i]; // 编号为 i 和编号为 i + orig_max_bucket 的桶的初始时深度保持一致
}
global_depth_++;
}
Hash Table Bucket Page
-
成员变量 /*
* Bucket page format (keys are stored in order):
* ----------------------------------------------------------------
* | KEY(1) + VALUE(1) | KEY(2) + VALUE(2) | ... | KEY(n) + VALUE(n)
* ----------------------------------------------------------------
*/
// 用比特位标记某个<key, value>位置是否被占用(这主要用于linear probe hash),一旦插入过数据,即便删除也是1,Extendiable Hash 没用到
// 0 if tombstone/brand new (never occupied), 1 otherwise.
char occupied_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
// 标记某个位置是否已经插入数据
char readable_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
MappingType array_[0]; // 零长度数组 :occupied_ 和 readable_ 剩下的空间都用于数组
-
实现细节
-
readable_ 为 1 表示该位置为有效的键值对,0表示该位置为 0 -
occupied_ 为 0 表示该位置从未被使用过,1 表示该位置被占据过,当键值对删除时,occupied_值仍然为 1 ,该位不表示空与否,Extensible hash 其实可以不需要,这是linear probe hash 中锁使用的,tombstone的设计主要是为了在开放寻址法中防止探测中断 -
readable_ 设置位,采用 BitMap 思想实现
readable 为 char 类型,每个单位占 8bit ,我们可以把 readable—— 数组看成 r 行 8 列的数组,每个 bit,当数字 n 出现时,对应 n / 8 行 和 n % 8 列的数应该置为 1。当数组变成 int(32-bit) 类型时候,列数则为 32
// set bucket_idx
int r = bucket_idx / 8;
int c = bucket_idx % 8;
readable_[r] |= (1 << c); // 其他位置不变,相应位置设置为 1
// clear bucket_idx
int r = bucket_idx / 8;
int c = bucket_idx % 8;
readable_[r] &= (~(1 << c)); // 其他位置不变,相应位置设置为 0
-
MappingType array_[0] 采用零长度数组实现,整个 Bucket 类除去 occupied_ & readable_ 数组,其余空间全部用于这个数组 -
Bitmap 简介 -
获取桶内某个键对应的值 template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::GetValue(KeyType key, KeyComparator cmp, std::vector<ValueType> *result) {
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; ++i) { // 遍历当前桶(页)所有 <key, value>
if (IsReadable(i) &&
!cmp(KeyAt(i), key)) { // 如果该位置为有效键值对且该位置的键和传入的 key 相当 ,那么加入该位置的值到 result
result->emplace_back(ValueAt(i));
}
}
return !result->empty(); // 如果存在至少一个 key 对应的 value,则返回 true, 否则返回 false
}
-
插入键值对到桶中,可以重复键,但是不可以重复键值对 template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::Insert(KeyType key, ValueType value, KeyComparator cmp) {
// 1. 检查待插入的键值对是否已经存在
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; ++i) {
// cmp 相等时返回 0, !cmp(x,x) 表示相等为真
// IsReadable(i) 为真表示 Bucket 第 i 个位置为有效的键值对
if (IsReadable(i) && !cmp(KeyAt(i), key) && ValueAt(i) == value) {
return false; // <key, value> pair 重复
}
}
for (size_t i = 0; i < BUCKET_ARRAY_SIZE; ++i) {
if (!IsReadable(i)) { // 该位置没有有效的键值对,即该位置可以插入(无论Occupied 与否),即存在空位置
SetOccupied(i); // 表示该位置已经被占用,删除时,Ocuupied 值不变
SetReadable(i); // 该位置已经插入有效键值对
array_[i] = MappingType(key, value); // 插入新的键值对
return true;
}
}
return false; // 没有空的位置,插入失败
}
-
删除某个键值对只需要更改 readable_ 即可 template <typename KeyType, typename ValueType, typename KeyComparator>
void HASH_TABLE_BUCKET_TYPE::RemoveAt(uint32_t bucket_idx) {
// char 类型数组,每个单位 8 bit,能标记8个位置是否有键值对存在, bucket_idx / 8 找到应该修改的字节位置
// bucket_idx % 8 找出在该字节的对应 bit 位 pos_
// 构建一个 8 位长度的,除该 pos_ 位为 0 外,其他全为 1 的Byte ,例如: 11110111
// 然后与相应字节取按位与操作,则实现清除该位置的 1 的操作,而其他位置保持不变
readable_[bucket_idx / 8] &= (~(1 << (bucket_idx % 8)));
}
-
判断某个位置是否存在键值对 template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_BUCKET_TYPE::IsReadable(uint32_t bucket_idx) const {
return readable_[bucket_idx / 8] & (1 << (bucket_idx % 8)); // 只要存在,那么相应位置为 000001000000,肯定不为 0
}
Task #2 HASH TABLE IMPLEMENTATION
这个ExtendiableHash 很繁琐,实现细节非常多,需要非常了解其原理才可以正确实现,建议先看看Extendible Hashing (Dynamic approach to DBMS),有个粗略的了解。然后理解其全局深度和局部深度之间的关系,搞明白分裂桶和镜像桶之间的区别与联系很重要。
实现细节
- 分裂桶编号的扩展方向
- 本文采用的是低位向高位扩展的方式,最开始全局深度为0,指向第一个桶。
- 低位向高位扩展举例:
假设局部深度为2,原先桶编号为 10, 那么其分裂的两个桶编号分别为 110 和 010 - 如何根据 key 获取其应该存在的桶的编号?
用 key 值和全局深度对应的全1的掩码按位与操作获得桶编号template <typename KeyType, typename ValueType, typename KeyComparator>
inline uint32_t HASH_TABLE_TYPE::KeyToDirectoryIndex(KeyType key, HashTableDirectoryPage *dir_page) {
return Hash(key) & dir_page->GetGlobalDepthMask();
}
- 根据 key 获取其在桶内的值,这就是个取哈希值的过程
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::GetValue(Transaction *transaction, const KeyType &key, std::vector<ValueType> *result) {
// 添加读锁
table_latch_.RLock();
// 取目录页和 key 所在的桶页
auto dpg = FetchDirectoryPage();
auto bpg = FetchBucketPageByKey(key);
auto flag = bpg->GetValue(key, comparator_, result);
// !!! 一定不要忘了 UnPin 页面,否则 BufferPool 会因为一直加入页面而无法替换出页面,导致BufferPool 溢出
buffer_pool_manager_->UnpinPage(directory_page_id_, false, nullptr);
buffer_pool_manager_->UnpinPage(KeyToPageId(key, dpg), false, nullptr);
// 解锁
table_latch_.RUnlock();
return flag;
}
- 关键的插入操作
template <typename KeyType, typename ValueType, typename KeyComparator>
bool HASH_TABLE_TYPE::Insert(Transaction *transaction, const KeyType &key, const ValueType &value) {
table_latch_.WLock();
page_id_t bucket_page_id = KeyToPageId(key, FetchDirectoryPage()); // 必须提前获取页面,不能等到分裂再获取
auto bpg = FetchBucketPageByKey(key); // 取key 所在的桶页面
bool flag = bpg->Insert(key, value, comparator_); // 记录是否插入成功
buffer_pool_manager_->UnpinPage(directory_page_id_, false, nullptr);
// 释放写锁
table_latch_.WUnlock();
// 如果插入失败,需要判断是因为重复 <key, value> 键值对导致的失败还是因为桶满了导致的失败
// TODO(zhangw) 这里是否需要对桶进行枷锁操作?
if (!flag) {
std::vector<ValueType> result;
bpg->GetValue(key, comparator_, &result);
auto iter = std::find(result.begin(), result.end(), value);
if (iter == result.end()) { // 如果是桶满导致的失败
flag = SplitInsert(transaction, key, value);
}
}
// page_id_t bucket_page_id = KeyToPageId(key, dpg); // 这里 key 获取的 page_id
// 可能会因为分裂而改变,因此这里是错误的
buffer_pool_manager_->UnpinPage(bucket_page_id, true, nullptr);
return flag;
}
- 什么时候需要分裂,什么时候需要增加全局深度?
- 当插入的桶满了就需要分裂
- 当插入的桶需要分裂且该桶的局部深度等于全局深度时则增加全局深度
- 注意全局深度增加会导致 key 到桶编号映射的改变,因此需要更新此时因为全局深度改变增加出来的那些桶的指向,保证其指向正确非常重要。
- 增加全局深度后,需要及时更新分裂桶和镜像桶的指向
- 这里比较难理解,直白点就是把由分裂前global_depth 长度的后缀分流为由 global_depth + 1 长度后缀的两部分(这两部分只有最高位不同0 or 1),然后把数据根据 global_depth + 1 位置的不同,设置其新的页面号。
// 把指向溢出桶的目录项的相应部分映射到新桶的 page_id 上去
size_t ld = dpg->GetLocalDepth(overflow_bucket_dir_idx);
size_t local_depth_bits =
overflow_bucket_dir_idx &
((1 << ld) - 1); // 获取overflow_bucket_idx 的最低的 local_depth 个 bit ,等价于 % (1 << ld)
for (size_t i = local_depth_bits; i < dpg->Size(); i += (1 << ld)) {
if (((i >> ld) & 1) != ((overflow_bucket_dir_idx >> ld) & 1)) {
dpg->SetBucketPageId(i, img_page_id);
}
dpg->IncrLocalDepth(i);
}
- Merge 操作,当删除元素后桶为空时,则指向 Merge 操作
// local_depth > 0 && local_depts equality && page_id not equality
// 需要两个桶 page_id 不相同,是因为当Merge 到最后只剩下一个桶(目录项编号为
// 0)时,该桶不会被删除(保证以下程序正确执行)
if (dpg->GetLocalDepth(bucket_dir_idx) > 0 &&
dpg->GetLocalDepth(bucket_dir_idx) == dpg->GetLocalDepth(img_bucket_dir_idx) &&
dpg->GetBucketPageId(img_bucket_dir_idx) != dpg->GetBucketPageId(bucket_dir_idx)) {
size_t ld = dpg->GetLocalDepth(bucket_dir_idx) - 1;
size_t local_bits = bucket_dir_idx & ((1 << ld) - 1);
for (size_t i = local_bits; i < dpg->Size(); i += (1 << ld)) {
if (((i >> ld) & 1) == ((bucket_dir_idx >> ld) & 1)) {
dpg->SetBucketPageId(i, img_bucket_page_id); // 把原先指向空桶的指针指向它的镜像桶(空桶被删除,镜像桶保留)
}
dpg->DecrLocalDepth(i); // 所有具有相同 local_bits 的桶(原先指向空桶和镜像桶的目录项)都执行 local_depth - 1
}
- 如果所有桶局部深度都小于 global_depth,那么需要遍历所有桶,并把空桶都合并了,整个过程是递归进行的,直到最后一个桶是0号桶时,此时其镜像编号也为0,不满足合并条件退出
if (dpg->CanShrink()) {
dpg->DecrGlobalDepth();
for (size_t i = 0; i < dpg->Size(); ++i) {
HASH_TABLE_BUCKET_TYPE *bpg = reinterpret_cast<HASH_TABLE_BUCKET_TYPE *>(
buffer_pool_manager_->FetchPage(dpg->GetBucketPageId(i))->GetData());
if (bpg->IsEmpty()) {
buffer_pool_manager_->UnpinPage(dpg->GetBucketPageId(i), false, nullptr); // 一定要Unpin
MergeCore(dpg, i);
} else {
buffer_pool_manager_->UnpinPage(dpg->GetBucketPageId(i), false, nullptr);
}
}
}
坑点
- 每次取页,一定要调用
Unpin 操作,否则BUfferPool 很快就炸了 - Merge 的时候要判断是否能够降低 global_depth,可以的话要把所有空桶都删除了,而不是仅删除由删除数据时触发合并的那个桶
Task #3 CONCURRENCY CONTROL
- 在 Task 2 实现时对涉及数据更新的 Remove 、Insert 等操作加锁即可实现并发操作
PROJECT #3 - QUERY EXECUTION
主要类介绍
type
- Type
- 自定义的数据库表Column列数据的类型,使用成员变量
type_id_ 区分不同子类型 - 定义了数据相关操作,包括加减乘除、取模等运算
- 定义不同数据比较运算,函数传入参数为
Value 类,根据传入的Value 值的 type_id 类型选择相应的处理方式 - 共定义了
NumericType IntegerParentType TinyintType SmallintType IntegerType BigintType DecimalType TimestampType BooleanType VarlenType 10 种数据类型 - Value
- IntegerType
- 定义宏
INT_COMPARE_FUNC(OP) 和 INT_MODIFY_FUNC(METHOD,OP) - 前者用于根据比较方式
OP 来比较大小,返回内置类型 CmpTrue or CmpFalse - 后者传入
METHOD 如 MultiplyValue 来替换,其中具体加减乘除定义在integer_parent.h|cpp 中 - 其他类型原理类似,这里只举例IntegerType
catalog
- Column
- 用于表示数据库的列。
- 包括列名(
column_name_ ),列类型(column_type_ ),固定列长度(fixed_length_ ),可变列长度(variable_length_ ),列偏移量(column_offset_ )等参数。 - 除了 VARCHAR UnInlined ,其余类型均为 Inlined 类型。
- 对于非VARCHAR类型(inlined)
fixed_length_ 为数据长度,否则为指针长度。 而variable_length_用于表示VARCHAR 类型长度,对于Inlined类型长度为0,具体参考column.h|cpp 文件。 - Schema
- 定义单个 tuple(数据库表中的每一个record,即每一行)的模式
- 可以根据索引获取列,根据列名获取索引编号
length_ :定义每个 tuple 的总长度 (每个tuple 占用的总字节数)std::vector<Column> columns_ :记录整个 tuple 的所有列数据tuple_is_inlined_ :记录是否所有元素均为非VARCHAR类型std::vector<uint32_t> uninlined_columns_ :记录所有非Inlined类型的在tuple中的索引 - TableGenerator
- 创建测试数据库表,随机填充数据
TableInsertMeta :定义数据库表的元数据信息,包括表名name_ ,行数num_rows ,列数据模式std::vector<ColumnInsertMeta> col_meta_ ColumnInsertMeta :定义列元数据信息,包括列名、数据类型、是否可为空值、最小值、最大值等信息。 - TableInfo
- Catalog
tuple
- tuple 结构
tuple 由 std::vector<Value> 根据 Schema 中指示的偏移量插入到 tuple 的字符数组data_ 构建而成tuple 在本项目中不仅用于存储数据库的每一行数据,而且也用于索引的存储,是一种复用的数据结构
- tuple 插入是会设置 RID,而在作为索引的 key 时,构造函数是根据
key_attrs 和 key_schema 构建的,此时并没有设置成员变量 RID - 无论是可变长度(Inlined)数据还是非可变长度(Un-Inlined)数据在构建
tuple 模式 Schema 中计算偏移量时都采用固定的长度。在 column.h 文件中指出了,对可变长度数据,其计算偏移量时,使用指针类型大小计算,而对于非可变类型数据则采用固定类型长度(fixed_length_ 在Column 构造函数中调用TypeSize 函数得到)。因此,每个 Schema 也就是每个 tuple 在插入数据前就会确定每个Column 的偏移量。 Schema::Schema(const std::vector<Column> &columns) : tuple_is_inlined_(true) {
uint32_t curr_offset = 0;
for (uint32_t index = 0; index < columns.size(); index++) {
Column column = columns[index];
// handle uninlined column
if (!column.IsInlined()) { // 如果是可变长数据
tuple_is_inlined_ = false;
uninlined_columns_.push_back(index);
}
// set column offset
column.column_offset_ = curr_offset;
curr_offset += column.GetFixedLength();
// add column
this->columns_.push_back(column);
}
// set tuple length
length_ = curr_offset; // 整个 tuple 长度,不包括可变数据本身,对同一个表而言其长度是固定的
}
/** For a non-inlined column, this is the size of a pointer. Otherwise, the size of the fixed length column. */
uint32_t fixed_length_;
- Tuple 的构造过程
Tuple::Tuple(std::vector<Value> values, const Schema *schema) {
assert(values.size() == schema->GetColumnCount());
// 1. Calculate the size of the tuple.
uint32_t tuple_size = schema->GetLength(); // 获取固定大小类型的列的长度(非VARCHAR 类型,字节数)
for (auto &i : schema->GetUnlinedColumns()) { // 遍历所有 VARCHAR 类型数据
tuple_size += (values[i].GetLength() + sizeof(uint32_t)); // 统计所有非 VARCHAR 类型占用字节数,每个VARCHAR 类型需要uint32_t大小表示其长度
}
// 2. Allocate memory.
size_ = tuple_size;
data_ = new char[size_]; // 每个 tuple 所有数据都存储于 data_ 中
std::memset(data_, 0, size_);
// 3. Serialize each attribute based on the input value.
uint32_t column_count = schema->GetColumnCount();
uint32_t offset = schema->GetLength(); // 总的固定部分长度
for (uint32_t i = 0; i < column_count; i++) {
const auto &col = schema->GetColumn(i);
if (!col.IsInlined()) { // 如果是可变长度
// 对于可变数据而已,col.GetOffset() 不是真实数据位置,而是偏移量
// 设置该位置可变数据的真实偏移量
*reinterpret_cast<uint32_t *>(data_ + col.GetOffset()) = offset; // 前 uint32_t 大小指示可变数据偏移量
// Serialize varchar value, in place (size+data).
values[i].SerializeTo(data_ + offset);
// 空出 unint32_t 大小保存数据长度,因此 offset 多加上 uint32_t 大小
offset += (values[i].GetLength() + sizeof(uint32_t));
}
else {
values[i].SerializeTo(data_ + col.GetOffset());
}
}
}
// 对可变长度数据持久化, varlen_type.cpp
void VarlenType::SerializeTo(const Value &val, char *storage) const {
uint32_t len = GetLength(val); // 计算真实数据长度
if (len == BUSTUB_VALUE_NULL) {
memcpy(storage, &len, sizeof(uint32_t));
return;
}
memcpy(storage, &len, sizeof(uint32_t)); // 先写入长度
memcpy(storage + sizeof(uint32_t), val.value_.varlen_, len); // 再写入真实数据
}
// 对非可变长度数据的持久化(Integer 类型为例)integer_type.cpp
void IntegerType::SerializeTo(const Value &val, char *storage) const {
*reinterpret_cast<int32_t *>(storage) = val.value_.integer_;
}
table_page
table_heap
- 概述
- 用于把 table_page 以双向链表的形式组织起来
- 把 table_page 的操作进一步封装起来,使得根据 RID 即可完成增删改查操作
- 使用方式
- 首先根据 RID 获取页面编号,然后取出相应页(table_page)
- 调用 table_page 的方法实现具体的操作
- InsertTuple
- 从 first_page_id_ 对应的页面开始,从前往后找出第一个可以容纳待插入 tuple 的页面
- 如果不存在页面,则调用 BufferPoolManager 创建新的页面,然后调用 Init 方法 初始化页面
- 接着把 tuple 插入页面,并把页面加入双向链表
table_iterator
- 概述
- tuple 的迭代器,用于对table_heap中以链表组织的 table_page 中的 tuple 进行迭代遍历
- 主要实现了以下重载方法
- operator*
- operator->
- operator++
- operator==
- operator!=
- operator=
- 注意 operator++ 包括前置++ 和后置++ 两种重载实现方式
PlanNode & Executor
- AbstractPlanNode
- AbstractExecutor
- 每种节点类型都有自己的执行器,这些执行器继承自该类
- 子类包括
- AggregationExecutor
- DeleteExecutor
- DistinctExecutor
- HashJoinExecutor
- IndexScanExecutor
- InsertExecutor
- LimitExecutor
- NestIndexJoinExecutor
- NestedLoopJoinExecutor
- SeqScanExecutor
- UpdateExecutor
- ExecutorFactory
- 内部定义
CreateExecutor 函数,根据传入AbstractPlanNode 类型动态创建执行器实例 static std::unique_ptr<AbstractExecutor> CreateExecutor(ExecutorContext *exec_ctx, const AbstractPlanNode *plan);
- 通过
dynamic_cast 把 AbstractPlanNode 转换为子类型switch(plan->GetType()) return std::make_unique<SeqScanExecutor>(exec_ctx, dynamic_cast<const SeqScanPlanNode *>(plan));
- ExecutorContext
- 记录当前执行上下文环境,具体包括
- Transaction
- Catalog
- BufferPoolManager
- TransactionManager
- ExecutionEngine
- 定义执行方法,对当前计划节点执行一次
Init 和 Next 方法Execute(const AbstractPlanNode *plan, std::vector<Tuple> *result_set, Transaction *txn,
ExecutorContext *exec_ctx)
Index 实现原理
- GenericKey
- GenericComparator
- 采用函数对象方式实现的泛化类型比较器
- 定义成员变量
Schema *key_schema_ ,用于记录数据模式 - 比较数据时,根据数据模式
key_schema 从左往右按列比较,直到第一个不相同的列,根据大小返回相应结果(如果所有列都相同则返回 0 表示两个 GenericKey 相同) - ExtendiableHashIndex
- 内部封装 ExtendiableHash 类作为索引数据结构,命名为 container_
- 内部定义以下三个函数,注意其 key 均为
Tuple 类型,内只需调用哈希数据结构 container_ 实现即可(Project2实现的内容)void InsertEntry(const Tuple &key, RID rid, Transaction *transaction) override;
void DeleteEntry(const Tuple &key, RID rid, Transaction *transaction) override;
void ScanKey(const Tuple &key, std::vector<RID> *result, Transaction *transaction) override;
- Hash Function
- 调用第三方库 MurmurHash3实现
virtual uint64_t GetHash(KeyType key) {
uint64_t hash[2];
murmur3::MurmurHash3_x64_128(reinterpret_cast<const void *>(&key), static_cast<int>(sizeof(KeyType)), 0,
reinterpret_cast<void *>(&hash));
return hash[0];
}
- Hash 索引创建流程
- 在某个表中选几个列(1 or n 列)构建索引,选中的列构成
key_schema ,记录了索引模式信息 - 根据选中的列模式
key_schema 和 key_attrs (索引列在表元组中的列索引号) 构建新的元组,这个新元组是由索引的列构成的,记作 key_tuple GenericKey 调用SetFromKey方法接收 key_tuple 作为参数,创建GenericKey 实例- ExtendiableHashIndex 类调用
InsertEntry 方法,接收上一步创建的 GenericKey 实例作为参数,经过 Hash Function 插入到 container_ 中,实现索引的创建 - Index & IndexMeta &IndexInfo
- 这几个类都用于维护索引的元数据信息,例如索引名、索引
key_schema 、索引所在表名等 - IndexMeta 维护了索引的元数据信息
- Index 作为所有索引的基类,维护了访问
IndexMeta 的方法,采用其他数据结构时都需要继承自该类,Index 定义了成员变量 std::unique_ptr<IndexMetadata> metadata_ 用于记录索引的元数据信息 - Index 也声明了子类索引需要实现的
InsertEntry ,DeleteEntry ,ScanKey - IndexInfo 维护了更高级的索引信息,持有
Index 类型的成员变量
查询语句实现
TASK #1 - EXECUTORS
本文采用的是 Iterator 模型实现的SQL执行器,父节点通过调用子节点的Next 函数每次获取一个Tuple ,调用成功则返回 Tuple 和 RID 否则返回 false。
SeqScanExecutor
- 首先获取整个表的迭代器,迭代器作为整个类的成员变量,第一次调用前指向整个表的开始位置,然后每次调用后移动迭代器到下一个元组
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
// 执行选择操作
while (iter_ != table_info_->table_->End() && plan_->GetPredicate() != nullptr &&
!plan_->GetPredicate()->Evaluate(&(*iter_), &table_info_->schema_).GetAs<bool>()) {
iter_++;
}
// 执行投影操作,返回 RID 是为了能找到磁盘中的投影之前的原始数据
if (iter_ != table_info_->table_->End()) {
std::vector<Value> values;
for (size_t i = 0; i < plan_->OutputSchema()->GetColumnCount(); ++i) {
values.emplace_back(plan_->OutputSchema()->GetColumn(i).GetExpr()->Evaluate(&(*iter_), &(table_info_->schema_)));
}
*tuple = Tuple(values, plan_->OutputSchema()); // 投影后的元组
*rid = iter_->GetRid();
// READ_UNCOMMITTED 没有加锁,不需要解锁 REPEATED_READ , READ_COMMITTED 需要等待提交才可以被其他事务读取
// 因此,如果在 READ_COMMITTED 级别下,写锁不能立即释放
iter_++;
return true;
}
return false;
}
HashJoinExecutor
- Join 的时候调用的是
Expression 的 EvaluateJoin 方法,根据表达式tuple_idx 确定选择左右表,根据col_idx 选择具体列值,Aggregation调用的是Expression 的 EvaluateAggregation 方法 HashJoin 使用外表的某些列组合值作为key ,构建hash 表(可以用 unordered_set or unordered_map 实现),然后遍历内表的每个 tuple 获取相应列值,然后查找 hash 表,获取匹配列,需要注意的是,set.find(key) != set.end() 时表示一定存在该 key,否则一定不存在。但是,只能确定一定存在该key,而 set[key] 这个桶里面可能会因为键冲突存在其他键的情形,因而需要比较具体的 key 值来过滤掉这些元组。
- 根据外表构建 hash 桶
while (left_executor_->Next(&left_tuple, &left_rid)) {
JoinKey cur_key{plan_->LeftJoinKeyExpression()->Evaluate(&left_tuple, left_executor_->GetOutputSchema())};
if (bucket_.find(cur_key) == bucket_.end()) {
bucket_[cur_key] = {left_tuple};
} else {
bucket_[cur_key].emplace_back(left_tuple);
}
}
- 查找
bool HashJoinExecutor::Next(Tuple *tuple, RID *rid) {
RID right_rid{};
JoinKey right_key;
// -1 表示上一个 right_tuple join 完成,而非 -1 则表示上一个 right_tupe 未完成
// 因为火山模型每次只取一组 join 成功的元组,因此需要记录上次取完后的状态,例如 同一个 right_key 可能对应多个
// left_key,每次调用 Next 只取出一个 left_tuple, 实际上由 right_key 匹配的桶中可能有多个 left_tuple
// 都能使用,因此需要用 last_idx 标记 上一次匹配成功之后,遍历到映射桶(mapped_bucket) 的位置,由于 hash
// 映射存在冲突,一个桶中可能存在多个与 right_key 不相同的 left_tuple 因此 last_idx
// 只能保证从上一次成功取出的位置开始遍历,不能保证每次调用 Next 时 last_idx 位置都是和 right_key 匹配的 left_tuple
if (last_idx_ != -1) {
// right_tuple_ 是上一次能成功 join 的内表元组,last_idx != -1 表示桶内可能还有其他 left_tuple 能够 join 成功
right_key.val_ = plan_->RightJoinKeyExpression()->Evaluate(&right_tuple_, right_executor_->GetOutputSchema());
}
if (last_idx_ == -1 || bucket_.find(right_key) == bucket_.end() ||
last_idx_ == static_cast<int32_t>(bucket_[right_key].size())) {
while (true) {
if (right_executor_->Next(&right_tuple_, &right_rid)) { // 从内表取出一个新元组
right_key.val_ = plan_->RightJoinKeyExpression()->Evaluate(&right_tuple_, right_executor_->GetOutputSchema());
if (bucket_.find(right_key) != bucket_.end()) { // 外表中存在内表 right_key
last_idx_ = 0; // 从该桶第一个位置开始找
break;
}
} else {
return false; // 内表遍历完了
}
}
}
auto mapped_bucked = bucket_[right_key]; // 当前内表的列值映射到的桶,桶内可能因为碰撞存在不同的 key
while (last_idx_ < static_cast<int32_t>(mapped_bucked.size()) &&
!IsTupleValueEqual(&mapped_bucked[last_idx_], right_key.val_)) {
++last_idx_;
}
//如果在内表当前 right_key 经过 hash 映射的桶内所有的对应列(参与join的列)值都不等于 right_key
// 那么递归执行 Next 函数,last_idx 设置为 -1,是为了进入上面的while 循环,取出新的内表元组进行 join
if (last_idx_ == static_cast<int32_t>(mapped_bucked.size())) {
last_idx_ = -1;
return Next(tuple, rid);
}
// .. 省略新元组构建过程
*tuple = Tuple(values, plan_->OutputSchema());
last_idx_++; // 更新当前桶内的数据索引
return true;
}
Aggregation & OrderBy
- 类似 HashJoin 思想,把 OrderBy 相关列作为 key 值,Aggregation 相关列作为 value,使用
unordered_map 实现 hash 表,然后遍历所有元组,根据每一列的聚合类型对所有元组进行聚合操作,最后通过ComparisonExpression 实现 having 操作。 - 构建 hash 表,聚合操作
while (child_->Next(&tuple, &rid)) {
// 根据 plan_->GetGroupBys() & plan_->GetAggregates() 从元组中获取对应列分别构成hash 表的 key 和 value
// key 和 value 都是std::vector<Value> 类型,实质上就是根据 group by 指定列 hash 值映射到 aggregate 指定列
aht_.InsertCombine(this->MakeAggregateKey(&tuple), this->MakeAggregateValue(&tuple));
}
- Next 函数
bool AggregationExecutor::Next(Tuple *tuple, RID *rid) {
while (aht_iterator_ != aht_.End()) {
if (plan_->GetHaving() == nullptr ||
plan_->GetHaving()
->EvaluateAggregate(aht_iterator_.Key().group_bys_, aht_iterator_.Val().aggregates_)
.GetAs<bool>()) {
// 根据OutputSchema 每列传入的表达式传入的是 0(group by) or 1(aggregate) 来从 hash 表的 key(group_bys_) 还是
// value(aggregates_) 取值
std::vector<Value> values;
for (const Column &column : plan_->OutputSchema()->GetColumns()) {
values.emplace_back(
column.GetExpr()->EvaluateAggregate(aht_iterator_.Key().group_bys_, aht_iterator_.Val().aggregates_));
}
*tuple = Tuple(values, plan_->OutputSchema());
++aht_iterator_;
return true;
}
++aht_iterator_;
}
return false;
}
Delete & Update
- 调用
TableInfo 的 table_heap 类实现,但是需要注意的是,要更新元组上面的索引 - 以 delete 为例
bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
Tuple del_tuple{};
RID del_rid{};
// 从 SeqScanExecutor 获取满足条件的元组及其 rid,然后根据 rid 获取对应元组的 hash key,接着从所有索引中删除该 key
while (child_executor_->Next(&del_tuple,
&del_rid)) { // 此时 del_tupe 是经过投影的结果,只包含部分列,这里主要是为了获取 del_tup
// 获取完整元组,方便下面获取 tuple 上面的索引列
bool flag = table_info_->table_->GetTuple(del_rid, &del_tuple, GetExecutorContext()->GetTransaction());
BUSTUB_ASSERT(flag, "get tuple to be delete failed");
// 遍历每个索引,删除该元组上建立的所有索引
for (IndexInfo *index_info : indexes_info_) {
auto del_tuple_key = del_tuple.KeyFromTuple(table_info_->schema_, *(index_info->index_->GetKeySchema()),
index_info->index_->GetKeyAttrs());
index_info->index_->DeleteEntry(del_tuple_key, del_rid, GetExecutorContext()->GetTransaction());
}
// 标记删除
table_info_->table_->MarkDelete(del_rid, GetExecutorContext()->GetTransaction());
}
return false;
}
踩坑
PROJECT #4 - CONCURRENCY CONTROL
Task #1 - Lock Manager
基本概念:
本文主要实现一个全局的锁管理器,用于满足不同事物对锁的需求,加锁的粒度为Tuple 粒度,即不同事物在操作同一个Tuple 时候会出现加锁情况,Tuple 作为数据共享资源的基本单位。
锁分类
- SHARED : 共享锁,用于读操作
- EXCLUSIVE : 排他锁,用于写操作
锁设计思路:
- 对每个
Tuple 用RID 唯一标识,维护一个请求队列std::unordered_map<RID, LockRequestQueue> lock_table_; ,队列中用双向链表维护一组请求这个Tuple 的事务信息,包括事务编号,锁分类(SHARED/EXCLUSIVE)以及是否获得授权等信息。(实现上采用嵌套类的方式实现,嵌套在LockManager类内)// 某个 RID 对应的请求队列
class LockRequestQueue {
public: // 嵌套类
std::list<LockRequest> request_queue_;
// for notifying blocked transactions on this rid
std::condition_variable cv_;
// txn_id of an upgrading transaction (if any)
txn_id_t upgrading_ = INVALID_TXN_ID;
// for LockRequestQueue concurrency
std::mutex lrq_latch_;
};
// 请求对象
class LockRequest {
public: // 嵌套类,成员变量及成员函数需要为 public
LockRequest(txn_id_t txn_id, LockMode lock_mode) : txn_id_(txn_id), lock_mode_(lock_mode), granted_(false) {}
txn_id_t txn_id_;
LockMode lock_mode_;
bool granted_;
};
- 不同事务线程需要先根据
RID 获取请求队列的信息,因为多个事务可能并发请求同一个RID 对应的请求队列,如果是读还好,当发现 RID 对应请求队列为空时,会涉及创建新的请求队列操作,这是一个写操作,并发写操作一定要加锁保证正确性,因此需要对整个 LockManager 对象加锁,该锁用于并发对 lock_table_ 的操作 - 多个并发事务对同一个
RID 对应的请求队列进行加锁\释放锁操作时,会涉及队列元素的写操作,因此需要对整个队列加锁std::mutex lrq_latch_; 就是这个作用。
两段锁协议(Two phase locking)
- 事务锁操作分为两个阶段,
GROWING & SHRINKING GROWING 阶段只能获取锁或升级锁,不能释放锁SHRINKING 阶段只能释放锁,不能获取和升级锁
严格两段锁协议(strict two phase locking)
- 在两段锁的基础上加了在
SHRINKING 阶段也不能释放锁,只有提交时才可以释放锁的限制
几种 dirty 行为
- 脏读(dirty read)
- 不可重复读(unrepeatable read)
- 丢失修改 (lost midification)
- 幻读(phantoms)
四个隔离级别,隔离级别依次递增,并发度依次递减
- READ_UNCOMMITED
- 不能加共享锁,可以加写锁
- 四种 dirty 行为都不可避免
- READ_COMMITED
- 需要加读锁和写锁,但是读写完就释放
- 避免脏读和丢失修改,但不可重复读和幻读不能解决
- REPEATED_READ
- 不仅要枷锁,而且需要采用严格的两段锁协议解决
- 严格两段锁协议可以进一步解决不可重复读的问题,但是无法解决幻读
- SERILIZIABLE
LockShared
- 事务申请
SHARED 锁时一定是要在GROWING 阶段,且READ_UNCOMMITED 隔离级别不可申请该锁 - 获取
lock_table 前需要对整个LockManager 对象加锁,防止并发写lock_table - 加锁前需要执行
KillLowerPriorityTxn 进行死锁避免(具体实现和死锁策略有关) - 申请
RID 对应的队列锁,获取队列锁成功后,先把该事务的锁请求加入队列,当满足等待条件时继续执行相关操作,否则调用wait释放锁,等待满足条件被唤醒。需要注意的是,等待期间该事务可能会被优先级更高的事务请求给Abort掉,进入ABORTED 状态。需要在循环结束后判断事务是因为被Abort还是真的满足锁条件而退出循环。
- 注意,如果事务被 Abort 退出的,此时需要删除刚才加入队列请求,否则评测过程会出现超时的情况
// 如果有更高优先级的事务申请锁
while (ExistsHigherPriorityTxn(lrq, txn->GetTransactionId(), LockMode::EXCLUSIVE) &&
txn->GetState() != TransactionState::ABORTED) {
lrq->cv_.wait(lck); // 阻塞自己,并监听条件变量,等待其他事务(同一请求队列的事务)释放锁
}
// 判断事务是因为获得锁还是事务被Abort导致的退出循环
auto iter = GetLockRequestIter(lrq, txn->GetTransactionId());
// 如果是因为被 Abort 导致退出循环
if (txn->GetState() == TransactionState::ABORTED) {
lrq->request_queue_.erase(iter); // 从请求队列中删除这个请求
// 唤醒其他等待(不怕唤醒队列中靠后的,因为有IsCompatibleWithBefore方法,唤醒后需要再次判断循环条件)
lrq->cv_.notify_all();
throw TransactionAbortException(txn->GetTransactionId(),
AbortReason::DEADLOCK); // 由于评分系统原因,这里就随便选个已有的原因抛出了
return false;
}
LockExclusive
- 总体上和 LockShared 实现大体相同,只是在判断优先级上稍有不同
LockUpgrade
- 总体和 LockExclusive 相同,需要先判断锁是否已经授权,是否已经是排他锁
- 需要及时更新
upgrading_ 前后参数状态
Unlock
- Two phase locking 实现上,当隔离级别为
REPEATABLE_READ 时 Unlock时就要把状态设置为 SHRINKING - 及时从请求队列中把对应请求删除掉
Task #2 - Deadlock Prevention
死锁问题
- 对于死锁,总体上可以分为死锁避免和死锁解决策略,死锁避免主要是在申请资源前避免死锁的产生,例如一次性获取所有资源,或者让申请着按照一定的方向申请资源避免死锁的产生,而死锁解决则采用一定的方法定期的对死锁进行检测,然后根据一定策略杀死产生死锁的线程。
- 本文采用的是死锁预防策略。死锁预防策略也有
Wait Die 和 Wound-Wait 两种不同的实现思路,都属于抢占式的方式实现。
Wait Die(Old Waits for Young) :如果请求的事务相比持有锁的事务有着更高的优先级,那么请求的事务等待,否则抢占持有锁的事务,令其Abort 回滚。Wound-Wait(Young Waits for Old) :如果请求事务相比持有锁的事务有着更高的优先级,那么请求事务抢占锁,否则等待。被抢占的事务会被Abort,回滚。 - 优先级判定
- 请求锁的事务如果期望申请的是
SHARED 锁,那么需要满足请求锁的事务的事务id小于持有锁的事务id并且持有锁的事务持有的是共享锁,因为请求事务请求的共享锁可以和已有事务的共享锁同时存在。 - 请求锁的事务如果期望申请的是
EXCLUSIVE 锁,那么只需要满足请求锁的事务的事务id小于持有锁的事务id,因为请求事务请求的共享锁不能和已有事务的任意类型锁同时存在。 - 下面是新事务申请锁之前需要进行死锁避免操作。
// 根据 disallowed_mode 来决定杀死哪些事务锁,如果 disallowed_mode 为SHARED,表示SHARED & EXCLUSIVED 都不能共存,那么事务 id
// 靠后的那些读锁、写锁都要被杀死(不能共存) 如果 disallowed_mode 为 EXCLUSIVED,表示可以存在事务 id 靠后的读锁,而写锁不可以
void LockManager::KillLowerPriorityTxn(LockRequestQueue *lrq, txn_id_t cur_txn_id, LockMode disallowed_mode) {
// auto end_iter = GetLockRequestIter(lrq, cur_txn_id);
for (auto iter = lrq->request_queue_.begin(); iter != lrq->request_queue_.end(); ++iter) {
if ((disallowed_mode == LockMode::SHARED || iter->lock_mode_ == LockMode::EXCLUSIVE) &&
iter->txn_id_ > cur_txn_id &&
TransactionManager::GetTransaction(iter->txn_id_)->GetState() != TransactionState::ABORTED) {
TransactionManager::GetTransaction(iter->txn_id_)->SetState(TransactionState::ABORTED);
iter->granted_ = false;
lrq->cv_.notify_all(); // 被杀掉的可能已经持有锁,释放锁后需要 nofify
}
}
}
对锁的理解
- 锁分类:
加锁的前提是资源是共享的,如果是独占的那就无需加锁了。锁类型有mutex 、shared_mutex 、seamphore 这几类。mutex 一般用于互斥情形,该资源同一时间只能被一个线程占用,shared_mutex 则是c++17引入的可以让多个线程能共享同一互斥的所有权的一种锁类型,需要配合shared_lock 才可以,而shared_mutex 和lock_guard、unique_lock 仍然是独占的.seamphore 则是表示一种资源有一定数量,可以通过P,V 操作改变数量的一种锁。 - 理解锁:
平时说的锁到底是锁什么呢,如何实现线程之间的互斥呢?锁必然需要锁一个东西(对象),如下面代码所示,这个互斥量是对谁加的锁呢? 要回答这个问题,实际上需要看std::mutex latch_; 定义的位置,以LockManager 为例,把latch 定义在LockManager 对象中,那么整个对象就一个互斥对象,多个线程同一时间必然只有一个线程能持有这个互斥量,而其余线程都会在申请获取 latch_ 的时候被阻塞,因为latch_ 是整个LockManager 对象持有的唯一一个互斥对象,当一个线程在任意一个函数内获取该互斥对象时,其他线程在执行该对象的其他方法时,只要涉及获取latch_ 的操作都会被阻塞,scoped_lock 起始就是执行 latch_.lock() ,不过是以一种 RAII 的方式实现锁资源的自动化管理。 可见,锁的本质是需要对一个互斥的对象加锁,让其他需要访问这个对象的线程无法再次获取这个互斥对象,从而实现互斥操作。 问题就显然了,如果LockManager 实例化了多个对象,那么每个对象都持有自己的latch_ ,不同线程在操作不同实例化对象的方法时是不会相互干扰的,因为不同线程想要获取的是不同的 latch_ 尽管他们是同一个名字,因此,如果需要全局互斥的话,单例模式是一个很好的解决方案。 // 定义一个互斥量
std::mutex latch_;
// 使用互斥量完成加锁
void run() {
std::scoped_lock<std::mutex> lck(latch_);
// xxx
// xxx
}
- 如何理解条件变量(condition_variable)?
假如有那么一个情形,某个线程执行某个方法时需要先获取对应的互斥资源latch_ ,如果没有获取成功,那么其必然一直等待。但是获得锁成功就一定要(能)一直执行下去吗?如果发现其需要的某些其他资源没有满足条件而无法继续执行呢?此时该怎么办?直接放弃锁吗?是的需要放弃锁,但是资源获得满足时,什么时候我再来继续获取锁继续执行呢? 这些问题就是 condition_variable 需要解决的。 condition_variable 提供 wait 方法,用于当线程获取锁后,发现某些其他条件没有满足而不能继续执行时(总不能一直持有锁吧,不然其他线程怎么办?),主动放弃持有锁,转而进入等待状态,此时其他申请锁的线程会获取锁执行其他任务,而当其他获取该锁的线程执行完毕后就会调用notify_one & notify_all 方法唤醒之前放弃锁的线程,此时之前放弃锁的线程就会被唤醒,并且重新试图获取锁(因为此时可能有很多其他线程都在 wait 状态),如果获取锁成功,那么继续判断其需要的其他资源是否获得满足,如果满足了,则可以继续往下执行了,否则继续执行上述wait 方法进入等待状态。下面就是本文涉及的条件变量部分,多个事务线程请求同一个RID 对应的请求队列时,可能同时有多个事务线程在申请锁,哪个事务被唤醒获得锁是随机的,然而我们需要的是优先级最高的那个事务获得锁,如何保证呢?被唤醒的线程不是直接执行后续操作,而是需要先判断有无比其优先级更高的事务线程在等待状态(遍历整个请求队列,根据优先级判断),如果有优先级更高的事务线程,那么自己就会主动释放锁从而进入等待状态,直到其他线程调用notify_one & notify_all 方法唤醒。 std::unique_lock<std::mutex> lck(latch_);
std::condition_variable cv_;
while(ExistsHigherPriorityTxn()) {
cv_.wait(lck);
}
对于线程调度的一些思考
- 如果一个线程第一时间没有获得互斥对象的锁,那么它后续如何执行?轮询申请锁?
- 要根据锁类型来决定,自旋锁一般会自选等待,每隔一段时间重新尝试获取锁操作,也有的会执行系统调用阻塞线程,等到锁释放 V 操作执行后唤醒该线程继续尝试获取锁。
- cv_.wait 调用后会触发唤醒其他线程重新发起锁请求吗?
- 其实大可不必担心这个问题,因为假设等待
latch_ 的互斥变量都因为其他原因(例如IO资源)进入 wait 状态,那么这个其他原因(IO)必然是被其他线程占用的,那么其他线程在释放IO资源的时候会调用notify 方法,从而唤醒这里“全部等待IO” 资源的线程,OK 闭环了属于是~
坑点
- Wound-Wait 的实现上面,无论请求授权与否,优先级低的都要被杀掉,否则会超时(因为随机唤醒优先级低的线程会再次进入 wait,反复会超时)
- 不要对在函数内检测同一事务在同一 RID 上是否重复上锁,否则会导致Augograder BasicTest 过不了,这很头疼,找了很久才找到原因,直接触发错误是因为 Unlock 时候从请求队列删除迭代器导致的
Segmentation Fault
Task #3 - Concurrent Query Execution
注意事项
- SeqScanExecutor 需要根据隔离级别判断要不要加锁,加什么锁,读取后也同样考虑解锁问题
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED && !txn->IsExclusiveLocked(iter_->GetRid()) &&
!txn->IsSharedLocked(iter_->GetRid())) {
GetExecutorContext()->GetLockManager()->LockShared(txn, iter_->GetRid()); // 加读锁
}
// READ_UNCOMMITTED 没有加锁,不需要解锁 REPEATED_READ , READ_COMMITTED 需要等待提交才可以被其他事务读取
// 因此,如果在 READ_COMMITTED 级别下,写锁不能立即释放
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED && !txn->IsExclusiveLocked(iter_->GetRid())) {
GetExecutorContext()->GetLockManager()->Unlock(txn, iter_->GetRid());
}
- DeleteExecutor 之前都要判断是否加锁,是否要升级锁
- DeleteExecutor 不真实删除数据,而是标记删除,这样也方便回滚,提交的时候才会调用
ApplyDelete - 删除时需要记录删除的信息,便于 Abort 后 redo 操作
// 更新前申请写锁,如果已经存在写锁则直接用,否则判断需要升级读锁还是申请写锁
if (!txn->IsExclusiveLocked(del_rid)) {
if (txn->IsSharedLocked(del_rid)) {
exec_ctx_->GetLockManager()->LockUpgrade(txn, del_rid);
} else {
exec_ctx_->GetLockManager()->LockExclusive(txn, del_rid);
}
}
// 把删除操作记录下来,方便 abort 时候 redo
txn->GetIndexWriteSet()->emplace_back(del_rid, table_info_->oid_, WType::DELETE, del_tuple,
index_info->index_oid_, catalog);
- UpdateExecutor 和 DeleteExecutor 类似,需要判断是否升级锁,加锁,并且记录更新信息,便于 redo
- InsertExecutor 这里没有加上
LockManager 对象的锁,而是依赖 table_page 的读写锁
系统如何实现多个事务之间同步互斥
Q & A
cmake
GDB 调试指南
Background
class HashTableBucketPage {
//省略...
private:
// For more on BUCKET_ARRAY_SIZE see storage/page/hash_table_page_defs.h
char occupied_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
// 0 if tombstone/brand new (never occupied), 1 otherwise.
char readable_[(BUCKET_ARRAY_SIZE - 1) / 8 + 1];
MappingType array_[0]; // 零长度数组 :occupied_ 和 readable_ 剩下的空间都用于数组
}
-
待调试的代码
- 因为 CMU15445 Busttub 项目本身用的是 GTest,这里为了更好的进行调试,自己新建了 gdb_test 目录,用于自行 gdb debug
int main() {
DiskManager *disk_manager = new DiskManager("test.db");
auto *bpm = new BufferPoolManagerInstance(5, disk_manager);
// get a bucket page from the BufferPoolManager
page_id_t bucket_page_id = INVALID_PAGE_ID;
auto bucket_page = reinterpret_cast<HashTableBucketPage<int, int, IntComparator> *>(
bpm->NewPage(&bucket_page_id, nullptr)->GetData());
// insert a few (key, value) pairs
for (unsigned i = 0; i < 10; i++) {
assert(bucket_page->Insert(i, i, IntComparator()));
}
// check for the inserted pairs
for (unsigned i = 0; i < 10; i++) {
EXPECT_EQ(i, bucket_page->KeyAt(i));
EXPECT_EQ(i, bucket_page->ValueAt(i));
}
// remove a few pairs
for (unsigned i = 0; i < 10; i++) {
if (i % 2 == 1) {
assert(bucket_page->Remove(i, i, IntComparator()));
}
}
// check for the flags
for (unsigned i = 0; i < 15; i++) {
if (i < 10) {
EXPECT_TRUE(bucket_page->IsOccupied(i));
if (i % 2 == 1) {
EXPECT_FALSE(bucket_page->IsReadable(i));
} else {
EXPECT_TRUE(bucket_page->IsReadable(i));
}
} else {
EXPECT_FALSE(bucket_page->IsOccupied(i));
}
}
// try to remove the already-removed pairs
for (unsigned i = 0; i < 10; i++) {
if (i % 2 == 1) {
assert(!bucket_page->Remove(i, i, IntComparator()));
}
}
// unpin the directory page now that we are done
bpm->UnpinPage(bucket_page_id, true, nullptr);
disk_manager->ShutDown();
remove("test.db");
delete disk_manager;
delete bpm;
}
debug 过程
-
进入gdb gdb ./bin/gdb_hash_table_bucket_page_test
-
可以快速 run 一下,结合 gtest(Google Test 框架) 快速定位错误大致位置 -
设置断点并运行 b gdb_hash_table_bucket_page_test:39
r // 运行
-
查看数据值
-
一般查看数据的值用 p <var> 就足够了,但是这里是个 class,并且数据成员是两个 char (字节)类型数组和MappingType 类型零字节数组 -
x/nfu 可以根据内存地址查看数据,可以通过在gdb中 help x 查看具体参数意义
Examine memory: x/FMT ADDRESS. ADDRESS is an expression for the memory address to examine. FMT is a repeat count followed by a format letter and a size letter. Format letters are o(octal), x(hex), d(decimal), u(unsigned decimal), t(binary), f(float), a(address), i(instruction), c(char), s(string) and z(hex, zero padded on the left). Size letters are b(byte), h(halfword), w(word), g(giant, 8 bytes). The specified number of objects of the specified size are printed according to the format. If a negative number is specified, memory is examined backward from the address.
Defaults for format and size letters are those previously used. Default count is 1. Default address is following last thing printed with this command or “print”.
-
查看pair<int,int>类型数组数据
x/4xw bucket_page->array_
0x555555775834: 0x00000000 0x00000000 0x00000001 0x00000001
x/16dw bucket_page->array_
0x555555775834: 0 0 1 1
0x555555775844: 2 2 3 3
0x555555775854: 4 4 5 5
0x555555775864: 6 6 7 7
-
查看 char 类型数组数据
x/16xb bucket_page->readable_
0x5555557757b8: 0xff 0x03 0x00 0x00 0x00 0x00 0x00 0x00
-
根据 readable_ 的结果可以发现,一共插入了 10个元素,并且在代码34-36行对奇数位置对应的 <key,value>移除了,但是实际上并没有成功,说明是 Remove 函数执行有问题,我们可以定位到 Remove 函数进一步查找问题
多线程调试
- 如何进行多线程调试呢?
如果按照默认思路设置断点,然后 run ,此时默认会在主线程中,并且通过 info threads 大概率是看不到异步执行的子线程的因为其执行太快,而且 等待时间就 300ms ,还没反应过来就会结束了。因此,我们需要在产生错误之前的某个地方多等待一段时间(几十分钟,够调试就行),然后重新打断点执行,此时 info threads 就可以看到有个子线程处理 sleep 状态(syscall系统调用),此时主线程就不会因为子线程有问题而终止运行。thread <thread_num> 可以切换到 <thread_num>对应的线程
- 常用命令(RTFM -> Read The Fucking Manual)
- info threads
- thread <thread_num>
- 遇事不决就 help thread
Autograder
-
autograder 使用
- 使用前必须 format lint clang-tiny 检查,否则直接 0 分
-
autograder score 评分问题
-
autograder 底下有日志文件,可以用 notepad 打开,然后全文检索 error 定位错误 -
The autograder failed to execute correctly. Please ensure that your submission is valid. Contact your course staff for help in debugging this issue. Make sure to include a link to this page so that they can help you most effectively.
- 这种是编译错误,因为我在 config.h 中自己定义了 INVALID_FRAME_ID ,而评测机没有这个玩意,所以报错了。
-
Grade Submission Note
GoogleTest
System Config & VSCode Config
-
CentOS 8 更换镜像源 -
CentOS-8 安装开发工具包
-
VS-Code 连接腾讯云主机连不上
最开始能够通过ssh连接上,但是中途销毁过一次云主机,重装系统,因为之前我的windows电脑记住了上次连接系统的finger信息,导致重装系统后连接不上,此时需要本地windows电脑删除 C/User/brave/.ssh/known_hosts里面对应腾讯云主机的finger信息,再次ssh连接即可。
-
.ssh/config 文件的使用方法
- config 文件可以配置当前主机对其他主机的 ssh 连接情况
- 主要参数有 Host和HostName,UserName,其中 Host 可以自定义方便记忆,HostName是目标主机的IP地址,UserName为登录用户名
- shh confi 使用总结
-
配置免密登录服务器 -
Ubuntu 腾讯云服务器默认禁止 root 用户 ssh 登录 -
创建新用户时候如果不指定 -s /bin/bash 会只有 $ 符号,很难受 -
VS-Code 插件无法检索 XHR Failed ,关闭 windows 代理服务器 -
配置vscode 中 windows 到腾讯云免密登录 1、ssh-keygen -t rsa -b 4096
2、mkdir ~/.ssh && cd ~/.ssh
3、vim authorized_keys
复制 windows 内的 id_rsa.pub 内容到 authorized_keys 文件
4、配置remote-ssh:
Host tencent
HostName 43.142.31.159
User zw
IdentityFile C:\Users\brave\.ssh\id_rsa
-
vs-code 通过 ssh 远程安装插件一直 installing 状态
- 默认只能在通过远程主机的 root 账户连接时,才可以正常安装
- 如果想用其他普通用户安装,需要先配置 vscode 免密登录
-
VS-Code installing 一直安装失败
- 采用离线方式安装
- 首先到插件对应仓库下载离线包
- 如果是想安装到 linux 下,则下载其对应版本,然后上传到腾讯云服务器(linux)
- vscoode ssh 登录 linux ,点击EXPLORE 框右侧的三个点 …
- 选择 Install from VSIX 即可安装
-
vscode 修改变量名
- Way 1
- Find All reference ctrl - shift - f12 ,确认所有同名变量都是期望修改的变量
- Rename symbols F2
- 这种方式会直接修改所有引用变量
- Way 2
- 全局查询,ctrl- shit -f 检索所有文件的查询变量
- 逐个修改
-
lunix 压缩文档 zip -r src/include/primer/p0_primer.h <zipfile_name.zip>
unzip -l zipfile_name.zip 可以查看压缩文档包含的内容
-
man 命令显示出错,bad
-
ubuntu 升级 gcc版本 sudo apt-get update
sudo apt-get install gcc-8
sudo apt-get install g++-8
cd /usr/bin
sudo rm gcc g++
sudo ln -s gcc-8 gcc
sudo ln -s g++-8 g++
-
vscode 底部导航栏可以选择编译器类型 gcc/clang -
系统库不支持 scoped_lock
-
c++17 后支持,但是vscode 会给出无法识别信息,实际上是可以通过编译的 -
解决办法:
ctrl+shift+p 输入 C/C++:Edit Configurations(JSON),打开c_cpp_properties.json文件 修改 cStandard 值为 c17 修改 cppStandard 值为c++17
-
此外,ctrl + shift + p ,打开 Preference: Open Settings
- 这里面分为三个环境,分别为 User、Remote、Workspace
- 里面包含很多配置,包括自动不全,设置 cppStandard等
-
REF -
How to enable C++17 support in VSCode C++ Extension -
gtest: where to put gdb breakpoint
Reference
Busttub
C++
|