First, the logical architecture diagram:
The problem appeared on PG_DB2, showing up as rising CPU usage, rising IO, and a sharp increase in disk usage.
Reproducing the case:
A request came in to extract two columns from one table on the PG_DB2 instance, so I first ran a local test. Since the point was only to observe the effect, I cancelled the statement after about 30 seconds; running it to completion would have taken a little over 2 minutes.
melotall=> \dt+ user_assets_info
List of relations
Schema | Name | Type | Owner | Persistence | Size | Description
public | user_assets_info | table | melotall | permanent | 4969 MB |
(1 row)
melotall=> \timing
Timing is on.
melotall=> CREATE TABLE user_assets_info_bak AS SELECT user_id, show_money FROM melotall.public.user_assets_info;
^CCancel request sent
ERROR: canceling statement due to user request
Time: 30173.850 ms (00:30.174)
But here is where it gets interesting: the replication slots piled up a backlog of WAL waiting to be decoded. That is also why the statement above was cancelled instead of run to completion, purely to reproduce the fault; otherwise the WAL backlog would have been even larger.
melotall=> SELECT slot_name,
melotall-> pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
melotall-> pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
melotall-> active
melotall-> FROM pg_replication_slots;
slot_name | replicationslotlag | confirmedlag | active
kk_cdc_melotpay_slot | 1022 MB | 1009 MB | t
kk_cdc_kkcx_slot | 1029 MB | 1019 MB | t
kk_cdc_melotlog_slot | 1026 MB | 1013 MB | t
kk_cdc_melotroom_slot | 1029 MB | 1019 MB | t
kk_cdc_tshow_slot | 1022 MB | 1013 MB | t
kk_cdc_kkstable_slot | 1022 MB | 1012 MB | t
kk_cdc_kkgame_slot | 1022 MB | 1012 MB | t
kk_cdc_reportrsync_slot | 1026 MB | 1015 MB | t
(8 rows)
Running the same query again a little later, the restart_lsn lag had roughly doubled to about 2.2 GB per slot:
melotall=> SELECT slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
active
FROM pg_replication_slots;
slot_name | replicationslotlag | confirmedlag | active
kk_cdc_melotpay_slot | 2207 MB | 1144 MB | t
kk_cdc_kkcx_slot | 2207 MB | 1052 MB | t
kk_cdc_melotlog_slot | 2207 MB | 1144 MB | t
kk_cdc_melotroom_slot | 2207 MB | 1047 MB | t
kk_cdc_tshow_slot | 2207 MB | 1144 MB | t
kk_cdc_kkstable_slot | 2207 MB | 1144 MB | t
kk_cdc_kkgame_slot | 2207 MB | 1121 MB | t
kk_cdc_reportrsync_slot | 2207 MB | 1208 MB | t
(8 rows)
Then I checked the wait events:
melotall=> select * from pg_stat_activity where state='active';
datid | datname | pid | leader_pid | usesysid | usename | application_name | client_addr | client_hostname | client_port | backend_start | xact_start | query_start | state_change | wait_event_type | wait_event | state | backend_xid | backend_xmin | query | backend_type
 | | 27594 | | 16385 | replicator | standby1 | 192.168.172.201 | | 45687 | 2021-11-14 14:08:46.144783+08 | | | 2021-11-14 14:08:46.153733+08 | Activity | WalSenderMain | active | | | | walsender
16394 | melotall | 51359 | | 16391 | postgres | psql | 192.168.172.67 | | 32786 | 2022-05-07 15:06:49.754137+08 | 2022-05-07 15:24:23.816814+08 | 2022-05-07 15:24:23.816814+08 | 2022-05-07 15:24:23.816816+08 | | | active | | 2256169655 | select * from pg_stat_activity where state='active'; | client backend
16394 | melotall | 14064 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.6 | | 38394 | 2022-05-02 02:16:21.20966+08 | | 2022-05-02 02:16:21.223602+08 | 2022-05-02 02:16:21.228487+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkcx' | walsender
16394 | melotall | 14085 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.15 | | 48794 | 2022-05-02 02:16:21.475383+08 | | 2022-05-02 02:16:21.488744+08 | 2022-05-02 02:16:21.493338+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkstable' | walsender
16394 | melotall | 14069 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.7 | | 34238 | 2022-05-02 02:16:21.270096+08 | | 2022-05-02 02:16:21.284681+08 | 2022-05-02 02:16:21.289584+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_reportrsync' | walsender
16394 | melotall | 14067 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.15 | | 48786 | 2022-05-02 02:16:21.246589+08 | | 2022-05-02 02:16:21.259689+08 | 2022-05-02 02:16:21.264259+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotpay' | walsender
16394 | melotall | 14066 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.4 | | 32846 | 2022-05-02 02:16:21.225855+08 | | 2022-05-02 02:16:21.23843+08 | 2022-05-02 02:16:21.242843+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotroom' | walsender
16394 | melotall | 14063 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.16 | | 34646 | 2022-05-02 02:16:21.07723+08 | | 2022-05-02 02:16:21.091532+08 | 2022-05-02 02:16:21.096436+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_tshow' | walsender
16394 | melotall | 64601 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.3 | | 49392 | 2022-05-06 16:49:29.173672+08 | | 2022-05-06 16:49:29.188201+08 | 2022-05-06 16:49:29.232379+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotlog' | walsender
16394 | melotall | 14083 | | 17195 | wal2json | PostgreSQL JDBC Driver | 192.168.88.12 | | 57980 | 2022-05-02 02:16:21.461445+08 | | 2022-05-02 02:16:21.475007+08 | 2022-05-02 02:16:21.480413+08 | IO | ReorderBufferWrite | active | | | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkgame' | walsender
As we can see above, every wal2json walsender is in wait_event_type IO with wait event ReorderBufferWrite, which is what drove the RDS instance's CPU usage and IO up, as shown in the figure below.
Since there is no OS-level access, the information obtainable at the database level is limited, but it is enough to roughly pinpoint the cause.
First, logical replication, like physical replication, is driven by a walsender. The walsender keeps reading WAL and decodes every WAL record, grouping the decoded tuples by transaction (storing them in the ReorderBuffer), ordered by when each transaction started. When a transaction commits, all of its accumulated changes in the ReorderBuffer are decoded and the resulting logical change stream is sent to the subscriber.
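A quick way to watch this per-transaction grouping is to create a throwaway slot and peek at it. The sketch below is illustrative only (demo_slot and the UPDATE statement are made up, and the peek itself triggers decoding):

-- create a scratch slot on the wal2json plugin (it retains WAL, so drop it when done)
SELECT pg_create_logical_replication_slot('demo_slot', 'wal2json');

-- run a small transaction (illustrative statement only)
BEGIN;
UPDATE user_assets_info SET show_money = show_money WHERE user_id = 1;
COMMIT;

-- peek (non-destructively) at the decoded stream: changes are emitted grouped
-- under the transaction that committed them
SELECT lsn, xid, data FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL);

-- drop the scratch slot so it stops holding back WAL removal
SELECT pg_drop_replication_slot('demo_slot');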
So with 8 CDC channels here, there are 8 replication slots and therefore 8 walsenders. All 8 walsenders have to read and decode the WAL independently, which is why 8 CPU cores were busy. On top of that, the earlier table copy was one big transaction that generated a lot of WAL, and while decoding it each walsender certainly exceeded the logical_decoding_work_mem limit; once that limit is exceeded, the decoded changes are spilled to disk, which drives both IO and CPU up.
The relevant parameter setting:
melotall=> show logical_decoding_work_mem;
logical_decoding_work_mem
64MB
(1 row)
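If the instance is on PostgreSQL 14 or later, how much each walsender is actually spilling can also be measured directly (pg_stat_replication_slots does not exist on 13, so treat this as optional):

-- cumulative spill statistics per logical slot: transactions spilled,
-- number of spill operations, and bytes written to spill files
SELECT slot_name, spill_txns, spill_count, pg_size_pretty(spill_bytes) AS spill_bytes
FROM pg_stat_replication_slots;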
Optimization suggestion: when filling a table with a large amount of data like this, create the table structure first and then insert the data in batches, sleeping between batches to give the walsenders time to catch up with decoding.
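A minimal sketch of that approach, assuming user_id can be used to slice the table into ranges (the range boundaries and the 30-second pause are illustrative, not tuned values):

-- 1. create an empty target table first instead of one huge CTAS transaction
CREATE TABLE user_assets_info_bak AS
SELECT user_id, show_money FROM user_assets_info WHERE false;

-- 2. copy the data range by range; each INSERT commits on its own,
--    so every walsender only has to buffer and decode one batch at a time
INSERT INTO user_assets_info_bak
SELECT user_id, show_money FROM user_assets_info
WHERE user_id >= 0 AND user_id < 1000000;

SELECT pg_sleep(30);   -- give the walsenders time to drain before the next batch

INSERT INTO user_assets_info_bak
SELECT user_id, show_money FROM user_assets_info
WHERE user_id >= 1000000 AND user_id < 2000000;
-- ...repeat until the whole user_id range is covered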
When the issue was reproduced again later, the Alibaba Cloud engineers captured the following stack trace:
The functions in that stack can be found in src/backend/replication/reorderbuffer.c; the two that actually spill data to disk are ReorderBufferSerializeChange and ReorderBufferSerializeTXN.
ReorderBufferSerializeChange:
static void
ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change)
{
ReorderBufferDiskChange *ondisk;
Size sz = sizeof(ReorderBufferDiskChange);
ReorderBufferSerializeReserve(rb, sz);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
case REORDER_BUFFER_CHANGE_UPDATE:
case REORDER_BUFFER_CHANGE_DELETE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
char *data;
ReorderBufferTupleBuf *oldtup,
*newtup;
Size oldlen = 0;
Size newlen = 0;
oldtup = change->data.tp.oldtuple;
newtup = change->data.tp.newtuple;
if (oldtup)
{
sz += sizeof(HeapTupleData);
oldlen = oldtup->tuple.t_len;
sz += oldlen;
}
if (newtup)
{
sz += sizeof(HeapTupleData);
newlen = newtup->tuple.t_len;
sz += newlen;
}
ReorderBufferSerializeReserve(rb, sz);
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
if (oldlen)
{
memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, oldtup->tuple.t_data, oldlen);
data += oldlen;
}
if (newlen)
{
memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
memcpy(data, newtup->tuple.t_data, newlen);
data += newlen;
}
break;
}
case REORDER_BUFFER_CHANGE_MESSAGE:
{
char *data;
Size prefix_size = strlen(change->data.msg.prefix) + 1;
sz += prefix_size + change->data.msg.message_size +
sizeof(Size) + sizeof(Size);
ReorderBufferSerializeReserve(rb, sz);
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(data, &prefix_size, sizeof(Size));
data += sizeof(Size);
memcpy(data, change->data.msg.prefix,
prefix_size);
data += prefix_size;
memcpy(data, &change->data.msg.message_size, sizeof(Size));
data += sizeof(Size);
memcpy(data, change->data.msg.message,
change->data.msg.message_size);
data += change->data.msg.message_size;
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
char *data;
snap = change->data.snapshot;
sz += sizeof(SnapshotData) +
sizeof(TransactionId) * snap->xcnt +
sizeof(TransactionId) * snap->subxcnt;
ReorderBufferSerializeReserve(rb, sz);
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(data, snap, sizeof(SnapshotData));
data += sizeof(SnapshotData);
if (snap->xcnt)
{
memcpy(data, snap->xip,
sizeof(TransactionId) * snap->xcnt);
data += sizeof(TransactionId) * snap->xcnt;
}
if (snap->subxcnt)
{
memcpy(data, snap->subxip,
sizeof(TransactionId) * snap->subxcnt);
data += sizeof(TransactionId) * snap->subxcnt;
}
break;
}
case REORDER_BUFFER_CHANGE_TRUNCATE:
{
Size size;
char *data;
size = sizeof(Oid) * change->data.truncate.nrelids;
sz += size;
ReorderBufferSerializeReserve(rb, sz);
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
memcpy(data, change->data.truncate.relids, size);
data += size;
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
break;
}
ondisk->size = sz;
errno = 0;
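/* the write() below is what surfaces as the IO / ReorderBufferWrite wait event in pg_stat_activity */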
pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
{
int save_errno = errno;
CloseTransientFile(fd);
errno = save_errno ? save_errno : ENOSPC;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to data file for XID %u: %m",
txn->xid)));
}
pgstat_report_wait_end();
if (txn->final_lsn < change->lsn)
txn->final_lsn = change->lsn;
Assert(ondisk->change.action == change->action);
}

ReorderBufferSerializeTXN:
static void
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
dlist_iter subtxn_i;
dlist_mutable_iter change_i;
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
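/* spill any subtransactions first (recursively), then this transaction's own changes */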
dlist_foreach(subtxn_i, &txn->subtxns)
{
ReorderBufferTXN *subtxn;
subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
ReorderBufferSerializeTXN(rb, subtxn);
}
dlist_foreach_modify(change_i, &txn->changes)
{
ReorderBufferChange *change;
change = dlist_container(ReorderBufferChange, node, change_i.cur);
if (fd == -1 ||
!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
{
char path[MAXPGPATH];
if (fd != -1)
CloseTransientFile(fd);
XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
curOpenSegNo);
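/* path now points at this transaction's spill file under pg_replslot/<slot_name>/ for the current WAL segment */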
fd = OpenTransientFile(path,
O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", path)));
}
ReorderBufferSerializeChange(rb, txn, fd, change);
dlist_delete(&change->node);
ReorderBufferReturnChange(rb, change);
spilled++;
}
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
txn->txn_flags |= RBTXN_IS_SERIALIZED;
if (fd != -1)
CloseTransientFile(fd);
}
Summary:
- Following the analysis above, we can rework our own setup; for example, reduce the number of replication slots used by CDC, which lowers both the ReorderBuffer write pressure and the decoding pressure (see the sketch after this list).
- For big transactions, such as backing up a table or bulk-loading data, insert in batches to give the walsenders room to breathe.
- Also note that if the current instance is both a subscriber and a publisher, then when the upstream publisher adds a large table whose full data needs to be synchronized, the entire table is copied over to this instance; that initial copy also affects this instance's own publications, causing slot lag and WAL accumulation.
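For the first point, a rough sketch of how one might check and then remove a slot once its CDC channel has been decommissioned (the slot name below is hypothetical, and pg_drop_replication_slot only works on a slot that is no longer active):

-- see which slots exist, whether they are in use, and how much WAL each retains
SELECT slot_name, active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) DESC;

-- after stopping the corresponding CDC consumer, drop its slot so one fewer
-- walsender has to decode (and spill) every transaction
SELECT pg_drop_replication_slot('kk_cdc_some_retired_slot');  -- hypothetical slot name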