Add test for parallel reference table transfer and adjust tests for concurrent locking behavior

This commit introduces the following changes:
- New test case to verify parallel transfer of reference table shards
  during rebalance (an example invocation is shown after this list).

- Adjustments to existing test cases to reflect the revised locking strategy,
  which now permits concurrent shard movements within a colocation group.
  These changes align test expectations with the new execution model,
  where multiple shard transfers can progress in parallel.
  Additionally, with the updated locking mechanism, shard transfer tasks
  that involve minimal or no data movement may complete almost instantly,
  transitioning from running to done in a split second; some test
  assertions were updated accordingly. Note that this does not alter any
  underlying functionality, only the timing of task state transitions.

- A few indentation fixes are also included in this commit.
- Move the DROP statement for citus_rebalance_start so that it precedes
  the corresponding CREATE statement.
- Fix the include path in the downgrade script.
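
For reference, the invocation used in the new regression test looks like
the following; the two parallel_transfer_* boolean parameters are the
additions introduced by this commit:

    SELECT citus_rebalance_start(
        shard_transfer_mode := 'force_logical',
        parallel_transfer_colocated_shards := true,
        parallel_transfer_reference_tables := true);
    SELECT citus_rebalance_wait();  -- block until the scheduled background tasks finish
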
muhammad/parallel_rebalance
Muhammad Usama 2025-08-05 23:46:10 +03:00
parent b86fde3520
commit 122a541c58
16 changed files with 775 additions and 67 deletions

View File

@ -3,6 +3,5 @@
-- bump version to 13.2-1
#include "udfs/worker_last_saved_explain_analyze/13.2-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode);
#include "udfs/citus_rebalance_start/13.2-1.sql"
#include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql"

View File

@ -1,8 +1,7 @@
-- citus--13.2-1--13.1-1
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean);
DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode);
#include "udfs/citus_rebalance_start/11.1-1.sql"
-- downgrade version to 13.1-1
DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode);
#include "../udfs/citus_rebalance_start/11.1-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze();
#include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql"

View File

@ -1,3 +1,5 @@
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean);
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,

View File

@ -1,3 +1,5 @@
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode);
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,

View File

@ -1,3 +1,5 @@
DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode);
CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start(
rebalance_strategy name DEFAULT NULL,
drain_only boolean DEFAULT false,

View File

@ -139,7 +139,6 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
* DROP TABLE and create_reference_table calls so that the list of reference tables we
* operate on are stable.
*
*
* Since the changes to the reference table placements are made via loopback
* connections we release the locks held at the end of this function. Due to Citus
* only running transactions in READ COMMITTED mode we can be sure that other
@ -289,9 +288,9 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
}
/*
* Since reference tables have been copied via a loopback connection we do not have to
* retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure
* that other transactions will find the reference tables copied.
* Since reference tables have been copied via a loopback connection we do not have
* to retain our locks. Since Citus only runs well in READ COMMITTED mode we can be
* sure that other transactions will find the reference tables copied.
* We have obtained and held multiple locks, here we unlock them all in the reverse
* order we have obtained them in.
*/
@ -306,8 +305,8 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
/*
* ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a
* twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of
* copying the missing tables on to the worker nodes this function creates the background tasks
* for each required copy operation and schedule it in the background job.
* copying the missing tables on to the worker nodes this function creates the background
* tasks for each required copy operation and schedule it in the background job.
* Another difference is that instead of moving all the colocated shards sequentially
* this function creates a separate background task for each shard, even when the shards
* are part of the same colocated shard group.
@ -502,8 +501,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr
GetGlobalPID());
/*
* In first step just create and load data in the shards but defer the creation
* of the shard relationships to the next step.
* In first step just create and load data in the shards but defer the
* creation of the shard relationships to the next step.
* The reason we want to defer the creation of the shard relationships is that
* we want to make sure that all the parallel shard copy tasks are finished
* before we create the relationships. Otherwise we might end up with
@ -511,7 +510,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr
* create the shard relationships will result in ERROR.
*/
appendStringInfo(&buf,
"SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)",
"SELECT "
"citus_internal.citus_internal_copy_single_shard_placement"
"(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
@ -582,7 +583,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr
taskEntry->taskId = task->taskid;
ereport(DEBUG2,
(errmsg(
"Added hash entry in scheduled task hash with task %ld for shard %ld",
"Added hash entry in scheduled task hash "
"with task %ld for shard %ld",
task->taskid, shardId)));
}
else
@ -618,7 +620,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr
CITUS_REBALANCER_APPLICATION_NAME_PREFIX,
GetGlobalPID());
appendStringInfo(&buf,
"SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)",
"SELECT "
"citus_internal.citus_internal_copy_single_shard_placement"
"(%ld,%u,%u,%u,%s)",
shardId,
sourceShardPlacement->nodeId,
newWorkerNode->nodeId,
@ -627,7 +631,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr
ereport(DEBUG2,
(errmsg(
"creating relations for reference table '%s' on %s:%d ... QUERY= %s",
"creating relations for reference table '%s' on %s:%d ... "
"QUERY= %s",
referenceTableName, newWorkerNode->workerName,
newWorkerNode->workerPort, buf.data)));

View File

@ -140,6 +140,10 @@ DEPS = {
"background_rebalance_parallel": TestDeps(
None, ["multi_test_helpers", "multi_cluster_management"], worker_count=6
),
"background_rebalance_parallel_reference_tables": TestDeps(
None, ["multi_test_helpers", "multi_cluster_management"],
repeatable=False, worker_count=6
),
"function_propagation": TestDeps("minimal_schedule"),
"citus_shards": TestDeps("minimal_schedule"),
"grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),

View File

@ -534,7 +534,7 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id;
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
17779 | 1013 | done | {50,56}
17779 | 1014 | running | {50,57}
17779 | 1014 | done | {50,57}
17779 | 1015 | running | {50,56}
17779 | 1016 | blocked | {50,57}
17779 | 1017 | runnable | {50,56}

View File

@ -0,0 +1,457 @@
--
-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES
--
-- Test to check that the background tasks scheduled for moving reference table
-- shards in parallel by the background rebalancer have the correct dependencies
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Colocation group 1: create two tables table1_colg1 and table2_colg1 in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Colocation group 2: create two tables table1_colg2 and table2_colg2 in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Colocation group 3: create two tables table1_colg3 and table2_colg3 in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Create reference tables with primary-foreign key relationships
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL REFERENCES customers(id),
order_date DATE NOT NULL DEFAULT CURRENT_DATE
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id),
product_name TEXT NOT NULL,
quantity INTEGER NOT NULL,
price NUMERIC(10, 2) NOT NULL
);
SELECT create_reference_table('customers');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('orders');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('order_items');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- INSERT SOME DATA
-- Insert 10 customers
INSERT INTO customers (name, email)
SELECT
'Customer ' || i,
'customer' || i || '@example.com'
FROM generate_series(1, 10) AS i;
-- Insert 30 orders: each customer gets 3 orders
INSERT INTO orders (customer_id, order_date)
SELECT
(i % 10) + 1, -- customer_id between 1 and 10
CURRENT_DATE - (i % 7)
FROM generate_series(1, 30) AS i;
-- Insert 90 order_items: each order has 3 items
INSERT INTO order_items (order_id, product_name, quantity, price)
SELECT
(i % 30) + 1, -- order_id between 1 and 30
'Product ' || (i % 5 + 1),
(i % 10) + 1,
round((random() * 100 + 10)::numeric, 2)
FROM generate_series(1, 90) AS i;
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport
---------------------------------------------------------------------
table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640
table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639
table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640
table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639
table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640
table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639
table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640
table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639
table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640
table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639
table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640
table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639
(12 rows)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'force_logical',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
job_id | task_id | depends_on
---------------------------------------------------------------------
17777 | 1001 | 1000
17777 | 1002 | 1000
17777 | 1002 | 1001
17777 | 1003 | 1000
17777 | 1003 | 1001
17777 | 1003 | 1002
17777 | 1005 | 1004
17777 | 1006 | 1004
17777 | 1006 | 1005
17777 | 1007 | 1004
17777 | 1007 | 1005
17777 | 1007 | 1006
17777 | 1008 | 1003
17777 | 1008 | 1007
17777 | 1009 | 1003
17777 | 1009 | 1007
17777 | 1010 | 1003
17777 | 1010 | 1007
17777 | 1011 | 1003
17777 | 1011 | 1007
17777 | 1012 | 1003
17777 | 1012 | 1007
17777 | 1013 | 1003
17777 | 1013 | 1007
(24 rows)
-- Temporary hack to eliminate SET application name from command until we get the
-- background job enhancement done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1001 | 85674025 | 1000 | 85674024
1002 | 85674026 | 1000 | 85674024
1002 | 85674026 | 1001 | 85674025
1003 | 85674026 | 1000 | 85674024
1003 | 85674026 | 1001 | 85674025
1003 | 85674026 | 1002 | 85674026
1005 | 85674025 | 1004 | 85674024
1006 | 85674026 | 1004 | 85674024
1006 | 85674026 | 1005 | 85674025
1007 | 85674026 | 1004 | 85674024
1007 | 85674026 | 1005 | 85674025
1007 | 85674026 | 1006 | 85674026
1008 | 85674001 | 1003 | 85674026
1008 | 85674001 | 1007 | 85674026
1009 | 85674000 | 1003 | 85674026
1009 | 85674000 | 1007 | 85674026
1010 | 85674009 | 1003 | 85674026
1010 | 85674009 | 1007 | 85674026
1011 | 85674008 | 1003 | 85674026
1011 | 85674008 | 1007 | 85674026
1012 | 85674017 | 1003 | 85674026
1012 | 85674017 | 1007 | 85674026
1013 | 85674016 | 1003 | 85674026
1013 | 85674016 | 1007 | 85674026
(24 rows)
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
-- Drain worker_3 so that we can move only one colocation group to worker_3
-- to create an imbalance that would cause parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
master_move_shard_placement
---------------------------------------------------------------------
(4 rows)
CALL citus_cleanup_orphaned_resources();
-- Activate and add new nodes so that we can rebalance.
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'block_writes',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------
(1 row)
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
job_id | task_id | depends_on
---------------------------------------------------------------------
17778 | 1015 | 1014
17778 | 1016 | 1014
17778 | 1016 | 1015
17778 | 1017 | 1014
17778 | 1017 | 1015
17778 | 1017 | 1016
17778 | 1019 | 1018
17778 | 1020 | 1018
17778 | 1020 | 1019
17778 | 1021 | 1018
17778 | 1021 | 1019
17778 | 1021 | 1020
17778 | 1022 | 1017
17778 | 1022 | 1021
17778 | 1023 | 1017
17778 | 1023 | 1021
17778 | 1024 | 1017
17778 | 1024 | 1021
17778 | 1025 | 1017
17778 | 1025 | 1021
(20 rows)
-- Temporary hack to eliminate SET application name from command until we get the
-- background job enhancement done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
task_id | command | depends_on | command
---------------------------------------------------------------------
1015 | 85674025 | 1014 | 85674024
1016 | 85674026 | 1014 | 85674024
1016 | 85674026 | 1015 | 85674025
1017 | 85674026 | 1014 | 85674024
1017 | 85674026 | 1015 | 85674025
1017 | 85674026 | 1016 | 85674026
1019 | 85674025 | 1018 | 85674024
1020 | 85674026 | 1018 | 85674024
1020 | 85674026 | 1019 | 85674025
1021 | 85674026 | 1018 | 85674024
1021 | 85674026 | 1019 | 85674025
1021 | 85674026 | 1020 | 85674026
1022 | 85674016 | 1017 | 85674026
1022 | 85674016 | 1021 | 85674026
1023 | 85674017 | 1017 | 85674026
1023 | 85674017 | 1021 | 85674026
1024 | 85674003 | 1017 | 85674026
1024 | 85674003 | 1021 | 85674026
1025 | 85674001 | 1017 | 85674026
1025 | 85674001 | 1021 | 85674026
(20 rows)
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_3_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_4_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_5_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
select citus_remove_node('localhost', :worker_6_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -252,9 +252,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | running
1450007 | 1450013 | runnable
(5 rows)
@ -282,9 +282,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is running
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | running
1450005 | 1450010 | running
1450005 | 1450011 | running
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | cancelled
1450007 | 1450013 | running
(5 rows)
@ -318,9 +318,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that multiple cancels worked
job_id | task_id | status
---------------------------------------------------------------------
1450005 | 1450009 | cancelled
1450005 | 1450010 | cancelled
1450005 | 1450011 | cancelled
1450005 | 1450009 | done
1450005 | 1450010 | done
1450005 | 1450011 | done
1450006 | 1450012 | cancelled
1450007 | 1450013 | cancelled
(5 rows)
@ -372,9 +372,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
task_id | status
---------------------------------------------------------------------
1450014 | running
1450015 | running
1450016 | running
1450014 | done
1450015 | done
1450016 | done
1450017 | running
1450018 | runnable
(5 rows)
@ -397,9 +397,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that last task is running
task_id | status
---------------------------------------------------------------------
1450014 | running
1450015 | running
1450016 | running
1450014 | done
1450015 | done
1450016 | done
1450017 | running
1450018 | running
(5 rows)
@ -445,9 +445,9 @@ SELECT task_id, status FROM pg_dist_background_task
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
1450014 | cancelled
1450015 | cancelled
1450016 | cancelled
1450014 | done
1450015 | done
1450016 | done
1450017 | cancelled
1450018 | cancelled
(5 rows)
@ -693,9 +693,9 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
---------------------------------------------------------------------
1450017 | 1450025 | running | {1,2}
1450017 | 1450026 | running | {3,4}
1450017 | 1450027 | runnable | {1,2}
1450017 | 1450028 | runnable | {1,3}
1450017 | 1450029 | runnable | {2,4}
1450017 | 1450027 | running | {1,2}
1450017 | 1450028 | running | {1,3}
1450017 | 1450029 | running | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
@ -750,7 +750,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
1450017 | 1450027 | running | {1,2}
1450017 | 1450028 | running | {1,3}
1450017 | 1450029 | running | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | runnable | {1,4}
(8 rows)
@ -810,7 +810,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450030 | done | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
@ -825,15 +825,15 @@ SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs
-- task doesn't restart because it's not allowed anymore by the limit.
-- node with id 1 can be used only once, unless there are previously running tasks
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset
SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
-- task goes to only runnable state, not running anymore.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
SELECT citus_task_wait(:task_id7, desired_status => 'runnable');
citus_task_wait
---------------------------------------------------------------------
@ -851,8 +851,8 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | runnable | {1,2}
1450017 | 1450031 | running | {1,3}
1450017 | 1450030 | done | {1,2}
1450017 | 1450031 | runnable | {1,3}
1450017 | 1450032 | running | {1,4}
(8 rows)
@ -868,7 +868,7 @@ SELECT citus_task_wait(:task_id8, desired_status => 'done');
(1 row)
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id6, desired_status => 'done');
citus_task_wait
---------------------------------------------------------------------
@ -880,16 +880,16 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4,
:task_id5, :task_id6, :task_id7, :task_id8)
ORDER BY job_id, task_id;
job_id | task_id | status | nodes_involved
job_id | task_id | status | nodes_involved
---------------------------------------------------------------------
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | running | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
1450017 | 1450025 | done | {1,2}
1450017 | 1450026 | done | {3,4}
1450017 | 1450027 | done | {1,2}
1450017 | 1450028 | done | {1,3}
1450017 | 1450029 | done | {2,4}
1450017 | 1450030 | done | {1,2}
1450017 | 1450031 | done | {1,3}
1450017 | 1450032 | done | {1,4}
(8 rows)
SELECT citus_job_cancel(:job_id1);

View File

@ -594,10 +594,15 @@ step s2-move-placement:
SELECT master_move_shard_placement(
get_shard_id_for_distribution_column('logical_replicate_placement', 4),
'localhost', 57637, 'localhost', 57638);
ERROR: could not acquire the lock required to move public.logical_replicate_placement
<waiting ...>
step s1-end:
COMMIT;
COMMIT;
step s2-move-placement: <... completed>
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
step s2-end:
COMMIT;

View File

@ -1503,9 +1503,12 @@ ALTER EXTENSION citus UPDATE TO '13.2-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint |
function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) |
| function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
(2 rows)
| function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) void
| function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) bigint
| function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision)
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -29,7 +29,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass
AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)'
AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)'
ORDER BY 1;
description
description
---------------------------------------------------------------------
event trigger citus_cascade_to_partition
function alter_distributed_table(regclass,text,integer,text,boolean)
@ -85,6 +85,7 @@ ORDER BY 1;
function citus_internal.add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal.add_tenant_schema(oid,integer)
function citus_internal.adjust_local_clock_to_remote(cluster_clock)
function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode)
function citus_internal.database_command(text)
function citus_internal.delete_colocation_metadata(integer)
function citus_internal.delete_partition_metadata(regclass)
@ -155,7 +156,7 @@ ORDER BY 1;
function citus_pid_for_gpid(bigint)
function citus_prepare_pg_upgrade()
function citus_query_stats()
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode)
function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean)
function citus_rebalance_status(boolean)
function citus_rebalance_stop()
function citus_rebalance_wait()
@ -393,6 +394,6 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(362 rows)
(363 rows)
DROP TABLE extension_basic_types;

View File

@ -13,3 +13,4 @@ test: multi_colocated_shard_rebalance
test: cpu_priority
test: check_mx
test: citus_drain_node
test: background_rebalance_parallel_reference_tables

View File

@ -0,0 +1,228 @@
--
-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES
--
-- Test to check that the background tasks scheduled for moving reference table
-- shards in parallel by the background rebalancer have the correct dependencies
--
CREATE SCHEMA background_rebalance_parallel;
SET search_path TO background_rebalance_parallel;
SET citus.next_shard_id TO 85674000;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO ERROR;
ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777;
ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50;
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5;
SELECT pg_reload_conf();
-- Colocation group 1: create two tables table1_colg1 and table2_colg1 in a colocation group
CREATE TABLE table1_colg1 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg1 (b int PRIMARY KEY);
SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1');
-- Colocation group 2: create two tables table1_colg2 and table2_colg2 in a colocation group
CREATE TABLE table1_colg2 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg2 (b int primary key);
SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2');
-- Colocation group 3: create two tables table1_colg3 and table2_colg3 in a colocation group
CREATE TABLE table1_colg3 (a int PRIMARY KEY);
SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none');
CREATE TABLE table2_colg3 (b int primary key);
SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3');
-- Create reference tables with primary-foreign key relationships
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL REFERENCES customers(id),
order_date DATE NOT NULL DEFAULT CURRENT_DATE
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES orders(id),
product_name TEXT NOT NULL,
quantity INTEGER NOT NULL,
price NUMERIC(10, 2) NOT NULL
);
SELECT create_reference_table('customers');
SELECT create_reference_table('orders');
SELECT create_reference_table('order_items');
-- INSERT SOME DATA
-- Insert 10 customers
INSERT INTO customers (name, email)
SELECT
'Customer ' || i,
'customer' || i || '@example.com'
FROM generate_series(1, 10) AS i;
-- Insert 30 orders: each customer gets 3 orders
INSERT INTO orders (customer_id, order_date)
SELECT
(i % 10) + 1, -- customer_id between 1 and 10
CURRENT_DATE - (i % 7)
FROM generate_series(1, 30) AS i;
-- Insert 90 order_items: each order has 3 items
INSERT INTO order_items (order_id, product_name, quantity, price)
SELECT
(i % 30) + 1, -- order_id between 1 and 30
'Product ' || (i % 5 + 1),
(i % 10) + 1,
round((random() * 100 + 10)::numeric, 2)
FROM generate_series(1, 90) AS i;
-- Add two new nodes so that we can rebalance
SELECT 1 FROM citus_add_node('localhost', :worker_3_port);
SELECT 1 FROM citus_add_node('localhost', :worker_4_port);
SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid;
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'force_logical',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SELECT citus_rebalance_wait();
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
-- Temporary hack to eliminate SET application name from command until we get the
-- background job enhancement done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
-- Drain worker_3 so that we can move only one colocation group to worker_3
-- to create an imbalance that would cause parallel rebalancing.
SELECT 1 FROM citus_drain_node('localhost',:worker_3_port);
SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true);
CALL citus_cleanup_orphaned_resources();
-- Move all the shards of Colocation group 3 to worker_3.
SELECT
master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port
ORDER BY
shardid;
CALL citus_cleanup_orphaned_resources();
-- Activate and add new nodes so that we can rebalance.
SELECT 1 FROM citus_activate_node('localhost', :worker_4_port);
SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true);
SELECT 1 FROM citus_add_node('localhost', :worker_5_port);
SELECT 1 FROM citus_add_node('localhost', :worker_6_port);
SELECT citus_rebalance_start AS job_id from citus_rebalance_start(
shard_transfer_mode := 'block_writes',
parallel_transfer_colocated_shards := true,
parallel_transfer_reference_tables := true) \gset
SELECT citus_rebalance_wait();
-- see the dependencies of the tasks scheduled by the background rebalancer
SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on;
-- Temporary hack to eliminate SET application name from command until we get the
-- background job enhancement done.
SELECT D.task_id,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.task_id),
D.depends_on,
(SELECT
CASE
WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN
SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)')
WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN
SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)')
ELSE
T.command
END
FROM pg_dist_background_task T WHERE T.task_id = D.depends_on)
FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC;
DROP SCHEMA background_rebalance_parallel CASCADE;
TRUNCATE pg_dist_background_job CASCADE;
TRUNCATE pg_dist_background_task CASCADE;
TRUNCATE pg_dist_background_task_depend;
SELECT public.wait_for_resource_cleanup();
select citus_remove_node('localhost', :worker_3_port);
select citus_remove_node('localhost', :worker_4_port);
select citus_remove_node('localhost', :worker_5_port);
select citus_remove_node('localhost', :worker_6_port);
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls;

View File

@ -345,11 +345,11 @@ SELECT pg_reload_conf();
-- if pg_cancel_backend is called on one of the running task PIDs
-- task doesn't restart because it's not allowed anymore by the limit.
-- node with id 1 can be used only once, unless there are previously running tasks
SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset
SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process
SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset
SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process
-- task goes to only runnable state, not running anymore.
SELECT citus_task_wait(:task_id6, desired_status => 'runnable');
SELECT citus_task_wait(:task_id7, desired_status => 'runnable');
-- show that cancelled task hasn't restarted because limit doesn't allow it
SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
@ -359,7 +359,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task
SELECT citus_task_wait(:task_id7, desired_status => 'done');
SELECT citus_task_wait(:task_id8, desired_status => 'done');
SELECT citus_task_wait(:task_id6, desired_status => 'running');
SELECT citus_task_wait(:task_id6, desired_status => 'done');
-- show that the 6th task has restarted only after both 6 and 7 are done
-- since we have a limit of 1 background task executor per node with id 1