pull/7983/merge
Muhammad Usama 2025-05-28 17:04:44 +03:00 committed by GitHub
commit 0b86830ab5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 662 additions and 226 deletions

View File

@ -220,7 +220,6 @@ typedef struct ShardMoveSourceNodeHashEntry
*/
typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
HTAB *nodeDependencies;
} ShardMoveDependencies;
@ -274,6 +273,7 @@ static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid
static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
shardReplicationModeOid, char *noticeOperation);
static void AcquireRebalanceOperationLock(const char *operationName);
static float4 CalculateUtilization(float4 totalCost, float4 capacity);
static Form_pg_dist_rebalance_strategy GetRebalanceStrategy(Name name);
static void EnsureShardCostUDF(Oid functionOid);
@ -297,8 +297,9 @@ static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
colocationId,
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move,
int64 *refTablesDepTaskIds,
int refTablesDepTaskIdsCount,
ShardMoveDependencies shardMoveDependencies,
int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
@ -768,6 +769,33 @@ AcquireRebalanceColocationLock(Oid relationId, const char *operationName)
}
/*
* AcquireRebalanceOperationLock prevents concurrent rebalance operations by
* acquiring the rebalance operation advisory lock; it errors out if the lock
* is already held.
*/
static void
AcquireRebalanceOperationLock(const char *operationName)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_REBALANCE_OPERATION);
LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock,
dontWait);
if (!lockAcquired)
{
ereport(ERROR, (errmsg("could not acquire the lock required for %s operation",
operationName),
errdetail("A concurrent shard move or shard "
"copy is in progress."),
errhint("Make sure that the concurrent operation has "
"finished and re-run the command.")));
}
}
/*
* AcquirePlacementColocationLock tries to acquire a lock for
* rebalance/replication while moving/copying the placement. If this
@ -1954,6 +1982,8 @@ ErrorOnConcurrentRebalance(RebalanceOptions *options)
AcquireRebalanceColocationLock(relationId, options->operationName);
}
AcquireRebalanceOperationLock(options->operationName);
int64 jobId = 0;
if (HasNonTerminalJobOfType("rebalance", &jobId))
{
@ -1991,10 +2021,6 @@ static ShardMoveDependencies
InitializeShardMoveDependencies()
{
ShardMoveDependencies shardMoveDependencies;
shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
ShardMoveSourceNodeHashEntry,
"nodeDependencyHashMap",
@ -2008,7 +2034,8 @@ InitializeShardMoveDependencies()
* the move must take a dependency on, given the shard move dependencies as input.
*/
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 *refTablesDepTaskIds,
int refTablesDepTaskIdsCount,
ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
@ -2016,15 +2043,6 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
bool found;
/* Check if there exists a move in the same colocation group scheduled earlier. */
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
/*
* Check if there exists moves scheduled earlier whose source node
* overlaps with the current move's target node.
@ -2045,6 +2063,19 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
}
}
/*
* A shard copy can only start after the reference table shards have been
* copied, so each shard move task takes a dependency on the tasks that
* signal completion of the reference table copy.
*/
while (refTablesDepTaskIdsCount > 0)
{
int64 refTableTaskId = *refTablesDepTaskIds;
hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL);
refTablesDepTaskIds++;
refTablesDepTaskIdsCount--;
}
*nDepends = hash_get_num_entries(dependsList);
int64 *dependsArray = NULL;
@ -2076,10 +2107,6 @@ static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
ShardMoveDependencies shardMoveDependencies)
{
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
bool found;
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
@ -2174,30 +2201,19 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
initStringInfo(&buf);
List *referenceTableIdList = NIL;
int64 replicateRefTablesTaskId = 0;
int64 *refTablesDepTaskIds = NULL;
int refTablesDepTaskIdsCount = 0;
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
if (shardTransferMode == TRANSFER_MODE_AUTOMATIC)
{
VerifyTablesHaveReplicaIdentity(referenceTableIdList);
}
/*
* Reference tables need to be copied to (newly-added) nodes, this needs to be the
* first task before we can move any other table.
*/
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
int32 nodesInvolved[] = { 0 };
/* replicate_reference_tables permissions require superuser */
Oid superUserId = CitusExtensionOwner();
BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0,
NULL, 0, nodesInvolved);
replicateRefTablesTaskId = task->taskid;
refTablesDepTaskIds = ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(
jobId, TRANSFER_MODE_BLOCK_WRITES, &refTablesDepTaskIdsCount);
ereport(DEBUG2,
(errmsg("scheduled %d dependent reference table copy tasks for job %ld",
refTablesDepTaskIdsCount, jobId),
errdetail("Rebalance scheduled as background job"),
errhint("To monitor progress, run: SELECT * FROM "
"citus_rebalance_status();")));
}
PlacementUpdateEvent *move = NULL;
@ -2219,17 +2235,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
int nDepends = 0;
int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
int64 *dependsArray = GenerateTaskMoveDependencyList(move, refTablesDepTaskIds,
refTablesDepTaskIdsCount,
shardMoveDependencies,
&nDepends);
if (nDepends == 0 && replicateRefTablesTaskId > 0)
{
nDepends = 1;
dependsArray = palloc(nDepends * sizeof(int64));
dependsArray[0] = replicateRefTablesTaskId;
}
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = move->sourceNode->nodeId;
nodesInvolved[1] = move->targetNode->nodeId;

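With this change the rebalancer schedules one background task per reference table shard per missing node, plus a relationships-only task per node, and every subsequent shard move task depends on those relationship tasks. A minimal SQL sketch for inspecting the resulting task graph after starting a rebalance, assuming the background-job catalogs pg_dist_background_task and pg_dist_background_task_depend are available (as in recent Citus releases):

-- Overall progress, as suggested by the hint above.
SELECT * FROM citus_rebalance_status();

-- Per-task view: each scheduled task and the task ids it waits on
-- (reference table copy tasks have no dependencies of their own).
SELECT t.task_id, t.status, left(t.command, 60) AS command,
       array_remove(array_agg(d.depends_on), NULL) AS depends_on
FROM pg_dist_background_task t
LEFT JOIN pg_dist_background_task_depend d USING (job_id, task_id)
GROUP BY t.task_id, t.status, t.command
ORDER BY t.task_id;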
View File

@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, bool useLogicalReplication,
const char *operationName);
const char *operationName, uint32 optionFlags);
static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort);
int32 targetNodePort,
uint32 optionFlags);
static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort);
char *targetNodeName, int32 targetNodePort,
uint32 optionFlags);
static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
int32 sourceNodePort, const char *targetNodeName,
int32 targetNodePort);
@ -174,6 +176,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_copy_one_shard_placement);
double DesiredPercentFreeAfterMove = 10;
bool CheckAvailableSpaceBeforeMove = true;
@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
targetNode->workerName, targetNode->workerPort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
@ -267,13 +270,42 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
/*
* citus_copy_one_shard_placement copies a single shard placement from the
* source node to the target node, honoring the given option flags.
*/
Datum
citus_copy_one_shard_placement(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
uint32 sourceNodeId = PG_GETARG_INT32(1);
uint32 targetNodeId = PG_GETARG_INT32(2);
uint32 flags = PG_GETARG_INT32(3);
Oid shardReplicationModeOid = PG_GETARG_OID(4);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
targetNode->workerName, targetNode->workerPort,
shardReplicationMode, SHARD_TRANSFER_COPY, flags);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement moves given shard (and its co-located shards) from one
* node to the other node. To accomplish this it entirely recreates the table structure
@ -315,7 +347,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_MOVE);
shardReplicationMode, SHARD_TRANSFER_MOVE, 0);
PG_RETURN_VOID();
}
@ -343,7 +375,7 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
TransferShards(shardId, sourceNode->workerName,
sourceNode->workerPort, targetNode->workerName,
targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE);
targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0);
PG_RETURN_VOID();
}
@ -356,7 +388,7 @@ void
TransferShards(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, char shardReplicationMode,
ShardTransferType transferType)
ShardTransferType transferType, uint32 optionFlags)
{
/* strings to be used in log messages */
const char *operationName = ShardTransferTypeNames[transferType];
@ -385,20 +417,34 @@ TransferShards(int64 shardId, char *sourceNodeName,
ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType);
AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName);
AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName);
List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
List *colocatedTableList;
List *colocatedShardList;
/*
* If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer the shard
* specified by shardId. Otherwise, we transfer all colocated shards.
*/
if (optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY)
{
colocatedTableList = list_make1_oid(distributedTableId);
colocatedShardList = list_make1(shardInterval);
}
else
{
colocatedTableList = ColocatedTableList(distributedTableId);
colocatedShardList = ColocatedShardIntervalList(shardInterval);
}
EnsureTableListOwner(colocatedTableList);
if (transferType == SHARD_TRANSFER_MOVE)
{
/*
* Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
* block concurrent citus_move_shard_placement() on any shard of
* the same relation. This is OK for now since we're executing shard
* moves sequentially anyway.
* Block concurrent DDL / TRUNCATE commands on the relation, while
* allowing concurrent citus_move_shard_placement() on the shards of
* the same relation.
*/
LockColocatedRelationsForMove(colocatedTableList);
}
@ -412,14 +458,52 @@ TransferShards(int64 shardId, char *sourceNodeName,
/*
* We sort shardIntervalList so that lock operations will not cause any
* deadlocks.
* deadlocks. But we do not need to do that if the list contains only one
* shard.
*/
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
if (!(optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY))
{
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
}
if (TransferAlreadyCompleted(colocatedShardList,
sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
transferType))
bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
sourceNodeName,
sourceNodePort,
targetNodeName,
targetNodePort,
transferType);
/*
* If we only need to create the shard relationships, all we have to do is
* call CopyShardTables with the SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY
* flag.
*/
if (optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY)
{
if (!transferAlreadyCompleted)
{
/*
* if the transfer is not completed, and we are here just to create
* the relationships, we can return right away
*/
ereport(WARNING, (errmsg("shard is not present on node %s:%d",
targetNodeName, targetNodePort),
errdetail("%s may not have completed.",
operationNameCapitalized)));
return;
}
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort, (shardReplicationMode ==
TRANSFER_MODE_FORCE_LOGICAL),
operationFunctionName, optionFlags);
/* We don't need to do anything else, just return */
return;
}
if (transferAlreadyCompleted)
{
/* if the transfer is already completed, we can return right away */
ereport(WARNING, (errmsg("shard is already present on node %s:%d",
@ -515,7 +599,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
}
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort, useLogicalReplication, operationFunctionName);
targetNodePort, useLogicalReplication, operationFunctionName,
optionFlags);
if (transferType == SHARD_TRANSFER_MOVE)
{
@ -676,7 +761,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
/*
* LockColocatedRelationsForMove takes a list of relations, locks all of them
* using ShareUpdateExclusiveLock
* using ShareLock
*/
static void
LockColocatedRelationsForMove(List *colocatedTableList)
@ -684,7 +769,7 @@ LockColocatedRelationsForMove(List *colocatedTableList)
Oid colocatedTableId = InvalidOid;
foreach_declared_oid(colocatedTableId, colocatedTableList)
{
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
LockRelationOid(colocatedTableId, ShareLock);
}
}
@ -1131,9 +1216,9 @@ BlockWritesToShardList(List *shardList)
* asynchronous shard copy in case of cascading DML operations.
*/
LockReferencedReferenceShardDistributionMetadata(shard->shardId,
ExclusiveLock);
RowExclusiveLock);
LockShardDistributionMetadata(shard->shardId, ExclusiveLock);
LockShardDistributionMetadata(shard->shardId, RowExclusiveLock);
}
/* following code relies on the list to have at least one shard */
@ -1156,7 +1241,7 @@ BlockWritesToShardList(List *shardList)
* Even if users disable metadata sync, we cannot allow them not to
* acquire the remote locks. Hence, we have !IsCoordinator() check.
*/
LockShardListMetadataOnWorkers(ExclusiveLock, shardList);
LockShardListMetadataOnWorkers(RowExclusiveLock, shardList);
}
}
@ -1333,7 +1418,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList)
static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort, bool useLogicalReplication,
const char *operationName)
const char *operationName, uint32 optionFlags)
{
if (list_length(shardIntervalList) < 1)
{
@ -1347,12 +1432,12 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
{
CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName,
targetNodePort);
targetNodePort, optionFlags);
}
else
{
CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
targetNodeName, targetNodePort, optionFlags);
}
/*
@ -1369,7 +1454,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
static void
CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort)
int32 targetNodePort, uint32 optionFlags)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication",
@ -1437,7 +1522,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList)
static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort)
int32 targetNodePort, uint32 optionFlags)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaBlockWrites",
@ -1446,127 +1531,145 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_declared_ptr(shardInterval, shardIntervalList)
{
/*
* For each shard we first create the shard table in a separate
* transaction and then we copy the data and create the indexes in a
* second separate transaction. The reason we don't do both in a single
* transaction is so we can see the size of the new shard growing
* during the copy when we run get_rebalance_progress in another
* session. If we wouldn't split these two phases up, then the table
* wouldn't be visible in the session that get_rebalance_progress uses.
* So get_rebalance_progress would always report its size as 0.
*/
List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
ConflictWithIsolationTestingBeforeCopy();
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
ConflictWithIsolationTestingAfterCopy();
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
MemoryContextReset(localContext);
}
/*
* Once all shards are copied, we can recreate relationships between shards.
* Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
* If we're only asked to create the relationships, the shards are already
* present and populated on the node. Skip the table setup and data loading
* steps and proceed straight to creating the relationships.
*/
List *shardIntervalWithDDCommandsList = NIL;
foreach_declared_ptr(shardInterval, shardIntervalList)
if (!(optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY))
{
if (PartitionTable(shardInterval->relationId))
/* iterate through the colocated shards and copy each */
foreach_declared_ptr(shardInterval, shardIntervalList)
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(shardInterval);
/*
* For each shard we first create the shard table in a separate
* transaction and then we copy the data and create the indexes in a
* second separate transaction. The reason we don't do both in a single
* transaction is so we can see the size of the new shard growing
* during the copy when we run get_rebalance_progress in another
* session. If we wouldn't split these two phases up, then the table
* wouldn't be visible in the session that get_rebalance_progress uses.
* So get_rebalance_progress would always report its size as 0.
*/
List *ddlCommandList = RecreateShardDDLCommandList(shardInterval,
sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_make1(attachPartitionCommand));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
ConflictWithIsolationTestingBeforeCopy();
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
ConflictWithIsolationTestingAfterCopy();
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
MemoryContextReset(localContext);
}
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
/*
* Iterate through the colocated shards and create DDL commamnds
* to create the foreign constraints.
* Skip creating shard relationships if the caller has requested that they
* not be created.
*/
foreach_declared_ptr(shardInterval, shardIntervalList)
if (!(optionFlags & SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS))
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
/*
* Once all shards are copied, we can recreate relationships between shards.
* Create DDL commands to attach child tables to their parents in a partitioning hierarchy.
*/
List *shardIntervalWithDDCommandsList = NIL;
foreach_declared_ptr(shardInterval, shardIntervalList)
{
if (PartitionTable(shardInterval->relationId))
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(shardInterval);
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_make1(attachPartitionCommand));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
}
}
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
/*
* Iterate through the colocated shards and create DDL commands
* to create the foreign constraints.
*/
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
}
/* Now execute the partitioning & foreign constraint creation commands. */
ShardCommandList *shardCommandList = NULL;
foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
{
char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
shardCommandList->ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COMPLETING);
}
/* Now execute the Partitioning & Foreign constraints creation commads. */
ShardCommandList *shardCommandList = NULL;
foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
{
char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
shardCommandList->ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COMPLETING);
MemoryContextReset(localContext);
MemoryContextSwitchTo(oldContext);
}
@ -1647,7 +1750,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
NULL /* jobIdList (ignored by API implementation) */
);
}

View File

@ -132,7 +132,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void AcquireLogicalReplicationLock(void);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);
@ -156,7 +155,6 @@ void
LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
char *targetNodeName, int targetNodePort)
{
AcquireLogicalReplicationLock();
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
@ -268,6 +266,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
*/
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
CloseConnection(sourceConnection);
}
@ -497,25 +496,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
}
/*
* AcquireLogicalReplicationLock tries to acquire a lock for logical
* replication. We need this lock, because at the start of logical replication
* we clean up old subscriptions and publications. Because of this cleanup it's
* not safe to run multiple logical replication based shard moves at the same
* time. If multiple logical replication moves would run at the same time, the
* second move might clean up subscriptions and publications that are in use by
* another move.
*/
static void
AcquireLogicalReplicationLock(void)
{
LOCKTAG tag;
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
* PrepareReplicationSubscriptionList returns list of shards to be logically
* replicated from given shard list. This is needed because Postgres does not

View File

@ -1925,7 +1925,7 @@ RegisterCitusConfigVariables(void)
"for scheduled background tasks that involve a particular node"),
NULL,
&MaxBackgroundTaskExecutorsPerNode,
1, 1, 128,
4, 1, 128,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);

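The default for MaxBackgroundTaskExecutorsPerNode is raised from 1 to 4 here (and in its declaration below) so that the per-shard reference table copy tasks can actually run in parallel on a node. A hedged sketch for tuning it at runtime, assuming the setting is exposed as citus.max_background_task_executors_per_node:

-- Assumed GUC name; adjust if it differs in your build.
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 8;
SELECT pg_reload_conf();  -- PGC_SIGHUP: a reload is enough, no restart needed
SHOW citus.max_background_task_executors_per_node;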
View File

@ -51,3 +51,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_stat_counters/13.1-1.sql"
#include "udfs/citus_stat_counters_reset/13.1-1.sql"
#include "udfs/citus_nodes/13.1-1.sql"
#include "udfs/citus_copy_shard_placement/13.1-1.sql"

View File

@ -0,0 +1,16 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_copy_one_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
flags integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_copy_one_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.citus_copy_one_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
flags integer,
transfer_mode citus.shard_transfer_mode)
IS 'copy a single shard from the source node to the destination node';

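A hedged usage sketch of the new UDF, mirroring how ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes drives it; the shard and node ids below are hypothetical, and the flag values come from the ShardTransferOperationMode enum (1 = single shard only, 2 = skip relationships, 4 = relationships only):

-- Phase 1: copy one reference table shard without its colocated shards and
-- without foreign keys / partition attachments (flags 1 | 2 = 3).
SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 3, 3, 'block_writes');

-- Phase 2: once all per-shard copy tasks for the node have finished, create
-- the deferred relationships (flags = 4).
SELECT pg_catalog.citus_copy_one_shard_placement(102008, 1, 3, 4, 'block_writes');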
View File

@ -125,7 +125,7 @@ static volatile sig_atomic_t GotSighup = false;
/* keeping track of parallel background tasks per node */
HTAB *ParallelTasksPerNode = NULL;
int MaxBackgroundTaskExecutorsPerNode = 1;
int MaxBackgroundTaskExecutorsPerNode = 4;
PG_FUNCTION_INFO_V1(citus_job_cancel);
PG_FUNCTION_INFO_V1(citus_job_wait);

View File

@ -41,6 +41,8 @@
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "distributed/shard_transfer.h"
/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode);
@ -131,6 +133,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
* DROP TABLE and create_reference_table calls so that the list of reference tables we
* operate on are stable.
*
*
* Since the changes to the reference table placements are made via loopback
* connections we release the locks held at the end of this function. Due to Citus
* only running transactions in READ COMMITTED mode we can be sure that other
@ -179,7 +182,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
if (list_length(newWorkersList) == 0)
{
/*
* All workers alreaddy have a copy of the reference tables, make sure that
* All workers already have a copy of the reference tables, make sure that
* any locks obtained earlier are released. It will probably not matter, but
* we release the locks in the reverse order we obtained them in.
*/
@ -293,6 +296,295 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
}
}
/*
* ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a
* twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is that instead
* of copying the missing tables onto the worker nodes, this function creates a background
* task for each required copy operation and schedules it in the background job.
* Another difference is that instead of moving all the colocated shards sequentially,
* this function creates a separate background task for each shard, even when the shards
* are part of the same colocated shard group.
*
* To transfer the shards in parallel, the function creates a task for each shard
* copy and then schedules another task that creates the shard relationships (if any)
* between shards; that task waits for the completion of all shard transfer tasks.
*
* The function returns an array of the task ids created for creating the shard
* relationships; completion of these tasks signals the completion of the
* reference table setup on the worker nodes. Any process that needs to wait for
* the reference table setup can wait for these tasks to complete.
*
* The transferMode passed to this function is ignored for now; only the
* block-writes mode is used.
*/
int64 *
ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks)
{
List *referenceTableIdList = NIL;
uint64 shardId = INVALID_SHARD_ID;
List *newWorkersList = NIL;
int64 *dependsTaskArray = NULL;
const char *referenceTableName = NULL;
int colocationId = GetReferenceTableColocationId();
*nDependTasks = 0;
if (colocationId == INVALID_COLOCATION_ID)
{
/* we have no reference table yet. */
return NULL;
}
/*
* Most of the time this function should result in a conclusion where we do not need
* to copy any reference tables. To prevent excessive locking the majority of the time
* we run our precondition checks first with a lower lock. If, after checking with the
* lower lock, that we might need to copy reference tables we check with a more
* aggressive and self conflicting lock. It is important to be self conflicting in the
* second run to make sure that two concurrent calls to this routine will actually not
* run concurrently after the initial check.
*
* If after two iterations of precondition checks we still find the need for copying
* reference tables we exit the loop with all locks held. This will prevent concurrent
* DROP TABLE and create_reference_table calls so that the list of reference tables we
* operate on are stable.
*
*
* Since the changes to the reference table placements are made via loopback
* connections we release the locks held at the end of this function. Due to Citus
* only running transactions in READ COMMITTED mode we can be sure that other
* transactions correctly find the metadata entries.
*/
LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++)
{
LockColocationId(colocationId, lockmodes[lockmodeIndex]);
referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableIdList == NIL)
{
/*
* No reference tables exist, make sure that any locks obtained earlier are
* released. It will probably not matter, but we release the locks in the
* reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return NULL;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
referenceTableName = get_rel_name(referenceTableId);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
/*
* We only take an access share lock, otherwise we'll hold up citus_add_node.
* In case of create_reference_table() where we don't want concurrent writes
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
*/
newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
if (list_length(newWorkersList) == 0)
{
/*
* All workers already have a copy of the reference tables, make sure that
* any locks obtained earlier are released. It will probably not matter, but
* we release the locks in the reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return NULL;
}
}
/*
* citus_copy_shard_placement triggers metadata sync-up, which tries to
* acquire a ShareLock on pg_dist_node. We do master_copy_shard_placement
* in a separate connection. If we have modified pg_dist_node in the
* current backend, this will cause a deadlock.
*/
if (TransactionModifiedNodeMetadata)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified node metadata")));
}
/*
* Modifications to reference tables in current transaction are not visible
* to citus_copy_shard_placement, since it is done in a separate backend.
*/
if (AnyRelationsModifiedInTransaction(referenceTableIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified a reference table")));
}
bool missingOk = false;
ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
if (sourceShardPlacement == NULL)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table shard "
UINT64_FORMAT
" does not have an active shard placement",
shardId)));
}
WorkerNode *newWorkerNode = NULL;
BackgroundTask *task = NULL;
StringInfoData buf = { 0 };
initStringInfo(&buf);
List *depTasksList = NIL;
const char *transferModeString = "block_writes"; /* for now we only support block writes */
foreach_declared_ptr(newWorkerNode, newWorkersList)
{
ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort)));
Oid relationId = InvalidOid;
List *nodeTasksList = NIL;
foreach_declared_oid(relationId, referenceTableIdList)
{
referenceTableName = get_rel_name(relationId);
List *shardIntervalList = LoadShardIntervalList(relationId);
/* Reference tables are supposed to have only one shard */
if (list_length(shardIntervalList) != 1)
{
ereport(ERROR, (errmsg("reference table \"%s\" does not have exactly one shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
resetStringInfo(&buf);
/*
* In the first step, just create the shards and load their data, but defer the
* creation of the shard relationships to the next step.
* We defer creating the shard relationships to make sure that all the parallel
* shard copy tasks have finished first. Otherwise we might end up in a
* situation where a dependent shard copy task is still running and creating
* the shard relationships would result in an ERROR.
*/
uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY |
SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS;
appendStringInfo(&buf,
"SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
shardTransferFlags,
quote_literal_cstr(transferModeString));
ereport(DEBUG2,
(errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort, buf.data)));
int nDepends = 0;
int64 *dependsArray = NULL;
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = sourceShardPlacement->nodeId;
nodesInvolved[1] = newWorkerNode->nodeId;
task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
nDepends,
dependsArray, 2,
nodesInvolved);
nodeTasksList = lappend(nodeTasksList, task);
}
/* Create a task to create reference table relations on this node */
if (list_length(nodeTasksList) > 0)
{
int nDepends = list_length(nodeTasksList);
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = sourceShardPlacement->nodeId;
nodesInvolved[1] = newWorkerNode->nodeId;
int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList));
int idx = 0;
foreach_declared_ptr(task, nodeTasksList)
{
dependsArray[idx++] = task->taskid;
}
resetStringInfo(&buf);
uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;
appendStringInfo(&buf,
"SELECT pg_catalog.citus_copy_one_shard_placement(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
shardTransferFlags,
quote_literal_cstr(transferModeString));
ereport(DEBUG2,
(errmsg("creating relations for reference table '%s' on %s:%d ... QUERY= %s",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort, buf.data)));
task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
nDepends,
dependsArray, 2,
nodesInvolved);
depTasksList = lappend(depTasksList, task);
pfree(dependsArray);
list_free(nodeTasksList);
nodeTasksList = NIL;
}
}
/*
* Build an array of the relationship-creation task ids; their completion
* indicates that all reference table shard copies are done, so the
* distributed shard copies can be made dependent on them.
*/
if (list_length(depTasksList) > 0)
{
*nDependTasks = list_length(depTasksList);
dependsTaskArray = palloc(sizeof(int64) * *nDependTasks);
int idx = 0;
foreach_declared_ptr(task, depTasksList)
{
dependsTaskArray[idx++] = task->taskid;
}
list_free(depTasksList);
}
/*
* Since reference tables have been copied via a loopback connection we do not have to
* retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure
* that other transactions will find the reference tables copied.
* We have obtained and held multiple locks, here we unlock them all in the reverse
* order we have obtained them in.
*/
for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return dependsTaskArray;
}
/*
* HasNodesWithMissingReferenceTables checks if all reference tables are already copied to

View File

@ -21,6 +21,7 @@
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern int64 *ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode, int *nDependTasks);
extern bool HasNodesWithMissingReferenceTables(List **referenceTableList);
extern uint32 CreateReferenceTableColocationId(void);
extern uint32 GetReferenceTableColocationId(void);

View File

@ -44,7 +44,7 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
@ -57,7 +57,8 @@ typedef enum CitusOperations
CITUS_NONBLOCKING_SPLIT = 1,
CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2,
CITUS_CREATE_COLOCATION_DEFAULT = 3,
CITUS_BACKGROUND_TASK_MONITOR = 4
CITUS_BACKGROUND_TASK_MONITOR = 4,
CITUS_REBALANCE_OPERATION = 5
} CitusOperations;
/* reuse advisory lock, but with different, unused field 4 (4)*/
@ -124,16 +125,6 @@ typedef enum CitusOperations
(uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
/* reuse advisory lock, but with different, unused field 4 (12)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) 0, \
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
/* reuse advisory lock, but with different, unused field 4 (14)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */

View File

@ -23,10 +23,51 @@ typedef enum
SHARD_TRANSFER_COPY = 2
} ShardTransferType;
/*
* ShardTransferOperationMode is used to pass flags to the shard transfer
* function; the flags control the behavior of the transfer.
*/
typedef enum
{
/*
* This flag instructs the transfer function to transfer only the given shard
* rather than all the colocated shards for the shard interval.
* Using this flag means we might break the colocated shard
* relationship on the source node, so it is only useful when setting up
* a new node and we are sure that the node will not be used until we have
* transferred all the shards.
* The reason we need this flag is that we want to be able to transfer
* colocated shards in parallel; for now it is only used for the reference
* table shards.
* Finally, if you are using this flag, you should also consider deferring
* the creation of the relationships on the source node until all colocated
* shards are transferred (see: SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS).
*/
SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0,
/* With this flag the shard transfer function does not create any constraints
* or foreign relations defined on the shard. This can be used to defer the
* creation of the relationships until all the shards are transferred.
* This is useful when we are transferring colocated shards in parallel and
* we want to avoid creating the relationships on the source node
* until all the shards are transferred.
*/
SHARD_TRANSFER_NO_CREATE_RELATIONSHIPS = 1 << 1,
/* This flag indicates that the shard transfer function should
* only create the relationships on the target node and not transfer any data.
* This can be used to create the relationships that were deferred
* while the shards were being transferred.
*/
SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2
} ShardTransferOperationMode;
extern void TransferShards(int64 shardId,
char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort,
char shardReplicationMode, ShardTransferType transferType);
char shardReplicationMode, ShardTransferType transferType,
uint32 optionFlags);
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
char *workerNodeName, uint32 workerNodePort);
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);