From cc1c09050e98bd36eb2165b0adbd246a86324f38 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 9 Sep 2022 13:19:04 +0200 Subject: [PATCH] POC fix of issu --- src/backend/distributed/operations/repair_shards.c | 3 +++ .../distributed/replication/multi_logical_replication.c | 3 --- src/test/regress/sql/failure_online_move_shard_placement.sql | 2 +- src/test/regress/sql/logical_replication.sql | 1 + 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 4388b86fd..1bfdc8ad0 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1181,6 +1181,9 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa int32 sourceNodePort, char *targetNodeName, int32 targetNodePort) { + AcquireLogicalReplicationLock(); + DropAllLogicalReplicationLeftovers(SHARD_MOVE); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", ALLOCSET_DEFAULT_SIZES); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index f91025b2e..e4cecd650 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -184,7 +184,6 @@ void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort) { - AcquireLogicalReplicationLock(); char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -196,8 +195,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo return; } - DropAllLogicalReplicationLeftovers(SHARD_MOVE); - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort, superUser, databaseName); diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 282a2895c..f189a805d 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -90,7 +90,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="ENABLE SUBSCRIPTION|DROP SUBSCRIPTION").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -- cancellation on dropping subscription diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index 94b08a5d1..6730a6b24 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -33,6 +33,7 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :connection_string PUBLICATION citus_shard_move_publication_:postgres_oid WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); +DROP TABLE dist_6830000; SELECT count(*) from pg_subscription;