一、 前篇的疑惑
前篇我们留下了两个问题:
- 为什么WALInsertLock目前只设为8,不设更大?
- 多进程并发复制数据会有什么问题?
第一个问题很直观,主要是第二个。
1. 多进程并发复制数据会有什么问题?
首先我们回顾一下日志写入的两个核心流程:
来考虑这样一种场景:
时间点 | 事务A | 事务B | 1 | 预留空间,获取EndPos_A | | 2 | | 预留空间,获取EndPos_B | 3 | | 将数据全部复制入WAL Buffer | 4 | | 执行commit 此时应该将EndPos_B之前的日志全部刷盘 | 5 | 将数据全部复制入WAL Buffer | |
- 事务A先于事务B预留空间,所以很显然EndPos_B > EndPos_A。
- 紧接着事务B将XLOG复制到WAL Buffer,并执行了commit。
- Commit应该将EndPos_B之前的所有日志都刷入此盘。
- 但是此时,事务A还没来得及将XLOG复制到WAL Buffer。此时在EndPos_B之前的日志是不完整的,当然也就不能直接刷盘,必须等到事务A将相关的日志复制到WAL Buffer才能执行。
- WaitXLogInsertionsToFinish的功能,就是判断指定位置之前是否还有XLOG没有复制到WAL Buffer,如果有就等待,直到这些XLOG完成复制。
所以,在调用XLogWrite进行XLOG刷盘之前,都需要调用WaitXLogInsertionsToFinish。
2. 如何判断XLOG复制是否完成
那它怎么样判断指定位置之前是否还有XLOG没有复制到WAL Buffer? 需要借助前面提到的WALInsertLocks结构体中的insertingAt属性来解决。
一个事务是否能将WAL Buffer中的数据刷入磁盘取决于两个因素:
- 当前没有其他进程持有WALInsertLock(没有并发操作,当然就没有问题)
- 有其他进程持有WALInsertLock,但其insertingAt大于当前事务要刷入的LSN
? ? ? ?回顾一下之前对insertingAt的解释:insertingAt记录当前日志写入WAL Buffer的进展,这个变量会在进程将WAL由内存刷往磁盘时读取,以确认所有对该区域的写入操作已完成。
? ? ? ?所以第二种情况指的就是当前事务要刷入的LSN之前的数据都已经复制完成了(因为其他事务的写入进展都已经大于当前事务了)。那如果小于怎么办,那就像上面说的,要等待,直到这些XLOG数据复制完成。
二、 WaitXLogInsertionsToFinish函数
/*
* 等待WAL写入完成。
* 返回最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。
* 注意:在进行WAL刷盘之前,务必要调用该函数
*/
static XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
XLogRecPtr reservedUpto;
XLogRecPtr finishedUpto;
XLogCtlInsert *Insert = &XLogCtl->Insert;
int i;
if (MyProc == NULL)
elog(PANIC, "cannot wait without a PGPROC structure");
/* Read the current insert position,加锁、获取当前写入位置 */
SpinLockAcquire(&Insert->insertpos_lck);
bytepos = Insert->CurrBytePos;
SpinLockRelease(&Insert->insertpos_lck);
reservedUpto = XLogBytePosToEndRecPtr(bytepos);
/*
* No-one should request to flush a piece of WAL that hasn't even been reserved yet. However, it can happen if there is a block with a bogus LSN on disk, for example. XLogFlush checks for that situation and complains, but only after the flush.
* Here we just assume that to mean that all WAL that has been reserved needs to be finished. In this corner-case, the return value can be smaller than 'upto' argument.
* 这里我们只假设所有已预留的空间都需要刷入磁盘。在这种情况下,函数返回值可以比upto参数更小。
*/
if (upto > reservedUpto)
{
ereport(LOG,
(errmsg("request to flush past end of generated WAL; request %X/%X, current position %X/%X",
LSN_FORMAT_ARGS(upto), LSN_FORMAT_ARGS(reservedUpto))));
upto = reservedUpto;
}
/*
* Loop through all the locks, sleeping on any in-progress insert older than 'upto'.
* finishedUpto is our return value, indicating the point upto which all the WAL insertions have been finished. Initialize it to the head of reserved WAL, and as we iterate through the insertion locks, back it out for any insertion that's still in progress.
* 循环所有WALInsertLock,如果有正在进行的、insertingAt值小于当前待刷入LSN的写入操作,则sleep(等待)。
* finishedUpto是我们的返回值,标志着这一点之前所有的WAL已经写入完毕。初始化该值为reservedUpto,然后循环迭代WALInsertLock,遇到任意还在执行的WAL写入时返回。
*/
finishedUpto = reservedUpto;
for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
{
XLogRecPtr insertingat = InvalidXLogRecPtr;
do
{
/*
* See if this insertion is in progress. LWLockWaitForVar will wait for the lock to be released, or for the 'value' to be set by a LWLockUpdateVar call.
* 看是否有写入操作正在进行,LWLockWaitForVar函数用于等待锁释放,或者用于调用LWLockUpdateVar函数设置的值
*/
if (LWLockWaitForVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
insertingat, &insertingat))
{
/* the lock was free, so no insertion in progress */
insertingat = InvalidXLogRecPtr;
break;
}
/*
* This insertion is still in progress. Have to wait, unless the inserter has proceeded past 'upto'. 这就是前面说的,如果insertingAt小于当前事务要刷入的LSN,则必须等待,并且调整finishedUpto的值。
*/
} while (insertingat < upto);
if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
finishedUpto = insertingat;
}
return finishedUpto;
}
参考
PostgreSQL重启恢复---Log Buffer_obvious__的博客-CSDN博客
https://zhuanlan.zhihu.com/p/166413747
PostgreSQL重启恢复---XLOG 2.0_obvious__的博客-CSDN博客
|