Citus’ shard rebalancer has several key performance bottlenecks:
- Sequential Movement of Reference Tables:
Reference tables are often assumed to be small, but in real-world deployments
they can grow quite large. Previously, reference table shards were
transferred as a single unit, making the process monolithic and time-consuming.
- No Parallelism Within a Colocation Group:
Although Citus distributes data using colocated shards, shard movements
within the same colocation group were serialized.
In environments with hundreds of distributed tables colocated together,
this serialization significantly slowed down rebalance operations.
- Excessive Locking:
The rebalancer used overly restrictive locks and redundant logical replication guards,
further limiting concurrency.
The goal of this commit is to eliminate these inefficiencies and enable maximum
parallelism during rebalance, without compromising correctness or compatibility.
Parallelize shard rebalancing to reduce rebalance time
Feature Summary:
1. Parallel Reference Table Rebalancing
Each reference-table shard is now copied in its own background task.
Foreign key and other constraints are deferred until all shards are copied.
To move a single shard without considering colocation, a new internal-only
UDF 'citus_internal_copy_single_shard_placement' is introduced to perform
single-shard copy/move operations.
Since this function is internal, users are not allowed to call it directly.
**Temporary Hack to Set Background Task Context**
Background tasks cannot currently set custom GUCs like application_name
before executing internal-only functions.
To work around this, we inject a SET LOCAL application_name TO
'citus_rebalancer ...' statement as a prefix in the task command.
This is a temporary hack to label internal tasks until proper GUC
injection support is added to the background task executor.
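For illustration, a scheduled task command might look roughly like the sketch
below; the argument list of the internal UDF is a placeholder here, not its
actual signature:
```sql
-- Hypothetical task command; the SET LOCAL prefix is the temporary hack
-- described above, and the UDF arguments are placeholders only.
SET LOCAL application_name TO 'citus_rebalancer ...';
SELECT pg_catalog.citus_internal_copy_single_shard_placement(
    102008,          -- placeholder shard id
    16,              -- placeholder source node id
    18,              -- placeholder target node id
    'force_logical'  -- placeholder transfer mode
);
```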
2. Changes in Locking Strategy
- Drop the leftover replication lock that previously serialized shard
moves performed via logical replication.
This lock was only needed when we used to drop and recreate the
subscriptions/publications before each move. Since Citus now removes
those objects later as part of the “unused distributed objects” cleanup,
shard moves via logical replication can safely run in parallel without
additional locking.
- Introduce a per-shard advisory lock to prevent concurrent operations
on the same shard while allowing maximum parallelism elsewhere (a rough
SQL analogy follows this list).
- Change the lock mode in AcquirePlacementColocationLock from
ExclusiveLock to RowExclusiveLock to allow concurrent updates within
the same colocation group, while still preventing concurrent DDL operations.
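As a rough analogy only (the real lock is taken through Citus' internal C lock
machinery, not via SQL), the per-shard advisory lock behaves like a
transaction-scoped PostgreSQL advisory lock keyed on the shard id:
```sql
-- Rough analogy; 102008 stands in for a shard id. Two sessions operating on
-- the same shard serialize here, while different shards proceed in parallel.
BEGIN;
SELECT pg_advisory_xact_lock(102008);
-- ... perform the shard copy/move ...
COMMIT;
```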
3. citus_rebalance_start() enhancements
The citus_rebalance_start() function now accepts two new optional parameters:
- parallel_transfer_colocated_shards BOOLEAN DEFAULT false,
- parallel_transfer_reference_tables BOOLEAN DEFAULT false
Both parameters default to false, which preserves the existing behavior and
ensures backward compatibility. When both are set to true, the rebalancer
operates with full parallelism.
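For reference, the extended signature looks roughly as sketched below; the
three pre-existing parameters follow the documented citus_rebalance_start()
signature, while the return type and exact definition should be taken from the
shipped SQL migration rather than this sketch:
```sql
-- Sketch only; see the actual Citus SQL migration for the authoritative definition.
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
    rebalance_strategy name DEFAULT NULL,
    drain_only boolean DEFAULT false,
    shard_transfer_mode citus.shard_transfer_mode DEFAULT 'auto',
    parallel_transfer_colocated_shards boolean DEFAULT false,
    parallel_transfer_reference_tables boolean DEFAULT false)
RETURNS bigint  -- background job id (assumed)
LANGUAGE C VOLATILE
AS 'MODULE_PATHNAME', $$citus_rebalance_start$$;
```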
Previous Rebalancer Behavior:
SELECT citus_rebalance_start(shard_transfer_mode := 'force_logical');
This would:
- Start a single background task that replicates all reference tables.
- Then move all shards serially, one at a time.
Task 1: replicate_reference_tables()
↓
Task 2: move_shard_1()
↓
Task 3: move_shard_2()
↓
Task 4: move_shard_3()
Slow and sequential. Reference table copy is a bottleneck.
Colocated shards must wait for each other.
New Parallel Rebalancer:
SELECT citus_rebalance_start(
    shard_transfer_mode := 'force_logical',
    parallel_transfer_colocated_shards := true,
    parallel_transfer_reference_tables := true
);
This would:
- Schedule independent background tasks for each reference-table shard.
- Move colocated shards in parallel, while still maintaining
dependency order.
- Defer constraint application until all reference shards are in place.
Task 1: copy_ref_shard_1()
Task 2: copy_ref_shard_2()
Task 3: copy_ref_shard_3()
→ Task 4: apply_constraints()
↓
Task 5: copy_shard_1()
Task 6: copy_shard_2()
Task 7: copy_shard_3()
↓
Task 8-10: move_shard_1..3()
Each operation is scheduled independently and can run as
soon as dependencies are satisfied.
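Assuming the background-job catalogs pg_dist_background_task and
pg_dist_background_task_depend (present in recent Citus versions), the
scheduled tasks and their dependency edges can be inspected with a query along
these lines:
```sql
-- Inspect the scheduled rebalance tasks and their dependencies.
-- Replace 42 with the job id reported by citus_rebalance_start().
SELECT t.task_id,
       t.status,
       left(t.command, 60) AS command,
       array_agg(d.depends_on) AS depends_on
FROM pg_dist_background_task t
LEFT JOIN pg_dist_background_task_depend d USING (job_id, task_id)
WHERE t.job_id = 42
GROUP BY t.task_id, t.status, t.command
ORDER BY t.task_id;
```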
This change adds a script to programmatically group all includes in a
specific order. The script was used as a one-time invocation to group
and sort all includes throughout our formatted code. The grouping is as
follows:
- System includes (e.g. `#include <...>`)
- Postgres.h (e.g. `#include "postgres.h"`)
- Toplevel imports from postgres, not contained in a directory (e.g.
`#include "miscadmin.h"`)
- General postgres includes (e.g. `#include "nodes/..."`)
- Toplevel citus includes, not contained in a directory (e.g. `#include
"citus_version.h"`)
- Columnar includes (e.g. `#include "columnar/..."`)
- Distributed includes (e.g. `#include "distributed/..."`)
Because it is quite hard to tell toplevel citus includes apart from toplevel
postgres includes, the script hardcodes the list of toplevel citus includes.
In the same manner, it treats anything not prefixed with `columnar/` or
`distributed/` as a postgres include.
The sorting/grouping is enforced by CI. Since we do so with our own script,
no changes are required in our uncrustify configuration.
Add citus_schema_move(), which can be used to move the tenant tables within a distributed
schema to another node. The function has two variations, implemented as simple wrappers around
citus_move_shard_placement() and citus_move_shard_placement_with_nodeid() respectively.
They pick a shard that belongs to the given tenant schema and resolve the source node
that contains the shards under that schema. Hence their signatures are quite
similar to those of the underlying functions:
```sql
-- citus_schema_move(), using target node name and node port
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
    schema_id regnamespace,
    target_node_name text,
    target_node_port integer,
    shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move$$;

-- citus_schema_move(), using target node id
CREATE OR REPLACE FUNCTION pg_catalog.citus_schema_move(
    schema_id regnamespace,
    target_node_id integer,
    shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_schema_move_with_nodeid$$;
```
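A hypothetical usage example (the schema name, node host/port, and node id
below are placeholders):
```sql
-- Move all tables of the distributed schema "tenant_42" by node name and port.
SELECT citus_schema_move('tenant_42', 'worker-2.example.com', 5432);

-- Same, but addressing the target by node id and forcing logical replication.
SELECT citus_schema_move('tenant_42', 3, 'force_logical');
```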
DESCRIPTION: Refactor and unify shard move and copy functions
Shard move and copy functions share a lot of code. This PR
unifies these functions into one, along with some helper functions. To
preserve the current behavior, we introduce and use an enum
parameter, and hardcoded strings for producing error/warning messages.
DESCRIPTION: Drop `SHARD_STATE_TO_DELETE` and use the cleanup records
instead
Drops the shard state that was used to mark shards as orphaned. Now we
insert cleanup records into `pg_dist_cleanup`, so "orphaned" shards will
be dropped either by the maintenance daemon or by internal cleanup calls. With
this PR, we make the "cleanup orphaned shards" functions no-ops, as
they are not needed anymore.
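For example, pending cleanup work can be inspected and triggered manually; the
column list below follows pg_dist_cleanup as introduced in Citus 11.1 and may
differ slightly across versions:
```sql
-- List the resources that are currently scheduled for cleanup.
SELECT record_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup;

-- Drop them right away instead of waiting for the maintenance daemon.
CALL citus_cleanup_orphaned_resources();
```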
This PR also includes some naming changes to placement functions. We no longer
need functions that filter out orphaned shards, as there will be no orphaned
shards anymore.
We are also introducing a small script with this PR for users who already have
orphaned shards. During Citus upgrade, the script deletes the orphaned shard
entries from `pg_dist_placement` and inserts a cleanup record into
`pg_dist_cleanup` for each one of them.
We also have a lot of flakiness fixes in this PR.
Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
DESCRIPTION: Adds status column to get_rebalance_progress()
Introduces a new column named `status` for the function
`get_rebalance_progress()`. For each ongoing shard move, this column
reports that shard move operation's current status.
For now, the status can be one of the values below (a sample query follows
the list):
* Not Started
* Setting Up
* Copying Data
* Catching Up
* Creating Constraints
* Final Catchup
* Creating Foreign Keys
* Completing
* Completed
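A sample query that surfaces the new column; the other column names follow the
existing get_rebalance_progress() output and may vary by Citus version:
```sql
-- Show per-shard progress and the new status column for the ongoing rebalance.
SELECT table_name, shardid, sourcename, targetname, progress, status
FROM get_rebalance_progress();
```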