From 122a541c5896ce3d7abc125f9949fcf9d9dcac06 Mon Sep 17 00:00:00 2001 From: Muhammad Usama Date: Tue, 5 Aug 2025 23:46:10 +0300 Subject: [PATCH] Add test for parallel reference table transfer and adjust tests for concurrent locking behavior MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the following changes: - New test case to verify parallel transfer of reference table shards during rebalance. - Adjustments to existing test cases to reflect the revised locking strategy, which now permits concurrent shard movements within a colocation group. These changes align test expectations with the new execution model, where multiple shard transfers can progress in parallel. Additionally, with the updated locking mechanism, shard transfer tasks that involve minimal or no data movement may complete almost instantly—transitioning from running to done in a split second. As a result, some test assertions were updated accordingly. Note that this does not alter any underlying functionality, only the timing of task state transitions. - Few indent fixes are also part of the commit - Move DROP citus_rebalance_start statement before its creation command. - Fixing the path in downgrade script --- .../distributed/sql/citus--13.1-1--13.2-1.sql | 1 - .../sql/downgrades/citus--13.2-1--13.1-1.sql | 5 +- .../sql/udfs/citus_rebalance_start/11.1-1.sql | 2 + .../sql/udfs/citus_rebalance_start/13.2-1.sql | 2 + .../sql/udfs/citus_rebalance_start/latest.sql | 2 + .../distributed/utils/reference_table_utils.c | 29 +- src/test/regress/citus_tests/run_test.py | 4 + .../background_rebalance_parallel.out | 2 +- ...nd_rebalance_parallel_reference_tables.out | 457 ++++++++++++++++++ .../background_task_queue_monitor.out | 76 +-- ...ical_replication_single_shard_commands.out | 11 +- src/test/regress/expected/multi_extension.out | 7 +- .../expected/upgrade_list_citus_objects.out | 7 +- src/test/regress/operations_schedule | 1 + ...nd_rebalance_parallel_reference_tables.sql | 228 +++++++++ .../sql/background_task_queue_monitor.sql | 8 +- 16 files changed, 775 insertions(+), 67 deletions(-) create mode 100644 src/test/regress/expected/background_rebalance_parallel_reference_tables.out create mode 100644 src/test/regress/sql/background_rebalance_parallel_reference_tables.sql diff --git a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql index be4e0e62f..09cb5c2e3 100644 --- a/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql +++ b/src/backend/distributed/sql/citus--13.1-1--13.2-1.sql @@ -3,6 +3,5 @@ -- bump version to 13.2-1 #include "udfs/worker_last_saved_explain_analyze/13.2-1.sql" -DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); #include "udfs/citus_rebalance_start/13.2-1.sql" #include "udfs/citus_internal_copy_single_shard_placement/13.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql index cfdf1eb60..22a56810d 100644 --- a/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--13.2-1--13.1-1.sql @@ -1,8 +1,7 @@ -- citus--13.2-1--13.1-1 -DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); -DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, 
citus.shard_transfer_mode); -#include "udfs/citus_rebalance_start/11.1-1.sql" -- downgrade version to 13.1-1 +DROP FUNCTION IF EXISTS citus_internal.citus_internal_copy_single_shard_placement(bigint, integer, integer, integer, citus.shard_transfer_mode); +#include "../udfs/citus_rebalance_start/11.1-1.sql" DROP FUNCTION IF EXISTS pg_catalog.worker_last_saved_explain_analyze(); #include "../udfs/worker_last_saved_explain_analyze/9.4-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql index cc84d3142..f9545a6bf 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/11.1-1.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode, boolean, boolean); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql index 658e78dd6..b3886f7a2 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/13.2-1.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql index 658e78dd6..b3886f7a2 100644 --- a/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_rebalance_start/latest.sql @@ -1,3 +1,5 @@ +DROP FUNCTION IF EXISTS pg_catalog.citus_rebalance_start(name, boolean, citus.shard_transfer_mode); + CREATE OR REPLACE FUNCTION pg_catalog.citus_rebalance_start( rebalance_strategy name DEFAULT NULL, drain_only boolean DEFAULT false, diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5a98c9cdd..cc872b17b 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -139,7 +139,6 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) * DROP TABLE and create_reference_table calls so that the list of reference tables we * operate on are stable. * - * * Since the changes to the reference table placements are made via loopback * connections we release the locks held at the end of this function. Due to Citus * only running transactions in READ COMMITTED mode we can be sure that other @@ -289,9 +288,9 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) } /* - * Since reference tables have been copied via a loopback connection we do not have to - * retain our locks. Since Citus only runs well in READ COMMITTED mode we can be sure - * that other transactions will find the reference tables copied. + * Since reference tables have been copied via a loopback connection we do not have + * to retain our locks. Since Citus only runs well in READ COMMITTED mode we can be + * sure that other transactions will find the reference tables copied. 
* We have obtained and held multiple locks, here we unlock them all in the reverse * order we have obtained them in. */ @@ -306,8 +305,8 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) /* * ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes is essentially a * twin of EnsureReferenceTablesExistOnAllNodesExtended. The difference is instead of - * copying the missing tables on to the worker nodes this function creates the background tasks - * for each required copy operation and schedule it in the background job. + * copying the missing tables on to the worker nodes this function creates the background + * tasks for each required copy operation and schedule it in the background job. * Another difference is that instead of moving all the colocated shards sequencially * this function creates a seperate background task for each shard, even when the shards * are part of same colocated shard group. @@ -502,8 +501,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr GetGlobalPID()); /* - * In first step just create and load data in the shards but defer the creation - * of the shard relationships to the next step. + * In first step just create and load data in the shards but defer the + * creation of the shard relationships to the next step. * The reason we want to defer the creation of the shard relationships is that * we want to make sure that all the parallel shard copy task are finished * before we create the relationships. Otherwise we might end up with @@ -511,7 +510,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr * create the shard relationships will result in ERROR. */ appendStringInfo(&buf, - "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", shardId, sourceShardPlacement->nodeId, newWorkerNode->nodeId, @@ -582,7 +583,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr taskEntry->taskId = task->taskid; ereport(DEBUG2, (errmsg( - "Added hash entry in scheduled task hash with task %ld for shard %ld", + "Added hash entry in scheduled task hash " + "with task %ld for shard %ld", task->taskid, shardId))); } else @@ -618,7 +620,9 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr CITUS_REBALANCER_APPLICATION_NAME_PREFIX, GetGlobalPID()); appendStringInfo(&buf, - "SELECT citus_internal.citus_internal_copy_single_shard_placement(%ld,%u,%u,%u,%s)", + "SELECT " + "citus_internal.citus_internal_copy_single_shard_placement" + "(%ld,%u,%u,%u,%s)", shardId, sourceShardPlacement->nodeId, newWorkerNode->nodeId, @@ -627,7 +631,8 @@ ScheduleTasksToParallelCopyReferenceTablesOnAllMissingNodes(int64 jobId, char tr ereport(DEBUG2, (errmsg( - "creating relations for reference table '%s' on %s:%d ... QUERY= %s", + "creating relations for reference table '%s' on %s:%d ... 
" + "QUERY= %s", referenceTableName, newWorkerNode->workerName, newWorkerNode->workerPort, buf.data))); diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 193bdf09f..8772d6944 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -140,6 +140,10 @@ DEPS = { "background_rebalance_parallel": TestDeps( None, ["multi_test_helpers", "multi_cluster_management"], worker_count=6 ), + "background_rebalance_parallel_reference_tables": TestDeps( + None, ["multi_test_helpers", "multi_cluster_management"], + repeatable=False, worker_count=6 + ), "function_propagation": TestDeps("minimal_schedule"), "citus_shards": TestDeps("minimal_schedule"), "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"), diff --git a/src/test/regress/expected/background_rebalance_parallel.out b/src/test/regress/expected/background_rebalance_parallel.out index cc3470de9..90f13d701 100644 --- a/src/test/regress/expected/background_rebalance_parallel.out +++ b/src/test/regress/expected/background_rebalance_parallel.out @@ -534,7 +534,7 @@ FROM pg_dist_background_task WHERE job_id in (:job_id) ORDER BY task_id; job_id | task_id | status | nodes_involved --------------------------------------------------------------------- 17779 | 1013 | done | {50,56} - 17779 | 1014 | running | {50,57} + 17779 | 1014 | done | {50,57} 17779 | 1015 | running | {50,56} 17779 | 1016 | blocked | {50,57} 17779 | 1017 | runnable | {50,56} diff --git a/src/test/regress/expected/background_rebalance_parallel_reference_tables.out b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out new file mode 100644 index 000000000..fe248945b --- /dev/null +++ b/src/test/regress/expected/background_rebalance_parallel_reference_tables.out @@ -0,0 +1,457 @@ +-- +-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES +-- +-- Test to check if the background tasks scheduled for moving reference tables +-- shards in parallel by the background rebalancer have the correct dependencies +-- +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO ERROR; +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg2 (b int primary key); +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table2_colg3 (b int primary key); +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Create reference tables with primary-foreign key relationships +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); +CREATE TABLE order_items ( + id SERIAL PRIMARY KEY, + order_id INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + price NUMERIC(10, 2) NOT NULL +); +SELECT create_reference_table('customers'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('orders'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_reference_table('order_items'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- INSERT SOME DATA +-- Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; +-- Insert 90 order_items: 
each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + table_name | shardid | shard_size | sourcename | sourceport | targetname | targetport +--------------------------------------------------------------------- + table1_colg1 | 85674000 | 0 | localhost | 57637 | localhost | 57640 + table1_colg1 | 85674001 | 0 | localhost | 57638 | localhost | 57639 + table2_colg1 | 85674004 | 0 | localhost | 57637 | localhost | 57640 + table2_colg1 | 85674005 | 0 | localhost | 57638 | localhost | 57639 + table1_colg2 | 85674008 | 0 | localhost | 57637 | localhost | 57640 + table1_colg2 | 85674009 | 0 | localhost | 57638 | localhost | 57639 + table2_colg2 | 85674012 | 0 | localhost | 57637 | localhost | 57640 + table2_colg2 | 85674013 | 0 | localhost | 57638 | localhost | 57639 + table1_colg3 | 85674016 | 0 | localhost | 57637 | localhost | 57640 + table1_colg3 | 85674017 | 0 | localhost | 57638 | localhost | 57639 + table2_colg3 | 85674020 | 0 | localhost | 57637 | localhost | 57640 + table2_colg3 | 85674021 | 0 | localhost | 57638 | localhost | 57639 +(12 rows) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17777 | 1001 | 1000 + 17777 | 1002 | 1000 + 17777 | 1002 | 1001 + 17777 | 1003 | 1000 + 17777 | 1003 | 1001 + 17777 | 1003 | 1002 + 17777 | 1005 | 1004 + 17777 | 1006 | 1004 + 17777 | 1006 | 1005 + 17777 | 1007 | 1004 + 17777 | 1007 | 1005 + 17777 | 1007 | 1006 + 17777 | 1008 | 1003 + 17777 | 1008 | 1007 + 17777 | 1009 | 1003 + 17777 | 1009 | 1007 + 17777 | 1010 | 1003 + 17777 | 1010 | 1007 + 17777 | 1011 | 1003 + 17777 | 1011 | 1007 + 17777 | 1012 | 1003 + 17777 | 1012 | 1007 + 17777 | 1013 | 1003 + 17777 | 1013 | 1007 +(24 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. 
+SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1001 | 85674025 | 1000 | 85674024 + 1002 | 85674026 | 1000 | 85674024 + 1002 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1000 | 85674024 + 1003 | 85674026 | 1001 | 85674025 + 1003 | 85674026 | 1002 | 85674026 + 1005 | 85674025 | 1004 | 85674024 + 1006 | 85674026 | 1004 | 85674024 + 1006 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1004 | 85674024 + 1007 | 85674026 | 1005 | 85674025 + 1007 | 85674026 | 1006 | 85674026 + 1008 | 85674001 | 1003 | 85674026 + 1008 | 85674001 | 1007 | 85674026 + 1009 | 85674000 | 1003 | 85674026 + 1009 | 85674000 | 1007 | 85674026 + 1010 | 85674009 | 1003 | 85674026 + 1010 | 85674009 | 1007 | 85674026 + 1011 | 85674008 | 1003 | 85674026 + 1011 | 85674008 | 1007 | 85674026 + 1012 | 85674017 | 1003 | 85674026 + 1012 | 85674017 | 1007 | 85674026 + 1013 | 85674016 | 1003 | 85674026 + 1013 | 85674016 | 1007 | 85674026 +(24 rows) + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +-- Move all the shards of Colocation group 3 to worker_3. +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + master_move_shard_placement +--------------------------------------------------------------------- + + + + +(4 rows) + +CALL citus_cleanup_orphaned_resources(); +-- Activate and new nodes so that we can rebalance. +SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset +SELECT citus_rebalance_wait(); + citus_rebalance_wait +--------------------------------------------------------------------- + +(1 row) + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + job_id | task_id | depends_on +--------------------------------------------------------------------- + 17778 | 1015 | 1014 + 17778 | 1016 | 1014 + 17778 | 1016 | 1015 + 17778 | 1017 | 1014 + 17778 | 1017 | 1015 + 17778 | 1017 | 1016 + 17778 | 1019 | 1018 + 17778 | 1020 | 1018 + 17778 | 1020 | 1019 + 17778 | 1021 | 1018 + 17778 | 1021 | 1019 + 17778 | 1021 | 1020 + 17778 | 1022 | 1017 + 17778 | 1022 | 1021 + 17778 | 1023 | 1017 + 17778 | 1023 | 1021 + 17778 | 1024 | 1017 + 17778 | 1024 | 1021 + 17778 | 1025 | 1017 + 17778 | 1025 | 1021 +(20 rows) + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. 
+SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + task_id | command | depends_on | command +--------------------------------------------------------------------- + 1015 | 85674025 | 1014 | 85674024 + 1016 | 85674026 | 1014 | 85674024 + 1016 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1014 | 85674024 + 1017 | 85674026 | 1015 | 85674025 + 1017 | 85674026 | 1016 | 85674026 + 1019 | 85674025 | 1018 | 85674024 + 1020 | 85674026 | 1018 | 85674024 + 1020 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1018 | 85674024 + 1021 | 85674026 | 1019 | 85674025 + 1021 | 85674026 | 1020 | 85674026 + 1022 | 85674016 | 1017 | 85674026 + 1022 | 85674016 | 1021 | 85674026 + 1023 | 85674017 | 1017 | 85674026 + 1023 | 85674017 | 1021 | 85674026 + 1024 | 85674003 | 1017 | 85674026 + 1024 | 85674003 | 1021 | 85674026 + 1025 | 85674001 | 1017 | 85674026 + 1025 | 85674001 | 1021 | 85674026 +(20 rows) + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); + wait_for_resource_cleanup +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_3_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_4_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_5_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +select citus_remove_node('localhost', :worker_6_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 1d4006377..74570a4d2 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -252,9 +252,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) job_id | task_id | status --------------------------------------------------------------------- - 
1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | running 1450007 | 1450013 | runnable (5 rows) @@ -282,9 +282,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is running job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | running - 1450005 | 1450010 | running - 1450005 | 1450011 | running + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | running (5 rows) @@ -318,9 +318,9 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that multiple cancels worked job_id | task_id | status --------------------------------------------------------------------- - 1450005 | 1450009 | cancelled - 1450005 | 1450010 | cancelled - 1450005 | 1450011 | cancelled + 1450005 | 1450009 | done + 1450005 | 1450010 | done + 1450005 | 1450011 | done 1450006 | 1450012 | cancelled 1450007 | 1450013 | cancelled (5 rows) @@ -372,9 +372,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is not running but ready to run(runnable) task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | runnable (5 rows) @@ -397,9 +397,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that last task is running task_id | status --------------------------------------------------------------------- - 1450014 | running - 1450015 | running - 1450016 | running + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | running 1450018 | running (5 rows) @@ -445,9 +445,9 @@ SELECT task_id, status FROM pg_dist_background_task ORDER BY task_id; -- show that all tasks are cancelled task_id | status --------------------------------------------------------------------- - 1450014 | cancelled - 1450015 | cancelled - 1450016 | cancelled + 1450014 | done + 1450015 | done + 1450016 | done 1450017 | cancelled 1450018 | cancelled (5 rows) @@ -693,9 +693,9 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task --------------------------------------------------------------------- 1450017 | 1450025 | running | {1,2} 1450017 | 1450026 | running | {3,4} - 1450017 | 1450027 | runnable | {1,2} - 1450017 | 1450028 | runnable | {1,3} - 1450017 | 1450029 | runnable | {2,4} + 1450017 | 1450027 | running | {1,2} + 1450017 | 1450028 | running | {1,3} + 1450017 | 1450029 | running | {2,4} 1450017 | 1450030 | runnable | {1,2} 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | runnable | {1,4} @@ -750,7 +750,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | running | {1,2} 1450017 | 1450028 | running | {1,3} 1450017 | 1450029 | running | {2,4} - 1450017 | 1450030 | runnable | {1,2} + 1450017 | 1450030 | running | {1,2} 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | runnable | {1,4} (8 rows) @@ -810,7 +810,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | done | {1,2} 1450017 | 1450028 | done | {1,3} 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} + 1450017 | 1450030 | done | {1,2} 
1450017 | 1450031 | running | {1,3} 1450017 | 1450032 | running | {1,4} (8 rows) @@ -825,15 +825,15 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- task doesn't restart because it's not allowed anymore by the limit. -- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process pg_cancel_backend --------------------------------------------------------------------- t (1 row) -- task goes to only runnable state, not running anymore. -SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); citus_task_wait --------------------------------------------------------------------- @@ -851,8 +851,8 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task 1450017 | 1450027 | done | {1,2} 1450017 | 1450028 | done | {1,3} 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | runnable | {1,2} - 1450017 | 1450031 | running | {1,3} + 1450017 | 1450030 | done | {1,2} + 1450017 | 1450031 | runnable | {1,3} 1450017 | 1450032 | running | {1,4} (8 rows) @@ -868,7 +868,7 @@ SELECT citus_task_wait(:task_id8, desired_status => 'done'); (1 row) -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); citus_task_wait --------------------------------------------------------------------- @@ -880,16 +880,16 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5, :task_id6, :task_id7, :task_id8) ORDER BY job_id, task_id; - job_id | task_id | status | nodes_involved + job_id | task_id | status | nodes_involved --------------------------------------------------------------------- - 1450017 | 1450025 | done | {1,2} - 1450017 | 1450026 | done | {3,4} - 1450017 | 1450027 | done | {1,2} - 1450017 | 1450028 | done | {1,3} - 1450017 | 1450029 | done | {2,4} - 1450017 | 1450030 | running | {1,2} - 1450017 | 1450031 | done | {1,3} - 1450017 | 1450032 | done | {1,4} + 1450017 | 1450025 | done | {1,2} + 1450017 | 1450026 | done | {3,4} + 1450017 | 1450027 | done | {1,2} + 1450017 | 1450028 | done | {1,3} + 1450017 | 1450029 | done | {2,4} + 1450017 | 1450030 | done | {1,2} + 1450017 | 1450031 | done | {1,3} + 1450017 | 1450032 | done | {1,4} (8 rows) SELECT citus_job_cancel(:job_id1); diff --git a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out index cbb5e3d8d..979d10c14 100644 --- a/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out +++ b/src/test/regress/expected/isolation_logical_replication_single_shard_commands.out @@ -594,10 +594,15 @@ step s2-move-placement: SELECT master_move_shard_placement( get_shard_id_for_distribution_column('logical_replicate_placement', 4), 'localhost', 57637, 'localhost', 57638); + +step s1-end: + COMMIT; -ERROR: could not acquire the lock required to move public.logical_replicate_placement -step s1-end: - COMMIT; +step s2-move-placement: <... 
completed> +master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) step s2-end: COMMIT; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index defe41f0d..27b006bf8 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1503,9 +1503,12 @@ ALTER EXTENSION citus UPDATE TO '13.2-1'; SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) bigint | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision) | - | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) -(2 rows) + | function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) void + | function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) bigint + | function worker_last_saved_explain_analyze() TABLE(explain_analyze_output text, execution_duration double precision, execution_ntuples double precision, execution_nloops double precision) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 030228fe3..c47c312bb 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -29,7 +29,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value(anyelement)' AND pg_catalog.pg_describe_object(classid, objid, 0) != 'function any_value_agg(anyelement,anyelement)' ORDER BY 1; - description + description --------------------------------------------------------------------- event trigger citus_cascade_to_partition function alter_distributed_table(regclass,text,integer,text,boolean) @@ -85,6 +85,7 @@ ORDER BY 1; function citus_internal.add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal.add_tenant_schema(oid,integer) function citus_internal.adjust_local_clock_to_remote(cluster_clock) + function citus_internal.citus_internal_copy_single_shard_placement(bigint,integer,integer,integer,citus.shard_transfer_mode) function citus_internal.database_command(text) function citus_internal.delete_colocation_metadata(integer) function citus_internal.delete_partition_metadata(regclass) @@ -155,7 +156,7 @@ ORDER BY 1; function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() - function citus_rebalance_start(name,boolean,citus.shard_transfer_mode) + function citus_rebalance_start(name,boolean,citus.shard_transfer_mode,boolean,boolean) function citus_rebalance_status(boolean) function citus_rebalance_stop() function citus_rebalance_wait() @@ -393,6 +394,6 @@ ORDER BY 1; view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(362 rows) +(363 rows) DROP TABLE extension_basic_types; diff --git a/src/test/regress/operations_schedule b/src/test/regress/operations_schedule index 6dbc303c2..5639ea4d6 100644 --- 
a/src/test/regress/operations_schedule +++ b/src/test/regress/operations_schedule @@ -13,3 +13,4 @@ test: multi_colocated_shard_rebalance test: cpu_priority test: check_mx test: citus_drain_node +test: background_rebalance_parallel_reference_tables diff --git a/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql new file mode 100644 index 000000000..e384b78cf --- /dev/null +++ b/src/test/regress/sql/background_rebalance_parallel_reference_tables.sql @@ -0,0 +1,228 @@ +-- +-- BACKGROUND_REBALANCE_PARALLEL_REFERENCE_TABLES +-- +-- Test to check if the background tasks scheduled for moving reference tables +-- shards in parallel by the background rebalancer have the correct dependencies +-- +CREATE SCHEMA background_rebalance_parallel; +SET search_path TO background_rebalance_parallel; +SET citus.next_shard_id TO 85674000; +SET citus.shard_replication_factor TO 1; +SET client_min_messages TO ERROR; + +ALTER SEQUENCE pg_dist_background_job_job_id_seq RESTART 17777; +ALTER SEQUENCE pg_dist_background_task_task_id_seq RESTART 1000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 50050; + +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id_cls \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id_cls \gset +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 50; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 50; + +SELECT 1 FROM master_remove_node('localhost', :worker_1_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); + +SELECT 1 FROM master_add_node('localhost', :worker_1_port); +SELECT 1 FROM master_add_node('localhost', :worker_2_port); + +ALTER SYSTEM SET citus.background_task_queue_interval TO '1s'; +ALTER SYSTEM SET citus.max_background_task_executors_per_node = 5; +SELECT pg_reload_conf(); + +-- Colocation group 1: create two tables table1_colg1, table2_colg1 and in a colocation group +CREATE TABLE table1_colg1 (a int PRIMARY KEY); +SELECT create_distributed_table('table1_colg1', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg1 (b int PRIMARY KEY); + +SELECT create_distributed_table('table2_colg1', 'b', colocate_with => 'table1_colg1'); + +-- Colocation group 2: create two tables table1_colg2, table2_colg2 and in a colocation group +CREATE TABLE table1_colg2 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg2', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg2 (b int primary key); + +SELECT create_distributed_table('table2_colg2', 'b', colocate_with => 'table1_colg2'); + +-- Colocation group 3: create two tables table1_colg3, table2_colg3 and in a colocation group +CREATE TABLE table1_colg3 (a int PRIMARY KEY); + +SELECT create_distributed_table('table1_colg3', 'a', shard_count => 4, colocate_with => 'none'); + +CREATE TABLE table2_colg3 (b int primary key); + +SELECT create_distributed_table('table2_colg3', 'b', colocate_with => 'table1_colg3'); + +-- Create reference tables with primary-foreign key relationships + +CREATE TABLE customers ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL +); + +CREATE TABLE orders ( + id SERIAL PRIMARY KEY, + customer_id INTEGER NOT NULL REFERENCES customers(id), + order_date DATE NOT NULL DEFAULT CURRENT_DATE +); + +CREATE TABLE order_items ( + id SERIAL PRIMARY KEY, + order_id INTEGER NOT NULL REFERENCES orders(id), + product_name TEXT NOT NULL, + quantity INTEGER NOT NULL, + 
price NUMERIC(10, 2) NOT NULL +); + +SELECT create_reference_table('customers'); +SELECT create_reference_table('orders'); +SELECT create_reference_table('order_items'); + +-- INSERT SOME DATA +-- Insert 10 customers +INSERT INTO customers (name, email) +SELECT + 'Customer ' || i, + 'customer' || i || '@example.com' +FROM generate_series(1, 10) AS i; + +-- Insert 30 orders: each customer gets 3 orders +INSERT INTO orders (customer_id, order_date) +SELECT + (i % 10) + 1, -- customer_id between 1 and 10 + CURRENT_DATE - (i % 7) +FROM generate_series(1, 30) AS i; + +-- Insert 90 order_items: each order has 3 items +INSERT INTO order_items (order_id, product_name, quantity, price) +SELECT + (i % 30) + 1, -- order_id between 1 and 30 + 'Product ' || (i % 5 + 1), + (i % 10) + 1, + round((random() * 100 + 10)::numeric, 2) +FROM generate_series(1, 90) AS i; + + +-- Add two new nodes so that we can rebalance +SELECT 1 FROM citus_add_node('localhost', :worker_3_port); +SELECT 1 FROM citus_add_node('localhost', :worker_4_port); + +SELECT * FROM get_rebalance_table_shards_plan() ORDER BY shardid; + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'force_logical', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; + +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + + +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; + +-- Drain worker_3 so that we can move only one colocation group to worker_3 +-- to create an unbalance that would cause parallel rebalancing. +SELECT 1 FROM citus_drain_node('localhost',:worker_3_port); +SELECT citus_set_node_property('localhost', :worker_3_port, 'shouldhaveshards', true); + +CALL citus_cleanup_orphaned_resources(); + +-- Move all the shards of Colocation group 3 to worker_3. +SELECT +master_move_shard_placement(shardid, 'localhost', nodeport, 'localhost', :worker_3_port, 'block_writes') +FROM + pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'table1_colg3'::regclass AND nodeport <> :worker_3_port +ORDER BY + shardid; + +CALL citus_cleanup_orphaned_resources(); + +-- Activate and new nodes so that we can rebalance. 
+SELECT 1 FROM citus_activate_node('localhost', :worker_4_port); +SELECT citus_set_node_property('localhost', :worker_4_port, 'shouldhaveshards', true); + +SELECT 1 FROM citus_add_node('localhost', :worker_5_port); +SELECT 1 FROM citus_add_node('localhost', :worker_6_port); + +SELECT citus_rebalance_start AS job_id from citus_rebalance_start( + shard_transfer_mode := 'block_writes', + parallel_transfer_colocated_shards := true, + parallel_transfer_reference_tables := true) \gset + +SELECT citus_rebalance_wait(); + +-- see the dependencies of the tasks scheduled by the background rebalancer +SELECT * from pg_dist_background_task_depend ORDER BY job_id, task_id, depends_on; +-- Temporary hack to eliminate SET application name from command until we get the +-- background job enhancement done. +SELECT D.task_id, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.task_id), + D.depends_on, + (SELECT + CASE + WHEN T.command LIKE '%citus_internal.citus_internal_copy_single_shard_placement%' THEN + SUBSTRING(T.command FROM 'citus_internal\.citus_internal_copy_single_shard_placement\((\d+)') + WHEN T.command LIKE '%pg_catalog.citus_move_shard_placement%' THEN + SUBSTRING(T.command FROM 'pg_catalog\.citus_move_shard_placement\((\d+)') + ELSE + T.command + END + FROM pg_dist_background_task T WHERE T.task_id = D.depends_on) +FROM pg_dist_background_task_depend D WHERE job_id in (:job_id) ORDER BY D.task_id, D.depends_on ASC; + +DROP SCHEMA background_rebalance_parallel CASCADE; +TRUNCATE pg_dist_background_job CASCADE; +TRUNCATE pg_dist_background_task CASCADE; +TRUNCATE pg_dist_background_task_depend; +SELECT public.wait_for_resource_cleanup(); +select citus_remove_node('localhost', :worker_3_port); +select citus_remove_node('localhost', :worker_4_port); +select citus_remove_node('localhost', :worker_5_port); +select citus_remove_node('localhost', :worker_6_port); + +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id_cls; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id_cls; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 9f6abb73a..6c6a3b078 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -345,11 +345,11 @@ SELECT pg_reload_conf(); -- if pg_cancel_backend is called on one of the running task PIDs -- task doesn't restart because it's not allowed anymore by the limit. -- node with id 1 can be used only once, unless there are previously running tasks -SELECT pid AS task_id6_pid FROM pg_dist_background_task WHERE task_id IN (:task_id6) \gset -SELECT pg_cancel_backend(:task_id6_pid); -- cancel task_id6 process +SELECT pid AS task_id7_pid FROM pg_dist_background_task WHERE task_id IN (:task_id7) \gset +SELECT pg_cancel_backend(:task_id7_pid); -- cancel task_id7 process -- task goes to only runnable state, not running anymore. 
-SELECT citus_task_wait(:task_id6, desired_status => 'runnable'); +SELECT citus_task_wait(:task_id7, desired_status => 'runnable'); -- show that cancelled task hasn't restarted because limit doesn't allow it SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task @@ -359,7 +359,7 @@ SELECT job_id, task_id, status, nodes_involved FROM pg_dist_background_task SELECT citus_task_wait(:task_id7, desired_status => 'done'); SELECT citus_task_wait(:task_id8, desired_status => 'done'); -SELECT citus_task_wait(:task_id6, desired_status => 'running'); +SELECT citus_task_wait(:task_id6, desired_status => 'done'); -- show that the 6th task has restarted only after both 6 and 7 are done -- since we have a limit of 1 background task executor per node with id 1
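
For reviewers: a condensed usage sketch of the reworked rebalance entry point, distilled from the new background_rebalance_parallel_reference_tables test above. It assumes a cluster that still has nodes available to move shards onto; the two boolean arguments are the parameters this patch adds to citus_rebalance_start, and the monitoring queries reuse the catalog tables already shown in the test output.

-- Kick off a background rebalance that transfers colocated shards and
-- reference table shards in parallel (new parameters introduced by this patch).
SELECT citus_rebalance_start(
    shard_transfer_mode := 'force_logical',
    parallel_transfer_colocated_shards := true,
    parallel_transfer_reference_tables := true) AS job_id \gset

-- Block until the background job finishes.
SELECT citus_rebalance_wait();

-- Inspect the scheduled tasks and their dependency graph. Tasks with little or
-- no data to move can flip from running to done almost immediately under the
-- new locking model, so assertions should target terminal states rather than
-- expecting to observe 'running'.
SELECT job_id, task_id, status, nodes_involved
FROM pg_dist_background_task
WHERE job_id = :job_id
ORDER BY task_id;

SELECT *
FROM pg_dist_background_task_depend
WHERE job_id = :job_id
ORDER BY task_id, depends_on;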