From ad240b29041e5510b63703623f3f88e9695f6182 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Thu, 4 Aug 2022 12:42:53 +0530 Subject: [PATCH] Reorder split workflow to create empty publications first and then alter to add actual shards. --- .../distributed/operations/shard_split.c | 74 +++++++++------- .../shardsplit_logical_replication.c | 86 +++++++++++++------ .../shardsplit_logical_replication.h | 13 +-- 3 files changed, 107 insertions(+), 66 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 426d066c2..a0ae6ecc4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1302,13 +1302,30 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Non-Blocking shard split workflow starts here */ PG_TRY(); { - /* 1) Physically create split children. */ + /* + * 1) Create empty publications. Tables will be added after + * template replication slot and split shards are created. + */ + CreateShardSplitEmptyPublications(sourceConnection, + shardSplitHashMapForPublication); + + /* + * 2) Create template replication Slot. It returns a snapshot. The snapshot remains + * valid till the lifetime of the session that creates it. The connection is closed + * at the end of the workflow. + */ + MultiConnection *templateSlotConnection = NULL; + char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( + shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); + + + /* 3) Physically create split children. */ CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, shardGroupSplitIntervalListList, workersForPlacementList); /* - * 2) Create dummy shards due logical replication constraints. + * 4) Create dummy shards due to PG logical replication constraints. * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth * information. */ @@ -1323,25 +1340,16 @@ NonBlockingShardSplit(SplitOperation splitOperation, shardGroupSplitIntervalListList, workersForPlacementList); - /* 3) Create Publications. */ - CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - - /* - * 4) Create template replication Slot. It returns a snapshot. The snapshot remains - * valid till the lifetime of the session that creates it. The connection is closed - * at the end of the workflow. - */ - MultiConnection *templateSlotConnection = NULL; - char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( - shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); + /* 5) Alter Publications and add split shards for logical replication */ + AlterShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /* 5) Do snapshotted Copy */ + /* 6) Do snapshotted Copy */ DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, snapShotName); - /* 6) Execute 'worker_split_shard_replication_setup UDF */ + /* 7) Execute 'worker_split_shard_replication_setup UDF */ List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( sourceShardToCopyNode, sourceColocatedShardIntervalList, @@ -1362,13 +1370,13 @@ NonBlockingShardSplit(SplitOperation splitOperation, connectionFlags, superUser, databaseName); - /* 7) Create copies of template replication slot */ + /* 8) Create copies of template replication slot */ char *templateSlotName = ShardSplitTemplateReplicationSlotName( shardIntervalToSplit->shardId); CreateReplicationSlots(sourceConnection, templateSlotName, shardSplitSubscribersMetadataList); - /* 8) Create subscriptions on target nodes */ + /* 9) Create subscriptions on target nodes */ CreateShardSplitSubscriptions(targetNodeConnectionList, shardSplitSubscribersMetadataList, sourceShardToCopyNode, @@ -1378,40 +1386,40 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Used for testing */ ConflictOnlyWithIsolationTesting(); - /* 9) Wait for subscriptions to be ready */ + /* 10) Wait for subscriptions to be ready */ WaitForShardSplitRelationSubscriptionsBecomeReady( shardSplitSubscribersMetadataList); - /* 10) Wait for subscribers to catchup till source LSN */ + /* 11) Wait for subscribers to catchup till source LSN */ XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 11) Create Auxilary structures */ + /* 12) Create Auxilary structures */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList, false /* includeReplicaIdentity*/); - /* 12) Wait for subscribers to catchup till source LSN */ + /* 13) Wait for subscribers to catchup till source LSN */ sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 13) Block writes on source shards */ + /* 14) Block writes on source shards */ BlockWritesToShardList(sourceColocatedShardIntervalList); - /* 14) Wait for subscribers to catchup till source LSN */ + /* 15) Wait for subscribers to catchup till source LSN */ sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 15) Drop Subscribers */ + /* 16) Drop Subscribers */ DropShardSplitSubsriptions(shardSplitSubscribersMetadataList); - /* 16) Drop Publications */ + /* 17) Drop Publications */ DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /* 17) Drop replication slots + /* 18) Drop replication slots * Drop template and subscriber replication slots */ DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName( @@ -1419,13 +1427,13 @@ NonBlockingShardSplit(SplitOperation splitOperation, DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList); /* - * 18) Drop old shards and delete related metadata. Have to do that before + * 19) Drop old shards and delete related metadata. Have to do that before * creating the new shard metadata, because there's cross-checks * preventing inconsistent metadata (like overlapping shards). */ DropShardList(sourceColocatedShardIntervalList); - /* 19) Insert new shard and placement metdata */ + /* 20) Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1433,7 +1441,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 20) Create foreign keys if exists after the metadata changes happening in + * 21) Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign * key creation depends on the new metadata. */ @@ -1441,17 +1449,17 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 21) Drop dummy shards. + * 22) Drop dummy shards. */ DropDummyShards(mapOfDummyShardToPlacement); - /* 22) Close source connection */ + /* 23) Close source connection */ CloseConnection(sourceConnection); - /* 23) Close all subscriber connections */ + /* 24) Close all subscriber connections */ CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList); - /* 24) Close connection of template replication slot */ + /* 25) Close connection of template replication slot */ CloseConnection(templateSlotConnection); } PG_CATCH(); diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index d410296be..616703d7b 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -33,21 +33,20 @@ static HTAB *ShardInfoHashMapForPublications = NULL; static void AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool isChildShardInterval); +static void AlterShardSplitPublicationForNode(MultiConnection *connection, + List *shardList, + uint32_t publicationForTargetNodeId, Oid + ownerId); ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, List * replicationSlotInfoList); -static void CreateShardSplitPublicationForNode(MultiConnection *connection, - List *shardList, - uint32_t publicationForTargetNodeId, Oid - tableOwner); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); static void DropAllShardSplitUsers(MultiConnection *cleanupConnection); static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection); - /* * CreateShardSplitInfoMapForPublication creates a hashmap that groups * shards for creating publications and subscriptions. @@ -175,7 +174,44 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, /* - * CreateShardSplitPublications creates publications on the source node. + * CreateShardSplitEmptyPublications creates empty publications on the source node. + * Due to a sporadic bug in PG, we have to create publications before we create replication slot. + * After the template replication slot is created, these empty publications are altered + * with actual tables to be replicated. + * More details about the bug can be found in the below mailing link. + * (https://www.postgresql.org/message-id/20191010115752.2d0f27af%40firost). + * + * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications + * related to split operations. + */ +void +CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication) +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardInfoHashMapForPublication); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + uint32 tableOwnerId = entry->key.tableOwnerId; + + StringInfo createEmptyPublicationCommand = makeStringInfo(); + appendStringInfo(createEmptyPublicationCommand, "CREATE PUBLICATION %s", + ShardSplitPublicationName(nodeId, tableOwnerId)); + + ExecuteCriticalRemoteCommand(sourceConnection, + createEmptyPublicationCommand->data); + pfree(createEmptyPublicationCommand->data); + pfree(createEmptyPublicationCommand); + } +} + + +/* + * AlterShardSplitPublications alters publications on the source node. + * It adds split shards for logical replication. * * sourceConnection - Connection of source node. * @@ -184,8 +220,8 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, * ShardIntervals mapped by key. */ void -CreateShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication) +AlterShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMapForPublication); @@ -196,30 +232,26 @@ CreateShardSplitPublications(MultiConnection *sourceConnection, uint32 nodeId = entry->key.nodeId; uint32 tableOwnerId = entry->key.tableOwnerId; List *shardListForPublication = entry->shardSplitInfoList; - - /* Create publication on shard list */ - CreateShardSplitPublicationForNode(sourceConnection, - shardListForPublication, - nodeId, - tableOwnerId); + AlterShardSplitPublicationForNode(sourceConnection, + shardListForPublication, + nodeId, + tableOwnerId); } } /* - * CreateShardSplitPublicationForNode creates a publication on source node - * for given shard list. - * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications - * related to split operations. + * AlterShardSplitPublicationForNode adds shards that have to be replicated + * for a given publication. */ static void -CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, - uint32_t publicationForTargetNodeId, Oid ownerId) +AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList, + uint32_t publicationForTargetNodeId, Oid ownerId) { - StringInfo createPublicationCommand = makeStringInfo(); + StringInfo alterPublicationCommand = makeStringInfo(); bool prefixWithComma = false; - appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", + appendStringInfo(alterPublicationCommand, "ALTER PUBLICATION %s ADD TABLE ", ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); ShardInterval *shard = NULL; @@ -229,16 +261,16 @@ CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, if (prefixWithComma) { - appendStringInfoString(createPublicationCommand, ","); + appendStringInfoString(alterPublicationCommand, ","); } - appendStringInfoString(createPublicationCommand, shardName); + appendStringInfoString(alterPublicationCommand, shardName); prefixWithComma = true; } - ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); - pfree(createPublicationCommand->data); - pfree(createPublicationCommand); + ExecuteCriticalRemoteCommand(connection, alterPublicationCommand->data); + pfree(alterPublicationCommand->data); + pfree(alterPublicationCommand); } diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index f0c2568e6..566e10418 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -71,8 +71,8 @@ extern HTAB * CreateShardSplitInfoMapForPublication( List *destinationWorkerNodesList); /* Functions for creating publications and subscriptions*/ -extern void CreateShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication); +extern void AlterShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication); extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList, List *shardSplitSubscriberMetadataList, WorkerNode *sourceWorkerNode, char *superUser, @@ -87,6 +87,11 @@ extern List * CreateTargetNodeConnectionsForShardSplit( char *databaseName); /* Functions to drop publisher-subscriber resources */ +extern void CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication); +extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, + MultiConnection * + sourceConnection); extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitMapOfPublications); extern void DropShardSplitPublications(MultiConnection *sourceConnection, @@ -94,9 +99,6 @@ extern void DropShardSplitPublications(MultiConnection *sourceConnection, extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList); extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection, List *replicationSlotInfoList); -extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, - MultiConnection * - sourceConnection); extern void DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName); @@ -108,6 +110,5 @@ extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr source shardSplitPubSubMetadataList); extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId); - extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */