mirror of https://github.com/citusdata/citus.git
Refactor shard transfers (#6631)
DESCRIPTION: Refactor and unify shard move and copy functions

Shard move and copy functions share a lot of code. This PR unifies them into a single function, along with some helper functions. To preserve the current behavior, we introduce and use an enum parameter and hardcoded strings for producing error/warning messages.

pull/6810/head
parent
92b358fe0a
commit
697bb55fc5
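
Before the diff itself, a minimal, self-contained C sketch of the pattern the description refers to: a transfer-type enum that both branches behavior and indexes designated-initializer string arrays, so one unified code path varies only its message text. The enum and array names mirror the commit, but the surrounding program (TransferShardsSketch, main) is hypothetical and only illustrative.

#include <stdio.h>

typedef enum
{
	SHARD_TRANSFER_INVALID_FIRST = 0,
	SHARD_TRANSFER_MOVE = 1,
	SHARD_TRANSFER_COPY = 2
} ShardTransferType;

/* message fragments indexed by the enum, as in the commit */
static const char *ShardTransferTypeNames[] = {
	[SHARD_TRANSFER_INVALID_FIRST] = "unknown",
	[SHARD_TRANSFER_MOVE] = "move",
	[SHARD_TRANSFER_COPY] = "copy",
};

/* hypothetical stand-in for the unified TransferShards entry point */
static void
TransferShardsSketch(ShardTransferType transferType)
{
	/* shared code path; only the message text depends on the transfer type */
	printf("starting shard %s\n", ShardTransferTypeNames[transferType]);

	if (transferType == SHARD_TRANSFER_MOVE)
	{
		/* a move-only step, analogous to dropping source placements afterwards */
		printf("would drop source placements after the %s\n",
			   ShardTransferTypeNames[transferType]);
	}
}

int
main(void)
{
	TransferShardsSketch(SHARD_TRANSFER_MOVE); /* prints the move messages */
	TransferShardsSketch(SHARD_TRANSFER_COPY); /* prints the copy message */
	return 0;
}
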
@@ -70,22 +70,43 @@ typedef struct ShardCommandList
 	List *ddlCommandList;
 } ShardCommandList;
 
+static const char *ShardTransferTypeNames[] = {
+	[SHARD_TRANSFER_INVALID_FIRST] = "unknown",
+	[SHARD_TRANSFER_MOVE] = "move",
+	[SHARD_TRANSFER_COPY] = "copy",
+};
+
+static const char *ShardTransferTypeNamesCapitalized[] = {
+	[SHARD_TRANSFER_INVALID_FIRST] = "unknown",
+	[SHARD_TRANSFER_MOVE] = "Move",
+	[SHARD_TRANSFER_COPY] = "Copy",
+};
+
+static const char *ShardTransferTypeNamesContinuous[] = {
+	[SHARD_TRANSFER_INVALID_FIRST] = "unknown",
+	[SHARD_TRANSFER_MOVE] = "Moving",
+	[SHARD_TRANSFER_COPY] = "Copying",
+};
+
+static const char *ShardTransferTypeFunctionNames[] = {
+	[SHARD_TRANSFER_INVALID_FIRST] = "unknown",
+	[SHARD_TRANSFER_MOVE] = "citus_move_shard_placement",
+	[SHARD_TRANSFER_COPY] = "citus_copy_shard_placement",
+};
+
 /* local function forward declarations */
 static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode);
 static void ErrorIfTableCannotBeReplicated(Oid relationId);
-static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName,
-											   int targetNodePort);
+static void ErrorIfTargetNodeIsNotSafeForTransfer(const char *targetNodeName,
+												  int targetNodePort,
+												  ShardTransferType transferType);
 static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
 							char *targetNodeName, int targetNodePort,
 							const char *operationName);
-static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
-											 int32 sourceNodePort, char *targetNodeName,
-											 int32 targetNodePort,
-											 char shardReplicationMode);
 static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
 							int32 sourceNodePort, char *targetNodeName,
 							int32 targetNodePort, bool useLogicalReplication,
-							char *operationName);
+							const char *operationName);
 static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
 												 char *sourceNodeName,
 												 int32 sourceNodePort,
@@ -100,7 +121,7 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
 								   int32 targetNodePort);
 static List * RecreateTableDDLCommandList(Oid relationId);
 static void EnsureTableListOwner(List *tableIdList);
-static void EnsureTableListSuitableForReplication(List *tableIdList);
+static void ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList);
 
 static void DropShardPlacementsFromMetadata(List *shardList,
 											char *nodeName,
@@ -112,12 +133,28 @@ static void UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
 														   int32 targetNodePort);
 static bool IsShardListOnNode(List *colocatedShardList, char *targetNodeName,
 							  uint32 targetPort);
+static void SetupRebalanceMonitorForShardTransfer(uint64 shardId, Oid distributedTableId,
+												  char *sourceNodeName,
+												  uint32 sourceNodePort,
+												  char *targetNodeName,
+												  uint32 targetNodePort,
+												  ShardTransferType transferType);
 static void CheckSpaceConstraints(MultiConnection *connection,
 								  uint64 colocationSizeInBytes);
+static void EnsureAllShardsCanBeCopied(List *colocatedShardList,
+									   char *sourceNodeName, uint32 sourceNodePort,
+									   char *targetNodeName, uint32 targetNodePort);
 static void EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
 											  char *sourceNodeName, uint32 sourceNodePort,
-											  char *targetNodeName, uint32
-											  targetNodePort);
+											  char *targetNodeName, uint32 targetNodePort,
+											  ShardTransferType transferType);
+static bool TransferAlreadyCompleted(List *colocatedShardList,
+									 char *sourceNodeName, uint32 sourceNodePort,
+									 char *targetNodeName, uint32 targetNodePort,
+									 ShardTransferType transferType);
+static void LockColocatedRelationsForMove(List *colocatedTableList);
+static void ErrorIfForeignTableForShardTransfer(List *colocatedTableList,
+												ShardTransferType transferType);
 static List * RecreateShardDDLCommandList(ShardInterval *shardInterval,
 										  const char *sourceNodeName,
 										  int32 sourceNodePort);
@@ -163,9 +200,9 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
 
 	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
 
-	ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
-									 targetNodeName, targetNodePort,
-									 shardReplicationMode);
+	TransferShards(shardId, sourceNodeName, sourceNodePort,
+				   targetNodeName, targetNodePort,
+				   shardReplicationMode, SHARD_TRANSFER_COPY);
 
 	PG_RETURN_VOID();
 }
@@ -192,10 +229,9 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
 
 	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
 
-	ReplicateColocatedShardPlacement(shardId,
-									 sourceNode->workerName, sourceNode->workerPort,
-									 targetNode->workerName, targetNode->workerPort,
-									 shardReplicationMode);
+	TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
+				   targetNode->workerName, targetNode->workerPort,
+				   shardReplicationMode, SHARD_TRANSFER_COPY);
 
 	PG_RETURN_VOID();
 }
@@ -228,9 +264,9 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
 		ereport(WARNING, (errmsg("do_repair argument is deprecated")));
 	}
 
-	ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
-									 targetNodeName, targetNodePort,
-									 shardReplicationMode);
+	TransferShards(shardId, sourceNodeName, sourceNodePort,
+				   targetNodeName, targetNodePort,
+				   shardReplicationMode, SHARD_TRANSFER_COPY);
 
 
 	PG_RETURN_VOID();
@@ -264,9 +300,10 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
 	int32 targetNodePort = PG_GETARG_INT32(4);
 	Oid shardReplicationModeOid = PG_GETARG_OID(5);
 
-	citus_move_shard_placement_internal(shardId, sourceNodeName, sourceNodePort,
-										targetNodeName, targetNodePort,
-										shardReplicationModeOid);
+	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
+	TransferShards(shardId, sourceNodeName, sourceNodePort,
+				   targetNodeName, targetNodePort,
+				   shardReplicationMode, SHARD_TRANSFER_MOVE);
 
 	PG_RETURN_VOID();
 }
@@ -291,126 +328,111 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
 	WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
 	WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
 
-	citus_move_shard_placement_internal(shardId, sourceNode->workerName,
-										sourceNode->workerPort, targetNode->workerName,
-										targetNode->workerPort,
-										shardReplicationModeOid);
+	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
+	TransferShards(shardId, sourceNode->workerName,
+				   sourceNode->workerPort, targetNode->workerName,
+				   targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE);
 
 	PG_RETURN_VOID();
 }
 
 
 /*
- * citus_move_shard_placement_internal is the internal function for shard moves.
+ * TransferShards is the function for shard transfers.
  */
 void
-citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
-									int32 sourceNodePort, char *targetNodeName,
-									int32 targetNodePort, Oid shardReplicationModeOid)
+TransferShards(int64 shardId, char *sourceNodeName,
+			   int32 sourceNodePort, char *targetNodeName,
+			   int32 targetNodePort, char shardReplicationMode,
+			   ShardTransferType transferType)
 {
-	ListCell *colocatedTableCell = NULL;
-	ListCell *colocatedShardCell = NULL;
+	/* strings to be used in log messages */
+	const char *operationName = ShardTransferTypeNames[transferType];
+	const char *operationNameCapitalized =
+		ShardTransferTypeNamesCapitalized[transferType];
+	const char *operationFunctionName = ShardTransferTypeFunctionNames[transferType];
 
 	/* cannot transfer shard to the same node */
 	ErrorIfSameNode(sourceNodeName, sourceNodePort,
 					targetNodeName, targetNodePort,
-					"move");
-
-	Oid relationId = RelationIdForShard(shardId);
-	ErrorIfMoveUnsupportedTableType(relationId);
-	ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
-
-	AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");
+					operationName);
 
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
 	Oid distributedTableId = shardInterval->relationId;
 
+	/* error if unsupported shard transfer */
+	if (transferType == SHARD_TRANSFER_MOVE)
+	{
+		ErrorIfMoveUnsupportedTableType(distributedTableId);
+	}
+	else if (transferType == SHARD_TRANSFER_COPY)
+	{
+		ErrorIfTableCannotBeReplicated(distributedTableId);
+		EnsureNoModificationsHaveBeenDone();
+	}
+
+	ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType);
+
+	AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName);
+
 	List *colocatedTableList = ColocatedTableList(distributedTableId);
 	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
 
-	foreach(colocatedTableCell, colocatedTableList)
+	EnsureTableListOwner(colocatedTableList);
+
+	if (transferType == SHARD_TRANSFER_MOVE)
 	{
-		Oid colocatedTableId = lfirst_oid(colocatedTableCell);
-
-		/* check that user has owner rights in all co-located tables */
-		EnsureTableOwner(colocatedTableId);
-
-		/*
-		 * Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
-		 * block concurrent citus_move_shard_placement() on any shard of
-		 * the same relation. This is OK for now since we're executing shard
-		 * moves sequentially anyway.
-		 */
-		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
-
-		if (IsForeignTable(relationId))
-		{
-			char *relationName = get_rel_name(colocatedTableId);
-			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							errmsg("cannot move shard"),
-							errdetail("Table %s is a foreign table. Moving "
-									  "shards backed by foreign tables is "
-									  "not supported.", relationName)));
-		}
+		LockColocatedRelationsForMove(colocatedTableList);
 	}
 
-	/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
-	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
+	ErrorIfForeignTableForShardTransfer(colocatedTableList, transferType);
+
+	if (transferType == SHARD_TRANSFER_COPY)
+	{
+		ErrorIfReplicatingDistributedTableWithFKeys(colocatedTableList);
+	}
 
 	/*
-	 * If there are no active placements on the source and only active placements on
-	 * the target node, we assume the copy to already be done.
+	 * We sort shardIntervalList so that lock operations will not cause any
+	 * deadlocks.
 	 */
-	if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
-		!IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
+
+	if (TransferAlreadyCompleted(colocatedShardList,
+								 sourceNodeName, sourceNodePort,
+								 targetNodeName, targetNodePort,
+								 transferType))
 	{
+		/* if the transfer is already completed, we can return right away */
 		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
 								 targetNodeName, targetNodePort),
-						  errdetail("Move may have already completed.")));
+						  errdetail("%s may have already completed.",
+									operationNameCapitalized)));
 		return;
 	}
 
-	foreach(colocatedShardCell, colocatedShardList)
-	{
-		ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
-		uint64 colocatedShardId = colocatedShard->shardId;
-
-		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
+	EnsureAllShardsCanBeCopied(colocatedShardList, sourceNodeName, sourceNodePort,
 							   targetNodeName, targetNodePort);
-	}
 
-	char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
 	if (shardReplicationMode == TRANSFER_MODE_AUTOMATIC)
 	{
 		VerifyTablesHaveReplicaIdentity(colocatedTableList);
 	}
 
-	EnsureEnoughDiskSpaceForShardMove(colocatedShardList, sourceNodeName, sourceNodePort,
-									  targetNodeName, targetNodePort);
+	EnsureEnoughDiskSpaceForShardMove(colocatedShardList,
+									  sourceNodeName, sourceNodePort,
+									  targetNodeName, targetNodePort, transferType);
 
-
-	/*
-	 * We want to be able to track progress of shard moves using
-	 * get_rebalancer_progress. If this move is initiated by the rebalancer,
-	 * then the rebalancer call has already set up the shared memory that is
-	 * used to do that. But if citus_move_shard_placement is called directly by
-	 * the user (or through any other mechanism), then the shared memory is not
-	 * set up yet. In that case we do it here.
-	 */
-	if (!IsRebalancerInternalBackend())
-	{
-		WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
-		WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
-
-		PlacementUpdateEvent *placementUpdateEvent = palloc0(
-			sizeof(PlacementUpdateEvent));
-		placementUpdateEvent->updateType = PLACEMENT_UPDATE_MOVE;
-		placementUpdateEvent->shardId = shardId;
-		placementUpdateEvent->sourceNode = sourceNode;
-		placementUpdateEvent->targetNode = targetNode;
-		SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId,
-							  REBALANCE_PROGRESS_MOVING,
-							  PLACEMENT_UPDATE_STATUS_SETTING_UP);
-	}
+	SetupRebalanceMonitorForShardTransfer(shardId, distributedTableId,
+										  sourceNodeName, sourceNodePort,
+										  targetNodeName, targetNodePort,
+										  transferType);
 
 	UpdatePlacementUpdateStatusForShardIntervalList(
 		colocatedShardList,
@@ -428,7 +450,7 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
 	{
 		BlockWritesToShardList(colocatedShardList);
 	}
-	else
+	else if (transferType == SHARD_TRANSFER_MOVE)
 	{
 		/*
 		 * We prevent multiple shard moves in a transaction that use logical
@@ -452,6 +474,20 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
 		PlacementMovedUsingLogicalReplicationInTX = true;
 	}
 
+	if (transferType == SHARD_TRANSFER_COPY &&
+		!IsCitusTableType(distributedTableId, REFERENCE_TABLE))
+	{
+		/*
+		 * When copying a shard to a new node, we should first ensure that reference
+		 * tables are present such that joins work immediately after copying the shard.
+		 * When copying a reference table, we are probably trying to achieve just that.
+		 *
+		 * Since this a long-running operation we do this after the error checks, but
+		 * before taking metadata locks.
+		 */
+		EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
+	}
+
 	DropOrphanedResourcesInSeparateTransaction();
 
 	ShardInterval *colocatedShard = NULL;
@@ -466,18 +502,21 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
 		ErrorIfCleanupRecordForShardExists(qualifiedShardName);
 	}
 
-	/*
-	 * CopyColocatedShardPlacement function copies given shard with its co-located
-	 * shards.
-	 */
 	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
-					targetNodePort, useLogicalReplication, "citus_move_shard_placement");
+					targetNodePort, useLogicalReplication, operationFunctionName);
 
-	/* delete old shards metadata and mark the shards as to be deferred drop */
-	int32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort);
-	InsertCleanupRecordsForShardPlacementsOnNode(colocatedShardList,
-												 sourceGroupId);
+	if (transferType == SHARD_TRANSFER_MOVE)
+	{
+		/* delete old shards metadata and mark the shards as to be deferred drop */
+		int32 sourceGroupId = GroupForNode(sourceNodeName, sourceNodePort);
+		InsertCleanupRecordsForShardPlacementsOnNode(colocatedShardList,
+													 sourceGroupId);
+	}
 
 	/*
 	 * Finally insert the placements to pg_dist_placement and sync it to the
 	 * metadata workers.
 	 */
 	colocatedShard = NULL;
 	foreach_ptr(colocatedShard, colocatedShardList)
 	{
@@ -488,17 +527,30 @@ citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
 		InsertShardPlacementRow(colocatedShardId, placementId,
 								ShardLength(colocatedShardId),
 								groupId);
 
+		if (transferType == SHARD_TRANSFER_COPY &&
+			ShouldSyncTableMetadata(colocatedShard->relationId))
+		{
+			char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
+															0, groupId);
+
+			SendCommandToWorkersWithMetadata(placementCommand);
+		}
 	}
 
-	/*
-	 * Since this is move operation, we remove the placements from the metadata
-	 * for the source node after copy.
-	 */
-	DropShardPlacementsFromMetadata(colocatedShardList, sourceNodeName, sourceNodePort);
+	if (transferType == SHARD_TRANSFER_MOVE)
+	{
+		/*
+		 * Since this is move operation, we remove the placements from the metadata
+		 * for the source node after copy.
+		 */
+		DropShardPlacementsFromMetadata(colocatedShardList,
+										sourceNodeName, sourceNodePort);
 
-	UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName,
-												   sourceNodePort, targetNodeName,
-												   targetNodePort);
+		UpdateColocatedShardPlacementMetadataOnWorkers(shardId, sourceNodeName,
+													   sourceNodePort, targetNodeName,
+													   targetNodePort);
+	}
 
 	UpdatePlacementUpdateStatusForShardIntervalList(
 		colocatedShardList,
@@ -611,6 +663,70 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
 }
 
 
+/*
+ * LockColocatedRelationsForMove takes a list of relations, locks all of them
+ * using ShareUpdateExclusiveLock
+ */
+static void
+LockColocatedRelationsForMove(List *colocatedTableList)
+{
+	Oid colocatedTableId = InvalidOid;
+	foreach_oid(colocatedTableId, colocatedTableList)
+	{
+		LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
+	}
+}
+
+
+/*
+ * ErrorIfForeignTableForShardTransfer takes a list of relations, errors out if
+ * there's a foreign table in the list.
+ */
+static void
+ErrorIfForeignTableForShardTransfer(List *colocatedTableList,
+									ShardTransferType transferType)
+{
+	Oid colocatedTableId = InvalidOid;
+	foreach_oid(colocatedTableId, colocatedTableList)
+	{
+		if (IsForeignTable(colocatedTableId))
+		{
+			char *relationName = get_rel_name(colocatedTableId);
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot %s shard",
+								   ShardTransferTypeNames[transferType]),
+							errdetail("Table %s is a foreign table. "
+									  "%s shards backed by foreign tables is "
+									  "not supported.", relationName,
+									  ShardTransferTypeNamesContinuous[transferType])));
+		}
+	}
+}
+
+
+/*
+ * EnsureAllShardsCanBeCopied is a wrapper around EnsureShardCanBeCopied.
+ */
+static void
+EnsureAllShardsCanBeCopied(List *colocatedShardList,
+						   char *sourceNodeName, uint32 sourceNodePort,
+						   char *targetNodeName, uint32 targetNodePort)
+{
+	ShardInterval *colocatedShard = NULL;
+	foreach_ptr(colocatedShard, colocatedShardList)
+	{
+		uint64 colocatedShardId = colocatedShard->shardId;
+
+		/*
+		 * To transfer shard, there should be healthy placement in source node and no
+		 * placement in the target node.
+		 */
+		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
+							   targetNodeName, targetNodePort);
+	}
+}
+
+
 /*
  * EnsureEnoughDiskSpaceForShardMove checks that there is enough space for
 * shard moves of the given colocated shard list from source node to target node.
@@ -619,9 +735,10 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
 static void
 EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
 								  char *sourceNodeName, uint32 sourceNodePort,
-								  char *targetNodeName, uint32 targetNodePort)
+								  char *targetNodeName, uint32 targetNodePort,
+								  ShardTransferType transferType)
 {
-	if (!CheckAvailableSpaceBeforeMove)
+	if (!CheckAvailableSpaceBeforeMove || transferType != SHARD_TRANSFER_MOVE)
 	{
 		return;
 	}
@@ -636,6 +753,34 @@ EnsureEnoughDiskSpaceForShardMove(List *colocatedShardList,
 }
 
 
+/*
+ * TransferAlreadyCompleted returns true if the given shard transfer is already done.
+ * Returns false otherwise.
+ */
+static bool
+TransferAlreadyCompleted(List *colocatedShardList,
+						 char *sourceNodeName, uint32 sourceNodePort,
+						 char *targetNodeName, uint32 targetNodePort,
+						 ShardTransferType transferType)
+{
+	if (transferType == SHARD_TRANSFER_MOVE &&
+		IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+		!IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+	{
+		return true;
+	}
+
+	if (transferType == SHARD_TRANSFER_COPY &&
+		IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
+		IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
+	{
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * ShardListSizeInBytes returns the size in bytes of a set of shard tables.
 */
@@ -682,6 +827,49 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32
 }
 
 
+/*
+ * SetupRebalanceMonitorForShardTransfer prepares the parameters and
+ * calls SetupRebalanceMonitor, unless the current transfer is a move
+ * initiated by the rebalancer.
+ * See comments on SetupRebalanceMonitor
+ */
+static void
+SetupRebalanceMonitorForShardTransfer(uint64 shardId, Oid distributedTableId,
+									  char *sourceNodeName, uint32 sourceNodePort,
+									  char *targetNodeName, uint32 targetNodePort,
+									  ShardTransferType transferType)
+{
+	if (transferType == SHARD_TRANSFER_MOVE && IsRebalancerInternalBackend())
+	{
+		/*
+		 * We want to be able to track progress of shard moves using
+		 * get_rebalancer_progress. If this move is initiated by the rebalancer,
+		 * then the rebalancer call has already set up the shared memory that is
+		 * used to do that, so we should return here.
+		 * But if citus_move_shard_placement is called directly by the user
+		 * (or through any other mechanism), then the shared memory is not
+		 * set up yet. In that case we do it here.
+		 */
+		return;
+	}
+
+	WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
+	WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
+
+	PlacementUpdateEvent *placementUpdateEvent = palloc0(
+		sizeof(PlacementUpdateEvent));
+	placementUpdateEvent->updateType =
+		transferType == SHARD_TRANSFER_COPY ? PLACEMENT_UPDATE_COPY :
+		PLACEMENT_UPDATE_MOVE;
+	placementUpdateEvent->shardId = shardId;
+	placementUpdateEvent->sourceNode = sourceNode;
+	placementUpdateEvent->targetNode = targetNode;
+	SetupRebalanceMonitor(list_make1(placementUpdateEvent), distributedTableId,
+						  REBALANCE_PROGRESS_MOVING,
+						  PLACEMENT_UPDATE_STATUS_SETTING_UP);
+}
+
+
 /*
  * CheckSpaceConstraints checks there is enough space to place the colocation
 * on the node that the connection is connected to.
@@ -729,17 +917,19 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
 
 
 /*
- * ErrorIfTargetNodeIsNotSafeToMove throws error if the target node is not
- * eligible for moving shards.
+ * ErrorIfTargetNodeIsNotSafeForTransfer throws error if the target node is not
+ * eligible for shard transfers.
 */
-void
-ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
+static void
+ErrorIfTargetNodeIsNotSafeForTransfer(const char *targetNodeName, int targetNodePort,
+									  ShardTransferType transferType)
 {
 	WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
 	if (workerNode == NULL)
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Moving shards to a non-existing node is not supported"),
+						errmsg("%s shards to a non-existing node is not supported",
+							   ShardTransferTypeNamesContinuous[transferType]),
 						errhint(
 							"Add the target node via SELECT citus_add_node('%s', %d);",
 							targetNodeName, targetNodePort)));
@@ -748,13 +938,14 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
 	if (!workerNode->isActive)
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Moving shards to a non-active node is not supported"),
+						errmsg("%s shards to a non-active node is not supported",
+							   ShardTransferTypeNamesContinuous[transferType]),
 						errhint(
 							"Activate the target node via SELECT citus_activate_node('%s', %d);",
 							targetNodeName, targetNodePort)));
 	}
 
-	if (!workerNode->shouldHaveShards)
+	if (transferType == SHARD_TRANSFER_MOVE && !workerNode->shouldHaveShards)
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 						errmsg("Moving shards to a node that shouldn't have a shard is "
@@ -767,8 +958,9 @@ ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int targetNodePort)
 	if (!NodeIsPrimary(workerNode))
 	{
 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Moving shards to a secondary (e.g., replica) node is "
-							   "not supported")));
+						errmsg("%s shards to a secondary (e.g., replica) node is "
+							   "not supported",
+							   ShardTransferTypeNamesContinuous[transferType])));
 	}
 }
@@ -1046,41 +1238,6 @@ ErrorIfTableCannotBeReplicated(Oid relationId)
 }
 
 
-/*
- * ErrorIfTargetNodeIsNotSafeToCopyTo throws an error if the target node is not
- * eligible for copying shards.
- */
-static void
-ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName, int targetNodePort)
-{
-	WorkerNode *workerNode = FindWorkerNode(targetNodeName, targetNodePort);
-	if (workerNode == NULL)
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Copying shards to a non-existing node is not supported"),
-						errhint(
-							"Add the target node via SELECT citus_add_node('%s', %d);",
-							targetNodeName, targetNodePort)));
-	}
-
-	if (!workerNode->isActive)
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Copying shards to a non-active node is not supported"),
-						errhint(
-							"Activate the target node via SELECT citus_activate_node('%s', %d);",
-							targetNodeName, targetNodePort)));
-	}
-
-	if (!NodeIsPrimary(workerNode))
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("Copying shards to a secondary (e.g., replica) node is "
-							   "not supported")));
-	}
-}
-
-
 /*
  * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
 * values to a char.
@@ -1114,154 +1271,6 @@ LookupShardTransferMode(Oid shardReplicationModeOid)
 }
 
 
-/*
- * ReplicateColocatedShardPlacement replicates the given shard and its
- * colocated shards from a source node to target node.
- */
-static void
-ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
-								 int32 sourceNodePort, char *targetNodeName,
-								 int32 targetNodePort, char shardReplicationMode)
-{
-	ShardInterval *shardInterval = LoadShardInterval(shardId);
-	Oid distributedTableId = shardInterval->relationId;
-
-	ErrorIfSameNode(sourceNodeName, sourceNodePort,
-					targetNodeName, targetNodePort,
-					"copy");
-
-	ErrorIfTableCannotBeReplicated(shardInterval->relationId);
-	ErrorIfTargetNodeIsNotSafeToCopyTo(targetNodeName, targetNodePort);
-	EnsureNoModificationsHaveBeenDone();
-
-	AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock, "copy");
-
-	List *colocatedTableList = ColocatedTableList(distributedTableId);
-	List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
-
-	EnsureTableListOwner(colocatedTableList);
-	EnsureTableListSuitableForReplication(colocatedTableList);
-
-	/*
-	 * We sort shardIntervalList so that lock operations will not cause any
-	 * deadlocks.
-	 */
-	colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
-
-	/*
-	 * If there are active placements on both nodes, we assume the copy to already
-	 * be done.
-	 */
-	if (IsShardListOnNode(colocatedShardList, targetNodeName, targetNodePort) &&
-		IsShardListOnNode(colocatedShardList, sourceNodeName, sourceNodePort))
-	{
-		ereport(WARNING, (errmsg("shard is already present on node %s:%d",
-								 targetNodeName, targetNodePort),
-						  errdetail("Copy may have already completed.")));
-		return;
-	}
-
-	WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
-	WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
-
-	Oid relationId = RelationIdForShard(shardId);
-	PlacementUpdateEvent *placementUpdateEvent = palloc0(
-		sizeof(PlacementUpdateEvent));
-	placementUpdateEvent->updateType = PLACEMENT_UPDATE_COPY;
-	placementUpdateEvent->shardId = shardId;
-	placementUpdateEvent->sourceNode = sourceNode;
-	placementUpdateEvent->targetNode = targetNode;
-	SetupRebalanceMonitor(list_make1(placementUpdateEvent), relationId,
-						  REBALANCE_PROGRESS_MOVING,
-						  PLACEMENT_UPDATE_STATUS_SETTING_UP);
-
-	UpdatePlacementUpdateStatusForShardIntervalList(
-		colocatedShardList,
-		sourceNodeName,
-		sourceNodePort,
-		PLACEMENT_UPDATE_STATUS_SETTING_UP);
-
-	/*
-	 * At this point of the shard replication, we don't need to block the writes to
-	 * shards when logical replication is used.
-	 */
-	bool useLogicalReplication = CanUseLogicalReplication(distributedTableId,
-														  shardReplicationMode);
-	if (!useLogicalReplication)
-	{
-		BlockWritesToShardList(colocatedShardList);
-	}
-
-	ShardInterval *colocatedShard = NULL;
-	foreach_ptr(colocatedShard, colocatedShardList)
-	{
-		uint64 colocatedShardId = colocatedShard->shardId;
-
-		/*
-		 * For shard copy, there should be healthy placement in source node and no
-		 * placement in the target node.
-		 */
-		EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
-							   targetNodeName, targetNodePort);
-	}
-
-	if (shardReplicationMode == TRANSFER_MODE_AUTOMATIC)
-	{
-		VerifyTablesHaveReplicaIdentity(colocatedTableList);
-	}
-
-	if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE))
-	{
-		/*
-		 * When copying a shard to a new node, we should first ensure that reference
-		 * tables are present such that joins work immediately after copying the shard.
-		 * When copying a reference table, we are probably trying to achieve just that.
-		 *
-		 * Since this a long-running operation we do this after the error checks, but
-		 * before taking metadata locks.
-		 */
-		EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
-	}
-
-	DropOrphanedResourcesInSeparateTransaction();
-
-	CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
-					targetNodeName, targetNodePort, useLogicalReplication,
-					"citus_copy_shard_placement");
-
-	/*
-	 * Finally insert the placements to pg_dist_placement and sync it to the
-	 * metadata workers.
-	 */
-	foreach_ptr(colocatedShard, colocatedShardList)
-	{
-		uint64 colocatedShardId = colocatedShard->shardId;
-		uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
-		uint64 placementId = GetNextPlacementId();
-
-		InsertShardPlacementRow(colocatedShardId, placementId,
-								ShardLength(colocatedShardId),
-								groupId);
-
-		if (ShouldSyncTableMetadata(colocatedShard->relationId))
-		{
-			char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
-															0, groupId);
-
-			SendCommandToWorkersWithMetadata(placementCommand);
-		}
-	}
-
-	UpdatePlacementUpdateStatusForShardIntervalList(
-		colocatedShardList,
-		sourceNodeName,
-		sourceNodePort,
-		PLACEMENT_UPDATE_STATUS_COMPLETED);
-
-	FinalizeCurrentProgressMonitor();
-}
-
-
 /*
  * EnsureTableListOwner ensures current user owns given tables. Superusers
 * are regarded as owners.
@@ -1278,25 +1287,15 @@ EnsureTableListOwner(List *tableIdList)
 
 
 /*
- * EnsureTableListSuitableForReplication errors out if given tables are not
+ * ErrorIfReplicatingDistributedTableWithFKeys errors out if given tables are not
  * suitable for replication.
 */
 static void
-EnsureTableListSuitableForReplication(List *tableIdList)
+ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList)
 {
 	Oid tableId = InvalidOid;
 	foreach_oid(tableId, tableIdList)
 	{
-		if (IsForeignTable(tableId))
-		{
-			char *relationName = get_rel_name(tableId);
-			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-							errmsg("cannot replicate shard"),
-							errdetail("Table %s is a foreign table. Replicating "
-									  "shards backed by foreign tables is "
-									  "not supported.", relationName)));
-		}
-
 		List *foreignConstraintCommandList =
 			GetReferencingForeignConstaintCommands(tableId);
 
@@ -1318,7 +1317,7 @@ EnsureTableListSuitableForReplication(List *tableIdList)
 static void
 CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
 				char *targetNodeName, int32 targetNodePort, bool useLogicalReplication,
-				char *operationName)
+				const char *operationName)
 {
 	if (list_length(shardIntervalList) < 1)
 	{

@@ -310,8 +310,6 @@ extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
 extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList,
 														  const char *nodeName,
 														  uint32 nodePort);
-extern void ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int
-											 targetNodePort);
 extern char LookupShardTransferMode(Oid shardReplicationModeOid);
 extern void BlockWritesToShardList(List *shardList);
 extern List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);

@@ -12,11 +12,17 @@
 #include "distributed/shard_rebalancer.h"
 #include "nodes/pg_list.h"
 
-extern void citus_move_shard_placement_internal(int64 shardId, char *sourceNodeName,
-												int32 sourceNodePort,
-												char *targetNodeName,
-												int32 targetNodePort,
-												Oid shardReplicationModeOid);
+typedef enum
+{
+	SHARD_TRANSFER_INVALID_FIRST = 0,
+	SHARD_TRANSFER_MOVE = 1,
+	SHARD_TRANSFER_COPY = 2
+} ShardTransferType;
+
+extern void TransferShards(int64 shardId,
+						   char *sourceNodeName, int32 sourceNodePort,
+						   char *targetNodeName, int32 targetNodePort,
+						   char shardReplicationMode, ShardTransferType transferType);
 extern uint64 ShardListSizeInBytes(List *colocatedShardList,
 								   char *workerNodeName, uint32 workerNodePort);
 extern void ErrorIfMoveUnsupportedTableType(Oid relationId);