Reuse existing superuser connection more for logical rep

In our logical replication logic we would sometimes call
`SendCommandListToWorkerOutsideTransaction` for a list of queries that
required superuser permissions. This would open a new superuser
connection for each of those calls, even though we already had a
perfectly fine superuser connection lying around.

This adds a new function that allows sending a list of queries in a
transaction over an existing connection and we start using that for our
logical replication.
reuse-connections-for-logical-ref-fkeys-2
Jelte Fennema 2022-09-09 15:22:02 +02:00
parent 00a94c7f13
commit a5aaa08a20
No known key found for this signature in database
4 changed files with 51 additions and 14 deletions

View File

@ -1162,10 +1162,8 @@ CreateForeignKeyConstraints(List *logicalRepTargetList)
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
commandList);
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerInSeparateTransaction(
target->superuserConnection,
commandList);
MemoryContextReset(localContext);
@ -1544,8 +1542,8 @@ DropUser(MultiConnection *connection, char *username)
* The DROP USER command should not propagate, so we temporarily disable
* DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
connection->hostname, connection->port, connection->user,
SendCommandListToWorkerInSeparateTransaction(
connection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s",
@ -1730,10 +1728,8 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
* This prevents permission escalations.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerInSeparateTransaction(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
@ -1791,10 +1787,8 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* The ALTER ROLE command should not propagate, so we temporarily
* disable DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerInSeparateTransaction(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(

View File

@ -766,6 +766,17 @@ CloseRemoteTransaction(struct MultiConnection *connection)
/* XXX: Should we error out for a critical transaction? */
dlist_delete(&connection->transactionNode);
/*
* If the transaction was completed, we have now cleaned it up, so we
* can reset the state to REMOTE_TRANS_NOT_STARTED. This allows us to
* start a new transaction without running into errors.
*/
if (transaction->transactionState == REMOTE_TRANS_ABORTED ||
transaction->transactionState == REMOTE_TRANS_COMMITTED)
{
transaction->transactionState = REMOTE_TRANS_NOT_STARTED;
}
}
}

View File

@ -355,6 +355,34 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
}
/*
* SendCommandListToWorkerInSeparateTransaction sends the command list over the
* connection in a separate connection. This opens a new transaction on the
* connection, thus it's important that no transaction is currently open on the
* given connection. This function is mainly useful to avoid opening an closing
* connections excessively by allowing reusing a single connection to send
* multiple separately committing transactions. The function raises an error if
* any of the queries fail.
*/
void
SendCommandListToWorkerInSeparateTransaction(MultiConnection *workerConnection,
List *commandList)
{
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
ExecuteCriticalRemoteCommand(workerConnection, commandString);
}
RemoteTransactionCommit(workerConnection);
CloseRemoteTransaction(workerConnection);
}
/*
* SendCommandListToWorkerInCoordinatedTransaction opens connection to the node
* with the given nodeName and nodePort. The commands are sent as part of the

View File

@ -12,6 +12,7 @@
#ifndef WORKER_TRANSACTION_H
#define WORKER_TRANSACTION_H
#include "distributed/connection_management.h"
#include "distributed/worker_manager.h"
#include "storage/lockdefs.h"
@ -74,6 +75,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendCommandListToWorkerInSeparateTransaction(
MultiConnection *workerConnection,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList,
const char *