PostgreSQL Parallel Execution.

  • PostgreSQL 9.6 Parallel Execution 机制介绍
parallel execution是9.6版本中所引入的一个全新的特性,其通过对于底层parallel seq scan的支持来实现对于所谓的parall execution的支持。 在我们进行例如:聚集函数的操作时候,开启parellel特性,可以大幅度的提高执行效率。 因为,其将通常的有一个worker所完成的任务通过分解成数个worker以并行执行的方式来加快语句的执行。 通常通过 set max_parallel_degree 命令来开启并行执行功能。例如:set max_parallel_degree = 10, 那么在在后续语句的执行过程中将可以进行并行执行的操作,分别由10个不同的worker来执行并在所有的worker执行完毕后,进行由gather模块进行结果的汇集。为了能够使得不同worker之间共享某些参数,postgres采用共享内存的方式来进行ipc,即进程间的通信。

对于并行特性的支持涉及到许多的问题,例如:存储引擎对于parallel scan的支持等。但是,在当前的版本中,并非是任何条件下都可以进行parallel执行的。首先,当前版本中只是完成了对于parallel seq scan的执行。由于parall scan一个relation的时候,涉及到许多问题:例如,对于buffer的管理,relation的管理,数据可见性对parallel scan的影响等等,而这些需要一个完全且高效的锁机制来保证在parallel执行的时候的正确性,从而可以高效的利用多核系统所带来的并行执行的红利;否则,一个低效的锁管理机制,必然会低效多核系统所带来的效率的提升。当然,当前系统所支持的仅仅是seq scan,即:当前只是在在heapscan至少实现了相应的parallelheapscan操作;对于indexscan并未在这个版本中支持。相对应,seql scan到parallel seq scan,索引的parallel scan支持要比relation的seq scan涉及更多的问题,例如:在b-tree上的parallel 查找并在查找的过程中,查找的中间状态的维护等等问题。

1.1 shm_toc & shm_toc_entry

首先要介绍的第一个数据结构为: shm_toc, table of content of shared memory。即:共享内存的目录表,在postgresql 9.5中所实现的shared memory的相关部件。其中的shm_toc中的每个shm_toc_entry类型对象保存了系统所构建的shm的入口信息。

shm_catalog
此为 shm table,其中的每一项为shm_toc_entry类型对象,而shm_to_entry对象中为我们所创建的每一个shared memroy的信息:其中包括:shared memory id和大小。

1.2 shm_mq 消息队列

shm_mq为一个single reader和single writer的共享内存队列。 如果,该queue视图attach到一个backend-process时候。可以将队列的指针使用shm_mq_attach函数来完成该项操作。 当我们的消息队列与shm连接上后,我们就可以通过该消息队列发送和接受数据了。
当消息中的数据没有超过消息队列的大小时候,我们之间将该消息返回;但当消息的大小超过队列的大小时候,我们将该消息拷贝到backend 的local buffer中。

当postmaster启动的时候,需要每个cluster调用 dsm_postmaster_startup 函数来启动其相应的共享内存。 而 dsm_postmaster_startup 函数的启动又是由 CreateSharedMemoryAndSemaphores 函数来完成的。 而上述这点我们可以由函数 SubPostmasterMain中可以看出。在函数SubPostmasterMain中,我们在使用InitProcess函数完成进程的初始化后,那么我们就使用 CreateSharedMemoryAndSemaphores 函数来创建相应的共享内存 shm对象。

共享内存由postmaster在启动阶段创建,但是比较有趣的一个现象是,虽然shm是由postmaster创建,但是postmaster并不会直接操作shm对象。 因为,这样可以保证,共享内存的可靠性

增加了: gather 查询计划节点, execParallel模块(执行阶段)。 其中gather查询计划,顾名思义是来收集各个worker执行结果并将这些结果进行汇聚。 并且在parallel.c文件中,postgres定义了进行parallel执行所需要的基础功能,例如:启动多个workers等。

当系统在执行Gather查询计划时候,我们可以看到,当postgres发现当前处于parallel执行模式下,在与其它方式一样完成相应执行时上下文的初始化话,由函数ExecInitParallelPlan来完成对于parallel execution context的初始化。

在进行parallel execution context进行初始化时候,主要完成:(1)将查询计划的序列化,所谓的查询计划的串行化,即:将查询计划进行字符串话,nodeToString操作,因为我们需要在不同的worker中间利用shm对象进行进程间的通信,即:IPC工作。 需要我们将不同的任务分配到不同的worker上进行parallel执行,而这项工作正是通过shm的IPC机制来完成的; (2)设置 parallel执行的入口函数: ParallelQueryMain; (3)创建并初始化 DSM; 那么为什么我们需要创建一个动态内存段(DSM, dynamic shared memory segment) 。每个dsm中保存了一个worker的相关执行上下文信息,同时由于需要在不同的worker直接进行通信。因此,我们为每个worker创建了相应的DSM; 该dsm中的内容包括:(1)LibraryState;(2)GUC;(3)ComboCID;(4)Snapshot, 其中包括当前事务和当前活动的snapshot两类;(5)TransactionState; 为上述的内容创建了相应的DSM后,接下来就是将这些内容序列化到这些DSM中。那为啥需要将这些信息保存到dsm中呢?

因为,在后续的worker中需要使用到啊。原因就这么简单 在我们的执行Gather查询计划时候,即:在ExecGather函数内,由函数 LaunchParallelWorkers 来启动相应的worker。 在 LaunchParallelWorkers 函数内,完成对于BackgroundWorker类型 worker对象的初始化工作后,此时真正的开始启动所创建的worker进程。对于为什么使用进程而非线程 robert haas在pgconf 2016中有提及,还请读者自行谷歌。其中所完成的worker的函数主入口为 ParallelQueryMain。

has_parallel_hazard 需要判断该语句中是否存在着危害parallel执行的语句。
/*
* Check whether a node tree contains parallel hazards. This is used both
* on the entire query tree, to see whether the query can be parallelized at
* all, and also to evaluate whether a particular expression is safe to
* run in a parallel worker. We could separate these concerns into two
* different functions, but there’s enough overlap that it doesn’t seem
* worthwhile.
*/

由相应的walker来遍历相应的语法树来执行相应的操作。来判定是否存在着危害parallel执行的语句或者约束条件。

func_parallel 函数来判定所给的函数是否是parallel函数函数。通过查找syscache中的pg_proc系统表中的数据并检查该函数的parallel标志。

glob->parallelModeNeeded should tell us whether it’s necessary to * impose the parallel mode restrictions, but we don’t actually want to * impose them unless we choose a parallel plan, so that people who * mislabel their functions but don’t use parallelism anyway aren’t * harmed.
这段话表明,当我们对于一个函数,错误的标记其为parallel但其真正的却是不可进行parallel的话,我们对于该函数的标记是属于无害操作的。

同样在函数 pull_up_subqueries_recurse 中,在fromexpr分支中,对于qual子句,添加了deletion_ok 标志来描述我们的条件语句是否可以被去掉(删除掉,当条件语句是一个deletable的语句时候,我们就可以把该语句从查询树中删除掉。)

并在pull_up_subqueries_recurse 函数中在遍历语法树的过程中添加了一个变量 sub_deletion_ok 来描述我们的条件语句是否可以被删除掉。

当经过处理后,我们发现该语法树下仍然存在着相应的子树,则表明下面的子树并未完全被删除。因此,我们可以设置相应的参数 have_undeleted_child = true;

if (lfirst(l) != NULL)
have_undeleted_child = true;

PathTarget 结构代替了原先的TagetList对象。由PathTarget来描述了,查询的目标列的情况。 其将RelOptInfo中的相关的output信息,集中到PathTarget中,而这点我们从该结构的名称中即可以看出。

因为,对于在查询计划中我们所需要的output列的信息并非与我们语句中所给出的output列相一致的。 由函数make_pathtarget_from_tlist来生成相应的 PathTarget结构。 在之前的版本中,只是有tlist来表述并未创建相应的 PathTarget对象。

apply_projection_to_path 函数中,由于是处理输出列的情况,因此我们需要考虑,当前的路径为GatherPath情况。 若当前路径为GatherPath路径时候,如果该路径是可以进行parallel处理时候,我们创建一个ProjectionPath。

同样,为了适应上述的对于output列的修改,我们对 PlannerInfo数据结构进行了修改, 添加了,如下域:struct PathTarget *upper_targets[UPPERREL_FINAL + 1]; 及 List *upper_rels[UPPERREL_FINAL + 1]; 又来描述,上层的基表的RelOptInfo对象。

同样使用,create_upper_paths_hook来支持第三方的,构建上层查询访问路径的函数接口。 if (create_upper_paths_hook)
(*create_upper_paths_hook) (root, UPPERREL_FINAL,
current_rel, final_rel);

create_gather_plan 创建gather类型的查询计划。

LaunchParallelWorkers 创建一个parallel的worker。并创建一个BackgroundWorker对象来完成parallel的执行。 一个后台进程。ParallelWorkerMain 函数为整个worker函数的主函数入口地址。

/*
* pg_(read|write)_barrier – prevent the CPU from reordering memory access
* 避免让cpu对内存访问进行重新排序访问的方式。
* A read barrier must act as a compiler barrier, and in addition must
* guarantee that any loads issued prior to the barrier are completed before
* any loads issued after the barrier. Similarly, a write barrier acts
* as a compiler barrier, and also orders stores. Read and write barriers
* are thus weaker than a full memory barrier, but stronger than a compiler
* barrier. In practice, on machines with strong memory ordering, read and
* write barriers may require nothing more than a compiler barrier.
*/

worker之间由共享内存进行通信,不同worker直接的通信。 parallel_worker_main_type 函数指针,指向真正执行的工作代码。

在执行的的时候, PoratalRun 函数中,最后会执行PortalRunSelect这中的standard_executorRun函数中,当在parallel mode下时候,而该模式是由查询引擎在优化的过程中完成对于parallelModeNeeded参数的设置。 然后进入到EnterParallelMode函数中。 并在该函数中完成对于事务状态中parallelModeLevel参数的设置。

在Gather节点中,我们设置了相应的num_workers大小,用来表明我们是使用多少工作线程来完成该项工作的。 并将该参数在parallelexecution执行状态初始化时候使用。

ExecInitParallelPlan。

SharedExecutorInstrumentation,参数信息描述了,gather的基础信息。因为对于每个parallel worker来说,都有各自独立的执行信息,而这些执行信息描述了当前的worker所能够完成的工作描述。

在worker之间所传输的数据格式为const char*,即:把相应的查询计划节点进行“序列化”,作为参数传递给各个worker,然后由相应的各个worker进行“反序列化”,并根据该查询计划进行由worker来执行。 pstmt_data = ExecSerializePlan(planstate->plan, estate); 函数完成了相应的node类型查询计划节点树到const char*的转换。

并且函数的入口点为:ParallelQueryMain。如果当前的事务隔离级别为serializable,则将worker的数量设置为0; 在完成上述信息设置后,我们构建一个parallelContext对象,并对该parallelContext进行相应的参数设置。

然后由InitializeParallelDSM函数来初始化dynamic shared memory。 需要有个表来记录所创建的shared memory信息。 这样我们就可以由该表中来查询出相应的shared memory信息。 并将guc的信息也保存至 TOC(Table of Content)中。

当我们创建完shared memory和TOC后,我们会将相应的GUC,combo CID,事务的快照和活动的快照,事务的状态等并将这些信息序列化后保存在TOC中,这样有利于各个parallel worker之间的通讯。

并创建相应的错误信息queue,给错误信息 ErrorResponse, NoticeResponse, NotifyResponse。

在完成这些信息的创建后,并完相应的各个worker参数的设置。
for (i = 0; i < pcxt->nworkers; ++i)
{
char *start;
shm_mq *mq;
start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
}

对plannedstmt进行序列化,对paramlist进行序列化。 并在ExecSeqScanInitializeDSM 函数中完成对 ParallelHeapScanDesc 的初始化。

LaunchParallelWorkers,将相应的parallel seq scan进行初始化后,开始启动worker。 在函数LaunchParallelWorkers中,完成对于lock leader的设置,以及 后台BackgroundWorker worker参数的设置 ParallelWorkerMain。 从shared memory中查询出toc中的内容,然后根据这些内容来执行。

RegisterDynamicBackgroundWorker 函数来注册相应的后台工作进程。

HandleParallelMessage 来处理所收到的消息。 由shared memory中收到消息后。