diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index babfe7bf5..abaa00251 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -70,22 +70,43 @@ typedef struct ShardCommandList List *ddlCommandList; } ShardCommandList; +static const char *ShardTransferTypeNames[] = { + [SHARD_TRANSFER_INVALID_FIRST] = "unknown", + [SHARD_TRANSFER_MOVE] = "move", + [SHARD_TRANSFER_COPY] = "copy", +}; + +static const char *ShardTransferTypeNamesCapitalized[] = { + [SHARD_TRANSFER_INVALID_FIRST] = "unknown", + [SHARD_TRANSFER_MOVE] = "Move", + [SHARD_TRANSFER_COPY] = "Copy", +}; + +static const char *ShardTransferTypeNamesContinuous[] = { + [SHARD_TRANSFER_INVALID_FIRST] = "unknown", + [SHARD_TRANSFER_MOVE] = "Moving", + [SHARD_TRANSFER_COPY] = "Copying", +}; + +static const char *ShardTransferTypeFunctionNames[] = { + [SHARD_TRANSFER_INVALID_FIRST] = "unknown", + [SHARD_TRANSFER_MOVE] = "citus_move_shard_placement", + [SHARD_TRANSFER_COPY] = "citus_copy_shard_placement", +}; + /* local function forward declarations */ static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode); static void ErrorIfTableCannotBeReplicated(Oid relationId); -static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, - int targetNodePort); +static void ErrorIfTargetNodeIsNotSafeForTransfer(const char *targetNodeName, + int targetNodePort, + ShardTransferType transferType); static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort, const char *operationName); -static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort, - char shardReplicationMode); static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, 
bool useLogicalReplication, - char *operationName); + const char *operationName); static void CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, @@ -100,7 +121,7 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 targetNodePort); static List * RecreateTableDDLCommandList(Oid relationId); static void EnsureTableListOwner(List *tableIdList); -static void EnsureTableListSuitableForReplication(List *tableIdList); +static void ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList); static void DropShardPlacementsFromMetadata(List *shardList, char *nodeName, @@ -112,12 +133,28 @@ static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId, int32 targetNodePort); static bool IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetPort); +static void SetupRebalanceMonitorForShardTransfer(uint64 shardId, Oid distributedTableId, + char *sourceNodeName, + uint32 sourceNodePort, + char *targetNodeName, + uint32 targetNodePort, + ShardTransferType transferType); static void CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes); +static void EnsureAllShardsCanBeCopied(List *colocatedShardList, + char *sourceNodeName, uint32 sourceNodePort, + char *targetNodeName, uint32 targetNodePort); static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList, char *sourceNodeName, uint32 sourceNodePort, - char *targetNodeName, uint32 - targetNodePort); + char *targetNodeName, uint32 targetNodePort, + ShardTransferType transferType); +static bool TransferAlreadyCompleted(List *colocatedShardList, + char *sourceNodeName, uint32 sourceNodePort, + char *targetNodeName, uint32 targetNodePort, + ShardTransferType transferType); +static void LockColocatedRelationsForMove(List *colocatedTableList); +static void ErrorIfForeignTableForShardTransfer(List *colocatedTableList, + ShardTransferType transferType); static List * 
RecreateShardDDLCommandList(ShardInterval *shardInterval, const char *sourceNodeName, int32 sourceNodePort); @@ -163,9 +200,9 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); - ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - shardReplicationMode); + TransferShards(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + shardReplicationMode, SHARD_TRANSFER_COPY); PG_RETURN_VOID(); } @@ -192,10 +229,9 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS) char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); - ReplicateColocatedShardPlacement(shardId, - sourceNode->workerName, sourceNode->workerPort, - targetNode->workerName, targetNode->workerPort, - shardReplicationMode); + TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort, + targetNode->workerName, targetNode->workerPort, + shardReplicationMode, SHARD_TRANSFER_COPY); PG_RETURN_VOID(); } @@ -228,9 +264,9 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) ereport(WARNING, (errmsg("do_repair argument is deprecated"))); } - ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - shardReplicationMode); + TransferShards(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + shardReplicationMode, SHARD_TRANSFER_COPY); PG_RETURN_VOID(); @@ -264,9 +300,10 @@ citus_move_shard_placement(PG_FUNCTION_ARGS) int32 targetNodePort = PG_GETARG_INT32(4); Oid shardReplicationModeOid = PG_GETARG_OID(5); - citus_move_shard_placement_internal(shardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - shardReplicationModeOid); + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + TransferShards(shardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + shardReplicationMode, SHARD_TRANSFER_MOVE); 
PG_RETURN_VOID(); } @@ -291,126 +328,111 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS) WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk); WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); - citus_move_shard_placement_internal(shardId, sourceNode->workerName, - sourceNode->workerPort, targetNode->workerName, - targetNode->workerPort, - shardReplicationModeOid); + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + TransferShards(shardId, sourceNode->workerName, + sourceNode->workerPort, targetNode->workerName, + targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE); PG_RETURN_VOID(); } /* - * citus_move_shard_placement_internal is the internal function for shard moves. + * TransferShards is the function for shard transfers. */ void -citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort, Oid shardReplicationModeOid) +TransferShards(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort, char shardReplicationMode, + ShardTransferType transferType) { - ListCell *colocatedTableCell = NULL; - ListCell *colocatedShardCell = NULL; + /* strings to be used in log messages */ + const char *operationName = ShardTransferTypeNames[transferType]; + const char *operationNameCapitalized = + ShardTransferTypeNamesCapitalized[transferType]; + const char *operationFunctionName = ShardTransferTypeFunctionNames[transferType]; + /* cannot transfer shard to the same node */ ErrorIfSameNode(sourceNodeName, sourceNodePort, targetNodeName, targetNodePort, - "move"); - - Oid relationId = RelationIdForShard(shardId); - ErrorIfMoveUnsupportedTableType(relationId); - ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort); - - AcquirePlacementColocationLock(relationId, ExclusiveLock, "move"); + operationName); ShardInterval *shardInterval = 
LoadShardInterval(shardId); Oid distributedTableId = shardInterval->relationId; + /* error if unsupported shard transfer */ + if (transferType == SHARD_TRANSFER_MOVE) + { + ErrorIfMoveUnsupportedTableType(distributedTableId); + } + else if (transferType == SHARD_TRANSFER_COPY) + { + ErrorIfTableCannotBeReplicated(distributedTableId); + EnsureNoModificationsHaveBeenDone(); + } + + ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType); + + AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName); + List *colocatedTableList = ColocatedTableList(distributedTableId); List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - foreach(colocatedTableCell, colocatedTableList) + EnsureTableListOwner(colocatedTableList); + + if (transferType == SHARD_TRANSFER_MOVE) { - Oid colocatedTableId = lfirst_oid(colocatedTableCell); - - /* check that user has owner rights in all co-located tables */ - EnsureTableOwner(colocatedTableId); - /* * Block concurrent DDL / TRUNCATE commands on the relation. Similarly, * block concurrent citus_move_shard_placement() on any shard of * the same relation. This is OK for now since we're executing shard * moves sequentially anyway. */ - LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); - - if (IsForeignTable(relationId)) - { - char *relationName = get_rel_name(colocatedTableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot move shard"), - errdetail("Table %s is a foreign table. 
Moving " - "shards backed by foreign tables is " - "not supported.", relationName))); - } + LockColocatedRelationsForMove(colocatedTableList); } - /* we sort colocatedShardList so that lock operations will not cause any deadlocks */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + ErrorIfForeignTableForShardTransfer(colocatedTableList, transferType); + + if (transferType == SHARD_TRANSFER_COPY) + { + ErrorIfReplicatingDistributedTableWithFKeys(colocatedTableList); + } /* - * If there are no active placements on the source and only active placements on - * the target node, we assume the copy to already be done. + * We sort colocatedShardList so that lock operations will not cause any + * deadlocks. */ - if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) && - !IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort)) + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + + if (TransferAlreadyCompleted(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + transferType)) { + /* if the transfer is already completed, we can return right away */ ereport(WARNING, (errmsg("shard is already present on node %s:%d", targetNodeName, targetNodePort), - errdetail("Move may have already completed."))); + errdetail("%s may have already completed.", + operationNameCapitalized))); return; } - foreach(colocatedShardCell, colocatedShardList) - { - ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); - uint64 colocatedShardId = colocatedShard->shardId; - - EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort, + EnsureAllShardsCanBeCopied(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); - } - char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); if (shardReplicationMode == TRANSFER_MODE_AUTOMATIC) { VerifyTablesHaveReplicaIdentity(colocatedTableList); } - 
EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); + EnsureEnoughDiskSpaceForShardMove(colocatedShardList, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, transferType); - - /* - * We want to be able to track progress of shard moves using - * get_rebalancer_progress. If this move is initiated by the rebalancer, - * then the rebalancer call has already set up the shared memory that is - * used to do that. But if citus_move_shard_placement is called directly by - * the user (or through any other mechanism), then the shared memory is not - * set up yet. In that case we do it here. - */ - if (!IsRebalancerInternalBackend()) - { - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - PlacementUpdateEvent *placementUpdateEvent = palloc0( - sizeof(PlacementUpdateEvent)); - placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE; - placementUpdateEvent->shardId = shardId; - placementUpdateEvent->sourceNode = sourceNode; - placementUpdateEvent->targetNode = targetNode; - SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, - REBALANCE_PROGRESS_MOVING, - PLACEMENT_UPDATE_STATUS_SETTING_UP); - } + SetupRebalanceMonitorForShardTransfer(shardId, distributedTableId, + sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort, + transferType); UpdatePlacementUpdateStatusForShardIntervalList( colocatedShardList, @@ -428,7 +450,7 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, { BlockWritesToShardList(colocatedShardList); } - else + else if (transferType == SHARD_TRANSFER_MOVE) { /* * We prevent multiple shard moves in a transaction that use logical @@ -452,6 +474,20 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, PlacementMovedUsingLogicalReplicationInTX = true; } + if (transferType == SHARD_TRANSFER_COPY && + 
!IsCitusTableType(distributedTableId, REFERENCE_TABLE)) + { + /* + * When copying a shard to a new node, we should first ensure that reference + * tables are present such that joins work immediately after copying the shard. + * When copying a reference table, we are probably trying to achieve just that. + * + * Since this is a long-running operation we do this after the error checks, but + * before taking metadata locks. + */ + EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode); + } + DropOrphanedResourcesInSeparateTransaction(); ShardInterval *colocatedShard = NULL; @@ -466,18 +502,21 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, ErrorIfCleanupRecordForShardExists(qualifiedShardName); } - /* - * CopyColocatedShardPlacement function copies given shard with its co-located - * shards. - */ CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName, - targetNodePort, useLogicalReplication, "citus_move_shard_placement"); + targetNodePort, useLogicalReplication, operationFunctionName); - /* delete old shards metadata and mark the shards as to be deferred drop */ - int32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort); - InsertCleanupRecordsForShardPlacementsOnNode(colocatedShardList, - sourceGroupId); + if (transferType == SHARD_TRANSFER_MOVE) + { + /* delete old shards metadata and mark the shards as to be deferred drop */ + int32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort); + InsertCleanupRecordsForShardPlacementsOnNode(colocatedShardList, + sourceGroupId); + } + /* + * Finally insert the placements to pg_dist_placement and sync it to the + * metadata workers. 
+ */ colocatedShard = NULL; foreach_ptr(colocatedShard, colocatedShardList) { @@ -488,17 +527,30 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, InsertShardPlacementRow(colocatedShardId, placementId, ShardLength(colocatedShardId), groupId); + + if (transferType == SHARD_TRANSFER_COPY && + ShouldSyncTableMetadata(colocatedShard->relationId)) + { + char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId, + 0, groupId); + + SendCommandToWorkersWithMetadata(placementCommand); + } } - /* - * Since this is move operation, we remove the placements from the metadata - * for the source node after copy. - */ - DropShardPlacementsFromMetadata(colocatedShardList, sourceNodeName, sourceNodePort); + if (transferType == SHARD_TRANSFER_MOVE) + { + /* + * Since this is a move operation, we remove the placements from the metadata + * for the source node after copy. + */ + DropShardPlacementsFromMetadata(colocatedShardList, + sourceNodeName, sourceNodePort); - UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName, - sourceNodePort, targetNodeName, - targetNodePort); + UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName, + sourceNodePort, targetNodeName, + targetNodePort); + } UpdatePlacementUpdateStatusForShardIntervalList( colocatedShardList, @@ -611,6 +663,70 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN } +/* + * LockColocatedRelationsForMove takes a list of relations and locks all of them + * using ShareUpdateExclusiveLock. + */ +static void +LockColocatedRelationsForMove(List *colocatedTableList) +{ + Oid colocatedTableId = InvalidOid; + foreach_oid(colocatedTableId, colocatedTableList) + { + LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock); + } +} + + +/* + * ErrorIfForeignTableForShardTransfer takes a list of relations, errors out if + * there's a foreign table in the list. 
+ */ +static void +ErrorIfForeignTableForShardTransfer(List *colocatedTableList, + ShardTransferType transferType) +{ + Oid colocatedTableId = InvalidOid; + foreach_oid(colocatedTableId, colocatedTableList) + { + if (IsForeignTable(colocatedTableId)) + { + char *relationName = get_rel_name(colocatedTableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot %s shard", + ShardTransferTypeNames[transferType]), + errdetail("Table %s is a foreign table. " + "%s shards backed by foreign tables is " + "not supported.", relationName, + ShardTransferTypeNamesContinuous[transferType]))); + } + } +} + + +/* + * EnsureAllShardsCanBeCopied is a wrapper around EnsureShardCanBeCopied. + */ +static void +EnsureAllShardsCanBeCopied(List *colocatedShardList, + char *sourceNodeName, uint32 sourceNodePort, + char *targetNodeName, uint32 targetNodePort) +{ + ShardInterval *colocatedShard = NULL; + foreach_ptr(colocatedShard, colocatedShardList) + { + uint64 colocatedShardId = colocatedShard->shardId; + + /* + * To transfer shard, there should be healthy placement in source node and no + * placement in the target node. + */ + EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + } +} + + /* * EnsureEnoughDiskSpaceForShardMove checks that there is enough space for * shard moves of the given colocated shard list from source node to target node. 
@@ -619,9 +735,10 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList, char *sourceNodeName, uint32 sourceNodePort, - char *targetNodeName, uint32 targetNodePort) + char *targetNodeName, uint32 targetNodePort, + ShardTransferType transferType) { - if (!CheckAvailableSpaceBeforeMove) + if (!CheckAvailableSpaceBeforeMove || transferType != SHARD_TRANSFER_MOVE) { return; } @@ -636,6 +753,34 @@ EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList, } +/* + * TransferAlreadyCompleted returns true if the given shard transfer is already done. + * Returns false otherwise. + */ +static bool +TransferAlreadyCompleted(List *colocatedShardList, + char *sourceNodeName, uint32 sourceNodePort, + char *targetNodeName, uint32 targetNodePort, + ShardTransferType transferType) +{ + if (transferType == SHARD_TRANSFER_MOVE && + IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) && + !IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort)) + { + return true; + } + + if (transferType == SHARD_TRANSFER_COPY && + IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) && + IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort)) + { + return true; + } + + return false; +} + + /* * ShardListSizeInBytes returns the size in bytes of a set of shard tables. */ @@ -682,6 +827,49 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32 } +/* + * SetupRebalanceMonitorForShardTransfer prepares the parameters and + * calls SetupRebalanceMonitor, unless the current transfer is a move + * initiated by the rebalancer. 
+ * See comments on SetupRebalanceMonitor + */ +static void +SetupRebalanceMonitorForShardTransfer(uint64 shardId, Oid distributedTableId, + char *sourceNodeName, uint32 sourceNodePort, + char *targetNodeName, uint32 targetNodePort, + ShardTransferType transferType) +{ + if (transferType == SHARD_TRANSFER_MOVE && IsRebalancerInternalBackend()) + { + /* + * We want to be able to track progress of shard moves using + * get_rebalancer_progress. If this move is initiated by the rebalancer, + * then the rebalancer call has already set up the shared memory that is + * used to do that, so we should return here. + * But if citus_move_shard_placement is called directly by the user + * (or through any other mechanism), then the shared memory is not + * set up yet. In that case we do it here. + */ + return; + } + + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + + PlacementUpdateEvent *placementUpdateEvent = palloc0( + sizeof(PlacementUpdateEvent)); + placementUpdateEvent->updateType = + transferType == SHARD_TRANSFER_COPY ? PLACEMENT_UPDATE_COPY : + PLACEMENT_UPDATE_MOVE; + placementUpdateEvent->shardId = shardId; + placementUpdateEvent->sourceNode = sourceNode; + placementUpdateEvent->targetNode = targetNode; + SetupRebalanceMonitor(list_make1(placementUpdateEvent), distributedTableId, + REBALANCE_PROGRESS_MOVING, + PLACEMENT_UPDATE_STATUS_SETTING_UP); +} + + /* * CheckSpaceConstraints checks there is enough space to place the colocation * on the node that the connection is connected to. @@ -729,17 +917,19 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes) /* - * ErrorIfTargetNodeIsNotSafeToMove throws error if the target node is not - * eligible for moving shards. + * ErrorIfTargetNodeIsNotSafeForTransfer throws error if the target node is not + * eligible for shard transfers. 
*/ -void -ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort) +static void +ErrorIfTargetNodeIsNotSafeForTransfer(const char *targetNodeName, int targetNodePort, + ShardTransferType transferType) { WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort); if (workerNode == NULL) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Moving shards to a non-existing node is not supported"), + errmsg("%s shards to a non-existing node is not supported", + ShardTransferTypeNamesContinuous[transferType]), errhint( "Add the target node via SELECT citus_add_node('%s', %d);", targetNodeName, targetNodePort))); @@ -748,13 +938,14 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort) if (!workerNode->isActive) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Moving shards to a non-active node is not supported"), + errmsg("%s shards to a non-active node is not supported", + ShardTransferTypeNamesContinuous[transferType]), errhint( "Activate the target node via SELECT citus_activate_node('%s', %d);", targetNodeName, targetNodePort))); } - if (!workerNode->shouldHaveShards) + if (transferType == SHARD_TRANSFER_MOVE && !workerNode->shouldHaveShards) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Moving shards to a node that shouldn't have a shard is " @@ -767,8 +958,9 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort) if (!NodeIsPrimary(workerNode)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Moving shards to a secondary (e.g., replica) node is " - "not supported"))); + errmsg("%s shards to a secondary (e.g., replica) node is " + "not supported", + ShardTransferTypeNamesContinuous[transferType]))); } } @@ -1046,41 +1238,6 @@ ErrorIfTableCannotBeReplicated(Oid relationId) } -/* - * ErrorIfTargetNodeIsNotSafeToCopyTo throws an error if the target node is not - * eligible for copying shards. 
- */ -static void -ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, int targetNodePort) -{ - WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort); - if (workerNode == NULL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Copying shards to a non-existing node is not supported"), - errhint( - "Add the target node via SELECT citus_add_node('%s', %d);", - targetNodeName, targetNodePort))); - } - - if (!workerNode->isActive) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Copying shards to a non-active node is not supported"), - errhint( - "Activate the target node via SELECT citus_activate_node('%s', %d);", - targetNodeName, targetNodePort))); - } - - if (!NodeIsPrimary(workerNode)) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("Copying shards to a secondary (e.g., replica) node is " - "not supported"))); - } -} - - /* * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. @@ -1114,154 +1271,6 @@ LookupShardTransferMode(Oid shardReplicationModeOid) } -/* - * ReplicateColocatedShardPlacement replicates the given shard and its - * colocated shards from a source node to target node. 
- */ -static void -ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, char *targetNodeName, - int32 targetNodePort, char shardReplicationMode) -{ - ShardInterval *shardInterval = LoadShardInterval(shardId); - Oid distributedTableId = shardInterval->relationId; - - ErrorIfSameNode(sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, - "copy"); - - ErrorIfTableCannotBeReplicated(shardInterval->relationId); - ErrorIfTargetNodeIsNotSafeToCopyTo(targetNodeName, targetNodePort); - EnsureNoModificationsHaveBeenDone(); - - AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock, "copy"); - - List *colocatedTableList = ColocatedTableList(distributedTableId); - List *colocatedShardList = ColocatedShardIntervalList(shardInterval); - - EnsureTableListOwner(colocatedTableList); - EnsureTableListSuitableForReplication(colocatedTableList); - - /* - * We sort shardIntervalList so that lock operations will not cause any - * deadlocks. - */ - colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); - - /* - * If there are active placements on both nodes, we assume the copy to already - * be done. 
- */ - if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) && - IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort)) - { - ereport(WARNING, (errmsg("shard is already present on node %s:%d", - targetNodeName, targetNodePort), - errdetail("Copy may have already completed."))); - return; - } - - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); - - Oid relationId = RelationIdForShard(shardId); - PlacementUpdateEvent *placementUpdateEvent = palloc0( - sizeof(PlacementUpdateEvent)); - placementUpdateEvent->updateType = PLACEMENT_UPDATE_COPY; - placementUpdateEvent->shardId = shardId; - placementUpdateEvent->sourceNode = sourceNode; - placementUpdateEvent->targetNode = targetNode; - SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId, - REBALANCE_PROGRESS_MOVING, - PLACEMENT_UPDATE_STATUS_SETTING_UP); - - UpdatePlacementUpdateStatusForShardIntervalList( - colocatedShardList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_SETTING_UP); - - /* - * At this point of the shard replication, we don't need to block the writes to - * shards when logical replication is used. - */ - bool useLogicalReplication = CanUseLogicalReplication(distributedTableId, - shardReplicationMode); - if (!useLogicalReplication) - { - BlockWritesToShardList(colocatedShardList); - } - - ShardInterval *colocatedShard = NULL; - foreach_ptr(colocatedShard, colocatedShardList) - { - uint64 colocatedShardId = colocatedShard->shardId; - - /* - * For shard copy, there should be healthy placement in source node and no - * placement in the target node. 
- */ - EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort); - } - - if (shardReplicationMode == TRANSFER_MODE_AUTOMATIC) - { - VerifyTablesHaveReplicaIdentity(colocatedTableList); - } - - if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE)) - { - /* - * When copying a shard to a new node, we should first ensure that reference - * tables are present such that joins work immediately after copying the shard. - * When copying a reference table, we are probably trying to achieve just that. - * - * Since this a long-running operation we do this after the error checks, but - * before taking metadata locks. - */ - EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode); - } - - DropOrphanedResourcesInSeparateTransaction(); - - CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, - targetNodeName, targetNodePort, useLogicalReplication, - "citus_copy_shard_placement"); - - /* - * Finally insert the placements to pg_dist_placement and sync it to the - * metadata workers. - */ - foreach_ptr(colocatedShard, colocatedShardList) - { - uint64 colocatedShardId = colocatedShard->shardId; - uint32 groupId = GroupForNode(targetNodeName, targetNodePort); - uint64 placementId = GetNextPlacementId(); - - InsertShardPlacementRow(colocatedShardId, placementId, - ShardLength(colocatedShardId), - groupId); - - if (ShouldSyncTableMetadata(colocatedShard->relationId)) - { - char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId, - 0, groupId); - - SendCommandToWorkersWithMetadata(placementCommand); - } - } - - UpdatePlacementUpdateStatusForShardIntervalList( - colocatedShardList, - sourceNodeName, - sourceNodePort, - PLACEMENT_UPDATE_STATUS_COMPLETED); - - FinalizeCurrentProgressMonitor(); -} - - /* * EnsureTableListOwner ensures current user owns given tables. Superusers * are regarded as owners. 
@@ -1278,25 +1287,15 @@ EnsureTableListOwner(List *tableIdList) /* - * EnsureTableListSuitableForReplication errors out if given tables are not + * ErrorIfReplicatingDistributedTableWithFKeys errors out if given tables are not * suitable for replication. */ static void -EnsureTableListSuitableForReplication(List *tableIdList) +ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList) { Oid tableId = InvalidOid; foreach_oid(tableId, tableIdList) { - if (IsForeignTable(tableId)) - { - char *relationName = get_rel_name(tableId); - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot replicate shard"), - errdetail("Table %s is a foreign table. Replicating " - "shards backed by foreign tables is " - "not supported.", relationName))); - } - List *foreignConstraintCommandList = GetReferencingForeignConstaintCommands(tableId); @@ -1318,7 +1317,7 @@ EnsureTableListSuitableForReplication(List *tableIdList) static void CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort, char *targetNodeName, int32 targetNodePort, bool useLogicalReplication, - char *operationName) + const char *operationName) { if (list_length(shardIntervalList) < 1) { diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index ab3264e5d..ad8329a6c 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -310,8 +310,6 @@ extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName, uint32 nodePort); -extern void ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int - targetNodePort); extern char LookupShardTransferMode(Oid shardReplicationModeOid); extern void BlockWritesToShardList(List *shardList); extern List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); diff --git 
a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index ff2eb2809..a37e5abdb 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -12,11 +12,17 @@ #include "distributed/shard_rebalancer.h" #include "nodes/pg_list.h" -extern void citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName, - int32 sourceNodePort, - char *targetNodeName, - int32 targetNodePort, - Oid shardReplicationModeOid); +typedef enum +{ + SHARD_TRANSFER_INVALID_FIRST = 0, + SHARD_TRANSFER_MOVE = 1, + SHARD_TRANSFER_COPY = 2 +} ShardTransferType; + +extern void TransferShards(int64 shardId, + char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort, + char shardReplicationMode, ShardTransferType transferType); extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId);