From 495b6e9b62215336280886b104546c740338db25 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 25 Apr 2019 12:17:13 +0300 Subject: [PATCH] Refactor Parallel Relation Access Recording Instead of scattering the code around, we move all the logic into a single function. This will help supporting foreign keys to reference tables in the unified executor with a single line of change, just calling this function. --- .../executor/multi_real_time_executor.c | 8 +-- .../executor/multi_router_executor.c | 26 +------ .../transaction/relation_access_tracking.c | 70 ++++++++++++++++++- .../distributed/relation_access_tracking.h | 4 +- 4 files changed, 71 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 9c77ccf4c..31a634ad3 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -109,18 +109,14 @@ MultiRealTimeExecute(Job *job) BeginOrContinueCoordinatedTransaction(); } + RecordParallelRelationAccessForTaskList(taskList); + /* initialize task execution structures for remote execution */ foreach(taskCell, taskList) { Task *task = (Task *) lfirst(taskCell); TaskExecution *taskExecution = NULL; - /* keep track of multi shard accesses before opening the connections */ - if (MultiShardConnectionType == PARALLEL_CONNECTION) - { - RecordRelationParallelSelectAccessForTask(task); - } - taskExecution = InitTaskExecution(task, EXEC_TASK_CONNECT_START); taskExecutionList = lappend(taskExecutionList, taskExecution); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index c5259cc6f..d3626d53f 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1500,31 +1500,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn CoordinatedTransactionUse2PC(); } - /* - * With a similar rationale as above, where we expect all tasks to operate on - * the same relations, we prefer to record relation accesses for the first - * task only. - */ - if (firstTask->taskType == MODIFY_TASK) - { - RecordRelationParallelModifyAccessForTask(firstTask); - - /* - * We prefer to mark with SELECT access as well because for multi shard - * modification queries, the placement access list is always marked with both - * DML and SELECT accesses. - */ - RecordRelationParallelSelectAccessForTask(firstTask); - } - else if (firstTask->taskType == DDL_TASK && - PartitionMethod(firstShardInterval->relationId) != DISTRIBUTE_BY_NONE) - { - /* - * Even single task DDLs hit here, so we'd prefer - * not to record for reference tables. - */ - RecordRelationParallelDDLAccessForTask(firstTask); - } + RecordParallelRelationAccessForTaskList(taskList); if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK) { diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index bd36ecff0..a2e015f93 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -85,6 +85,9 @@ static HTAB *RelationAccessHash; static void RecordRelationAccess(Oid relationId, ShardPlacementAccessType accessType); static void RecordPlacementAccessToCache(Oid relationId, ShardPlacementAccessType accessType); +static void RecordRelationParallelSelectAccessForTask(Task *task); +static void RecordRelationParallelModifyAccessForTask(Task *task); +static void RecordRelationParallelDDLAccessForTask(Task *task); static RelationAccessMode GetRelationAccessMode(Oid relationId, ShardPlacementAccessType accessType); static void RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType @@ -281,11 +284,72 @@ RecordPlacementAccessToCache(Oid relationId, ShardPlacementAccessType accessType } +/* + * RecordParallelRelationAccessForTaskList gets a task list and records + * the necessary parallel relation accesses for the task list. + * + * This function is used to enforce foreign keys from distributed + * tables to reference tables. + */ +void +RecordParallelRelationAccessForTaskList(List *taskList) +{ + Task *firstTask = NULL; + + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) + { + /* sequential mode prevents parallel access */ + return; + } + + if (list_length(taskList) < 2) + { + /* single shard task doesn't mean parallel access in our definition */ + return; + } + + /* + * Since all the tasks in a task list is expected to operate on the same + * distributed table(s), we only need to process the first task. + */ + firstTask = linitial(taskList); + + if (firstTask->taskType == SQL_TASK) + { + RecordRelationParallelSelectAccessForTask(firstTask); + } + else if (firstTask->taskType == MODIFY_TASK) + { + if (firstTask->rowValuesLists != NIL) + { + /* + * We always run multi-row INSERTs in a sequential + * mode (hard-coded). Thus, we do not mark as parallel + * access even if the prerequisites hold. + */ + } + else + { + /* + * We prefer to mark with all remaining multi-shard modifications + * with both modify and select accesses. + */ + RecordRelationParallelModifyAccessForTask(firstTask); + RecordRelationParallelSelectAccessForTask(firstTask); + } + } + else + { + RecordRelationParallelDDLAccessForTask(firstTask); + } +} + + /* * RecordRelationParallelSelectAccessForTask goes over all the relations * in the relationShardList and records the select access per each table. */ -void +static void RecordRelationParallelSelectAccessForTask(Task *task) { List *relationShardList = NIL; @@ -327,7 +391,7 @@ RecordRelationParallelSelectAccessForTask(Task *task) * where as the subqueries inside the modify query is recorded with select * access. */ -void +static void RecordRelationParallelModifyAccessForTask(Task *task) { List *relationShardList = NULL; @@ -374,7 +438,7 @@ RecordRelationParallelModifyAccessForTask(Task *task) * DDL commands such as foreign key creation. The function also records * the relation that anchorShardId belongs to. */ -void +static void RecordRelationParallelDDLAccessForTask(Task *task) { List *relationShardList = task->relationShardList; diff --git a/src/include/distributed/relation_access_tracking.h b/src/include/distributed/relation_access_tracking.h index e3c330322..32c278194 100644 --- a/src/include/distributed/relation_access_tracking.h +++ b/src/include/distributed/relation_access_tracking.h @@ -31,12 +31,10 @@ extern void AllocateRelationAccessHash(void); extern void ResetRelationAccessHash(void); extern void AssociatePlacementAccessWithRelation(ShardPlacement *placement, ShardPlacementAccessType accessType); +extern void RecordParallelRelationAccessForTaskList(List *taskList); extern void RecordParallelSelectAccess(Oid relationId); -extern void RecordRelationParallelSelectAccessForTask(Task *task); -extern void RecordRelationParallelModifyAccessForTask(Task *task); extern void RecordParallelModifyAccess(Oid relationId); extern void RecordParallelDDLAccess(Oid relationId); -extern void RecordRelationParallelDDLAccessForTask(Task *task); extern RelationAccessMode GetRelationDDLAccessMode(Oid relationId); extern RelationAccessMode GetRelationDMLAccessMode(Oid relationId); extern RelationAccessMode GetRelationSelectAccessMode(Oid relationId);