From a2d86214b28c1f73b968d39ad4731cbec8f6f71d Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 9 Sep 2022 16:45:38 +0200 Subject: [PATCH] Share more replication code between moves and splits (#6310) The logical replication catchup part for shard splits and shard moves is very similar. This abstracts most of that similarity away into a single function. This also improves the logic for non blocking shard splits a bit, by using faster foreign key creation. It also parallelizes index creation which shard moves were already doing, but shard splits did not. --- .../distributed/operations/shard_split.c | 89 +-- .../replication/multi_logical_replication.c | 563 ++++++++++-------- .../distributed/multi_logical_replication.h | 8 + .../failure_tenant_isolation_nonblocking.out | 27 - .../failure_tenant_isolation_nonblocking.sql | 12 - 5 files changed, 340 insertions(+), 359 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 2f1f29868..7386f1555 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -132,8 +132,9 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList, uint32 colocationId); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList, - List *workersForPlacementList); +static void CreatePartitioningHierarchyForBlockingSplit( + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); @@ -630,8 +631,9 @@ BlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* create partitioning hierarchy, if any */ - CreatePartitioningHierarchy(shardGroupSplitIntervalListList, - workersForPlacementList); + CreatePartitioningHierarchyForBlockingSplit( + shardGroupSplitIntervalListList, + workersForPlacementList); /* * Create foreign keys if exists after the metadata changes happening in @@ -1218,8 +1220,8 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, * hierarchy between the shardList, if any. */ static void -CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList, - List *workersForPlacementList) +CreatePartitioningHierarchyForBlockingSplit(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) { /* Create partition heirarchy between shards */ List *shardIntervalList = NIL; @@ -1610,51 +1612,18 @@ NonBlockingShardSplit(SplitOperation splitOperation, snapshot, distributionColumnOverrides); /* - * 9) Create replica identities, this needs to be done before enabling - * the subscriptions. + * 9) Logically replicate all the changes and do most of the table DDL, + * like index and foreign key creation. 
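+	 * For shard splits, though, the foreign keys and the partitioning
+	 * hierarchy are not created here; they are created further below, only
+	 * after the new shard metadata has been inserted.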
*/ - CreateReplicaIdentities(logicalRepTargetList); + CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList, + sourceConnection, + publicationInfoHash, + logicalRepTargetList, + groupedLogicalRepTargetsHash, + SHARD_SPLIT); /* - * 10) Enable the subscriptions: Start the catchup phase - */ - EnableSubscriptions(logicalRepTargetList); - - /* 11) Wait for subscriptions to be ready */ - WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); - - /* 12) Wait for subscribers to catchup till source LSN */ - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* 13) Create Auxilary structures */ - CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, - workersForPlacementList, - false /* includeReplicaIdentity*/); - - /* 14) Wait for subscribers to catchup till source LSN */ - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* Used for testing */ - ConflictOnlyWithIsolationTesting(); - - /* 15) Block writes on source shards */ - BlockWritesToShardList(sourceColocatedShardIntervalList); - - /* 16) Wait for subscribers to catchup till source LSN */ - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* 17) Drop Subscribers */ - DropSubscriptions(logicalRepTargetList); - - /* 18) Drop replication slots - */ - DropReplicationSlots(sourceConnection, logicalRepTargetList); - - /* 19) Drop Publications */ - DropPublications(sourceConnection, publicationInfoHash); - - /* - * 20) Delete old shards metadata and either mark the shards as + * 10) Delete old shards metadata and either mark the shards as * to be deferred drop or physically delete them. * Have to do that before creating the new shard metadata, * because there's cross-checks preventing inconsistent metadata @@ -1672,7 +1641,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, DropShardListMetadata(sourceColocatedShardIntervalList); /* - * 21) In case of create_distributed_table_concurrently, which converts + * 11) In case of create_distributed_table_concurrently, which converts * a Citus local table to a distributed table, update the distributed * table metadata now. * @@ -1704,34 +1673,36 @@ NonBlockingShardSplit(SplitOperation splitOperation, targetColocationId); } - /* 22) Insert new shard and placement metdata */ + /* 12) Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); - CreatePartitioningHierarchy(shardGroupSplitIntervalListList, - workersForPlacementList); + /* 13) create partitioning hierarchy, if any, this needs to be done + * after the metadata is correct, because it fails for some + * uninvestigated reason otherwise. + */ + CreatePartitioningHierarchy(logicalRepTargetList); /* - * 23) Create foreign keys if exists after the metadata changes happening in + * 14) Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign * key creation depends on the new metadata. */ - CreateForeignKeyConstraints(shardGroupSplitIntervalListList, - workersForPlacementList); + CreateUncheckedForeignKeyConstraints(logicalRepTargetList); /* - * 24) Release shared memory allocated by worker_split_shard_replication_setup udf + * 15) Release shared memory allocated by worker_split_shard_replication_setup udf * at source node. 
*/ ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode); - /* 25) Close source connection */ + /* 16) Close source connection */ CloseConnection(sourceConnection); - /* 26) Close all subscriber connections */ + /* 17) Close all subscriber connections */ CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash); - /* 27) Close connection of template replication slot */ + /* 18) Close connection of template replication slot */ CloseConnection(sourceReplicationConnection); } PG_CATCH(); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 3f1c429f0..bc1a69a3d 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -114,31 +114,20 @@ bool PlacementMovedUsingLogicalReplicationInTX = false; static int logicalReplicationProgressReportTimeout = 10 * 1000; -static void CreateForeignKeyConstraints(List *logicalRepTargetList); static List * PrepareReplicationSubscriptionList(List *shardList); static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId); static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); -static void CreatePostLogicalReplicationDataLoadObjects(List *shardList, - char *targetNodeName, - int32 targetNodePort); -static void ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, - int targetNodePort); -static void ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, - char *targetNodeName, - int targetNodePort); +static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, + LogicalRepType type); +static void ExecuteCreateIndexCommands(List *logicalRepTargetList); +static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList); static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, - uint64 shardId, char *targetNodeName, int targetNodePort); -static void ExecuteClusterOnCommands(List *shardList, char *targetNodeName, - int targetNodePort); -static void ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, - int targetNodePort); -static void ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, - int targetNodePort); -static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName, - int targetNodePort); +static void ExecuteClusterOnCommands(List *logicalRepTargetList); +static void ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList); +static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList); static char * escape_param_str(const char *str); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static bool RelationSubscriptionsAreReady( @@ -208,10 +197,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ ClaimConnectionExclusively(sourceConnection); - - MultiConnection *sourceReplicationConnection = - GetReplicationConnection(sourceNodeName, sourceNodePort); - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); @@ -229,6 +214,9 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo PG_TRY(); { + MultiConnection *sourceReplicationConnection = + GetReplicationConnection(sourceConnection->hostname, sourceConnection->port); + /* set up the publication on the 
source and subscription on the target */ CreatePublications(sourceConnection, publicationInfoHash); char *snapshot = CreateReplicationSlots( @@ -239,7 +227,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo CreateSubscriptions( sourceConnection, - databaseName, + sourceConnection->database, logicalRepTargetList); /* only useful for isolation testing, see the function comment for the details */ @@ -256,77 +244,14 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo CloseConnection(sourceReplicationConnection); /* - * We have to create the primary key (or any other replica identity) - * before the update/delete operations that are queued will be - * replicated. Because if the replica identity does not exist on the - * target, the replication would fail. - * - * So we it right after the initial data COPY, but before enabling the - * susbcriptions. We do it at this latest possible moment, because its - * much cheaper to build an index at once than to create it - * incrementally. So this way we create the primary key index in one go - * for all data from the initial COPY. + * Start the replication and copy all data */ - CreateReplicaIdentities(logicalRepTargetList); - - /* Start applying the changes from the replication slots to catch up. */ - EnableSubscriptions(logicalRepTargetList); - - /* - * The following check is a leftover from when used subscriptions with - * copy_data=true. It's probably not really necessary anymore, but it - * seemed like a nice check to keep. At least for debugging issues it - * seems nice to report differences between the subscription never - * becoming ready and the subscriber not applying WAL. It's also not - * entirely clear if the catchup check handles the case correctly where - * the subscription is not in the ready state yet, because so far it - * never had to. - */ - WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); - - /* - * Wait until all the subscriptions are caught up to changes that - * happened after the initial COPY on the shards. - */ - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* - * Now lets create the post-load objects, such as the indexes, constraints - * and partitioning hierarchy. Once they are done, wait until the replication - * catches up again. So we don't block writes too long. - */ - CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName, - targetNodePort); - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* - * We're almost done, we'll block the writes to the shards that we're - * replicating and expect all the subscription to catch up quickly - * afterwards. - * - * Notice that although shards in partitioned relation are excluded from - * logical replication, they are still locked against modification, and - * foreign constraints are created on them too. - */ - BlockWritesToShardList(shardList); - - WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - - /* - * We're creating the foreign constraints to reference tables after the - * data is already replicated and all the necessary locks are acquired. - * - * We prefer to do it here because the placements of reference tables - * are always valid, and any modification during the shard move would - * cascade to the hash distributed tables' shards if we had created - * the constraints earlier. 
- */ - CreateForeignKeyConstraints(logicalRepTargetList); - - /* we're done, cleanup the publication and subscription */ - DropSubscriptions(logicalRepTargetList); - DropReplicationSlots(sourceConnection, logicalRepTargetList); - DropPublications(sourceConnection, publicationInfoHash); + CompleteNonBlockingShardTransfer(shardList, + sourceConnection, + publicationInfoHash, + logicalRepTargetList, + groupedLogicalRepTargetsHash, + SHARD_MOVE); /* * We use these connections exclusively for subscription management, @@ -405,6 +330,104 @@ CreateGroupedLogicalRepTargetsHash(List *logicalRepTargetList) } +/* + * CompleteNonBlockingShardTransfer uses logical replication to apply the changes + * made on the source to the target. It also runs all DDL on the target shards + * that need to be run after the data copy. + * + * For shard splits it skips the partition hierarchy and foreign key creation + * though, since those need to happen after the metadata is updated. + */ +void +CompleteNonBlockingShardTransfer(List *shardList, + MultiConnection *sourceConnection, + HTAB *publicationInfoHash, + List *logicalRepTargetList, + HTAB *groupedLogicalRepTargetsHash, + LogicalRepType type) +{ + /* + * We have to create the primary key (or any other replica identity) + * before the update/delete operations that are queued will be + * replicated. Because if the replica identity does not exist on the + * target, the replication would fail. + * + * So we it right after the initial data COPY, but before enabling the + * susbcriptions. We do it at this latest possible moment, because its + * much cheaper to build an index at once than to create it + * incrementally. So this way we create the primary key index in one go + * for all data from the initial COPY. + */ + CreateReplicaIdentities(logicalRepTargetList); + + /* Start applying the changes from the replication slots to catch up. */ + EnableSubscriptions(logicalRepTargetList); + + /* + * The following check is a leftover from when used subscriptions with + * copy_data=true. It's probably not really necessary anymore, but it + * seemed like a nice check to keep. At least for debugging issues it + * seems nice to report differences between the subscription never + * becoming ready and the subscriber not applying WAL. It's also not + * entirely clear if the catchup check handles the case correctly where + * the subscription is not in the ready state yet, because so far it + * never had to. + */ + WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); + + /* + * Wait until all the subscriptions are caught up to changes that + * happened after the initial COPY on the shards. + */ + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); + + /* + * Now lets create the post-load objects, such as the indexes, constraints + * and partitioning hierarchy. Once they are done, wait until the replication + * catches up again. So we don't block writes too long. + */ + CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type); + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); + + + /* only useful for isolation testing, see the function comment for the details */ + ConflictOnlyWithIsolationTesting(); + + /* + * We're almost done, we'll block the writes to the shards that we're + * replicating and expect all the subscription to catch up quickly + * afterwards. 
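+	 * Once they have caught up, the target placements contain all the
+	 * writes that were committed on the source placements.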
+ * + * Notice that although shards in partitioned relation are excluded from + * logical replication, they are still locked against modification, and + * foreign constraints are created on them too. + */ + BlockWritesToShardList(shardList); + + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); + + if (type != SHARD_SPLIT) + { + /* + * We're creating the foreign constraints to reference tables after the + * data is already replicated and all the necessary locks are acquired. + * + * We prefer to do it here because the placements of reference tables + * are always valid, and any modification during the shard move would + * cascade to the hash distributed tables' shards if we had created + * the constraints earlier. The same is true for foreign keys between + * tables owned by different users. + */ + CreateUncheckedForeignKeyConstraints(logicalRepTargetList); + } + + /* we're done, cleanup the publication and subscription */ + DropSubscriptions(logicalRepTargetList); + DropReplicationSlots(sourceConnection, logicalRepTargetList); + DropPublications(sourceConnection, publicationInfoHash); +} + + /* * CreateShardMovePublicationInfoHash creates hashmap of PublicationInfos for a * shard move. Even though we only support moving a shard to a single target @@ -742,8 +765,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId) * the objects that can be created after the data is moved with logical replication. */ static void -CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeName, - int32 targetNodePort) +CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList, + LogicalRepType type) { /* * We create indexes in 4 steps. @@ -759,20 +782,25 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam * table and setting the statistics of indexes, depends on the indexes being * created. That's why the execution is divided into four distinct stages. */ - ExecuteCreateIndexCommands(shardList, targetNodeName, targetNodePort); - ExecuteCreateConstraintsBackedByIndexCommands(shardList, targetNodeName, - targetNodePort); - ExecuteClusterOnCommands(shardList, targetNodeName, targetNodePort); - ExecuteCreateIndexStatisticsCommands(shardList, targetNodeName, targetNodePort); + ExecuteCreateIndexCommands(logicalRepTargetList); + ExecuteCreateConstraintsBackedByIndexCommands(logicalRepTargetList); + ExecuteClusterOnCommands(logicalRepTargetList); + ExecuteCreateIndexStatisticsCommands(logicalRepTargetList); /* * Once the indexes are created, there are few more objects like triggers and table * statistics that should be created after the data move. */ - ExecuteRemainingPostLoadTableCommands(shardList, targetNodeName, targetNodePort); + ExecuteRemainingPostLoadTableCommands(logicalRepTargetList); - /* create partitioning hierarchy, if any */ - CreatePartitioningHierarchy(shardList, targetNodeName, targetNodePort); + /* + * Creating the partitioning hierarchy errors out in shard splits when + */ + if (type != SHARD_SPLIT) + { + /* create partitioning hierarchy, if any */ + CreatePartitioningHierarchy(logicalRepTargetList); + } } @@ -784,27 +812,31 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam * commands fail. 
*/ static void -ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNodePort) +ExecuteCreateIndexCommands(List *logicalRepTargetList) { List *taskList = NIL; - ListCell *shardCell = NULL; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - Oid relationId = shardInterval->relationId; + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) + { + Oid relationId = shardInterval->relationId; - List *tableCreateIndexCommandList = - GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, - INCLUDE_CREATE_INDEX_STATEMENTS); + List *tableCreateIndexCommandList = + GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, + INCLUDE_CREATE_INDEX_STATEMENTS); - List *shardCreateIndexCommandList = - WorkerApplyShardDDLCommandList(tableCreateIndexCommandList, - shardInterval->shardId); - List *taskListForShard = - ConvertNonExistingPlacementDDLCommandsToTasks(shardCreateIndexCommandList, - shardInterval->shardId, - targetNodeName, targetNodePort); - taskList = list_concat(taskList, taskListForShard); + List *shardCreateIndexCommandList = + WorkerApplyShardDDLCommandList(tableCreateIndexCommandList, + shardInterval->shardId); + List *taskListForShard = + ConvertNonExistingPlacementDDLCommandsToTasks( + shardCreateIndexCommandList, + target->superuserConnection->hostname, + target->superuserConnection->port); + taskList = list_concat(taskList, taskListForShard); + } } /* @@ -819,8 +851,7 @@ ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNode */ ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(indexes) on node %s:%d", targetNodeName, - targetNodePort))); + "(indexes)"))); ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize, @@ -836,45 +867,47 @@ ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNode * commands fail. 
*/ static void -ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, char *targetNodeName, - int targetNodePort) +ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList) { ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(constraints backed by indexes) on node %s:%d", - targetNodeName, - targetNodePort))); + "(constraints backed by indexes)"))); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateConstraintsBackedByIndexContext", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); - ListCell *shardCell = NULL; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - Oid relationId = shardInterval->relationId; - - List *tableCreateConstraintCommandList = - GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, - INCLUDE_CREATE_CONSTRAINT_STATEMENTS); - - if (tableCreateConstraintCommandList == NIL) + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) { - /* no constraints backed by indexes, skip */ + Oid relationId = shardInterval->relationId; + + List *tableCreateConstraintCommandList = + GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, + INCLUDE_CREATE_CONSTRAINT_STATEMENTS); + + if (tableCreateConstraintCommandList == NIL) + { + /* no constraints backed by indexes, skip */ + MemoryContextReset(localContext); + continue; + } + + List *shardCreateConstraintCommandList = + WorkerApplyShardDDLCommandList(tableCreateConstraintCommandList, + shardInterval->shardId); + + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction( + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, + shardCreateConstraintCommandList); MemoryContextReset(localContext); - continue; } - - List *shardCreateConstraintCommandList = - WorkerApplyShardDDLCommandList(tableCreateConstraintCommandList, - shardInterval->shardId); - - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardCreateConstraintCommandList); - MemoryContextReset(localContext); } MemoryContextSwitchTo(oldContext); @@ -890,7 +923,6 @@ ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, char *targetNodeN */ static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, - uint64 shardId, char *targetNodeName, int targetNodePort) { @@ -911,7 +943,6 @@ ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, SetPlacementNodeMetadata(taskPlacement, workerNode); task->taskPlacementList = list_make1(taskPlacement); - task->anchorShardId = shardId; taskList = lappend(taskList, task); taskId++; @@ -929,34 +960,36 @@ ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList, * is aborted. 
*/ static void -ExecuteClusterOnCommands(List *shardList, char *targetNodeName, int targetNodePort) +ExecuteClusterOnCommands(List *logicalRepTargetList) { List *taskList = NIL; - ListCell *shardCell; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - Oid relationId = shardInterval->relationId; + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) + { + Oid relationId = shardInterval->relationId; - List *tableAlterTableClusterOnCommandList = - GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, - INCLUDE_INDEX_CLUSTERED_STATEMENTS); + List *tableAlterTableClusterOnCommandList = + GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, + INCLUDE_INDEX_CLUSTERED_STATEMENTS); - List *shardAlterTableClusterOnCommandList = - WorkerApplyShardDDLCommandList(tableAlterTableClusterOnCommandList, - shardInterval->shardId); + List *shardAlterTableClusterOnCommandList = + WorkerApplyShardDDLCommandList(tableAlterTableClusterOnCommandList, + shardInterval->shardId); - List *taskListForShard = - ConvertNonExistingPlacementDDLCommandsToTasks( - shardAlterTableClusterOnCommandList, - shardInterval->shardId, - targetNodeName, targetNodePort); - taskList = list_concat(taskList, taskListForShard); + List *taskListForShard = + ConvertNonExistingPlacementDDLCommandsToTasks( + shardAlterTableClusterOnCommandList, + target->superuserConnection->hostname, + target->superuserConnection->port); + taskList = list_concat(taskList, taskListForShard); + } } ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(CLUSTER ON) on node %s:%d", targetNodeName, - targetNodePort))); + "(CLUSTER ON)"))); ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize, @@ -972,48 +1005,51 @@ ExecuteClusterOnCommands(List *shardList, char *targetNodeName, int targetNodePo * is aborted. 
*/ static void -ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, int - targetNodePort) +ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList) { ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(index statistics) on node %s:%d", targetNodeName, - targetNodePort))); + "(index statistics)"))); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateIndexStatisticsContext", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); - ListCell *shardCell; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - Oid relationId = shardInterval->relationId; - - List *tableAlterIndexSetStatisticsCommandList = - GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, - INCLUDE_INDEX_STATISTICS_STATEMENTTS); - List *shardAlterIndexSetStatisticsCommandList = - WorkerApplyShardDDLCommandList(tableAlterIndexSetStatisticsCommandList, - shardInterval->shardId); - - if (shardAlterIndexSetStatisticsCommandList == NIL) + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) { - /* no index statistics exists, skip */ + Oid relationId = shardInterval->relationId; + + List *tableAlterIndexSetStatisticsCommandList = + GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(relationId, + INCLUDE_INDEX_STATISTICS_STATEMENTTS); + List *shardAlterIndexSetStatisticsCommandList = + WorkerApplyShardDDLCommandList(tableAlterIndexSetStatisticsCommandList, + shardInterval->shardId); + + if (shardAlterIndexSetStatisticsCommandList == NIL) + { + /* no index statistics exists, skip */ + MemoryContextReset(localContext); + continue; + } + + /* + * These remaining operations do not require significant resources, so no + * need to create them in parallel. + */ + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction( + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, + shardAlterIndexSetStatisticsCommandList); + MemoryContextReset(localContext); - continue; } - - /* - * These remaining operations do not require significant resources, so no - * need to create them in parallel. - */ - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardAlterIndexSetStatisticsCommandList); - - MemoryContextReset(localContext); } MemoryContextSwitchTo(oldContext); @@ -1026,52 +1062,55 @@ ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, int * in the given target node. 
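+	 * The target node of each shard is taken from the superuserConnection
+	 * of its LogicalRepTarget.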
*/ static void -ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, int - targetNodePort) +ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList) { ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(triggers and table statistics) on node %s:%d", - targetNodeName, - targetNodePort))); + "(triggers and table statistics)" + ))); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateTableStatisticsContext", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); - ListCell *shardCell = NULL; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - Oid relationId = shardInterval->relationId; - - bool includeIndexes = false; - bool includeReplicaIdentity = false; - - List *tablePostLoadTableCommandList = - GetPostLoadTableCreationCommands(relationId, includeIndexes, - includeReplicaIdentity); - - List *shardPostLoadTableCommandList = - WorkerApplyShardDDLCommandList(tablePostLoadTableCommandList, - shardInterval->shardId); - - if (shardPostLoadTableCommandList == NIL) + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) { - /* no index statistics exists, skip */ - continue; + Oid relationId = shardInterval->relationId; + + bool includeIndexes = false; + bool includeReplicaIdentity = false; + + List *tablePostLoadTableCommandList = + GetPostLoadTableCreationCommands(relationId, includeIndexes, + includeReplicaIdentity); + + List *shardPostLoadTableCommandList = + WorkerApplyShardDDLCommandList(tablePostLoadTableCommandList, + shardInterval->shardId); + + if (shardPostLoadTableCommandList == NIL) + { + /* no index statistics exists, skip */ + continue; + } + + /* + * These remaining operations do not require significant resources, so no + * need to create them in parallel. + */ + char *tableOwner = TableOwner(shardInterval->relationId); + SendCommandListToWorkerOutsideTransaction( + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, + shardPostLoadTableCommandList); + + MemoryContextReset(localContext); } - - /* - * These remaining operations do not require significant resources, so no - * need to create them in parallel. 
- */ - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardPostLoadTableCommandList); - - MemoryContextReset(localContext); } MemoryContextSwitchTo(oldContext); @@ -1082,40 +1121,42 @@ ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, int * CreatePartitioningHierarchy gets a shardList and creates the partitioning * hierarchy between the shardList, if any, */ -static void -CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNodePort) +void +CreatePartitioningHierarchy(List *logicalRepTargetList) { ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(partitioning hierarchy) on node %s:%d", targetNodeName, - targetNodePort))); + "(partitioning hierarchy)"))); MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreatePartitioningHierarchy", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); - ListCell *shardCell = NULL; - foreach(shardCell, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - - if (PartitionTable(shardInterval->relationId)) + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, target->newShards) { - char *attachPartitionCommand = - GenerateAttachShardPartitionCommand(shardInterval); + if (PartitionTable(shardInterval->relationId)) + { + char *attachPartitionCommand = + GenerateAttachShardPartitionCommand(shardInterval); - char *tableOwner = TableOwner(shardInterval->relationId); + char *tableOwner = TableOwner(shardInterval->relationId); - /* - * Attaching partition may acquire conflicting locks when created in - * parallel, so create them sequentially. Also attaching partition - * is a quick operation, so it is fine to execute sequentially. - */ - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - list_make1( - attachPartitionCommand)); - MemoryContextReset(localContext); + /* + * Attaching partition may acquire conflicting locks when created in + * parallel, so create them sequentially. Also attaching partition + * is a quick operation, so it is fine to execute sequentially. + */ + SendCommandListToWorkerOutsideTransaction( + target->superuserConnection->hostname, + target->superuserConnection->port, + tableOwner, + list_make1(attachPartitionCommand)); + MemoryContextReset(localContext); + } } } @@ -1124,17 +1165,17 @@ CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNod /* - * CreateForeignKeyConstraints is used to create the foreign constraints - * on the logical replication target without checking that they are actually - * valid. + * CreateUncheckedForeignKeyConstraints is used to create the foreign + * constraints on the logical replication target without checking that they are + * actually valid. * * We skip the validation phase of foreign keys to after a shard * move/copy/split because the validation is pretty costly and given that the * source placements are already valid, the validation in the target nodes is * useless. 
*/ -static void -CreateForeignKeyConstraints(List *logicalRepTargetList) +void +CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 168f7b03c..994650568 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -177,5 +177,13 @@ extern void RecreateGroupedLogicalRepTargetsConnections( char *user, char *databaseName); extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash); +extern void CompleteNonBlockingShardTransfer(List *shardList, + MultiConnection *sourceConnection, + HTAB *publicationInfoHash, + List *logicalRepTargetList, + HTAB *groupedLogicalRepTargetsHash, + LogicalRepType type); +extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList); +extern void CreatePartitioningHierarchy(List *logicalRepTargetList); #endif /* MULTI_LOGICAL_REPLICATION_H_ */ diff --git a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out index 63719c4ec..c166a41af 100644 --- a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out +++ b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out @@ -233,24 +233,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").cancel(' || :p (1 row) -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); -ERROR: canceling statement due to user request --- failure on colocated table constraints -SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(1).kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open --- cancellation on colocated table constraints -SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(2).cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on catching up LSN @@ -382,15 +364,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey F SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx --- failure on foreign key creation -SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").after(2).cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); -ERROR: canceling statement due to user request -- failure on shard split transaction SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").kill()'); mitmproxy diff --git a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql 
index 60bfff417..2ee928a56 100644 --- a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql +++ b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql @@ -117,14 +117,6 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").cancel(' || :pid || ')'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); --- failure on colocated table constraints -SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(1).kill()'); -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); - --- cancellation on colocated table constraints -SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(2).cancel(' || :pid || ')'); -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); - -- failure on catching up LSN SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); @@ -184,10 +176,6 @@ SET citus.defer_drop_after_shard_split TO ON; SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); --- failure on foreign key creation -SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").after(2).cancel(' || :pid || ')'); -SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); - -- failure on shard split transaction SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');