diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 54efd08db..9958fb94f 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1162,10 +1162,8 @@ CreateForeignKeyConstraints(List *logicalRepTargetList) list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), commandList); - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerInSeparateTransaction( + target->superuserConnection, commandList); MemoryContextReset(localContext); @@ -1544,8 +1542,8 @@ DropUser(MultiConnection *connection, char *username) * The DROP USER command should not propagate, so we temporarily disable * DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, + SendCommandListToWorkerInSeparateTransaction( + connection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf("DROP USER IF EXISTS %s", @@ -1730,10 +1728,8 @@ CreateSubscriptions(MultiConnection *sourceConnection, * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. * This prevents permission escalations. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerInSeparateTransaction( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( @@ -1791,10 +1787,8 @@ CreateSubscriptions(MultiConnection *sourceConnection, * The ALTER ROLE command should not propagate, so we temporarily * disable DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerInSeparateTransaction( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 55a560575..b72fbc396 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -766,6 +766,17 @@ CloseRemoteTransaction(struct MultiConnection *connection) /* XXX: Should we error out for a critical transaction? */ dlist_delete(&connection->transactionNode); + + /* + * If the transaction was completed, we have now cleaned it up, so we + * can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to + * start a new transaction without running into errors. + */ + if (transaction->transactionState == REMOTE_TRANS_ABORTED || + transaction->transactionState == REMOTE_TRANS_COMMITTED) + { + transaction->transactionState = REMOTE_TRANS_NOT_STARTED; + } } } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 8cce25aca..5880fdba7 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -355,6 +355,34 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, } +/* + * SendCommandListToWorkerInSeparateTransaction sends the command list over the + * connection in a separate connection. This opens a new transaction on the + * connection, thus it's important that no transaction is currently open on the + * given connection. This function is mainly useful to avoid opening an closing + * connections excessively by allowing reusing a single connection to send + * multiple separately committing transactions. The function raises an error if + * any of the queries fail. + */ +void +SendCommandListToWorkerInSeparateTransaction(MultiConnection *workerConnection, + List *commandList) +{ + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBegin(workerConnection); + + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + ExecuteCriticalRemoteCommand(workerConnection, commandString); + } + + RemoteTransactionCommit(workerConnection); + CloseRemoteTransaction(workerConnection); +} + + /* * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node * with the given nodeName and nodePort. The commands are sent as part of the diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 689a9e192..86c6717d8 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -12,6 +12,7 @@ #ifndef WORKER_TRANSACTION_H #define WORKER_TRANSACTION_H +#include "distributed/connection_management.h" #include "distributed/worker_manager.h" #include "storage/lockdefs.h" @@ -74,6 +75,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SendCommandListToWorkerInSeparateTransaction( + MultiConnection *workerConnection, + List *commandList); extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( List *workerNodeList, const char *