diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 19405152c..3cc2c29fe 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount); -static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -252,12 +251,9 @@ MultiConnection * GetLocalConnectionForSubtransactionAsUser(char *userName) { int connectionFlag = OUTSIDE_TRANSACTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - userName, - get_database_name( - MyDatabaseId)); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, + PostPortNumber, userName, NULL); /* Don't cache connection for the lifetime of the entire session. */ connection->forceCloseAtTransactionEnd = true; @@ -1508,7 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection * ResetConnection preserves the given connection for later usage by * resetting its states. */ -static void +void ResetConnection(MultiConnection *connection) { /* reset per-transaction state */ diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1976721f8..5196d4271 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1015,11 +1015,17 @@ static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerPlacementNode) { - char *currentUser = CurrentUserName(); - SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, - workerPlacementNode->workerPort, - currentUser, - objectCreationCommandList); + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + workerPlacementNode + ->workerName, + workerPlacementNode + ->workerPort, + CurrentUserName(), + NULL); + + SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + objectCreationCommandList); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 98523e03b..ef7e7264a 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1150,11 +1150,19 @@ CreatePartitioningHierarchy(List *logicalRepTargetList) * parallel, so create them sequentially. Also attaching partition * is a quick operation, so it is fine to execute sequentially. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *workerConnection = GetNodeUserDatabaseConnection( + connectionFlags, + target-> + superuserConnection->hostname, + target-> + superuserConnection->port, tableOwner, - list_make1(attachPartitionCommand)); + NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + list_make1( + attachPartitionCommand)); + MemoryContextReset(localContext); } } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index b72fbc396..55a560575 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -766,17 +766,6 @@ CloseRemoteTransaction(struct MultiConnection *connection) /* XXX: Should we error out for a critical transaction? */ dlist_delete(&connection->transactionNode); - - /* - * If the transaction was completed, we have now cleaned it up, so we - * can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to - * start a new transaction without running into errors. - */ - if (transaction->transactionState == REMOTE_TRANS_ABORTED || - transaction->transactionState == REMOTE_TRANS_COMMITTED) - { - transaction->transactionState = REMOTE_TRANS_NOT_STARTED; - } } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index c0d94c3e6..a5d15f206 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -370,7 +370,7 @@ SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerC } RemoteTransactionCommit(workerConnection); - CloseRemoteTransaction(workerConnection); + ResetConnection(workerConnection); } @@ -488,7 +488,7 @@ SendOptionalCommandListToWorkerOutsideTransactionWithConnection( RemoteTransactionCommit(workerConnection); } - CloseRemoteTransaction(workerConnection); + ResetConnection(workerConnection); return !failed; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index f862cd40f..021b3e3cc 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -300,6 +300,7 @@ extern void CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort); extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName, const char *database); +extern void ResetConnection(MultiConnection *connection); extern void CloseConnection(MultiConnection *connection); extern void ShutdownAllConnections(void); extern void ShutdownConnection(MultiConnection *connection);