POC fix of issu

example-fix-logical-rep-cleanup
Jelte Fennema 2022-09-09 13:19:04 +02:00
parent 1c5b8588fe
commit cc1c09050e
No known key found for this signature in database
4 changed files with 5 additions and 4 deletions

View File

@ -1181,6 +1181,9 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
int32 sourceNodePort, char *targetNodeName, int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort) int32 targetNodePort)
{ {
AcquireLogicalReplicationLock();
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication", "CopyShardTablesViaLogicalReplication",
ALLOCSET_DEFAULT_SIZES); ALLOCSET_DEFAULT_SIZES);

View File

@ -184,7 +184,6 @@ void
LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
char *targetNodeName, int targetNodePort) char *targetNodeName, int targetNodePort)
{ {
AcquireLogicalReplicationLock();
char *superUser = CitusExtensionOwnerName(); char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId); char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION; int connectionFlags = FORCE_NEW_CONNECTION;
@ -196,8 +195,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
return; return;
} }
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
MultiConnection *sourceConnection = MultiConnection *sourceConnection =
GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort, GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort,
superUser, databaseName); superUser, databaseName);

View File

@ -90,7 +90,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- failure on dropping subscription -- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="ENABLE SUBSCRIPTION|DROP SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cancellation on dropping subscription -- cancellation on dropping subscription

View File

@ -33,6 +33,7 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
CONNECTION :connection_string CONNECTION :connection_string
PUBLICATION citus_shard_move_publication_:postgres_oid PUBLICATION citus_shard_move_publication_:postgres_oid
WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid); WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid);
DROP TABLE dist_6830000;
SELECT count(*) from pg_subscription; SELECT count(*) from pg_subscription;