Share more replication code between moves and splits (#6310)

The logical replication catch-up phase for shard splits and shard moves is
very similar. This abstracts most of that shared logic into a single
function (sketched below). It also improves non-blocking shard splits in two
ways: foreign keys are now created without the costly validation step, and
index creation is parallelized, which shard moves already did but shard
splits did not.
Jelte Fennema 2022-09-09 16:45:38 +02:00 committed by GitHub
parent ba2fe3e3c4
commit a2d86214b2
5 changed files with 340 additions and 359 deletions
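For orientation, the refactoring pattern this commit applies is sketched below: both the shard-move path and the non-blocking shard-split path now delegate the catch-up phase to one shared routine, and an enum flag controls the few steps that differ. This is a toy, self-contained model rather than the Citus implementation; only the names CompleteNonBlockingShardTransfer, SHARD_MOVE, and SHARD_SPLIT come from the diff below, everything else is illustrative.

#include <stdio.h>

/*
 * Toy model of the refactoring in this commit: one shared "complete the
 * transfer" routine that both callers use, with a LogicalRepType flag for
 * the steps that differ. The real function, CompleteNonBlockingShardTransfer,
 * takes connections, hash tables and shard lists instead of these toy
 * parameters; the step names below paraphrase the comments in the diff.
 */

typedef enum { SHARD_MOVE, SHARD_SPLIT } LogicalRepType;

static void
CompleteTransfer(LogicalRepType type)
{
    puts("create replica identities");
    puts("enable subscriptions and wait for them to catch up");
    puts("create indexes, constraints and statistics in parallel");
    puts("block writes and wait for the final catch-up");

    if (type != SHARD_SPLIT)
    {
        /* splits create foreign keys later, after their metadata is updated */
        puts("create unchecked foreign keys");
    }

    puts("drop subscriptions, replication slots and publications");
}

int
main(void)
{
    CompleteTransfer(SHARD_MOVE);   /* shard move path */
    CompleteTransfer(SHARD_SPLIT);  /* non-blocking shard split path */
    return 0;
}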


@@ -132,7 +132,8 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
                                                    uint32 colocationId);
 static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
                                              List *workersForPlacementList);
-static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
+static void CreatePartitioningHierarchyForBlockingSplit(
+    List *shardGroupSplitIntervalListList,
                                         List *workersForPlacementList);
 static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
                                         List *workersForPlacementList);
@@ -630,7 +631,8 @@ BlockingShardSplit(SplitOperation splitOperation,
                                           workersForPlacementList);

     /* create partitioning hierarchy, if any */
-    CreatePartitioningHierarchy(shardGroupSplitIntervalListList,
+    CreatePartitioningHierarchyForBlockingSplit(
+        shardGroupSplitIntervalListList,
                                 workersForPlacementList);

     /*
@@ -1218,7 +1220,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
  * hierarchy between the shardList, if any.
  */
 static void
-CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
+CreatePartitioningHierarchyForBlockingSplit(List *shardGroupSplitIntervalListList,
                             List *workersForPlacementList)
 {
     /* Create partition heirarchy between shards */
@@ -1610,51 +1612,18 @@ NonBlockingShardSplit(SplitOperation splitOperation,
                                    snapshot, distributionColumnOverrides);

     /*
-     * 9) Create replica identities, this needs to be done before enabling
-     * the subscriptions.
+     * 9) Logically replicate all the changes and do most of the table DDL,
+     * like index and foreign key creation.
      */
-    CreateReplicaIdentities(logicalRepTargetList);
+    CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList,
+                                     sourceConnection,
+                                     publicationInfoHash,
+                                     logicalRepTargetList,
+                                     groupedLogicalRepTargetsHash,
+                                     SHARD_SPLIT);

     /*
-     * 10) Enable the subscriptions: Start the catchup phase
-     */
-    EnableSubscriptions(logicalRepTargetList);
-
-    /* 11) Wait for subscriptions to be ready */
-    WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
-
-    /* 12) Wait for subscribers to catchup till source LSN */
-    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-    /* 13) Create Auxilary structures */
-    CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
-                                           workersForPlacementList,
-                                           false /* includeReplicaIdentity*/);
-
-    /* 14) Wait for subscribers to catchup till source LSN */
-    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-    /* Used for testing */
-    ConflictOnlyWithIsolationTesting();
-
-    /* 15) Block writes on source shards */
-    BlockWritesToShardList(sourceColocatedShardIntervalList);
-
-    /* 16) Wait for subscribers to catchup till source LSN */
-    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-    /* 17) Drop Subscribers */
-    DropSubscriptions(logicalRepTargetList);
-
-    /* 18) Drop replication slots
-     */
-    DropReplicationSlots(sourceConnection, logicalRepTargetList);
-
-    /* 19) Drop Publications */
-    DropPublications(sourceConnection, publicationInfoHash);
-
-    /*
-     * 20) Delete old shards metadata and either mark the shards as
+     * 10) Delete old shards metadata and either mark the shards as
      * to be deferred drop or physically delete them.
      * Have to do that before creating the new shard metadata,
      * because there's cross-checks preventing inconsistent metadata
@@ -1672,7 +1641,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
     DropShardListMetadata(sourceColocatedShardIntervalList);

     /*
-     * 21) In case of create_distributed_table_concurrently, which converts
+     * 11) In case of create_distributed_table_concurrently, which converts
      * a Citus local table to a distributed table, update the distributed
      * table metadata now.
      *
@@ -1704,34 +1673,36 @@ NonBlockingShardSplit(SplitOperation splitOperation,
                                              targetColocationId);
     }

-    /* 22) Insert new shard and placement metdata */
+    /* 12) Insert new shard and placement metdata */
     InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
                                      workersForPlacementList);

-    CreatePartitioningHierarchy(shardGroupSplitIntervalListList,
-                                workersForPlacementList);
+    /* 13) create partitioning hierarchy, if any, this needs to be done
+     * after the metadata is correct, because it fails for some
+     * uninvestigated reason otherwise.
+     */
+    CreatePartitioningHierarchy(logicalRepTargetList);

     /*
-     * 23) Create foreign keys if exists after the metadata changes happening in
+     * 14) Create foreign keys if exists after the metadata changes happening in
      * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
      * key creation depends on the new metadata.
      */
-    CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
-                                workersForPlacementList);
+    CreateUncheckedForeignKeyConstraints(logicalRepTargetList);

     /*
-     * 24) Release shared memory allocated by worker_split_shard_replication_setup udf
+     * 15) Release shared memory allocated by worker_split_shard_replication_setup udf
      * at source node.
      */
     ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);

-    /* 25) Close source connection */
+    /* 16) Close source connection */
     CloseConnection(sourceConnection);

-    /* 26) Close all subscriber connections */
+    /* 17) Close all subscriber connections */
     CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);

-    /* 27) Close connection of template replication slot */
+    /* 18) Close connection of template replication slot */
     CloseConnection(sourceReplicationConnection);
 }
 PG_CATCH();


@@ -114,31 +114,20 @@ bool PlacementMovedUsingLogicalReplicationInTX = false;
 static int logicalReplicationProgressReportTimeout = 10 * 1000;

-static void CreateForeignKeyConstraints(List *logicalRepTargetList);
 static List * PrepareReplicationSubscriptionList(List *shardList);
 static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId);
 static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId,
                                                                 uint64 shardId);
-static void CreatePostLogicalReplicationDataLoadObjects(List *shardList,
-                                                        char *targetNodeName,
-                                                        int32 targetNodePort);
-static void ExecuteCreateIndexCommands(List *shardList, char *targetNodeName,
-                                       int targetNodePort);
-static void ExecuteCreateConstraintsBackedByIndexCommands(List *shardList,
-                                                          char *targetNodeName,
-                                                          int targetNodePort);
+static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
+                                                        LogicalRepType type);
+static void ExecuteCreateIndexCommands(List *logicalRepTargetList);
+static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList);
 static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList,
-                                                            uint64 shardId,
                                                             char *targetNodeName,
                                                             int targetNodePort);
-static void ExecuteClusterOnCommands(List *shardList, char *targetNodeName,
-                                     int targetNodePort);
-static void ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName,
-                                                 int targetNodePort);
-static void ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName,
-                                                  int targetNodePort);
-static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName,
-                                        int targetNodePort);
+static void ExecuteClusterOnCommands(List *logicalRepTargetList);
+static void ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList);
+static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList);
 static char * escape_param_str(const char *str);
 static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
 static bool RelationSubscriptionsAreReady(
@@ -208,10 +197,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
      */
     ClaimConnectionExclusively(sourceConnection);

-    MultiConnection *sourceReplicationConnection =
-        GetReplicationConnection(sourceNodeName, sourceNodePort);
-
     WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
     WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
@@ -229,6 +214,9 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
     PG_TRY();
     {
+        MultiConnection *sourceReplicationConnection =
+            GetReplicationConnection(sourceConnection->hostname, sourceConnection->port);
+
         /* set up the publication on the source and subscription on the target */
         CreatePublications(sourceConnection, publicationInfoHash);
         char *snapshot = CreateReplicationSlots(
@@ -239,7 +227,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
         CreateSubscriptions(
             sourceConnection,
-            databaseName,
+            sourceConnection->database,
             logicalRepTargetList);

         /* only useful for isolation testing, see the function comment for the details */
@@ -256,77 +244,14 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
         CloseConnection(sourceReplicationConnection);

         /*
-         * We have to create the primary key (or any other replica identity)
-         * before the update/delete operations that are queued will be
-         * replicated. Because if the replica identity does not exist on the
-         * target, the replication would fail.
-         *
-         * So we it right after the initial data COPY, but before enabling the
-         * susbcriptions. We do it at this latest possible moment, because its
-         * much cheaper to build an index at once than to create it
-         * incrementally. So this way we create the primary key index in one go
-         * for all data from the initial COPY.
+         * Start the replication and copy all data
          */
-        CreateReplicaIdentities(logicalRepTargetList);
-
-        /* Start applying the changes from the replication slots to catch up. */
-        EnableSubscriptions(logicalRepTargetList);
-
-        /*
-         * The following check is a leftover from when used subscriptions with
-         * copy_data=true. It's probably not really necessary anymore, but it
-         * seemed like a nice check to keep. At least for debugging issues it
-         * seems nice to report differences between the subscription never
-         * becoming ready and the subscriber not applying WAL. It's also not
-         * entirely clear if the catchup check handles the case correctly where
-         * the subscription is not in the ready state yet, because so far it
-         * never had to.
-         */
-        WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
-
-        /*
-        * Wait until all the subscriptions are caught up to changes that
-        * happened after the initial COPY on the shards.
-        */
-        WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-        /*
-        * Now lets create the post-load objects, such as the indexes, constraints
-        * and partitioning hierarchy. Once they are done, wait until the replication
-        * catches up again. So we don't block writes too long.
-        */
-        CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName,
-                                                    targetNodePort);
-
-        WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-        /*
-        * We're almost done, we'll block the writes to the shards that we're
-        * replicating and expect all the subscription to catch up quickly
-        * afterwards.
-        *
-        * Notice that although shards in partitioned relation are excluded from
-        * logical replication, they are still locked against modification, and
-        * foreign constraints are created on them too.
-        */
-        BlockWritesToShardList(shardList);
-
-        WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
-
-        /*
-        * We're creating the foreign constraints to reference tables after the
-        * data is already replicated and all the necessary locks are acquired.
-        *
-        * We prefer to do it here because the placements of reference tables
-        * are always valid, and any modification during the shard move would
-        * cascade to the hash distributed tables' shards if we had created
-        * the constraints earlier.
-        */
-        CreateForeignKeyConstraints(logicalRepTargetList);
-
-        /* we're done, cleanup the publication and subscription */
-        DropSubscriptions(logicalRepTargetList);
-        DropReplicationSlots(sourceConnection, logicalRepTargetList);
-        DropPublications(sourceConnection, publicationInfoHash);
+        CompleteNonBlockingShardTransfer(shardList,
+                                         sourceConnection,
+                                         publicationInfoHash,
+                                         logicalRepTargetList,
+                                         groupedLogicalRepTargetsHash,
+                                         SHARD_MOVE);

         /*
          * We use these connections exclusively for subscription management,
@@ -405,6 +330,104 @@ CreateGroupedLogicalRepTargetsHash(List *logicalRepTargetList)
 }


+/*
+ * CompleteNonBlockingShardTransfer uses logical replication to apply the changes
+ * made on the source to the target. It also runs all DDL on the target shards
+ * that need to be run after the data copy.
+ *
+ * For shard splits it skips the partition hierarchy and foreign key creation
+ * though, since those need to happen after the metadata is updated.
+ */
+void
+CompleteNonBlockingShardTransfer(List *shardList,
+                                 MultiConnection *sourceConnection,
+                                 HTAB *publicationInfoHash,
+                                 List *logicalRepTargetList,
+                                 HTAB *groupedLogicalRepTargetsHash,
+                                 LogicalRepType type)
+{
+    /*
+     * We have to create the primary key (or any other replica identity)
+     * before the update/delete operations that are queued will be
+     * replicated. Because if the replica identity does not exist on the
+     * target, the replication would fail.
+     *
+     * So we it right after the initial data COPY, but before enabling the
+     * susbcriptions. We do it at this latest possible moment, because its
+     * much cheaper to build an index at once than to create it
+     * incrementally. So this way we create the primary key index in one go
+     * for all data from the initial COPY.
+     */
+    CreateReplicaIdentities(logicalRepTargetList);
+
+    /* Start applying the changes from the replication slots to catch up. */
+    EnableSubscriptions(logicalRepTargetList);
+
+    /*
+     * The following check is a leftover from when used subscriptions with
+     * copy_data=true. It's probably not really necessary anymore, but it
+     * seemed like a nice check to keep. At least for debugging issues it
+     * seems nice to report differences between the subscription never
+     * becoming ready and the subscriber not applying WAL. It's also not
+     * entirely clear if the catchup check handles the case correctly where
+     * the subscription is not in the ready state yet, because so far it
+     * never had to.
+     */
+    WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
+
+    /*
+     * Wait until all the subscriptions are caught up to changes that
+     * happened after the initial COPY on the shards.
+     */
+    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
+
+    /*
+     * Now lets create the post-load objects, such as the indexes, constraints
+     * and partitioning hierarchy. Once they are done, wait until the replication
+     * catches up again. So we don't block writes too long.
+     */
+    CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type);
+
+    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
+
+    /* only useful for isolation testing, see the function comment for the details */
+    ConflictOnlyWithIsolationTesting();
+
+    /*
+     * We're almost done, we'll block the writes to the shards that we're
+     * replicating and expect all the subscription to catch up quickly
+     * afterwards.
+     *
+     * Notice that although shards in partitioned relation are excluded from
+     * logical replication, they are still locked against modification, and
+     * foreign constraints are created on them too.
+     */
+    BlockWritesToShardList(shardList);
+
+    WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
+
+    if (type != SHARD_SPLIT)
+    {
+        /*
+         * We're creating the foreign constraints to reference tables after the
+         * data is already replicated and all the necessary locks are acquired.
+         *
+         * We prefer to do it here because the placements of reference tables
+         * are always valid, and any modification during the shard move would
+         * cascade to the hash distributed tables' shards if we had created
+         * the constraints earlier. The same is true for foreign keys between
+         * tables owned by different users.
+         */
+        CreateUncheckedForeignKeyConstraints(logicalRepTargetList);
+    }
+
+    /* we're done, cleanup the publication and subscription */
+    DropSubscriptions(logicalRepTargetList);
+    DropReplicationSlots(sourceConnection, logicalRepTargetList);
+    DropPublications(sourceConnection, publicationInfoHash);
+}
+
+
 /*
  * CreateShardMovePublicationInfoHash creates hashmap of PublicationInfos for a
  * shard move. Even though we only support moving a shard to a single target
@@ -742,8 +765,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId)
  * the objects that can be created after the data is moved with logical replication.
  */
 static void
-CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeName,
-                                            int32 targetNodePort)
+CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
+                                            LogicalRepType type)
 {
     /*
      * We create indexes in 4 steps.
@@ -759,20 +782,25 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam
      * table and setting the statistics of indexes, depends on the indexes being
      * created. That's why the execution is divided into four distinct stages.
      */
-    ExecuteCreateIndexCommands(shardList, targetNodeName, targetNodePort);
-    ExecuteCreateConstraintsBackedByIndexCommands(shardList, targetNodeName,
-                                                  targetNodePort);
-    ExecuteClusterOnCommands(shardList, targetNodeName, targetNodePort);
-    ExecuteCreateIndexStatisticsCommands(shardList, targetNodeName, targetNodePort);
+    ExecuteCreateIndexCommands(logicalRepTargetList);
+    ExecuteCreateConstraintsBackedByIndexCommands(logicalRepTargetList);
+    ExecuteClusterOnCommands(logicalRepTargetList);
+    ExecuteCreateIndexStatisticsCommands(logicalRepTargetList);

     /*
      * Once the indexes are created, there are few more objects like triggers and table
      * statistics that should be created after the data move.
      */
-    ExecuteRemainingPostLoadTableCommands(shardList, targetNodeName, targetNodePort);
+    ExecuteRemainingPostLoadTableCommands(logicalRepTargetList);

-    /* create partitioning hierarchy, if any */
-    CreatePartitioningHierarchy(shardList, targetNodeName, targetNodePort);
+    /*
+     * Creating the partitioning hierarchy errors out in shard splits when
+     */
+    if (type != SHARD_SPLIT)
+    {
+        /* create partitioning hierarchy, if any */
+        CreatePartitioningHierarchy(logicalRepTargetList);
+    }
 }
@@ -784,13 +812,15 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam
  * commands fail.
  */
 static void
-ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNodePort)
+ExecuteCreateIndexCommands(List *logicalRepTargetList)
 {
     List *taskList = NIL;
-    ListCell *shardCell = NULL;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             Oid relationId = shardInterval->relationId;

             List *tableCreateIndexCommandList =
@@ -801,11 +831,13 @@ ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNode
             WorkerApplyShardDDLCommandList(tableCreateIndexCommandList,
                                            shardInterval->shardId);

             List *taskListForShard =
-                ConvertNonExistingPlacementDDLCommandsToTasks(shardCreateIndexCommandList,
-                                                              shardInterval->shardId,
-                                                              targetNodeName, targetNodePort);
+                ConvertNonExistingPlacementDDLCommandsToTasks(
+                    shardCreateIndexCommandList,
+                    target->superuserConnection->hostname,
+                    target->superuserConnection->port);
             taskList = list_concat(taskList, taskListForShard);
         }
+    }

     /*
      * We are going to create indexes and constraints using the current user. That is
@@ -819,8 +851,7 @@ ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNode
      */
     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(indexes) on node %s:%d", targetNodeName,
-                            targetNodePort)));
+                            "(indexes)")));

     ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList,
                                       MaxAdaptiveExecutorPoolSize,
@@ -836,23 +867,22 @@ ExecuteCreateIndexCommands(List *shardList, char *targetNodeName, int targetNode
  * commands fail.
  */
 static void
-ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, char *targetNodeName,
-                                              int targetNodePort)
+ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList)
 {
     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(constraints backed by indexes) on node %s:%d",
-                            targetNodeName,
-                            targetNodePort)));
+                            "(constraints backed by indexes)")));

     MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                        "CreateConstraintsBackedByIndexContext",
                                                        ALLOCSET_DEFAULT_SIZES);
     MemoryContext oldContext = MemoryContextSwitchTo(localContext);

-    ListCell *shardCell = NULL;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             Oid relationId = shardInterval->relationId;

             List *tableCreateConstraintCommandList =
@@ -871,11 +901,14 @@ ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, char *targetNodeN
                                                   shardInterval->shardId);

             char *tableOwner = TableOwner(shardInterval->relationId);
-            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
+            SendCommandListToWorkerOutsideTransaction(
+                target->superuserConnection->hostname,
+                target->superuserConnection->port,
                 tableOwner,
                 shardCreateConstraintCommandList);

             MemoryContextReset(localContext);
         }
+    }

     MemoryContextSwitchTo(oldContext);
 }
@@ -890,7 +923,6 @@ ExecuteCreateConstraintsBackedByIndexCommands(List *shardList, char *targetNodeN
  */
 static List *
 ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList,
-                                              uint64 shardId,
                                               char *targetNodeName,
                                               int targetNodePort)
 {
@@ -911,7 +943,6 @@ ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList,
         SetPlacementNodeMetadata(taskPlacement, workerNode);
         task->taskPlacementList = list_make1(taskPlacement);
-        task->anchorShardId = shardId;

         taskList = lappend(taskList, task);
         taskId++;
@@ -929,13 +960,15 @@ ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList,
  * is aborted.
  */
 static void
-ExecuteClusterOnCommands(List *shardList, char *targetNodeName, int targetNodePort)
+ExecuteClusterOnCommands(List *logicalRepTargetList)
 {
     List *taskList = NIL;
-    ListCell *shardCell;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             Oid relationId = shardInterval->relationId;

             List *tableAlterTableClusterOnCommandList =
@@ -949,14 +982,14 @@ ExecuteClusterOnCommands(List *shardList, char *targetNodeName, int targetNodePo
             List *taskListForShard =
                 ConvertNonExistingPlacementDDLCommandsToTasks(
                     shardAlterTableClusterOnCommandList,
-                    shardInterval->shardId,
-                    targetNodeName, targetNodePort);
+                    target->superuserConnection->hostname,
+                    target->superuserConnection->port);
             taskList = list_concat(taskList, taskListForShard);
         }
+    }

     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(CLUSTER ON) on node %s:%d", targetNodeName,
-                            targetNodePort)));
+                            "(CLUSTER ON)")));

     ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, taskList,
                                       MaxAdaptiveExecutorPoolSize,
@@ -972,22 +1005,22 @@ ExecuteClusterOnCommands(List *shardList, char *targetNodeName, int targetNodePo
  * is aborted.
  */
 static void
-ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, int
-                                     targetNodePort)
+ExecuteCreateIndexStatisticsCommands(List *logicalRepTargetList)
 {
     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(index statistics) on node %s:%d", targetNodeName,
-                            targetNodePort)));
+                            "(index statistics)")));

     MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                        "CreateIndexStatisticsContext",
                                                        ALLOCSET_DEFAULT_SIZES);
     MemoryContext oldContext = MemoryContextSwitchTo(localContext);

-    ListCell *shardCell;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             Oid relationId = shardInterval->relationId;

             List *tableAlterIndexSetStatisticsCommandList =
@@ -1009,12 +1042,15 @@ ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, int
              * need to create them in parallel.
              */
             char *tableOwner = TableOwner(shardInterval->relationId);
-            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
+            SendCommandListToWorkerOutsideTransaction(
+                target->superuserConnection->hostname,
+                target->superuserConnection->port,
                 tableOwner,
                 shardAlterIndexSetStatisticsCommandList);

             MemoryContextReset(localContext);
         }
+    }

     MemoryContextSwitchTo(oldContext);
 }
@@ -1026,23 +1062,23 @@ ExecuteCreateIndexStatisticsCommands(List *shardList, char *targetNodeName, int
  * in the given target node.
  */
 static void
-ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, int
-                                      targetNodePort)
+ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList)
 {
     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(triggers and table statistics) on node %s:%d",
-                            targetNodeName,
-                            targetNodePort)));
+                            "(triggers and table statistics)"
+                            )));

     MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                        "CreateTableStatisticsContext",
                                                        ALLOCSET_DEFAULT_SIZES);
     MemoryContext oldContext = MemoryContextSwitchTo(localContext);

-    ListCell *shardCell = NULL;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             Oid relationId = shardInterval->relationId;

             bool includeIndexes = false;
@@ -1067,12 +1103,15 @@ ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, int
              * need to create them in parallel.
              */
             char *tableOwner = TableOwner(shardInterval->relationId);
-            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
+            SendCommandListToWorkerOutsideTransaction(
+                target->superuserConnection->hostname,
+                target->superuserConnection->port,
                 tableOwner,
                 shardPostLoadTableCommandList);

             MemoryContextReset(localContext);
         }
+    }

     MemoryContextSwitchTo(oldContext);
 }
@@ -1082,23 +1121,23 @@ ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetNodeName, int
  * CreatePartitioningHierarchy gets a shardList and creates the partitioning
  * hierarchy between the shardList, if any,
  */
-static void
-CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNodePort)
+void
+CreatePartitioningHierarchy(List *logicalRepTargetList)
 {
     ereport(DEBUG1, (errmsg("Creating post logical replication objects "
-                            "(partitioning hierarchy) on node %s:%d", targetNodeName,
-                            targetNodePort)));
+                            "(partitioning hierarchy)")));

     MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
                                                        "CreatePartitioningHierarchy",
                                                        ALLOCSET_DEFAULT_SIZES);
     MemoryContext oldContext = MemoryContextSwitchTo(localContext);

-    ListCell *shardCell = NULL;
-    foreach(shardCell, shardList)
+    LogicalRepTarget *target = NULL;
+    foreach_ptr(target, logicalRepTargetList)
+    {
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, target->newShards)
         {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
             if (PartitionTable(shardInterval->relationId))
             {
                 char *attachPartitionCommand =
@@ -1111,30 +1150,32 @@ CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNod
                  * parallel, so create them sequentially. Also attaching partition
                  * is a quick operation, so it is fine to execute sequentially.
                  */
-                SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
+                SendCommandListToWorkerOutsideTransaction(
+                    target->superuserConnection->hostname,
+                    target->superuserConnection->port,
                     tableOwner,
-                    list_make1(
-                        attachPartitionCommand));
+                    list_make1(attachPartitionCommand));

                 MemoryContextReset(localContext);
             }
         }
+    }

     MemoryContextSwitchTo(oldContext);
 }

 /*
- * CreateForeignKeyConstraints is used to create the foreign constraints
- * on the logical replication target without checking that they are actually
- * valid.
+ * CreateUncheckedForeignKeyConstraints is used to create the foreign
+ * constraints on the logical replication target without checking that they are
+ * actually valid.
  *
  * We skip the validation phase of foreign keys to after a shard
  * move/copy/split because the validation is pretty costly and given that the
  * source placements are already valid, the validation in the target nodes is
  * useless.
  */
-static void
-CreateForeignKeyConstraints(List *logicalRepTargetList)
+void
+CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList)
 {
     MemoryContext localContext =
         AllocSetContextCreate(CurrentMemoryContext,


@@ -177,5 +177,13 @@ extern void RecreateGroupedLogicalRepTargetsConnections(
     char *user,
     char *databaseName);
 extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);
+extern void CompleteNonBlockingShardTransfer(List *shardList,
+                                             MultiConnection *sourceConnection,
+                                             HTAB *publicationInfoHash,
+                                             List *logicalRepTargetList,
+                                             HTAB *groupedLogicalRepTargetsHash,
+                                             LogicalRepType type);
+extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList);
+extern void CreatePartitioningHierarchy(List *logicalRepTargetList);

 #endif /* MULTI_LOGICAL_REPLICATION_H_ */


@@ -233,24 +233,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").cancel(' || :p
 (1 row)

-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-ERROR: canceling statement due to user request
--- failure on colocated table constraints
-SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(1).kill()');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
--- cancellation on colocated table constraints
-SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(2).cancel(' || :pid || ')');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
 ERROR: canceling statement due to user request
 -- failure on catching up LSN
@@ -382,15 +364,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey F
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
 ERROR: connection not open
 CONTEXT: while executing command on localhost:xxxxx
--- failure on foreign key creation
-SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").after(2).cancel(' || :pid || ')');
- mitmproxy
----------------------------------------------------------------------
-
-(1 row)
-
-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-ERROR: canceling statement due to user request
 -- failure on shard split transaction
 SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").kill()');
  mitmproxy


@@ -117,14 +117,6 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode
 SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").cancel(' || :pid || ')');
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');

--- failure on colocated table constraints
-SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(1).kill()');
-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-
--- cancellation on colocated table constraints
-SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(2).cancel(' || :pid || ')');
-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-
 -- failure on catching up LSN
 SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").kill()');
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
@@ -184,10 +176,6 @@ SET citus.defer_drop_after_shard_split TO ON;
 SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');

--- failure on foreign key creation
-SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").after(2).cancel(' || :pid || ')');
-SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-
 -- failure on shard split transaction
 SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").kill()');
 SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');