diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 611015a6d..551af6620 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -63,10 +63,6 @@ static void CreateAndCopySplitShardsForShardGroup(
 static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
 										   List *shardGroupSplitIntervalListList,
 										   List *workersForPlacementList);
-static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
-											  List *sourceColocatedShardIntervalList,
-											  List *shardGroupSplitIntervalListList,
-											  List *workersForPlacementList);
 static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 										   List *shardGroupSplitIntervalListList,
 										   WorkerNode *sourceWorkerNode,
@@ -113,7 +109,7 @@ static StringInfo CreateSplitShardReplicationSetupUDF(
 	List *destinationWorkerNodesList);
 static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval);
 static void DropDummyShards(void);
-void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval);
+static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval);
 char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
 													  WorkerNode *sourceWorkerNode,
 													  MultiConnection **
@@ -123,6 +119,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 												   List *sourceColocatedShardIntervalList,
 												   List *shardGroupSplitIntervalListList,
 												   List *destinationWorkerNodesList);
+static List * ParseReplicationSlotInfoFromResult(PGresult *result);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -1400,39 +1397,28 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 }
 
-/* Create ShardGroup split children on a list of corresponding workers. */
-static void
-CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
-								  List *sourceColocatedShardIntervalList,
-								  List *shardGroupSplitIntervalListList,
-								  List *workersForPlacementList)
-{
-	/* Iterate on shard interval list for shard group */
-	List *shardIntervalList = NULL;
-	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
-	{
-		/* Iterate on split shard interval list and corresponding placement worker */
-		ShardInterval *shardInterval = NULL;
-		WorkerNode *workerPlacementNode = NULL;
-		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
-					workersForPlacementList)
-		{
-			/* Populate list of commands necessary to create shard interval on destination */
-			List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
-				shardInterval->relationId,
-				false, /* includeSequenceDefaults */
-				NULL /* auto add columnar options for cstore tables */);
-			splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
-				splitShardCreationCommandList,
-				shardInterval->shardId);
-
-			/* Create new split child shard on the specified worker node */
-			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
-		}
-	}
-}
-
-
+/*
+ * Since we are using PG logical replication infrastructure, there are some constraints
+ * that need to be met around matching table names on the source and target nodes.
+ * In the context of a split, the restrictions are:
+ * Constraint 1: Dummy source shard(s) from the shard group must exist on all destination nodes.
+ * Constraint 2: Dummy target shards from the shard group must exist on the source node.
+ * Example:
+ * Shard1[1-200] is co-located with Shard2[1-200] on Worker0.
+ * We are splitting 2-way to Worker0 (same node) and Worker1 (different node).
+ *
+ * Non-dummy shards (expected from the split):
+ * On Worker0 --> Shard1_1 and Shard2_1.
+ * On Worker1 --> Shard1_2 and Shard2_2.
+ *
+ * Dummy shards:
+ * From constraint 1, we need to create dummy Shard1 and Shard2 on Worker0 and on Worker1.
+ * Note 1: Since source and destination overlap on Worker0, Shard1 and Shard2 need not be created there.
+ * Be very careful here: dropping Shard1 and Shard2, which hold customer data, to create dummies on Worker0 would be catastrophic.
+ *
+ * From constraint 2, we need to create dummy Shard1_1, Shard2_1, Shard1_2 and Shard2_2 on Worker0.
+ * Note 2: Since source and destination overlap on Worker0, Shard1_1 and Shard2_1 need not be created there.
+ */
 static void
 CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 							   List *shardGroupSplitIntervalListList,
 							   WorkerNode *sourceWorkerNode,
@@ -1546,89 +1532,13 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
 }
 
 
-static void
-AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
-{
-	NodeShardMappingKey key;
-	key.nodeId = targetNodeId;
-	key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
-
-	bool found = false;
-	NodeShardMappingEntry *nodeMappingEntry =
-		(NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER,
-											  &found);
-	if (!found)
-	{
-		nodeMappingEntry->shardSplitInfoList = NIL;
-	}
-
-	nodeMappingEntry->shardSplitInfoList =
-		lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
-}
-
-
-static void
-DropDummyShards()
-{
-	/* Return if no dummy shards are created */
-	if (DummyShardInfoHashMap == NULL)
-	{
-		return;
-	}
-
-	HASH_SEQ_STATUS status;
-	hash_seq_init(&status, DummyShardInfoHashMap);
-
-	NodeShardMappingEntry *entry = NULL;
-	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
-	{
-		uint32 nodeId = entry->key.nodeId;
-		WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
-															  false /* missingOk */);
-
-		int connectionFlags = FOR_DDL;
-		connectionFlags |= OUTSIDE_TRANSACTION;
-		connectionFlags |= FORCE_NEW_CONNECTION;
-		MultiConnection *connection = GetNodeUserDatabaseConnection(
-			connectionFlags,
-			shardToBeDroppedNode->workerName,
-			shardToBeDroppedNode->workerPort,
-			CurrentUserName(),
-			NULL /* databaseName */);
-
-		List *dummyShardIntervalList = entry->shardSplitInfoList;
-		ShardInterval *shardInterval = NULL;
-		foreach_ptr(shardInterval, dummyShardIntervalList)
-		{
-			TryDropShard(connection, shardInterval);
-		}
-
-		CloseConnection(connection);
-	}
-}
-
-
-void
-TryDropShard(MultiConnection *connection, ShardInterval *shardInterval)
-{
-	char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-	StringInfo dropShardQuery = makeStringInfo();
-
-	/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-	appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-					 qualifiedShardName);
-
-	/*
-	 * Perform a drop in best effort manner.
-	 * The shard may or may not exist and the connection could have died.
-	 */
-	ExecuteOptionalRemoteCommand(
-		connection,
-		dropShardQuery->data,
-		NULL /* pgResult */);
-}
-
-
+/*
+ * CreateTemplateReplicationSlotAndReturnSnapshot creates a replication slot
+ * and returns its snapshot. This slot acts as a 'Template' for creating
+ * replication slot copies used for logical replication.
+ *
+ * The snapshot remains valid for the lifetime of the session that creates it.
+ */
 char *
 CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
 											   WorkerNode *sourceWorkerNode,
 											   MultiConnection **
@@ -1646,17 +1556,25 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
 													  CitusExtensionOwnerName(),
 													  get_database_name(
 														  MyDatabaseId));
-
 	ClaimConnectionExclusively(sourceConnection);
 
+	/*
+	 * Drop any leftover template replication slot from a previous operation
+	 * and create a new one.
+	 */
 	char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
 																		   sourceConnection);
-
 	*templateSlotConnection = sourceConnection;
+
 	return snapShotName;
 }
 
 
+/*
+ * ExecuteSplitShardReplicationSetupUDF executes the
+ * 'worker_split_shard_replication_setup' UDF on the source shard node
+ * and returns a list of ReplicationSlotInfo.
+ */
 static List *
 ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 									 List *sourceColocatedShardIntervalList,
@@ -1668,6 +1586,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 																  shardGroupSplitIntervalListList,
 																  destinationWorkerNodesList);
 
+	/* Force a new connection to execute the UDF */
 	int connectionFlags = FORCE_NEW_CONNECTION;
 	MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
 																	  sourceWorkerNode->
@@ -1784,3 +1703,132 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
 
 	return splitShardReplicationUDF;
 }
+
+
+/*
+ * ParseReplicationSlotInfoFromResult parses the custom datatype 'replication_slot_info'.
+ * 'replication_slot_info' is a tuple with the following format:
+ * <nodeId, tableOwnerName, replicationSlotName>
+ */
+static List *
+ParseReplicationSlotInfoFromResult(PGresult *result)
+{
+	int64 rowCount = PQntuples(result);
+	int64 colCount = PQnfields(result);
+
+	List *replicationSlotInfoList = NIL;
+	for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
+	{
+		ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
+			sizeof(ReplicationSlotInfo));
+
+		char *targetNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column */);
+
+		replicationSlotInfo->targetNodeId = strtoul(targetNodeIdString, NULL, 10);
+
+		/* Use pstrdup to copy the data into the current memory context */
+		replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
+																 1 /* table owner name column */));
+
+		/* Replication slot name */
+		replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
+														   2 /* slot name column */));
+
+		replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
+	}
+
+	return replicationSlotInfoList;
+}
+
+
+/*
+ * AddDummyShardEntryInMap adds a shard entry to the hash map that tracks the
+ * dummy shards created during a split. These shards are cleaned up after the split completes.
+ *
+ * Tracking them explicitly is a safety measure: during cleanup we drop only the
+ * dummy shards recorded in DummyShardInfoHashMap, never real shards.
+ */
+static void
+AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
+{
+	NodeShardMappingKey key;
+	key.nodeId = targetNodeId;
+	key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
+
+	bool found = false;
+	NodeShardMappingEntry *nodeMappingEntry =
+		(NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER,
+											  &found);
+	if (!found)
+	{
+		nodeMappingEntry->shardSplitInfoList = NIL;
+	}
+
+	nodeMappingEntry->shardSplitInfoList =
+		lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
+}
+
+
+static void
+DropDummyShards()
+{
+	/* Return if no dummy shards were created */
+	if (DummyShardInfoHashMap == NULL)
+	{
+		return;
+	}
+
+	HASH_SEQ_STATUS status;
+	hash_seq_init(&status, DummyShardInfoHashMap);
+
+	NodeShardMappingEntry *entry = NULL;
+	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
+	{
+		uint32 nodeId = entry->key.nodeId;
+		WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
+															  false /* missingOk */);
+
+		int connectionFlags = FOR_DDL;
+		connectionFlags |= OUTSIDE_TRANSACTION;
+		connectionFlags |= FORCE_NEW_CONNECTION;
+		MultiConnection *connection = GetNodeUserDatabaseConnection(
+			connectionFlags,
+			shardToBeDroppedNode->workerName,
+			shardToBeDroppedNode->workerPort,
+			CurrentUserName(),
+			NULL /* databaseName */);
+
+		List *dummyShardIntervalList = entry->shardSplitInfoList;
+		ShardInterval *shardInterval = NULL;
+		foreach_ptr(shardInterval, dummyShardIntervalList)
+		{
+			TryDroppingShard(connection, shardInterval);
+		}
+
+		CloseConnection(connection);
+	}
+}
+
+
+/*
+ * TryDroppingShard makes a best-effort attempt to drop the given shard over the given connection.
+ */
+static void
+TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval)
+{
+	char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+	StringInfo dropShardQuery = makeStringInfo();
+
+	/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
+	appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
+					 qualifiedShardName);
+
+	/*
+	 * Perform the drop in a best-effort manner:
+	 * the shard may or may not exist, and the connection could have died.
+	 */
+	ExecuteOptionalRemoteCommand(
+		connection,
+		dropShardQuery->data,
+		NULL /* pgResult */);
+}
diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
index 9ff55bf43..5b75f4c0a 100644
--- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
+++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
@@ -517,42 +517,6 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
 }
 
 
-/*
- * ParseReplicationSlotInfoFromResult parses custom datatype 'replication_slot_info'.
- * 'replication_slot_info' is a tuple with below format:
- *
- */
-List *
-ParseReplicationSlotInfoFromResult(PGresult *result)
-{
-	int64 rowCount = PQntuples(result);
-	int64 colCount = PQnfields(result);
-
-	List *replicationSlotInfoList = NIL;
-	for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
-	{
-		ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
-			sizeof(ReplicationSlotInfo));
-
-		char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/);
-
-		replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
-
-		/* We're using the pstrdup to copy the data into the current memory context */
-		replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
-																 1 /* table owner name column */));
-
-		/* Replication slot name */
-		replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
-														   2 /* slot name column */));
-
-		replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
-	}
-
-	return replicationSlotInfoList;
-}
-
-
 /*
  * DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
  * and replication slots. These might have been left there after
diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h
index 11b7efab2..3c2a2b5e6 100644
--- a/src/include/distributed/shardsplit_logical_replication.h
+++ b/src/include/distributed/shardsplit_logical_replication.h
@@ -61,7 +61,6 @@ extern int NodeShardMappingHashCompare(const void *left, const void *right, Size
 extern HTAB * SetupHashMapForShardInfo(void);
 
 /* Functions for subscriber metadata management */
-extern List * ParseReplicationSlotInfoFromResult(PGresult *result);
 extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
 														  List *replicationSlotInfoList);
 extern HTAB * CreateShardSplitInfoMapForPublication(
@@ -73,7 +72,7 @@ extern HTAB * CreateShardSplitInfoMapForPublication(
 extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
 										 HTAB *shardInfoHashMapForPublication);
 extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
-										  List *shardSplitPubSubMetadataList,
+										  List *shardSplitSubscriberMetadataList,
 										  WorkerNode *sourceWorkerNode,
 										  char *superUser, char *databaseName);
 extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,
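
A side note on the dummy-shard constraints documented above for CreateDummyShardsForShardGroup: the Worker0/Worker1 example in that comment can be made concrete with a small standalone sketch. The node and shard names below come from the comment itself; the program is illustrative only and not part of the patch.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

int
main(void)
{
	/* Source placement and 2-way split destinations, per the comment's example. */
	const char *sourceNode = "Worker0";
	const char *destinationNodes[] = { "Worker0", "Worker1" };
	const char *sourceShards[] = { "Shard1", "Shard2" };

	/* Constraint 1: each destination needs dummy copies of the source shards. */
	for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++)
	{
		const char *destination = destinationNodes[nodeIndex];

		/*
		 * Note 1: skip where source and destination overlap; the real shards
		 * (holding customer data) already live there and must not be touched.
		 */
		bool overlapsSource = (strcmp(destination, sourceNode) == 0);

		for (int shardIndex = 0; shardIndex < 2; shardIndex++)
		{
			printf("%s dummy %s on %s\n",
				   overlapsSource ? "skip" : "create",
				   sourceShards[shardIndex], destination);
		}
	}

	return 0;
}

This prints "skip" for Shard1 and Shard2 on Worker0 and "create" for them on Worker1, matching Note 1. Constraint 2 mirrors this on the source side: the target shards Shard1_2 and Shard2_2 get dummies on Worker0, while Shard1_1 and Shard2_1 are skipped there per Note 2.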
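
Similarly, for the <nodeId, tableOwnerName, replicationSlotName> result shape that ParseReplicationSlotInfoFromResult consumes: below is a minimal standalone sketch of the same walk in plain libpq, with malloc/strdup standing in for PostgreSQL's palloc0/pstrdup. The SlotInfo struct and function name are illustrative, not part of the patch.

#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <libpq-fe.h>

/* Illustrative stand-in for Citus's ReplicationSlotInfo. */
typedef struct SlotInfo
{
	uint32_t targetNodeId;   /* column 0: nodeId */
	char *tableOwnerName;    /* column 1: table owner name */
	char *slotName;          /* column 2: replication slot name */
} SlotInfo;

/*
 * Walks a <nodeId, tableOwnerName, slotName> result, the same shape that
 * ParseReplicationSlotInfoFromResult consumes. Returns a malloc'ed array
 * of *slotCount entries; the caller owns the memory.
 */
static SlotInfo *
ParseSlotInfoRows(PGresult *result, int *slotCount)
{
	int rowCount = PQntuples(result);
	SlotInfo *slots = calloc(rowCount, sizeof(SlotInfo));

	for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
	{
		slots[rowIndex].targetNodeId =
			(uint32_t) strtoul(PQgetvalue(result, rowIndex, 0), NULL, 10);
		slots[rowIndex].tableOwnerName = strdup(PQgetvalue(result, rowIndex, 1));
		slots[rowIndex].slotName = strdup(PQgetvalue(result, rowIndex, 2));
	}

	*slotCount = rowCount;
	return slots;
}

Note that PQgetvalue returns a pointer into storage owned by the PGresult, which is why both the patch (via pstrdup) and this sketch (via strdup) copy the strings out before the result is cleared.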