diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 577378803..12b5da547 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -16,7 +16,7 @@ #include "miscadmin.h" #include "safe_lib.h" - +#include "postmaster/postmaster.h" #include "access/hash.h" #include "commands/dbcommands.h" #include "distributed/backend_data.h" @@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int cachedConnectionCount); -static void ResetConnection(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection); static int EventSetSizeForConnectionList(List *connections); @@ -244,6 +243,23 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, } +/* + * GetConnectionForLocalQueriesOutsideTransaction returns a localhost connection for + * subtransaction. To avoid creating excessive connections, we reuse an + * existing connection. + */ +MultiConnection * +GetConnectionForLocalQueriesOutsideTransaction(char *userName) +{ + int connectionFlag = OUTSIDE_TRANSACTION; + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber, + userName, get_database_name(MyDatabaseId)); + + return connection; +} + + /* * StartNodeUserDatabaseConnection() initiates a connection to a remote node. * @@ -688,8 +704,8 @@ CloseConnection(MultiConnection *connection) dlist_delete(&connection->connectionNode); /* same for transaction state and shard/placement machinery */ - CloseRemoteTransaction(connection); CloseShardPlacementAssociation(connection); + ResetRemoteTransaction(connection); /* we leave the per-host entry alive */ pfree(connection); @@ -1443,7 +1459,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) /* * reset healthy session lifespan connections. */ - ResetConnection(connection); + ResetRemoteTransaction(connection); + + UnclaimConnection(connection); + cachedConnectionCount++; } @@ -1482,24 +1501,6 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection } -/* - * ResetConnection preserves the given connection for later usage by - * resetting its states. - */ -static void -ResetConnection(MultiConnection *connection) -{ - /* reset per-transaction state */ - ResetRemoteTransaction(connection); - ResetShardPlacementAssociation(connection); - - /* reset copy state */ - connection->copyBytesWrittenSinceLastFlush = 0; - - UnclaimConnection(connection); -} - - /* * RemoteTransactionIdle function returns true if we manually * set flag on run_commands_on_session_level_connection_to_node to true to diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 98b6dd4c6..8df06b70a 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -712,10 +712,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType, nodeGroupId, policy); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + MultiConnection *connection = + GetConnectionForLocalQueriesOutsideTransaction(CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, + list_make1(command->data)); } @@ -733,10 +733,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId) PG_DIST_CLEANUP, recordId); - SendCommandListToWorkerOutsideTransaction(LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - list_make1(command->data)); + MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction( + CitusExtensionOwnerName()); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, + list_make1(command->data)); } @@ -791,10 +791,14 @@ TryDropShardOutsideTransaction(OperationId operationId, dropQuery->data); /* remove the shard from the node */ - bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName, - nodePort, - NULL, - dropCommandList); + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CurrentUserName(), + NULL); + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + workerConnection, + dropCommandList); return success; } @@ -835,13 +839,8 @@ GetNextOperationId() appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(sequenceName->data)); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); + MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction( + CitusExtensionOwnerName()); PGresult *result = NULL; int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, @@ -856,7 +855,6 @@ GetNextOperationId() PQclear(result); ForgetResults(connection); - CloseConnection(connection); return operationdId; } diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index bcde6b0ae..84ee97cb6 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -151,7 +151,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList, DistributionColumnMap * distributionColumnOverrides); -static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode); +static void ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection); static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId, ShardInterval *shardInterval); @@ -1056,11 +1056,13 @@ static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerPlacementNode) { - char *currentUser = CurrentUserName(); - SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, - workerPlacementNode->workerPort, - currentUser, - objectCreationCommandList); + MultiConnection *connection = + GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION, + workerPlacementNode->workerName, + workerPlacementNode->workerPort, + NULL, NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(connection, + objectCreationCommandList); } @@ -1777,7 +1779,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, * 15) Release shared memory allocated by worker_split_shard_replication_setup udf * at source node. */ - ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode); + ExecuteSplitShardReleaseSharedMemory(sourceConnection); /* 16) Close source connection */ CloseConnection(sourceConnection); @@ -2074,19 +2076,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, * shared memory to store split information. This has to be released after split completes(or fails). */ static void -ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode) +ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection) { - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection( - connectionFlag, - sourceWorkerNode->workerName, - sourceWorkerNode->workerPort, - superUser, - databaseName); - StringInfo splitShardReleaseMemoryUDF = makeStringInfo(); appendStringInfo(splitShardReleaseMemoryUDF, "SELECT pg_catalog.worker_split_shard_release_dsm();"); @@ -2301,14 +2292,8 @@ GetNextShardIdForSplitChild() appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( "pg_catalog.pg_dist_shardid_seq")); - int connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, - LocalHostName, - PostPortNumber, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - + MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction( + CitusExtensionOwnerName()); PGresult *result = NULL; int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, &result); @@ -2325,7 +2310,8 @@ GetNextShardIdForSplitChild() } shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/)); - CloseConnection(connection); + PQclear(result); + ForgetResults(connection); return shardId; } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 598db1988..c8b1dae6e 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -157,6 +157,10 @@ static void WaitForGroupedLogicalRepTargetsToBecomeReady( static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, GroupedLogicalRepTargets * groupedLogicalRepTargets); +static void RecreateGroupedLogicalRepTargetsConnections( + HTAB *groupedLogicalRepTargetsHash, + char *user, + char *databaseName); /* * LogicallyReplicateShards replicates a list of shards from one node to another @@ -566,10 +570,10 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type) char *databaseName = get_database_name(MyDatabaseId); /* - * We open new connections to all nodes. The reason for this is that - * operations on subscriptions, publications and replication slotscannot be - * run in a transaction. By forcing a new connection we make sure no - * transaction is active on the connection. + * We need connections that are not currently inside a transaction. The + * reason for this is that operations on subscriptions, publications and + * replication slots cannot be run in a transaction. By forcing a new + * connection we make sure no transaction is active on the connection. */ int connectionFlags = FORCE_NEW_CONNECTION; @@ -607,7 +611,9 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type) /* * We close all connections that we opened for the dropping here. That * way we don't keep these connections open unnecessarily during the - * 'LogicalRepType' operation (which can take a long time). + * 'LogicalRepType' operation (which can take a long time). We might + * need to reopen a few later on, but that seems better than keeping + * many open for no reason for a long time. */ CloseConnection(cleanupConnection); } @@ -1157,11 +1163,14 @@ CreatePartitioningHierarchy(List *logicalRepTargetList) * parallel, so create them sequentially. Also attaching partition * is a quick operation, so it is fine to execute sequentially. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - tableOwner, - list_make1(attachPartitionCommand)); + + MultiConnection *connection = + GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION, + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, NULL); + ExecuteCriticalRemoteCommand(connection, attachPartitionCommand); + MemoryContextReset(localContext); } } @@ -1210,10 +1219,8 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), commandList); - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, commandList); MemoryContextReset(localContext); @@ -1638,11 +1645,11 @@ DropUser(MultiConnection *connection, char *username) * The DROP USER command should not propagate, so we temporarily disable * DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + connection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf("DROP USER IF EXISTS %s", + psprintf("DROP USER IF EXISTS %s;", quote_identifier(username)))); } @@ -1824,14 +1831,12 @@ CreateSubscriptions(MultiConnection *sourceConnection, * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. * This prevents permission escalations. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( - "CREATE USER %s SUPERUSER IN ROLE %s", + "CREATE USER %s SUPERUSER IN ROLE %s;", target->subscriptionOwnerName, GetUserNameFromId(ownerId, false) ))); @@ -1885,14 +1890,12 @@ CreateSubscriptions(MultiConnection *sourceConnection, * The ALTER ROLE command should not propagate, so we temporarily * disable DDL propagation. */ - SendCommandListToWorkerOutsideTransaction( - target->superuserConnection->hostname, - target->superuserConnection->port, - target->superuserConnection->user, + SendCommandListToWorkerOutsideTransactionWithConnection( + target->superuserConnection, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( - "ALTER ROLE %s NOSUPERUSER", + "ALTER ROLE %s NOSUPERUSER;", target->subscriptionOwnerName ))); } @@ -2054,8 +2057,12 @@ CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, * RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the * nodes in the groupedLogicalRepTargetsHash where the old connection is broken or * currently running a query. + * + * IMPORTANT: When it recreates the connection, it doesn't close the existing + * connection. This means that this function should only be called when we know + * we'll throw an error afterwards, otherwise we would leak these connections. */ -void +static void RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, char *user, char *databaseName) @@ -2065,10 +2072,11 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) { - if (groupedLogicalRepTargets->superuserConnection && - PQstatus(groupedLogicalRepTargets->superuserConnection->pgConn) == - CONNECTION_OK && - !PQisBusy(groupedLogicalRepTargets->superuserConnection->pgConn) + MultiConnection *superuserConnection = + groupedLogicalRepTargets->superuserConnection; + if (superuserConnection && + PQstatus(superuserConnection->pgConn) == CONNECTION_OK && + !PQisBusy(superuserConnection->pgConn) ) { continue; @@ -2076,12 +2084,12 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, WorkerNode *targetWorkerNode = FindNodeWithNodeId( groupedLogicalRepTargets->nodeId, false); - MultiConnection *superuserConnection = - GetNodeUserDatabaseConnection(connectionFlags, - targetWorkerNode->workerName, - targetWorkerNode->workerPort, - user, - databaseName); + superuserConnection = GetNodeUserDatabaseConnection( + connectionFlags, + targetWorkerNode->workerName, + targetWorkerNode->workerPort, + user, + databaseName); /* * Operations on subscriptions cannot run in a transaction block. We diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 55a560575..59a509507 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -751,12 +751,11 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection) /* - * CloseRemoteTransaction handles closing a connection that, potentially, is - * part of a coordinated transaction. This should only ever be called from - * connection_management.c, while closing a connection during a transaction. + * ResetRemoteTransaction resets the state of the transaction after the end of + * the main transaction, if the connection is being reused. */ void -CloseRemoteTransaction(struct MultiConnection *connection) +ResetRemoteTransaction(struct MultiConnection *connection) { RemoteTransaction *transaction = &connection->remoteTransaction; @@ -767,20 +766,14 @@ CloseRemoteTransaction(struct MultiConnection *connection) dlist_delete(&connection->transactionNode); } -} - - -/* - * ResetRemoteTransaction resets the state of the transaction after the end of - * the main transaction, if the connection is being reused. - */ -void -ResetRemoteTransaction(struct MultiConnection *connection) -{ - RemoteTransaction *transaction = &connection->remoteTransaction; /* just reset the entire state, relying on 0 being invalid/false */ memset(transaction, 0, sizeof(*transaction)); + + ResetShardPlacementAssociation(connection); + + /* reset copy state */ + connection->copyBytesWrittenSinceLastFlush = 0; } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 8cce25aca..486dd7280 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -340,6 +340,25 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, nodeName, nodePort, nodeUser, NULL); + SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection, + commandList); + CloseConnection(workerConnection); +} + + +/* + * SendCommandListToWorkerOutsideTransactionWithConnection sends the command list + * over the specified connection. This opens a new transaction on the + * connection, thus it's important that no transaction is currently open. + * This function is mainly useful to avoid opening an closing + * connections excessively by allowing reusing a single connection to send + * multiple separately committing transactions. The function raises an error if + * any of the queries fail. + */ +void +SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection, + List *commandList) +{ MarkRemoteTransactionCritical(workerConnection); RemoteTransactionBegin(workerConnection); @@ -351,7 +370,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, } RemoteTransactionCommit(workerConnection); - CloseConnection(workerConnection); + ResetRemoteTransaction(workerConnection); } @@ -430,21 +449,18 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList /* - * SendOptionalCommandListToWorkerOutsideTransaction sends the given command - * list to the given worker in a single transaction that is outside of the - * coordinated tranaction. If any of the commands fail, it rollbacks the - * transaction, and otherwise commits. + * SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the + * given command list over a specified connection in a single transaction that + * is outside of the coordinated tranaction. + * + * If any of the commands fail, it rollbacks the transaction, and otherwise commits. + * A successful commit is indicated by returning true, and a failed commit by returning + * false. */ bool -SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, List *commandList) { - int connectionFlags = FORCE_NEW_CONNECTION; - bool failed = false; - - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, nodePort, - nodeUser, NULL); if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) { return false; @@ -452,6 +468,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionBegin(workerConnection); /* iterate over the commands and execute them in the same connection */ + bool failed = false; const char *commandString = NULL; foreach_ptr(commandString, commandList) { @@ -471,6 +488,30 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no RemoteTransactionCommit(workerConnection); } + ResetRemoteTransaction(workerConnection); + + return !failed; +} + + +/* + * SendOptionalCommandListToWorkerOutsideTransaction sends the given command + * list to the given worker in a single transaction that is outside of the + * coordinated tranaction. If any of the commands fail, it rollbacks the + * transaction, and otherwise commits. + */ +bool +SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) +{ + int connectionFlags = FORCE_NEW_CONNECTION; + + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + nodeUser, NULL); + bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + workerConnection, + commandList); CloseConnection(workerConnection); return !failed; diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 6c3b8ae8d..b64785ec7 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname, extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, const char *user, const char *database); +extern MultiConnection * GetConnectionForLocalQueriesOutsideTransaction(char *userName); extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 994650568..a40aebff9 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -172,10 +172,6 @@ extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList); extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, char *user, char *databaseName); -extern void RecreateGroupedLogicalRepTargetsConnections( - HTAB *groupedLogicalRepTargetsHash, - char *user, - char *databaseName); extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash); extern void CompleteNonBlockingShardTransfer(List *shardList, MultiConnection *sourceConnection, diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index f827bd9ec..6136f25c9 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -130,7 +130,6 @@ extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); * transaction managment code. */ -extern void CloseRemoteTransaction(struct MultiConnection *connection); extern void ResetRemoteTransaction(struct MultiConnection *connection); /* perform handling for all in-progress transactions */ diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 689a9e192..aa137b76b 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -12,6 +12,7 @@ #ifndef WORKER_TRANSACTION_H #define WORKER_TRANSACTION_H +#include "distributed/connection_management.h" #include "distributed/worker_manager.h" #include "storage/lockdefs.h" @@ -59,6 +60,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa int32 nodePort, const char *nodeUser, List *commandList); +extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, + List * + commandList); extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, @@ -74,6 +79,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SendCommandListToWorkerOutsideTransactionWithConnection( + MultiConnection *workerConnection, + List *commandList); extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( List *workerNodeList, const char * diff --git a/src/test/regress/expected/failure_on_create_subscription.out b/src/test/regress/expected/failure_on_create_subscription.out index abab38a77..cdb9e822e 100644 --- a/src/test/regress/expected/failure_on_create_subscription.out +++ b/src/test/regress/expected/failure_on_create_subscription.out @@ -41,8 +41,13 @@ SELECT * FROM shards_in_workers; 103 | worker1 (4 rows) --- failure on creating the subscription -SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); +-- Failure on creating the subscription +-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either: +-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist +-- another command is already in progress +-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation. +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/sql/failure_on_create_subscription.sql b/src/test/regress/sql/failure_on_create_subscription.sql index b5a649008..0a464da17 100644 --- a/src/test/regress/sql/failure_on_create_subscription.sql +++ b/src/test/regress/sql/failure_on_create_subscription.sql @@ -32,8 +32,14 @@ INSERT INTO t SELECT x, x+1, MD5(random()::text) FROM generate_series(1,100000) -- Initial shard placements SELECT * FROM shards_in_workers; --- failure on creating the subscription -SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); +-- Failure on creating the subscription +-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either: +-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist +-- another command is already in progress +-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress +-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation. + +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -- Verify that the shard is not moved and the number of rows are still 100k