Prototype: logically replicate partitioned shards

naisila/rebalancer
naisila 2022-11-04 16:08:22 +03:00
parent fcaabfdcf3
commit 177e69e086
4 changed files with 51 additions and 18 deletions

View File

@ -1633,6 +1633,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_SPLIT);

View File

@ -1209,6 +1209,35 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
MemoryContextSwitchTo(oldContext);
/*
* We should recreate partitioning hierarchy between shards.
*/
List *attachPartitionCommands = NIL;
foreach_ptr(shardInterval, shardIntervalList)
{
if (PartitionTable(shardInterval->relationId))
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(shardInterval);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_make1(attachPartitionCommand));
attachPartitionCommands = lappend(attachPartitionCommands,
shardCommandList);
}
}
/* Now execute the Partitioning creation commads. */
ShardCommandList *attachPartitionCommand = NULL;
foreach_ptr(attachPartitionCommand, attachPartitionCommands)
{
char *tableOwner = TableOwner(attachPartitionCommand->shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
attachPartitionCommand->ddlCommandList);
}
/* data copy is done seperately when logical replication is used */
LogicallyReplicateShards(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName, targetNodePort);

View File

@ -205,13 +205,23 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
targetNode, replicationSubscriptionList);
List *logicalRepTargetList = CreateShardMoveLogicalRepTargetList(publicationInfoHash,
shardList);
replicationSubscriptionList);
HTAB *publicationInfoHash2 = CreateShardMovePublicationInfoHash(
targetNode, replicationSubscriptionList);
List *logicalRepTargetList2 = CreateShardMoveLogicalRepTargetList(
publicationInfoHash2,
shardList);
HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash(
logicalRepTargetList);
HTAB *groupedLogicalRepTargetsHash2 = CreateGroupedLogicalRepTargetsHash(
logicalRepTargetList2);
CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, superUser,
databaseName);
CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash2, superUser,
databaseName);
PG_TRY();
{
@ -277,6 +287,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
logicalRepTargetList2,
groupedLogicalRepTargetsHash,
SHARD_MOVE);
@ -370,6 +381,7 @@ CompleteNonBlockingShardTransfer(List *shardList,
MultiConnection *sourceConnection,
HTAB *publicationInfoHash,
List *logicalRepTargetList,
List *logicalRepTargetList2,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type)
{
@ -399,7 +411,7 @@ CompleteNonBlockingShardTransfer(List *shardList,
* and partitioning hierarchy. Once they are done, wait until the replication
* catches up again. So we don't block writes too long.
*/
CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type);
CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList2, type);
UpdatePlacementUpdateStatusForShardIntervalList(
shardList,
@ -444,7 +456,7 @@ CompleteNonBlockingShardTransfer(List *shardList,
* the constraints earlier. The same is true for foreign keys between
* tables owned by different users.
*/
CreateUncheckedForeignKeyConstraints(logicalRepTargetList);
CreateUncheckedForeignKeyConstraints(logicalRepTargetList2);
}
UpdatePlacementUpdateStatusForShardIntervalList(
@ -643,10 +655,9 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
/*
* PrepareReplicationSubscriptionList returns list of shards to be logically
* replicated from given shard list. This is needed because Postgres does not
* allow logical replication on partitioned tables, therefore shards belonging
* to a partitioned tables should be exluded from logical replication
* subscription list.
* replicated from given shard list. This is needed because we exclude shards
* belonging to partition tables here as they will be automatically
* logically replicated if their parent shard is logically replicated.
*/
static List *
PrepareReplicationSubscriptionList(List *shardList)
@ -657,9 +668,9 @@ PrepareReplicationSubscriptionList(List *shardList)
foreach(shardCell, shardList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
if (!PartitionedTable(shardInterval->relationId))
if (!PartitionTable(shardInterval->relationId))
{
/* only add regular and child tables to subscription */
/* only add regular and partitioned parent tables to subscription */
replicationSubscriptionList = lappend(replicationSubscriptionList,
shardInterval);
}
@ -826,15 +837,6 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
* statistics that should be created after the data move.
*/
ExecuteRemainingPostLoadTableCommands(logicalRepTargetList);
/*
* Creating the partitioning hierarchy errors out in shard splits when
*/
if (type != SHARD_SPLIT)
{
/* create partitioning hierarchy, if any */
CreatePartitioningHierarchy(logicalRepTargetList);
}
}

View File

@ -178,6 +178,7 @@ extern void CompleteNonBlockingShardTransfer(List *shardList,
MultiConnection *sourceConnection,
HTAB *publicationInfoHash,
List *logicalRepTargetList,
List *logicalRepTargetList2,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type);
extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList);