From 2e6d04df7b0c0b9a4f0d3346a24e6e94f87d7360 Mon Sep 17 00:00:00 2001
From: Hadi Moshayedi
Date: Thu, 20 Jun 2019 17:23:12 +0200
Subject: [PATCH] Refactor ExecuteModifyTasksSequentially.

---
 .../executor/multi_router_executor.c    | 105 +++++++-----------
 .../distributed/multi_router_executor.h |   4 +
 2 files changed, 45 insertions(+), 64 deletions(-)

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index ca6ff8f5d..070809b1c 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -91,8 +91,6 @@ bool SortReturning = false;
 
 /* functions needed during run phase */
 static void AcquireMetadataLocks(List *taskList);
-static ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
-													ShardPlacementAccessType accessType);
 static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
 									 CmdType operation, bool alwaysThrowErrorOnFailure,
 									 bool expectResults);
@@ -732,59 +730,13 @@ RouterSequentialModifyExecScan(CustomScanState *node)
 	bool hasReturning = distributedPlan->hasReturning;
 	Job *workerJob = distributedPlan->workerJob;
 	List *taskList = workerJob->taskList;
-	ListCell *taskCell = NULL;
-	bool multipleTasks = list_length(taskList) > 1;
 	EState *executorState = ScanStateGetExecutorState(scanState);
-	bool taskListRequires2PC = TaskListRequires2PC(taskList);
-	bool alwaysThrowErrorOnFailure = false;
 	CmdType operation = scanState->distributedPlan->operation;
 
 	Assert(!scanState->finishedRemoteScan);
 
-	/*
-	 * We could naturally handle function-based transactions (i.e. those using
-	 * PL/pgSQL or similar) by checking the type of queryDesc->dest, but some
-	 * customers already use functions that touch multiple shards from within
-	 * a function, so we'll ignore functions for now.
-	 */
-	if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
-	{
-		BeginOrContinueCoordinatedTransaction();
-
-		/*
-		 * Although using two phase commit protocol is an independent decision than
-		 * failing on any error, we prefer to couple them. Our motivation is that
-		 * the failures are rare, and we prefer to avoid marking placements invalid
-		 * in case of failures.
-		 *
-		 * For reference tables, we always set alwaysThrowErrorOnFailure since we
-		 * absolutely want to avoid marking any placements invalid.
-		 *
-		 * We also cannot handle failures when there is RETURNING and there are more
-		 * than one task to execute.
-		 */
-		if (taskListRequires2PC)
-		{
-			CoordinatedTransactionUse2PC();
-
-			alwaysThrowErrorOnFailure = true;
-		}
-		else if (multipleTasks && hasReturning)
-		{
-			alwaysThrowErrorOnFailure = true;
-		}
-	}
-
-
-	foreach(taskCell, taskList)
-	{
-		Task *task = (Task *) lfirst(taskCell);
-		bool expectResults = (hasReturning || task->relationRowLockList != NIL);
-
-		executorState->es_processed +=
-			ExecuteSingleModifyTask(scanState, task, operation,
-									alwaysThrowErrorOnFailure, expectResults);
-	}
+	executorState->es_processed +=
+		ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning);
 }
 
 
@@ -1056,7 +1008,7 @@ BuildPlacementAccessList(int32 groupId, List *relationShardList,
  * CreatePlacementAccess returns a new ShardPlacementAccess for the given placement
  * and access type.
  */
-static ShardPlacementAccess *
+ShardPlacementAccess *
 CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType)
 {
 	ShardPlacementAccess *placementAccess = NULL;
@@ -1379,7 +1331,18 @@ ExecuteModifyTasksWithoutResults(List *taskList)
 
 
 /*
- * ExecuteModifyTasksSequentiallyWithoutResults basically calls ExecuteSingleModifyTask in
+ * ExecuteModifyTasksSequentiallyWithoutResults calls ExecuteModifyTasksSequentially
+ * and ignores the results.
+ */
+int64
+ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
+{
+	return ExecuteModifyTasksSequentially(NULL, taskList, operation, false);
+}
+
+
+/*
+ * ExecuteModifyTasksSequentially basically calls ExecuteSingleModifyTask in
  * a loop in order to simulate sequential execution of a list of tasks. Useful
  * in cases where issuing commands in parallel before waiting for results could
  * result in deadlocks (such as foreign key creation to reference tables).
@@ -1388,43 +1351,57 @@ ExecuteModifyTasksWithoutResults(List *taskList)
  * returns 0.
  */
 int64
-ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
+ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
+							   CmdType operation, bool hasReturning)
 {
 	ListCell *taskCell = NULL;
 	bool multipleTasks = list_length(taskList) > 1;
-	bool expectResults = false;
 	int64 affectedTupleCount = 0;
-	bool alwaysThrowErrorOnFailure = true;
+	bool alwaysThrowErrorOnFailure = false;
 	bool taskListRequires2PC = TaskListRequires2PC(taskList);
 
-	/* decide on whether to use coordinated transaction and 2PC */
 	if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
 	{
+		alwaysThrowErrorOnFailure = true;
+
 		/* we don't run CREATE INDEX CONCURRENTLY in a distributed transaction */
 	}
-	else if (IsMultiStatementTransaction() || multipleTasks)
+	else if (IsMultiStatementTransaction() || multipleTasks || taskListRequires2PC)
 	{
 		BeginOrContinueCoordinatedTransaction();
 
+		/*
+		 * Although using two phase commit protocol is an independent decision than
+		 * failing on any error, we prefer to couple them. Our motivation is that
+		 * the failures are rare, and we prefer to avoid marking placements invalid
+		 * in case of failures.
+		 *
+		 * For reference tables, we always set alwaysThrowErrorOnFailure since we
+		 * absolutely want to avoid marking any placements invalid.
+		 *
+		 * We also cannot handle failures when there is RETURNING and there are more
+		 * than one task to execute.
+		 */
 		if (taskListRequires2PC)
 		{
 			CoordinatedTransactionUse2PC();
+
+			alwaysThrowErrorOnFailure = true;
+		}
+		else if (multipleTasks && hasReturning)
+		{
+			alwaysThrowErrorOnFailure = true;
 		}
-	}
-	else if (!multipleTasks && taskListRequires2PC)
-	{
-		/* DDL on a reference table should also use 2PC */
-		BeginOrContinueCoordinatedTransaction();
-		CoordinatedTransactionUse2PC();
 	}
 
 	/* now that we've decided on the transaction status, execute the tasks */
 	foreach(taskCell, taskList)
 	{
 		Task *task = (Task *) lfirst(taskCell);
+		bool expectResults = (hasReturning || task->relationRowLockList != NIL);
 
 		affectedTupleCount +=
-			ExecuteSingleModifyTask(NULL, task, operation, alwaysThrowErrorOnFailure,
+			ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure,
 									expectResults);
 	}
 
diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h
index 10417c8ef..17e7eb9c0 100644
--- a/src/include/distributed/multi_router_executor.h
+++ b/src/include/distributed/multi_router_executor.h
@@ -45,9 +45,13 @@ extern TupleTableSlot * RouterModifyExecScan(CustomScanState *node);
 
 extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
 								 bool isModificationQuery, bool expectResults);
+int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
+									 CmdType operation, bool hasReturning);
 extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
 extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
 														   CmdType operation);
+extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
+													ShardPlacementAccessType accessType);
 
 /* helper functions */
 extern bool TaskListRequires2PC(List *taskList);
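
Caller-side sketch (not part of the patch above). The refactor leaves two entry points: ExecuteModifyTasksSequentially for the scan path and the thin ExecuteModifyTasksSequentiallyWithoutResults wrapper for callers that discard results. In the sketch below, the helper names ApplyDdlTasksSequentially and ApplyModifyScanTasks, the CMD_* choices, and the include list are illustrative assumptions; only the two entry points and ScanStateGetExecutorState come from the patch itself.

/* Hypothetical helpers; assumes the Citus internal headers below pull in
 * CitusScanState, EState, and ScanStateGetExecutorState. */
#include "postgres.h"
#include "distributed/multi_router_executor.h"

static void
ApplyDdlTasksSequentially(List *taskList)
{
	/*
	 * Callers that need no tuples back use the wrapper; per the patch it
	 * forwards a NULL scan state and hasReturning = false.
	 */
	ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY);
}

static void
ApplyModifyScanTasks(CitusScanState *scanState, List *taskList, bool hasReturning)
{
	EState *executorState = ScanStateGetExecutorState(scanState);

	/*
	 * The scan path hands its scan state over so RETURNING tuples reach the
	 * executor, and adds the affected-row count to es_processed, mirroring
	 * the new RouterSequentialModifyExecScan body.
	 */
	executorState->es_processed +=
		ExecuteModifyTasksSequentially(scanState, taskList, CMD_UPDATE,
									   hasReturning);
}

The nullable scanState parameter is what lets the wrapper reuse the same execution loop for result-less callers while the scan path still receives RETURNING tuples.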