diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..19405152c 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -16,7 +16,7 @@ #include "miscadmin.h" #include "safe_lib.h" - +#include "postmaster/postmaster.h" #include "access/hash.h" #include "commands/dbcommands.h" #include "distributed/backend_data.h" @@ -244,6 +244,28 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } +/* + * GetLocalConnectionForSubtransaction returns a localhost connection for subtransaction. + * To avoid creating excessive connections, we reuse an existing connection. + */ +MultiConnection * +GetLocalConnectionForSubtransactionAsUser(char *userName) +{ + int connectionFlag = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, + LocalHostName, + PostPortNumber, + userName, + get_database_name( + MyDatabaseId)); + + /* Don't cache connection for the lifetime of the entire session. */ + connection->forceCloseAtTransactionEnd = true; + + return connection; +} + + /* * StartNodeUserDatabaseConnection() initiates a connection to a remote node. * diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 9775099eb..25e0b0384 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -670,10 +670,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType, nodeGroupId, policy); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser( + CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, + list_make1(command->data)); } @@ -691,10 +691,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId) PG_DIST_CLEANUP, recordId); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser( + CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, + list_make1(command->data)); } @@ -756,10 +756,16 @@ TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName dropQuery->data); /* remove the shard from the node */ - bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName, - nodePort, - NULL, - dropCommandList); + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CurrentUserName(), + NULL); + workerConnection->forceCloseAtTransactionEnd = true; + + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + workerConnection, + dropCommandList); return success; } diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 7386f1555..1976721f8 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -2214,14 +2214,8 @@ GetNextShardIdForSplitChild() appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( "pg_catalog.pg_dist_shardid_seq")); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - + MultiConnection *connection = GetLocalConnectionForSubtransactionAsUser( + CitusExtensionOwnerName()); PGresult *result = NULL; int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, &result); @@ -2238,7 +2232,8 @@ GetNextShardIdForSplitChild() } shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/)); - CloseConnection(connection); + PQclear(result); + ForgetResults(connection); return shardId; } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index bc1a69a3d..98523e03b 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1203,10 +1203,8 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), commandList); - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, commandList); MemoryContextReset(localContext); @@ -1585,8 +1583,8 @@ DropUser(MultiConnection *connection, char *username) * The DROP USER command should not propagate, so we temporarily disable * DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + connection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf("DROP USER IF EXISTS %s", @@ -1771,10 +1769,8 @@ CreateSubscriptions(MultiConnection *sourceConnection, * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. * This prevents permission escalations. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( @@ -1832,10 +1828,8 @@ CreateSubscriptions(MultiConnection *sourceConnection, * The ALTER ROLE command should not propagate, so we temporarily * disable DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 55a560575..b72fbc396 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -766,6 +766,17 @@ CloseRemoteTransaction(struct MultiConnection *connection) /* XXX: Should we error out for a critical transaction? */ dlist_delete(&connection->transactionNode); + + /* + * If the transaction was completed, we have now cleaned it up, so we + * can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to + * start a new transaction without running into errors. + */ + if (transaction->transactionState == REMOTE_TRANS_ABORTED || + transaction->transactionState == REMOTE_TRANS_COMMITTED) + { + transaction->transactionState = REMOTE_TRANS_NOT_STARTED; + } } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 8cce25aca..c0d94c3e6 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -340,6 +340,25 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, nodeName, nodePort, nodeUser, NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + commandList); + CloseConnection(workerConnection); +} + + +/* + * SendCommandListToWorkerOutsideTransactionWithConnection sends the command list + * over the specified connection. This opens a new transaction on the + * connection, thus it's important that no transaction is currently open. + * This function is mainly useful to avoid opening an closing + * connections excessively by allowing reusing a single connection to send + * multiple separately committing transactions. The function raises an error if + * any of the queries fail. + */ +void +SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, + List *commandList) +{ MarkRemoteTransactionCritical(workerConnection); RemoteTransactionBegin(workerConnection); @@ -351,7 +370,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, } RemoteTransactionCommit(workerConnection); - CloseConnection(workerConnection); + CloseRemoteTransaction(workerConnection); } @@ -430,21 +449,18 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList /* - * SendOptionalCommandListToWorkerOutsideTransaction sends the given command - * list to the given worker in a single transaction that is outside of the - * coordinated tranaction. If any of the commands fail, it rollbacks the - * transaction, and otherwise commits. + * SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the + * given command list over a specified connection in a single transaction that + * is outside of the coordinated tranaction. + * + * If any of the commands fail, it rollbacks the transaction, and otherwise commits. + * A successful commit is indicated by returning true, and a failed commit by returning + * false. */ bool -SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, List *commandList) { - int connectionFlags = FORCE_NEW_CONNECTION; - bool failed = false; - - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, nodePort, - nodeUser, NULL); if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) { return false; @@ -452,6 +468,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionBegin(workerConnection); /* iterate over the commands and execute them in the same connection */ + bool failed = false; const char *commandString = NULL; foreach_ptr(commandString, commandList) { @@ -471,6 +488,30 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionCommit(workerConnection); } + CloseRemoteTransaction(workerConnection); + + return !failed; +} + + +/* + * SendOptionalCommandListToWorkerOutsideTransaction sends the given command + * list to the given worker in a single transaction that is outside of the + * coordinated tranaction. If any of the commands fail, it rollbacks the + * transaction, and otherwise commits. + */ +bool +SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) +{ + int connectionFlags = FORCE_NEW_CONNECTION; + + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + nodeUser, NULL); + bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + workerConnection, + commandList); CloseConnection(workerConnection); return !failed; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 6c3b8ae8d..f862cd40f 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database); +extern MultiConnection * GetLocalConnectionForSubtransactionAsUser(char *userName); extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 689a9e192..aa137b76b 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -12,6 +12,7 @@ #ifndef WORKER_TRANSACTION_H #define WORKER_TRANSACTION_H +#include "distributed/connection_management.h" #include "distributed/worker_manager.h" #include "storage/lockdefs.h" @@ -59,6 +60,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa int32 nodePort, const char *nodeUser, List *commandList); +extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, + List * + commandList); extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, @@ -74,6 +79,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SendCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, + List *commandList); extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( List *workerNodeList, const char * diff --git a/src/test/regress/expected/failure_on_create_subscription.out b/src/test/regress/expected/failure_on_create_subscription.out index abab38a77..cdb9e822e 100644 --- a/src/test/regress/expected/failure_on_create_subscription.out +++ b/src/test/regress/expected/failure_on_create_subscription.out @@ -41,8 +41,13 @@ SELECT * FROM shards_in_workers; 103 | worker1 (4 rows) --- failure on creating the subscription -SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); +-- Failure on creating the subscription +-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either: +-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist +-- another command is already in progress +-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation. +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/sql/failure_on_create_subscription.sql b/src/test/regress/sql/failure_on_create_subscription.sql index b5a649008..0a464da17 100644 --- a/src/test/regress/sql/failure_on_create_subscription.sql +++ b/src/test/regress/sql/failure_on_create_subscription.sql @@ -32,8 +32,14 @@ INSERT INTO t SELECT x, x+1, MD5(random()::text) FROM generate_series(1,100000) -- Initial shard placements SELECT * FROM shards_in_workers; --- failure on creating the subscription -SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); +-- Failure on creating the subscription +-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either: +-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist +-- another command is already in progress +-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation. + +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -- Verify that the shard is not moved and the number of rows are still 100k