From 79923d60ac0c51aa030d0006c448fdac0e848960 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 12 Sep 2022 12:44:23 +0300 Subject: [PATCH] Fix build --- .../distributed/operations/repair_shards.c | 23 +++++++++++++- .../replication/multi_logical_replication.c | 31 ++++--------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 1bfdc8ad0..815ecce91 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -91,7 +91,7 @@ static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); - +static void AcquireLogicalReplicationLock(void); static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort); @@ -1182,7 +1182,9 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa int32 targetNodePort) { AcquireLogicalReplicationLock(); +elog(WARNING,"before drop leftovers"); DropAllLogicalReplicationLeftovers(SHARD_MOVE); +elog(WARNING,"after drop leftovers"); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CopyShardTablesViaLogicalReplication", @@ -1235,6 +1237,25 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList) } +/* + * AcquireLogicalReplicationLock tries to acquire a lock for logical + * replication. We need this lock, because at the start of logical replication + * we clean up old subscriptions and publications. Because of this cleanup it's + * not safe to run multiple logical replication based shard moves at the same + * time. If multiple logical replication moves would run at the same time, the + * second move might clean up subscriptions and publications that are in use by + * another move. + */ +static void +AcquireLogicalReplicationLock(void) +{ + LOCKTAG tag; + SET_LOCKTAG_LOGICAL_REPLICATION(tag); + + LockAcquire(&tag, ExclusiveLock, false, false); +} + + /* * CopyShardTablesViaBlockWrites copies a shard along with its co-located shards * from a source node to target node via COPY command. While the command is in diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index e4cecd650..26fe9af38 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -148,7 +148,6 @@ static bool RelationSubscriptionsAreReady( static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); -static void AcquireLogicalReplicationLock(void); static void DropSubscription(MultiConnection *connection, char *subscriptionName); static void DropPublication(MultiConnection *connection, char *publicationName); @@ -321,9 +320,10 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * the constraints earlier. */ CreateForeignConstraintsToReferenceTable(logicalRepTargetList); - + elog(WARNING,"before drop subs - try"); /* we're done, cleanup the publication and subscription */ DropSubscriptions(logicalRepTargetList); + elog(WARNING,"after drop subs - try"); DropReplicationSlots(sourceConnection, logicalRepTargetList); DropPublications(sourceConnection, publicationInfoHash); @@ -347,11 +347,11 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ /* reconnect if the connection failed or is waiting for a command */ - RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, - superUser, databaseName); - + // RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, + // superUser, databaseName); + elog(WARNING,"before drop subs - catch"); DropSubscriptions(logicalRepTargetList); - + elog(WARNING,"after drop subs - catch"); /* reconnect if the connection failed or is waiting for a command */ if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK || PQisBusy(sourceConnection->pgConn)) @@ -498,25 +498,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) } -/* - * AcquireLogicalReplicationLock tries to acquire a lock for logical - * replication. We need this lock, because at the start of logical replication - * we clean up old subscriptions and publications. Because of this cleanup it's - * not safe to run multiple logical replication based shard moves at the same - * time. If multiple logical replication moves would run at the same time, the - * second move might clean up subscriptions and publications that are in use by - * another move. - */ -static void -AcquireLogicalReplicationLock(void) -{ - LOCKTAG tag; - SET_LOCKTAG_LOGICAL_REPLICATION(tag); - - LockAcquire(&tag, ExclusiveLock, false, false); -} - - /* * DropAllLogicalReplicationLeftovers drops all subscriptions, publications, * roles and replication slots on all nodes that were related to this