概览
PostgreSQL 并行框架提供了一系列方便的函数,支持在插件或内核中直接调用相关函数,启动若干个后台进程进行并行操作。目前,PG 的并行框架主要用来支持并行查询。本文将介绍并行框架的相关功能,分析其核心逻辑和函数。 可能需要了解的前置知识:
- shm_mq 消息队列: PG 内核提供的单生产者单消费者消息队列
- 动态共享内存:简称 dsm, 在数据库运行期间可以动态创建出来的共享内存
为什么需要并行框架
设想这样一个场景:我们运行了一条 SQL,运行这条 SQL 需要再拉起一个 background worker 进行并行查询。为了方便描述,我们将运行这条 SQL 的进程称为主进程,拉起的 background worker 称为从进程。 但是,从进程可能干活的时候出现了 ERROR ,甚至 CRASH。这时候我们需要通知主进程自己遇到了 ERROR,让主进程进行异常处理。 事实上,我们确实可以不依赖于并行框架实现这样的功能,但是这就需要我们自己去完成类似的异常处理机制。在这种情况下,直接使用并行框架会是一个更好的选择。 除此之外,并行框架还提供了其他的一些功能。简要的概括一下:
- 错误消息处理机制:在动态共享内存中存放一个消息队列,如果从进程出现异常,将异常信息放入消息队列中并发送信号,主进程在
CHECK_FOR_INTERRUPTS 时会检测到错误,进行异常处理; - 序列化状态同步:主进程将一些内容序列化表示(如 GUC、快照等),通过动态共享内存传递给从进程;
- 用户自定义内容传递:用户可以将一些自定义的数据放入动态共享内存;
- 安全防护机制:并行框架严格限制只有
只读类型 SQL 才能使用。如果只是使用 SQL 的话,并行框架将是安全的;但是对于内核开发者而言,在其他代码中使用并行框架就需要遵守并行框架的限制。
如何使用并行框架
根据 PG 代码中 README.parallel 介绍,使用并行框架需要遵循如下规范:
EnterParallelMode();
pcxt = CreateParallelContext("library_name", "function_name", nworkers);
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, keys);
InitializeParallelDSM(pcxt);
space = shm_toc_allocate(pcxt->toc, size);
shm_toc_insert(pcxt->toc, key, space);
LaunchParallelWorkers(pcxt);
WaitForParallelWorkersToFinish(pcxt);
DestroyParallelContext(pcxt);
ExitParallelMode();
原理分析
主进程——拉起 background worker
主进程在启动从进程时,会调用 LaunchParallelWorkers 函数,这个函数会调用 RegisterDynamicBackgroundWorker ,此时会给 Postmaster 进程发一个信号,请求 Postmaster 进程启动一个 background 进程,Postmaster 进程收到信号后就会启动一个新后台进程。
接下来我们仔细分析一下 LaunchParallelWorkers 函数的核心代码。
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
return;
BecomeLockGroupLeader();
Assert(pcxt->seg != NULL);
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
memset(&worker, 0, sizeof(worker));
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
| BGWORKER_CLASS_PARALLEL;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
sprintf(worker.bgw_library_name, "postgres");
sprintf(worker.bgw_function_name, "ParallelWorkerMain");
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
worker.bgw_notify_pid = MyProcPid;
for (i = 0; i < pcxt->nworkers_to_launch; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
{
shm_mq_set_handle(pcxt->worker[i].error_mqh,
pcxt->worker[i].bgwhandle);
pcxt->nworkers_launched++;
}
else
{
any_registrations_failed = true;
pcxt->worker[i].bgwhandle = NULL;
shm_mq_detach(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
}
if (pcxt->nworkers_launched > 0)
{
pcxt->known_attached_workers =
palloc0(sizeof(bool) * pcxt->nworkers_launched);
pcxt->nknown_attached_workers = 0;
}
MemoryContextSwitchTo(oldcontext);
}
从进程入口——ParallelWorkerMain
ParallelWorkerMain 可以理解为从进程的入口函数,其函数调用栈为 PostmasterMain->ServerLoop->maybe_start_bgworkers->StartBackgroundWorker->ParallelWorkerMain 。而用户定义的从进程 main 函数,将在 ParallelWorkerMain 中被调用。 该函数主要就是从动态共享内存中读取一些已经序列化的主进程信息(GUC、Snapshot),成为 Lock Group Member (对应上文主进程是 Leader),然后连进数据库,通过消息队列发消息告诉主进程自己已经成功启动。
异常处理机制
回到一开始的问题,如果主进程拉起 n 个 background worker 进程进行并行查询。这时候如果其中的一个出现了错误,应该如何通知主进程呢?注意,这 n 个从进程可能不是一次拉起的,可能第一次拉几个,第二次再拉几个,对应多个 ParallelContext (下文简称 pcxt)。
在上文中提到,动态共享内存中存放了一个消息队列,如果从进程出现异常,将异常信息放入消息队列中并发送信号,主进程在 CHECK_FOR_INTERRUPTS 时会检测到错误,进行异常处理。
事实上,主进程会维护一个 pcxt_list ,对于每一个从进程,将每一个从进程的信息存放在 pcxt_list 中。在调用 CHECK_FOR_INTERRUPTS 时,其内部会调用 HandleParallelMessages 函数。如果收到了从进程发来的信号,会遍历 pcxt_list ,如对于每一个 ParallelContext ,都会检查这个 ParallelContext 启动的所有 background worker 对应的错误消息队列,如果有异常信息就从消息队列中读取信息。
总结
本文介绍了 PG 中并行框架的相关概念和设计原理,对于其中的一些细节:如 Lock Group、动态共享内存、shm_mq 消息队列并没有做具体的介绍。这些将在后续的博客中进行详细的分析。
|