Parallelize Shard Rebalancing & Unlock Concurrent Logical Shard Moves (#7983)

DESCRIPTION: Parallelizes shard rebalancing and removes the bottlenecks
that previously blocked concurrent logical-replication moves.
These improvements shorten rebalance windows, particularly for clusters
with large reference tables, and enable multiple shard transfers to run
in parallel.

Motivation:
Citus’ shard rebalancer has some key performance bottlenecks:

**Sequential Movement of Reference Tables:**
Reference tables are often assumed to be small, but in real-world
deployments they can grow significantly. Previously, reference
table shards were transferred as a single unit, making the process
monolithic and time-consuming.

**No Parallelism Within a Colocation Group:**
Although Citus distributes data using colocated shards, shard
movements within the same colocation group were serialized. In
environments with hundreds of distributed tables colocated
together, this serialization significantly slowed down rebalance
operations.

**Excessive Locking:**
The rebalancer used restrictive locks and redundant logical-replication
guards, further limiting concurrency.

The goal of this commit is to eliminate these inefficiencies and enable
maximum parallelism during rebalance, without compromising correctness
or compatibility.

Feature Summary:

**1. Parallel Reference Table Rebalancing**
Each reference-table shard is now copied in its own background task.
Foreign keys and other constraints are deferred until all shards are
copied.
To move a single shard without considering colocation, a new
internal-only UDF `citus_internal_copy_single_shard_placement` is
introduced to support single-shard copy/move operations.
Since this function is internal, users are not allowed to call it
directly.
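
For reference, the 13.2-1 migration script in this PR defines the UDF as
follows (reproduced from the diff further below; the `flags` argument carries
the transfer option bits such as SHARD_TRANSFER_SINGLE_SHARD_ONLY and
SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS):

```
CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement(
    shard_id bigint,
    source_node_id integer,
    target_node_id integer,
    flags integer,
    transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$;
```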

**Temporary Hack to Set Background Task Context**
Background tasks cannot currently set custom GUCs like application_name
before executing internal-only functions. As a workaround, a
`SET LOCAL application_name TO 'citus_rebalancer ...'` statement is
prepended to the task command. This is a temporary hack to label
internal tasks until proper GUC injection support is added to the
background task executor.
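
Concretely, a scheduled reference-table copy task executes a command of
roughly the following shape. This is a sketch with hypothetical shard and
node IDs and flag value; the real command substitutes the rebalancer
application_name prefix plus the scheduler's global PID, and a flags value
built from SHARD_TRANSFER_SINGLE_SHARD_ONLY | SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS:

```
-- hypothetical IDs and flag value, for illustration only
SET LOCAL application_name TO 'citus_rebalancer ...';  -- prefix followed by the global PID
SELECT citus_internal.citus_internal_copy_single_shard_placement(
         102008,         -- shard_id of one reference-table shard
         1,              -- source_node_id
         3,              -- target_node_id
         3,              -- flags (single-shard-only | skip-create-relationships)
         'block_writes');
```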

**2. Changes in Locking Strategy**

- Dropped the leftover replication lock that previously serialized shard
moves performed via logical replication. This lock was only needed when
we used to drop and recreate the subscriptions/publications before each
move. Since Citus now removes those objects later as part of the “unused
distributed objects” cleanup, shard moves via logical replication can
safely run in parallel without additional locking.

- Introduced a per-shard advisory lock to prevent concurrent operations
on the same shard while allowing maximum parallelism elsewhere.

- Changed the lock mode in AcquirePlacementColocationLock from
ExclusiveLock to RowExclusiveLock to allow concurrent updates within the
same colocation group, while still preventing concurrent DDL operations
(see the sketch after this list).
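
To illustrate the net effect of the new locking, here is a sketch with
hypothetical shard IDs and node names: moves of different shards in the
same colocation group can run concurrently, while a second operation on the
same shard fails immediately on the new per-shard lock.

```
-- Session 1: move one shard of a colocation group
SELECT citus_move_shard_placement(102008, 'worker-1', 5432, 'worker-2', 5432,
                                  shard_transfer_mode := 'force_logical');

-- Session 2, concurrently: a different shard of the same colocation group
-- is no longer serialized behind session 1
SELECT citus_move_shard_placement(102009, 'worker-1', 5432, 'worker-3', 5432,
                                  shard_transfer_mode := 'force_logical');

-- Session 3, concurrently: the same shard as session 1 errors out right away:
--   ERROR:  could not acquire the lock required to move <table>
SELECT citus_move_shard_placement(102008, 'worker-1', 5432, 'worker-3', 5432,
                                  shard_transfer_mode := 'force_logical');
```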

**3. citus_rebalance_start() enhancements**
The citus_rebalance_start() function now accepts two new optional
parameters:

```
parallel_transfer_reference_tables BOOLEAN DEFAULT false,
parallel_transfer_colocated_shards BOOLEAN DEFAULT false
```
Both parameters default to false, which preserves the existing behavior
and ensures backward compatibility. When both are set to true, the
rebalancer operates with full parallelism.
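
For completeness, the updated signature as defined in the PR's
`citus_rebalance_start/13.2-1.sql` migration (also shown in the diff below):

```
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
    rebalance_strategy name DEFAULT NULL,
    drain_only boolean DEFAULT false,
    shard_transfer_mode citus.shard_transfer_mode default 'auto',
    parallel_transfer_reference_tables boolean DEFAULT false,
    parallel_transfer_colocated_shards boolean DEFAULT false
)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
```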

**Previous Rebalancer Behavior:**
`SELECT citus_rebalance_start(shard_transfer_mode := 'force_logical');`
This would:

- Start a single background task that replicates all reference tables.
- Then move all shards serially, one at a time.
```
Task 1: replicate_reference_tables()
         ↓
         Task 2: move_shard_1()
         ↓
         Task 3: move_shard_2()
         ↓
         Task 4: move_shard_3()
```
Slow and sequential. Reference table copy is a bottleneck. Colocated
shards must wait for each other.

**New Parallel Rebalancer:**
```
SELECT citus_rebalance_start(
        shard_transfer_mode := 'force_logical',
        parallel_transfer_colocated_shards := true,
        parallel_transfer_reference_tables := true
      );
```
This would:

- Schedule independent background tasks for each reference-table shard.
- Move colocated shards in parallel, while still maintaining dependency
order.
- Defer constraint application until all reference shards are in place.
```
Task 1: copy_ref_shard_1()
          Task 2: copy_ref_shard_2()
          Task 3: copy_ref_shard_3()
            → Task 4: apply_constraints()
          ↓
         Task 5: copy_shard_1()
         Task 6: copy_shard_2()
         Task 7: copy_shard_3()
         ↓
         Task 8-10: move_shard_1..3()
```
Each operation is scheduled independently and can run as soon as
dependencies are satisfied.
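
A sketch of how the resulting job can be monitored, assuming the existing
citus_rebalance_status()/citus_rebalance_wait() helpers and the standard
pg_dist_background_task catalog (job id 42 is a hypothetical value returned
by citus_rebalance_start()):

```
-- High-level progress of the background rebalance
SELECT * FROM citus_rebalance_status();

-- Individual background tasks scheduled for the rebalance job
SELECT task_id, status, command
FROM pg_dist_background_task
WHERE job_id = 42            -- hypothetical job id
ORDER BY task_id;

-- Block until the background rebalance finishes
SELECT citus_rebalance_wait();
```
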
pull/8136/head
Muhammad Usama 2025-08-18 17:44:14 +03:00 committed by GitHub
parent 2095679dc8
commit f743b35fc2
27 changed files with 1875 additions and 277 deletions


@ -222,6 +222,7 @@ typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
HTAB *nodeDependencies;
bool parallelTransferColocatedShards;
} ShardMoveDependencies;
char *VariablesToBePassedToNewConnections = NULL;
@ -270,7 +271,9 @@ static ShardCost GetShardCost(uint64 shardId, void *context);
static List * NonColocatedDistRelationIdList(void);
static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid
shardReplicationModeOid);
shardReplicationModeOid,
bool ParallelTransferReferenceTables,
bool ParallelTransferColocatedShards);
static void AcquireRebalanceColocationLock(Oid relationId, const char *operationName);
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
shardReplicationModeOid, char *noticeOperation);
@ -296,9 +299,12 @@ static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStati
static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
colocationId,
static ShardMoveDependencies InitializeShardMoveDependencies(bool
ParallelTransferColocatedShards);
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move,
int64 colocationId,
int64 *refTablesDepTaskIds,
int refTablesDepTaskIdsCount,
ShardMoveDependencies shardMoveDependencies,
int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
@ -1014,6 +1020,12 @@ citus_rebalance_start(PG_FUNCTION_ARGS)
PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode");
Oid shardTransferModeOid = PG_GETARG_OID(2);
PG_ENSURE_ARGNOTNULL(3, "parallel_transfer_reference_tables");
bool ParallelTransferReferenceTables = PG_GETARG_BOOL(3);
PG_ENSURE_ARGNOTNULL(4, "parallel_transfer_colocated_shards");
bool ParallelTransferColocatedShards = PG_GETARG_BOOL(4);
RebalanceOptions options = {
.relationIdList = relationIdList,
.threshold = strategy->defaultThreshold,
@ -1023,7 +1035,9 @@ citus_rebalance_start(PG_FUNCTION_ARGS)
.rebalanceStrategy = strategy,
.improvementThreshold = strategy->improvementThreshold,
};
int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid);
int jobId = RebalanceTableShardsBackground(&options, shardTransferModeOid,
ParallelTransferReferenceTables,
ParallelTransferColocatedShards);
if (jobId == 0)
{
@ -1988,17 +2002,20 @@ GetColocationId(PlacementUpdateEvent *move)
* given colocation group and the other one is for tracking source nodes of all moves.
*/
static ShardMoveDependencies
InitializeShardMoveDependencies()
InitializeShardMoveDependencies(bool ParallelTransferColocatedShards)
{
ShardMoveDependencies shardMoveDependencies;
shardMoveDependencies.colocationDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
ShardMoveSourceNodeHashEntry,
"nodeDependencyHashMap",
6);
shardMoveDependencies.parallelTransferColocatedShards =
ParallelTransferColocatedShards;
return shardMoveDependencies;
}
@ -2009,6 +2026,7 @@ InitializeShardMoveDependencies()
*/
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
int64 *refTablesDepTaskIds, int refTablesDepTaskIdsCount,
ShardMoveDependencies shardMoveDependencies, int *nDepends)
{
HTAB *dependsList = CreateSimpleHashSetWithNameAndSize(int64,
@ -2016,13 +2034,17 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
bool found;
/* Check if there exists a move in the same colocation group scheduled earlier. */
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &found);
if (found)
if (!shardMoveDependencies.parallelTransferColocatedShards)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
/* Check if there exists a move in the same colocation group scheduled earlier. */
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, &
found);
if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}
}
/*
@ -2045,6 +2067,23 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
}
}
*nDepends = hash_get_num_entries(dependsList);
if (*nDepends == 0)
{
/*
* shard copy can only start after finishing copy of reference table shards
* so each shard task will have a dependency on the task that indicates the
* copy complete of reference tables
*/
while (refTablesDepTaskIdsCount > 0)
{
int64 refTableTaskId = *refTablesDepTaskIds;
hash_search(dependsList, &refTableTaskId, HASH_ENTER, NULL);
refTablesDepTaskIds++;
refTablesDepTaskIdsCount--;
}
}
*nDepends = hash_get_num_entries(dependsList);
int64 *dependsArray = NULL;
@ -2076,9 +2115,13 @@ static void
UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int64 taskId,
ShardMoveDependencies shardMoveDependencies)
{
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
if (!shardMoveDependencies.parallelTransferColocatedShards)
{
ShardMoveDependencyInfo *shardMoveDependencyInfo = hash_search(
shardMoveDependencies.colocationDependencies, &colocationId,
HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;
}
bool found;
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
@ -2103,7 +2146,9 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int
* background job+task infrastructure.
*/
static int64
RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid)
RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationModeOid,
bool ParallelTransferReferenceTables,
bool ParallelTransferColocatedShards)
{
if (list_length(options->relationIdList) == 0)
{
@ -2174,7 +2219,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
initStringInfo(&buf);
List *referenceTableIdList = NIL;
int64 replicateRefTablesTaskId = 0;
int64 *refTablesDepTaskIds = NULL;
int refTablesDepTaskIdsCount = 0;
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
{
@ -2187,22 +2233,41 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
* Reference tables need to be copied to (newly-added) nodes, this needs to be the
* first task before we can move any other table.
*/
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
if (ParallelTransferReferenceTables)
{
refTablesDepTaskIds =
ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(
jobId, shardTransferMode, &refTablesDepTaskIdsCount);
ereport(DEBUG2,
(errmsg("%d dependent copy reference table tasks for job %ld",
refTablesDepTaskIdsCount, jobId),
errdetail("Rebalance scheduled as background job"),
errhint("To monitor progress, run: SELECT * FROM "
"citus_rebalance_status();")));
}
else
{
/* Move all reference tables as single task. Classical way */
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));
int32 nodesInvolved[] = { 0 };
int32 nodesInvolved[] = { 0 };
/* replicate_reference_tables permissions require superuser */
Oid superUserId = CitusExtensionOwner();
BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0,
NULL, 0, nodesInvolved);
replicateRefTablesTaskId = task->taskid;
/* replicate_reference_tables permissions require superuser */
Oid superUserId = CitusExtensionOwner();
BackgroundTask *task = ScheduleBackgroundTask(jobId, superUserId, buf.data, 0,
NULL, 0, nodesInvolved);
refTablesDepTaskIds = palloc0(sizeof(int64));
refTablesDepTaskIds[0] = task->taskid;
refTablesDepTaskIdsCount = 1;
}
}
PlacementUpdateEvent *move = NULL;
ShardMoveDependencies shardMoveDependencies = InitializeShardMoveDependencies();
ShardMoveDependencies shardMoveDependencies =
InitializeShardMoveDependencies(ParallelTransferColocatedShards);
foreach_declared_ptr(move, placementUpdateList)
{
@ -2220,16 +2285,11 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
int nDepends = 0;
int64 *dependsArray = GenerateTaskMoveDependencyList(move, colocationId,
refTablesDepTaskIds,
refTablesDepTaskIdsCount,
shardMoveDependencies,
&nDepends);
if (nDepends == 0 && replicateRefTablesTaskId > 0)
{
nDepends = 1;
dependsArray = palloc(nDepends * sizeof(int64));
dependsArray[0] = replicateRefTablesTaskId;
}
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = move->sourceNode->nodeId;
nodesInvolved[1] = move->targetNode->nodeId;


@ -1546,12 +1546,15 @@ NonBlockingShardSplit(SplitOperation splitOperation,
* 9) Logically replicate all the changes and do most of the table DDL,
* like index and foreign key creation.
*/
bool skipInterShardRelationshipCreation = false;
CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_SPLIT);
SHARD_SPLIT,
skipInterShardRelationshipCreation);
/*
* 10) Delete old shards metadata and mark the shards as to be deferred drop.


@ -107,16 +107,18 @@ static void ErrorIfSameNode(char *sourceNodeName, int sourceNodePort,
static void CopyShardTables(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, bool useLogicalReplication,
const char *operationName);
const char *operationName, uint32 optionFlags);
static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort);
int32 targetNodePort,
uint32 optionFlags);
static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort);
char *targetNodeName, int32 targetNodePort,
uint32 optionFlags);
static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
int32 sourceNodePort, const char *targetNodeName,
int32 targetNodePort);
@ -165,7 +167,8 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval,
List *ddlCommandList);
static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);
static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId,
const char *operationName);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_copy_shard_placement);
@ -174,7 +177,7 @@ PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_move_shard_placement_with_nodeid);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
PG_FUNCTION_INFO_V1(citus_internal_copy_single_shard_placement);
double DesiredPercentFreeAfterMove = 10;
bool CheckAvailableSpaceBeforeMove = true;
@ -203,7 +206,7 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
@ -232,7 +235,7 @@ citus_copy_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
targetNode->workerName, targetNode->workerPort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
@ -267,13 +270,69 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_COPY);
shardReplicationMode, SHARD_TRANSFER_COPY, 0);
PG_RETURN_VOID();
}
/*
* citus_internal_copy_single_shard_placement is an internal function that
* copies a single shard placement from a source node to a target node.
* It has two main differences from citus_copy_shard_placement:
* 1. it copies only a single shard placement, not all colocated shards
* 2. It allows to defer the constraints creation and this same function
* can be used to create the constraints later.
*
* The primary use case for this function is to transfer the shards of
* reference tables. Since all reference tables are colocated together,
* and each reference table has only one shard, this function can be used
* to transfer the shards of reference tables in parallel.
* Furthermore, the reference tables could have relations with
* other reference tables, so we need to ensure that their constraints
* are also transferred after copying the shards to the target node.
* For this reason, we allow the caller to defer the constraints creation.
*
* This function is not supposed to be called by the user directly.
*/
Datum
citus_internal_copy_single_shard_placement(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
uint32 sourceNodeId = PG_GETARG_INT32(1);
uint32 targetNodeId = PG_GETARG_INT32(2);
uint32 flags = PG_GETARG_INT32(3);
Oid shardReplicationModeOid = PG_GETARG_OID(4);
bool missingOk = false;
WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
/*
* This is an internal function that is used by the rebalancer.
* It is not supposed to be called by the user directly.
*/
if (!IsRebalancerInternalBackend())
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("This is an internal Citus function that can only"
" be used by a rebalancer task")));
}
TransferShards(shardId, sourceNode->workerName, sourceNode->workerPort,
targetNode->workerName, targetNode->workerPort,
shardReplicationMode, SHARD_TRANSFER_COPY, flags);
PG_RETURN_VOID();
}
/*
* citus_move_shard_placement moves given shard (and its co-located shards) from one
* node to the other node. To accomplish this it entirely recreates the table structure
@ -315,7 +374,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
TransferShards(shardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
shardReplicationMode, SHARD_TRANSFER_MOVE);
shardReplicationMode, SHARD_TRANSFER_MOVE, 0);
PG_RETURN_VOID();
}
@ -343,20 +402,77 @@ citus_move_shard_placement_with_nodeid(PG_FUNCTION_ARGS)
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
TransferShards(shardId, sourceNode->workerName,
sourceNode->workerPort, targetNode->workerName,
targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE);
targetNode->workerPort, shardReplicationMode, SHARD_TRANSFER_MOVE, 0);
PG_RETURN_VOID();
}
/*
* TransferShards is the function for shard transfers.
* AcquireShardPlacementLock tries to acquire a lock on the shardid
* while moving/copying the shard placement. If this
* is not possible it fails instantly, because this means
* another move/copy on the same shard is currently happening. */
static void
AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId,
const char *operationName)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_SHARD_MOVE(tag, shardId);
LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, sessionLock, dontWait);
if (!lockAcquired)
{
ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
operationName,
generate_qualified_relation_name(relationId)),
errdetail("It means that either a concurrent shard move "
"or colocated distributed table creation is "
"happening."),
errhint("Make sure that the concurrent operation has "
"finished and re-run the command")));
}
}
/*
* TransferShards is responsible for handling shard transfers.
*
* The optionFlags parameter controls the transfer behavior:
*
* - By default, shard colocation groups are treated as a single unit. This works
* well for distributed tables, since they can contain multiple colocated shards
* on the same node, and shard transfers can still be parallelized at the group level.
*
* - Reference tables are different: every reference table belongs to the same
* colocation group but has only a single shard. To parallelize reference table
* transfers, we must bypass the colocation group. The
* SHARD_TRANSFER_SINGLE_SHARD_ONLY flag enables this behavior by transferring
* only the specific shardId passed into the function, ignoring colocated shards.
*
* - Reference tables may also define foreign key relationships with each other.
* Since we cannot create those relationships until all shards have been moved,
* the SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS flag is used to defer their
* creation until shard transfer completes.
*
* - After shards are transferred, the SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY
* flag is used to create the foreign key relationships for already-transferred
* reference tables.
*
* Currently, optionFlags are only used to customize reference table transfers.
* For distributed tables, optionFlags should always be set to 0.
* passing 0 as optionFlags means that the default behavior will be used for
* all aspects of the shard transfer. That is to consider all colocated shards
* as a single unit and return after creating the necessary relationships.
*/
void
TransferShards(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, char shardReplicationMode,
ShardTransferType transferType)
ShardTransferType transferType, uint32 optionFlags)
{
/* strings to be used in log messages */
const char *operationName = ShardTransferTypeNames[transferType];
@ -385,20 +501,36 @@ TransferShards(int64 shardId, char *sourceNodeName,
ErrorIfTargetNodeIsNotSafeForTransfer(targetNodeName, targetNodePort, transferType);
AcquirePlacementColocationLock(distributedTableId, ExclusiveLock, operationName);
AcquirePlacementColocationLock(distributedTableId, RowExclusiveLock, operationName);
List *colocatedTableList = ColocatedTableList(distributedTableId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
List *colocatedTableList;
List *colocatedShardList;
/*
* If SHARD_TRANSFER_SINGLE_SHARD_ONLY is set, we only transfer a single shard
* specified by shardId. Otherwise, we transfer all colocated shards.
*/
bool isSingleShardOnly = optionFlags & SHARD_TRANSFER_SINGLE_SHARD_ONLY;
if (isSingleShardOnly)
{
colocatedTableList = list_make1_oid(distributedTableId);
colocatedShardList = list_make1(shardInterval);
}
else
{
colocatedTableList = ColocatedTableList(distributedTableId);
colocatedShardList = ColocatedShardIntervalList(shardInterval);
}
EnsureTableListOwner(colocatedTableList);
if (transferType == SHARD_TRANSFER_MOVE)
{
/*
* Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
* block concurrent citus_move_shard_placement() on any shard of
* the same relation. This is OK for now since we're executing shard
* moves sequentially anyway.
* Block concurrent DDL / TRUNCATE commands on the relation, while
* allowing concurrent citus_move_shard_placement() on the shards of
* the same relation.
*/
LockColocatedRelationsForMove(colocatedTableList);
}
@ -412,14 +544,66 @@ TransferShards(int64 shardId, char *sourceNodeName,
/*
* We sort shardIntervalList so that lock operations will not cause any
* deadlocks.
* deadlocks. But we do not need to do that if the list contains only one
* shard.
*/
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
if (!isSingleShardOnly)
{
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
}
if (TransferAlreadyCompleted(colocatedShardList,
sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort,
transferType))
/* We have pretty much covered the concurrent rebalance operations
* and we want to allow concurrent moves within the same colocation group,
* but at the same time we want to block concurrent moves on the same shard
* placement. So we lock the shard moves before starting the transfer.
*/
foreach_declared_ptr(shardInterval, colocatedShardList)
{
int64 shardIdToLock = shardInterval->shardId;
AcquireShardPlacementLock(shardIdToLock, ExclusiveLock, distributedTableId,
operationName);
}
bool transferAlreadyCompleted = TransferAlreadyCompleted(colocatedShardList,
sourceNodeName,
sourceNodePort,
targetNodeName,
targetNodePort,
transferType);
/*
* If we just need to create the shard relationships, we don't need to do anything
* else other than calling CopyShardTables with the SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY
* flag.
*/
bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;
if (createRelationshipsOnly)
{
if (!transferAlreadyCompleted)
{
/*
* if the transfer is not completed, and we are here just to create
* the relationships, we can return right away
*/
ereport(WARNING, (errmsg("shard is not present on node %s:%d",
targetNodeName, targetNodePort),
errdetail("%s may have not completed.",
operationNameCapitalized)));
return;
}
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName
,
targetNodePort, (shardReplicationMode ==
TRANSFER_MODE_FORCE_LOGICAL),
operationFunctionName, optionFlags);
/* We don't need to do anything else, just return */
return;
}
if (transferAlreadyCompleted)
{
/* if the transfer is already completed, we can return right away */
ereport(WARNING, (errmsg("shard is already present on node %s:%d",
@ -515,7 +699,8 @@ TransferShards(int64 shardId, char *sourceNodeName,
}
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort, useLogicalReplication, operationFunctionName);
targetNodePort, useLogicalReplication, operationFunctionName,
optionFlags);
if (transferType == SHARD_TRANSFER_MOVE)
{
@ -676,7 +861,7 @@ IsShardListOnNode(List *colocatedShardList, char *targetNodeName, uint32 targetN
/*
* LockColocatedRelationsForMove takes a list of relations, locks all of them
* using ShareUpdateExclusiveLock
* using RowExclusiveLock
*/
static void
LockColocatedRelationsForMove(List *colocatedTableList)
@ -684,7 +869,7 @@ LockColocatedRelationsForMove(List *colocatedTableList)
Oid colocatedTableId = InvalidOid;
foreach_declared_oid(colocatedTableId, colocatedTableList)
{
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
LockRelationOid(colocatedTableId, RowExclusiveLock);
}
}
@ -1333,7 +1518,7 @@ ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList)
static void
CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort, bool useLogicalReplication,
const char *operationName)
const char *operationName, uint32 optionFlags)
{
if (list_length(shardIntervalList) < 1)
{
@ -1343,16 +1528,22 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
/* Start operation to prepare for generating cleanup records */
RegisterOperationNeedingCleanup();
if (useLogicalReplication)
bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;
/*
* If we're just going to create relationships only, always use
* CopyShardTablesViaBlockWrites.
*/
if (useLogicalReplication && !createRelationshipsOnly)
{
CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName,
targetNodePort);
targetNodePort, optionFlags);
}
else
{
CopyShardTablesViaBlockWrites(shardIntervalList, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
targetNodeName, targetNodePort, optionFlags);
}
/*
@ -1369,7 +1560,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
static void
CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort)
int32 targetNodePort, uint32 optionFlags)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication",
@ -1407,9 +1598,13 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
MemoryContextSwitchTo(oldContext);
bool skipRelationshipCreation = (optionFlags &
SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS);
/* data copy is done seperately when logical replication is used */
LogicallyReplicateShards(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName, targetNodePort);
sourceNodePort, targetNodeName, targetNodePort,
skipRelationshipCreation);
}
@ -1437,7 +1632,7 @@ CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList)
static void
CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort)
int32 targetNodePort, uint32 optionFlags)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaBlockWrites",
@ -1446,127 +1641,150 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
/* iterate through the colocated shards and copy each */
ShardInterval *shardInterval = NULL;
foreach_declared_ptr(shardInterval, shardIntervalList)
{
/*
* For each shard we first create the shard table in a separate
* transaction and then we copy the data and create the indexes in a
* second separate transaction. The reason we don't do both in a single
* transaction is so we can see the size of the new shard growing
* during the copy when we run get_rebalance_progress in another
* session. If we wouldn't split these two phases up, then the table
* wouldn't be visible in the session that get_rebalance_progress uses.
* So get_rebalance_progress would always report its size as 0.
*/
List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
ConflictWithIsolationTestingBeforeCopy();
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
ConflictWithIsolationTestingAfterCopy();
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
MemoryContextReset(localContext);
}
bool createRelationshipsOnly = optionFlags & SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;
/*
* Once all shards are copied, we can recreate relationships between shards.
* Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
* If we're only asked to create the relationships, the shards are already
* present and populated on the node. Skip the table setup and data loading
* steps and proceed straight to creating the relationships.
*/
List *shardIntervalWithDDCommandsList = NIL;
foreach_declared_ptr(shardInterval, shardIntervalList)
if (!createRelationshipsOnly)
{
if (PartitionTable(shardInterval->relationId))
/* iterate through the colocated shards and copy each */
foreach_declared_ptr(shardInterval, shardIntervalList)
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(shardInterval);
/*
* For each shard we first create the shard table in a separate
* transaction and then we copy the data and create the indexes in a
* second separate transaction. The reason we don't do both in a single
* transaction is so we can see the size of the new shard growing
* during the copy when we run get_rebalance_progress in another
* session. If we wouldn't split these two phases up, then the table
* wouldn't be visible in the session that get_rebalance_progress uses.
* So get_rebalance_progress would always report its size as 0.
*/
List *ddlCommandList = RecreateShardDDLCommandList(shardInterval,
sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_make1(attachPartitionCommand));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
/* drop the shard we created on the target, in case of failure */
InsertCleanupRecordOutsideTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
GroupForNode(targetNodeName,
targetNodePort),
CLEANUP_ON_FAILURE);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
ConflictWithIsolationTestingBeforeCopy();
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
ConflictWithIsolationTestingAfterCopy();
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_CONSTRAINTS);
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *ddlCommandList =
PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner, ddlCommandList);
MemoryContextReset(localContext);
}
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
/*
* Iterate through the colocated shards and create DDL commamnds
* to create the foreign constraints.
* Skip creating shard relationships if the caller has requested that they
* not be created.
*/
foreach_declared_ptr(shardInterval, shardIntervalList)
bool skipRelationshipCreation = (optionFlags &
SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS);
if (!skipRelationshipCreation)
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
/*
* Once all shards are copied, we can recreate relationships between shards.
* Create DDL commands to Attach child tables to their parents in a partitioning hierarchy.
*/
List *shardIntervalWithDDCommandsList = NIL;
foreach_declared_ptr(shardInterval, shardIntervalList)
{
if (PartitionTable(shardInterval->relationId))
{
char *attachPartitionCommand =
GenerateAttachShardPartitionCommand(shardInterval);
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&shardForeignConstraintCommandList,
&referenceTableForeignConstraintList);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_make1(attachPartitionCommand));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
}
}
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_CREATING_FOREIGN_KEYS);
/*
* Iterate through the colocated shards and create DDL commamnds
* to create the foreign constraints.
*/
foreach_declared_ptr(shardInterval, shardIntervalList)
{
List *shardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
CopyShardForeignConstraintCommandListGrouped(shardInterval,
&
shardForeignConstraintCommandList,
&
referenceTableForeignConstraintList);
ShardCommandList *shardCommandList = CreateShardCommandList(
shardInterval,
list_concat(shardForeignConstraintCommandList,
referenceTableForeignConstraintList));
shardIntervalWithDDCommandsList = lappend(shardIntervalWithDDCommandsList,
shardCommandList);
}
/* Now execute the Partitioning & Foreign constraints creation commads. */
ShardCommandList *shardCommandList = NULL;
foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
{
char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
shardCommandList->ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COMPLETING);
}
/* Now execute the Partitioning & Foreign constraints creation commads. */
ShardCommandList *shardCommandList = NULL;
foreach_declared_ptr(shardCommandList, shardIntervalWithDDCommandsList)
{
char *tableOwner = TableOwner(shardCommandList->shardInterval->relationId);
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
tableOwner,
shardCommandList->ddlCommandList);
}
UpdatePlacementUpdateStatusForShardIntervalList(
shardIntervalList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COMPLETING);
MemoryContextReset(localContext);
MemoryContextSwitchTo(oldContext);
}
@ -1647,7 +1865,8 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
NULL /* jobIdList (ignored by API implementation) */
);
}


@ -118,7 +118,8 @@ static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shard
static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId,
uint64 shardId);
static void CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
LogicalRepType type);
LogicalRepType type,
bool skipInterShardRelationships);
static void ExecuteCreateIndexCommands(List *logicalRepTargetList);
static void ExecuteCreateConstraintsBackedByIndexCommands(List *logicalRepTargetList);
static List * ConvertNonExistingPlacementDDLCommandsToTasks(List *shardCommandList,
@ -132,7 +133,6 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void AcquireLogicalReplicationLock(void);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);
@ -154,9 +154,9 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
*/
void
LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
char *targetNodeName, int targetNodePort)
char *targetNodeName, int targetNodePort,
bool skipInterShardRelationshipCreation)
{
AcquireLogicalReplicationLock();
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
@ -258,7 +258,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_MOVE);
SHARD_MOVE,
skipInterShardRelationshipCreation);
/*
* We use these connections exclusively for subscription management,
@ -317,7 +318,8 @@ CompleteNonBlockingShardTransfer(List *shardList,
HTAB *publicationInfoHash,
List *logicalRepTargetList,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type)
LogicalRepType type,
bool skipInterShardRelationshipCreation)
{
/* Start applying the changes from the replication slots to catch up. */
EnableSubscriptions(logicalRepTargetList);
@ -345,7 +347,8 @@ CompleteNonBlockingShardTransfer(List *shardList,
* and partitioning hierarchy. Once they are done, wait until the replication
* catches up again. So we don't block writes too long.
*/
CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type);
CreatePostLogicalReplicationDataLoadObjects(logicalRepTargetList, type,
skipInterShardRelationshipCreation);
UpdatePlacementUpdateStatusForShardIntervalList(
shardList,
@ -372,7 +375,7 @@ CompleteNonBlockingShardTransfer(List *shardList,
WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);
if (type != SHARD_SPLIT)
if (type != SHARD_SPLIT && !skipInterShardRelationshipCreation)
{
UpdatePlacementUpdateStatusForShardIntervalList(
shardList,
@ -497,25 +500,6 @@ CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList)
}
/*
* AcquireLogicalReplicationLock tries to acquire a lock for logical
* replication. We need this lock, because at the start of logical replication
* we clean up old subscriptions and publications. Because of this cleanup it's
* not safe to run multiple logical replication based shard moves at the same
* time. If multiple logical replication moves would run at the same time, the
* second move might clean up subscriptions and publications that are in use by
* another move.
*/
static void
AcquireLogicalReplicationLock(void)
{
LOCKTAG tag;
SET_LOCKTAG_LOGICAL_REPLICATION(tag);
LockAcquire(&tag, ExclusiveLock, false, false);
}
/*
* PrepareReplicationSubscriptionList returns list of shards to be logically
* replicated from given shard list. This is needed because Postgres does not
@ -675,7 +659,8 @@ GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId)
*/
static void
CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
LogicalRepType type)
LogicalRepType type,
bool skipInterShardRelationships)
{
/*
* We create indexes in 4 steps.
@ -705,7 +690,7 @@ CreatePostLogicalReplicationDataLoadObjects(List *logicalRepTargetList,
/*
* Creating the partitioning hierarchy errors out in shard splits when
*/
if (type != SHARD_SPLIT)
if (type != SHARD_SPLIT && !skipInterShardRelationships)
{
/* create partitioning hierarchy, if any */
CreatePartitioningHierarchy(logicalRepTargetList);


@ -1967,7 +1967,7 @@ RegisterCitusConfigVariables(void)
"because total background worker count is shared by all background workers. The value "
"represents the possible maximum number of task executors."),
&MaxBackgroundTaskExecutors,
4, 1, MAX_BG_TASK_EXECUTORS,
1, 1, MAX_BG_TASK_EXECUTORS,
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);


@ -1,7 +1,11 @@
-- citus--13.1-1--13.2-1
-- bump version to 13.2-1
#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql"
#include "udfs/citus_rebalance_start/13.2-1.sql"
#include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/13.2-1.sql"
DO $drop_leftover_old_columnar_objects$


@ -1,5 +1,9 @@
-- citus--13.2-1--13.1-1
-- downgrade version to 13.1-1
DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode);
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean);
#include "../udfs/citus_rebalance_start/11.1-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze();
#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql"


@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
flags integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$;


@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION citus_internal.citus_internal_copy_single_shard_placement(
shard_id bigint,
source_node_id integer,
target_node_id integer,
flags integer,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_copy_single_shard_placement$$;


@ -0,0 +1,15 @@
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode);
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,
shard_transfer_mode citus.shard_transfer_mode default 'auto',
parallel_transfer_reference_tables boolean DEFAULT false,
parallel_transfer_colocated_shards boolean DEFAULT false
)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean)
IS 'rebalance the shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC;


@ -1,11 +1,15 @@
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode);
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,
shard_transfer_mode citus.shard_transfer_mode default 'auto'
shard_transfer_mode citus.shard_transfer_mode default 'auto',
parallel_transfer_reference_tables boolean DEFAULT false,
parallel_transfer_colocated_shards boolean DEFAULT false
)
RETURNS bigint
AS 'MODULE_PATHNAME'
LANGUAGE C VOLATILE;
COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode)
COMMENT ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean)
IS 'rebalance the shards in the cluster in the background';
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode) TO PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean) TO PUBLIC;


@ -99,7 +99,7 @@ double DistributedDeadlockDetectionTimeoutFactor = 2.0;
int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
int MaxBackgroundTaskExecutors = 1;
char *MainDb = "";
/* config variables for metadata sync timeout */


@ -27,6 +27,7 @@
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
@ -37,20 +38,27 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_transfer.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId, LOCKMODE lockMode);
static StringInfo CopyShardPlacementToWorkerNodeQuery(
ShardPlacement *sourceShardPlacement,
WorkerNode *workerNode,
char transferMode);
static StringInfo CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement
,
WorkerNode *workerNode,
char transferMode);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);
static List * ReplicatedMetadataSyncedDistributedTableList(void);
static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode);
typedef struct ShardTaskEntry
{
uint64 shardId;
int64 taskId;
} ShardTaskEntry;
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@ -179,7 +187,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
if (list_length(newWorkersList) == 0)
{
/*
* All workers alreaddy have a copy of the reference tables, make sure that
* All workers already have a copy of the reference tables, make sure that
* any locks obtained earlier are released. It will probably not matter, but
* we release the locks in the reverse order we obtained them in.
*/
@ -230,7 +238,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
WorkerNode *newWorkerNode = NULL;
foreach_declared_ptr(newWorkerNode, newWorkersList)
{
ereport(NOTICE, (errmsg("replicating reference table '%s' to %s:%d ...",
ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort)));
@ -279,6 +287,386 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
CloseConnection(connection);
}
/*
* Since reference tables have been copied via a loopback connection we do not have
* to retain our locks. Since Citus only runs well in READ COMMITTED mode we can be
* sure that other transactions will find the reference tables copied.
* We have obtained and held multiple locks, here we unlock them all in the reverse
* order we have obtained them in.
*/
for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
}
/*
* ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a
* twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of
* copying the missing tables onto the worker nodes this function creates the background
* tasks for each required copy operation and schedules them in the background job.
* Another difference is that instead of moving all the colocated shards sequentially
* this function creates a separate background task for each shard, even when the shards
* are part of the same colocated shard group.
*
* For transferring the shards in parallel the function creates a task for each shard
* move and then schedules another task that creates the shard relationships (if any)
* between shards; that task waits for the completion of all shard transfer tasks.
*
* The function returns an array of task ids that are created for creating the shard
* relationships; effectively, completion of these tasks signals the completion
* of reference table setup on the worker nodes. Any process that needs to wait for
* the completion of the reference table setup can wait for these tasks to complete.
*
* The transferMode passed to this function gets ignored for now; it only uses
* block write mode.
*/
int64 *
ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char transferMode
,
int *nDependTasks)
{
List *referenceTableIdList = NIL;
uint64 shardId = INVALID_SHARD_ID;
List *newWorkersList = NIL;
int64 *dependsTaskArray = NULL;
const char *referenceTableName = NULL;
int colocationId = GetReferenceTableColocationId();
*nDependTasks = 0;
if (colocationId == INVALID_COLOCATION_ID)
{
/* we have no reference table yet. */
return 0;
}
/*
* Most of the time this function should result in a conclusion where we do not need
* to copy any reference tables. To prevent excessive locking the majority of the time
* we run our precondition checks first with a lower lock. If, after checking with the
* lower lock, that we might need to copy reference tables we check with a more
* aggressive and self conflicting lock. It is important to be self conflicting in the
* second run to make sure that two concurrent calls to this routine will actually not
* run concurrently after the initial check.
*
* If after two iterations of precondition checks we still find the need for copying
* reference tables we exit the loop with all locks held. This will prevent concurrent
* DROP TABLE and create_reference_table calls so that the list of reference tables we
* operate on are stable.
*
*
* Since the changes to the reference table placements are made via loopback
* connections we release the locks held at the end of this function. Due to Citus
* only running transactions in READ COMMITTED mode we can be sure that other
* transactions correctly find the metadata entries.
*/
LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++)
{
LockColocationId(colocationId, lockmodes[lockmodeIndex]);
referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableIdList == NIL)
{
/*
* No reference tables exist, make sure that any locks obtained earlier are
* released. It will probably not matter, but we release the locks in the
* reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return 0;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
referenceTableName = get_rel_name(referenceTableId);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
/*
* We only take an access share lock, otherwise we'll hold up citus_add_node.
* In case of create_reference_table() where we don't want concurrent writes
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
*/
newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
if (list_length(newWorkersList) == 0)
{
/*
* All workers already have a copy of the reference tables, make sure that
* any locks obtained earlier are released. It will probably not matter, but
* we release the locks in the reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return 0;
}
}
/*
* citus_copy_shard_placement triggers metadata sync-up, which tries to
* acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement
* in a separate connection. If we have modified pg_dist_node in the
* current backend, this will cause a deadlock.
*/
if (TransactionModifiedNodeMetadata)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified node metadata")));
}
/*
* Modifications to reference tables in current transaction are not visible
* to citus_copy_shard_placement, since it is done in a separate backend.
*/
if (AnyRelationsModifiedInTransaction(referenceTableIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified a reference table")));
}
bool missingOk = false;
ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
if (sourceShardPlacement == NULL)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table shard "
UINT64_FORMAT
" does not have an active shard placement",
shardId)));
}
WorkerNode *newWorkerNode = NULL;
BackgroundTask *task = NULL;
StringInfoData buf = { 0 };
initStringInfo(&buf);
List *depTasksList = NIL;
const char *transferModeString =
transferMode == TRANSFER_MODE_BLOCK_WRITES ? "block_writes" :
transferMode == TRANSFER_MODE_FORCE_LOGICAL ? "force_logical" :
"auto";
foreach_declared_ptr(newWorkerNode, newWorkersList)
{
ereport(DEBUG2, (errmsg("replicating reference table '%s' to %s:%d ...",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort)));
Oid relationId = InvalidOid;
List *nodeTasksList = NIL;
HTAB *shardTaskMap = CreateSimpleHashWithNameAndSize(uint64,
ShardTaskEntry,
"Shard_Task_Map",
list_length(
referenceTableIdList));
foreach_declared_oid(relationId, referenceTableIdList)
{
referenceTableName = get_rel_name(relationId);
List *shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
resetStringInfo(&buf);
uint32 shardTransferFlags = SHARD_TRANSFER_SINGLE_SHARD_ONLY |
SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS;
/* Temporary hack until the background task config support PR is merged */
appendStringInfo(&buf,
"SET LOCAL application_name TO '%s%ld';",
CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
GetGlobalPID());
/*
 * In the first step, just create the shards and load their data, but defer
 * the creation of the shard relationships to the next step.
 * We defer the relationship creation because we want to make sure that all
 * of the parallel shard copy tasks have finished first. Otherwise we might
 * end up in a situation where a dependent shard's copy task is still running,
 * and creating the shard relationships would then fail with an ERROR.
 */
appendStringInfo(&buf,
"SELECT "
"citus_internal.citus_internal_copy_single_shard_placement"
"(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
shardTransferFlags,
quote_literal_cstr(transferModeString));
ereport(DEBUG2,
(errmsg("replicating reference table '%s' to %s:%d ... QUERY= %s",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort, buf.data)));
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
List *relatedRelations = list_concat(cacheEntry->
referencedRelationsViaForeignKey,
cacheEntry->
referencingRelationsViaForeignKey);
List *dependencyTaskList = NIL;
Oid relatedRelationId = InvalidOid;
foreach_declared_oid(relatedRelationId, relatedRelations)
{
if (!list_member_oid(referenceTableIdList, relatedRelationId))
{
continue;
}
List *relatedShardIntervalList = LoadShardIntervalList(
relatedRelationId);
ShardInterval *relatedShardInterval = (ShardInterval *) linitial(
relatedShardIntervalList);
uint64 relatedShardId = relatedShardInterval->shardId;
bool taskFound = false;
ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &relatedShardId,
HASH_FIND, &taskFound);
if (taskFound)
{
dependencyTaskList = lappend(dependencyTaskList, taskEntry);
}
}
int nDepends = list_length(dependencyTaskList);
int64 *dependsArray = NULL;
if (nDepends > 0)
{
dependsArray = (int64 *) palloc(sizeof(int64) * nDepends);
int i = 0;
ListCell *lc;
foreach(lc, dependencyTaskList)
{
dependsArray[i++] = ((ShardTaskEntry *) lfirst(lc))->taskId;
}
}
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = sourceShardPlacement->nodeId;
nodesInvolved[1] = newWorkerNode->nodeId;
task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
nDepends,
dependsArray, 2,
nodesInvolved);
bool found = false;
ShardTaskEntry *taskEntry = hash_search(shardTaskMap, &shardId, HASH_ENTER,
&found);
if (!found)
{
taskEntry->taskId = task->taskid;
ereport(DEBUG2,
(errmsg(
"Added hash entry in scheduled task hash "
"with task %ld for shard %ld",
task->taskid, shardId)));
}
else
{
ereport(ERROR, (errmsg("failed to record task dependency for shard %ld",
shardId)));
}
nodeTasksList = lappend(nodeTasksList, task);
if (dependsArray)
{
pfree(dependsArray);
}
list_free(dependencyTaskList);
}
if (list_length(nodeTasksList) > 0)
{
int nDepends = list_length(nodeTasksList);
int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = sourceShardPlacement->nodeId;
nodesInvolved[1] = newWorkerNode->nodeId;
int64 *dependsArray = palloc(sizeof(int64) * list_length(nodeTasksList));
int idx = 0;
foreach_declared_ptr(task, nodeTasksList)
{
dependsArray[idx++] = task->taskid;
}
resetStringInfo(&buf);
uint32 shardTransferFlags = SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY;
appendStringInfo(&buf,
"SET LOCAL application_name TO '%s%ld';\n",
CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
GetGlobalPID());
appendStringInfo(&buf,
"SELECT "
"citus_internal.citus_internal_copy_single_shard_placement"
"(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
shardTransferFlags,
quote_literal_cstr(transferModeString));
ereport(DEBUG2,
(errmsg(
"creating relations for reference table '%s' on %s:%d ... "
"QUERY= %s",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort, buf.data)));
task = ScheduleBackgroundTask(jobId, CitusExtensionOwner(), buf.data,
nDepends,
dependsArray, 2,
nodesInvolved);
depTasksList = lappend(depTasksList, task);
pfree(dependsArray);
list_free(nodeTasksList);
nodeTasksList = NIL;
}
hash_destroy(shardTaskMap);
}
/*
 * Compute the dependent-task array that signals the completion of all
 * reference table shard copies, so that the distributed shard copies can
 * start only after they are done.
 */
if (list_length(depTasksList) > 0)
{
*nDependTasks = list_length(depTasksList);
dependsTaskArray = palloc(sizeof(int64) * *nDependTasks);
int idx = 0;
foreach_declared_ptr(task, depTasksList)
{
dependsTaskArray[idx++] = task->taskid;
}
list_free(depTasksList);
}
/*
* Since reference tables have been copied via a loopback connection we do not have to
* retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure
@ -291,6 +679,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return dependsTaskArray;
}
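
To make the caller-side contract of the returned dependency array concrete, below is a minimal, hypothetical sketch (not part of this diff) of how the rebalancer could wire the reference-table copy tasks in as dependencies of a subsequent distributed shard move. `jobId`, `transferMode`, `sourceNodeId`, `targetNodeId`, and `moveCommand` are assumed to already be in scope; the real call sites may differ.

```
/*
 * Schedule the reference table copy tasks first, then make a distributed
 * shard move depend on them so it only starts once every reference table
 * shard is in place on the new nodes.
 */
int nDependTasks = 0;
int64 *refTableDepends =
	ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(jobId, transferMode,
																&nDependTasks);

int32 nodesInvolved[2] = { sourceNodeId, targetNodeId };
BackgroundTask *moveTask =
	ScheduleBackgroundTask(jobId, CitusExtensionOwner(), moveCommand->data,
						   nDependTasks, refTableDepends, 2, nodesInvolved);
```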
@ -614,8 +1003,9 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
* connections.
*/
void
DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
MetadataSyncContext *context, int32 groupId, bool localOnly)
DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(MetadataSyncContext *
context, int32 groupId,
bool localOnly)
{
List *replicatedPlacementListForGroup = ReplicatedPlacementsForNodeGroup(groupId);


@ -131,7 +131,8 @@ typedef enum LogicalRepType
extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort);
int targetNodePort,
bool skipInterShardRelationshipCreation);
extern void ConflictWithIsolationTestingBeforeCopy(void);
extern void ConflictWithIsolationTestingAfterCopy(void);
@ -177,7 +178,8 @@ extern void CompleteNonBlockingShardTransfer(List *shardList,
HTAB *publicationInfoHash,
List *logicalRepTargetList,
HTAB *groupedLogicalRepTargetsHash,
LogicalRepType type);
LogicalRepType type,
bool skipInterShardRelationshipCreation);
extern void CreateUncheckedForeignKeyConstraints(List *logicalRepTargetList);
extern void CreatePartitioningHierarchy(List *logicalRepTargetList);


@ -21,6 +21,11 @@
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern int64 * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId,
                                                                           char transferMode,
                                                                           int *nDependTasks);
extern bool HasNodesWithMissingReferenceTables(List **referenceTableList);
extern uint32 CreateReferenceTableColocationId(void);
extern uint32 GetReferenceTableColocationId(void);


@ -44,10 +44,11 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID = 10,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12, /* Not used anymore */
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13,
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK = 14,
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION = 15,
ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE = 16
} AdvisoryLocktagClass;
/* CitusOperations has constants for citus operations */
@ -84,6 +85,16 @@ typedef enum CitusOperations
(uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD)
/* advisory lock for citus shard move/copy operations,
* also it has the database hardcoded to MyDatabaseId,
* to ensure the locks are local to each database */
#define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) ((shardid) >> 32), \
(uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE)
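
As an illustration of how this new per-shard advisory lock is meant to be taken, below is a minimal sketch (not part of this diff) using PostgreSQL's standard lmgr API; the helper Citus actually wraps this in may differ, and `shardId` is assumed to be in scope.

```
/* requires "storage/lockdefs.h" and "storage/lock.h" */
LOCKTAG tag;
SET_LOCKTAG_SHARD_MOVE(tag, shardId);

/* block concurrent move/copy of this shard; other shards proceed in parallel */
(void) LockAcquire(&tag, ExclusiveLock, false, false);
```

Because the lock tag is keyed on the shard id (and MyDatabaseId), transfers of different shards never contend on it.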
/* reuse advisory lock, but with different, unused field 4 (7)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
@ -124,16 +135,6 @@ typedef enum CitusOperations
(uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
/* reuse advisory lock, but with different, unused field 4 (12)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */
#define SET_LOCKTAG_LOGICAL_REPLICATION(tag) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) 0, \
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION)
/* reuse advisory lock, but with different, unused field 4 (14)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */


@ -23,10 +23,54 @@ typedef enum
SHARD_TRANSFER_COPY = 2
} ShardTransferType;
/*
 * ShardTransferOperationMode holds the flags passed to the shard transfer
 * function to control its behavior.
 * Currently, optionFlags are only used to customize reference table transfers.
 * For distributed tables, optionFlags should always be set to 0.
 */
typedef enum
{
/*
 * This flag instructs the transfer function to transfer only a single shard
 * rather than all the colocated shards for the shard interval.
 * Using this flag means we might break the colocated shard relationship on
 * the source node, so it is only useful when setting up a new node and we
 * are sure that the node will not be used until all shards have been
 * transferred.
 * We need this flag to be able to transfer colocated shards in parallel;
 * for now it is only used for reference table shards.
 * Finally, if you use this flag, you should also consider deferring the
 * creation of the relationships on the source node until all colocated
 * shards are transferred (see: SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS).
 */
SHARD_TRANSFER_SINGLE_SHARD_ONLY = 1 << 0,
/*
 * With this flag the shard transfer function does not create any constraints
 * or foreign key relationships defined on the shard. This can be used to
 * defer the creation of the relationships until all shards are transferred,
 * which is useful when we transfer colocated shards in parallel and want to
 * avoid creating the relationships on the source node until all shards have
 * been transferred.
 */
SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS = 1 << 1,
/*
 * This flag indicates that the shard transfer function should only create
 * the relationships on the target node and not transfer any data. It can be
 * used to create the relationships that were deferred while the shards were
 * being transferred.
 */
SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY = 1 << 2
} ShardTransferOperationMode;
extern void TransferShards(int64 shardId,
char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort,
char shardReplicationMode, ShardTransferType transferType);
char shardReplicationMode, ShardTransferType transferType,
uint32 optionFlags);
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
char *workerNodeName, uint32 workerNodePort);
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
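
To show how the new optionFlags parameter composes with the flags above, here is a hypothetical two-phase sketch (not part of this diff); the node name/port variables are assumed to be in scope, and the rebalancer's real call sites go through the background task UDFs instead.

```
/* Phase 1: copy a single reference table shard, deferring constraints and FKs */
TransferShards(shardId, sourceNodeName, sourceNodePort,
			   targetNodeName, targetNodePort,
			   TRANSFER_MODE_BLOCK_WRITES, SHARD_TRANSFER_COPY,
			   SHARD_TRANSFER_SINGLE_SHARD_ONLY |
			   SHARD_TRANSFER_SKIP_CREATE_RELATIONSHIPS);

/*
 * Phase 2: once all colocated reference table shards have been copied,
 * create the deferred relationships without transferring any data again.
 */
TransferShards(shardId, sourceNodeName, sourceNodePort,
			   targetNodeName, targetNodePort,
			   TRANSFER_MODE_BLOCK_WRITES, SHARD_TRANSFER_COPY,
			   SHARD_TRANSFER_CREATE_RELATIONSHIPS_ONLY);
```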


@ -140,6 +140,12 @@ DEPS = {
"background_rebalance_parallel": TestDeps(
None, ["multi_test_helpers", "multi_cluster_management"], worker_count=6
),
"background_rebalance_parallel_reference_tables": TestDeps(
None,
["multi_test_helpers", "multi_cluster_management"],
repeatable=False,
worker_count=6,
),
"function_propagation": TestDeps("minimal_schedule"),
"citus_shards": TestDeps("minimal_schedule"),
"grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),


@ -534,7 +534,7 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | running | {50,57}
17779 | 1014 | done | {50,57}
17779 | 1015 | running | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}


@ -0,0 +1,561 @@
--
-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES
--
-- Test to check that the background tasks scheduled for moving reference table
-- shards in parallel by the background rebalancer have the correct dependencies
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Colocation group 1: create two tables table1_colg1 and table2_colg1 in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Colocation group 2: create two tables table1_colg2 and table2_colg2 in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Colocation group 3: create two tables table1_colg3 and table2_colg3 in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Create reference tables with primary-foreign key relationships
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL REFERENCES customers(id),
order_date DATE NOT NULL DEFAULT CURRENT_DATE
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id),
product_name TEXT NOT NULL,
quantity INTEGER NOT NULL,
price NUMERIC(10, 2) NOT NULL
);
SELECT create_reference_table('customers');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('orders');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('order_items');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- INSERT SOME DATA
-- Insert 10 customers
INSERT INTO customers (name, email)
SELECT
'Customer ' || i,
'customer' || i || '@example.com'
FROM generate_series(1, 10) AS i;
-- Insert 30 orders: each customer gets 3 orders
INSERT INTO orders (customer_id, order_date)
SELECT
(i % 10) + 1, -- customer_id between 1 and 10
CURRENT_DATE - (i % 7)
FROM generate_series(1, 30) AS i;
-- Insert 90 order_items: each order has 3 items
INSERT INTO order_items (order_id, product_name, quantity, price)
SELECT
(i % 30) + 1, -- order_id between 1 and 30
'Product ' || (i % 5 + 1),
(i % 10) + 1,
round((random() * 100 + 10)::numeric, 2)
FROM generate_series(1, 90) AS i;
SELECT
c.id AS customer_id,
c.name AS customer_name,
c.email AS customer_email,
COUNT(oi.id) AS total_order_items
FROM customers c
JOIN orders o
ON c.id = o.customer_id
JOIN order_items oi
ON o.id = oi.order_id
GROUP BY c.id, c.name, c.email
ORDER BY c.id;
customer_id | customer_name | customer_email | total_order_items
---------------------------------------------------------------------
1 | Customer 1 | customer1@example.com | 9
2 | Customer 2 | customer2@example.com | 9
3 | Customer 3 | customer3@example.com | 9
4 | Customer 4 | customer4@example.com | 9
5 | Customer 5 | customer5@example.com | 9
6 | Customer 6 | customer6@example.com | 9
7 | Customer 7 | customer7@example.com | 9
8 | Customer 8 | customer8@example.com | 9
9 | Customer 9 | customer9@example.com | 9
10 | Customer 10 | customer10@example.com | 9
(10 rows)
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640
table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639
table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640
table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639
table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640
table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639
table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640
table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639
table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640
table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639
table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640
table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639
(12 rows)
SET client_min_messages TO DEBUG1;
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'force_logical',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg3
NOTICE: Scheduled 6 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
SET client_min_messages TO ERROR;
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
job_id | task_id | depends_on
---------------------------------------------------------------------
17777 | 1001 | 1000
17777 | 1002 | 1000
17777 | 1002 | 1001
17777 | 1003 | 1000
17777 | 1003 | 1001
17777 | 1003 | 1002
17777 | 1005 | 1004
17777 | 1006 | 1004
17777 | 1006 | 1005
17777 | 1007 | 1004
17777 | 1007 | 1005
17777 | 1007 | 1006
17777 | 1008 | 1003
17777 | 1008 | 1007
17777 | 1009 | 1003
17777 | 1009 | 1007
17777 | 1010 | 1003
17777 | 1010 | 1007
17777 | 1011 | 1003
17777 | 1011 | 1007
17777 | 1012 | 1003
17777 | 1012 | 1007
17777 | 1013 | 1003
17777 | 1013 | 1007
(24 rows)
-- Temporary hack to strip the SET application_name prefix from the displayed command
-- until the background job enhancement is done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1001 | 85674025 | 1000 | 85674024
1002 | 85674026 | 1000 | 85674024
1002 | 85674026 | 1001 | 85674025
1003 | 85674026 | 1000 | 85674024
1003 | 85674026 | 1001 | 85674025
1003 | 85674026 | 1002 | 85674026
1005 | 85674025 | 1004 | 85674024
1006 | 85674026 | 1004 | 85674024
1006 | 85674026 | 1005 | 85674025
1007 | 85674026 | 1004 | 85674024
1007 | 85674026 | 1005 | 85674025
1007 | 85674026 | 1006 | 85674026
1008 | 85674001 | 1003 | 85674026
1008 | 85674001 | 1007 | 85674026
1009 | 85674000 | 1003 | 85674026
1009 | 85674000 | 1007 | 85674026
1010 | 85674009 | 1003 | 85674026
1010 | 85674009 | 1007 | 85674026
1011 | 85674008 | 1003 | 85674026
1011 | 85674008 | 1007 | 85674026
1012 | 85674017 | 1003 | 85674026
1012 | 85674017 | 1007 | 85674026
1013 | 85674016 | 1003 | 85674026
1013 | 85674016 | 1007 | 85674026
(24 rows)
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
-- Drain worker_3 so that we can move only one colocation group to worker_3,
-- creating an imbalance that triggers parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
master_move_shard_placement
---------------------------------------------------------------------
(4 rows)
CALL citus_cleanup_orphaned_resources();
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674001 | 0 | localhost | 57637 | localhost | 57642
table1_colg1 | 85674003 | 0 | localhost | 57638 | localhost | 57641
table2_colg1 | 85674005 | 0 | localhost | 57637 | localhost | 57642
table2_colg1 | 85674007 | 0 | localhost | 57638 | localhost | 57641
table1_colg3 | 85674016 | 0 | localhost | 57639 | localhost | 57641
table1_colg3 | 85674017 | 0 | localhost | 57639 | localhost | 57642
table2_colg3 | 85674020 | 0 | localhost | 57639 | localhost | 57641
table2_colg3 | 85674021 | 0 | localhost | 57639 | localhost | 57642
(8 rows)
SET client_min_messages TO DEBUG1;
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'block_writes',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg2
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg3
DEBUG: skipping child tables for relation named: table1_colg1
DEBUG: skipping child tables for relation named: table1_colg2
NOTICE: Scheduled 4 moves as job xxx
DETAIL: Rebalance scheduled as background job
HINT: To monitor progress, run: SELECT * FROM citus_rebalance_status();
SET client_min_messages TO ERROR;
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
job_id | task_id | depends_on
---------------------------------------------------------------------
17778 | 1015 | 1014
17778 | 1016 | 1014
17778 | 1016 | 1015
17778 | 1017 | 1014
17778 | 1017 | 1015
17778 | 1017 | 1016
17778 | 1019 | 1018
17778 | 1020 | 1018
17778 | 1020 | 1019
17778 | 1021 | 1018
17778 | 1021 | 1019
17778 | 1021 | 1020
17778 | 1022 | 1017
17778 | 1022 | 1021
17778 | 1023 | 1017
17778 | 1023 | 1021
17778 | 1024 | 1017
17778 | 1024 | 1021
17778 | 1025 | 1017
17778 | 1025 | 1021
(20 rows)
-- Temporary hack to strip the SET application_name prefix from the displayed command
-- until the background job enhancement is done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1015 | 85674025 | 1014 | 85674024
1016 | 85674026 | 1014 | 85674024
1016 | 85674026 | 1015 | 85674025
1017 | 85674026 | 1014 | 85674024
1017 | 85674026 | 1015 | 85674025
1017 | 85674026 | 1016 | 85674026
1019 | 85674025 | 1018 | 85674024
1020 | 85674026 | 1018 | 85674024
1020 | 85674026 | 1019 | 85674025
1021 | 85674026 | 1018 | 85674024
1021 | 85674026 | 1019 | 85674025
1021 | 85674026 | 1020 | 85674026
1022 | 85674016 | 1017 | 85674026
1022 | 85674016 | 1021 | 85674026
1023 | 85674017 | 1017 | 85674026
1023 | 85674017 | 1021 | 85674026
1024 | 85674003 | 1017 | 85674026
1024 | 85674003 | 1021 | 85674026
1025 | 85674001 | 1017 | 85674026
1025 | 85674001 | 1021 | 85674026
(20 rows)
SELECT
c.id AS customer_id,
c.name AS customer_name,
c.email AS customer_email,
COUNT(oi.id) AS total_order_items
FROM customers c
JOIN orders o
ON c.id = o.customer_id
JOIN order_items oi
ON o.id = oi.order_id
GROUP BY c.id, c.name, c.email
ORDER BY c.id;
customer_id | customer_name | customer_email | total_order_items
---------------------------------------------------------------------
1 | Customer 1 | customer1@example.com | 9
2 | Customer 2 | customer2@example.com | 9
3 | Customer 3 | customer3@example.com | 9
4 | Customer 4 | customer4@example.com | 9
5 | Customer 5 | customer5@example.com | 9
6 | Customer 6 | customer6@example.com | 9
7 | Customer 7 | customer7@example.com | 9
8 | Customer 8 | customer8@example.com | 9
9 | Customer 9 | customer9@example.com | 9
10 | Customer 10 | customer10@example.com | 9
(10 rows)
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_4_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_5_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_6_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;


@ -252,9 +252,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | running
1450007 | 1450013 | runnable
(5 rows)
@ -282,9 +282,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is running
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | cancelled
1450007 | 1450013 | running
(5 rows)
@ -318,9 +318,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that multiple cancels worked
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | cancelled
1450005 | 1450010 | cancelled
1450005 | 1450011 | cancelled
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | cancelled
1450007 | 1450013 | cancelled
(5 rows)
@ -372,9 +372,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
task_id | status
---------------------------------------------------------------------
1450014 | running
1450015 | running
1450016 | running
1450014 | done
1450015 | done
1450016 | done
1450017 | running
1450018 | runnable
(5 rows)
@ -397,9 +397,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is running
task_id | status
---------------------------------------------------------------------
1450014 | running
1450015 | running
1450016 | running
1450014 | done
1450015 | done
1450016 | done
1450017 | running
1450018 | running
(5 rows)
@ -445,9 +445,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
1450014 | cancelled
1450015 | cancelled
1450016 | cancelled
1450014 | done
1450015 | done
1450016 | done
1450017 | cancelled
1450018 | cancelled
(5 rows)
@ -825,15 +825,15 @@ SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs
-- task doesn't restart because it's not allowed anymore by the limit.
-- node with id 1 can be used only once, unless there are previously running tasks
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset
SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
-- task goes to only runnable state, not running anymore.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
SELECT citus_task_wait(:task_id7, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
@ -851,8 +851,8 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
@ -868,7 +868,7 @@ SELECT citus_task_wait(:task_id8, desired_status => 'done');
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id6, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
@ -880,16 +880,16 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | done | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
(8 rows)
SELECT citus_job_cancel(:job_id1);


@ -594,10 +594,15 @@ step s2-move-placement:
SELECT master_move_shard_placement(
get_shard_id_for_distribution_column('logical_replicate_placement', 4),
'localhost', 57637, 'localhost', 57638);
ERROR: could not acquire the lock required to move public.logical_replicate_placement
<waiting ...>
step s1-end:
COMMIT;
COMMIT;
step s2-move-placement: <... completed>
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
step s2-end:
COMMIT;


@ -1638,9 +1638,12 @@ ALTER EXTENSION citus UPDATE TO '13.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint |
function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) |
| function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) void
| function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) bigint
| function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
(2 rows)
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@ -30,7 +30,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass
AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)'
AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)'
ORDER BY 1;
description
description
---------------------------------------------------------------------
event trigger citus_cascade_to_partition
function alter_distributed_table(regclass,text,integer,text,boolean)
@ -86,6 +86,7 @@ ORDER BY 1;
function citus_internal.add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal.add_tenant_schema(oid,integer)
function citus_internal.adjust_local_clock_to_remote(cluster_clock)
function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode)
function citus_internal.database_command(text)
function citus_internal.delete_colocation_metadata(integer)
function citus_internal.delete_partition_metadata(regclass)
@ -156,7 +157,7 @@ ORDER BY 1;
function citus_pid_for_gpid(bigint)
function citus_prepare_pg_upgrade()
function citus_query_stats()
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode)
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean)
function citus_rebalance_status(boolean)
function citus_rebalance_stop()
function citus_rebalance_wait()
@ -394,6 +395,6 @@ ORDER BY 1;
view citus_tables
view pg_dist_shard_placement
view time_partitions
(362 rows)
(363 rows)
DROP TABLE extension_basic_types;


@ -13,3 +13,4 @@ test: multi_colocated_shard_rebalance
test: cpu_priority
test: check_mx
test: citus_drain_node
test: background_rebalance_parallel_reference_tables


@ -0,0 +1,267 @@
--
-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES
--
-- Test to check that the background tasks scheduled for moving reference table
-- shards in parallel by the background rebalancer have the correct dependencies
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5;
SELECT pg_reload_conf();
-- Colocation group 1: create two tables table1_colg1 and table2_colg1 in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
-- Colocation group 2: create two tables table1_colg2 and table2_colg2 in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
-- Colocation group 3: create two tables table1_colg3 and table2_colg3 in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
-- Create reference tables with primary-foreign key relationships
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL REFERENCES customers(id),
order_date DATE NOT NULL DEFAULT CURRENT_DATE
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id),
product_name TEXT NOT NULL,
quantity INTEGER NOT NULL,
price NUMERIC(10, 2) NOT NULL
);
SELECT create_reference_table('customers');
SELECT create_reference_table('orders');
SELECT create_reference_table('order_items');
-- INSERT SOME DATA
-- Insert 10 customers
INSERT INTO customers (name, email)
SELECT
'Customer ' || i,
'customer' || i || '@example.com'
FROM generate_series(1, 10) AS i;
-- Insert 30 orders: each customer gets 3 orders
INSERT INTO orders (customer_id, order_date)
SELECT
(i % 10) + 1, -- customer_id between 1 and 10
CURRENT_DATE - (i % 7)
FROM generate_series(1, 30) AS i;
-- Insert 90 order_items: each order has 3 items
INSERT INTO order_items (order_id, product_name, quantity, price)
SELECT
(i % 30) + 1, -- order_id between 1 and 30
'Product ' || (i % 5 + 1),
(i % 10) + 1,
round((random() * 100 + 10)::numeric, 2)
FROM generate_series(1, 90) AS i;
SELECT
c.id AS customer_id,
c.name AS customer_name,
c.email AS customer_email,
COUNT(oi.id) AS total_order_items
FROM customers c
JOIN orders o
ON c.id = o.customer_id
JOIN order_items oi
ON o.id = oi.order_id
GROUP BY c.id, c.name, c.email
ORDER BY c.id;
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
SET client_min_messages TO DEBUG1;
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'force_logical',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SET client_min_messages TO ERROR;
SELECT citus_rebalance_wait();
SELECT citus_rebalance_wait();
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
-- Temporary hack to strip the SET application_name prefix from the displayed command
-- until the background job enhancement is done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
-- Drain worker_3 so that we can move only one colocation group to worker_3,
-- creating an imbalance that triggers parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
CALL citus_cleanup_orphaned_resources();
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
CALL citus_cleanup_orphaned_resources();
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
SET client_min_messages TO DEBUG1;
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'block_writes',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SET client_min_messages TO ERROR;
SELECT citus_rebalance_wait();
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
-- Temporary hack to strip the SET application_name prefix from the displayed command
-- until the background job enhancement is done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
SELECT
c.id AS customer_id,
c.name AS customer_name,
c.email AS customer_email,
COUNT(oi.id) AS total_order_items
FROM customers c
JOIN orders o
ON c.id = o.customer_id
JOIN order_items oi
ON o.id = oi.order_id
GROUP BY c.id, c.name, c.email
ORDER BY c.id;
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port);
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;


@ -345,11 +345,11 @@ SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs
-- task doesn't restart because it's not allowed anymore by the limit.
-- node with id 1 can be used only once, unless there are previously running tasks
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset
SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process
-- task goes to only runnable state, not running anymore.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
SELECT citus_task_wait(:task_id7, desired_status => 'runnable');
-- show that cancelled task hasn't restarted because limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
@ -359,7 +359,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
SELECT citus_task_wait(:task_id7, desired_status => 'done');
SELECT citus_task_wait(:task_id8, desired_status => 'done');
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id6, desired_status => 'done');
-- show that the 6th task has restarted only after both 6 and 7 are done
-- since we have a limit of 1 background task executor per node with id 1