Reuse connections for Splits and Logical Replication (#6314)

In Split, Logical replication logic and ShardCleaner we call
`SendCommandListToWorkerOutsideTransaction` and
`SendOptionalCommandListToWorkerOutsideTransaction` frequently. This
opens new connection for each of those calls, even though we already
have a perfectly good connection lying around.

This PR adds two new APIs
`SendCommandListToWorkerOutsideTransactionWithConnection` and
`SendOptionalCommandListToWorkerOutsideTransactionWithConnection` that
allow sending a list of queries in a transaction over an existing
connection. We also update the callers (Split, ShardCleaner, Logical
Replication) to use these new APIs instead.

Co-authored-by: Nitish Upreti <niupre@microsoft.com>
Co-authored-by: Onder Kalaci <onderkalaci@gmail.com>
pull/6368/head
Jelte Fennema 2022-09-26 13:37:40 +02:00 committed by GitHub
parent dc9723fa45
commit 24e06af6d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 187 additions and 145 deletions

View File

@ -16,7 +16,7 @@
#include "miscadmin.h"
#include "safe_lib.h"
#include "postmaster/postmaster.h"
#include "access/hash.h"
#include "commands/dbcommands.h"
#include "distributed/backend_data.h"
@ -63,7 +63,6 @@ static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int
cachedConnectionCount);
static void ResetConnection(MultiConnection *connection);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
@ -244,6 +243,23 @@ GetNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
}
/*
* GetConnectionForLocalQueriesOutsideTransaction returns a localhost connection for
* subtransaction. To avoid creating excessive connections, we reuse an
* existing connection.
*/
MultiConnection *
GetConnectionForLocalQueriesOutsideTransaction(char *userName)
{
int connectionFlag = OUTSIDE_TRANSACTION;
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, LocalHostName, PostPortNumber,
userName, get_database_name(MyDatabaseId));
return connection;
}
/*
* StartNodeUserDatabaseConnection() initiates a connection to a remote node.
*
@ -688,8 +704,8 @@ CloseConnection(MultiConnection *connection)
dlist_delete(&connection->connectionNode);
/* same for transaction state and shard/placement machinery */
CloseRemoteTransaction(connection);
CloseShardPlacementAssociation(connection);
ResetRemoteTransaction(connection);
/* we leave the per-host entry alive */
pfree(connection);
@ -1443,7 +1459,10 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
/*
* reset healthy session lifespan connections.
*/
ResetConnection(connection);
ResetRemoteTransaction(connection);
UnclaimConnection(connection);
cachedConnectionCount++;
}
@ -1482,24 +1501,6 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
}
/*
* ResetConnection preserves the given connection for later usage by
* resetting its states.
*/
static void
ResetConnection(MultiConnection *connection)
{
/* reset per-transaction state */
ResetRemoteTransaction(connection);
ResetShardPlacementAssociation(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
UnclaimConnection(connection);
}
/*
* RemoteTransactionIdle function returns true if we manually
* set flag on run_commands_on_session_level_connection_to_node to true to

View File

@ -712,10 +712,10 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
nodeGroupId,
policy);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
MultiConnection *connection =
GetConnectionForLocalQueriesOutsideTransaction(CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data));
}
@ -733,10 +733,10 @@ DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
PG_DIST_CLEANUP,
recordId);
SendCommandListToWorkerOutsideTransaction(LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
list_make1(command->data));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(command->data));
}
@ -791,10 +791,14 @@ TryDropShardOutsideTransaction(OperationId operationId,
dropQuery->data);
/* remove the shard from the node */
bool success = SendOptionalCommandListToWorkerOutsideTransaction(nodeName,
nodePort,
NULL,
dropCommandList);
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CurrentUserName(),
NULL);
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
dropCommandList);
return success;
}
@ -835,13 +839,8 @@ GetNextOperationId()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);",
quote_literal_cstr(sequenceName->data));
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
@ -856,7 +855,6 @@ GetNextOperationId()
PQclear(result);
ForgetResults(connection);
CloseConnection(connection);
return operationdId;
}

View File

@ -151,7 +151,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList,
DistributionColumnMap *
distributionColumnOverrides);
static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode);
static void ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection);
static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32
targetNodeId,
ShardInterval *shardInterval);
@ -1056,11 +1056,13 @@ static void
CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerPlacementNode)
{
char *currentUser = CurrentUserName();
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
workerPlacementNode->workerPort,
currentUser,
objectCreationCommandList);
MultiConnection *connection =
GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
NULL, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
objectCreationCommandList);
}
@ -1777,7 +1779,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* 15) Release shared memory allocated by worker_split_shard_replication_setup udf
* at source node.
*/
ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
ExecuteSplitShardReleaseSharedMemory(sourceConnection);
/* 16) Close source connection */
CloseConnection(sourceConnection);
@ -2074,19 +2076,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
* shared memory to store split information. This has to be released after split completes(or fails).
*/
static void
ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode)
ExecuteSplitShardReleaseSharedMemory(MultiConnection *sourceConnection)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
connectionFlag,
sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser,
databaseName);
StringInfo splitShardReleaseMemoryUDF = makeStringInfo();
appendStringInfo(splitShardReleaseMemoryUDF,
"SELECT pg_catalog.worker_split_shard_release_dsm();");
@ -2301,14 +2292,8 @@ GetNextShardIdForSplitChild()
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
"pg_catalog.pg_dist_shardid_seq"));
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
MultiConnection *connection = GetConnectionForLocalQueriesOutsideTransaction(
CitusExtensionOwnerName());
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result);
@ -2325,7 +2310,8 @@ GetNextShardIdForSplitChild()
}
shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/));
CloseConnection(connection);
PQclear(result);
ForgetResults(connection);
return shardId;
}

View File

@ -157,6 +157,10 @@ static void WaitForGroupedLogicalRepTargetsToBecomeReady(
static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
GroupedLogicalRepTargets *
groupedLogicalRepTargets);
static void RecreateGroupedLogicalRepTargetsConnections(
HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
/*
* LogicallyReplicateShards replicates a list of shards from one node to another
@ -566,10 +570,10 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
char *databaseName = get_database_name(MyDatabaseId);
/*
* We open new connections to all nodes. The reason for this is that
* operations on subscriptions, publications and replication slotscannot be
* run in a transaction. By forcing a new connection we make sure no
* transaction is active on the connection.
* We need connections that are not currently inside a transaction. The
* reason for this is that operations on subscriptions, publications and
* replication slots cannot be run in a transaction. By forcing a new
* connection we make sure no transaction is active on the connection.
*/
int connectionFlags = FORCE_NEW_CONNECTION;
@ -607,7 +611,9 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
/*
* We close all connections that we opened for the dropping here. That
* way we don't keep these connections open unnecessarily during the
* 'LogicalRepType' operation (which can take a long time).
* 'LogicalRepType' operation (which can take a long time). We might
* need to reopen a few later on, but that seems better than keeping
* many open for no reason for a long time.
*/
CloseConnection(cleanupConnection);
}
@ -1157,11 +1163,14 @@ CreatePartitioningHierarchy(List *logicalRepTargetList)
* parallel, so create them sequentially. Also attaching partition
* is a quick operation, so it is fine to execute sequentially.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
tableOwner,
list_make1(attachPartitionCommand));
MultiConnection *connection =
GetNodeUserDatabaseConnection(OUTSIDE_TRANSACTION,
target->superuserConnection->hostname,
target->superuserConnection->port,
tableOwner, NULL);
ExecuteCriticalRemoteCommand(connection, attachPartitionCommand);
MemoryContextReset(localContext);
}
}
@ -1210,10 +1219,8 @@ CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList)
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
commandList);
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
commandList);
MemoryContextReset(localContext);
@ -1638,11 +1645,11 @@ DropUser(MultiConnection *connection, char *username)
* The DROP USER command should not propagate, so we temporarily disable
* DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
connection->hostname, connection->port, connection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
connection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s",
psprintf("DROP USER IF EXISTS %s;",
quote_identifier(username))));
}
@ -1824,14 +1831,12 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
* This prevents permission escalations.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"CREATE USER %s SUPERUSER IN ROLE %s",
"CREATE USER %s SUPERUSER IN ROLE %s;",
target->subscriptionOwnerName,
GetUserNameFromId(ownerId, false)
)));
@ -1885,14 +1890,12 @@ CreateSubscriptions(MultiConnection *sourceConnection,
* The ALTER ROLE command should not propagate, so we temporarily
* disable DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
target->superuserConnection->hostname,
target->superuserConnection->port,
target->superuserConnection->user,
SendCommandListToWorkerOutsideTransactionWithConnection(
target->superuserConnection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"ALTER ROLE %s NOSUPERUSER",
"ALTER ROLE %s NOSUPERUSER;",
target->subscriptionOwnerName
)));
}
@ -2054,8 +2057,12 @@ CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
* RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the
* nodes in the groupedLogicalRepTargetsHash where the old connection is broken or
* currently running a query.
*
* IMPORTANT: When it recreates the connection, it doesn't close the existing
* connection. This means that this function should only be called when we know
* we'll throw an error afterwards, otherwise we would leak these connections.
*/
void
static void
RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName)
@ -2065,10 +2072,11 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL;
foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash)
{
if (groupedLogicalRepTargets->superuserConnection &&
PQstatus(groupedLogicalRepTargets->superuserConnection->pgConn) ==
CONNECTION_OK &&
!PQisBusy(groupedLogicalRepTargets->superuserConnection->pgConn)
MultiConnection *superuserConnection =
groupedLogicalRepTargets->superuserConnection;
if (superuserConnection &&
PQstatus(superuserConnection->pgConn) == CONNECTION_OK &&
!PQisBusy(superuserConnection->pgConn)
)
{
continue;
@ -2076,12 +2084,12 @@ RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
WorkerNode *targetWorkerNode = FindNodeWithNodeId(
groupedLogicalRepTargets->nodeId,
false);
MultiConnection *superuserConnection =
GetNodeUserDatabaseConnection(connectionFlags,
targetWorkerNode->workerName,
targetWorkerNode->workerPort,
user,
databaseName);
superuserConnection = GetNodeUserDatabaseConnection(
connectionFlags,
targetWorkerNode->workerName,
targetWorkerNode->workerPort,
user,
databaseName);
/*
* Operations on subscriptions cannot run in a transaction block. We

View File

@ -751,12 +751,11 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
/*
* CloseRemoteTransaction handles closing a connection that, potentially, is
* part of a coordinated transaction. This should only ever be called from
* connection_management.c, while closing a connection during a transaction.
* ResetRemoteTransaction resets the state of the transaction after the end of
* the main transaction, if the connection is being reused.
*/
void
CloseRemoteTransaction(struct MultiConnection *connection)
ResetRemoteTransaction(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
@ -767,20 +766,14 @@ CloseRemoteTransaction(struct MultiConnection *connection)
dlist_delete(&connection->transactionNode);
}
}
/*
* ResetRemoteTransaction resets the state of the transaction after the end of
* the main transaction, if the connection is being reused.
*/
void
ResetRemoteTransaction(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
/* just reset the entire state, relying on 0 being invalid/false */
memset(transaction, 0, sizeof(*transaction));
ResetShardPlacementAssociation(connection);
/* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0;
}

View File

@ -340,6 +340,25 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
nodeName, nodePort,
nodeUser, NULL);
SendCommandListToWorkerOutsideTransactionWithConnection(workerConnection,
commandList);
CloseConnection(workerConnection);
}
/*
* SendCommandListToWorkerOutsideTransactionWithConnection sends the command list
* over the specified connection. This opens a new transaction on the
* connection, thus it's important that no transaction is currently open.
* This function is mainly useful to avoid opening an closing
* connections excessively by allowing reusing a single connection to send
* multiple separately committing transactions. The function raises an error if
* any of the queries fail.
*/
void
SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *workerConnection,
List *commandList)
{
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection);
@ -351,7 +370,7 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
}
RemoteTransactionCommit(workerConnection);
CloseConnection(workerConnection);
ResetRemoteTransaction(workerConnection);
}
@ -430,21 +449,18 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
/*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the
* coordinated tranaction. If any of the commands fail, it rollbacks the
* transaction, and otherwise commits.
* SendOptionalCommandListToWorkerOutsideTransactionWithConnection sends the
* given command list over a specified connection in a single transaction that
* is outside of the coordinated tranaction.
*
* If any of the commands fail, it rollbacks the transaction, and otherwise commits.
* A successful commit is indicated by returning true, and a failed commit by returning
* false.
*/
bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
bool failed = false;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{
return false;
@ -452,6 +468,7 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionBegin(workerConnection);
/* iterate over the commands and execute them in the same connection */
bool failed = false;
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
@ -471,6 +488,30 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
RemoteTransactionCommit(workerConnection);
}
ResetRemoteTransaction(workerConnection);
return !failed;
}
/*
* SendOptionalCommandListToWorkerOutsideTransaction sends the given command
* list to the given worker in a single transaction that is outside of the
* coordinated tranaction. If any of the commands fail, it rollbacks the
* transaction, and otherwise commits.
*/
bool
SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
const char *nodeUser, List *commandList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
bool failed = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
workerConnection,
commandList);
CloseConnection(workerConnection);
return !failed;

View File

@ -289,6 +289,7 @@ extern MultiConnection * StartNodeConnection(uint32 flags, const char *hostname,
extern MultiConnection * GetNodeUserDatabaseConnection(uint32 flags, const char *hostname,
int32 port, const char *user,
const char *database);
extern MultiConnection * GetConnectionForLocalQueriesOutsideTransaction(char *userName);
extern MultiConnection * StartNodeUserDatabaseConnection(uint32 flags,
const char *hostname,
int32 port,

View File

@ -172,10 +172,6 @@ extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList);
extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
extern void RecreateGroupedLogicalRepTargetsConnections(
HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);
extern void CompleteNonBlockingShardTransfer(List *shardList,
MultiConnection *sourceConnection,

View File

@ -130,7 +130,6 @@ extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
* transaction managment code.
*/
extern void CloseRemoteTransaction(struct MultiConnection *connection);
extern void ResetRemoteTransaction(struct MultiConnection *connection);
/* perform handling for all in-progress transactions */

View File

@ -12,6 +12,7 @@
#ifndef WORKER_TRANSACTION_H
#define WORKER_TRANSACTION_H
#include "distributed/connection_management.h"
#include "distributed/worker_manager.h"
#include "storage/lockdefs.h"
@ -59,6 +60,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
int32 nodePort,
const char *nodeUser,
List *commandList);
extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *
commandList);
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
char *nodeName,
int32 nodePort,
@ -74,6 +79,9 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList,
const char *

View File

@ -41,8 +41,13 @@ SELECT * FROM shards_in_workers;
103 | worker1
(4 rows)
-- failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()');
-- Failure on creating the subscription
-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -32,8 +32,14 @@ INSERT INTO t SELECT x, x+1, MD5(random()::text) FROM generate_series(1,100000)
-- Initial shard placements
SELECT * FROM shards_in_workers;
-- failure on creating the subscription
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()');
-- Failure on creating the subscription
-- Failing exactly on CREATE SUBSCRIPTION is causing flaky test where we fail with either:
-- 1) ERROR: connection to the remote node localhost:xxxxx failed with the following error: ERROR: subscription "citus_shard_move_subscription_xxxxxxx" does not exist
-- another command is already in progress
-- 2) ERROR: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
-- Instead fail on the next step (ALTER SUBSCRIPTION) instead which is also required logically as part of uber CREATE SUBSCRIPTION operation.
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- Verify that the shard is not moved and the number of rows are still 100k