IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Postgresql一条建表语句导致逻辑复制槽堆积大量wal日志 -> 正文阅读

[大数据]Postgresql一条建表语句导致逻辑复制槽堆积大量wal日志

先看逻辑架构流程图:
在这里插入图片描述

问题出现在PG_DB2上,现象是CPU使用率升高,IO升高,磁盘空间大幅度增长。

案例复现:

应需求在PG_DB2实例上,抽取一张表的两个字段,所以先本地测试了一下,为了只是看效果,所以只执行了30秒后,取消了,如果要执行完,差不多要2分多钟。

melotall=> \dt+ user_assets_info
                                 List of relations
 Schema |       Name       | Type  |  Owner   | Persistence |  Size   | Description 
--------+------------------+-------+----------+-------------+---------+-------------
 public | user_assets_info | table | melotall | permanent   | 4969 MB | 
(1 row)

melotall=> \timing 
Timing is on.
melotall=> CREATE TABLE user_assets_info_bak AS SELECT user_id, show_money FROM melotall.public.user_assets_info;
^CCancel request sent
ERROR:  canceling statement due to user request
Time: 30173.850 ms (00:30.174)

但是,好戏来了,复制槽堆积了一堆需要解析的wal,所以只是为了故障复现,上面才没有让执行完,要么堆积的wal更多。

melotall=> SELECT slot_name, 
melotall->        pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag, 
melotall->        pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
melotall->        active
melotall-> FROM pg_replication_slots;
        slot_name        | replicationslotlag | confirmedlag | active 
-------------------------+--------------------+--------------+--------
 kk_cdc_melotpay_slot    | 1022 MB            | 1009 MB      | t
 kk_cdc_kkcx_slot        | 1029 MB            | 1019 MB      | t
 kk_cdc_melotlog_slot    | 1026 MB            | 1013 MB      | t
 kk_cdc_melotroom_slot   | 1029 MB            | 1019 MB      | t
 kk_cdc_tshow_slot       | 1022 MB            | 1013 MB      | t
 kk_cdc_kkstable_slot    | 1022 MB            | 1012 MB      | t
 kk_cdc_kkgame_slot      | 1022 MB            | 1012 MB      | t
 kk_cdc_reportrsync_slot | 1026 MB            | 1015 MB      | t
(8 rows)

#和上面查询间隔短短十几秒,就堆积了这么多wal需要解析
melotall=> SELECT slot_name, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag, 
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
       active
FROM pg_replication_slots;
        slot_name        | replicationslotlag | confirmedlag | active 
-------------------------+--------------------+--------------+--------
 kk_cdc_melotpay_slot    | 2207 MB            | 1144 MB      | t
 kk_cdc_kkcx_slot        | 2207 MB            | 1052 MB      | t
 kk_cdc_melotlog_slot    | 2207 MB            | 1144 MB      | t
 kk_cdc_melotroom_slot   | 2207 MB            | 1047 MB      | t
 kk_cdc_tshow_slot       | 2207 MB            | 1144 MB      | t
 kk_cdc_kkstable_slot    | 2207 MB            | 1144 MB      | t
 kk_cdc_kkgame_slot      | 2207 MB            | 1121 MB      | t
 kk_cdc_reportrsync_slot | 2207 MB            | 1208 MB      | t
(8 rows)

然后看了下等待事件

melotall=>  select * from pg_stat_activity where state='active';
 datid | datname  |  pid  | leader_pid | usesysid |  usename   |    application_name    |   client_addr   | client_hostname | client_port |         backend_st
art         |          xact_start           |          query_start          |         state_change          | wait_event_type |     wait_event     | state  | 
backend_xid | backend_xmin |                                 query                                 |  backend_type  
-------+----------+-------+------------+----------+------------+------------------------+-----------------+-----------------+-------------+-------------------
------------+-------------------------------+-------------------------------+-------------------------------+-----------------+--------------------+--------+-
------------+--------------+-----------------------------------------------------------------------+----------------
       |          | 27594 |            |    16385 | replicator | standby1               | 192.168.172.201 |                 |       45687 | 2021-11-14 14:08:4
6.144783+08 |                               |                               | 2021-11-14 14:08:46.153733+08 | Activity        | WalSenderMain      | active | 
            |              |                                                                       | walsender
 16394 | melotall | 51359 |            |    16391 | postgres   | psql                   | 192.168.172.67  |                 |       32786 | 2022-05-07 15:06:4
9.754137+08 | 2022-05-07 15:24:23.816814+08 | 2022-05-07 15:24:23.816814+08 | 2022-05-07 15:24:23.816816+08 |                 |                    | active | 
            |   2256169655 | select * from pg_stat_activity where state='active';                  | client backend
 16394 | melotall | 14064 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.6    |                 |       38394 | 2022-05-02 02:16:2
1.20966+08  |                               | 2022-05-02 02:16:21.223602+08 | 2022-05-02 02:16:21.228487+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkcx'        | walsender
 16394 | melotall | 14085 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.15   |                 |       48794 | 2022-05-02 02:16:2
1.475383+08 |                               | 2022-05-02 02:16:21.488744+08 | 2022-05-02 02:16:21.493338+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkstable'    | walsender
 16394 | melotall | 14069 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.7    |                 |       34238 | 2022-05-02 02:16:2
1.270096+08 |                               | 2022-05-02 02:16:21.284681+08 | 2022-05-02 02:16:21.289584+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_reportrsync' | walsender
 16394 | melotall | 14067 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.15   |                 |       48786 | 2022-05-02 02:16:2
1.246589+08 |                               | 2022-05-02 02:16:21.259689+08 | 2022-05-02 02:16:21.264259+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotpay'    | walsender
 16394 | melotall | 14066 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.4    |                 |       32846 | 2022-05-02 02:16:2
1.225855+08 |                               | 2022-05-02 02:16:21.23843+08  | 2022-05-02 02:16:21.242843+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotroom'   | walsender
 16394 | melotall | 14063 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.16   |                 |       34646 | 2022-05-02 02:16:2
1.07723+08  |                               | 2022-05-02 02:16:21.091532+08 | 2022-05-02 02:16:21.096436+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_tshow'       | walsender
 16394 | melotall | 64601 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.3    |                 |       49392 | 2022-05-06 16:49:2
9.173672+08 |                               | 2022-05-06 16:49:29.188201+08 | 2022-05-06 16:49:29.232379+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_melotlog'    | walsender
 16394 | melotall | 14083 |            |    17195 | wal2json   | PostgreSQL JDBC Driver | 192.168.88.12   |                 |       57980 | 2022-05-02 02:16:2
1.461445+08 |                               | 2022-05-02 02:16:21.475007+08 | 2022-05-02 02:16:21.480413+08 | IO              | ReorderBufferWrite | active | 
            |              | SELECT COUNT(1) FROM pg_publication WHERE pubname = 'cdc_kkgame'      | walsender

以上可见,都是IO等待类型,等待事件为ReorderBufferWrite,所以导致rds的CPU使用率也在飙升,IO也飙升,如下图
在这里插入图片描述

在这里插入图片描述
因为没有系统权限,数据库层面能查到的信息有限,不过大体也可以定位问题原因了。

首先,逻辑复制和物理复制一样,也是通过walsender解析的,walsender不停地读取WAL日志,会对每一条WAL日志记录都进行解析,将解析出的元组按照事务进行分组(保存进RecorderBuffer),并按照事务的开始时间进行排序。在事务提交时,会对提交事务在RecorderBuffer中的所有信息进行解码,并将解码后的逻辑日志发送给订阅端。

所以目前我有8个cdc,对应有8个slot,也就是有8个walsender,8个walsender都需要读取wal,进行解析,所以占用了8个核的CPU。并且,我们之前的建表操作是大事物,会产生很多wal,并且walsender解析的时候,肯定超过了logical_decoding_work_mem设置,超出后就会把解析内容溢出到磁盘,从而导致IO和CPU都会飙升。

查看相关参数设置:

melotall=> show logical_decoding_work_mem;
 logical_decoding_work_mem 
---------------------------
 64MB
(1 row)

优化建议
类似以上这种填充大量数据的时候,可以先创建表结构,然后再分批插入数据。中间sleep一段时间让walsender 去进行解析。

后续复现的时候,阿里云人员抓取的堆栈信息如下:

#0  0x00007f7b3e97f6e0 in __write_nocancel () from /lib64/libpthread.so.0
#1  0x00000000007b4a28 in ReorderBufferSerializeChange (txn=0x2989da8, txn=0x2989da8, change=0x9601aa8, fd=16, rb=0x2157dc8) at reorderbuffer.c:2653
#2  ReorderBufferSerializeTXN (rb=rb@entry=0x2157dc8, txn=txn@entry=0x2989da8) at reorderbuffer.c:2468
#3  0x00000000007b5a1e in ReorderBufferCheckMemoryLimit (rb=0x2157dc8) at reorderbuffer.c:2392
#4  ReorderBufferQueueChange (rb=0x2157dc8, xid=<optimized out>, lsn=7790948496384, change=change@entry=0x96186f8) at reorderbuffer.c:649
#5  0x00000000007a9c19 in DecodeTruncate (buf=<optimized out>, buf=<optimized out>, ctx=<optimized out>) at decode.c:879
#6  DecodeHeapOp (buf=0x7ffdd4bd8ba0, ctx=0x206d5f8) at decode.c:459
#7  LogicalDecodingProcessRecord (ctx=0x206d5f8, record=<optimized out>) at decode.c:129
#8  0x00000000007d109f in XLogSendLogical () at walsender.c:2961
#9  0x00000000007d4475 in WalSndLoop (send_data=0x7d1050 <XLogSendLogical>) at walsender.c:2341
#10 StartLogicalReplication (cmd=<optimized out>) at walsender.c:1224
#11 exec_replication_command (cmd_string=cmd_string@entry=0x203fae8 "START_REPLICATION SLOT kk_cdc_kkcx_slot LOGICAL 6DE/BEAA8058 (\"proto_version\" '1', \"publication_names\" 'cdc_kkcx')") at walsender.c:1656
#12 0x000000000082957d in PostgresMain (argc=<optimized out>, argv=argv@entry=0x2086f38, dbname=0x2086da8 "melotall", username=<optimized out>) at postgres.c:4447
#13 0x0000000000489dbd in BackendRun (port=<optimized out>, port=<optimized out>) at postmaster.c:4564
#14 BackendStartup (port=0x2078d70) at postmaster.c:4248
#15 ServerLoop () at postmaster.c:1741
#16 0x00000000007945c9 in PostmasterMain (argc=argc@entry=3, argv=argv@entry=0x203a650) at postmaster.c:1414
#17 0x000000000048bd1d in main (argc=3, argv=0x203a650) at main.c:210

可以从src/backend/replication/reorderbuffer.c查找上面堆栈中抓出的函数,主要溢出到磁盘的两个函数ReorderBufferSerializeChange,ReorderBufferSerializeTXN

ReorderBufferSerializeChange如下:

/*
 * Serialize individual change to disk.
 */
static void
ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
							 int fd, ReorderBufferChange *change)
{
	ReorderBufferDiskChange *ondisk;
	Size		sz = sizeof(ReorderBufferDiskChange);

	ReorderBufferSerializeReserve(rb, sz);

	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));

	switch (change->action)
	{
			/* fall through these, they're all similar enough */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			{
				char	   *data;
				ReorderBufferTupleBuf *oldtup,
						   *newtup;
				Size		oldlen = 0;
				Size		newlen = 0;

				oldtup = change->data.tp.oldtuple;
				newtup = change->data.tp.newtuple;

				if (oldtup)
				{
					sz += sizeof(HeapTupleData);
					oldlen = oldtup->tuple.t_len;
					sz += oldlen;
				}

				if (newtup)
				{
					sz += sizeof(HeapTupleData);
					newlen = newtup->tuple.t_len;
					sz += newlen;
				}

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				if (oldlen)
				{
					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, oldtup->tuple.t_data, oldlen);
					data += oldlen;
				}

				if (newlen)
				{
					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
					data += sizeof(HeapTupleData);

					memcpy(data, newtup->tuple.t_data, newlen);
					data += newlen;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_MESSAGE:
			{
				char	   *data;
				Size		prefix_size = strlen(change->data.msg.prefix) + 1;

				sz += prefix_size + change->data.msg.message_size +
					sizeof(Size) + sizeof(Size);
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);

				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				/* write the prefix including the size */
				memcpy(data, &prefix_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.prefix,
					   prefix_size);
				data += prefix_size;

				/* write the message including the size */
				memcpy(data, &change->data.msg.message_size, sizeof(Size));
				data += sizeof(Size);
				memcpy(data, change->data.msg.message,
					   change->data.msg.message_size);
				data += change->data.msg.message_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	snap;
				char	   *data;

				snap = change->data.snapshot;

				sz += sizeof(SnapshotData) +
					sizeof(TransactionId) * snap->xcnt +
					sizeof(TransactionId) * snap->subxcnt;

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);
				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				memcpy(data, snap, sizeof(SnapshotData));
				data += sizeof(SnapshotData);

				if (snap->xcnt)
				{
					memcpy(data, snap->xip,
						   sizeof(TransactionId) * snap->xcnt);
					data += sizeof(TransactionId) * snap->xcnt;
				}

				if (snap->subxcnt)
				{
					memcpy(data, snap->subxip,
						   sizeof(TransactionId) * snap->subxcnt);
					data += sizeof(TransactionId) * snap->subxcnt;
				}
				break;
			}
		case REORDER_BUFFER_CHANGE_TRUNCATE:
			{
				Size		size;
				char	   *data;

				/* account for the OIDs of truncated relations */
				size = sizeof(Oid) * change->data.truncate.nrelids;
				sz += size;

				/* make sure we have enough space */
				ReorderBufferSerializeReserve(rb, sz);

				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
				/* might have been reallocated above */
				ondisk = (ReorderBufferDiskChange *) rb->outbuf;

				memcpy(data, change->data.truncate.relids, size);
				data += size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			/* ReorderBufferChange contains everything important */
			break;
	}

	ondisk->size = sz;

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to data file for XID %u: %m",
						txn->xid)));
	}
	pgstat_report_wait_end();

	/*
	 * Keep the transaction's final_lsn up to date with each change we send to
	 * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
	 * only do this on commit and abort records, but that doesn't work if a
	 * system crash leaves a transaction without its abort record).
	 *
	 * Make sure not to move it backwards.
	 */
	if (txn->final_lsn < change->lsn)
		txn->final_lsn = change->lsn;

	Assert(ondisk->change.action == change->action);

ReorderBufferSerializeTXN如下:

/*
 * Spill data of a large transaction (and its subtransactions) to disk.
 */
static void
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	dlist_iter	subtxn_i;
	dlist_mutable_iter change_i;
	int			fd = -1;
	XLogSegNo	curOpenSegNo = 0;
	Size		spilled = 0;

	elog(DEBUG2, "spill %u changes in XID %u to disk",
		 (uint32) txn->nentries_mem, txn->xid);

	/* do the same to all child TXs */
	dlist_foreach(subtxn_i, &txn->subtxns)
	{
		ReorderBufferTXN *subtxn;

		subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
		ReorderBufferSerializeTXN(rb, subtxn);
	}

	/* serialize changestream */
	dlist_foreach_modify(change_i, &txn->changes)
	{
		ReorderBufferChange *change;

		change = dlist_container(ReorderBufferChange, node, change_i.cur);

		/*
		 * store in segment in which it belongs by start lsn, don't split over
		 * multiple segments tho
		 */
		if (fd == -1 ||
			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
		{
			char		path[MAXPGPATH];

			if (fd != -1)
				CloseTransientFile(fd);

			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
										curOpenSegNo);

			/* open segment, create it if necessary */
			fd = OpenTransientFile(path,
								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);

			if (fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m", path)));
		}

		ReorderBufferSerializeChange(rb, txn, fd, change);
		dlist_delete(&change->node);
		ReorderBufferReturnChange(rb, change);

		spilled++;
	}

	Assert(spilled == txn->nentries_mem);
	Assert(dlist_is_empty(&txn->changes));
	txn->nentries_mem = 0;
	txn->txn_flags |= RBTXN_IS_SERIALIZED;

	if (fd != -1)
		CloseTransientFile(fd);
}

总结:

  1. 通过上述分析后,我们可以对自己的环境改造一下,比如,可以减少cdc使用的复制槽,从而减少写recorderbuffer和解析的压力。
  2. 大的事物,如备份表的时候,或者大量插入数据的时候,可以分批次插入,留给walsender喘息的时间。
  3. 另外要注意,如果当前实例既是订阅端,又是发布端,上游的发布端加入大表的时候,如果需要订阅全量数据,则会copy整张表的数据到当前订阅端,所以,也会影响当前实例的发布,会导致slot延迟,wal堆积。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-12 16:31:26  更:2022-05-12 16:33:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:46:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码