From 10ea295d6cb94bcd57524470bbd932e000f331f4 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Mon, 18 Jul 2022 13:51:58 +0530 Subject: [PATCH] Flow completed --- .../distributed/operations/shard_split.c | 103 +++++++++++++----- .../replication/multi_logical_replication.c | 7 +- .../shardsplit_logical_replication.c | 88 ++++++++++----- .../distributed/multi_logical_replication.h | 2 +- .../shardsplit_logical_replication.h | 2 + src/test/regress/sql/citus_sameer.sql | 11 +- 6 files changed, 152 insertions(+), 61 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 8d2184f19..7a7228df9 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1120,6 +1120,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, List *shardSplitPointsList, List *workersForPlacementList) { + char *superUser = CitusExtensionOwnerName(); + char *databaseName = get_database_name(MyDatabaseId); + int connectionFlags = FORCE_NEW_CONNECTION; + List *sourceColocatedShardIntervalList = ColocatedShardIntervalList( shardIntervalToSplit); @@ -1136,6 +1140,24 @@ NonBlockingShardSplit(SplitOperation splitOperation, WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, false /* missingOk */); + HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + workersForPlacementList); + + DropAllShardSplitLeftOvers(sourceShardToCopyNode, shardSplitHashMapForPublication); + + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceShardToCopyNode + -> + workerName, + sourceShardToCopyNode + -> + workerPort, + superUser, + databaseName); + ClaimConnectionExclusively(sourceConnection); + PG_TRY(); { /* @@ -1149,11 +1171,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, 
shardGroupSplitIntervalListList, workersForPlacementList); - /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ - /* shardIntervalToSplit, sourceShardToCopyNode); */ - - /* DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, */ - /* shardGroupSplitIntervalListList, workersForPlacementList, NULL); */ CreateDummyShardsForShardGroup( sourceColocatedShardIntervalList, @@ -1161,12 +1178,58 @@ NonBlockingShardSplit(SplitOperation splitOperation, sourceShardToCopyNode, workersForPlacementList); - /*TODO: Refactor this method. BlockWrites is within this as of now, take it out */ - SplitShardReplicationSetup(shardIntervalToSplit, - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - sourceShardToCopyNode, - workersForPlacementList); + CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); + + /*Create Template Replication Slot */ + + /* DoSplitCopy */ + + /*worker_split_replication_setup_udf*/ + List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( + sourceShardToCopyNode, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + workersForPlacementList); + + /* Subscriber flow starts from here */ + List *shardSplitSubscribersMetadataList = + PopulateShardSplitSubscriptionsMetadataList( + shardSplitHashMapForPublication, replicationSlotInfoList); + + List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( + shardSplitSubscribersMetadataList, + connectionFlags, + superUser, databaseName); + + /* Create copies of template replication slot */ + CreateReplicationSlots(sourceConnection, shardSplitSubscribersMetadataList); + + CreateShardSplitSubscriptions(targetNodeConnectionList, + shardSplitSubscribersMetadataList, + sourceShardToCopyNode, + superUser, + databaseName); + + WaitForShardSplitRelationSubscriptionsBecomeReady( + shardSplitSubscribersMetadataList); + + XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); + 
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + + CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, + workersForPlacementList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + + BlockWritesToShardList(sourceColocatedShardIntervalList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + /* * Drop old shards and delete related metadata. Have to do that before * creating the new shard metadata, because there's cross-checks @@ -1355,22 +1418,6 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList) { - /* / * // / *Create Template replication slot * / * / */ - /* / * char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( * / */ - /* / * shardIntervalToSplit, sourceWorkerNode); * / */ - /* List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( */ - /* sourceColocatedShardIntervalList, */ - /* shardGroupSplitIntervalListList, */ - /* destinationWorkerNodesList, */ - /* replicationSlotInfoList); */ - /* earlier the above method used to take replication slot info as information */ - - - /* LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, */ - /* sourceColocatedShardIntervalList, */ - /* shardGroupSplitIntervalListList, */ - /* destinationWorkerNodesList); */ - char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; @@ -1424,7 +1471,7 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, 
shardSplitSubscribersMetadataList); - + CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, destinationWorkerNodesList); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index b0f931967..332755a72 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -2080,7 +2080,7 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, char *databaseName, - char *publicationName, + char *publicationName, char *slotName, Oid ownerId) { StringInfo createSubscriptionCommand = makeStringInfo(); @@ -2107,11 +2107,12 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, appendStringInfo(createSubscriptionCommand, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " - "WITH (citus_use_authinfo=true, enabled=false)", + "WITH (citus_use_authinfo=true, enabled=false, create_slot=false, copy_data=false, slot_name='%s')", quote_identifier(ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)), quote_literal_cstr(conninfo->data), - quote_identifier(publicationName)); + quote_identifier(publicationName), + slotName); ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data); pfree(createSubscriptionCommand->data); diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index a0e886af1..85ee3d813 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -32,6 +32,7 @@ ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwner nodeId, List * replicationSlotInfoList); + static void 
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, uint32_t publicationForTargetNodeId, Oid @@ -149,30 +150,6 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, } -ShardSplitSubscriberMetadata * -CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, - List *replicationSlotInfoList) -{ - ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0( - sizeof(ShardSplitSubscriberMetadata)); - shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId; - - char *tableOwnerName = GetUserNameFromId(tableOwnerId, false); - ReplicationSlotInfo *replicationSlotInfo = NULL; - foreach_ptr(replicationSlotInfo, replicationSlotInfoList) - { - if (nodeId == replicationSlotInfo->targetNodeId && - strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) - { - shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo; - break; - } - } - - return shardSplitSubscriberMetadata; -} - - void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, List *shardSplitPubSubMetadataList, @@ -281,6 +258,7 @@ CreateShardSplitSubscriptions(List *targetNodeConnectionList, superUser, databaseName, ShardSplitPublicationName(publicationForNodeId, ownerId), + shardSplitPubSubMetadata->slotInfo->slotName, ownerId); } } @@ -576,9 +554,67 @@ PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList, shardSplitSubscriberMetadata); - - /*replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); */ } return shardSplitSubscriptionMetadataList; } + + +ShardSplitSubscriberMetadata * +CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, + List *replicationSlotInfoList) +{ + ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0( + sizeof(ShardSplitSubscriberMetadata)); + shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId; + + char *tableOwnerName = 
GetUserNameFromId(tableOwnerId, false); + ReplicationSlotInfo *replicationSlotInfo = NULL; + foreach_ptr(replicationSlotInfo, replicationSlotInfoList) + { + if (nodeId == replicationSlotInfo->targetNodeId && + strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) + { + shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo; + break; + } + } + + PrintShardSplitPubSubMetadata(shardSplitSubscriberMetadata); + + return shardSplitSubscriberMetadata; +} + + +/*TODO(saawasek): Remove existing slots before creating newer ones */ + +/* extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, List * shardSplitSubscriberMetadataList); */ +void +CreateReplicationSlots(MultiConnection *sourceNodeConnection, + List *shardSplitSubscriberMetadataList) +{ + ShardSplitSubscriberMetadata *subscriberMetadata = NULL; + foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList) + { + char *slotName = subscriberMetadata->slotInfo->slotName; + + StringInfo createReplicationSlotCommand = makeStringInfo(); + + /* TODO(niupre): Replace pgoutput with an appropriate name (to be introduced by saawasek's PR) */ + appendStringInfo(createReplicationSlotCommand, + "SELECT * FROM pg_create_logical_replication_slot('%s','citus', false)", + slotName); + + PGresult *result = NULL; + int response = ExecuteOptionalRemoteCommand(sourceNodeConnection, + createReplicationSlotCommand->data, + &result); + if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1) + { + ReportResultError(sourceNodeConnection, result, ERROR); + } + + PQclear(result); + ForgetResults(sourceNodeConnection); + } +} diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 5f98102b1..b079baff2 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -38,7 +38,7 @@ extern void DropShardUser(MultiConnection *connection, char 
*username); extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, char *databaseName, - char *publicationName, + char *publicationName, char *slotName, Oid ownerId); extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 38e125bd0..d5aa6fef8 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -89,6 +89,8 @@ List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetad int connectionFlags, char *user, char *databaseName); +extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, + List *shardSplitSubscriberMetadataList); /*used for debuggin. Remove later*/ extern void PrintShardSplitPubSubMetadata( diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index aa8e14d53..b0184d824 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -48,15 +48,19 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['0'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); --- On worker2, we want child shard 2 and dummy shard 1 -- --- on worker1, we want child shard 3 and 1 and dummy shard 2 -- + +INSERT INTO table_to_split values(100,'a'); +INSERT INTO table_to_split values(400, 'a'); +INSERT INTO table_to_split values(500, 'a'); \c - - - :worker_2_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; SELECT * FROM pg_subscription; +SELECT slot_name FROM pg_replication_slots; +SELECT * FROM table_to_split_101; \c - - - :worker_1_port SET search_path TO 
citus_split_shard_by_split_points_negative; @@ -64,3 +68,4 @@ SELECT * FROM show_catalog; SELECT * FROM pg_publication; SELECT * FROM pg_subscription; SELECT slot_name FROM pg_replication_slots; +SELECT * FROM table_to_split_100;