From 5a5505db558becfee60f980633a61cfd04c1c1af Mon Sep 17 00:00:00 2001
From: Sameer Awasekar
Date: Sat, 23 Jul 2022 15:47:14 +0530
Subject: [PATCH] Add comments for functions

---
 .../distributed/operations/shard_split.c        |  19 +
 .../shardsplit_logical_replication.c            | 407 ++++++++++++------
 2 files changed, 285 insertions(+), 141 deletions(-)

diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 1aad1eaf7..611015a6d 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -1713,6 +1713,25 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 }
 
 
+/*
+ * CreateSplitShardReplicationSetupUDF creates and returns a
+ * parameterized 'worker_split_shard_replication_setup' UDF command.
+ *
+ * 'sourceColocatedShardIntervalList' : Source shard intervals to split.
+ * 'shardGroupSplitIntervalListList'  : List of shard intervals for split children.
+ * 'destinationWorkerNodesList'       : List of workers for split children placement.
+ *
+ * For example, consider the following input values:
+ * sourceColocatedShardIntervalList : [sourceShardInterval]
+ * shardGroupSplitIntervalListList  : [<childFirstShardInterval, childSecondShardInterval>]
+ * destinationWorkerNodesList       : [worker1, worker2]
+ *
+ * SELECT * FROM worker_split_shard_replication_setup(
+ *     ARRAY[
+ *         ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info,
+ *         ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info
+ *     ]);
+ */
 StringInfo
 CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
                                     List *shardGroupSplitIntervalListList,
diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
index 14fa0de28..9ff55bf43 100644
--- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
+++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
@@ -44,6 +44,28 @@ static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
 static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection);
 
 
+/*
+ * CreateShardSplitInfoMapForPublication creates a hashmap that groups
+ * shards for creating publications and subscriptions.
+ *
+ * While creating publications and subscriptions, apart from table owners,
+ * the placement of the child shard matters too. To understand this, consider
+ * the following example:
+ *
+ * Shard1 (on Worker1) is to be split into Shard2 and Shard3 on Worker2 and Worker3 respectively.
+ * Let's assume the owner to be 'A'. The hashmap groups the shard list in the following way:
+ *
+ * Map key
+ * =======                  ------     ------
+ * <Worker2, 'A'> --------> |Shard2|-->|Shard1|
+ *                          ------     ------
+ *
+ *                          ------     ------
+ * <Worker3, 'A'> --------> |Shard3|-->|Shard1|
+ *                          ------     ------
+ * Shard1 is a dummy table that is to be created on Worker2 and Worker3.
+ * Based on the above placement, we would need to create two publications on the source node.
+ */
 HTAB *
 CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
                                       List *shardGroupSplitIntervalListList,
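For reference, the map built above is keyed by the <nodeId, tableOwnerId> pair described in
the new comment. A minimal sketch of the key/entry layout that the hash_search() calls in
this file assume (field names are taken from the code in this diff; the typedef placement
itself is illustrative, not part of this patch):

    /* key: one publication per <target node, table owner> combination */
    typedef struct NodeShardMappingKey
    {
        uint32 nodeId;        /* node where the child shard is to be placed */
        Oid tableOwnerId;     /* owner of the distributed table being split */
    } NodeShardMappingKey;

    /* entry: the shard intervals (children plus dummy parent) to publish for that key */
    typedef struct NodeShardMappingEntry
    {
        NodeShardMappingKey key;
        List *shardSplitInfoList;
    } NodeShardMappingEntry;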
@@ -78,6 +100,10 @@ CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
 }
 
 
+/*
+ * AddPublishableShardEntryInMap adds a shard interval to the list
+ * of shards to be published.
+ */
 static void
 AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
                               bool isChildShardInterval)
@@ -91,19 +117,25 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
         (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications,
                                               &key, HASH_ENTER, &found);
+
+    /* Create a new list for <nodeId, tableOwner> pair */
     if (!found)
     {
         nodeMappingEntry->shardSplitInfoList = NIL;
     }
 
+    /* Add child shard interval */
     if (isChildShardInterval)
     {
         nodeMappingEntry->shardSplitInfoList =
             lappend(nodeMappingEntry->shardSplitInfoList,
                     (ShardInterval *) shardInterval);
+
+        /* We return from here as the child interval is only added once in the list */
         return;
     }
 
+    /* Check if parent is already added */
     ShardInterval *existingShardInterval = NULL;
     foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList)
     {
@@ -120,32 +152,44 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
 }
 
 
+/*
+ * CreateShardSplitPublications creates publications on the source node.
+ *
+ * sourceConnection - Connection to the source node.
+ *
+ * shardInfoHashMapForPublication - ShardIntervals are grouped by <nodeId, tableOwner> key.
+ *                                  A publication is created for each list of
+ *                                  ShardIntervals mapped by a key.
+ */
 void
-CreateShardSplitSubscriptions(List *targetNodeConnectionList,
-                              List *shardSplitPubSubMetadataList,
-                              WorkerNode *sourceWorkerNode,
-                              char *superUser,
-                              char *databaseName)
+CreateShardSplitPublications(MultiConnection *sourceConnection,
+                             HTAB *shardInfoHashMapForPublication)
 {
-    MultiConnection *targetConnection = NULL;
-    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
-    forboth_ptr(targetConnection, targetNodeConnectionList,
-                shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
+    HASH_SEQ_STATUS status;
+    hash_seq_init(&status, shardInfoHashMapForPublication);
+
+    NodeShardMappingEntry *entry = NULL;
+    while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
     {
-        uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
-        Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
-        CreateShardSubscription(targetConnection,
-                                sourceWorkerNode->workerName,
-                                sourceWorkerNode->workerPort,
-                                superUser,
-                                databaseName,
-                                ShardSplitPublicationName(publicationForNodeId, ownerId),
-                                shardSplitPubSubMetadata->slotInfo->slotName,
-                                ownerId);
+        uint32 nodeId = entry->key.nodeId;
+        uint32 tableOwnerId = entry->key.tableOwnerId;
+        List *shardListForPublication = entry->shardSplitInfoList;
+
+        /* Create publication on shard list */
+        CreateShardSplitPublicationForNode(sourceConnection,
+                                           shardListForPublication,
+                                           nodeId,
+                                           tableOwnerId);
     }
 }
 
 
+/*
+ * CreateShardSplitPublicationForNode creates a publication on the source node
+ * for the given shard list.
+ * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
+ * related to split operations.
+ */
 static void
 CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
                                    uint32_t publicationForTargetNodeId, Oid ownerId)
@@ -176,6 +220,9 @@ CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
 }
 
 
+/*
+ * ShardSplitPublicationName returns the publication name for shard split operations.
+ */
 static char *
 ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
 {
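The hunk above ends inside ShardSplitPublicationName, whose body is unchanged context. As a
minimal sketch, the naming scheme the new comment refers to amounts to prefix + nodeId +
ownerId (the exact prefix constant is assumed from the file's SHARD_SPLIT_* naming scheme
and is not shown in this patch):

    static char *
    ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
    {
        /* e.g. "citus_shard_split_publication_2_10004" for node 2 and table owner 10004 */
        return psprintf("%s%u_%u", SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId);
    }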
@@ -183,41 +230,6 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
 }
 
 
-void
-WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
-{
-    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
-    foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
-    {
-        Bitmapset *tableOwnerIds = NULL;
-        tableOwnerIds = bms_add_member(tableOwnerIds,
-                                       shardSplitPubSubMetadata->tableOwnerId);
-        WaitForRelationSubscriptionsBecomeReady(
-            shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds,
-            SHARD_SPLIT_SUBSCRIPTION_PREFIX);
-    }
-}
-
-
-void
-WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
-                                                   List *shardSplitPubSubMetadataList)
-{
-    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
-    foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
-    {
-        Bitmapset *tableOwnerIds = NULL;
-        tableOwnerIds = bms_add_member(tableOwnerIds,
-                                       shardSplitPubSubMetadata->tableOwnerId);
-
-        WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection,
-                                          sourcePosition,
-                                          tableOwnerIds,
-                                          SHARD_SPLIT_SUBSCRIPTION_PREFIX);
-    }
-}
-
-
 List *
 CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList,
                                          int connectionFlags, char *user, char *databaseName)
@@ -246,81 +258,16 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
 }
 
 
-char *
-DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
-                                                  MultiConnection *sourceConnection)
-{
-    /*
-     * To ensure SPLIT is idempotent drop any existing slot from
-     * previous failed operation.
-     */
-    StringInfo dropReplicationSlotCommand = makeStringInfo();
-    appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
-                     ShardSplitTemplateReplicationSlotName(
-                         shardIntervalToSplit->shardId));
-
-    /* The Drop command can fail so ignore the response / result and proceed anyways */
-    PGresult *result = NULL;
-    int response = ExecuteOptionalRemoteCommand(sourceConnection,
-                                                dropReplicationSlotCommand->data,
-                                                &result);
-
-    PQclear(result);
-    ForgetResults(sourceConnection);
-
-    /*
-     * PG 13 Function: pg_create_logical_replication_slot ( slot_name name, plugin name [, temporary boolean ] )
-     * PG 14 Function: pg_create_logical_replication_slot (slot_name name, plugin name [, temporary boolean, two_phase boolean ] )
-     * Return: record ( slot_name name, lsn pg_lsn )
-     * Note: Temporary slot are only live during the session's lifetime causing them to be dropped when the session ends.
-     * In our invocation 'two_phase' support is disabled.
-     */
-    StringInfo createReplicationSlotCommand = makeStringInfo();
-
-    /* TODO(niupre): Replace pgoutput with an appropriate name (to be introduced in by saawasek's PR) */
-    /*TODO(saawasek): Try creating TEMPORAL once basic flow is ready and we have a testcase*/
-    appendStringInfo(createReplicationSlotCommand,
-                     "CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
-                     ShardSplitTemplateReplicationSlotName(
-                         shardIntervalToSplit->shardId));
-
-    response = ExecuteOptionalRemoteCommand(sourceConnection,
-                                            createReplicationSlotCommand->data, &result);
-
-    if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
-    {
-        ReportResultError(sourceConnection, result, ERROR);
-    }
-
-    /*'snapshot_name' is second column where index starts from zero.
-     * We're using the pstrdup to copy the data into the current memory context */
-    char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */));
-    return snapShotName;
-}
-
-
-void
-CreateShardSplitPublications(MultiConnection *sourceConnection,
-                             HTAB *shardInfoHashMapForPublication)
-{
-    HASH_SEQ_STATUS status;
-    hash_seq_init(&status, shardInfoHashMapForPublication);
-
-    NodeShardMappingEntry *entry = NULL;
-    while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
-    {
-        uint32 nodeId = entry->key.nodeId;
-        uint32 tableOwnerId = entry->key.tableOwnerId;
-        List *shardListForPublication = entry->shardSplitInfoList;
-
-        CreateShardSplitPublicationForNode(sourceConnection,
-                                           shardListForPublication,
-                                           nodeId,
-                                           tableOwnerId);
-    }
-}
-
-
+/*
+ * PopulateShardSplitSubscriptionsMetadataList returns a list of
+ * 'ShardSplitSubscriberMetadata' structures.
+ *
+ * shardSplitInfoHashMap - Shards are grouped by <nodeId, tableOwner> key.
+ *                         For each key, we create a metadata structure. This facilitates
+ *                         easy publication-subscription management.
+ *
+ * replicationSlotInfoList - List of replication slot info.
+ */
 List *
 PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
                                             List *replicationSlotInfoList)
@@ -346,6 +293,11 @@ PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
 }
 
 
+/*
+ * CreateShardSplitSubscriberMetadata creates a 'ShardSplitSubscriberMetadata' structure
+ * for the given table owner and node id. It scans the list of 'ReplicationSlotInfo'
+ * to identify the corresponding slot to be used for the given tableOwnerId and nodeId.
+ */
 ShardSplitSubscriberMetadata *
 CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
                                    List *replicationSlotInfoList)
@@ -354,6 +306,11 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
                                               sizeof(ShardSplitSubscriberMetadata));
     shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
 
+    /*
+     * Each 'ReplicationSlotInfo' belongs to a unique combination of node id and owner.
+     * Traverse the slot list to identify the corresponding slot for the given
+     * table owner and node.
+     */
     char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
     ReplicationSlotInfo *replicationSlotInfo = NULL;
     foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
     {
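The body of the foreach_ptr loop above is not part of this diff. Going by the new comment,
the matching it performs reduces to comparing the node id and owner of each slot; a hedged
sketch of that loop body (reconstructed from the described <node, owner> uniqueness, not
quoted from this patch):

    if (nodeId == replicationSlotInfo->targetNodeId &&
        strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
    {
        /* slot found: remember it for subscription creation on this target node */
        shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo;
        break;
    }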
@@ -370,7 +327,166 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
 }
 
 
-/*TODO(saawasek): Remove existing slots before creating newer ones */
+/*
+ * CreateShardSplitSubscriptions creates subscriptions for the shard split operation.
+ * We follow the shard split naming scheme for publication-subscription management.
+ *
+ * targetNodeConnectionList - List of connections to target nodes on which
+ *                            subscriptions have to be created.
+ *
+ * shardSplitSubscriberMetadataList - List of subscriber metadata.
+ *
+ * sourceWorkerNode - Source node.
+ */
+void
+CreateShardSplitSubscriptions(List *targetNodeConnectionList,
+                              List *shardSplitSubscriberMetadataList,
+                              WorkerNode *sourceWorkerNode,
+                              char *superUser,
+                              char *databaseName)
+{
+    MultiConnection *targetConnection = NULL;
+    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
+    forboth_ptr(targetConnection, targetNodeConnectionList,
+                shardSplitPubSubMetadata, shardSplitSubscriberMetadataList)
+    {
+        uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
+        Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
+        CreateShardSubscription(targetConnection,
+                                sourceWorkerNode->workerName,
+                                sourceWorkerNode->workerPort,
+                                superUser,
+                                databaseName,
+                                ShardSplitPublicationName(publicationForNodeId, ownerId),
+                                shardSplitPubSubMetadata->slotInfo->slotName,
+                                ownerId);
+    }
+}
+
+
+/*
+ * WaitForShardSplitRelationSubscriptionsBecomeReady waits for a list of subscriptions
+ * to become ready. This method invokes 'WaitForRelationSubscriptionsBecomeReady' for each
+ * subscription.
+ */
+void
+WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
+{
+    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
+    foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
+    {
+        Bitmapset *tableOwnerIds = NULL;
+        tableOwnerIds = bms_add_member(tableOwnerIds,
+                                       shardSplitPubSubMetadata->tableOwnerId);
+        WaitForRelationSubscriptionsBecomeReady(
+            shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds,
+            SHARD_SPLIT_SUBSCRIPTION_PREFIX);
+    }
+}
+
+
+/*
+ * WaitForShardSplitRelationSubscriptionsToBeCaughtUp waits until subscriptions are caught
+ * up to the source LSN. This method invokes 'WaitForShardSubscriptionToCatchUp' for each
+ * subscription.
+ */
+void
+WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
+                                                   List *shardSplitPubSubMetadataList)
+{
+    ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
+    foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
+    {
+        Bitmapset *tableOwnerIds = NULL;
+        tableOwnerIds = bms_add_member(tableOwnerIds,
+                                       shardSplitPubSubMetadata->tableOwnerId);
+
+        WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection,
+                                          sourcePosition,
+                                          tableOwnerIds,
+                                          SHARD_SPLIT_SUBSCRIPTION_PREFIX);
+    }
+}
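For context, a caller is expected to block writes on the shards being split, capture the
source node's current LSN, and only then wait on it. A sketch of that sequencing, assuming
the GetRemoteLogPosition() helper from Citus' logical replication code (not part of this
patch):

    /* writes to the source shard are already blocked at this point */
    XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
    WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
                                                       shardSplitSubscriberMetadataList);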
+
+
+char *
+DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
+                                                  MultiConnection *sourceConnection)
+{
+    /*
+     * To ensure SPLIT is idempotent, drop any existing slot from a
+     * previous failed operation.
+     */
+    StringInfo dropReplicationSlotCommand = makeStringInfo();
+    appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
+                     ShardSplitTemplateReplicationSlotName(
+                         shardIntervalToSplit->shardId));
+
+    /* The drop command can fail, so ignore the response / result and proceed anyway */
+    PGresult *result = NULL;
+    int response = ExecuteOptionalRemoteCommand(sourceConnection,
+                                                dropReplicationSlotCommand->data,
+                                                &result);
+
+    PQclear(result);
+    ForgetResults(sourceConnection);
+
+    /*
+     * Note: Temporary slots are only live during the session's lifetime, causing them to be dropped when the session ends.
+     */
+    StringInfo createReplicationSlotCommand = makeStringInfo();
+
+    appendStringInfo(createReplicationSlotCommand,
+                     "CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
+                     ShardSplitTemplateReplicationSlotName(
+                         shardIntervalToSplit->shardId));
+
+    response = ExecuteOptionalRemoteCommand(sourceConnection,
+                                            createReplicationSlotCommand->data, &result);
+
+    if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
+    {
+        ReportResultError(sourceConnection, result, ERROR);
+    }
+
+    /* 'snapshot_name' is the third column (zero-based index 2).
+     * We use pstrdup to copy the data into the current memory context. */
+    char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columnIndex */));
+    return snapShotName;
+}
+
+
+/*
+ * ShardSplitTemplateReplicationSlotName returns the name of the template replication slot
+ * following the shard split naming scheme.
+ */
+char *
+ShardSplitTemplateReplicationSlotName(uint64 shardId)
+{
+    return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
+}
+
+
+/*
+ * CreateReplicationSlots creates copies of the template replication slot
+ * on the source node.
+ *
+ * sourceNodeConnection - Source node connection.
+ *
+ * templateSlotName - Name of the template replication slot whose copies have to be
+ *                    created. This slot holds the LSN from which logical replication
+ *                    begins.
+ *
+ * shardSplitSubscriberMetadataList - List of 'ShardSplitSubscriberMetadata'.
+ *
+ * 'ShardSplitSubscriberMetadata' contains the replication slot name that is used
+ * to create copies of the template replication slot on the source node. These slot
+ * names are returned by the 'worker_split_shard_replication_setup' UDF, and each slot
+ * is responsible for a specific split range. We make multiple attempts to clean up
+ * these replication slot copies, in the below order, to be on the safer side:
+ * 1. Clean up before starting the shard split workflow.
+ * 2. Implicitly drop slots while dropping subscriptions.
+ * 3. Explicitly drop slots that would have been skipped over by 2.
+ */
 void
 CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName,
                        List *shardSplitSubscriberMetadataList)
@@ -401,16 +517,6 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
 }
 
 
-/*
- * ShardSplitTemplateReplicationSlotName returns name of template replication slot.
- */
-char *
-ShardSplitTemplateReplicationSlotName(uint64 shardId)
-{
-    return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
-}
-
-
 /*
  * ParseReplicationSlotInfoFromResult parses custom datatype 'replication_slot_info'.
  * 'replication_slot_info' is a tuple with below format:
  * <targetNodeId, tableOwnerName, slotName>
@@ -449,7 +555,7 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
 
 /*
  * DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
- * and replication slots on all nodes. These might have been left there after
+ * and replication slots. These might have been left there after
  * the coordinator crashed during a shard split. It's important to delete them
  * for two reasons:
  * 1. Starting new shard split might fail when they exist, because it cannot
@@ -513,6 +619,11 @@ DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPub
 }
 
 
+/*
+ * DropAllShardSplitSubscriptions drops all the existing subscriptions that
+ * match our shard split naming scheme on the node that the connection points
+ * to.
+ */
 void
 DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
 {
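The body of DropAllShardSplitSubscriptions is unchanged context here. For what the comment
describes, cleanup by naming scheme boils down to listing catalog entries by prefix and
dropping them one by one; a hedged sketch of the listing step (the query shape is assumed;
only the prefix constant appears in this file):

    /* list every subscription created by shard splits on this node */
    char *command = psprintf(
        "SELECT subname FROM pg_catalog.pg_subscription "
        "WHERE subname LIKE %s || '%%'",
        quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX));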
@@ -529,6 +640,11 @@ DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
 }
 
 
+/*
+ * DropAllShardSplitPublications drops all the existing publications that
+ * match our shard split naming scheme on the node that the connection points
+ * to.
+ */
 static void
 DropAllShardSplitPublications(MultiConnection *connection)
 {
@@ -545,6 +661,10 @@ DropAllShardSplitPublications(MultiConnection *connection)
 }
 
 
+/*
+ * DropAllShardSplitUsers drops all the users that match our shard split naming
+ * scheme. These users are created temporarily for shard splits.
+ */
 static void
 DropAllShardSplitUsers(MultiConnection *connection)
 {
@@ -561,6 +681,11 @@ DropAllShardSplitUsers(MultiConnection *connection)
 }
 
 
+/*
+ * DropAllShardSplitReplicationSlots drops all the existing replication slots
+ * that match the shard split naming scheme on the node that the connection
+ * points to.
+ */
 static void
 DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection)
 {
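Analogously to the other cleanup helpers, dropping leftover slots can be expressed as a
single catalog-driven command over the cleanup connection; a sketch under the assumption
that the slot prefix below matches the file's naming scheme (the literal is illustrative):

    /* drop every replication slot created under the shard split naming scheme */
    ExecuteCriticalRemoteCommand(cleanupConnection,
                                 "SELECT pg_drop_replication_slot(slot_name) "
                                 "FROM pg_replication_slots "
                                 "WHERE slot_name LIKE 'citus_shard_split_%'");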