Some changes to try to reproduce the failure, before fixing it

example-fix-logical-rep-cleanup
Jelte Fennema 2022-09-12 12:40:10 +02:00
parent 0f230ec98f
commit 1480aca6a4
4 changed files with 82 additions and 28 deletions

View File

@ -1182,9 +1182,6 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
int32 targetNodePort)
{
AcquireLogicalReplicationLock();
elog(LOG,"before drop leftovers");
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
elog(LOG,"after drop leftovers");
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication",
@ -1213,6 +1210,10 @@ elog(LOG,"after drop leftovers");
MemoryContextReset(localContext);
}
elog(LOG, "before drop leftovers");
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
elog(LOG, "after drop leftovers");
MemoryContextSwitchTo(oldContext);
/* data copy is done separately when logical replication is used */

View File

@ -321,6 +321,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
*/
CreateForeignConstraintsToReferenceTable(logicalRepTargetList);
elog(LOG, "before drop subs - try");
/* we're done, cleanup the publication and subscription */
DropSubscriptions(logicalRepTargetList);
elog(LOG, "after drop subs - try");
@ -347,11 +348,13 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
*/
/* reconnect if the connection failed or is waiting for a command */
// RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash,
// superUser, databaseName);
RecreateGroupedLogicalRepTargetsConnections(
groupedLogicalRepTargetsHash,
superUser, databaseName);
elog(LOG, "before drop subs - catch");
DropSubscriptions(logicalRepTargetList);
elog(LOG, "after drop subs - catch");
/* reconnect if the connection failed or is waiting for a command */
if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK ||
PQisBusy(sourceConnection->pgConn))
@ -1838,6 +1841,7 @@ void
EnableSubscriptions(List *logicalRepTargetList)
{
LogicalRepTarget *target = NULL;
elog(LOG, "before enable subscriptions");
foreach_ptr(target, logicalRepTargetList)
{
ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf(
@ -1845,6 +1849,7 @@ EnableSubscriptions(List *logicalRepTargetList)
target->subscriptionName
));
}
elog(LOG, "after enable subscriptions");
}

View File

@ -152,6 +152,63 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SET client_min_messages TO LOG;
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
LOG: before drop leftovers
LOG: after drop leftovers
LOG: before enable subscriptions
LOG: before drop subs - catch
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- try again
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
LOG: before drop leftovers
LOG: after drop leftovers
LOG: before enable subscriptions
LOG: after enable subscriptions
LOG: The states of the relations belonging to the subscriptions became READY on the target node localhost:xxxxx
LOG: The states of all subscriptions have become READY
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: before drop subs - try
LOG: after drop subs - try
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
LOG: before drop leftovers
LOG: after drop leftovers
LOG: before enable subscriptions
LOG: after enable subscriptions
LOG: The states of the relations belonging to the subscriptions became READY on the target node localhost:xxxxx
LOG: The states of all subscriptions have become READY
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: The LSN of the target subscriptions on node localhost:xxxxx have caught up with the source LSN
LOG: before drop subs - try
LOG: after drop subs - try
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;
-- failure on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()');
mitmproxy
@ -171,16 +228,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy

View File

@ -81,6 +81,16 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SET client_min_messages TO LOG;
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- try again
SELECT citus.mitmproxy('conn.allow()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
RESET client_min_messages;
-- failure on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@ -89,15 +99,6 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SET client_min_messages TO LOG;
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION").after(2).kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- try again
SELECT citus.mitmproxy('conn.onQuery(query="nonexistingquery").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
RESET client_min_messages;
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);