From 2edfea05ce2883a5cd252e414ef9c0694962cb73 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 16 Sep 2022 17:18:21 +0300 Subject: [PATCH] WIP --- .../connection/connection_management.c | 20 +++++++------------ .../distributed/operations/shard_cleaner.c | 6 ++---- .../distributed/operations/shard_split.c | 11 +++++----- .../replication/multi_logical_replication.c | 16 +++++++++------ .../transaction/remote_transaction.c | 13 ++---------- .../distributed/connection_management.h | 1 + 6 files changed, 28 insertions(+), 39 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 19405152c..699ed87f2 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount); -static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -252,15 +251,9 @@ MultiConnection * GetLocalConnectionForSubtransactionAsUser(char *userName) { int connectionFlag = OUTSIDE_TRANSACTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - userName, - get_database_name( - MyDatabaseId)); - - /* Don't cache connection for the lifetime of the entire session. */ - connection->forceCloseAtTransactionEnd = true; + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber, + userName, get_database_name(MyDatabaseId)); return connection; } @@ -1467,6 +1460,9 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) */ ResetConnection(connection); + UnclaimConnection(connection); + + cachedConnectionCount++; } } @@ -1508,7 +1504,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection * ResetConnection preserves the given connection for later usage by * resetting its states. */ -static void +void ResetConnection(MultiConnection *connection) { /* reset per-transaction state */ @@ -1517,8 +1513,6 @@ ResetConnection(MultiConnection *connection) /* reset copy state */ connection->copyBytesWrittenSinceLastFlush = 0; - - UnclaimConnection(connection); } diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 25e0b0384..73f2dd46e 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -670,8 +670,8 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType, nodeGroupId, policy); - MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser( - CitusExtensionOwnerName()); + MultiConnection *connection = + GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); SendCommandListToWorkerOutsideTransactionWithConnection(connection, list_make1(command->data)); } @@ -761,8 +761,6 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName nodeName, nodePort, CurrentUserName(), NULL); - workerConnection->forceCloseAtTransactionEnd = true; - bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( workerConnection, dropCommandList); diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1976721f8..90be4f12b 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1015,11 +1015,12 @@ static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerPlacementNode) { - char *currentUser = CurrentUserName(); - SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, - workerPlacementNode->workerPort, - currentUser, - objectCreationCommandList); + MultiConnection *c = + GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION, + workerPlacementNode->workerName, + workerPlacementNode->workerPort, + NULL, NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(c, objectCreationCommandList); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 98523e03b..b840f1ede 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1150,11 +1150,15 @@ CreatePartitioningHierarchy(List *logicalRepTargetList) * parallel, so create them sequentially. Also attaching partition * is a quick operation, so it is fine to execute sequentially. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - tableOwner, - list_make1(attachPartitionCommand)); + + MultiConnection *c = + GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION, + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(c, list_make1( + attachPartitionCommand)); + MemoryContextReset(localContext); } } @@ -1587,7 +1591,7 @@ DropUser(MultiConnection *connection, char *username) connection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf("DROP USER IF EXISTS %s", + psprintf("DROP USER IF EXISTS %s;", quote_identifier(username)))); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index b72fbc396..3a8ebbe07 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -766,18 +766,9 @@ CloseRemoteTransaction(struct MultiConnection *connection) /* XXX: Should we error out for a critical transaction? */ dlist_delete(&connection->transactionNode); - - /* - * If the transaction was completed, we have now cleaned it up, so we - * can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to - * start a new transaction without running into errors. - */ - if (transaction->transactionState == REMOTE_TRANS_ABORTED || - transaction->transactionState == REMOTE_TRANS_COMMITTED) - { - transaction->transactionState = REMOTE_TRANS_NOT_STARTED; - } } + + ResetConnection(connection); } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index f862cd40f..84bf13447 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -303,6 +303,7 @@ extern MultiConnection * ConnectionAvailableToNode(char *hostName, int nodePort, extern void CloseConnection(MultiConnection *connection); extern void ShutdownAllConnections(void); extern void ShutdownConnection(MultiConnection *connection); +extern void ResetConnection(MultiConnection *connection); /* dealing with a connection */ extern void FinishConnectionListEstablishment(List *multiConnectionList);