mirror of https://github.com/citusdata/citus.git
Parallelize shard rebalancing to reduce rebalance time
This commit introduces the following changes:

- Break out reference-table shard copies into independent background tasks
- Introduce a new "single-shard" flag to the TransferShards API so it can either transfer all colocated shards for a shard ID (existing behavior) or copy just the one shard specified by shardId
- In citus_rebalance_start(), use this flag to spawn a separate background task for each reference-table shard, creating the table and loading its data without any constraints
- Add a SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY flag so TransferShards() can defer foreign-key and other constraint creation
- After all reference-table copies complete, schedule a final task that applies all deferred constraints in one batch
- Introduce an advisory lock to serialize rebalance operations; downgrade previous colocation locks from ExclusiveLock to RowExclusiveLock so they don't conflict with the rebalance
- Remove intra-colocation-group dependencies so shards in the same group can move independently
- Increase default citus.max_background_task_executors_per_node from 1 to 4

pull/7983/head
parent 088ba75057
commit 49e56001fd
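For orientation, the user-facing entry points are unchanged; the parallelism happens inside the background job. A minimal usage sketch with existing Citus UDFs (the transfer mode shown is just one option, not something this commit requires):

    -- start a rebalance as a background job; reference-table shards are now
    -- copied by independent background tasks rather than a single serial task
    SELECT citus_rebalance_start(shard_transfer_mode := 'block_writes');

    -- monitor the job and its tasks
    SELECT * FROM citus_rebalance_status();

    -- optionally block until the rebalance finishes
    SELECT citus_rebalance_wait();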
@@ -220,7 +220,6 @@ typedef struct ShardMoveSourceNodeHashEntry
 */
typedef struct ShardMoveDependencies
{
    HTAB *colocationDependencies;
    HTAB *nodeDependencies;
} ShardMoveDependencies;

@@ -274,6 +273,7 @@ static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid
static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
        shardReplicationModeOid, char *noticeOperation);
static void AcquireRebalanceOperationLock(const char *operationName);
static float4 CalculateUtilization(float4 totalCost, float4 capacity);
static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name);
static void EnsureShardCostUDF(Oid functionOid);

@@ -297,8 +297,9 @@ static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
        colocationId,
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move,
        int64 *refTablesDepTaskIds,
        int refTablesDepTaskIdsCount,
        ShardMoveDependencies shardMoveDependencies,
        int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,

@@ -767,6 +768,31 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
    }
}


/*
 * AcquireRebalanceOperationLock does not allow concurrent rebalance
 * operations.
 */
static void
AcquireRebalanceOperationLock(const char *operationName)
{
    LOCKTAG tag;
    const bool sessionLock = false;
    const bool dontWait = true;

    SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_REBALANCE_OPERATION);

    LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock,
            dontWait);
    if (!lockAcquired)
    {
        ereport(ERROR, (errmsg("could not acquire the lock required for %s operation",
                operationName),
                errdetail("It means that either a concurrent shard move "
                          "or shard copy is happening."),
                errhint("Make sure that the concurrent operation has "
                        "finished and re-run the command")));
    }
}


/*
 * AcquirePlacementColocationLock tries to acquire a lock for

@@ -1954,6 +1980,8 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
        AcquireRebalanceColocationLock(relationId, options->operationName);
    }

    AcquireRebalanceOperationLock(options->operationName);

    int64 jobId = 0;
    if (HasNonTerminalJobOfType("rebalance", &jobId))
    {

@@ -1991,10 +2019,6 @@ static ShardMoveDependencies
InitializeShardMoveDependencies()
{
    ShardMoveDependencies shardMoveDependencies;
    shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
            ShardMoveDependencyInfo,
            "colocationDependencyHashMap",
            6);
    shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
            ShardMoveSourceNodeHashEntry,
            "nodeDependencyHashMap",

@@ -2008,23 +2032,14 @@ InitializeShardMoveDependencies()
 * the move must take a dependency on, given the shard move dependencies as input.
 */
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds,
        int refTablesDepTaskIdsCount,
        ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
    HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
            "shardMoveDependencyList", 0);

    bool found;

    /* Check if there exists a move in the same colocation group scheduled earlier. */
    ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
            shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);

    if (found)
    {
        hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
    }

    /*
     * Check if there exist moves scheduled earlier whose source node
     * overlaps with the current move's target node.

@@ -2045,6 +2060,19 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
        }
    }

    /*
     * A shard copy can only start after the reference table shards have been
     * copied, so each shard task takes a dependency on the tasks that mark
     * the completion of the reference table copies.
     */
    while (refTablesDepTaskIdsCount > 0)
    {
        int64 refTableTaskId = *refTablesDepTaskIds;
        hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL);
        refTablesDepTaskIds++;
        refTablesDepTaskIdsCount--;
    }

    *nDepends = hash_get_num_entries(dependsList);

    int64 *dependsArray = NULL;

@@ -2076,9 +2104,6 @@ static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
        ShardMoveDependencies shardMoveDependencies)
{
    ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
            shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
    shardMoveDependencyInfo->taskId = taskId;

    bool found;
    ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(

@@ -2174,30 +2199,18 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
    initStringInfo(&buf);

    List *referenceTableIdList = NIL;
    int64 replicateRefTablesTaskId = 0;
    int64 *refTablesDepTaskIds = NULL;
    int refTablesDepTaskIdsCount = 0;

    if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
    {
        if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
        {
            VerifyTablesHaveReplicaIdentity(referenceTableIdList);
        }

        /*
         * Reference tables need to be copied to (newly-added) nodes; this needs to be the
         * first task before we can move any other table.
         */
        appendStringInfo(&buf,
                "SELECT pg_catalog.replicate_reference_tables(%s)",
                quote_literal_cstr(shardTranferModeLabel));

        int32 nodesInvolved[] = { 0 };

        /* replicate_reference_tables permissions require superuser */
        Oid superUserId = CitusExtensionOwner();
        BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0,
                NULL, 0, nodesInvolved);
        replicateRefTablesTaskId = task->taskid;
        refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);
        ereport(DEBUG2,
                (errmsg("%d dependent copy reference table tasks for job %ld",
                        refTablesDepTaskIdsCount, jobId),
                 errdetail("Rebalance scheduled as background job"),
                 errhint("To monitor progress, run: SELECT * FROM "
                         "citus_rebalance_status();")));
    }

    PlacementUpdateEvent *move = NULL;

@@ -2219,17 +2232,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo

        int nDepends = 0;

        int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
        int64 *dependsArray = GenerateTaskMoveDependencyList(move, refTablesDepTaskIds,
                refTablesDepTaskIdsCount,
                shardMoveDependencies,
                &nDepends);

        if (nDepends == 0 && replicateRefTablesTaskId > 0)
        {
            nDepends = 1;
            dependsArray = palloc(nDepends * sizeof(int64));
            dependsArray[0] = replicateRefTablesTaskId;
        }

        int32 nodesInvolved[2] = { 0 };
        nodesInvolved[0] = move->sourceNode->nodeId;
        nodesInvolved[1] = move->targetNode->nodeId;
@@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
        int32 sourceNodePort, char *targetNodeName,
        int32 targetNodePort, bool useLogicalReplication,
        const char *operationName);
        const char *operationName, uint32 optionFlags);
static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
        char *sourceNodeName,
        int32 sourceNodePort,
        char *targetNodeName,
        int32 targetNodePort);
        int32 targetNodePort,
        uint32 optionFlags);

static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
        int32 sourceNodePort,
        char *targetNodeName, int32 targetNodePort);
        char *targetNodeName, int32 targetNodePort,
        uint32 optionFlags);
static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
        int32 sourceNodePort, const char *targetNodeName,
        int32 targetNodePort);

@@ -174,6 +176,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_copy_one_shard_placement);

double DesiredPercentFreeAfterMove = 10;
bool CheckAvailableSpaceBeforeMove = true;

@@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)

    TransferShards(shardId, sourceNodeName, sourceNodePort,
            targetNodeName, targetNodePort,
            shardReplicationMode, SHARD_TRANSFER_COPY);
            shardReplicationMode, SHARD_TRANSFER_COPY, 0);

    PG_RETURN_VOID();
}

@@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS)

    TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
            targetNode->workerName, targetNode->workerPort,
            shardReplicationMode, SHARD_TRANSFER_COPY);
            shardReplicationMode, SHARD_TRANSFER_COPY, 0);

    PG_RETURN_VOID();
}

@@ -267,13 +270,41 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)

    TransferShards(shardId, sourceNodeName, sourceNodePort,
            targetNodeName, targetNodePort,
            shardReplicationMode, SHARD_TRANSFER_COPY);
            shardReplicationMode, SHARD_TRANSFER_COPY, 0);


    PG_RETURN_VOID();
}


/*
 * citus_copy_one_shard_placement copies a single shard placement from the
 * given source node to the given target node.
 */
Datum
citus_copy_one_shard_placement(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);
    EnsureCoordinator();

    int64 shardId = PG_GETARG_INT64(0);
    uint32 sourceNodeId = PG_GETARG_INT32(1);
    uint32 targetNodeId = PG_GETARG_INT32(2);
    uint32 flags = PG_GETARG_INT32(3);
    Oid shardReplicationModeOid = PG_GETARG_OID(4);

    bool missingOk = false;
    WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
    WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);

    char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);

    TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
            targetNode->workerName, targetNode->workerPort,
            shardReplicationMode, SHARD_TRANSFER_COPY, flags);

    PG_RETURN_VOID();
}


/*
 * citus_move_shard_placement moves given shard (and its co-located shards) from one
 * node to the other node. To accomplish this it entirely recreates the table structure

@@ -315,7 +346,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
    char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
    TransferShards(shardId, sourceNodeName, sourceNodePort,
            targetNodeName, targetNodePort,
            shardReplicationMode, SHARD_TRANSFER_MOVE);
            shardReplicationMode, SHARD_TRANSFER_MOVE, 0);

    PG_RETURN_VOID();
}

@@ -343,12 +374,10 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
    char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
    TransferShards(shardId, sourceNode->workerName,
            sourceNode->workerPort, targetNode->workerName,
            targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE);
            targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0);

    PG_RETURN_VOID();
}


/*
 * TransferShards is the function for shard transfers.
 */

@@ -356,7 +385,7 @@ void
TransferShards(int64 shardId, char *sourceNodeName,
        int32 sourceNodePort, char *targetNodeName,
        int32 targetNodePort, char shardReplicationMode,
        ShardTransferType transferType)
        ShardTransferType transferType, uint32 optionFlags)
{
    /* strings to be used in log messages */
    const char *operationName = ShardTransferTypeNames[transferType];

@@ -385,10 +414,25 @@ TransferShards(int64 shardId, char *sourceNodeName,

    ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType);

    AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName);
    AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName);

    List *colocatedTableList = ColocatedTableList(distributedTableId);
    List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
    List *colocatedTableList;
    List *colocatedShardList;

    /*
     * If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer the shard
     * specified by shardId. Otherwise, we transfer all colocated shards.
     */
    if (optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY)
    {
        colocatedTableList = list_make1_oid(distributedTableId);
        colocatedShardList = list_make1(shardInterval);
    }
    else
    {
        colocatedTableList = ColocatedTableList(distributedTableId);
        colocatedShardList = ColocatedShardIntervalList(shardInterval);
    }

    EnsureTableListOwner(colocatedTableList);

@@ -412,19 +456,52 @@ TransferShards(int64 shardId, char *sourceNodeName,

    /*
     * We sort shardIntervalList so that lock operations will not cause any
     * deadlocks.
     * deadlocks. But we do not need to do that if the list contains only one
     * shard.
     */
    colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
    if (!(optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY))
    {
        colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
    }

    if (TransferAlreadyCompleted(colocatedShardList,
            sourceNodeName, sourceNodePort,
            targetNodeName, targetNodePort,
            transferType))
    bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
            sourceNodeName, sourceNodePort,
            targetNodeName, targetNodePort,
            transferType);

    /*
     * If we just need to create the shard relationships, we don't need to do
     * anything other than calling CopyShardTables with the
     * SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY flag.
     */
    if (optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY)
    {
        if (!transferAlreadyCompleted)
        {
            /*
             * If the transfer has not completed and we are here just to create
             * the relationships, warn and return right away.
             */
            ereport(WARNING, (errmsg("shard is not present on node %s:%d",
                    targetNodeName, targetNodePort),
                    errdetail("%s may not have completed.",
                            operationNameCapitalized)));
            return;
        }

        CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
                targetNodePort, (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL),
                operationFunctionName, optionFlags);
        /* We don't need to do anything else, just return */
        return;
    }

    if (transferAlreadyCompleted)
    {
        /* if the transfer is already completed, we can return right away */
        ereport(WARNING, (errmsg("shard is already present on node %s:%d",
                targetNodeName, targetNodePort),
                errdetail("%s may have already completed.",
                        operationNameCapitalized)));
        return;
    }

@@ -515,7 +592,7 @@ TransferShards(int64 shardId, char *sourceNodeName,
    }

    CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
            targetNodePort, useLogicalReplication, operationFunctionName);
            targetNodePort, useLogicalReplication, operationFunctionName, optionFlags);

    if (transferType == SHARD_TRANSFER_MOVE)
    {

@@ -573,7 +650,6 @@ TransferShards(int64 shardId, char *sourceNodeName,
    FinalizeCurrentProgressMonitor();
}


/*
 * Insert deferred cleanup records.
 * The shards will be dropped by background cleaner later.

@@ -1131,9 +1207,9 @@ BlockWritesToShardList(List *shardList)
         * asynchronous shard copy in case of cascading DML operations.
         */
        LockReferencedReferenceShardDistributionMetadata(shard->shardId,
                ExclusiveLock);
                RowExclusiveLock);

        LockShardDistributionMetadata(shard->shardId, ExclusiveLock);
        LockShardDistributionMetadata(shard->shardId, RowExclusiveLock);
    }

    /* following code relies on the list to have at least one shard */

@@ -1156,7 +1232,7 @@ BlockWritesToShardList(List *shardList)
         * Even if users disable metadata sync, we cannot allow them not to
         * acquire the remote locks. Hence, we have !IsCoordinator() check.
         */
        LockShardListMetadataOnWorkers(ExclusiveLock, shardList);
        LockShardListMetadataOnWorkers(RowExclusiveLock, shardList);
    }
}

@@ -1333,7 +1409,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList)
static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
        char *targetNodeName, int32 targetNodePort, bool useLogicalReplication,
        const char *operationName)
        const char *operationName, uint32 optionFlags)
{
    if (list_length(shardIntervalList) < 1)
    {

@@ -1347,12 +1423,12 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
    {
        CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
                sourceNodePort, targetNodeName,
                targetNodePort);
                targetNodePort, optionFlags);
    }
    else
    {
        CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
                targetNodeName, targetNodePort);
                targetNodeName, targetNodePort, optionFlags);
    }

    /*

@@ -1369,7 +1445,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
static void
CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName,
        int32 sourceNodePort, char *targetNodeName,
        int32 targetNodePort)
        int32 targetNodePort, uint32 optionFlags)
{
    MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
            "CopyShardTablesViaLogicalReplication",

@@ -1437,7 +1513,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList)
static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
        int32 sourceNodePort, char *targetNodeName,
        int32 targetNodePort)
        int32 targetNodePort, uint32 optionFlags)
{
    MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
            "CopyShardTablesViaBlockWrites",
@@ -1446,127 +1522,141 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,

    WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
    WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);

    /* iterate through the colocated shards and copy each */
    ShardInterval *shardInterval = NULL;
    foreach_declared_ptr(shardInterval, shardIntervalList)
    {
        /*
         * For each shard we first create the shard table in a separate
         * transaction and then we copy the data and create the indexes in a
         * second separate transaction. The reason we don't do both in a single
         * transaction is so we can see the size of the new shard growing
         * during the copy when we run get_rebalance_progress in another
         * session. If we wouldn't split these two phases up, then the table
         * wouldn't be visible in the session that get_rebalance_progress uses.
         * So get_rebalance_progress would always report its size as 0.
         */
        List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
                sourceNodePort);
        char *tableOwner = TableOwner(shardInterval->relationId);

        /* drop the shard we created on the target, in case of failure */
        InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
                ConstructQualifiedShardName(shardInterval),
                GroupForNode(targetNodeName,
                        targetNodePort),
                CLEANUP_ON_FAILURE);

        SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                tableOwner, ddlCommandList);
    }

    UpdatePlacementUpdateStatusForShardIntervalList(
            shardIntervalList,
            sourceNodeName,
            sourceNodePort,
            PLACEMENT_UPDATE_STATUS_COPYING_DATA);

    ConflictWithIsolationTestingBeforeCopy();
    CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
    ConflictWithIsolationTestingAfterCopy();

    UpdatePlacementUpdateStatusForShardIntervalList(
            shardIntervalList,
            sourceNodeName,
            sourceNodePort,
            PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);

    foreach_declared_ptr(shardInterval, shardIntervalList)
    {
        List *ddlCommandList =
                PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
                        sourceNodePort);
        char *tableOwner = TableOwner(shardInterval->relationId);
        SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                tableOwner, ddlCommandList);

        MemoryContextReset(localContext);
    }

    /*
     * Once all shards are copied, we can recreate relationships between shards.
     * Create DDL commands to attach child tables to their parents in a partitioning hierarchy.
     * If we're only asked to create the relationships, the shards are already
     * present and populated on the node. Skip the table-setup and data-loading
     * steps and proceed straight to creating the relationships.
     */
    List *shardIntervalWithDDCommandsList = NIL;
    foreach_declared_ptr(shardInterval, shardIntervalList)
    if (!(optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY))
    {
        if (PartitionTable(shardInterval->relationId))
        /* iterate through the colocated shards and copy each */
        foreach_declared_ptr(shardInterval, shardIntervalList)
        {
            char *attachPartitionCommand =
                    GenerateAttachShardPartitionCommand(shardInterval);
            /*
             * For each shard we first create the shard table in a separate
             * transaction and then we copy the data and create the indexes in a
             * second separate transaction. The reason we don't do both in a single
             * transaction is so we can see the size of the new shard growing
             * during the copy when we run get_rebalance_progress in another
             * session. If we wouldn't split these two phases up, then the table
             * wouldn't be visible in the session that get_rebalance_progress uses.
             * So get_rebalance_progress would always report its size as 0.
             */
            List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
                    sourceNodePort);
            char *tableOwner = TableOwner(shardInterval->relationId);

            ShardCommandList *shardCommandList = CreateShardCommandList(
                    shardInterval,
                    list_make1(attachPartitionCommand));
            shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                    shardCommandList);
            /* drop the shard we created on the target, in case of failure */
            InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
                    ConstructQualifiedShardName(shardInterval),
                    GroupForNode(targetNodeName,
                            targetNodePort),
                    CLEANUP_ON_FAILURE);

            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                    tableOwner, ddlCommandList);
        }

        UpdatePlacementUpdateStatusForShardIntervalList(
                shardIntervalList,
                sourceNodeName,
                sourceNodePort,
                PLACEMENT_UPDATE_STATUS_COPYING_DATA);

        ConflictWithIsolationTestingBeforeCopy();
        CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
        ConflictWithIsolationTestingAfterCopy();

        UpdatePlacementUpdateStatusForShardIntervalList(
                shardIntervalList,
                sourceNodeName,
                sourceNodePort,
                PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);

        foreach_declared_ptr(shardInterval, shardIntervalList)
        {
            List *ddlCommandList =
                    PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
                            sourceNodePort);
            char *tableOwner = TableOwner(shardInterval->relationId);
            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                    tableOwner, ddlCommandList);

            MemoryContextReset(localContext);
        }
    }

    UpdatePlacementUpdateStatusForShardIntervalList(
            shardIntervalList,
            sourceNodeName,
            sourceNodePort,
            PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);

    /*
     * Iterate through the colocated shards and create DDL commands
     * to create the foreign constraints.
     * Skip creating shard relationships if the caller has requested that they
     * not be created.
     */
    foreach_declared_ptr(shardInterval, shardIntervalList)
    if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS))
    {
        List *shardForeignConstraintCommandList = NIL;
        List *referenceTableForeignConstraintList = NIL;
        /*
         * Once all shards are copied, we can recreate relationships between shards.
         * Create DDL commands to attach child tables to their parents in a partitioning hierarchy.
         */
        List *shardIntervalWithDDCommandsList = NIL;
        foreach_declared_ptr(shardInterval, shardIntervalList)
        {
            if (PartitionTable(shardInterval->relationId))
            {
                char *attachPartitionCommand =
                        GenerateAttachShardPartitionCommand(shardInterval);

                CopyShardForeignConstraintCommandListGrouped(shardInterval,
                        &shardForeignConstraintCommandList,
                        &referenceTableForeignConstraintList);
                ShardCommandList *shardCommandList = CreateShardCommandList(
                        shardInterval,
                        list_make1(attachPartitionCommand));
                shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                        shardCommandList);
            }
        }

        ShardCommandList *shardCommandList = CreateShardCommandList(
                shardInterval,
                list_concat(shardForeignConstraintCommandList,
                        referenceTableForeignConstraintList));
        shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                shardCommandList);
        UpdatePlacementUpdateStatusForShardIntervalList(
                shardIntervalList,
                sourceNodeName,
                sourceNodePort,
                PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);

        /*
         * Iterate through the colocated shards and create DDL commands
         * to create the foreign constraints.
         */
        foreach_declared_ptr(shardInterval, shardIntervalList)
        {
            List *shardForeignConstraintCommandList = NIL;
            List *referenceTableForeignConstraintList = NIL;

            CopyShardForeignConstraintCommandListGrouped(shardInterval,
                    &shardForeignConstraintCommandList,
                    &referenceTableForeignConstraintList);

            ShardCommandList *shardCommandList = CreateShardCommandList(
                    shardInterval,
                    list_concat(shardForeignConstraintCommandList,
                            referenceTableForeignConstraintList));
            shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
                    shardCommandList);
        }

        /* Now execute the partitioning & foreign constraints creation commands. */
        ShardCommandList *shardCommandList = NULL;
        foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
        {
            char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
            SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                    tableOwner,
                    shardCommandList->ddlCommandList);
        }

        UpdatePlacementUpdateStatusForShardIntervalList(
                shardIntervalList,
                sourceNodeName,
                sourceNodePort,
                PLACEMENT_UPDATE_STATUS_COMPLETING);
    }

    /* Now execute the partitioning & foreign constraints creation commands. */
    ShardCommandList *shardCommandList = NULL;
    foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
    {
        char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
        SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
                tableOwner,
                shardCommandList->ddlCommandList);
    }

    UpdatePlacementUpdateStatusForShardIntervalList(
            shardIntervalList,
            sourceNodeName,
            sourceNodePort,
            PLACEMENT_UPDATE_STATUS_COMPLETING);

    MemoryContextReset(localContext);
    MemoryContextSwitchTo(oldContext);
}
@@ -1925,7 +1925,7 @@ RegisterCitusConfigVariables(void)
        "for scheduled background tasks that involve a particular node"),
        NULL,
        &MaxBackgroundTaskExecutorsPerNode,
        1, 1, 128,
        4, 1, 128,
        PGC_SIGHUP,
        GUC_STANDARD,
        NULL, NULL, NULL);
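Because the GUC is PGC_SIGHUP, the new default of 4 can still be overridden without a restart. A minimal sketch (the value 8 is purely illustrative):

    ALTER SYSTEM SET citus.max_background_task_executors_per_node = 8;
    SELECT pg_reload_conf();
    SHOW citus.max_background_task_executors_per_node;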
@@ -51,3 +51,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql"
#include "udfs/citus_copy_shard_placement/13.1-1.sql"
@@ -0,0 +1,16 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_one_shard_placement(
    shard_id bigint,
    source_node_id integer,
    target_node_id integer,
    flags integer,
    transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_copy_one_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.citus_copy_one_shard_placement(
    shard_id bigint,
    source_node_id integer,
    target_node_id integer,
    flags integer,
    transfer_mode citus.shard_transfer_mode)
IS 'copy a single shard from the source node to the destination node';
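A hedged usage sketch of the new UDF: the shard id and node ids below are placeholders, and the flag values come from the ShardTransferOperationMode enum introduced later in this diff (1 = single shard only, 2 = defer relationships, 4 = create relationships only). The rebalancer schedules these calls itself, so invoking them by hand is mainly useful for testing:

    -- copy one reference-table shard from node 1 to node 2, deferring
    -- foreign keys and partition attachment (flags = 1 | 2 = 3)
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 2, 3, 'block_writes');

    -- once all colocated shards have arrived, create the deferred
    -- relationships in one pass (flags = 4)
    SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 2, 4, 'block_writes');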
@@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false;

/* keeping track of parallel background tasks per node */
HTAB *ParallelTasksPerNode = NULL;
int MaxBackgroundTaskExecutorsPerNode = 1;
int MaxBackgroundTaskExecutorsPerNode = 4;

PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait);
@@ -41,6 +41,8 @@
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/shard_transfer.h"


/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode);

@@ -131,6 +133,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
 * DROP TABLE and create_reference_table calls so that the list of reference tables we
 * operate on are stable.
 *
 *
 * Since the changes to the reference table placements are made via loopback
 * connections we release the locks held at the end of this function. Due to Citus
 * only running transactions in READ COMMITTED mode we can be sure that other

@@ -179,7 +182,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
    if (list_length(newWorkersList) == 0)
    {
        /*
         * All workers alreaddy have a copy of the reference tables, make sure that
         * All workers already have a copy of the reference tables, make sure that
         * any locks obtained earlier are released. It will probably not matter, but
         * we release the locks in the reverse order we obtained them in.
         */

@@ -293,6 +296,295 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
    }
}

/*
 * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a
 * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is that instead of
 * copying the missing tables onto the worker nodes, this function creates a background task
 * for each required copy operation and schedules it in the background job.
 * Another difference is that instead of moving all the colocated shards sequentially,
 * this function creates a separate background task for each shard, even when the shards
 * are part of the same colocated shard group.
 *
 * To transfer the shards in parallel, the function creates a task for each shard
 * move and then schedules another task that creates the shard relationships (if any)
 * between shards; that task waits for the completion of all shard transfer tasks.
 *
 * The function returns an array of the task ids created for creating the shard
 * relationships; completion of these tasks signals the completion of the
 * reference table setup on the worker nodes. Any process that needs to wait for
 * the completion of the reference table setup can wait for these tasks to complete.
 *
 * The transferMode passed to this function is ignored for now; only
 * block-writes mode is used.
 */
int64 *
ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks)
{
    List *referenceTableIdList = NIL;
    uint64 shardId = INVALID_SHARD_ID;
    List *newWorkersList = NIL;
    int64 *dependsTaskArray = NULL;
    const char *referenceTableName = NULL;
    int colocationId = GetReferenceTableColocationId();

    *nDependTasks = 0;
    if (colocationId == INVALID_COLOCATION_ID)
    {
        /* we have no reference table yet. */
        return 0;
    }

    /*
     * Most of the time this function should result in a conclusion where we do not need
     * to copy any reference tables. To prevent excessive locking the majority of the time
     * we run our precondition checks first with a lower lock. If, after checking with the
     * lower lock, that we might need to copy reference tables we check with a more
     * aggressive and self conflicting lock. It is important to be self conflicting in the
     * second run to make sure that two concurrent calls to this routine will actually not
     * run concurrently after the initial check.
     *
     * If after two iterations of precondition checks we still find the need for copying
     * reference tables we exit the loop with all locks held. This will prevent concurrent
     * DROP TABLE and create_reference_table calls so that the list of reference tables we
     * operate on are stable.
     *
     * Since the changes to the reference table placements are made via loopback
     * connections we release the locks held at the end of this function. Due to Citus
     * only running transactions in READ COMMITTED mode we can be sure that other
     * transactions correctly find the metadata entries.
     */
    LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
    for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++)
    {
        LockColocationId(colocationId, lockmodes[lockmodeIndex]);

        referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
        if (referenceTableIdList == NIL)
        {
            /*
             * No reference tables exist, make sure that any locks obtained earlier are
             * released. It will probably not matter, but we release the locks in the
             * reverse order we obtained them in.
             */
            for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
                 releaseLockmodeIndex--)
            {
                UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
            }
            return 0;
        }

        Oid referenceTableId = linitial_oid(referenceTableIdList);
        referenceTableName = get_rel_name(referenceTableId);
        List *shardIntervalList = LoadShardIntervalList(referenceTableId);
        if (list_length(shardIntervalList) == 0)
        {
            /* check for corrupt metadata */
            ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
                    referenceTableName)));
        }

        ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
        shardId = shardInterval->shardId;

        /*
         * We only take an access share lock, otherwise we'll hold up citus_add_node.
         * In case of create_reference_table() where we don't want concurrent writes
         * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
         */
        newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
        if (list_length(newWorkersList) == 0)
        {
            /*
             * All workers already have a copy of the reference tables, make sure that
             * any locks obtained earlier are released. It will probably not matter, but
             * we release the locks in the reverse order we obtained them in.
             */
            for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
                 releaseLockmodeIndex--)
            {
                UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
            }
            return 0;
        }
    }

    /*
     * citus_copy_shard_placement triggers metadata sync-up, which tries to
     * acquire a ShareLock on pg_dist_node. We do master_copy_shard_placement
     * in a separate connection. If we have modified pg_dist_node in the
     * current backend, this will cause a deadlock.
     */
    if (TransactionModifiedNodeMetadata)
    {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                errmsg("cannot replicate reference tables in a transaction "
                       "that modified node metadata")));
    }

    /*
     * Modifications to reference tables in current transaction are not visible
     * to citus_copy_shard_placement, since it is done in a separate backend.
     */
    if (AnyRelationsModifiedInTransaction(referenceTableIdList))
    {
        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                errmsg("cannot replicate reference tables in a transaction "
                       "that modified a reference table")));
    }

    bool missingOk = false;
    ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
    if (sourceShardPlacement == NULL)
    {
        /* check for corrupt metadata */
        ereport(ERROR, (errmsg("reference table shard "
                               UINT64_FORMAT
                               " does not have an active shard placement",
                shardId)));
    }

    WorkerNode *newWorkerNode = NULL;
    BackgroundTask *task = NULL;
    StringInfoData buf = { 0 };
    initStringInfo(&buf);
    List *depTasksList = NIL;
    const char *transferModeString = "block_writes"; /* for now we only support block writes */

    foreach_declared_ptr(newWorkerNode, newWorkersList)
    {
        ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...",
                referenceTableName, newWorkerNode->workerName,
                newWorkerNode->workerPort)));

        Oid relationId = InvalidOid;
        List *nodeTasksList = NIL;

        foreach_declared_oid(relationId, referenceTableIdList)
        {
            referenceTableName = get_rel_name(relationId);
            List *shardIntervalList = LoadShardIntervalList(relationId);

            /* reference tables are supposed to have only one shard */
            if (list_length(shardIntervalList) != 1)
            {
                ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
                        referenceTableName)));
            }
            ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
            shardId = shardInterval->shardId;

            resetStringInfo(&buf);

            /*
             * In the first step just create and load data into the shards, but defer the
             * creation of the shard relationships to the next step.
             * The reason we want to defer the creation of the shard relationships is that
             * we want to make sure that all the parallel shard copy tasks are finished
             * before we create the relationships. Otherwise we might end up with
             * a situation where a dependent shard task is still running and trying to
             * create the shard relationships would result in an ERROR.
             */
            uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY |
                    SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS;

            appendStringInfo(&buf,
                    "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)",
                    shardId,
                    sourceShardPlacement->nodeId,
                    newWorkerNode->nodeId,
                    shardTransferFlags,
                    quote_literal_cstr(transferModeString));

            ereport(DEBUG2,
                    (errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s",
                            referenceTableName, newWorkerNode->workerName,
                            newWorkerNode->workerPort, buf.data)));

            int nDepends = 0;
            int64 *dependsArray = NULL;
            int32 nodesInvolved[2] = { 0 };
            nodesInvolved[0] = sourceShardPlacement->nodeId;
            nodesInvolved[1] = newWorkerNode->nodeId;

            task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
                    nDepends,
                    dependsArray, 2,
                    nodesInvolved);

            nodeTasksList = lappend(nodeTasksList, task);
        }

        /* create a task to create reference table relationships on this node */
        if (list_length(nodeTasksList) > 0)
        {
            int nDepends = list_length(nodeTasksList);
            int32 nodesInvolved[2] = { 0 };
            nodesInvolved[0] = sourceShardPlacement->nodeId;
            nodesInvolved[1] = newWorkerNode->nodeId;
            int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList));
            int idx = 0;
            foreach_declared_ptr(task, nodeTasksList)
            {
                dependsArray[idx++] = task->taskid;
            }
            resetStringInfo(&buf);
            uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;

            appendStringInfo(&buf,
                    "SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)",
                    shardId,
                    sourceShardPlacement->nodeId,
                    newWorkerNode->nodeId,
                    shardTransferFlags,
                    quote_literal_cstr(transferModeString));

            ereport(DEBUG2,
                    (errmsg("creating relations for reference table '%s' on %s:%d ... QUERY= %s",
                            referenceTableName, newWorkerNode->workerName,
                            newWorkerNode->workerPort, buf.data)));

            task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
                    nDepends,
                    dependsArray, 2,
                    nodesInvolved);

            depTasksList = lappend(depTasksList, task);

            pfree(dependsArray);
            list_free(nodeTasksList);
            nodeTasksList = NIL;
        }
    }

    /*
     * Compute a dependent-task array that indicates the completion of all
     * reference table shard copies, so that the distributed shard copies can start.
     */
    if (list_length(depTasksList) > 0)
    {
        *nDependTasks = list_length(depTasksList);
        dependsTaskArray = palloc(sizeof(int64) * *nDependTasks);
        int idx = 0;
        foreach_declared_ptr(task, depTasksList)
        {
            dependsTaskArray[idx++] = task->taskid;
        }
        list_free(depTasksList);
    }

    /*
     * Since reference tables have been copied via a loopback connection we do not have to
     * retain our locks. Since Citus only runs transactions in READ COMMITTED mode we can
     * be sure that other transactions will find the reference tables copied.
     * We have obtained and held multiple locks, here we unlock them all in the reverse
     * order we have obtained them in.
     */
    for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0;
         releaseLockmodeIndex--)
    {
        UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
    }
    return dependsTaskArray;
}

/*
 * HasNodesWithMissingReferenceTables checks if all reference tables are already copied to
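The scheduling above can be observed through the existing background-job catalogs; a sketch, assuming the standard pg_dist_background_task and pg_dist_background_task_depend tables and a psql variable :job_id holding the rebalance job id:

    -- per-shard copy tasks plus the final "create relationships" task per node
    SELECT task_id, status, left(command, 60) AS command
    FROM pg_dist_background_task
    WHERE job_id = :job_id
    ORDER BY task_id;

    -- dependency edges: shard-move tasks depend on the tasks that complete
    -- the reference-table setup
    SELECT task_id, depends_on
    FROM pg_dist_background_task_depend
    WHERE job_id = :job_id
    ORDER BY task_id, depends_on;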
@@ -21,6 +21,7 @@

extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern int64 *ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks);
extern bool HasNodesWithMissingReferenceTables(List **referenceTableList);
extern uint32 CreateReferenceTableColocationId(void);
extern uint32 GetReferenceTableColocationId(void);
@@ -57,7 +57,8 @@ typedef enum CitusOperations
    CITUS_NONBLOCKING_SPLIT = 1,
    CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2,
    CITUS_CREATE_COLOCATION_DEFAULT = 3,
    CITUS_BACKGROUND_TASK_MONITOR = 4
    CITUS_BACKGROUND_TASK_MONITOR = 4,
    CITUS_REBALANCE_OPERATION = 5
} CitusOperations;

/* reuse advisory lock, but with different, unused field 4 (4)*/
@@ -23,10 +23,51 @@ typedef enum
    SHARD_TRANSFER_COPY = 2
} ShardTransferType;

/*
 * ShardTransferOperationMode is used to pass flags to the shard transfer
 * function. The flags are used to control the behavior of the transfer
 * function.
 */
typedef enum
{
    /*
     * This flag instructs the transfer function to only transfer a single shard
     * rather than all the colocated shards for the shard interval.
     * Using this flag means we might break the colocated shard
     * relationships on the target node, so it is only useful when setting up
     * a new node and we are sure that the node will not be used until we have
     * transferred all the shards.
     * The reason we need this flag is that we want to be able to transfer
     * colocated shards in parallel; for now it is only used for the reference
     * table shards.
     * Finally, if you are using this flag, you should also consider deferring
     * the creation of the relationships on the target node until all colocated
     * shards are transferred (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS).
     */
    SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0,

    /* With this flag the shard transfer function does not create any constraints
     * or foreign relations defined on the shard. This can be used to defer the
     * creation of the relationships until all the shards are transferred.
     * This is useful when we are transferring colocated shards in parallel and
     * we want to avoid creating the relationships on the target node
     * until all the shards are transferred.
     */
    SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS = 1 << 1,

    /* This flag is used to indicate that the shard transfer function should
     * only create the relationships on the target node and not transfer any data.
     * This can be used to create the relationships that were deferred
     * while the shards were being transferred.
     */
    SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2
} ShardTransferOperationMode;


extern void TransferShards(int64 shardId,
        char *sourceNodeName, int32 sourceNodePort,
        char *targetNodeName, int32 targetNodePort,
        char shardReplicationMode, ShardTransferType transferType);
        char shardReplicationMode, ShardTransferType transferType,
        uint32 optionFlags);
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
        char *workerNodeName, uint32 workerNodePort);
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);