From f4031dd477966ed1a846c18ad526b66b72a90d41 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 11 Dec 2019 12:52:39 +0100 Subject: [PATCH] Clean up transaction block usage logic in adaptive executor --- src/backend/distributed/commands/call.c | 13 +- .../distributed/executor/adaptive_executor.c | 253 ++++++++++++------ .../executor/insert_select_executor.c | 6 +- src/include/distributed/multi_executor.h | 41 ++- 4 files changed, 228 insertions(+), 85 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 73871d297..3cf173860 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -174,9 +174,20 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->relationShardList = NIL; task->taskPlacementList = placementList; + /* + * We are delegating the distributed transaction to the worker, so we + * should not run the CALL in a transaction block. + */ + TransactionProperties xactProperties = { + .errorOnAnyFailure = true, + .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED, + .requires2PC = false + }; + ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, - MaxAdaptiveExecutorPoolSize); + MaxAdaptiveExecutorPoolSize, + &xactProperties); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 8b4da2e43..e8769bde0 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -242,18 +242,12 @@ typedef struct DistributedExecution */ bool raiseInterrupts; - /* - * Flag to indicate whether the query is running in a distributed - * transaction. - */ - bool isTransaction; + /* transactional properties of the current execution */ + TransactionProperties *transactionProperties; /* indicates whether distributed execution has failed */ bool failed; - /* set to true when we prefer to bail out early */ - bool errorOnAnyFailure; - /* * For SELECT commands or INSERT/UPDATE/DELETE commands with RETURNING, * the total number of rows received from the workers. 
For @@ -541,7 +535,12 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - int targetPoolSize); + int targetPoolSize, + TransactionProperties * + xactProperties); +static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel + modLevel, + List *taskList); static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution); @@ -556,8 +555,9 @@ static void AcquireExecutorShardLocksForExecution(DistributedExecution *executio static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution * execution); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); +static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); -static bool DistributedExecutionRequiresRollback(DistributedExecution *execution); +static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); static void AssignTasksToConnections(DistributedExecution *execution); @@ -621,7 +621,6 @@ AdaptiveExecutor(CitusScanState *scanState) bool interTransactions = false; int targetPoolSize = MaxAdaptiveExecutorPoolSize; - Job *job = distributedPlan->workerJob; List *taskList = job->taskList; @@ -646,13 +645,18 @@ AdaptiveExecutor(CitusScanState *scanState) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + TransactionProperties xactProperties = + DecideTransactionPropertiesForTaskList(distributedPlan->modLevel, taskList); + DistributedExecution *execution = CreateDistributedExecution( - distributedPlan->modLevel, taskList, - distributedPlan-> - hasReturning, paramListInfo, + distributedPlan->modLevel, + taskList, + distributedPlan->hasReturning, + paramListInfo, tupleDescriptor, - scanState-> - tuplestorestate, targetPoolSize); + scanState->tuplestorestate, + targetPoolSize, + &xactProperties); /* * Make sure that we acquire the appropriate locks even if the local tasks @@ -771,8 +775,32 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) Tuplestorestate *tupleStore = NULL; bool hasReturning = false; + TransactionProperties xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, taskList); + return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, - tupleStore, hasReturning, targetPoolSize); + tupleStore, hasReturning, targetPoolSize, + &xactProperties); +} + + +/* + * ExecuteTaskListIntoTupleStore is a proxy to ExecuteTaskListExtended() with defaults + * for some of the arguments. 
+ */ +uint64 +ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, + TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, + bool hasReturning) +{ + int targetPoolSize = MaxAdaptiveExecutorPoolSize; + + TransactionProperties xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, taskList); + + return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, + tupleStore, hasReturning, targetPoolSize, + &xactProperties); } @@ -783,7 +811,8 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - bool hasReturning, int targetPoolSize) + bool hasReturning, int targetPoolSize, + TransactionProperties *xactProperties) { ParamListInfo paramListInfo = NULL; @@ -800,7 +829,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, DistributedExecution *execution = CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, - tupleDescriptor, tupleStore, targetPoolSize); + tupleDescriptor, tupleStore, targetPoolSize, + xactProperties); StartDistributedExecution(execution); RunDistributedExecution(execution); @@ -817,7 +847,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, - Tuplestorestate *tupleStore, int targetPoolSize) + Tuplestorestate *tupleStore, int targetPoolSize, + TransactionProperties *xactProperties) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); @@ -825,6 +856,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu execution->modLevel = modLevel; execution->tasksToExecute = taskList; execution->hasReturning = hasReturning; + execution->transactionProperties = xactProperties; execution->localTaskList = NIL; execution->remoteTaskList = NIL; @@ -873,6 +905,97 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu } +/* + * DecideTransactionPropertiesForTaskList decides whether to use remote transaction + * blocks, whether to use 2PC for the given task list, and whether to error on any + * failure. + * + * Since these decisions have specific dependencies on each other (e.g. 2PC implies + * errorOnAnyFailure, but not the other way around) we keep them in the same place. + */ +static TransactionProperties +DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList) +{ + TransactionProperties xactProperties = { + .errorOnAnyFailure = false, + .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_ALLOWED, + .requires2PC = false + }; + + if (taskList == NIL) + { + /* nothing to do, return defaults */ + return xactProperties; + } + + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) + { + /* + * We prefer to error on any failures for CREATE INDEX + * CONCURRENTLY or VACUUM//VACUUM ANALYZE (e.g., COMMIT_PROTOCOL_BARE). + */ + xactProperties.errorOnAnyFailure = true; + xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED; + return xactProperties; + } + + if (LocalExecutionHappened) + { + /* + * In case localExecutionHappened, we force the executor to use 2PC. + * The primary motivation is that at this point we're definitely expanding + * the nodes participated in the transaction. 
And, by re-generating the + * remote task lists during local query execution, we might prevent the adaptive + * executor to kick-in 2PC (or even start coordinated transaction, that's why + * we prefer adding this check here instead of + * Activate2PCIfModifyingTransactionExpandsToNewNode()). + */ + xactProperties.errorOnAnyFailure = true; + xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; + xactProperties.requires2PC = true; + return xactProperties; + } + + if (DistributedExecutionRequiresRollback(taskList)) + { + /* transaction blocks are required if the task list needs to roll back */ + xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; + + if (TaskListRequires2PC(taskList)) + { + /* + * Although using two phase commit protocol is an independent decision than + * failing on any error, we prefer to couple them. Our motivation is that + * the failures are rare, and we prefer to avoid marking placements invalid + * in case of failures. + */ + xactProperties.errorOnAnyFailure = true; + xactProperties.requires2PC = true; + } + else if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && + IsMultiShardModification(modLevel, taskList)) + { + /* + * Even if we're not using 2PC, we prefer to error out + * on any failures during multi shard modifications/DDLs. + */ + xactProperties.errorOnAnyFailure = true; + } + } + else if (InCoordinatedTransaction()) + { + /* + * If we are already in a coordinated transaction then transaction blocks + * are required even if they are not strictly required for the current + * execution. + */ + xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED; + } + + return xactProperties; +} + + /* * StartDistributedExecution sets up the coordinated transaction and 2PC for * the execution whenever necessary. It also keeps track of parallel relation @@ -882,54 +1005,16 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu void StartDistributedExecution(DistributedExecution *execution) { - List *taskList = execution->tasksToExecute; + TransactionProperties *xactProperties = execution->transactionProperties; - if (MultiShardCommitProtocol != COMMIT_PROTOCOL_BARE) + if (xactProperties->useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED) { - /* - * In case localExecutionHappened, we simply force the executor to use 2PC. - * The primary motivation is that at this point we're definitely expanding - * the nodes participated in the transaction. And, by re-generating the - * remote task lists during local query execution, we might prevent the adaptive - * executor to kick-in 2PC (or even start coordinated transaction, that's why - * we prefer adding this check here instead of - * Activate2PCIfModifyingTransactionExpandsToNewNode()). - */ - if (DistributedExecutionRequiresRollback(execution) || LocalExecutionHappened) - { - UseCoordinatedTransaction(); - - if (TaskListRequires2PC(taskList) || LocalExecutionHappened) - { - /* - * Although using two phase commit protocol is an independent decision than - * failing on any error, we prefer to couple them. Our motivation is that - * the failures are rare, and we prefer to avoid marking placements invalid - * in case of failures. 
- */ - CoordinatedTransactionUse2PC(); - - execution->errorOnAnyFailure = true; - } - else if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && - list_length(taskList) > 1 && - DistributedExecutionModifiesDatabase(execution)) - { - /* - * Even if we're not using 2PC, we prefer to error out - * on any failures during multi shard modifications/DDLs. - */ - execution->errorOnAnyFailure = true; - } - } + UseCoordinatedTransaction(); } - else + + if (xactProperties->requires2PC) { - /* - * We prefer to error on any failures for CREATE INDEX - * CONCURRENTLY or VACUUM//VACUUM ANALYZE (e.g., COMMIT_PROTOCOL_BARE). - */ - execution->errorOnAnyFailure = true; + CoordinatedTransactionUse2PC(); } /* @@ -943,12 +1028,6 @@ StartDistributedExecution(DistributedExecution *execution) */ AcquireExecutorShardLocksForExecution(execution); - /* - * If the current or previous execution in the current transaction requires - * rollback then we should use transaction blocks. - */ - execution->isTransaction = InCoordinatedTransaction(); - /* * We should not record parallel access if the target pool size is less than 2. * The reason is that we define parallel access as at least two connections @@ -961,7 +1040,7 @@ StartDistributedExecution(DistributedExecution *execution) */ if (execution->targetPoolSize > 1) { - RecordParallelRelationAccessForTaskList(taskList); + RecordParallelRelationAccessForTaskList(execution->tasksToExecute); } } @@ -988,6 +1067,17 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan) } +/* + * IsMultiShardModification returns true if the task list is a modification + * across shards. + */ +static bool +IsMultiShardModification(RowModifyLevel modLevel, List *taskList) +{ + return list_length(taskList) > 1 && TaskListModifiesDatabase(modLevel, taskList); +} + + /* * TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and * DistributedPlanModifiesDatabase. @@ -1023,9 +1113,8 @@ TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList) * involved in the distributed execution. */ static bool -DistributedExecutionRequiresRollback(DistributedExecution *execution) +DistributedExecutionRequiresRollback(List *taskList) { - List *taskList = execution->tasksToExecute; int taskCount = list_length(taskList); if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) @@ -2175,7 +2264,7 @@ CheckConnectionTimeout(WorkerPool *workerPool) * has two different placements, we'd warn the user, fail the pool and continue * with the next placement. */ - if (execution->errorOnAnyFailure || execution->failed) + if (execution->transactionProperties->errorOnAnyFailure || execution->failed) { logLevel = ERROR; } @@ -2424,7 +2513,8 @@ ConnectionStateMachine(WorkerSession *session) * The execution may have failed as a result of WorkerSessionFailed * or WorkerPoolFailed. */ - if (execution->failed || execution->errorOnAnyFailure) + if (execution->failed || + execution->transactionProperties->errorOnAnyFailure) { /* a task has failed due to this connection failure */ ReportConnectionError(connection, ERROR); @@ -2561,12 +2651,12 @@ static bool TransactionModifiedDistributedTable(DistributedExecution *execution) { /* - * We need to explicitly check for isTransaction due to + * We need to explicitly check for a coordinated transaction due to * citus.function_opens_transaction_block flag. When set to false, we * should not be pretending that we're in a coordinated transaction even * if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround. 
*/ - return execution->isTransaction && XactModificationLevel == XACT_MODIFICATION_DATA; + return InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; } @@ -2578,6 +2668,8 @@ TransactionStateMachine(WorkerSession *session) { WorkerPool *workerPool = session->workerPool; DistributedExecution *execution = workerPool->distributedExecution; + TransactionBlocksUsage useRemoteTransactionBlocks = + execution->transactionProperties->useRemoteTransactionBlocks; MultiConnection *connection = session->connection; RemoteTransaction *transaction = &(connection->remoteTransaction); @@ -2596,7 +2688,7 @@ TransactionStateMachine(WorkerSession *session) { case REMOTE_TRANS_NOT_STARTED: { - if (execution->isTransaction) + if (useRemoteTransactionBlocks == TRANSACTION_BLOCKS_REQUIRED) { /* if we're expanding the nodes in a transaction, use 2PC */ Activate2PCIfModifyingTransactionExpandsToNewNode(session); @@ -2684,7 +2776,7 @@ TransactionStateMachine(WorkerSession *session) UpdateConnectionWaitFlags(session, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); - if (execution->isTransaction) + if (transaction->beginSent) { transaction->transactionState = REMOTE_TRANS_STARTED; } @@ -3404,7 +3496,8 @@ ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution) { - if (!DistributedExecutionModifiesDatabase(execution) || execution->errorOnAnyFailure) + if (!DistributedExecutionModifiesDatabase(execution) || + execution->transactionProperties->errorOnAnyFailure) { /* * Failures that do not modify the database (e.g., mainly SELECTs) should diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index adbac4db1..bcf4d99e1 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -196,9 +196,9 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList, - tupleDescriptor, scanState->tuplestorestate, - hasReturning, MaxAdaptiveExecutorPoolSize); + ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDescriptor, scanState->tuplestorestate, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 37ef2d55b..ce1fc51b3 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -26,6 +26,40 @@ typedef enum SEQUENTIAL_CONNECTION = 1 } MultiShardConnectionTypes; +/* + * TransactionBlocksUsage indicates whether to use remote transaction + * blocks according to one of the following policies: + * - opening a remote transaction is required + * - opening a remote transaction does not matter, so it is allowed but not required. + * - opening a remote transaction is disallowed + */ +typedef enum TransactionBlocksUsage +{ + TRANSACTION_BLOCKS_REQUIRED, + TRANSACTION_BLOCKS_ALLOWED, + TRANSACTION_BLOCKS_DISALLOWED, +} TransactionBlocksUsage; + +/* + * TransactionProperties reflects how we should execute a task list + * given previous commands in the transaction and the type of task list. 
+ */ +typedef struct TransactionProperties +{ + /* if true, any failure on the worker causes the execution to end immediately */ + bool errorOnAnyFailure; + + /* + * Determines whether transaction blocks on workers are required, disallowed, or + * allowed (will use them if already in a coordinated transaction). + */ + TransactionBlocksUsage useRemoteTransactionBlocks; + + /* if true, the current execution requires 2PC to be globally enabled */ + bool requires2PC; +} TransactionProperties; + + extern int MultiShardConnectionType; extern bool WritableStandbyCoordinator; extern bool ForceMaxQueryParallelization; @@ -42,7 +76,12 @@ extern TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState); extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - bool hasReturning, int targetPoolSize); + bool hasReturning, int targetPoolSize, + TransactionProperties *xactProperties); +extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, + TupleDesc tupleDescriptor, + Tuplestorestate *tupleStore, + bool hasReturning); extern void ExecuteUtilityTaskListWithoutResults(List *taskList); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize);
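
Usage sketch (illustration only, modeled on the CallFuncExprRemotely() hunk above): with this change, a caller that wants to bypass DecideTransactionPropertiesForTaskList() builds its own TransactionProperties and hands it to ExecuteTaskListExtended(). The helper name ExecuteSingleTaskWithoutTransactionBlock below is hypothetical and not added by this patch; task construction and tuple store setup are assumed to happen elsewhere.

/*
 * Hypothetical caller (not part of this patch): execute one task outside a
 * remote transaction block and fail fast on any error, the same combination
 * call.c now uses when delegating a CALL to a worker.
 */
static void
ExecuteSingleTaskWithoutTransactionBlock(Task *task, TupleDesc tupleDescriptor,
										 Tuplestorestate *tupleStore)
{
	TransactionProperties xactProperties = {
		.errorOnAnyFailure = true,
		.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED,
		.requires2PC = false
	};

	ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDescriptor,
							tupleStore, false /* hasReturning */,
							MaxAdaptiveExecutorPoolSize, &xactProperties);
}

Callers that are happy with the defaults keep going through ExecuteTaskList() or the new ExecuteTaskListIntoTupleStore(), both of which derive the properties from DecideTransactionPropertiesForTaskList() for the given task list.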