diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index bf856a997..f370fb5f5 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1182,9 +1182,6 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa int32 targetNodePort) { AcquireLogicalReplicationLock(); -elog(LOG,"before drop leftovers"); - DropAllLogicalReplicationLeftovers(SHARD_MOVE); -elog(LOG,"after drop leftovers"); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1213,6 +1210,10 @@ elog(LOG,"after drop leftovers"); MemoryContextReset(localContext); } + elog(LOG, "before drop leftovers"); + DropAllLogicalReplicationLeftovers(SHARD_MOVE); + elog(LOG, "after drop leftovers"); + MemoryContextSwitchTo(oldContext); /* data copy is done seperately when logical replication is used */ diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index bfedb998b..0374dc76f 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -320,10 +320,11 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * the constraints earlier. */ CreateForeignConstraintsToReferenceTable(logicalRepTargetList); - elog(LOG,"before drop subs - try"); + elog(LOG, "before drop subs - try"); + /* we're done, cleanup the publication and subscription */ DropSubscriptions(logicalRepTargetList); - elog(LOG,"after drop subs - try"); + elog(LOG, "after drop subs - try"); DropReplicationSlots(sourceConnection, logicalRepTargetList); DropPublications(sourceConnection, publicationInfoHash); @@ -347,11 +348,13 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ /* reconnect if the connection failed or is waiting for a command */ - // RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, - // superUser, databaseName); - elog(LOG,"before drop subs - catch"); + RecreateGroupedLogicalRepTargetsConnections( + groupedLogicalRepTargetsHash, + superUser, databaseName); + elog(LOG, "before drop subs - catch"); DropSubscriptions(logicalRepTargetList); - elog(LOG,"after drop subs - catch"); + elog(LOG, "after drop subs - catch"); + /* reconnect if the connection failed or is waiting for a command */ if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK || PQisBusy(sourceConnection->pgConn)) @@ -1838,6 +1841,7 @@ void EnableSubscriptions(List *logicalRepTargetList) { LogicalRepTarget *target = NULL; + elog(LOG, "before enable subscriptions"); foreach_ptr(target, logicalRepTargetList) { ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf( @@ -1845,6 +1849,7 @@ EnableSubscriptions(List *logicalRepTargetList) target->subscriptionName )); } + elog(LOG, "after enable subscriptions"); } diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 0ea70a2eb..275df9c6f 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -152,6 +152,63 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request +SET client_min_messages TO LOG; +-- failure on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +LOG: before drop leftovers +LOG: after drop leftovers +LOG: before enable subscriptions +LOG: before drop subs - catch +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +-- try again +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +LOG: before drop leftovers +LOG: after drop leftovers +LOG: before enable subscriptions +LOG: after enable subscriptions +LOG: The states of the relations belonging to the subscriptions became READY on the target node localhost:xxxxx +LOG: The states of all subscriptions have become READY +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: before drop subs - try +LOG: after drop subs - try + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); +LOG: before drop leftovers +LOG: after drop leftovers +LOG: before enable subscriptions +LOG: after enable subscriptions +LOG: The states of the relations belonging to the subscriptions became READY on the target node localhost:xxxxx +LOG: The states of all subscriptions have become READY +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN +LOG: before drop subs - try +LOG: after drop subs - try + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +RESET client_min_messages; -- failure on disabling subscription (right before dropping it) SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()'); mitmproxy @@ -171,16 +228,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request --- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open -CONTEXT: while executing command on localhost:xxxxx -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 9388d4ebc..2aa45f18c 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -81,6 +81,16 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +SET client_min_messages TO LOG; +-- failure on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- try again +SELECT citus.mitmproxy('conn.allow()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); +RESET client_min_messages; + -- failure on disabling subscription (right before dropping it) SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); @@ -89,15 +99,6 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -SET client_min_messages TO LOG; --- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION").after(2).kill()'); -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); --- try again -SELECT citus.mitmproxy('conn.onQuery(query="nonexistingquery").kill()'); -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -RESET client_min_messages; - -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);