From 028abaa3cb8cebe94607f33654842fd1eb29291e Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Thu, 14 Jul 2022 09:25:07 +0530 Subject: [PATCH] Add: 1. Add publication/subscription logic 2. Add Catchup code 3. Blockwrites 4. Create Indexs, Replica Identity --- .../distributed/operations/shard_split.c | 7 ++- .../replication/multi_logical_replication.c | 53 +++++++++---------- .../shardsplit_logical_replication.c | 44 +++++++++++++-- .../distributed/multi_logical_replication.h | 8 ++- src/include/distributed/shard_split.h | 2 + .../shardsplit_logical_replication.h | 6 ++- src/test/regress/sql/citus_sameer.sql | 2 + 7 files changed, 85 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index f20cff26e..c98907875 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -60,8 +60,6 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, WorkerNode *sourceWorkerNode, List *workersForPlacementList); static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); -static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, - List *workersForPlacementList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -525,7 +523,7 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, /* Create ShardGroup auxiliary structures (indexes, stats, replicaindentities, triggers) * on a list of corresponding workers. 
*/ -static void +void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList) { @@ -1222,6 +1220,7 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, destinationWorkerNodesList, replicationSlotInfoList); - LogicallReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata); + + LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, destinationWorkerNodesList); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 04f868f90..6820021b1 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -131,19 +131,15 @@ static void CreateShardMoveSubscriptions(MultiConnection *connection, char *databaseName, Bitmapset *tableOwnerIds); static char * escape_param_str(const char *str); -static XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static uint64 TotalRelationSizeForSubscription(MultiConnection *connection, char *command); static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds); -static void WaitForShardMoveSubscription(MultiConnection *targetConnection, - XLogRecPtr sourcePosition, - Bitmapset *tableOwnerIds); + Bitmapset *tableOwnerIds, char * operationPrefix); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, - Bitmapset *tableOwnerIds); + Bitmapset *tableOwnerIds, char * operationPrefix); static char * ShardMovePublicationName(Oid ownerId); static char * ShardSubscriptionName(Oid ownerId, char * operationPrefix); static void AcquireLogicalReplicationLock(void); @@ -152,7 +148,7 @@ static void 
DropAllShardMoveSubscriptions(MultiConnection *connection); static void DropAllShardMoveReplicationSlots(MultiConnection *connection); static void DropAllShardMovePublications(MultiConnection *connection); static void DropAllShardMoveUsers(MultiConnection *connection); -static char * ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds); +static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix); static void DropShardMoveSubscription(MultiConnection *connection, char *subscriptionName); static void DropShardMoveReplicationSlot(MultiConnection *connection, @@ -231,14 +227,14 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * subscription is not ready. There is no point of locking the shards before the * subscriptions for each relation becomes ready, so wait for it. */ - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds); + WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * Wait until the subscription is caught up to changes that has happened * after the initial COPY on the shards. 
*/ XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardMoveSubscription(targetConnection, sourcePosition, tableOwnerIds); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * Now lets create the post-load objects, such as the indexes, constraints @@ -248,7 +244,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName, targetNodePort); sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardMoveSubscription(targetConnection, sourcePosition, tableOwnerIds); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * We're almost done, we'll block the writes to the shards that we're @@ -261,7 +257,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo BlockWritesToShardList(shardList); sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardMoveSubscription(targetConnection, sourcePosition, tableOwnerIds); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * We're creating the foreign constraints to reference tables after the @@ -1560,7 +1556,7 @@ escape_param_str(const char *str) /* * GetRemoteLogPosition gets the current WAL log position over the given connection. 
*/ -static XLogRecPtr +XLogRecPtr GetRemoteLogPosition(MultiConnection *connection) { return GetRemoteLSN(connection, CURRENT_LOG_POSITION_COMMAND); @@ -1631,7 +1627,7 @@ GetRemoteLSN(MultiConnection *connection, char *command) */ void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds) + Bitmapset *tableOwnerIds, char * operationPrefix) { uint64 previousTotalRelationSizeForSubscription = 0; TimestampTz previousSizeChangeTime = GetCurrentTimestamp(); @@ -1659,7 +1655,7 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, while (true) { /* we're done, all relations are ready */ - if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds)) + if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds, operationPrefix)) { ereport(LOG, (errmsg("The states of the relations belonging to the " "subscriptions became READY on the " @@ -1669,7 +1665,7 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, break; } - char *subscriptionValueList = ShardMoveSubscriptionNamesValueList(tableOwnerIds); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); /* Get the current total size of tables belonging to the subscriber */ uint64 currentTotalRelationSize = @@ -1823,12 +1819,12 @@ TotalRelationSizeForSubscription(MultiConnection *connection, char *command) /* - * ShardMoveSubscriptionNamesValueList returns a SQL value list containing the + * ShardSubscriptionNamesValueList returns a SQL value list containing the * subscription names for all of the given table owner ids. This value list can * be used in a query by using the IN operator. 
*/ static char * -ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds) +ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix) { StringInfo subscriptionValueList = makeStringInfo(); appendStringInfoString(subscriptionValueList, "("); @@ -1846,7 +1842,7 @@ ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds) first = false; } appendStringInfoString(subscriptionValueList, - quote_literal_cstr(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX))); + quote_literal_cstr(ShardSubscriptionName(ownerId, operationPrefix))); } appendStringInfoString(subscriptionValueList, ")"); return subscriptionValueList->data; @@ -1859,11 +1855,11 @@ ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds) */ static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds) + Bitmapset *tableOwnerIds, char * operationPrefix) { bool raiseInterrupts = false; - char *subscriptionValueList = ShardMoveSubscriptionNamesValueList(tableOwnerIds); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); char *query = psprintf( "SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription " "WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s", @@ -1908,14 +1904,14 @@ RelationSubscriptionsAreReady(MultiConnection *targetConnection, /* - * WaitForShardMoveSubscription waits until the last LSN reported by the subscription. + * WaitForShardSubscriptionToCatchUp waits until the last LSN reported by the subscription. * * The function errors if the target LSN doesn't increase within LogicalReplicationErrorTimeout. * The function also reports its progress in every logicalReplicationProgressReportTimeout. 
*/ -static void -WaitForShardMoveSubscription(MultiConnection *targetConnection, XLogRecPtr sourcePosition, - Bitmapset *tableOwnerIds) +void +WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr sourcePosition, + Bitmapset *tableOwnerIds, char * operationPrefix) { XLogRecPtr previousTargetPosition = 0; TimestampTz previousLSNIncrementTime = GetCurrentTimestamp(); @@ -1931,7 +1927,7 @@ WaitForShardMoveSubscription(MultiConnection *targetConnection, XLogRecPtr sourc * a lot of memory. */ MemoryContext loopContext = AllocSetContextCreateExtended(CurrentMemoryContext, - "WaitForShardMoveSubscription", + "WaitForShardSubscriptionToCatchUp", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); @@ -1941,7 +1937,8 @@ WaitForShardMoveSubscription(MultiConnection *targetConnection, XLogRecPtr sourc while (true) { XLogRecPtr targetPosition = GetSubscriptionPosition(targetConnection, - tableOwnerIds); + tableOwnerIds, + operationPrefix); if (targetPosition >= sourcePosition) { ereport(LOG, (errmsg( @@ -2053,9 +2050,9 @@ WaitForMiliseconds(long timeout) * replication. 
*/ static XLogRecPtr -GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds) +GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, char * operationPrefix) { - char *subscriptionValueList = ShardMoveSubscriptionNamesValueList(tableOwnerIds); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); return GetRemoteLSN(connection, psprintf( "SELECT min(latest_end_lsn) FROM pg_stat_subscription " "WHERE subname IN %s", subscriptionValueList)); diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 7657dfa13..b66604ad3 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -19,6 +19,7 @@ #include "distributed/listutils.h" #include "distributed/shardsplit_logical_replication.h" #include "distributed/multi_logical_replication.h" +#include "distributed/resource_lock.h" #include "utils/builtins.h" #include "commands/dbcommands.h" @@ -38,6 +39,10 @@ CreateShardSplitSubscriptions(List * targetNodeConnectionList, List * shardSplit static void WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList); +static void +WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * targetNodeConnectionList, List * shardSplitPubSubMetadataList); + + static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); @@ -227,7 +232,11 @@ ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint } -void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList) +void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, + List* shardSplitPubSubMetadataList, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List 
*destinationWorkerNodesList) { char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); @@ -265,6 +274,21 @@ void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplit databaseName); WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList, shardSplitPubSubMetadataList); + + XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, targetNodeConnectionList, shardSplitPubSubMetadataList); + + CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, destinationWorkerNodesList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, targetNodeConnectionList, shardSplitPubSubMetadataList); + + BlockWritesToShardList(sourceColocatedShardIntervalList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, targetNodeConnectionList, shardSplitPubSubMetadataList); + + /* TODO: Create foreign key constraints and handle partitioned tables */ } @@ -375,6 +399,20 @@ WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionLis { Bitmapset *tableOwnerIds = NULL; tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds); + WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, SHARD_SPLIT_SUBSCRIPTION_PREFIX); } -} \ No newline at end of file +} + +static void +WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * targetNodeConnectionList, List * shardSplitPubSubMetadataList) +{ + MultiConnection * targetConnection = NULL; + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata,
shardSplitPubSubMetadataList) + { + Bitmapset *tableOwnerIds = NULL; + tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_SPLIT_SUBSCRIPTION_PREFIX); + } +} diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 6f7126eaa..c5a6f9612 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -26,6 +26,8 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); +extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); + extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, char *databaseName, @@ -33,7 +35,11 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, Oid ownerId); extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds); + Bitmapset *tableOwnerIds, char * operationPrefix); +extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, + XLogRecPtr sourcePosition, + Bitmapset *tableOwnerIds, + char * operationPrefix); #define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_" #define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_" diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 042d6eb20..dbeb306bf 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -59,6 +59,8 @@ extern void SplitShard(SplitMode splitMode, /* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. 
*/ extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); +extern void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, + List *workersForPlacementList); extern void DropShardList(List *shardIntervalList); #endif /* SHARDSPLIT_H_ */ diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 7bbe55fb1..652582b6c 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -53,5 +53,9 @@ extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardInte List *destinationWorkerNodesList, List *replicationSlotInfoList); -extern void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList); +extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, + List* shardSplitPubSubMetadataList, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ \ No newline at end of file diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index 86c7af76f..d95c7007c 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -55,6 +55,8 @@ SELECT citus_split_shard_by_split_points( \c - - - :worker_2_port SET search_path TO citus_split_shard_by_split_points_negative; +SELECT * FROM pg_stat_subscription; +SELECT * FROM pg_subscription_rel; SELECT * FROM show_catalog; \c - - - :worker_1_port