From 2829c601dd48dcdbbbf3c748777f0b1b056484fa Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 16 Dec 2019 10:40:31 +0300 Subject: [PATCH] replace Begin words in coordinated transactions with use (#3293) --- src/backend/distributed/commands/multi_copy.c | 4 +-- src/backend/distributed/commands/truncate.c | 2 +- .../distributed/executor/adaptive_executor.c | 2 +- .../executor/intermediate_results.c | 4 +-- .../distributed/executor/subplan_execution.c | 2 +- .../master/master_delete_protocol.c | 2 +- .../master/master_stage_protocol.c | 2 +- .../distributed/planner/multi_explain.c | 2 +- .../distributed/transaction/backend_data.c | 2 +- .../transaction/transaction_management.c | 36 +++++++------------ .../transaction/worker_transaction.c | 4 +-- src/backend/distributed/utils/resource_lock.c | 2 +- .../distributed/transaction_management.h | 2 +- 13 files changed, 27 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8cfa56695..1e7ee862d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -298,7 +298,7 @@ PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb); static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) { - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -2251,7 +2251,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* keep the table metadata to avoid looking it up for every tuple */ copyDest->tableMetadata = cacheEntry; - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index 3ec721b4a..e5fbe938f 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -237,7 +237,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) relationIdList = SortList(relationIdList, CompareOids); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); foreach_oid(relationId, relationIdList) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index edbef8821..8b4da2e43 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -897,7 +897,7 @@ StartDistributedExecution(DistributedExecution *execution) */ if (DistributedExecutionRequiresRollback(execution) || LocalExecutionHappened) { - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); if (TaskListRequires2PC(taskList) || LocalExecutionHappened) { diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index fed7ad6cf..574d8a363 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -122,7 +122,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) * Intermediate results will be stored in a directory that is derived * from the distributed transaction ID. */ - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); List *nodeList = ActivePrimaryWorkerNodeList(NoLock); EState *estate = CreateExecutorState(); @@ -164,7 +164,7 @@ create_intermediate_result(PG_FUNCTION_ARGS) * Intermediate results will be stored in a directory that is derived * from the distributed transaction ID. */ - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); EState *estate = CreateExecutorState(); RemoteFileDestReceiver *resultDest = diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index cb687e299..ef62f7b03 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -53,7 +53,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) * Intermediate results of subplans will be stored in a directory that is * derived from the distributed transaction ID. */ - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); foreach(subPlanCell, subPlanList) { diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 6104bcde6..dd5651d71 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -361,7 +361,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, { ListCell *shardIntervalCell = NULL; - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); /* At this point we intentionally decided to not use 2PC for reference tables */ if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 5d607e93c..4c5394c35 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -284,7 +284,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) errhint("Try running master_create_empty_shard() first"))); } - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); /* issue command to append table to each shard placement */ foreach(shardPlacementCell, shardPlacementList) diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 1a7433843..7a527269a 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -386,7 +386,7 @@ RemoteExplain(Task *task, ExplainState *es) * Use a coordinated transaction to ensure that we open a transaction block * such that we can set a savepoint. */ - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); for (int placementIndex = 0; placementIndex < placementCount; placementIndex++) { diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 040b7800a..27222e69e 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -720,7 +720,7 @@ GetCurrentDistributedTransactionId(void) * sets it for the current backend. It also sets the databaseId and * processId fields. * - * This function should only be called on BeginCoordinatedTransaction(). Any other + * This function should only be called on UseCoordinatedTransaction(). Any other * callers is very likely to break the distributed transaction management. */ void diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0ca2b9d61..e32e95e42 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -96,7 +96,6 @@ bool FunctionOpensTransactionBlock = true; /* transaction management functions */ -static void BeginCoordinatedTransaction(void); static void CoordinatedTransactionCallback(XactEvent event, void *arg); static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, SubTransactionId parentSubid, void *arg); @@ -111,18 +110,26 @@ static bool MaybeExecutingUDF(void); /* - * BeginOrContinueCoordinatedTransaction starts a coordinated transaction, - * unless one already is in progress. + * UseCoordinatedTransaction sets up the necessary variables to use + * a coordinated transaction, unless one is already in progress. */ void -BeginOrContinueCoordinatedTransaction(void) +UseCoordinatedTransaction(void) { if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) { return; } - BeginCoordinatedTransaction(); + if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE && + CurrentCoordinatedTransactionState != COORD_TRANS_IDLE) + { + ereport(ERROR, (errmsg("starting transaction in wrong state"))); + } + + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + + AssignDistributedTransactionId(); } @@ -169,25 +176,6 @@ InitializeTransactionManagement(void) } -/* - * BeginCoordinatedTransaction begins a coordinated transaction. No - * pre-existing coordinated transaction may be in progress./ - */ -static void -BeginCoordinatedTransaction(void) -{ - if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE && - CurrentCoordinatedTransactionState != COORD_TRANS_IDLE) - { - ereport(ERROR, (errmsg("starting transaction in wrong state"))); - } - - CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; - - AssignDistributedTransactionId(); -} - - /* * Transaction management callback, handling coordinated transaction, and * transaction independent connection management. diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 11a4160b8..fb081d046 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -89,7 +89,7 @@ SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, { uint connectionFlags = 0; - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); CoordinatedTransactionUse2PC(); MultiConnection *transactionConnection = GetNodeUserDatabaseConnection( @@ -279,7 +279,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); ListCell *workerNodeCell = NULL; - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); CoordinatedTransactionUse2PC(); /* open connections in parallel */ diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index a6240fc2c..bbaba01ff 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -197,7 +197,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) appendStringInfo(lockCommand, "])"); /* need to hold the lock until commit */ - BeginOrContinueCoordinatedTransaction(); + UseCoordinatedTransaction(); /* * Use the superuser connection to make sure we are allowed to lock. diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 0c382c161..924d836be 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -106,7 +106,7 @@ extern StringInfo activeSetStmts; /* * Coordinated transaction management. */ -extern void BeginOrContinueCoordinatedTransaction(void); +extern void UseCoordinatedTransaction(void); extern bool InCoordinatedTransaction(void); extern void CoordinatedTransactionUse2PC(void); extern bool IsMultiStatementTransaction(void);