From 177e69e086c136083a2dad9d439f052945d5aa34 Mon Sep 17 00:00:00 2001 From: naisila Date: Fri, 4 Nov 2022 16:08:22 +0300 Subject: [PATCH] Prototype: logically replicate partitioned shards --- .../distributed/operations/shard_split.c | 1 + .../distributed/operations/shard_transfer.c | 29 ++++++++++++++ .../replication/multi_logical_replication.c | 38 ++++++++++--------- .../distributed/multi_logical_replication.h | 1 + 4 files changed, 51 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index e62412c7c..982d2dc49 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1633,6 +1633,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, sourceConnection, publicationInfoHash, logicalRepTargetList, + logicalRepTargetList, groupedLogicalRepTargetsHash, SHARD_SPLIT); diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 81bef791a..a49a8594e 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1209,6 +1209,35 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); + /* + * We should recreate partitioning hierarchy between shards. + */ + List *attachPartitionCommands = NIL; + foreach_ptr(shardInterval, shardIntervalList) + { + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); + + ShardCommandList *shardCommandList = CreateShardCommandList( + shardInterval, + list_make1(attachPartitionCommand)); + attachPartitionCommands = lappend(attachPartitionCommands, + shardCommandList); + } + } + + /* Now execute the Partitioning creation commads. */ + ShardCommandList *attachPartitionCommand = NULL; + foreach_ptr(attachPartitionCommand, attachPartitionCommands) + { + char *tableOwner = TableOwner(attachPartitionCommand->shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, + attachPartitionCommand->ddlCommandList); + } + /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 1edd48c48..f46f8cd7d 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -205,13 +205,23 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo targetNode, replicationSubscriptionList); List *logicalRepTargetList = CreateShardMoveLogicalRepTargetList(publicationInfoHash, - shardList); + replicationSubscriptionList); + HTAB *publicationInfoHash2 = CreateShardMovePublicationInfoHash( + targetNode, replicationSubscriptionList); + List *logicalRepTargetList2 = CreateShardMoveLogicalRepTargetList( + publicationInfoHash2, + shardList); HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash( logicalRepTargetList); + HTAB *groupedLogicalRepTargetsHash2 = CreateGroupedLogicalRepTargetsHash( + logicalRepTargetList2); + CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, superUser, databaseName); + CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash2, superUser, + databaseName); PG_TRY(); { @@ -277,6 +287,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo sourceConnection, publicationInfoHash, logicalRepTargetList, + logicalRepTargetList2, groupedLogicalRepTargetsHash, SHARD_MOVE); @@ -370,6 +381,7 @@ CompleteNonBlockingShardTransfer(List *shardList, MultiConnection *sourceConnection, HTAB *publicationInfoHash, List *logicalRepTargetList, + List *logicalRepTargetList2, HTAB *groupedLogicalRepTargetsHash, LogicalRepType type) { @@ -399,7 +411,7 @@ CompleteNonBlockingShardTransfer(List *shardList, * and partitioning hierarchy. Once they are done, wait until the replication * catches up again. So we don't block writes too long. */ - CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList2, type); UpdatePlacementUpdateStatusForShardIntervalList( shardList, @@ -444,7 +456,7 @@ CompleteNonBlockingShardTransfer(List *shardList, * the constraints earlier. The same is true for foreign keys between * tables owned by different users. */ - CreateUncheckedForeignKeyConstraints(logicalRepTargetList); + CreateUncheckedForeignKeyConstraints(logicalRepTargetList2); } UpdatePlacementUpdateStatusForShardIntervalList( @@ -643,10 +655,9 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type) /* * PrepareReplicationSubscriptionList returns list of shards to be logically - * replicated from given shard list. This is needed because Postgres does not - * allow logical replication on partitioned tables, therefore shards belonging - * to a partitioned tables should be exluded from logical replication - * subscription list. + * replicated from given shard list. This is needed because we exclude shards + * belonging to partition tables here as they will be automatically + * logically replicated if their parent shard is logically replicated. */ static List * PrepareReplicationSubscriptionList(List *shardList) @@ -657,9 +668,9 @@ PrepareReplicationSubscriptionList(List *shardList) foreach(shardCell, shardList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - if (!PartitionedTable(shardInterval->relationId)) + if (!PartitionTable(shardInterval->relationId)) { - /* only add regular and child tables to subscription */ + /* only add regular and partitioned parent tables to subscription */ replicationSubscriptionList = lappend(replicationSubscriptionList, shardInterval); } @@ -826,15 +837,6 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, * statistics that should be created after the data move. */ ExecuteRemainingPostLoadTableCommands(logicalRepTargetList); - - /* - * Creating the partitioning hierarchy errors out in shard splits when - */ - if (type != SHARD_SPLIT) - { - /* create partitioning hierarchy, if any */ - CreatePartitioningHierarchy(logicalRepTargetList); - } } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 1db36402b..3e8497e15 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -178,6 +178,7 @@ extern void CompleteNonBlockingShardTransfer(List *shardList, MultiConnection *sourceConnection, HTAB *publicationInfoHash, List *logicalRepTargetList, + List *logicalRepTargetList2, HTAB *groupedLogicalRepTargetsHash, LogicalRepType type); extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList);