From 00c3830bee3e493504be408fdb2728479446e2d1 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Thu, 4 Aug 2022 16:26:15 +0530 Subject: [PATCH] 1)Revert the original order of workflow 2) Introduce GetNextShardIdForSplitChild method --- .../distributed/operations/shard_split.c | 140 ++++++++++++------ .../shardsplit_logical_replication.c | 85 ++++------- .../shardsplit_logical_replication.h | 4 +- 3 files changed, 124 insertions(+), 105 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index a0ae6ecc4..35b20ef70 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -39,6 +39,7 @@ #include "distributed/shardsplit_logical_replication.h" #include "distributed/deparse_shard_query.h" #include "distributed/shard_rebalancer.h" +#include "postmaster/postmaster.h" /* * Entry for map that tracks ShardInterval -> Placement Node @@ -127,6 +128,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId, ShardInterval *shardInterval); static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); +static uint64 GetNextShardIdForSplitChild(void); /* Customize error message strings based on operation type */ @@ -963,7 +965,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard, { ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard); splitChildShardInterval->shardIndex = -1; - splitChildShardInterval->shardId = GetNextShardId(); + splitChildShardInterval->shardId = GetNextShardIdForSplitChild(); splitChildShardInterval->minValueExists = true; splitChildShardInterval->minValue = currentSplitChildMinValue; @@ -1302,30 +1304,13 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Non-Blocking shard split workflow starts here */ PG_TRY(); { - /* - * 1) Create empty publications. Tables will be added after - * template replication slot and split shards are created. - */ - CreateShardSplitEmptyPublications(sourceConnection, - shardSplitHashMapForPublication); - - /* - * 2) Create template replication Slot. It returns a snapshot. The snapshot remains - * valid till the lifetime of the session that creates it. The connection is closed - * at the end of the workflow. - */ - MultiConnection *templateSlotConnection = NULL; - char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( - shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); - - - /* 3) Physically create split children. */ + /* 1) Physically create split children. */ CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, shardGroupSplitIntervalListList, workersForPlacementList); /* - * 4) Create dummy shards due to PG logical replication constraints. + * 2) Create dummy shards due to PG logical replication constraints. * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth * information. */ @@ -1339,17 +1324,24 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateReplicaIdentities(mapOfDummyShardToPlacement, shardGroupSplitIntervalListList, workersForPlacementList); + /* 3) Create Publications. */ + CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /* 5) Alter Publications and add split shards for logical replication */ - AlterShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); + /* + * 4) Create template replication Slot. It returns a snapshot. The snapshot remains + * valid till the lifetime of the session that creates it. The connection is closed + * at the end of the workflow. + */ + MultiConnection *templateSlotConnection = NULL; + char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( + shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); - - /* 6) Do snapshotted Copy */ + /* 5) Do snapshotted Copy */ DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, snapShotName); - /* 7) Execute 'worker_split_shard_replication_setup UDF */ + /* 6) Execute 'worker_split_shard_replication_setup UDF */ List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( sourceShardToCopyNode, sourceColocatedShardIntervalList, @@ -1364,19 +1356,19 @@ NonBlockingShardSplit(SplitOperation splitOperation, PopulateShardSplitSubscriptionsMetadataList( shardSplitHashMapForPublication, replicationSlotInfoList); - /* Create connections to the target nodes. TODO: can be refactored */ + /* Create connections to the target nodes */ List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( shardSplitSubscribersMetadataList, connectionFlags, superUser, databaseName); - /* 8) Create copies of template replication slot */ + /* 7) Create copies of template replication slot */ char *templateSlotName = ShardSplitTemplateReplicationSlotName( shardIntervalToSplit->shardId); CreateReplicationSlots(sourceConnection, templateSlotName, shardSplitSubscribersMetadataList); - /* 9) Create subscriptions on target nodes */ + /* 8) Create subscriptions on target nodes */ CreateShardSplitSubscriptions(targetNodeConnectionList, shardSplitSubscribersMetadataList, sourceShardToCopyNode, @@ -1386,40 +1378,40 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Used for testing */ ConflictOnlyWithIsolationTesting(); - /* 10) Wait for subscriptions to be ready */ + /* 9) Wait for subscriptions to be ready */ WaitForShardSplitRelationSubscriptionsBecomeReady( shardSplitSubscribersMetadataList); - /* 11) Wait for subscribers to catchup till source LSN */ + /* 10) Wait for subscribers to catchup till source LSN */ XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 12) Create Auxilary structures */ + /* 11) Create Auxilary structures */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList, false /* includeReplicaIdentity*/); - /* 13) Wait for subscribers to catchup till source LSN */ + /* 12) Wait for subscribers to catchup till source LSN */ sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 14) Block writes on source shards */ + /* 13) Block writes on source shards */ BlockWritesToShardList(sourceColocatedShardIntervalList); - /* 15) Wait for subscribers to catchup till source LSN */ + /* 14) Wait for subscribers to catchup till source LSN */ sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); - /* 16) Drop Subscribers */ + /* 15) Drop Subscribers */ DropShardSplitSubsriptions(shardSplitSubscribersMetadataList); - /* 17) Drop Publications */ + /* 16) Drop Publications */ DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /* 18) Drop replication slots + /* 17) Drop replication slots * Drop template and subscriber replication slots */ DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName( @@ -1427,13 +1419,13 @@ NonBlockingShardSplit(SplitOperation splitOperation, DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList); /* - * 19) Drop old shards and delete related metadata. Have to do that before + * 18) Drop old shards and delete related metadata. Have to do that before * creating the new shard metadata, because there's cross-checks * preventing inconsistent metadata (like overlapping shards). */ DropShardList(sourceColocatedShardIntervalList); - /* 20) Insert new shard and placement metdata */ + /* 19) Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1441,7 +1433,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 21) Create foreign keys if exists after the metadata changes happening in + * 20) Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign * key creation depends on the new metadata. */ @@ -1449,17 +1441,17 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 22) Drop dummy shards. + * 21) Drop dummy shards. */ DropDummyShards(mapOfDummyShardToPlacement); - /* 23) Close source connection */ + /* 22) Close source connection */ CloseConnection(sourceConnection); - /* 24) Close all subscriber connections */ + /* 23) Close all subscriber connections */ CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList); - /* 25) Close connection of template replication slot */ + /* 24) Close connection of template replication slot */ CloseConnection(templateSlotConnection); } PG_CATCH(); @@ -1969,3 +1961,63 @@ CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement, shardToBeDroppedNode->workerPort); } } + + +/* + * GetNextShardIdForSplitChild returns shard id to be used for split child. + * The function connects to the local node through a new connection and gets the next + * sequence. This prevents self deadlock when 'CREATE_REPLICATION_SLOT' is executed + * as a part of nonblocking split workflow. + */ +static uint64 +GetNextShardIdForSplitChild() +{ + uint64 shardId = 0; + + /* + * In regression tests, we would like to generate shard IDs consistently + * even if the tests run in parallel. Instead of the sequence, we can use + * the next_shard_id GUC to specify which shard ID the current session should + * generate next. The GUC is automatically increased by 1 every time a new + * shard ID is generated. + */ + if (NextShardId > 0) + { + shardId = NextShardId; + NextShardId += 1; + + return shardId; + } + + StringInfo nextValueCommand = makeStringInfo(); + appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr( + "pg_catalog.pg_dist_shardid_seq")); + + int connectionFlag = FORCE_NEW_CONNECTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag, + LocalHostName, + PostPortNumber, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data, + &result); + if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1 || + PQnfields(result) != 1) + { + PQclear(result); + ForgetResults(connection); + CloseConnection(connection); + + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg( + "Could not generate next shard id while executing shard splits."))); + } + + shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nodeId column*/)); + CloseConnection(connection); + + return shardId; +} diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 616703d7b..ef3034c03 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -33,14 +33,14 @@ static HTAB *ShardInfoHashMapForPublications = NULL; static void AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool isChildShardInterval); -static void AlterShardSplitPublicationForNode(MultiConnection *connection, - List *shardList, - uint32_t publicationForTargetNodeId, Oid - ownerId); ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, List * replicationSlotInfoList); +static void CreateShardSplitPublicationForNode(MultiConnection *connection, + List *shardList, + uint32_t publicationForTargetNodeId, Oid + tableOwner); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); @@ -174,44 +174,7 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, /* - * CreateShardSplitEmptyPublications creates empty publications on the source node. - * Due to a sporadic bug in PG, we have to create publications before we create replication slot. - * After the template replication slot is created, these empty publications are altered - * with actual tables to be replicated. - * More details about the bug can be found in the below mailing link. - * (https://www.postgresql.org/message-id/20191010115752.2d0f27af%40firost). - * - * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications - * related to split operations. - */ -void -CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication) -{ - HASH_SEQ_STATUS status; - hash_seq_init(&status, shardInfoHashMapForPublication); - - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) - { - uint32 nodeId = entry->key.nodeId; - uint32 tableOwnerId = entry->key.tableOwnerId; - - StringInfo createEmptyPublicationCommand = makeStringInfo(); - appendStringInfo(createEmptyPublicationCommand, "CREATE PUBLICATION %s", - ShardSplitPublicationName(nodeId, tableOwnerId)); - - ExecuteCriticalRemoteCommand(sourceConnection, - createEmptyPublicationCommand->data); - pfree(createEmptyPublicationCommand->data); - pfree(createEmptyPublicationCommand); - } -} - - -/* - * AlterShardSplitPublications alters publications on the source node. - * It adds split shards for logical replication. + * CreateShardSplitPublications creates publications on the source node. * * sourceConnection - Connection of source node. * @@ -220,8 +183,8 @@ CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, * ShardIntervals mapped by key. */ void -AlterShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication) +CreateShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMapForPublication); @@ -232,26 +195,30 @@ AlterShardSplitPublications(MultiConnection *sourceConnection, uint32 nodeId = entry->key.nodeId; uint32 tableOwnerId = entry->key.tableOwnerId; List *shardListForPublication = entry->shardSplitInfoList; - AlterShardSplitPublicationForNode(sourceConnection, - shardListForPublication, - nodeId, - tableOwnerId); + + /* Create publication on shard list */ + CreateShardSplitPublicationForNode(sourceConnection, + shardListForPublication, + nodeId, + tableOwnerId); } } /* - * AlterShardSplitPublicationForNode adds shards that have to be replicated - * for a given publication. + * CreateShardSplitPublicationForNode creates a publication on source node + * for given shard list. + * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications + * related to split operations. */ static void -AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList, - uint32_t publicationForTargetNodeId, Oid ownerId) +CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, + uint32_t publicationForTargetNodeId, Oid ownerId) { - StringInfo alterPublicationCommand = makeStringInfo(); + StringInfo createPublicationCommand = makeStringInfo(); bool prefixWithComma = false; - appendStringInfo(alterPublicationCommand, "ALTER PUBLICATION %s ADD TABLE ", + appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); ShardInterval *shard = NULL; @@ -261,16 +228,16 @@ AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList, if (prefixWithComma) { - appendStringInfoString(alterPublicationCommand, ","); + appendStringInfoString(createPublicationCommand, ","); } - appendStringInfoString(alterPublicationCommand, shardName); + appendStringInfoString(createPublicationCommand, shardName); prefixWithComma = true; } - ExecuteCriticalRemoteCommand(connection, alterPublicationCommand->data); - pfree(alterPublicationCommand->data); - pfree(alterPublicationCommand); + ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); + pfree(createPublicationCommand->data); + pfree(createPublicationCommand); } diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 566e10418..28138dd2a 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -87,8 +87,8 @@ extern List * CreateTargetNodeConnectionsForShardSplit( char *databaseName); /* Functions to drop publisher-subscriber resources */ -extern void CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication); +extern void CreateShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication); extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, MultiConnection * sourceConnection);