Fix build

example-fix-logical-rep-cleanup
Ahmet Gedemenli 2022-09-12 12:44:23 +03:00
parent cc1c09050e
commit 79923d60ac
2 changed files with 28 additions and 26 deletions

View File

@ -91,7 +91,7 @@ static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort);
static void AcquireLogicalReplicationLock(void);
static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort);
@ -1182,7 +1182,9 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
int32 targetNodePort)
{
AcquireLogicalReplicationLock();
elog(WARNING,"before drop leftovers");
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
elog(WARNING,"after drop leftovers");
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication",
@ -1235,6 +1237,25 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList)
}
/*
* AcquireLogicalReplicationLock tries to acquire a lock for logical
* replication. We need this lock, because at the start of logical replication
* we clean up old subscriptions and publications. Because of this cleanup it's
* not safe to run multiple logical replication based shard moves at the same
* time. If multiple logical replication moves would run at the same time, the
* second move might clean up subscriptions and publications that are in use by
* another move.
*/
static void
AcquireLogicalReplicationLock(void)
{
LOCKTAG tag;
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
* CopyShardTablesViaBlockWrites copies a shard along with its co-located shards
* from a source node to target node via COPY command. While the command is in

View File

@ -148,7 +148,6 @@ static bool RelationSubscriptionsAreReady(
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void AcquireLogicalReplicationLock(void);
static void DropSubscription(MultiConnection *connection,
char *subscriptionName);
static void DropPublication(MultiConnection *connection, char *publicationName);
@ -321,9 +320,10 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
* the constraints earlier.
*/
CreateForeignConstraintsToReferenceTable(logicalRepTargetList);
elog(WARNING,"before drop subs - try");
/* we're done, cleanup the publication and subscription */
DropSubscriptions(logicalRepTargetList);
elog(WARNING,"after drop subs - try");
DropReplicationSlots(sourceConnection, logicalRepTargetList);
DropPublications(sourceConnection, publicationInfoHash);
@ -347,11 +347,11 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
*/
/* reconnect if the connection failed or is waiting for a command */
RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash,
superUser, databaseName);
// RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash,
// superUser, databaseName);
elog(WARNING,"before drop subs - catch");
DropSubscriptions(logicalRepTargetList);
elog(WARNING,"after drop subs - catch");
/* reconnect if the connection failed or is waiting for a command */
if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK ||
PQisBusy(sourceConnection->pgConn))
@ -498,25 +498,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
}
/*
* AcquireLogicalReplicationLock tries to acquire a lock for logical
* replication. We need this lock, because at the start of logical replication
* we clean up old subscriptions and publications. Because of this cleanup it's
* not safe to run multiple logical replication based shard moves at the same
* time. If multiple logical replication moves would run at the same time, the
* second move might clean up subscriptions and publications that are in use by
* another move.
*/
static void
AcquireLogicalReplicationLock(void)
{
LOCKTAG tag;
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
* DropAllLogicalReplicationLeftovers drops all subscriptions, publications,
* roles and replication slots on all nodes that were related to this