Reorder the split workflow to create empty publications first and then alter them to add the actual shards.

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-08-04 12:42:53 +05:30
parent 7d64ddf4c2
commit ad240b2904
3 changed files with 107 additions and 66 deletions
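At the SQL level, the reorder boils down to issuing the publication DDL in two steps around the replication slot creation: create an empty publication, create the template slot (which exports the snapshot used for the split copy), and only then attach the split shards with ALTER PUBLICATION ... ADD TABLE. The standalone libpq sketch below illustrates just that ordering; it is not the Citus implementation, and the connection strings, publication/slot names, shard names, and the pgoutput plugin are all illustrative placeholders (the real workflow goes through Citus's MultiConnection layer and its own decoder). Per the new comment added in this commit, the empty-publication-first order works around a sporadic PostgreSQL bug hit when publications are created after the slot.

/* Sketch only: error handling kept minimal, names are made up. */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

static void
RunOrDie(PGconn *conn, const char *command)
{
	PGresult *result = PQexec(conn, command);
	if (PQresultStatus(result) != PGRES_COMMAND_OK &&
		PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		fprintf(stderr, "\"%s\" failed: %s", command, PQerrorMessage(conn));
		exit(1);
	}
	PQclear(result);
}

int
main(void)
{
	/* A regular connection for DDL and a replication connection for the slot. */
	PGconn *source = PQconnectdb("dbname=postgres");
	PGconn *replication = PQconnectdb("dbname=postgres replication=database");
	if (PQstatus(source) != CONNECTION_OK || PQstatus(replication) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed\n");
		exit(1);
	}

	/* 1) Empty publication first, before any replication slot exists. */
	RunOrDie(source, "CREATE PUBLICATION citus_split_pub_example");

	/*
	 * 2) Template replication slot; EXPORT_SNAPSHOT hands back the snapshot
	 * name later used for the consistent copy of the split shards.
	 */
	RunOrDie(replication,
			 "CREATE_REPLICATION_SLOT citus_split_template_slot_example "
			 "LOGICAL pgoutput EXPORT_SNAPSHOT");

	/*
	 * 3) In the real workflow the split shards are created at this point;
	 * afterwards they are attached to the still-empty publication.
	 */
	RunOrDie(source,
			 "ALTER PUBLICATION citus_split_pub_example "
			 "ADD TABLE table_to_split_102011, table_to_split_102012");

	PQfinish(replication);
	PQfinish(source);
	return 0;
}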

View File

@@ -1302,13 +1302,30 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Non-Blocking shard split workflow starts here */
PG_TRY();
{
/* 1) Physically create split children. */
/*
* 1) Create empty publications. Tables will be added after
* the template replication slot and split shards are created.
*/
CreateShardSplitEmptyPublications(sourceConnection,
shardSplitHashMapForPublication);
/*
* 2) Create the template replication slot. It returns a snapshot that remains
* valid for the lifetime of the session that creates it. The connection is closed
* at the end of the workflow.
*/
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* 3) Physically create split children. */
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* 2) Create dummy shards due logical replication constraints.
* 4) Create dummy shards due to PG logical replication constraints.
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for in-depth
* information.
*/
@@ -1323,25 +1340,16 @@ NonBlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList, workersForPlacementList);
/* 3) Create Publications. */
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*
* 4) Create the template replication slot. It returns a snapshot that remains
* valid for the lifetime of the session that creates it. The connection is closed
* at the end of the workflow.
*/
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* 5) Alter Publications and add split shards for logical replication */
AlterShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* 5) Do snapshotted Copy */
/* 6) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapShotName);
/* 6) Execute 'worker_split_shard_replication_setup' UDF */
/* 7) Execute 'worker_split_shard_replication_setup' UDF */
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
@@ -1362,13 +1370,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
connectionFlags,
superUser, databaseName);
/* 7) Create copies of template replication slot */
/* 8) Create copies of template replication slot */
char *templateSlotName = ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId);
CreateReplicationSlots(sourceConnection, templateSlotName,
shardSplitSubscribersMetadataList);
/* 8) Create subscriptions on target nodes */
/* 9) Create subscriptions on target nodes */
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
sourceShardToCopyNode,
@@ -1378,40 +1386,40 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Used for testing */
ConflictOnlyWithIsolationTesting();
/* 9) Wait for subscriptions to be ready */
/* 10) Wait for subscriptions to be ready */
WaitForShardSplitRelationSubscriptionsBecomeReady(
shardSplitSubscribersMetadataList);
/* 10) Wait for subscribers to catch up to the source LSN */
/* 11) Wait for subscribers to catch up to the source LSN */
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* 11) Create auxiliary structures */
/* 12) Create auxiliary structures */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList,
false /* includeReplicaIdentity*/);
/* 12) Wait for subscribers to catch up to the source LSN */
/* 13) Wait for subscribers to catch up to the source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* 13) Block writes on source shards */
/* 14) Block writes on source shards */
BlockWritesToShardList(sourceColocatedShardIntervalList);
/* 14) Wait for subscribers to catch up to the source LSN */
/* 15) Wait for subscribers to catch up to the source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* 15) Drop Subscribers */
/* 16) Drop Subscribers */
DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
/* 16) Drop Publications */
/* 17) Drop Publications */
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* 17) Drop replication slots
/* 18) Drop replication slots
* Drop template and subscriber replication slots
*/
DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName(
@@ -1419,13 +1427,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
/*
* 18) Drop old shards and delete related metadata. Have to do that before
* 19) Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there are cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
/* 19) Insert new shard and placement metadata */
/* 20) Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
@@ -1433,7 +1441,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
workersForPlacementList);
/*
* 20) Create foreign keys, if any exist, after the metadata changes happening in
* 21) Create foreign keys, if any exist, after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
@@ -1441,17 +1449,17 @@ NonBlockingShardSplit(SplitOperation splitOperation,
workersForPlacementList);
/*
* 21) Drop dummy shards.
* 22) Drop dummy shards.
*/
DropDummyShards(mapOfDummyShardToPlacement);
/* 22) Close source connection */
/* 23) Close source connection */
CloseConnection(sourceConnection);
/* 23) Close all subscriber connections */
/* 24) Close all subscriber connections */
CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
/* 24) Close connection of template replication slot */
/* 25) Close connection of template replication slot */
CloseConnection(templateSlotConnection);
}
PG_CATCH();

View File

@@ -33,21 +33,20 @@ static HTAB *ShardInfoHashMapForPublications = NULL;
static void AddPublishableShardEntryInMap(uint32 targetNodeId,
ShardInterval *shardInterval, bool
isChildShardInterval);
static void AlterShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
ownerId);
ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32
nodeId,
List *
replicationSlotInfoList);
static void CreateShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
tableOwner);
static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection);
static void DropAllShardSplitPublications(MultiConnection *cleanupConnection);
static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection);
/*
* CreateShardSplitInfoMapForPublication creates a hashmap that groups
* shards for creating publications and subscriptions.
@@ -175,7 +174,44 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
/*
* CreateShardSplitPublications creates publications on the source node.
* CreateShardSplitEmptyPublications creates empty publications on the source node.
* Due to a sporadic bug in PG, we have to create publications before we create the replication slot.
* After the template replication slot is created, these empty publications are altered
* to add the actual tables to be replicated.
* More details about the bug can be found in the mailing list thread below:
* (https://www.postgresql.org/message-id/20191010115752.2d0f27af%40firost).
*
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
* related to split operations.
*/
void
CreateShardSplitEmptyPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
StringInfo createEmptyPublicationCommand = makeStringInfo();
appendStringInfo(createEmptyPublicationCommand, "CREATE PUBLICATION %s",
ShardSplitPublicationName(nodeId, tableOwnerId));
ExecuteCriticalRemoteCommand(sourceConnection,
createEmptyPublicationCommand->data);
pfree(createEmptyPublicationCommand->data);
pfree(createEmptyPublicationCommand);
}
}
/*
* AlterShardSplitPublications alters publications on the source node.
* It adds split shards for logical replication.
*
* sourceConnection - Connection of source node.
*
@@ -184,8 +220,8 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
* ShardIntervals mapped by key.
*/
void
CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
AlterShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
@@ -196,30 +232,26 @@ CreateShardSplitPublications(MultiConnection *sourceConnection,
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
List *shardListForPublication = entry->shardSplitInfoList;
/* Create publication on shard list */
CreateShardSplitPublicationForNode(sourceConnection,
shardListForPublication,
nodeId,
tableOwnerId);
AlterShardSplitPublicationForNode(sourceConnection,
shardListForPublication,
nodeId,
tableOwnerId);
}
}
/*
* CreateShardSplitPublicationForNode creates a publication on source node
* for given shard list.
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
* related to split operations.
* AlterShardSplitPublicationForNode adds the shards that have to be replicated
* to a given publication.
*/
static void
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid ownerId)
AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid ownerId)
{
StringInfo createPublicationCommand = makeStringInfo();
StringInfo alterPublicationCommand = makeStringInfo();
bool prefixWithComma = false;
appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ",
appendStringInfo(alterPublicationCommand, "ALTER PUBLICATION %s ADD TABLE ",
ShardSplitPublicationName(publicationForTargetNodeId, ownerId));
ShardInterval *shard = NULL;
@@ -229,16 +261,16 @@ CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
if (prefixWithComma)
{
appendStringInfoString(createPublicationCommand, ",");
appendStringInfoString(alterPublicationCommand, ",");
}
appendStringInfoString(createPublicationCommand, shardName);
appendStringInfoString(alterPublicationCommand, shardName);
prefixWithComma = true;
}
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
pfree(createPublicationCommand->data);
pfree(createPublicationCommand);
ExecuteCriticalRemoteCommand(connection, alterPublicationCommand->data);
pfree(alterPublicationCommand->data);
pfree(alterPublicationCommand);
}
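The loop above builds one comma-separated ALTER PUBLICATION ... ADD TABLE statement, adding the separator before every entry except the first. A minimal self-contained sketch of the same pattern with plain C strings instead of StringInfo (publication and shard names are made up):

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

int
main(void)
{
	/* Illustrative shard names; the real command uses owner/node-specific names. */
	const char *shardNames[] = { "table_to_split_102011", "table_to_split_102012" };
	int shardCount = sizeof(shardNames) / sizeof(shardNames[0]);

	char command[1024];
	snprintf(command, sizeof(command),
			 "ALTER PUBLICATION citus_split_pub_example ADD TABLE ");

	bool prefixWithComma = false;
	for (int i = 0; i < shardCount; i++)
	{
		/* Same idea as prefixWithComma above: comma before all but the first entry. */
		if (prefixWithComma)
		{
			strncat(command, ",", sizeof(command) - strlen(command) - 1);
		}
		strncat(command, shardNames[i], sizeof(command) - strlen(command) - 1);
		prefixWithComma = true;
	}

	/* Prints the statement that would go to the source node in one round trip. */
	printf("%s\n", command);
	return 0;
}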

View File

@@ -71,8 +71,8 @@ extern HTAB * CreateShardSplitInfoMapForPublication(
List *destinationWorkerNodesList);
/* Functions for creating publications and subscriptions */
extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void AlterShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitSubscriberMetadataList,
WorkerNode *sourceWorkerNode, char *superUser,
@@ -87,6 +87,11 @@ extern List * CreateTargetNodeConnectionsForShardSplit(
char *databaseName);
/* Functions to drop publisher-subscriber resources */
extern void CreateShardSplitEmptyPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication);
extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection *
sourceConnection);
extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode,
HTAB *shardSplitMapOfPublications);
extern void DropShardSplitPublications(MultiConnection *sourceConnection,
@@ -94,9 +99,6 @@ extern void DropShardSplitPublications(MultiConnection *sourceConnection,
extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList);
extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
List *replicationSlotInfoList);
extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection *
sourceConnection);
extern void DisableAndDropShardSplitSubscription(MultiConnection *connection,
char *subscriptionName);
@@ -108,6 +110,5 @@ extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr source
shardSplitPubSubMetadataList);
extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId);
extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */