From 17ae71fa8fcc7605b17696c058d08a32d08da56d Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Wed, 14 Sep 2022 14:56:35 -0700 Subject: [PATCH] Fix PR comments --- .../connection/connection_management.c | 23 +++++++- .../distributed/operations/shard_cleaner.c | 15 ++--- .../distributed/operations/shard_split.c | 12 +--- .../replication/multi_logical_replication.c | 8 +-- .../transaction/worker_transaction.c | 58 +++++++++++-------- .../distributed/connection_management.h | 1 + src/include/distributed/worker_transaction.h | 5 +- 7 files changed, 73 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..2de821983 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -16,7 +16,7 @@ #include "miscadmin.h" #include "safe_lib.h" - +#include "postmaster/postmaster.h" #include "access/hash.h" #include "commands/dbcommands.h" #include "distributed/backend_data.h" @@ -244,6 +244,27 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } +/* + * GetLocalConnectionForSubtransaction establishes a localhost connection. + * To avoid creating excessive connections, we try to reuse an existing connection. + */ +MultiConnection* +GetLocalConnectionForSubtransactionAsUser(char *userName) +{ + int connectionFlag = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, + LocalHostName, + PostPortNumber, + userName, + get_database_name( + MyDatabaseId)); + /* Don't cache connection for the lifetime of the entire session. */ + connection->forceCloseAtTransactionEnd = true; + + return connection; +} + + /* * StartNodeUserDatabaseConnection() initiates a connection to a remote node. * diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 9775099eb..e58c5a953 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -670,9 +670,8 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType, nodeGroupId, policy); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, list_make1(command->data)); } @@ -691,9 +690,8 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId) PG_DIST_CLEANUP, recordId); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, list_make1(command->data)); } @@ -756,9 +754,8 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName dropQuery->data); /* remove the shard from the node */ - bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName, - nodePort, - NULL, + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(connection, dropCommandList); return success; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 4656e798f..f12d14f80 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -2245,14 +2245,7 @@ GetNextShardIdForSplitChild() appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( "pg_catalog.pg_dist_shardid_seq")); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser(CitusExtensionOwnerName()); PGresult *result = NULL; int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, &result); @@ -2269,7 +2262,8 @@ GetNextShardIdForSplitChild() } shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/)); - CloseConnection(connection); + PQclear(result); + ForgetResults(connection); return shardId; } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 9958fb94f..d3f351da7 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1162,7 +1162,7 @@ CreateForeignKeyConstraints(List *logicalRepTargetList) list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), commandList); - SendCommandListToWorkerInSeparateTransaction( + SendCommandListToWorkerOutsideTransactionWithConnection( target->superuserConnection, commandList); @@ -1542,7 +1542,7 @@ DropUser(MultiConnection *connection, char *username) * The DROP USER command should not propagate, so we temporarily disable * DDL propagation. */ - SendCommandListToWorkerInSeparateTransaction( + SendCommandListToWorkerOutsideTransactionWithConnection( connection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", @@ -1728,7 +1728,7 @@ CreateSubscriptions(MultiConnection *sourceConnection, * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. * This prevents permission escalations. */ - SendCommandListToWorkerInSeparateTransaction( + SendCommandListToWorkerOutsideTransactionWithConnection( target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", @@ -1787,7 +1787,7 @@ CreateSubscriptions(MultiConnection *sourceConnection, * The ALTER ROLE command should not propagate, so we temporarily * disable DDL propagation. */ - SendCommandListToWorkerInSeparateTransaction( + SendCommandListToWorkerOutsideTransactionWithConnection( target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 5880fdba7..7b0e037d1 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -340,24 +340,15 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, nodeName, nodePort, nodeUser, NULL); - MarkRemoteTransactionCritical(workerConnection); - RemoteTransactionBegin(workerConnection); - - /* iterate over the commands and execute them in the same connection */ - const char *commandString = NULL; - foreach_ptr(commandString, commandList) - { - ExecuteCriticalRemoteCommand(workerConnection, commandString); - } - - RemoteTransactionCommit(workerConnection); + SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + commandList); CloseConnection(workerConnection); } /* - * SendCommandListToWorkerInSeparateTransaction sends the command list over the - * connection in a separate connection. This opens a new transaction on the + * SendCommandListToWorkerOutsideTransactionWithConnection sends the command list + * over the connection in a separate connection. This opens a new transaction on the * connection, thus it's important that no transaction is currently open on the * given connection. This function is mainly useful to avoid opening an closing * connections excessively by allowing reusing a single connection to send @@ -365,7 +356,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, * any of the queries fail. */ void -SendCommandListToWorkerInSeparateTransaction(MultiConnection *workerConnection, +SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, List *commandList) { MarkRemoteTransactionCritical(workerConnection); @@ -458,21 +449,15 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList /* - * SendOptionalCommandListToWorkerOutsideTransaction sends the given command - * list to the given worker in a single transaction that is outside of the - * coordinated tranaction. If any of the commands fail, it rollbacks the + * SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the + * given command list to the given worker in a single transaction that is outside + * of the coordinated tranaction. If any of the commands fail, it rollbacks the * transaction, and otherwise commits. + * */ bool -SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, List *commandList) { - int connectionFlags = FORCE_NEW_CONNECTION; - bool failed = false; - - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, nodePort, - nodeUser, NULL); if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) { return false; @@ -480,6 +465,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionBegin(workerConnection); /* iterate over the commands and execute them in the same connection */ + bool failed = false; const char *commandString = NULL; foreach_ptr(commandString, commandList) { @@ -499,6 +485,28 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionCommit(workerConnection); } + CloseRemoteTransaction(workerConnection); + + return !failed; +} + +/* + * SendOptionalCommandListToWorkerOutsideTransaction sends the given command + * list to the given worker in a single transaction that is outside of the + * coordinated tranaction. If any of the commands fail, it rollbacks the + * transaction, and otherwise commits. + */ +bool +SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) +{ + int connectionFlags = FORCE_NEW_CONNECTION; + + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + nodeUser, NULL); + bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + commandList); CloseConnection(workerConnection); return !failed; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 6c3b8ae8d..2587b311c 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database); +extern MultiConnection* GetLocalConnectionForSubtransactionAsUser(char* userName); extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 86c6717d8..a9a93e48e 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -60,6 +60,9 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa int32 nodePort, const char *nodeUser, List *commandList); +extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, + List *commandList); extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, @@ -75,7 +78,7 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); -extern void SendCommandListToWorkerInSeparateTransaction( +extern void SendCommandListToWorkerOutsideTransactionWithConnection( MultiConnection *workerConnection, List *commandList); extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(